UltipaDocs
Try Playground
  • Introduction
    • Quick Start
    • Configuration
    • Connection and Session
    • Executing Queries
    • Graph Management
    • Transactions
    • Data Operations
    • Bulk Import
    • Data Export
    • Health and Administration
    • Response Processing
    • Data Types
    • Error Handling
    • Quick Start
    • Configuration
    • Connection and Session
    • Executing Queries
    • Graph Management
    • Transactions
    • Data Operations
    • Bulk Import
    • Data Export
    • Health and Administration
    • Response Processing
    • Data Types
    • Error Handling
    • Quick Start
    • Configuration
    • Connection and Session
    • Executing Queries
    • Graph Management
    • Transactions
    • Data Operations
    • Bulk Import
    • Data Export
    • Health and Administration
    • Response Processing
    • Data Types
    • Error Handling
    • Quick Start
    • Configuration
    • Connection and Session
    • Executing Queries
    • Graph Management
    • Transactions
    • Data Operations
    • Bulk Import
    • Data Export
    • Health and Administration
    • Response Processing
    • Data Types
    • Error Handling
  Docs / Ultipa Drivers / Go

Bulk Import

The GQLDB Go driver provides bulk import functionality for high-throughput data ingestion with optimized write performance.

Bulk Import Methods

Method                                | Description
StartBulkImport(ctx, graphName, opts) | Start a bulk import session
Checkpoint(ctx, sessionID)            | Flush data to disk
EndBulkImport(ctx, sessionID)         | End session with final checkpoint
AbortBulkImport(ctx, sessionID)       | Cancel session without final sync
GetBulkImportStatus(ctx, sessionID)   | Get session status

Basic Usage

Go
import (
    "context"

    gqldb "github.com/gqldb/gqldb-go"
)

ctx := context.Background()

// Start bulk import session
opts := &gqldb.BulkImportOptions{
    CheckpointEvery: 10000,  // Auto-checkpoint every 10,000 records
}

session, err := client.StartBulkImport(ctx, "myGraph", opts)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Session ID: %s\n", session.SessionID)

// Insert nodes with bulk import session ID
nodeConfig := &gqldb.InsertNodesConfig{
    BulkImportSessionID: session.SessionID,
}

for _, batch := range nodeBatches {
    _, err := client.InsertNodes(ctx, "myGraph", batch, nodeConfig)
    if err != nil {
        client.AbortBulkImport(ctx, session.SessionID)
        log.Fatal(err)
    }
}

// Insert edges
edgeConfig := &gqldb.InsertEdgesConfig{
    BulkImportSessionID: session.SessionID,
}

for _, batch := range edgeBatches {
    _, err := client.InsertEdges(ctx, "myGraph", batch, edgeConfig)
    if err != nil {
        client.AbortBulkImport(ctx, session.SessionID)
        log.Fatal(err)
    }
}

// End session (final checkpoint)
result, err := client.EndBulkImport(ctx, session.SessionID)
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Import complete: %d total records\n", result.TotalRecords)

Starting a Bulk Import Session

StartBulkImport()

Go
opts := &gqldb.BulkImportOptions{
    CheckpointEvery: 10000,       // Records between auto-checkpoints (0 = manual only)
    EstimatedNodes:  1000000,     // Hint for pre-allocating node ID cache
    EstimatedEdges:  5000000,     // Hint for edge batch sizing
    MemtableSize:    67108864,    // Memtable size in bytes (default: 64MB)
    MaxMemtables:    4,           // Max immutable memtables before stall
}

session, err := client.StartBulkImport(ctx, "myGraph", opts)

BulkImportSession Struct

Go
// BulkImportSession is returned by StartBulkImport and identifies an active
// bulk import session.
type BulkImportSession struct {
    SessionID string // identifier passed to inserts, Checkpoint, End/AbortBulkImport
    Success   bool   // whether the session was started successfully
    Message   string // server-provided status message
}

Checkpointing

Manual Checkpoint

Go
// Manually flush data to disk
result, err := client.Checkpoint(ctx, session.SessionID)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Checkpoint complete:\n")
fmt.Printf("  Records flushed: %d\n", result.RecordCount)
fmt.Printf("  Message: %s\n", result.Message)

Auto-Checkpoint

With CheckpointEvery set, checkpoints happen automatically:

