Import/Load data into Neo4j using the LOAD CSV command


In the previous post we showed how we migrated our user base data into Neo4j using the neo4j-import tool. We created 2.65B nodes and 6.2B relationships in ~16 hours (not including index build time).

In this post, we will show how we used the LOAD CSV command to load data. For our use case, after we finished with neo4j-import, we needed to roll forward the changes that had happened in the meantime on the source databases (Oracle LDAP, MySQL [EjabberD], Riak, etc.).

We couldn’t use the neo4j-import / neo4j-admin import tools for that, since they can only be used on an empty database.

Things to consider when using LOAD CSV

Important things to take into account while using the LOAD CSV command:

  • Neo4j database size, or more accurately, transaction log size. Since a LOAD CSV command is usually followed by one or more MERGE/CREATE statements, you need to watch the transaction log size. The transaction logs record all write operations in the database and are used for backup and recovery and for cluster operations. One way to keep the transaction logs from growing too large is to temporarily change dbms.tx_log.rotation.retention_policy (for example: dbms.tx_log.rotation.retention_policy=1G size). It’s important to revert this change later.
  • Parallel execution – for faster load times, you might want to execute the LOAD CSV command in several threads (each thread handling a different file). You need to make sure the data can be loaded in parallel (no dependencies in the data, or, if there is a dependency, the code – usually the Cypher query – knows how to handle it).
  • Cluster leader can change – a write operation must go through the leader of the cluster (on a Causal Cluster). A heavy load operation can overload a node, which may cause the other cluster members to elect a new leader (especially if the node stops responding to cluster heartbeat messages). You will want to use the bolt+routing driver scheme to make sure you are executing the command on the leader.
  • Periodic commit – if you are loading a lot of data, you might want to add USING PERIODIC COMMIT. This commits the transaction every X rows and keeps on loading, which prevents running out of memory when importing large amounts of data, but it also breaks transaction isolation.
  • Duplicates – since we might get an error while loading a file (the leader changed, or a Cypher exception – see below), we need to make sure that our Cypher queries are duplicate-proof. When we retry the file later, part of it may already have been committed (periodic commit), so a naive CREATE would create duplicates, and no one wants that. See the sketch after this list.
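
To make the periodic-commit and duplicates points concrete, here is a minimal sketch of the kind of statement this leads to. The file name, label and properties (users.csv, :User, id, name) are illustrative, not our actual schema; the point is the combination of USING PERIODIC COMMIT and MERGE, which keeps memory usage bounded and makes re-running a partially loaded file safe:

    USING PERIODIC COMMIT 10000
    LOAD CSV WITH HEADERS FROM 'file:///users.csv' AS row
    // MERGE instead of CREATE, so reloading a file that was partially committed
    // does not create duplicates (assumes a unique constraint/index on :User(id))
    MERGE (u:User {id: row.id})
    SET u.name = row.name;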

 

Execute LOAD CSV in parallel – our script

We noticed that when executing a LOAD CSV command, less than 5% of the CPU was being used. So we thought, why not execute in parallel?

So we wrote a Python script, which gives you the following:

  • You can control how many concurrent threads (parallel LOAD CSV commands) to run using the parallel_degree constant (minimum 1; the maximum depends on the number of CPU cores – we used 15).
  • You can define different handling per file. Since we had several CSV files with different data, we needed different Cypher queries per file. To achieve that we created the COMMANDS array: each entry contains a file-name prefix and an array of commands to execute. That way we have the flexibility to run several LOAD CSV commands on the same file and to easily add more file prefixes with different Cypher queries.
  • Logging – each time the script runs we create a new log for that execution. We write to the log the name of the file being processed, which thread number is handling it, which Cypher query is being executed, and statistics about it (see below).
  • Statistics – when executing the LOAD CSV command via the Python driver we do not get any statistics back, so we use the neo_stat procedure, which queries the current status of Neo4j: the number of nodes and the number of relationships. That is not perfect, but it gives you a general idea of what is going on during the load. See the “Monitoring and statistics” section below for examples.
  • Error handling – each exception (Cypher or other) is logged, and the file that failed is moved to a different folder (FAILED_FOLDER) so we can process it later if needed. See the “Common errors” section below for examples.

 

We used Python 3.4, which is (currently) the default Python 3 version on Ubuntu 14.04.2.

Install Python drivers:
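
The exact install command from the original post is not reproduced here; assuming the official Bolt driver package of that era (neo4j-driver), it would be along these lines:

    pip3 install neo4j-driver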

The load script:
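
The original script itself is not reproduced here. Below is a minimal sketch built only from the description above, assuming the 1.x neo4j-driver API (GraphDatabase.driver, session.run); the constant names, paths, addresses, credentials and Cypher are illustrative placeholders, not the original code:

    import glob
    import logging
    import os
    import shutil
    from concurrent.futures import ThreadPoolExecutor

    # 1.x driver import; newer driver packages expose "from neo4j import GraphDatabase"
    from neo4j.v1 import GraphDatabase

    # --- configuration (names follow the description above; values are placeholders) ---
    PARALLEL_DEGREE = 15                      # parallel_degree in the post: concurrent LOAD CSV workers
    CSV_FOLDER = "/var/lib/neo4j/import"      # folder scanned for *.csv files to load
    FAILED_FOLDER = "/var/lib/neo4j/failed"   # files that raised an exception are moved here
    LOG_PATH = "/var/log/load_csv"            # one log file per execution

    # Map a file-name prefix to the list of Cypher commands to execute on files with
    # that prefix. "{file}" is replaced with the file name (relative to the import dir).
    COMMANDS = {
        "users": [
            "USING PERIODIC COMMIT 10000 "
            "LOAD CSV WITH HEADERS FROM 'file:///{file}' AS row "
            "MERGE (u:User {id: row.id}) SET u.name = row.name",
        ],
        "friends": [
            "USING PERIODIC COMMIT 10000 "
            "LOAD CSV WITH HEADERS FROM 'file:///{file}' AS row "
            "MATCH (a:User {id: row.from_id}), (b:User {id: row.to_id}) "
            "MERGE (a)-[:FRIEND]->(b)",
        ],
    }

    # bolt+routing makes sure writes are sent to the current cluster leader
    driver = GraphDatabase.driver("bolt+routing://neo4j-core-01:7687",
                                  auth=("neo4j", "password"))


    def load_file(path, worker_no):
        """Run every Cypher command configured for this file's prefix; move the file on failure."""
        name = os.path.basename(path)
        prefix = name.split("_")[0]
        try:
            with driver.session() as session:
                for template in COMMANDS.get(prefix, []):
                    cypher = template.replace("{file}", name)
                    logging.info("%s - %s - running: %s", worker_no, name, cypher)
                    # auto-commit run, as required by USING PERIODIC COMMIT
                    session.run(cypher).consume()
                    # (the original script also logged node/relationship counts here
                    #  via the neo_stat procedure; omitted in this sketch)
        except Exception:
            logging.exception("%s - %s - failed", worker_no, name)
            shutil.move(path, os.path.join(FAILED_FOLDER, name))


    if __name__ == "__main__":
        logging.basicConfig(filename=os.path.join(LOG_PATH, "load_%d.log" % os.getpid()),
                            level=logging.INFO, format="%(asctime)s %(message)s")
        files = sorted(glob.glob(os.path.join(CSV_FOLDER, "*.csv")))
        with ThreadPoolExecutor(max_workers=PARALLEL_DEGREE) as pool:
            for i, path in enumerate(files):
                # worker_no is just a label used in the log lines
                pool.submit(load_file, path, i % PARALLEL_DEGREE)

Mapping files to queries by file-name prefix and labelling log lines with a worker number is one straightforward way to realise the COMMANDS array and the per-thread logging described above.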

Monitoring and statistics

The script generates a log file (per execution). See LOG_PATH.

This is an example of the log:
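
The original log output is not shown here; a hypothetical excerpt, in the format the sketch above would produce (thread number, file name, the Cypher being run, then the node/relationship counts reported by the neo_stat procedure), could look like:

    0 - users_001.csv - running: USING PERIODIC COMMIT 10000 LOAD CSV WITH HEADERS FROM 'file:///users_001.csv' AS row MERGE (u:User {id: row.id}) ...
    0 - users_001.csv - done. nodes: <total node count>, relationships: <total relationship count>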

In the above example, “0” is the thread number.

Common errors

Errors – while loading in parallel, we saw three main Cypher exceptions.

The most frequent of these means that we tried to access the same node / index entry from two different threads at the same time. Since we are loading in parallel, we did not find a way to avoid these errors completely, but we managed to decrease the error rate by sorting the CSV by the node id column.
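
Our exact command is not shown here; sorting the data by its node id column before splitting it into per-thread files could be done with something like the Unix sort utility (this assumes the node id is the first column and there is no header row):

    sort -t ',' -k 1,1 users.csv > users_sorted.csv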

That way, statistically, if a node repeats itself (with multiple relationships, for example), it will end up in the same file.
