温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

怎么用Java客户端将数据加载到Grakn知识图中

发布时间:2021-09-17 09:22:21 来源:亿速云 阅读:152 作者:chen 栏目:编程语言

本篇内容介绍了“怎么用Java客户端将数据加载到Grakn知识图中”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

将数据迁移到Grakn

我们将概述迁移的发生方式。

  1. 首先,我们需要与我们的Grakn键空间进行对话。为此,我们将使用Grakn的Java客户端。

  2. 我们将遍历每个数据文件,提取每个数据项并将其解析为JSON对象。

  3. 我们将每个数据项(以JSON对象的形式)传递给其对应的模板。模板返回的是用于将该项插入Grakn的Graql查询。

  4. 我们将执行每个查询以将数据加载到目标键空间 - phone_calls

在继续之前,请确保已安装Java 1.8并在计算机上运行Grakn服务器

入门

创建一个新的Maven项目

该项目使用SDK 1.8并命名phone_calls。我将使用IntelliJ作为IDE。

将Grakn设置为依赖关系

修改pom.xml以包含最新版本的Grakn(1.4.2)作为依赖项。

<?xml  version =“1.0”encoding =“UTF-8”?>
< project  xmlns = “http://maven.apache.org/POM/4.0.0”
         xmlns:xsi = “http://www.w3.org/2001/XMLSchema-instance”
         xsi:schemaLocation = “http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd” >
    < modelVersion > 4.0.0 </ modelVersion >

    < groupId > ai.grakn.examples </ groupId >
    < artifactId > migrate-csv-to-grakn </ artifactId >
    < version > 1.0-SNAPSHOT </ version >

    < 存储库>
        < repository >
            < id >发布</ id >
            < url > https://oss.sonatype.org/content/repositories/releases </ url >
        </ repository >
    </ repositories >

    < properties >
        < grakn.version > 1.4.2 </ grakn.version >
        < maven.compiler.source > 1.7 </ maven.compiler.source >
        < maven.compiler.target > 1.7 </ maven.compiler.target >
    </ properties >

    < dependencies >
        < 依赖>
            < groupId > ai.grakn </ groupId >
            < artifactId > client-java </ artifactId >
            < version > $ {grakn.version} </ version >
        </ dependency >
    </ dependencies >
</ project >

配置日志记录

我们希望能够配置Grakn注销的内容。为此,请修改pom.xml以排除slf4j附带graknlogback作为依赖项添加。


<?xml  version =“1.0”encoding =“UTF-8”?>
< project  xmlns = “http://maven.apache.org/POM/4.0.0”
         xmlns:xsi = “http://www.w3.org/2001/XMLSchema-instance”
         xsi:schemaLocation = “http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd” >
    < modelVersion > 4.0.0 </ modelVersion >

    <! -  ...  - >

    < dependencies >
        < 依赖>
            < groupId > ai.grakn </ groupId >
            < artifactId > client-java </ artifactId >
            < version > $ {grakn.version} </ version >
            < exclusions >
                < 排除>
                    < groupId > org.slf4j </ groupId >
                    < artifactId > slf4j-simple </ artifactId >
                </ exclusion >
            </ exclusions >
        </ dependency >
        < 依赖>
            < groupId > ch.qos.logback </ groupId >
            < artifactId > logback-classic </ artifactId >
            < version > 1.2.3 </ version >
        </ dependency >
    </ dependencies >
</ project >

接下来,添加一个logback.xml 使用以下内容调用的新文件并将其放在下面src/main/resources

< configuration  debug = “false” >
    < root  level = “INFO” />
</ configuration >

创建迁移类

src/main创建一个名为的新文件Migration.java。这是我们要编写所有代码的地方。

包括数据文件

选择以下数据格式之一并下载文件。下载四个文件中的每个文件后,将它们放在src/main/resources/data目录下。我们将使用它们将数据加载到我们的phone_calls知识图中。

随后的所有代码都将被写入Migration.java

指定每个数据文件的详细信息

在此之前,我们需要一个结构来包含读取数据文件和构建Graql查询所需的详细信息。这些细节包括:

  • 数据文件的路径,和

  • 接收JSON对象并生成Graql插入查询的模板函数。

为此,我们创建了一个名为的新子类Input

导入 mjson。杰森 ;

