
Spark Connector

Overview

The Ultipa Spark Connector integrates Ultipa with Apache Spark through the Spark DataSource V2 API, so graph data can be read and written as Spark DataFrames. It supports full-scan reads, GQL query reads, node and edge writes in INSERT, UPSERT, and OVERWRITE modes, and all Ultipa property types.

The connector supports Scala, Python, Java, and R. The examples in this guide are written in Scala.

Installation

Prerequisites

  • Ultipa v5.x
  • Spark 3.4+ with Scala 2.12
  • Java 11+

Import Dependency

Add the following dependency to your pom.xml:

XML
<dependencies>
  <dependency>
    <groupId>com.ultipa</groupId>
    <artifactId>ultipa-spark-connector</artifactId>
    <version>2.0.0</version>
  </dependency>
</dependencies>

Or use the fat JAR directly with spark-submit:

Bash
spark-submit --jars ultipa-spark-connector-2.0.0.jar your-app.jar
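
For sbt-based projects, the Maven coordinates shown earlier would translate roughly as follows. This is a sketch only: it assumes the artifact can be resolved from a repository your build is already configured to use, which this guide does not state.

```scala
// build.sbt (sketch): same coordinates as the Maven dependency in pom.xml.
// A single % is used because the artifactId carries no Scala-version suffix.
libraryDependencies += "com.ultipa" % "ultipa-spark-connector" % "2.0.0"
```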

Reading

Read by Node Schema

Read all nodes of a schema into a DataFrame:

Scala
val persons = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .load()

persons.show()

Read by Edge Schema

Read all edges of a schema into a DataFrame:

Scala
val follows = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "edge")
  .option("schema", "Follows")
  .load()

Read via GQL Query

Read the result of a GQL query into a DataFrame by setting the query option in place of entityType and schema:

Scala
val results = spark.read.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("query", "MATCH (n:Person) WHERE n.age > 25 RETURN n._id, n.name, n.age")
  .load()
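
The read examples repeat the same five connection options. One way to avoid that repetition is to collect them in a Map and pass it through Spark's `options(...)` method on the reader. The sketch below is not part of the connector: `UltipaOptions` and its methods are hypothetical helper names introduced here for illustration, and only the option keys come from this guide.

```scala
// Hypothetical helper (not part of the connector): builds the option maps
// that the examples in this guide pass one by one with .option(...).
object UltipaOptions {
  // Connection options shared by every read and write.
  def connection(host: String, port: Int, user: String,
                 password: String, graph: String): Map[String, String] =
    Map(
      "host"     -> host,
      "port"     -> port.toString,
      "user"     -> user,
      "password" -> password,
      "graph"    -> graph
    )

  // Options selecting one node or edge schema.
  def entity(entityType: String, schema: String): Map[String, String] = {
    require(entityType == "node" || entityType == "edge",
      s"entityType must be 'node' or 'edge', got '$entityType'")
    Map("entityType" -> entityType, "schema" -> schema)
  }
}

val conn = UltipaOptions.connection("10.0.0.1", 60061, "root", "root", "social")
val personOpts = conn ++ UltipaOptions.entity("node", "Person")

// With a SparkSession named `spark` in scope (as in spark-shell):
// val persons = spark.read.format("ultipa").options(personOpts).load()
```

Keeping the connection settings in one value also makes it easier to load credentials from configuration instead of hard-coding them in each call.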

Writing

Write Nodes

Write a DataFrame as nodes using SaveMode.Append (INSERT):

Scala
// SaveMode is required by .mode(...) below
import org.apache.spark.sql.SaveMode

df.write.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .mode(SaveMode.Append)
  .save()
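
The write examples use a DataFrame named df without showing how it is built. The sketch below shows one possible shape for Person rows. The field names mirror the properties returned by the GQL read example (n._id, n.name, n.age). Both the `PersonRow` type and the idea that the connector matches DataFrame columns to schema properties by name are assumptions made for illustration, not behavior documented here.

```scala
// Hypothetical row type: field names follow the properties used in the
// GQL read example (_id, name, age). Matching columns to properties by
// name is an assumption about the connector, not documented behavior.
final case class PersonRow(_id: String, name: String, age: Int)

val people = Seq(
  PersonRow("P001", "Alice", 31),
  PersonRow("P002", "Bob", 27)
)

// With a SparkSession named `spark` in scope (as in spark-shell):
// import spark.implicits._
// val df = people.toDF()   // columns: _id, name, age
```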

Write Edges

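Write a DataFrame as edges using SaveMode.Append (INSERT):

Scala
// SaveMode is required by .mode(...) below
import org.apache.spark.sql.SaveMode

df.write.format("ultipa")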
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "edge")
  .option("schema", "Follows")
  .mode(SaveMode.Append)
  .save()

Write Modes

| Mode | Behavior |
| --- | --- |
| SaveMode.Append | Inserts new data. |
| SaveMode.Overwrite | Drops existing data of the schema and re-inserts. |
| Custom writeMode=upsert | Upserts data. Edges require a key constraint. |

To use upsert mode:

Scala
// SaveMode is required by .mode(...) below
import org.apache.spark.sql.SaveMode

df.write.format("ultipa")
  .option("host", "10.0.0.1")
  .option("port", "60061")
  .option("user", "root")
  .option("password", "root")
  .option("graph", "social")
  .option("entityType", "node")
  .option("schema", "Person")
  .option("writeMode", "upsert")
  .mode(SaveMode.Append)
  .save()
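
The three write modes combine a Spark save mode with an optional writeMode option. The sketch below gathers that mapping in one place. `WritePlan` and `writePlan` are hypothetical names introduced for illustration. The save mode is returned as a string because Spark's `DataFrameWriter.mode` also accepts the names "append" and "overwrite".

```scala
// Hypothetical helper: pairs each logical write mode from the Write Modes
// table with the Spark save mode name and any extra connector option.
final case class WritePlan(saveMode: String, extraOptions: Map[String, String])

def writePlan(mode: String): WritePlan = mode.toLowerCase match {
  case "insert"    => WritePlan("append", Map.empty)
  case "upsert"    => WritePlan("append", Map("writeMode" -> "upsert"))
  case "overwrite" => WritePlan("overwrite", Map.empty)
  case other       =>
    throw new IllegalArgumentException(s"Unsupported write mode: $other")
}

// With a DataFrame `df` and a connection/schema option map `opts` in scope:
// val plan = writePlan("overwrite")
// df.write.format("ultipa")
//   .options(opts ++ plan.extraOptions)
//   .mode(plan.saveMode)
//   .save()
```

Since overwrite drops the existing data of the schema before re-inserting, it is worth selecting that mode explicitly rather than making it a default.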

Options

General Options

| Option | Default | Description |
| --- | --- | --- |
| host | | IP address of the Ultipa server. |
| port | 60061 | Port number of the Ultipa server. |
| user | | Username for authentication. |
| password | | Password for authentication. |
| graph | | Name of the graph to connect to. |
| useTls | false | Enables TLS encryption. |
| certPath | | Path to the TLS certificate file. |
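
As an illustration of the TLS options, the sketch below adds useTls and certPath to a set of connection options. `withTls` is a hypothetical helper, the certificate path is a placeholder, and whether certPath is mandatory whenever useTls is enabled is not stated in this guide.

```scala
// Connection options as used in the examples of this guide.
val base = Map(
  "host"     -> "10.0.0.1",
  "port"     -> "60061",
  "user"     -> "root",
  "password" -> "root",
  "graph"    -> "social"
)

// Hypothetical helper: adds the two TLS-related general options.
def withTls(options: Map[String, String], certPath: String): Map[String, String] =
  options ++ Map("useTls" -> "true", "certPath" -> certPath)

// Placeholder certificate path; replace with the real location.
val secure = withTls(base, "/path/to/ultipa-cert.crt")

// With a SparkSession named `spark` in scope:
// spark.read.format("ultipa").options(secure)
//   .option("entityType", "node").option("schema", "Person").load()
```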

Read Options

| Option | Default | Description |
| --- | --- | --- |
| entityType | | Entity type to read: node or edge. |
| schema | | Schema name to read. |
| query | | GQL query statement for reading data. |

Write Options

| Option | Default | Description |
| --- | --- | --- |
| entityType | | Entity type to write: node or edge. |
| schema | | Schema name to write to. Non-existent schemas are created automatically. |
| writeMode | insert | Write mode: insert or upsert. |
| batchSize | 10000 | Number of records per batch write. |

Data Type Conversion

| Ultipa Property Type | Spark Data Type |
| --- | --- |
| int32, uint32 | IntegerType |
| int64, uint64 | LongType |
| float | FloatType |
| double | DoubleType |
| string, text | StringType |
| bool | BooleanType |
| datetime, timestamp | TimestampType |
| date | DateType |
| blob | BinaryType |
| decimal | DecimalType |
| list | ArrayType(StringType) |
| map | MapType(StringType, StringType) |
| point, point3d | StringType (WKT format) |
| record | StringType (JSON format) |
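
Because point and point3d values arrive as WKT strings, numeric work on the coordinates needs a parsing step after reading. The sketch below extracts the coordinates from a WKT point. `parseWktPoint` is a hypothetical helper, and the exact text the connector emits (spacing, presence of a Z marker) is an assumption, so the pattern accepts several common spellings.

```scala
// Hypothetical helper: extracts coordinates from a WKT point string such as
// "POINT(1.5 2.5)" or "POINT Z (1 2 3)". Returns None for null input or
// for anything that is not a well-formed point.
def parseWktPoint(wkt: String): Option[Vector[Double]] = {
  val Pattern = """(?i)\s*POINT\s*(?:Z\s*)?\(\s*([^)]*?)\s*\)\s*""".r
  Option(wkt).flatMap {
    case Pattern(body) =>
      val parts = body.split("\\s+").toVector
      val nums  = parts.flatMap(p => scala.util.Try(p.toDouble).toOption)
      // Accept only if every token parsed as a number.
      if (nums.nonEmpty && nums.size == parts.size) Some(nums) else None
    case _ => None
  }
}

// With Spark on the classpath, the function could be wrapped as a UDF:
// import org.apache.spark.sql.functions.{col, udf}
// val toCoords = udf((s: String) => parseWktPoint(s).map(_.toSeq))
// df.withColumn("coords", toCoords(col("location")))  // "location" is a hypothetical column
```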