Go
opts := &gqldb.BulkImportOptions{
    CheckpointEvery: 50000,  // Checkpoint every 50,000 records
}

session, _ := client.StartBulkImport(ctx, "myGraph", opts)

// Auto-checkpoints happen during inserts
nodeConfig := &gqldb.InsertNodesConfig{BulkImportSessionID: session.SessionID}
for _, batch := range batches {
    client.InsertNodes(ctx, "myGraph", batch, nodeConfig)
}

Ending a Bulk Import

EndBulkImport()

Complete the session with a final checkpoint:

Go
result, err := client.EndBulkImport(ctx, session.SessionID)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Bulk import completed:\n")
fmt.Printf("  Success: %v\n", result.Success)
fmt.Printf("  Total records: %d\n", result.TotalRecords)
fmt.Printf("  Message: %s\n", result.Message)

AbortBulkImport()

Cancel without final sync (discards unflushed data):

Go
result, err := client.AbortBulkImport(ctx, session.SessionID)
if err != nil {
    log.Printf("Abort failed: %v", err)
}
fmt.Printf("Bulk import aborted: %s\n", result.Message)

Monitoring Status

GetBulkImportStatus()

Go
status, err := client.GetBulkImportStatus(ctx, session.SessionID)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Session: %s\n", status.GraphName)
fmt.Printf("Active: %v\n", status.IsActive)
fmt.Printf("Records: %d\n", status.RecordCount)
fmt.Printf("Last checkpoint: %d\n", status.LastCheckpointCount)
fmt.Printf("Created: %s\n", status.CreatedAt)
fmt.Printf("Last activity: %s\n", status.LastActivity)

Batch Processing Pattern

Go
// batchGenerator splits items into consecutive batches of at most batchSize
// elements, preserving order; the final batch may be shorter. The batches
// share the input's backing array (no copying).
//
// Generalized to any element type so the same helper batches nodes, edges,
// or anything else; existing calls like batchGenerator(nodes, batchSize)
// still compile unchanged via type inference. A non-positive batchSize or an
// empty input yields nil (the original looped forever when batchSize <= 0).
func batchGenerator[T any](items []T, batchSize int) [][]T {
    if batchSize <= 0 || len(items) == 0 {
        return nil
    }
    // Pre-size: ceil(len/batchSize) batches.
    batches := make([][]T, 0, (len(items)+batchSize-1)/batchSize)
    for i := 0; i < len(items); i += batchSize {
        end := i + batchSize
        if end > len(items) {
            end = len(items)
        }
        batches = append(batches, items[i:end])
    }
    return batches
}

// bulkImportData imports nodes and then edges into graphName through a single
// bulk-import session, sending both in batchSize-sized chunks. On any insert
// failure the session is aborted (discarding unflushed data) and the error is
// returned; on success the session is ended with a final checkpoint and the
// end result is returned.
func bulkImportData(ctx context.Context, client *gqldb.Client, graphName string,
    nodes []*gqldb.NodeData, edges []*gqldb.EdgeData, batchSize int) (*gqldb.EndBulkImportResult, error) {

    opts := &gqldb.BulkImportOptions{
        // Auto-checkpoint every 10 batches.
        CheckpointEvery: int32(batchSize * 10),
    }

    session, err := client.StartBulkImport(ctx, graphName, opts)
    if err != nil {
        return nil, err
    }

    // Import nodes
    nodeConfig := &gqldb.InsertNodesConfig{BulkImportSessionID: session.SessionID}
    totalNodes := int64(0)

    for _, batch := range batchGenerator(nodes, batchSize) {
        result, err := client.InsertNodes(ctx, graphName, batch, nodeConfig)
        if err != nil {
            client.AbortBulkImport(ctx, session.SessionID)
            return nil, err
        }
        totalNodes += result.NodeCount
        fmt.Printf("Imported %d nodes...\n", totalNodes)
    }

    // Import edges. Fix: the original called an undefined batchEdges helper;
    // edges are batched inline with the same slicing pattern batchGenerator
    // uses for nodes.
    edgeConfig := &gqldb.InsertEdgesConfig{BulkImportSessionID: session.SessionID}
    totalEdges := int64(0)

    for i := 0; i < len(edges); i += batchSize {
        end := i + batchSize
        if end > len(edges) {
            end = len(edges)
        }
        result, err := client.InsertEdges(ctx, graphName, edges[i:end], edgeConfig)
        if err != nil {
            client.AbortBulkImport(ctx, session.SessionID)
            return nil, err
        }
        totalEdges += result.EdgeCount
        fmt.Printf("Imported %d edges...\n", totalEdges)
    }

    // Complete the session with a final checkpoint.
    return client.EndBulkImport(ctx, session.SessionID)
}