公共 类 迁移 {
    abstract  static  class  Input {
        字符串 路径 ;
        public  Input(String  path){
            这个。path  =  path ;
        }
        String  getDataPath(){ return  path ;}
        abstract  String  template(Json  数据);
    }
}

在本文的后面,我们将看到如何Input创建类的实例,但在我们开始之前,让我们将mjson依赖项添加到文件中的dependencies标记中pom.xml

< 依赖>
    < groupId > org.sharegov </ groupId >
    < artifactId > mjson </ artifactId >
    < version > 1.4.0 </ version >
</ dependency >

是时候初始化了inputs

下面的代码调用initialiseInputs()返回集合的方法inputs。然后,我们将使用input此集合中的每个元素将每个数据文件加载到Grakn中。


//其他进口
导入 java。util。ArrayList ;
导入 java。util。收藏 ;

公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){
        集合< Input >  inputs  =  initialiseInputs();
    }

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();
        // 接下来的是
        回报 输入 ;
    }
}

公司的输入实例


//进口

公共 类 迁移 {
    抽象 静态 类 输入 {...}
    public  static  void  main(String [] args){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”){
            @覆盖
            public  String  template(Json  company){
                返回 “插入$ company isa company has name”  +  公司。at(“name”)+  “;” ;
            }
        });

        回报 输入 ;
    }
}

input.getDataPath()会回来的data/companies

鉴于company

{ name:“Telecom” }

input.template(company) 将返回

插入 $公司 ISA  公司 拥有 的名称 “电信” ;

一个人的输入实例

//进口

公共 类 迁移 {
    抽象 静态 类 输入 {...}
    public  static  void  main(String [] args){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”){...});

        输入。add(new  Input(“data / people”){
            @覆盖
            public  String  template(Json  person){
                //插入人
                String  graqlInsertQuery  =  “insert $ person isa person has phone-number”  +  person。at(“phone_number”);

                如果(! 人。有(“FIRST_NAME” )){
                    //人不是客户
                    graqlInsertQuery  + =  “has is-customer false” ;
                } else {
                    //人是顾客
                    graqlInsertQuery  + =  “has is-customer true” ;
                    graqlInsertQuery  + =  “有名字”  +  人。at(“first_name”);
                    graqlInsertQuery  + =  “有姓氏”  +  人。at(“last_name”);
                    graqlInsertQuery  + =  “有城市”  +  人。在(“城市”);
                    graqlInsertQuery  + =  “有年龄”  +  人。在(“年龄”)。asInteger();
                }

                graqlInsertQuery  + =  “;” ;
                return  graqlInsertQuery ;
            }
        });

        回报 输入 ;
    }
}

input.getDataPath()会回来的data/people

鉴于person

{ phone_number:“+ 44 091 xxx” }

input.template(person) 将返回

插入 $人 拥有 电话- 数字 “+44 091 XXX” ;

并给予person

{ 冷杉- 名称:“成龙”,最后- 名:“乔”,城市:“即墨”,年龄:77,PHONE_NUMBER:“+00 091 XXX” }

input.template(person) 将返回

插入 $人 拥有 电话- 数字 “+44 091 XXX”  具有 第一- 名字 “成龙”  已经 过去- 名字 “乔”  有 城 “即墨”  具有 年龄 77 ;

合同的输入实例

//进口

公共 类 迁移 {
    抽象 静态 类 输入 {...}
    public  static  void  main(String [] args){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”){...});

        输入。add(new  Input(“data / people”){...});

        输入。add(new  Input(“data / contracts”){
            @覆盖
            public  String  template(Json  contract){
                //匹配公司
                String  graqlInsertQuery  =  “匹配$ company isa company has name”  +  contract。at(“company_name”)+  “;” ;
                //匹配人
                graqlInsertQuery  + =  “$ customer isa person has phone-number”  +  contract。at(“person_id”)+  “;” ;
                //插入合同
                graqlInsertQuery  + =  “insert(provider:$ company,customer:$ customer)isa contract;” ;
                return  graqlInsertQuery ;
            }
        });

        回报 输入 ;
    }
}

input.getDataPath()会回来的data/contracts

鉴于contract

{ company_name:“Telecom”,person_id:“ + 00 091 xxx” }

input.template(contract) 将返回

