
Spark Connector

Overview

Ultipa Spark Connector integrates Ultipa with Apache Spark through the Ultipa Java SDK, enabling you to read data from and write data to Ultipa within the Spark environment.

Built on the latest Spark DataSource API, Ultipa Spark Connector supports different languages for interacting with Spark, including Scala, Python, Java, and R. The examples provided in this manual are written in Scala; minor syntax adjustments may be required for other languages.

Installation

Prerequisites

Before installing the Ultipa Spark Connector, ensure you are using compatible versions of Ultipa and Spark:

  • Ultipa v4.x (v4.3 and above), running either as a single instance or as a cluster
  • Spark 2.4.8 with Scala 2.12

Import Dependency

To add the Ultipa Spark Connector as a dependency, include the following in your pom.xml file:

pom.xml
<dependencies>
  <dependency>
    <groupId>com.ultipa</groupId>
    <artifactId>ultipa-spark-connector</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>

Reading

You can read data from Ultipa into a Spark DataFrame using a node schema, an edge schema, or a UQL query statement.

Spark does not support all property data types in Ultipa. Refer to the Data Type Conversion table for details.

Read by Node Schema

Retrieve _id and all custom properties of nodes belonging to the specified schema.

Example: Read all nodes in the graphset Test with the schema Person

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username","root")
  .option("auth.password","root")
  .option("graph","Test")
  .option("nodes","Person")
  .load()

df.show()

Result:

| _id  | name  | gender |
|------|-------|--------|
| U001 | Alice | female |
| U002 | Bruce | male   |
| U003 | Joe   | male   |

Read by Edge Schema

Retrieve _from, _to and all custom properties of edges belonging to the specified schema.

Example: Read all edges in the graphset Test with the schema Follows

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username","root")
  .option("auth.password","root")
  .option("graph","Test")
  .option("edges","Follows")
  .load()

df.show()

Result:

| _from | _to  | since               | level |
|-------|------|---------------------|-------|
| U001  | U002 | 2019-12-15 12:10:09 | 1     |
| U003  | U001 | 2021-1-20 09:15:02  | 2     |

Read by UQL

Retrieve data using a UQL query statement. The UQL query used for reading must contain a RETURN clause and can return data of type ATTR or TABLE; other types, such as NODE, EDGE, and PATH, are not supported. Learn more about the types of returned data.

Example: Read data in the graphset Test returned by a UQL statement

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username","root")
  .option("auth.password","root")
  .option("graph","Test")
  .option("query","find().nodes() as n return n.name, n.gender")
  .load()

df.show()

Result:

| n.name | n.gender |
|--------|----------|
| Alice  | female   |
| Bruce  | male     |
| Joe    | male     |

Writing

You can write a Spark DataFrame into Ultipa as either nodes or edges belonging to a single schema. Each column in the DataFrame is mapped to a property of the nodes or edges, with the column name serving as the property name (except for the columns mapped to the _id of nodes and the _from/_to of edges). Properties that do not yet exist are created during the write.

The data type of each property is determined by the data type of the corresponding column within the DataFrame. Refer to the Data Type Conversion table for details.
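Because the property type follows the column type, you can cast columns before writing to control which property types get created. The sketch below is illustrative only: it assumes the write-side mapping mirrors the Data Type Conversion table (e.g. IntegerType becoming int32), which is not stated explicitly in this manual.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.IntegerType

val spark = SparkSession.builder().getOrCreate()

// age is provided as a string here purely to illustrate casting
val raw = spark.createDataFrame(Seq(("Alice", "25"), ("Bob", "30")))
  .toDF("name", "age")

// Cast the column so the created property becomes an integer rather
// than a string (assumption: IntegerType maps back to int32 on write)
val df = raw.withColumn("age", col("age").cast(IntegerType))

df.write.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("nodes", "Person")
  .option("nodes.id", "name")
  .save()
```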

Write by Node Schema

Example: Write a DataFrame to the Person nodes in the graphset Test

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val data = Seq(("Alice", "Teacher", 25, 1.11), ("Bob", "Worker", 30, 2.22), ("Charlie", "Officer", 35, 3.33))

val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")
df.show()

df.write.format("com.ultipa.spark.DataSource")
  .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username","root")
  .option("auth.password","root")
  .option("graph","Test")
  .option("nodes", "Person")
  .option("nodes.id", "name")
  .save()

Write by Edge Schema

Example: Write a DataFrame to the RelatesTo edges in the graphset Test

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val data = Seq(("Alice", "Bob", "couple"), ("Bob", "Charlie", "couple"), ("Charlie", "Alice", "friend"))

val df = spark.createDataFrame(data).toDF("from", "to", "type")
df.show()

df.write.format("com.ultipa.spark.DataSource")
  .option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username","root")
  .option("auth.password","root")
  .option("graph","Test")
  .option("edges", "RelatesTo")
  .option("edges.from", "from")
  .option("edges.to", "to")
  .save()

Configurations

Options

In the Spark API, both the DataFrameReader and DataFrameWriter classes provide the option() method, which you can use to specify options for read and write operations.

Below are all the options supported in Ultipa Spark Connector:

General Options

| Option Key | Default | Description | Optional |
|------------|---------|-------------|----------|
| hosts | | IP address(es) of the Ultipa server or cluster (comma-separated), or the host URL (excluding "https://" or "http://") | No |
| auth.username | | Username of the host | No |
| auth.password | | Password of the above user | No |
| graph | default | Name of the graphset you want to connect to | Yes |
| connection.timeout | 15 | Timeout threshold for requests (in seconds) | Yes |
| connection.connect.timeout | 2000 | Timeout threshold for connections (in milliseconds); each host is attempted 3 times by default | Yes |
| connection.heartbeat | 10000 | Heartbeat interval (in milliseconds) for all instances; set to 0 to turn off the heartbeat | Yes |
| connection.max.recv.size | 41943040 | Maximum size (in bytes) of the received data | Yes |

Read Options

| Option Key | Default | Description | Optional |
|------------|---------|-------------|----------|
| nodes | | Name of a node schema | Yes |
| edges | | Name of an edge schema | Yes |
| query | | UQL query statement to read data | Yes |

Write Options

| Option Key | Default | Description | Optional |
|------------|---------|-------------|----------|
| nodes | | Name of a node schema; if the specified schema does not exist, it is created during the write | Yes |
| nodes.id | | Name of the DataFrame column to be used as the _id of the nodes | Yes |
| edges | | Name of an edge schema; if the specified schema does not exist, it is created during the write | Yes |
| edges.from | | Name of the DataFrame column to be used as the _from of the edges | Yes |
| edges.to | | Name of the DataFrame column to be used as the _to of the edges | Yes |

Global Configurations

You can set the options for each connection, or specify global configurations in the Spark session to avoid retyping them each time. To do so, prefix the option key with ultipa. in the config() method.

Example: Set global configurations for the options hosts, auth.username, auth.password, graph, and connection.timeout

Scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .config("ultipa.hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .config("ultipa.auth.username","root")
  .config("ultipa.auth.password","root")
  .config("ultipa.graph", "Test")
  .config("ultipa.connection.timeout", 600)
  .getOrCreate()

val dfPerson = spark.read.format("com.ultipa.spark.DataSource")
  .option("nodes", "Person")
  .load()
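Since global configurations exist to avoid retyping connection options, a write should be able to rely on them in the same way as a read. The sketch below assumes that session-level ultipa.* settings apply equally to write operations; this is an assumption consistent with, but not explicitly stated in, this manual.

```scala
import org.apache.spark.sql.SparkSession

// Session built with global connection settings, as in the example above
val spark = SparkSession.builder()
  .config("ultipa.hosts", "192.168.1.56:63940")
  .config("ultipa.auth.username", "root")
  .config("ultipa.auth.password", "root")
  .config("ultipa.graph", "Test")
  .getOrCreate()

val data = Seq(("Dave", "Engineer"), ("Erin", "Doctor"))
val df = spark.createDataFrame(data).toDF("name", "job")

// Only the write-specific options are passed here; connection details
// are assumed to come from the session configuration
df.write.format("com.ultipa.spark.DataSource")
  .option("nodes", "Person")
  .option("nodes.id", "name")
  .save()
```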

Data Type Conversion

| Ultipa Property Type | Spark Data Type |
|----------------------|-----------------|
| _id, _from, _to | StringType |
| _uuid, _from_uuid, _to_uuid | LongType |
| int32 | IntegerType |
| uint32 | LongType |
| int64 | LongType |
| uint64 | StringType |
| float | FloatType |
| double | DoubleType |
| decimal | Not supported |
| string | StringType |
| text | Not supported |
| datetime | TimestampType |
| timestamp | TimestampType |
| point | Not supported |
| blob | BinaryType |
| list | Not supported |
| set | Not supported |
| ignore | NullType |
| UNSET | NullType |
| _ | StringType |
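To see how these conversions surface in practice, you can inspect the schema of a loaded DataFrame. The sketch below assumes the Person nodes from the earlier examples; the printed schema would show each property under its mapped Spark type from the table above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("nodes", "Person")
  .load()

// Per the conversion table: _id arrives as StringType, an int32
// property as IntegerType, a string property as StringType
df.printSchema()
```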