Result Structs

CheckpointResult

Go
// CheckpointResult is returned by Checkpoint after flushing session data to disk.
type CheckpointResult struct {
    Success             bool   // whether the checkpoint succeeded
    RecordCount         int64  // records flushed (printed as "Records flushed" in the examples)
    LastCheckpointCount int64  // presumably the record count at the previous checkpoint — confirm with driver docs
    Message             string // server-provided status message
}

EndBulkImportResult

Go
// EndBulkImportResult is returned by EndBulkImport after the final checkpoint.
type EndBulkImportResult struct {
    Success      bool   // whether the session completed successfully
    TotalRecords int64  // total records imported over the whole session
    Message      string // server-provided status message
}

AbortBulkImportResult

Go
// AbortBulkImportResult is returned by AbortBulkImport when a session is
// cancelled without a final sync (unflushed data is discarded).
type AbortBulkImportResult struct {
    Success bool   // whether the abort was acknowledged
    Message string // server-provided status message
}

BulkImportStatus

Go
// BulkImportStatus is returned by GetBulkImportStatus and describes the
// current state of a bulk-import session.
type BulkImportStatus struct {
    IsActive            bool   // whether the session is still open
    GraphName           string // graph the session is importing into
    RecordCount         int64  // records received so far (used as a "pending" count in the examples)
    LastCheckpointCount int64  // record count as of the last checkpoint
    CreatedAt           string // session creation timestamp (string form; format not shown here)
    LastActivity        string // timestamp of the most recent activity (string form)
}

Complete Example

Go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    gqldb "github.com/gqldb/gqldb-go"
)

// generateTestData builds numNodes synthetic "TestNode" records (IDs n0..n{N-1})
// and numEdges "TestEdge" records connecting consecutive nodes in a ring
// (n_i -> n_{(i+1) mod numNodes}), for use in the bulk-import example.
func generateTestData(numNodes, numEdges int) ([]*gqldb.NodeData, []*gqldb.EdgeData) {
    nodes := make([]*gqldb.NodeData, numNodes)
    for i := 0; i < numNodes; i++ {
        nodes[i] = &gqldb.NodeData{
            ID:     fmt.Sprintf("n%d", i),
            Labels: []string{"TestNode"},
            Properties: map[string]interface{}{
                "index": int64(i),
                "value": fmt.Sprintf("value_%d", i),
            },
        }
    }

    // Fix: edge endpoints use i % numNodes, which panics with an integer
    // divide-by-zero when numNodes == 0. No nodes means no valid edges.
    if numNodes == 0 {
        return nodes, nil
    }

    edges := make([]*gqldb.EdgeData, numEdges)
    for i := 0; i < numEdges; i++ {
        edges[i] = &gqldb.EdgeData{
            Label:      "TestEdge",
            FromNodeID: fmt.Sprintf("n%d", i%numNodes),
            ToNodeID:   fmt.Sprintf("n%d", (i+1)%numNodes),
            Properties: map[string]interface{}{
                "weight": float64(i) * 0.1,
            },
        }
    }

    return nodes, edges
}

