您的位置:首页 > 其它

关于Neo4j和Cypher批量更新和批量插入优化的5个建议

2017-12-04 16:40 302 查看
原文链接: http://jexp.de/blog/2017/03/5-tips-tricks-for-fast-batched-updates-of-graph-structures-with-neo4j-and-cypher

注:我在测试后,对原文中的部分Cypher语句进行修改,使得其符合语法规则

当通过程序向图形化数据库中写入大量数据的时候,你会希望它能够高效的处理。

低效的方式

下面这些方式不是十分有效:

- 将值直接写入到语句中,而不是通过参数的方式

- 每一个更新都通过一个Transaction发送一个请求

- 通过一个Transaction发送大量的单个请求

- 生成一个巨大复杂的语句(几百行),然后通过一个Transaction进行提交

- 在一个Transaction中,发送一个巨大的请求,会导致OOM错误

正确的方式

你需要构造尽可能小的请求,并且语句格式固定(这样可以利用缓存),然后通过参数方式进行使用。

每一个请求可以只修改一个属性,或者修改整个子图(上百个节点),但是它的语句结构必须是一致的,否则就不能使用缓存。

UNWIND – 救星

为了实现这个目标,你只需要在你单次请求的前面加上一个UNWIND语句。UNWIND会将大量的数据(高达10k或者50k条)分散成一行一行的,每一行都会包含每一次更新所需要的全部信息。

你添加一个{batch}参数,并且将它的值设置成一个Map列表,其中可以包含你的数据(10k或者50k条)。这些数据会被打包成一个完整的请求,并且符合语法结构,还用上了缓存(因为其结构一致)。

语法结构

输入:

{batch: [{row1},{row2},{row3},...10k]}
1
[/code]

语句:

UNWIND {batch} as row

// 基于每一行的Map数据,编写更新语句
1
2
3
[/code]

示例

下面是一些示例

创建节点并写入属性

数据:

{batch: [{name:"Alice",age:32},{name:"Bob",age:
1315f
42}]}
1
[/code]

语句:

UNWIND {batch} as row
CREATE (n:Label)
SET n.name = row.name, n.age = row.age
1
2
3
[/code]

Merge节点并写入属性

数据:

{batch: [{id:"alice@example.com",properties:{name:"Alice",age:32}},{id:"bob@example.com",properties:{name:"Bob",age:42}}]}
1
[/code]

语句:

UNWIND {batch} as row
MERGE (n:Label {id:row.id})
(ON CREATE) SET n.name = row.properties.name, n.age = row.properties.age
1
2
3
[/code]

寻找节点,创建/Merge关系,并写入属性

数据:

{batch: [{from:"alice@example.com",to:"bob@example.com",properties:{since:2012}},{from:"alice@example.com",to:"charlie@example.com",properties:{since:2016}}]}
1
[/code]

语句:

UNWIND {batch} as row
MATCH (from:Label {from:row.from})
MATCH (to:Label {to:row.to})
CREATE/MERGE (from)-[rel:KNOWS]->(to)
(ON CREATE) SET rel.since = row.properties.since
1
2
3
4
5
[/code]

通过id或者id列表找节点

对于多叉树很好用

在这里我们只传入了一个单独的属性
created
。实际上你可以不传入任何属性,或者传入一个map的属性来进行更新。

数据:

{batch: [{from:123,to:[44,12,128],created:"2016-01-13"}, {from:34,to:[23,35,2983],created:"2016-01-15"},...]}
1
[/code]

语句:

UNWIND {batch} as row
MATCH (from) WHERE id(from) = row.from
MATCH (to) WHERE id(from) IN row.to // list of ids
CREATE/MERGE (from)-[rel:FOO]->(to)
SET rel.created = row.created
1
2
3
4
5
[/code]

更快更高效

下面是一些更多的技巧。

你可以传入一个Map,其中的key是节点id或者关系id。这样以来,通过id查找会变得更高效。

通过id更新已有的节点

数据:

{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}
1
[/code]

语句:

WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH (n) WHERE id(n) IN ids

// 单个属性更新
SET n.count = data[toString(id(n))]
1
2
3
4
5
[/code]

通过id更新已有的关系

数据:

{ batch : [{"1":334,"2":222,3:3840, ... 100k}]}
1
[/code]

语句:

WITH {batch} as data, [k in keys({batch}) | toInt(k)] as ids
MATCH ()-[rel]->() WHERE id(rel) IN ids
SET rel.foo = data[toString(id(rel))]
1
2
3
[/code]

有条件的创建数据

有些时候,你希望根据输入动态的创建数据。但是Cypher目前没有诸如
WHEN
或者
IF
的条件语句,
CASE WHEN
也只是一个表达式,因此,你必须使用一个我多年前想出来的技巧。

Cypher提供
FOREACH
语句,用来遍历列表中的每一个元素并分别执行更新操作。于是,一个包含0个元素或者1个元素的列表则可以看成一个条件表达式。因为当0个元素的时候,就不会执行遍历,而当1个元素的时候,就只执行一次遍历。

大致思路如下:

...
FOREACH (_ IN CASE WHEN predicate THEN [true] ELSE [] END |
... update operations ....
)
1
2
3
4
[/code]

其中,列表中的
true
值可以是其他任何值,
42,"",null
等等。只要它是一个值,那么我们就可以得到一个非空的列表。