比赛 $公司 ISA  公司 拥有 的名称 “电信” ; $客户 ISA  人 拥有 电话- 数字 “+00 091 XXX” ; insert(provider:$ company,customer:$ customer)isa  contract ;

呼叫的输入实例

//进口

公共 类 迁移 {
    抽象 静态 类 输入 {...}
    public  static  void  main(String [] args){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”){...});

        输入。add(new  Input(“data / people”){...});

        输入。add(new  Input(“data / contracts”){...});

        输入。add(new  Input(“data / calls”){
            @覆盖
            public  String  template(Json  call){
                //匹配来电者
                String  graqlInsertQuery  =  “match $ caller isa person has phone-number”  +  call。at(“caller_id”)+  “;” ;
                //匹配被叫者
                graqlInsertQuery  + =  “$ callee isa person has phone-number”  +  call。at(“callee_id”)+  “;” ;
                //插入电话
                graqlInsertQuery  + =  “insert $ call(caller:$ caller,callee:$ callee)isa call;”  +
                        “$ call已经开始”  +  来电。at(“started_at”)。asString()+  “;”  +
                        “$ call has duration”  +  call。在(“持续时间”)。asInteger()+  “;” ;
                return  graqlInsertQuery ;
            }
        });

        回报 输入 ;
    }
}

input.getDataPath()会回来的data/calls

鉴于call

{ caller_id:“44 091 XXX” ,callee_id:“00 091 XXX” ,started_at:2018 - 08 - 10 T07:57:51,持续时间:148 }

input.template(call) 将返回

比赛 $呼叫者 ISA  人 拥有 电话- 数字 “+44 091 XXX” ; $被叫 ISA  人 拥有 电话- 数字 “+00 091 XXX” ; insert  $ call(caller:$ caller,callee:$ callee)isa  call ; $电话 已经 开始- 在 2018 - 08 - 10 T07:57:51 ; $电话 具有 持续时间 148 ;

连接和迁移

现在我们已经为每个数据文件定义了数据路径和模板,我们可以继续连接我们的phone_calls知识图并将数据加载到其中。

//其他进口
进口 ai。grakn。GraknTxType ;
进口 ai。grakn。Keyspace ;
进口 ai。grakn。客户。Grakn ;
进口 ai。grakn。util。SimpleURI ;
导入 java。io。UnsupportedEncodingException ;

公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){
        集合< Input >  inputs  =  initialiseInputs();
        connectAndMigrate(输入);
    }

    static  void  connectAndMigrate(Collection < Input >  inputs){
        SimpleURI  localGrakn  =  new  SimpleURI(“localhost”,48555);
        Keyspace  keyspace  =  Keyspace。of(“phone_calls”);
        Grakn  grakn  =  new  Grakn(localGrakn);
        Grakn。会话 会话 =  grakn。session(keyspace);

        输入。forEach(输入 - > {
            系统。出。的println(“[由加载”  +  输入。getDataPath()+  “]成Grakn ...”);
            尝试 {
                loadDataIntoGrakn(输入,会话);
            } catch(UnsupportedEncodingException  e){
                e。printStackTrace();
            }
        });

        会议。close();
    }

    static  Collection < Input >  initialiseInputs(){...}

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {...}
}

connectAndMigrate(Collection<Input> inputs)是启动数据迁移到phone_calls知识图中的唯一方法。

此方法发生以下情况:

  1. grakn创建Grakn实例,连接到我们在本地运行的服务器localhost:48555

  2. session创建A ,连接到键空间phone_calls

  3. 对于集合中的每个input对象inputs,我们称之为loadDataIntoGrakn(input, session)。这将负责将input对象中指定的数据加载到我们的键空间中。

  4. 最后session关闭了。

将数据加载到phone_calls

现在我们已经session连接到phone_calls键空间,我们可以继续将数据实际加载到我们的知识图中。

//进口

公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){
        集合< Input >  inputs  =  initialiseInputs();
        connectAndMigrate(输入);
    }

    static  Collection < Input >  initialiseInputs(){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {
        ArrayList < Json >  items  =  parseDataToJson(输入);
        物品。forEach(item  - > {
            Grakn。交易 tx  =  会话。交易(GraknTxType。WRITE);
            String  graqlInsertQuery  =  input。模板(项目);
            系统。出。println(“执行Graql查询:”  +  graqlInsertQuery);
            tx。graql()。解析(graqlInsertQuery)。execute();
            tx。commit();
            tx。close();
        });
        系统。出。的println(“\ nInserted”  +  项目。大小()+  “由项目[”  +  输入。getDataPath()+  “]。到Grakn \ n”个);
    }

    static  ArrayList < Json >  parseDataToJson(输入 输入)抛出 UnsupportedEncodingException {...}
}

