zoukankan      html  css  js  c++  java
  • Neo4j导入数据的三种方式

      导入Neo4j的方式有三种,大概如下:

    1. 读取外部文件到内存中,然后使用create 语句导入之后建立关系。

    2. 用load csv 读取csv 文件

    3. 从JDBC直接load 到neo4j 中

    这里测试导入的场景如下:

    主要包含四个字段: 卡号|用户名称|转账卡号|转让金额;主要逻辑是: 采用序号递增,从0 - 10000,用户名称也是从"user" + 0-10000。转账卡号是转给下一个节点,转账金额也是递增。

      下面的构造数据统一采用从内存中构造数据,构造1W条数据,也就是1W个node,1W条关系。

        private static final Integer DATA_SIZE = 10000;
    
        private static List<Map<String, Object>> generateData() {
            List<Map<String, Object>> datas = new ArrayList<>(DATA_SIZE);
            Map<String, Object> tmpMap = null;
            for (int i = 0; i < DATA_SIZE; i++) {
                tmpMap = new HashMap<>();
                datas.add(tmpMap);
                tmpMap.put("cardNum", i);
                tmpMap.put("userName", "user" + i);
                tmpMap.put("transferCardNum", (i + 1) % 10000); // 每个卡给自己的下一个卡转钱
                tmpMap.put("transferAmount", i);
            }
            return datas;
        }

    0. pom 引入如下依赖

            <!-- neo4j 相关的API -->
            <dependency>
                <groupId>org.neo4j.driver</groupId>
                <artifactId>neo4j-java-driver</artifactId>
                <version>4.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.neo4j</groupId>
                <artifactId>neo4j</artifactId>
                <version>3.3.4</version>
            </dependency>

    1. 外部文件导入neo4j

      这里导入文件读入文件的过程忽略掉,从内存中模拟1W条数据。

        /**
         * 测试手动插入数据以及维护关系
         */
        private static void inertNeo4jTest() {
            // 构造数据, 数据和pg 库里面的数据一样
            List<Map<String, Object>> datas = generateData();
    
            Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
            Session session = driver.session();
    
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            // 手动create to neo4j
            String createCQLTemplate = "create (n:transferDetail {cardNum: '$cardNum', userName: '$userName', transferCardNum: '$transferCardNum', transferAmount: '$transferAmount'})";
            datas.forEach(data -> {
                String createCQL = createCQLTemplate.replace("$cardNum", MapUtils.getString(data, "cardNum"))
                        .replace("$userName", MapUtils.getString(data, "userName"))
                        .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
                        .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
                session.run(createCQL);
            });
            System.out.println("插入成功耗时: " + stopWatch.getTime() + " ms");
            // 手动维护关系
            String mergeCQLTemplate = "match (a:transferDetail{cardNum: '$cardNum1'}), (b:transferDetail{cardNum: '$cardNum2'}) MERGE(a)-[:TRANSFER{transferAmount: '$transferAmount'}]->(b)";
            datas.forEach(data -> {
                String mergeCQL = mergeCQLTemplate.replace("$cardNum1", MapUtils.getString(data, "cardNum"))
                        .replace("$cardNum2", MapUtils.getString(data, "transferCardNum"))
                        .replace("$transferAmount", MapUtils.getString(data, "transferAmount"));
                session.run(mergeCQL);
            });
            stopWatch.stop();
            System.out.println("转换关系成功,耗时: " + stopWatch.getTime() + " ms");
    
            // close resource
            session.close();
            driver.close();
        }

      代码逻辑很简单,从内存构造1W条数据-》create 到 neo4j -》手动维护关系。 测试结果耗时如下:

    插入成功耗时: 50976 ms
    转换关系成功,耗时: 285111 ms

    可以看到消耗时间大概为不到5min。到neo4j 查看数据如下:

    MATCH (n:transferDetail) RETURN count(n)

    2. 测试load csv

    1. pom 引入如下依赖:

            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-csv</artifactId>
                <version>1.3</version>
            </dependency>

    2. 测试方法: 生成csv 文件,然后load csv

        private static void loadCSVtest() throws Exception {
            // 读取CSV 文件
    //        Reader fileReader = new FileReader(fileName);
    //        Iterable<CSVRecord> records = CSVFormat.RFC4180.withFirstRecordAsHeader().parse(fileReader);
    //        for (CSVRecord record : records) {
    //            System.out.println(record.get("instanceId") + record.get("regionId") + record.get("zoneId"))
    //        }
            // 1. 写入一个csv 到本地, 构造相同的数据
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            Appendable fileWriter = new FileWriter("E:\neo4j3.5\neo4j-community-3.5.5\import\transfer.csv");
            CSVPrinter printer = CSVFormat.RFC4180.withHeader("cardnum", "username", "transfercardnum", "transferamount").print(fileWriter);
            List<Map<String, Object>> datas = generateData();
            datas.forEach(data -> {
                try {
                    printer.printRecord(MapUtils.getString(data, "cardNum"), MapUtils.getString(data, "userName"), MapUtils.getString(data, "transferCardNum"), MapUtils.getString(data, "transferAmount"));
                } catch (IOException ignore) {
                    // ignore
                }
            });
            printer.close();
            System.out.println("csv 文件输出完成, 耗时: " + stopWatch.getTime());
    
            // 2. csv load 到neo4j
            Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
            Session session = driver.session();
            String deleteCQL = "match (n:transferdetail) detach delete n";
            session.run(deleteCQL);
            System.out.println("neo4j 清空数据库成功, 耗时: " + stopWatch.getTime() + "ms");
            String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
            session.run(constraintCQL);
            String createCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as line create(n:transferdetail{cardnum:line.cardnum, username:line.username, transfercardnum:line.transfercardnum, transferamount:line.transferamount})";
            session.run(createCQL);
            String relateCQL = "load csv WITH HEADERS from 'file:///transfer.csv' as row match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)";
            session.run(relateCQL);
            stopWatch.stop();
            System.out.println("load csv导入成功, 耗时: " + stopWatch.getTime() + "ms");
            session.close();
            driver.close();
    
        }

    结果: 可以看到非常的快,比自己手动创建节点然后建立关系快多了

    csv 文件输出完成, 耗时: 269
    neo4j 清空数据库成功, 耗时: 2730ms
    load csv导入成功, 耗时: 3070ms

    3. 测试从RDBMS 从加载数据

      这里采用apoc 从jdbc 加载数据。这里采用从PG数据库加载数据。

    1. 首先下载apoc 插件

    https://github.com/neo4j-contrib/neo4j-apoc-procedures/releases

    2. 将下载的jar 包和pg 的驱动包放到 %neo4j%plugins 目录下,如下:

    3. 修改%neo4j-community-3.5.5%conf文件夹下面neo4j.conf文件, 最后增加如下配置:

    dbms.security.procedures.unrestricted=apoc.*
    apoc.export.file.enabled=true

    4. 重启neo4j server

    5. 查看apoc 版本: 查看到版本证明apoc 插件安装成功

    return apoc.version()

    结果:

     6. 修改程序采用apoc 从jdbc load 数据

    (1) 增加pg 驱动

            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.2.2</version>
            </dependency>

    (2) 编写测试类

        public static final void inertFromPGTest() throws Exception {
            // 1. 构造数据
            List<Map<String, Object>> datas = generateData();
    
            // 2. 插入到PG库
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();
            Connection connection = null;
            try {
                Class.forName("org.postgresql.Driver");
                connection = DriverManager
                        .getConnection("jdbc:postgresql://127.0.0.1:5432/qlq_test",
                                "postgres", "postgres");
            } catch (Exception e) {
                e.printStackTrace();
            }
            Assert.notNull(connection, "链接失败");
            java.sql.Statement statement = connection.createStatement();
            // 清空数据库
            String trunacteSQL = "truncate table transferdetail";
            statement.execute(trunacteSQL);
            System.out.println("删除pg数据库成功, 耗时: " + stopWatch.getTime() + "ms");
            // 插入数据
            List<String> insertSQLs = new ArrayList<>();
            String valueSQL = "('$cardNum', '$userName', '$transferCardNum', '$transferAmount')";
            datas.forEach(data -> {
                insertSQLs.add(valueSQL.replace("$cardNum", MapUtils.getString(data, "cardNum"))
                        .replace("$userName", MapUtils.getString(data, "userName"))
                        .replace("$transferCardNum", MapUtils.getString(data, "transferCardNum"))
                        .replace("$transferAmount", MapUtils.getString(data, "transferAmount")));
            });
            String sql = "insert into transferdetail(cardNum, userName, transferCardNum, transferAmount) values";
            sql += StringUtils.join(insertSQLs, ",");
            statement.execute(sql);
            System.out.println("插入pg数据库成功, 耗时: " + stopWatch.getTime() + "ms");
    
            // 3. PG 库转换到neo4j 库
            Driver driver = GraphDatabase.driver("bolt://localhost:7687", AuthTokens.basic("neo4j", "neo4j."));
            Session session = driver.session();
            String deleteCQL = "match (n:transferdetail) detach delete n";
            session.run(deleteCQL);
            System.out.println("neo4j 清空数据库成功, 耗时: " + stopWatch.getTime() + "ms");
            String constraintCQL = "create constraint on (n:transferdetail) ASSERT n.cardnum is unique";
            session.run(constraintCQL);
    //        cypher = "CALL apoc.periodic.iterate("CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\"SELECT * FROM transferdetail\")","MERGE(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount}) with * MERGE(m:transferdetail{cardnum:row.transfercardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount}) with * create p=(n)-[r:transfer{transferamount:row.transferamount}]->(m)",{batchSize:10000,iterateList:true})";  //连接postgresSQL数据库和设计创建neo4j图数据库数据模型
            String createCQL = "CALL apoc.periodic.iterate("CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\"SELECT * FROM transferdetail\")","create(n:transferdetail{cardnum:row.cardnum, username:row.username, transfercardnum:row.transfercardnum, transferamount:row.transferamount})",{batchSize:10000,iterateList:true})";  //连接postgresSQL数据库和设计创建neo4j图数据库数据模型
            session.run(createCQL);
            String relateCQL = "CALL apoc.periodic.iterate("CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\"SELECT * FROM transferdetail\")","match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) merge (n)-[:transfer{transferamount:row.transferamount}]->(m)",{batchSize:10000,iterateList:true})";  //连接postgresSQL数据库和设计创建neo4j图数据库数据模型
    //        String cypher2 = "CALL apoc.periodic.iterate("CALL apoc.load.jdbc('jdbc:postgresql://10.95.24.33:5432/qlq_test?user=postgres&password=afim36owsx&characterEncoding=utf-8',\"SELECT * FROM transferdetail\")","match(n:transferdetail{cardnum:row.cardnum}),(m:transferdetail{cardnum:row.transfercardnum}) create p=(n)-[r:transfer{transferamount:row.transferamount}]->(m)",{batchSize:10000,iterateList:true})";  //连接postgresSQL数据库和设计创建neo4j图数据库数据模型
            session.run(relateCQL);
            stopWatch.stop();
            System.out.println("apoc导入成功, 耗时: " + stopWatch.getTime() + "ms");
    
            session.close();
            driver.close();
        }

    测试结果如下:

    删除pg数据库成功, 耗时: 546ms
    插入pg数据库成功, 耗时: 976ms
    neo4j 清空数据库成功, 耗时: 2578ms
    apoc导入成功, 耗时: 3013ms

      可以得出结论,从jdbc load 数据和load csv 从性能上差不多;两者都快于手动创建和维护关系。

    补充: 关于neo4j 4.1 导入

      在neo4j 4.1 采用apoc 导入的时候,报错无法找到JDBC的driver,jdbc 的驱动包确实是放到plugins 目录了。 未找到原因,最终的解决办法是采用官方提供的一个通用驱动包,下载地址:

    https://github.com/neo4j-contrib/neo4j-jdbc/releases

    APOC文档: https://neo4j.com/labs/apoc/4.1/

    apoc git: https://github.com/neo4j-contrib/neo4j-apoc-procedures

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    洛谷 P1934 封印
    洛谷 P2426 删数
    洛谷 P3399 丝绸之路
    SQL SERVER镜像配置,无法将 ALTER DATABASE 命令发送到远程服务器实例的解决办法
    Step7:SQL Server 多实例下的复制
    Step6:SQL Server 数据变更时间戳(timestamp)在复制中的运用
    Step5:SQL Server 跨网段(跨机房)FTP复制
    Step4:SQL Server 跨网段(跨机房)复制
    Step3 SQL Server 通过备份文件初始化复制
    Step2:SQL Server 复制事务发布
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14774488.html
Copyright © 2011-2022 走看看