Ultipa Spark Connector provides integration between Ultipa and Apache Spark through the Spark DataSource V2 API, enabling reading and writing of graph data as Spark DataFrames. It supports full-scan reads, GQL query reads, node/edge writes with INSERT, UPSERT, and OVERWRITE modes, and all Ultipa property types.
The connector supports Scala, Python, Java, and R. The examples in this guide are written in Scala.
Add the following dependency to your pom.xml:
```xml
<dependencies>
  <dependency>
    <groupId>com.ultipa</groupId>
    <artifactId>ultipa-spark-connector</artifactId>
    <version>2.0.0</version>
  </dependency>
</dependencies>
```
Or use the fat JAR directly with spark-submit:
```bash
spark-submit --jars ultipa-spark-connector-2.0.0.jar your-app.jar
```
Read all nodes of a schema into a DataFrame:
```scala
val persons = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .load()

persons.show()
```
Read all edges of a schema into a DataFrame:
```scala
val follows = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "edge")
  .option("schema", "Follows")
  .load()
```
Read data using a GQL query statement:
```scala
val results = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("query", "MATCH (n:Person) WHERE n.age > 25 RETURN n._id, n.name, n.age")
  .load()
```
Write a DataFrame as nodes using SaveMode.Append (INSERT):
```scala
df.write.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .mode(SaveMode.Append)
  .save()
```
Write a DataFrame as edges using SaveMode.Append (INSERT):

```scala
df.write.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "edge")
  .option("schema", "Follows")
  .mode(SaveMode.Append)
  .save()
```
| Mode | Behavior |
|---|---|
| SaveMode.Append | Inserts new data. |
| SaveMode.Overwrite | Drops the existing data of the schema and re-inserts. |
| Custom writeMode=upsert | Upserts data: updates records whose keys match, inserts the rest. Edges require a key constraint. |
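The difference between the three modes can be sketched in plain Scala, modeling stored nodes as a Map keyed by _id. This is a conceptual illustration only, not the connector's implementation; in particular, how the server treats an INSERT that collides with an existing _id depends on Ultipa itself (here duplicates are simply skipped for illustration).

```scala
// Stored nodes modeled as _id -> name. Illustrative only.
type Store = Map[String, String]

// SaveMode.Append (insert): adds records; conflict handling here is a
// simplification -- existing _ids are skipped.
def insertOnly(store: Store, incoming: Store): Store =
  store ++ incoming.filter { case (id, _) => !store.contains(id) }

// writeMode=upsert: updates records whose key matches, inserts the rest.
def upsert(store: Store, incoming: Store): Store =
  store ++ incoming

// SaveMode.Overwrite: drops the schema's existing data, keeps only the new batch.
def overwrite(store: Store, incoming: Store): Store =
  incoming

val existing: Store = Map("u1" -> "Alice", "u2" -> "Bob")
val batch: Store    = Map("u2" -> "Bobby", "u3" -> "Carol")
```

With this model, `upsert(existing, batch)` keeps u1, renames u2, and adds u3, while `overwrite(existing, batch)` discards u1 entirely.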
To use upsert mode:
```scala
df.write.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .option("writeMode", "upsert")
  .mode(SaveMode.Append)
  .save()
```
| Option | Default | Description |
|---|---|---|
| host | | IP address of the Ultipa server. |
| port | 60061 | Port number of the Ultipa server. |
| user | | Username for authentication. |
| password | | Password for authentication. |
| graph | | Name of the graph to connect to. |
| useTls | false | Enables TLS encryption. |
| certPath | | Path to the TLS certificate file. |
| Option | Default | Description |
|---|---|---|
| entityType | | Entity type to read: node or edge. |
| schema | | Schema name to read. |
| query | | GQL query statement for reading data. |
| Option | Default | Description |
|---|---|---|
| entityType | | Entity type to write: node or edge. |
| schema | | Schema name to write to. Non-existent schemas are created automatically. |
| writeMode | insert | Write mode: insert or upsert. |
| batchSize | 10000 | Number of records per batch write. |
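The effect of batchSize can be sketched in plain Scala: rather than issuing one request per row, records are sent to the server in chunks of at most batchSize. The snippet below is an illustration of the chunking arithmetic, not the connector's actual code.

```scala
// 25 records written with batchSize = 10 produce three requests:
// two full batches of 10 and a final batch of 5.
val records   = (1 to 25).toList
val batchSize = 10
val batches   = records.grouped(batchSize).toList
```

Larger batches mean fewer round-trips at the cost of bigger requests; the default of 10000 is a reasonable starting point for most workloads.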
| Ultipa Property Type | Spark Data Type |
|---|---|
| int32, uint32 | IntegerType |
| int64, uint64 | LongType |
| float | FloatType |
| double | DoubleType |
| string, text | StringType |
| bool | BooleanType |
| datetime, timestamp | TimestampType |
| date | DateType |
| blob | BinaryType |
| decimal | DecimalType |
| list | ArrayType(StringType) |
| map | MapType(StringType, StringType) |
| point, point3d | StringType (WKT format) |
| record | StringType (JSON format) |
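Because point values arrive as plain WKT strings (e.g. "POINT(1.0 2.0)") rather than a geometry type, downstream code has to parse them itself. The regex below is an illustrative sketch under that assumption, not part of the connector's API.

```scala
// Parse a 2D WKT point string into coordinates. Illustrative helper only.
val PointWkt = """POINT\s*\(\s*([-+0-9.eE]+)\s+([-+0-9.eE]+)\s*\)""".r

def parsePoint(wkt: String): Option[(Double, Double)] = wkt match {
  case PointWkt(x, y) => Some((x.toDouble, y.toDouble))
  case _              => None
}
```

For example, `parsePoint("POINT(1.0 2.0)")` yields `Some((1.0, 2.0))`; a similar approach (e.g. a JSON library) applies to record values delivered as JSON strings.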