为了将每个文件中的数据加载到Grakn中,我们需要:

  1. 检索一个ArrayListJSON对象,每个对象代表一个数据项。为此,我们呼吁parseDataToJson(input),和

  2. 对于每个JSON对象items:a)创建事务tx,b)构造graqlInsertQuery使用相应的template,c)运行query,d)commit事务和e)close事务。

有关创建和提交事务的注意事项:为避免内存不足,建议在单个事务中创建和提交每个查询。但是,为了更快地迁移大型数据集,每次查询都会发生一次,其中是保证在单个事务上运行的最大查询数。

现在我们已经完成了上述所有操作,我们已准备好读取每个文件并将每个数据项解析为JSON对象。这些JSON对象将被传递给template每个Input对象上的方法。

我们要编写实现parseDataToJson(input)

DataFormat特定实现

parseDataToJson(input)根据数据文件的格式,实现会有所不同。

但无论数据格式是什么,我们都需要正确的设置来逐行读取文件。为此,我们将使用InputStreamReader

//其他进口
导入 java。io。InputStreamReader ;
导入 java。io。读者 ;


公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  Collection < Input >  initialiseInputs(){...}

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {...}

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {
        返回 新 的InputStreamReader(迁移。类。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
    }
}

解析CSV

我们将使用Univocity CSV Parser来解析我们的.csv文件。让我们为它添加依赖项。我们需要在dependencies标签中添加以下内容pom.xml

< 依赖>
    < groupId > com.univocity </ groupId >
    < artifactId > univocity-parsers </ artifactId >
    < version > 2.7.6 </ version >
</ dependency >

完成后,我们将编写parseDataToJson(input)解析.csv文件的实现。

//其他进口

进口 com。不公平。解析器。csv。CsvParser ;
进口 com。不公平。解析器。csv。CsvParserSettings ;

公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  Collection < Input >  initialiseInputs(){...}

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {...}

    static  ArrayList < Json >  parseDataToJson(输入 输入)抛出 UnsupportedEncodingException {
        ArrayList < Json >  items  =  new  ArrayList <>();

        CsvParserSettings  settings  =  new  CsvParserSettings();
        设置。setLineSeparatorDetectionEnabled(true);
        CsvParser  解析器 =  新的 CsvParser(设置);
        解析器。beginParsing(getReader(输入。getDataPath()+  “的.csv” ));

        String [] columns  =  parser。parseNext();
        String [] row ;
        而((行 =  分析器。parseNext())!=  空){
            Json  item  =  Json。object();
            对于(诠释 我 =  0 ; 我 <  行。长度 ; 我++){
                项目。set(columns [ i ],row [ i ]);
            }
            物品。add(item);
        }
        返回 的项目 ;
    }

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {
        返回 新 的InputStreamReader(迁移。类。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
    }
}

除了这个实现,我们还需要进行一次更改。

鉴于CSV文件的性质,生成的JSON对象将把.csv文件的所有列作为其键,即使该值不存在,它也将被视为一个null

出于这个原因,我们需要在persontemplateinput实例方法中更改一行。

if (! person.has("first_name")) {...}

if (person.at("first_name").isNull()) {...}

阅读JSON

我们将使用Gson的JsonReader来读取我们的.json文件。让我们为它添加依赖项。我们需要在dependencies标签中添加以下内容pom.xml

< 依赖>
    < groupId > com.google.code.gson </ groupId >
    < artifactId > gson </ artifactId >
    < version > 2.7 </ version >
</ dependency >

完成后,我们将编写parseDataToJson(input)用于读取.json文件的实现。

//其他进口
进口 com。谷歌。gson。流。JsonReader ;