// main demonstrates a full bulk-import round trip: connect, generate test
// data, import nodes then edges in batches with checkpoints, verify counts
// via GQL, and clean up.
func main() {
    // Generous timeout: bulk-import batches can be long-running.
    config := gqldb.NewConfigBuilder().
        Hosts("192.168.1.100:9000").
        Timeout(5 * time.Minute).
        Build()

    client, err := gqldb.NewClient(config)
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    ctx := context.Background()
    // NOTE(review): setup errors are ignored for brevity in this example;
    // production code should check each of these calls.
    client.Login(ctx, "admin", "password")
    client.CreateGraph(ctx, "bulkDemo", gqldb.GraphTypeOpen, "")
    client.UseGraph(ctx, "bulkDemo")

    // Generate test data
    fmt.Println("=== Generating Test Data ===")
    numNodes := 100000
    numEdges := 500000
    nodes, edges := generateTestData(numNodes, numEdges)
    fmt.Printf("  Generated %d nodes and %d edges\n", len(nodes), len(edges))

    // Start bulk import
    fmt.Println("\n=== Starting Bulk Import ===")
    startTime := time.Now()

    opts := &gqldb.BulkImportOptions{
        CheckpointEvery: 50000,
        EstimatedNodes:  int32(numNodes),
        EstimatedEdges:  int32(numEdges),
    }

    session, err := client.StartBulkImport(ctx, "bulkDemo", opts)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("  Session ID: %s\n", session.SessionID)

    // Import nodes in batches
    fmt.Println("\n=== Importing Nodes ===")
    batchSize := 10000
    importedNodes := int64(0)
    nodeConfig := &gqldb.InsertNodesConfig{BulkImportSessionID: session.SessionID}

    for i := 0; i < len(nodes); i += batchSize {
        end := i + batchSize
        if end > len(nodes) {
            end = len(nodes)
        }
        batch := nodes[i:end]

        result, err := client.InsertNodes(ctx, "bulkDemo", batch, nodeConfig)
        if err != nil {
            // Abort so the server discards unflushed data.
            client.AbortBulkImport(ctx, session.SessionID)
            log.Fatal(err)
        }
        importedNodes += result.NodeCount

        if importedNodes%50000 == 0 {
            // Fix: the original discarded the error and dereferenced status,
            // panicking if GetBulkImportStatus failed.
            status, err := client.GetBulkImportStatus(ctx, session.SessionID)
            if err == nil {
                fmt.Printf("  Progress: %d nodes, pending: %d\n", importedNodes, status.RecordCount)
            }
        }
    }
    fmt.Printf("  Total nodes imported: %d\n", importedNodes)

    // Manual checkpoint before edges
    fmt.Println("\n=== Checkpoint Before Edges ===")
    // Fix: the original ignored the error and dereferenced cpResult,
    // panicking if Checkpoint failed.
    cpResult, err := client.Checkpoint(ctx, session.SessionID)
    if err != nil {
        client.AbortBulkImport(ctx, session.SessionID)
        log.Fatal(err)
    }
    fmt.Printf("  Flushed %d records\n", cpResult.RecordCount)

    // Import edges in batches
    fmt.Println("\n=== Importing Edges ===")
    importedEdges := int64(0)
    edgeConfig := &gqldb.InsertEdgesConfig{BulkImportSessionID: session.SessionID}

    for i := 0; i < len(edges); i += batchSize {
        end := i + batchSize
        if end > len(edges) {
            end = len(edges)
        }
        batch := edges[i:end]

        result, err := client.InsertEdges(ctx, "bulkDemo", batch, edgeConfig)
        if err != nil {
            client.AbortBulkImport(ctx, session.SessionID)
            log.Fatal(err)
        }
        importedEdges += result.EdgeCount

        if importedEdges%100000 == 0 {
            fmt.Printf("  Progress: %d edges\n", importedEdges)
        }
    }
    fmt.Printf("  Total edges imported: %d\n", importedEdges)

    // End bulk import
    fmt.Println("\n=== Completing Bulk Import ===")
    endResult, err := client.EndBulkImport(ctx, session.SessionID)
    if err != nil {
        log.Fatal(err)
    }

    elapsed := time.Since(startTime)
    fmt.Printf("  Completed in %.2f seconds\n", elapsed.Seconds())
    fmt.Printf("  Total records: %d\n", endResult.TotalRecords)

    // Verify
    fmt.Println("\n=== Verification ===")
    // Fix: the original ignored the Gql error and called response.SingleInt(),
    // panicking if the query failed.
    response, err := client.Gql(ctx, "MATCH (n:TestNode) RETURN count(n)", nil)
    if err != nil {
        log.Fatal(err)
    }
    nodeCount, _ := response.SingleInt()
    fmt.Printf("  Node count: %d\n", nodeCount)

    response, err = client.Gql(ctx, "MATCH ()-[e:TestEdge]->() RETURN count(e)", nil)
    if err != nil {
        log.Fatal(err)
    }
    edgeCount, _ := response.SingleInt()
    fmt.Printf("  Edge count: %d\n", edgeCount)

    // Cleanup
    client.DropGraph(ctx, "bulkDemo", true)
}