Version: Next

Neo4j

Neo4j sink connector

Description

Write data to Neo4j.

neo4j-java-driver version 4.4.9

Key features

Options

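| name | type | required | default value |
|------|------|----------|---------------|
| uri | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| max_batch_size | Integer | No | - |
| write_mode | String | No | OneByOne |
| bearer_token | String | No | - |
| kerberos_ticket | String | No | - |
| database | String | Yes | - |
| query | String | Yes | - |
| queryParamPosition | Object | Yes | - |
| max_transaction_retry_time | Long | No | 30 |
| max_connection_timeout | Long | No | 30 |
| common-options | config | No | - |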

uri [string]

The URI of the Neo4j database, for example: neo4j://localhost:7687

username [string]

The username for the Neo4j database.

password [string]

The password for the Neo4j database. Required if username is provided.

max_batch_size [integer]

The maximum number of records that can be written to the database in a single transaction.

write_mode [string]

The write mode. The default value is OneByOne; set it to "BATCH" to write in batches. A batch write query looks like this:

unwind $ttt as row create (n:Label) set n.name = row.name,n.age = row.age

"ttt" represents a batch of data. It can be any string, as long as it matches the configured "batch_data_variable".

bearer_token [string]

Base64-encoded bearer token used to authenticate with Neo4j.
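As a minimal sketch, token-based authentication might be configured as shown here. It assumes that bearer_token is supplied in place of username and password; the token value is a placeholder, and the other options are copied from the one-by-one example in this document.

```hocon
sink {
  Neo4j {
    uri = "neo4j://localhost:7687"
    # Assumption: bearer_token is used instead of username/password.
    # Placeholder value; substitute your own base64-encoded token.
    bearer_token = "<base64-encoded-token>"
    database = "neo4j"

    query = "CREATE (a:Person {name: $name, age: $age})"
    queryParamPosition = {
      name = 0
      age = 1
    }
  }
}
```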

kerberos_ticket [string]

Base64-encoded Kerberos ticket used to authenticate with Neo4j.

database [string]

The name of the database.

query [string]

The query statement. It may contain parameter placeholders that are substituted with the corresponding values at runtime.

queryParamPosition [object]

Position mapping information for the query parameters.

Each key is the name of a parameter placeholder.

The associated value is the position of the corresponding field in the input data row.
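To illustrate the mapping, the fragment below assumes a hypothetical upstream row whose fields arrive in the order (name, age, city). Positions are zero-based, as in the examples in this document, so a query can reference only the fields it needs. The row layout and the city field are assumptions made for illustration.

```hocon
# Hypothetical input row layout: position 0 = name, 1 = age, 2 = city
query = "CREATE (p:Person {name: $name, city: $city})"
queryParamPosition = {
  name = 0   # $name is filled from the field at position 0
  city = 2   # $city is filled from the field at position 2
}
```

With this mapping, the field at position 1 (age) is not referenced by the query.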

max_transaction_retry_time [long]

The maximum transaction retry time, in seconds. The transaction fails if this time is exceeded.

max_connection_timeout [long]

The maximum amount of time to wait for a TCP connection to be established, in seconds.

common options

Sink plugin common parameters. Refer to Sink Common Options for details.

WriteOneByOneExample

sink {
  Neo4j {
    uri = "neo4j://localhost:7687"
    username = "neo4j"
    password = "1234"
    database = "neo4j"

    max_transaction_retry_time = 10
    max_connection_timeout = 10

    query = "CREATE (a:Person {name: $name, age: $age})"
    queryParamPosition = {
      name = 0
      age = 1
    }
  }
}

WriteBatchExample

The unwind keyword provided by Cypher supports batch writing, and the default variable for a batch of data is batch. A batch write statement should therefore begin with the Cypher clause unwind $batch as row, followed by the operation to perform on each row.

sink {
  Neo4j {
    uri = "bolt://localhost:7687"
    username = "neo4j"
    password = "neo4j"
    database = "neo4j"
    max_batch_size = 1000
    write_mode = "BATCH"

    max_transaction_retry_time = 3
    max_connection_timeout = 10

    query = "unwind $batch as row create(n:MyLabel) set n.name = row.name,n.age = row.age"
  }
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/5189991843 | 2.3.6 |
| [Feature][Core] Upgrade flink source translation (#5100) | https://github.com/apache/seatunnel/commit/5aabb14a94 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve] Documentation and partial word optimization. (#4936) | https://github.com/apache/seatunnel/commit/6e8de0e2a6 | 2.3.3 |
| [Improve][connector-V2-Neo4j]Supports neo4j sink batch write and update docs (#4841) | https://github.com/apache/seatunnel/commit/580276a8bd | 2.3.3 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Neo4j] Unified exception for Neo4j source & sink connector (#3565) | https://github.com/apache/seatunnel/commit/58584eefb1 | 2.3.0 |
| [Feature][Connector][Neo4j] expose configurable options in Neo4j (#3342) | https://github.com/apache/seatunnel/commit/efa04b38fe | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [Feature][Connector-v2] Neo4j source connector (#2777) | https://github.com/apache/seatunnel/commit/38b0daf8b7 | 2.3.0 |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [Feature][Connector-v2] Neo4j sink connector (#2434) | https://github.com/apache/seatunnel/commit/950b27d132 | 2.2.0-beta |