公共 类 迁移 {
    抽象 静态 类 输入 {...}

    public  static  void  main(String [] args){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  Collection < Input >  initialiseInputs(){...}

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {...}

    static  ArrayList < Json >  parseDataToJson(输入 输入)抛出 IOException {
        ArrayList < Json >  items  =  new  ArrayList <>();

        JsonReader  jsonReader  =  新 JsonReader(getReader(输入。getDataPath()+  “上传.json” ));

        jsonReader。beginArray();
        而(jsonReader。hasNext()){
            jsonReader。beginObject();
            Json  item  =  Json。object();
            而(jsonReader。hasNext()){
                String  key  =  jsonReader。nextName();
                开关(jsonReader。PEEK()){
                    案例 STRING:
                        项目。集(键,jsonReader。nextString());
                        打破 ;
                    案件 编号:
                        项目。集(键,jsonReader。nextInt());
                        打破 ;
                }
            }
            jsonReader。endObject();
            物品。add(item);
        }
        jsonReader。endArray();
        返回 的项目 ;
    }

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {
        返回 新 的InputStreamReader(迁移。类。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
    }
}

解析XML

我们将使用Java的内置StAX来解析我们的.xml文件。

要解析XML数据,我们需要知道目标标记的名称。这需要在Input类中声明并在构造每个input对象时指定。

//进口

公共 类 XmlMigration {
    abstract  static  class  Input {
        字符串 路径 ;
        串 选择 ;
        public  Input(String  path,String  selector){
            这个。path  =  path ;
            这个。selector  =  selector ;
        }
        String  getDataPath(){ return  path ;}
        String  getSelector(){ return  selector ;}
        abstract  String  template(Json  数据);
    }

    public  static  void  main(String [] args){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”,“company”){...});
        输入。添加(新 输入(“数据/人”,“人”){...});
        输入。add(new  Input(“data / contracts”,“contract”){...});
        输入。add(new  Input(“data / calls”,“call”){...});

        回报 输入 ;
    }

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException,XMLStreamException {...}

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {...}
}

现在用于parseDataToJson(input)解析.xml文件的实现。

//其他进口
导入 javax。xml。流。XMLInputFactory ;
导入 javax。xml。流。XMLStreamConstants ;
导入 javax。xml。流。XMLStreamException ;
导入 javax。xml。流。XMLStreamReader ;

公共 类 XmlMigration {
    abstract  static  class  Input {
        字符串 路径 ;
        串 选择 ;
        public  Input(String  path,String  selector){
            这个。path  =  path ;
            这个。selector  =  selector ;
        }
        String  getDataPath(){ return  path ;}
        String  getSelector(){ return  selector ;}
        abstract  String  template(Json  数据);
    }

    public  static  void  main(String [] args){...}

    static  void  connectAndMigrate(Collection < Input >  inputs){...}

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        输入。add(new  Input(“data / companies”,“company”){...});
        输入。添加(新 输入(“数据/人”,“人”){...});
        输入。add(new  Input(“data / contracts”,“contract”){...});
        输入。add(new  Input(“data / calls”,“call”){...});

        回报 输入 ;
    }

    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException,XMLStreamException {...}

    static  ArrayList < Json >  parseDataToJson(输入 输入)抛出 UnsupportedEncodingException,XMLStreamException {
        ArrayList < Json >  items  =  new  ArrayList <>();

        XMLStreamReader  r  =  XMLInputFactory。newInstance()。createXMLStreamReader(getReader(输入。getDataPath()+  “的.xml” ));
        字符串 键 ;
        String  value  =  null ;
        Boolean  inSelector  =  false ;
        Json  item  =  null ;
        而([R 。hasNext()){
            int  event  =  r。next();

            开关(事件){
                case  XMLStreamConstants。START_ELEMENT:
                    如果(ř。号·getLocalName()。等于(输入。getSelector())){
                        inSelector  =  true ;
                        item  =  Json。object();
                    }
                    打破 ;

                case  XMLStreamConstants。字符:
                    value  =  r。getText();
                    打破 ;

                case  XMLStreamConstants。END_ELEMENT:
                    key  =  r。getLocalName();
                    如果(inSelector  &&  ! 关键。平等(输入。getSelector())){
                        项目。set(key,value);
                    }
                    如果(键。等于(输入。getSelector())){
                        inSelector  =  false ;
                        物品。add(item);
                    }

                    打破 ;
            }
        }
        返回 的项目 ;
    }

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {...}
}

把它放在一起

以下是我们Migrate.java将CSV数据加载到Grakn中的样子,并在这里找到JSON和XML文件的样子。