相似的,你也可以使用
RANGE(1, CASE WHEN predicate THEN 1 ELSE 0 END)
。当predicate的值为false的时候,就会范围一个空列表。或者,如果你喜欢使用
filter
,那么也可以通过
filter(_ IN [1] WHERE predicate)
来构造。

下面是一个完整的示例:

LOAD CSV FROM {url} AS row
MATCH (o:Organization {name:row.org})
FOREACH (_ IN case when row.type = 'Person' then [1] else [] end|
MERGE (p:Person {name:row.name})
CREATE (p)-[:WORKS_FOR]->(o)
)
FOREACH (_ IN case when row.type = 'Agency' then [1] else [] end|
MERGE (a:Agency {name:row.name})
CREATE (a)-[:WORKS_FOR]->(o)
)
1
2
3
4
5
6
7
8
9
10
[/code]

需要注意的是,在
FOREACH
内部创建的变量无法在外部访问。你需要再重新查询一次,或者你需要再
FOREACH
内完成全部更新操作。

使用APOC库

APOC库提供了很多有用的方法供你使用。在这里,我推荐下面3个方法:

创建节点和关系,并且可以动态设定标签和属性

批量提交和更新

动态创建或者操作Map,并赋给属性

动态创建节点和关系

通过
apoc.create.node
apoc.create.relationship
你可以动态的计算节点标签,关系类型和任意的属性。

标签是一个String数组

属性就是一个Map

UWNIND {batch} as row
CALL apoc.create.node(row.labels, row.properties) yield node
RETURN count(*)
1
2
3
[/code]

apoc.create.*
方法中,也提供了设置/更新/删除属性和标签的功能。

UWNIND {batch} as row
MATCH (from) WHERE id(n) = row.from
MATCH (to:Label) where to.key = row.to
CALL apoc.create.relationship(from, row.type, row.properties, to) yield rel
RETURN count(*)
1
2
3
4
5
[/code]

批量提交

在一开始j就提到了,大量的提交Transaction是有问题的。你可以用2G-4G的heap来更新百万条记录,但当量级更大了之后就会很困难了。在使用32G的heap下,我最大的Transaction可以达到10M的节点。

这时,
apoc.periodic.iterate
可以提供很大的帮助。

它的原理很简单:你有两个Cypher语句,第一条语句能够提供可操纵的数据并产生巨大的数据流,第二条语句执行真正的更新操作,它对每一个数据都进行一次更新操作,但是它只在处理一定数量的数据后才创建一个新的Transaction。

打个比方,假如你第一条语句返回了五百万个需要更新的节点,如果使用内部语句的话,那么每一个节点都会进行一次更新操作。但是如果你设置批处理大小为10k的话,那么每一个Transaction会批量更新10k的节点。

如果你的更新操作是相互独立的话(创建节点,更新属性或者更新独立的子图),那么你可以添加
parallel:true
来充分利用cpu。

比方说,你想计算多个物品的评分,并通过批处理的方式来更新属性,你应该按下面这样操作

call apoc.periodic.iterate('
MATCH (n:User)-[r1:LIKES]->(thing)<-[r2:RATED]-(m:User) WHERE id(n)<id(m) RETURN thing, avg( r1.rating + r2.rating ) as score
','
WITH {thing} as t SET t.score = {score}
', {batchSize:10000, parallel:true})
1
2
3
4
5
[/code]

动态创建/更新Map

尽管Cypher为列表提供了相当遍历的操作,如
range, collect, unwind, reduce, extract, filter, size
等,但Map在有的时候也是需要进行创建和更改的。

apoc.map.*
提供了一系列的方法来简化这个过程。

通过其他数据创建Map:

RETURN apoc.map.fromPairs([["alice",38],["bob",42],...​])
// {alice:38, bob: 42, ...}

RETURN apoc.map.fromLists(["alice","bob",...],[38,42])
// {alice:38, bob: 42, ...}

// groups nodes, relationships, maps by key, good for quick lookups by that key
RETURN apoc.map.groupBy([{name:"alice",gender:"female"},{name:"bob",gender:"male"}],"gender")
// {female:{name:"alice",gender:"female"}, male:{name:"bob",gender:"male"}}

RETURN apoc.map.groupByMulti([{name:"alice",gender:"female"},{name:"bob",gender:"male"},{name:"Jane",gender:"female"}],"gender")
// {female:[{name:"alice",gender:"female"},{name:"jane",gender:"female"}], male:[{name:"bob",gender:"male"}]}
1
2
3
4
5
6
7
8
9
10
11
12
[/code]

更新Map:

RETURN apoc.map.merge({alice: 38},{bob:42})
// {alice:38, bob: 42}

RETURN apoc.map.setKey({alice:38},"bob",42)
// {alice:38, bob: 42}

RETURN apoc.map.removeKey({alice:38, bob: 42},"alice")
// {bob: 42}

RETURN apoc.map.removeKey({alice:38, bob: 42},["alice","bob","charlie"])
// {}

// remove the given keys and values, good for data from load-csv/json/jdbc/xml
RETURN apoc.map.clean({name: "Alice", ssn:2324434, age:"n/a", location:""},["ssn"],["n/a",""])
// {name:"Alice"}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[/code]

结论

通过上面这些方式,我能够快速的执行更新操作。当然,你也可以组合这些方法,来实现更复杂的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: