Ultipa Spark Connector provides integration between Ultipa and Apache Spark through the Ultipa Java SDK, enabling data to be read from and written to Ultipa within the Spark environment.
Built on the latest Spark DataSource API, Ultipa Spark Connector supports different languages for interacting with Spark, including Scala, Python, Java, and R. The examples provided in this manual are written in Scala; minor syntax adjustments may be required for other languages.
Before installing the Ultipa Spark Connector, ensure you have the right versions of Ultipa and Spark:
To import the dependency of the Ultipa Spark Connector, add the following code to your pom.xml file:

```xml
<dependencies>
  <dependency>
    <groupId>com.ultipa</groupId>
    <artifactId>ultipa-spark-connector</artifactId>
    <version>1.0.0</version>
  </dependency>
</dependencies>
```
You can read data from Ultipa into a Spark DataFrame via a node schema, an edge schema, or a UQL query statement.
Spark does not support all property data types in Ultipa. Refer to the Data Type Conversion table for details.
Retrieve _id and all custom properties of nodes belonging to the specified schema.
Example: Read all nodes in the graphset Test with the schema Person
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("nodes", "Person")
  .load()

df.show()
```
Result:
| _id | name | gender |
|---|---|---|
| U001 | Alice | female |
| U002 | Bruce | male |
| U003 | Joe | male |
Retrieve _from, _to and all custom properties of edges belonging to the specified schema.
Example: Read all edges in the graphset Test with the schema Follows
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("edges", "Follows")
  .load()

df.show()
```
Result:
| _from | _to | since | level |
|---|---|---|---|
| U001 | U002 | 2019-12-15 12:10:09 | 1 |
| U003 | U001 | 2021-01-20 09:15:02 | 2 |
Retrieve data using a UQL query statement. The UQL query for reading must contain a RETURN clause, and it can return data of the type ATTR or TABLE. Other types such as NODE, EDGE, and PATH are not supported. Learn more about the types of returned data.
Example: Read data in the graphset Test returned by a UQL statement
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("query", "find().nodes() as n return n.name, n.gender")
  .load()

df.show()
```
Result:
| n.name | n.gender |
|---|---|
| Alice | female |
| Bruce | male |
| Joe | male |
You can write a Spark DataFrame into Ultipa as either nodes or edges belonging to a single schema. Each column in the DataFrame is mapped to a property of the nodes or edges, with the column name serving as the property name (except for the columns mapped to the _id of nodes and the _from/_to of edges). Properties that do not yet exist will be created during the writing process.
The data type of each property is determined by the data type of the corresponding column within the DataFrame. Refer to the Data Type Conversion table for details.
Example: Write a DataFrame to the Person nodes in the graphset Test
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val data = Seq(("Alice", "Teacher", 25, 1.11), ("Bob", "Worker", 30, 2.22), ("Charlie", "Officer", 35, 3.33))
val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")
df.show()

df.write.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("nodes", "Person")
  .option("nodes.id", "name")
  .save()
```
Example: Write a DataFrame to the RelatesTo edges in the graphset Test
```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val data = Seq(("Alice", "Bob", "couple"), ("Bob", "Charlie", "couple"), ("Charlie", "Alice", "friend"))
val df = spark.createDataFrame(data).toDF("from", "to", "type")
df.show()

df.write.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("edges", "RelatesTo")
  .option("edges.from", "from")
  .option("edges.to", "to")
  .save()
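Since each property's type follows the Spark type of its DataFrame column, you may want to cast columns explicitly before saving. Below is a minimal sketch, assuming a Person DataFrame like the one above whose income column should become a float property rather than a double; the column names are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType

val spark = SparkSession.builder().getOrCreate()

val data = Seq(("Alice", "Teacher", 25, 1.11))
val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")

// Cast "income" from DoubleType to FloatType so the connector creates
// a float property instead of a double (see the Data Type Conversion table)
val typedDf = df.withColumn("income", col("income").cast(FloatType))
typedDf.printSchema()
```

The same pattern applies to any column whose default Spark type would map to an unintended Ultipa property type.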
In the Spark API, both the DataFrameReader and DataFrameWriter classes provide the option() method, which you can use to specify options for read and write operations.
Below are all the options supported in Ultipa Spark Connector:
General Options
| Option Key | Default | Description | Optional |
|---|---|---|---|
| hosts | | IP address(es) of the Ultipa server or cluster (comma-separated), or the host URL (excluding "https://" or "http://") | No |
| auth.username | | Username of the host | No |
| auth.password | | Password of the above user | No |
| graph | default | Name of the graphset to connect to | Yes |
| connection.timeout | 15 | Timeout threshold for requests (in seconds) | Yes |
| connection.connect.timeout | 2000 | Timeout threshold for connections (in milliseconds); each host will be attempted 3 times by default | Yes |
| connection.heartbeat | 10000 | Heartbeat interval (in milliseconds) for all instances; set to 0 to disable the heartbeat | Yes |
| connection.max.recv.size | 41943040 | Maximum size (in bytes) of the received data | Yes |
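Connection options are passed alongside the required ones through the same option() calls. As a sketch, a read that raises the request timeout and turns off the heartbeat (the values here are illustrative) might look like:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  // Illustrative tuning: 60-second request timeout, heartbeat disabled
  .option("connection.timeout", "60")
  .option("connection.heartbeat", "0")
  .option("nodes", "Person")
  .load()
```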
Read Options
| Option Key | Default | Description | Optional |
|---|---|---|---|
| nodes | | Name of a node schema | Yes |
| edges | | Name of an edge schema | Yes |
| query | | UQL query statement to read data | Yes |
Write Options
| Option Key | Default | Description | Optional |
|---|---|---|---|
| nodes | | Name of a node schema; if the specified schema does not exist, it will be created during the write | Yes |
| nodes.id | | Name of the DataFrame column to be used as the _id of the nodes | Yes |
| edges | | Name of an edge schema; if the specified schema does not exist, it will be created during the write | Yes |
| edges.from | | Name of the DataFrame column to be used as the _from of the edges | Yes |
| edges.to | | Name of the DataFrame column to be used as the _to of the edges | Yes |
You can set the options for each connection, or specify global configurations in the SparkSession to avoid retyping the options each time. To do so, prepend each option key with ultipa. in the config() method.
Example: Set global configurations for the options hosts, auth.username, auth.password, graph and connection.timeout

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .config("ultipa.hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
  .config("ultipa.auth.username", "root")
  .config("ultipa.auth.password", "root")
  .config("ultipa.graph", "Test")
  .config("ultipa.connection.timeout", 600)
  .getOrCreate()

val dfPerson = spark.read.format("com.ultipa.spark.DataSource")
  .option("nodes", "Person")
  .load()
```
| Ultipa Property Type | Spark Data Type |
|---|---|
| _id, _from, _to | StringType |
| _uuid, _from_uuid, _to_uuid | LongType |
| int32 | IntegerType |
| uint32 | LongType |
| int64 | LongType |
| uint64 | StringType |
| float | FloatType |
| double | DoubleType |
| decimal | |
| string | StringType |
| text | |
| datetime | TimestampType |
| timestamp | TimestampType |
| point | |
| blob | BinaryType |
| list | |
| set | |
| ignore | NullType |
| UNSET | NullType |
| _ | StringType |
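One way to check how the connector has mapped property types in practice is to inspect the DataFrame schema after loading. A sketch reusing the Person read shown earlier:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("com.ultipa.spark.DataSource")
  .option("hosts", "192.168.1.56:63940")
  .option("auth.username", "root")
  .option("auth.password", "root")
  .option("graph", "Test")
  .option("nodes", "Person")
  .load()

// printSchema lists the Spark type assigned to each property,
// which should follow the conversion table above
df.printSchema()
```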