包 ai。grakn。例子 ;

进口 ai。grakn。GraknTxType ;
进口 ai。grakn。Keyspace ;
进口 ai。grakn。客户。Grakn ;
进口 ai。grakn。util。SimpleURI ;

/ **
 *用于CSV,TSV和固定宽度文件的快速可靠的基于Java的解析器的集合
 * @see <a href="https://www.univocity.com/pages/univocity_parsers_documentation"> univocity </a>
 * /
进口 com。不公平。解析器。csv。CsvParser ;
进口 com。不公平。解析器。csv。CsvParserSettings ;

/ **
 *适用于Java的精简JSON库,
 * @see <a href="https://bolerio.github.io/mjson/"> mjson </a>
 * /
导入 mjson。杰森 ;

导入 java。io。InputStreamReader ;
导入 java。io。读者 ;
导入 java。io。UnsupportedEncodingException ;
导入 java。util。ArrayList ;
导入 java。util。收藏 ;

公共 课 CsvMigration {
    / **
     *表示将输入文件链接到自己的模板函数的Input对象,
     *用于将Json对象映射到Graql查询字符串
     * /
    abstract  static  class  Input {
        字符串 路径 ;

        public  Input(String  path){
            这个。path  =  path ;
        }

        String  getDataPath(){ return  path ;}

        abstract  String  template(Json  数据);
    }

    / **
     * 1.创建Grakn实例
     * 2.创建到目标键空间的会话
     * 3.初始化输入列表,每个输入包含解析数据所需的详细信息
     * 4.将csv数据加载到每个文件的Grakn
     * 5.结束会议
     * /
    public  static  void  main(String [] args){
        集合< Input >  inputs  =  initialiseInputs();
        connectAndMigrate(输入);
    }

    static  void  connectAndMigrate(Collection < Input >  inputs){
        SimpleURI  localGrakn  =  new  SimpleURI(“localhost”,48555);
        Grakn  grakn  =  new  Grakn(localGrakn);
        Keyspace  keyspace  =  Keyspace。of(“phone_calls”);
        Grakn。会话 会话 =  grakn。session(keyspace);

        输入。forEach(输入 - > {
            系统。出。的println(“[由加载”  +  输入。getDataPath()+  “]成Grakn ...”);
            尝试 {
                loadDataIntoGrakn(输入,会话);
            } catch(UnsupportedEncodingException  e){
                e。printStackTrace();
            }
        });

        会议。close();
    }

    static  Collection < Input >  initialiseInputs(){
        集合< Input >  inputs  =  new  ArrayList <>();

        //定义用于构建公司Graql插入查询的模板
        输入。add(new  Input(“data / companies”){
            @覆盖
            public  String  template(Json  company){
                返回 “插入$ company isa company has name”  +  公司。at(“name”)+  “;” ;
            }
        });
        //定义用于构造人Graql插入查询的模板
        输入。add(new  Input(“data / people”){
            @覆盖
            public  String  template(Json  person){
                //插入人
                String  graqlInsertQuery  =  “insert $ person isa person has phone-number”  +  person。at(“phone_number”);

                如果(个人。在(“FIRST_NAME” )。参考isNull()){
                    //人不是客户
                    graqlInsertQuery  + =  “has is-customer false” ;
                } else {
                    //人是顾客
                    graqlInsertQuery  + =  “has is-customer true” ;
                    graqlInsertQuery  + =  “有名字”  +  人。at(“first_name”);
                    graqlInsertQuery  + =  “有姓氏”  +  人。at(“last_name”);
                    graqlInsertQuery  + =  “有城市”  +  人。在(“城市”);
                    graqlInsertQuery  + =  “有年龄”  +  人。在(“年龄”)。asInteger();
                }

                graqlInsertQuery  + =  “;” ;
                return  graqlInsertQuery ;
            }
        });
        //定义用于构造合同的模板Graql插入查询
        输入。add(new  Input(“data / contracts”){
            @覆盖
            public  String  template(Json  contract){
                //匹配公司
                String  graqlInsertQuery  =  “匹配$ company isa company has name”  +  contract。at(“company_name”)+  “;” ;
                //匹配人
                graqlInsertQuery  + =  “$ customer isa person has phone-number”  +  contract。at(“person_id”)+  “;” ;
                //插入合同
                graqlInsertQuery  + =  “insert(provider:$ company,customer:$ customer)isa contract;” ;
                return  graqlInsertQuery ;
            }
        });
        //定义用于构造调用Graql插入查询的模板
        输入。add(new  Input(“data / calls”){
            @覆盖
            public  String  template(Json  call){
                //匹配来电者
                String  graqlInsertQuery  =  “match $ caller isa person has phone-number”  +  call。at(“caller_id”)+  “;” ;
                //匹配被叫者
                graqlInsertQuery  + =  “$ callee isa person has phone-number”  +  call。at(“callee_id”)+  “;” ;
                //插入电话
                graqlInsertQuery  + =  “insert $ call(caller:$ caller,callee:$ callee)isa call;”  +
                        “$ call已经开始”  +  来电。at(“started_at”)。asString()+  “;”  +
                        “$ call has duration”  +  call。在(“持续时间”)。asInteger()+  “;” ;
                return  graqlInsertQuery ;
            }
        });
        回报 输入 ;
    }

    / **
     *将csv数据加载到我们的Grakn phone_calls键空间:
     * 1.将数据项作为json对象列表获取
     * 2.对于每个json对象
     * 一个。创建Grakn事务
     * b。构造相应的Graql插入查询
     * C。运行查询
     * d。提交交易
     * e。关闭交易
     *
     * @param输入包含解析数据所需的详细信息
     * @param会话将创建一个事务
     * @throws UnsupportedEncodingException
     * /
    static  void  loadDataIntoGrakn(输入 输入,Grakn。会话 会话)抛出 UnsupportedEncodingException {
        ArrayList < Json >  items  =  parseDataToJson(输入); // 1
        物品。forEach(item  - > {
            Grakn。交易 tx  =  会话。交易(GraknTxType。WRITE); // 2a
            String  graqlInsertQuery  =  input。模板(项目); // 2b
            系统。出。println(“执行Graql查询:”  +  graqlInsertQuery);
            tx。graql()。解析(graqlInsertQuery)。execute(); // 2c
            tx。commit(); // 2d
            tx。close(); // 2e

        });
        系统。出。的println(“\ nInserted”  +  项目。大小()+  “由项目[”  +  输入。getDataPath()+  “]。到Grakn \ n”个);
    }

    / **
     * 1.通过流读取csv文件
     * 2.将每行解析为json对象
     * 3.将json对象添加到项列表中
     *
     * @param输入用于获取数据文件的路径,减去格式
     * @return json对象列表
     * @throws UnsupportedEncodingException
     * /
    static  ArrayList < Json >  parseDataToJson(输入 输入)抛出 UnsupportedEncodingException {
        ArrayList < Json >  items  =  new  ArrayList <>();

        CsvParserSettings  settings  =  new  CsvParserSettings();
        设置。setLineSeparatorDetectionEnabled(true);
        CsvParser  解析器 =  新的 CsvParser(设置);
        解析器。beginParsing(getReader(输入。getDataPath()+  “的.csv” )); // 1

        String [] columns  =  parser。parseNext();
        String [] row ;
        而((行 =  分析器。parseNext())!=  空){
            Json  item  =  Json。object();
            对于(诠释 我 =  0 ; 我 <  行。长度 ; 我++){
                项目。set(columns [ i ],row [ i ]); // 2
            }
            物品。add(item); // 3
        }
        返回 的项目 ;
    }

    public  static  Reader  getReader(String  relativePath)throws  UnsupportedEncodingException {
        返回 新 的InputStreamReader(CsvMigration。类。getClassLoader()。的getResourceAsStream(relativePath),“UTF-8”);
    }
}

加载时间

运行main方法,坐下来,放松并观察日志,同时数据开始涌入Grakn。

回顾一下

  • 我们从设置项目和定位数据文件开始。

  • 接下来,我们继续设置迁移机制,该机制独立于数据格式。

  • 然后,我们了解了如何将具有不同数据格式的文件解析为JSON对象。

  • 最后,我们运行了使用给定main方法触发connectAndMigrate方法的方法inputs。这将数据加载到我们的Grakn知识图中。

“怎么用Java客户端将数据加载到Grakn知识图中”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注亿速云网站,小编将为大家输出更多高质量的实用文章!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI