Vue d’ensemble
Ultipa Spark Connector offre une intégration entre Ultipa et Apache Spark via Ultipa Java SDK, facilitant la lecture ou l'écriture de données depuis et vers Ultipa dans l’environnement Spark.
Basé sur la dernière API de Source de Données de Spark, le connecteur Ultipa Spark prend en charge différentes langues pour interagir avec Spark, y compris Scala, Python, Java et R. Les exemples fournis dans ce manuel sont écrits en Scala ; des ajustements syntaxiques mineurs peuvent être nécessaires pour d'autres langues.
Installation
Prérequis
Avant d'installer le connecteur Ultipa Spark, assurez-vous d'avoir les bonnes versions d'Ultipa et Spark :
- Ultipa v4.x (v4.3 et supérieur), que ce soit en instance unique ou en cluster
- Spark 2.4.8 avec Scala 2.12
Importer la Dépendance
Pour importer la dépendance du connecteur Ultipa Spark, ajoutez le code suivant à votre fichier pom.xml :
<dependencies>
<dependency>
<groupId>com.ultipa</groupId>
<artifactId>ultipa-spark-connector</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
Lecture
Vous pouvez lire des données d'Ultipa dans un DataFrame Spark par un node schema, un edge schema ou une requête UQL.
Spark ne prend pas en charge tous les types de donnée de property dans Ultipa. Reportez-vous au tableau Conversion de Type de Données pour plus de détails.
Lecture par Node Schema
Récupérez _id
et toutes les custom properties des nodes appartenant au schema spécifié.
Exemple : Lire tous les nodes dans le graphset Test avec le schema Person
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("nodes","Person")
.load()
df.show()
Résultat :
_id | name | gender |
---|---|---|
U001 | Alice | female |
U002 | Bruce | male |
U003 | Joe | male |
Lecture par Edge Schema
Récupérez _from
, _to
et toutes les custom properties des edges appartenant au schema spécifié.
Exemple : Lire tous les edges dans le graphset Test avec le schema Follows
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("edges","Follows")
.load()
df.show()
Résultat :
_from | _to | since |
level |
---|---|---|---|
U001 | U002 | 2019-12-15 12:10:09 | 1 |
U003 | U001 | 2021-1-20 09:15:02 | 2 |
Lecture par UQL
Récupérez des données en utilisant une requête UQL. La requête UQL pour la lecture doit contenir l'instruction RETURN, et vous pouvez retourner des données du type ATTR ou TABLE. D'autres types tels que NODE, EDGE et PATH ne sont pas pris en charge. En savoir plus sur les types de données retournées
Exemple : Lire des données dans le graphset Test retournées par une instruction UQL
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("query","find().nodes() as n return n.name, n.gender")
.load()
df.show()
Résultat :
n.name | n.gender |
---|---|
Alice | female |
Bruce | male |
Joe | male |
Écriture
Vous pouvez écrire un DataFrame Spark dans Ultipa en tant que nodes ou edges appartenant à un seul schema. Chaque colonne du DataFrame sera mappée comme une property des nodes ou edges, le nom de la colonne servant de nom de property (sauf pour le _id
des nodes, et le _from
/_to
des edges). Les properties inexistantes seront créées durant le processus d'écriture.
Le type de chaque property est déterminé par le type de la colonne correspondante dans le DataFrame. Reportez-vous au tableau Conversion de Type de Données pour plus de détails.
Écriture par Node Schema
Exemple : Écrire un DataFrame dans les nodes Person du graphset Test
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val data = Seq(("Alice", "Teacher", 25, 1.11), ("Bob", "Worker", 30, 2.22), ("Charlie", "Officer", 35, 3.33))
val df = spark.createDataFrame(data).toDF("name", "job", "age", "income")
```scala
df.show()
df.write.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("nodes", "Person")
.option("nodes.id", "name")
.save()
Écriture par Edge Schema
Exemple : Écrire un DataFrame dans les edges RelatesTo du graphset Test
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val data = Seq(("Alice", "Bob", "couple"), ("Bob", "Charlie", "couple"), ("Charlie", "Alice", "friend"))
val df = spark.createDataFrame(data).toDF("from", "to", "type")
df.show()
df.write.format("com.ultipa.spark.DataSource")
.option("hosts","192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.option("auth.username","root")
.option("auth.password","root")
.option("graph","Test")
.option("edges", "RelatesTo")
.option("edges.from", "from")
.option("edges.to", "to")
.save()
Configurations
Options
Dans l'API Spark, les classes DataFrameReader
et DataFrameWriter
contiennent la méthode option()
, que vous pouvez utiliser pour spécifier les options pour les opérations de lecture et d'écriture.
Voici toutes les options prises en charge dans le connecteur Ultipa Spark :
Options Générales
Option Key |
Default |
Description | Optional |
---|---|---|---|
hosts | Adresse(s) IP du serveur ou cluster Ultipa (séparées par des virgules), ou l'URL de l'hôte (excluant "https://" ou "http://") | Non | |
auth.username | Nom d'utilisateur de l'hôte | Non | |
auth.password | Mot de passe de l'utilisateur ci-dessus | Non | |
graph | default | Nom du graphset que vous souhaitez connecter | Oui |
connection.timeout | 15 | Seuil de temps pour les requêtes (en secondes) | Oui |
connection.connect.timeout | 2000 | Seuil de temps pour la connexion (en millisecondes) ; chaque hôte sera tenté 3 fois par défaut | Oui |
connection.heartbeat | 10000 | Millisecondes du heartbeat pour toutes les instances, mettez 0 pour désactiver le heartbeat | Oui |
connection.max.recv.size | 41943040 | Nombre maximum d'octets des données reçues | Oui |
Options de Lecture
Option Key |
Default |
Description | Optional |
---|---|---|---|
nodes | Nom d'un node schema | Oui | |
edges | Nom d'un edge schema | Oui | |
query | Instruction UQL pour lire les données | Oui |
Options d'Écriture
Option Key |
Default |
Description | Optional |
---|---|---|---|
nodes | Nom d'un node schema ; si le schema spécifié n'existe pas, il sera créé lors de l’écriture | Oui | |
nodes.id | Nom de la colonne du DataFrame à utiliser comme _id des nodes |
Oui | |
edges | Nom d'un edge schema ; si le schema spécifié n'existe pas, il sera créé lors de l’écriture | Oui | |
edges.from | Nom de la colonne du DataFrame à utiliser comme _from des edges |
Oui | |
edges.to | Nom de la colonne du DataFrame à utiliser comme _to des edges |
Oui |
Configurations Globales
Vous pouvez définir les options pour chaque connexion, ou spécifier des configurations globales dans la session Spark pour éviter de retaper les options chaque fois. Pour ce faire, vous pouvez préfixer la clé d'option avec ultipa.
dans la méthode config()
.
Exemple : définir des configurations globales pour les options hosts
, auth.username
, auth.password
, graph
et connection.timeout
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder()
.config("ultipa.hosts", "192.168.1.56:63940,192.168.1.57:63940,192.168.1.58:63940")
.config("ultipa.auth.username","root")
.config("ultipa.auth.password","root")
.config("ultipa.graph", "Test")
.config("ultipa.connection.timeout", 600)
.getOrCreate()
val dfPerson = spark.read.format("com.ultipa.spark.DataSource")
.option("nodes", "Person")
.load()
Conversion de Type de Données
Type de Property Ultipa | Type de Donnée Spark |
---|---|
_id , _from , _to |
StringType |
_uuid , _from_uuid , _to_uuid |
LongType |
int32 |
IntegerType |
uint32 |
LongType |
int64 |
LongType |
uint64 |
StringType |
float |
FloatType |
double |
DoubleType |
decimal |
|
string |
StringType |
text |
|
datetime |
TimestampType |
timestamp |
TimestampType |
point |
|
blob |
BinaryType |
list |
|
set |
|
ignore |
NullType |
UNSET |
NullType |
_ | StringType |