The GQLDB Node.js driver provides a bulk import service for high-throughput data ingestion. Bulk import optimizes performance by batching operations and reducing overhead.
| Method | Description |
|---|---|
| `startBulkImport()` | Start a bulk import session |
| `checkpoint()` | Flush accumulated data to disk |
| `endBulkImport()` | End the session with a final checkpoint |
| `abortBulkImport()` | Cancel the session without saving |
| `getBulkImportStatus()` | Get the current status of a session |
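Every import follows the same lifecycle: start a session, pass its session ID to the insert calls, checkpoint as needed, then end the session (or abort on failure). A minimal sketch of that flow, assuming an already connected client and an existing graph named `myGraph`:

```typescript
import { GqldbClient, NodeData } from 'gqldb-nodejs';

// Minimal session lifecycle: start, insert with the session ID, checkpoint, end.
// Assumes `client` is already connected and the graph 'myGraph' exists.
async function bulkImportLifecycle(client: GqldbClient, nodes: NodeData[]) {
  const session = await client.startBulkImport('myGraph');
  try {
    await client.insertNodes('myGraph', nodes, { bulkImportSessionId: session.sessionId });
    await client.checkpoint(session.sessionId);      // optional manual flush
    await client.endBulkImport(session.sessionId);   // final checkpoint and cleanup
  } catch (error) {
    await client.abortBulkImport(session.sessionId); // discard uncommitted data
    throw error;
  }
}
```

The individual methods are described in detail below.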
Initialize a bulk import session for a graph:
```typescript
import { GqldbClient, BulkImportSession, BulkImportOptions } from 'gqldb-nodejs';

async function startBulkImportExample(client: GqldbClient) {
  // Basic start
  const session: BulkImportSession = await client.startBulkImport('myGraph');
  console.log('Session ID:', session.sessionId);
  console.log('Success:', session.success);

  // Start with options
  const options: BulkImportOptions = {
    checkpointEvery: 10000,   // Auto-checkpoint every 10,000 records
    estimatedNodes: 1000000,  // Hint for pre-allocating node ID cache
    estimatedEdges: 5000000   // Hint for edge batch sizing
  };
  const optimizedSession = await client.startBulkImport('myGraph', options);
}
```
```typescript
interface BulkImportOptions {
  checkpointEvery?: number;  // Records between auto-checkpoints (0 = manual only)
  estimatedNodes?: number;   // Hint for pre-allocating node ID cache
  estimatedEdges?: number;   // Hint for edge batch sizing
}
```
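Note the manual-only mode: per the `checkpointEvery` comment above, a value of 0 disables auto-checkpoints, so data is flushed only when checkpoint() or endBulkImport() is called. A short sketch of that pattern (`firstBatch` stands in for your own node data):

```typescript
import { GqldbClient, NodeData } from 'gqldb-nodejs';

// Manual-only checkpointing sketch: with checkpointEvery: 0 no auto-checkpoints fire,
// so the durability points are exactly where checkpoint()/endBulkImport() are called.
async function manualCheckpointsOnly(client: GqldbClient, firstBatch: NodeData[]) {
  const session = await client.startBulkImport('myGraph', { checkpointEvery: 0 });

  await client.insertNodes('myGraph', firstBatch, { bulkImportSessionId: session.sessionId });
  await client.checkpoint(session.sessionId);    // explicit flush

  await client.endBulkImport(session.sessionId); // final checkpoint
}
```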
```typescript
interface BulkImportSession {
  success: boolean;
  sessionId: string;
  message: string;
}
```
Use the session ID with insertNodes() and insertEdges():
```typescript
async function bulkInsertExample(client: GqldbClient) {
  const session = await client.startBulkImport('myGraph', { checkpointEvery: 50000 });

  try {
    // Insert nodes in batches
    for (let batch = 0; batch < 100; batch++) {
      const nodes = generateNodeBatch(batch, 1000); // 1000 nodes per batch
      await client.insertNodes('myGraph', nodes, { bulkImportSessionId: session.sessionId });
    }

    // Insert edges in batches
    for (let batch = 0; batch < 100; batch++) {
      const edges = generateEdgeBatch(batch, 5000); // 5000 edges per batch
      await client.insertEdges('myGraph', edges, { bulkImportSessionId: session.sessionId });
    }

    // End with final checkpoint
    const result = await client.endBulkImport(session.sessionId);
    console.log(`Imported ${result.totalRecords} records`);
  } catch (error) {
    // Abort on error
    await client.abortBulkImport(session.sessionId);
    throw error;
  }
}
```
Manually flush accumulated data to disk for durability:
```typescript
import { CheckpointResult } from 'gqldb-nodejs';

async function checkpointExample(client: GqldbClient) {
  const session = await client.startBulkImport('myGraph');

  // Insert some data...
  await client.insertNodes('myGraph', nodes1, { bulkImportSessionId: session.sessionId });

  // Checkpoint to ensure data is persisted
  const result: CheckpointResult = await client.checkpoint(session.sessionId);
  console.log('Checkpoint success:', result.success);
  console.log('Records since start:', result.recordCount);
  console.log('Records since last checkpoint:', result.lastCheckpointCount);
  console.log('Message:', result.message);

  // Continue importing...
  await client.insertNodes('myGraph', nodes2, { bulkImportSessionId: session.sessionId });

  // Final checkpoint and end
  await client.endBulkImport(session.sessionId);
}
```
```typescript
interface CheckpointResult {
  success: boolean;
  recordCount: number;          // Total records since session start
  lastCheckpointCount: number;  // Records since last checkpoint
  message: string;
}
```
Complete the session with a final checkpoint:
```typescript
import { EndBulkImportResult } from 'gqldb-nodejs';

async function endBulkImportExample(client: GqldbClient) {
  const session = await client.startBulkImport('myGraph');

  // ... insert data ...

  const result: EndBulkImportResult = await client.endBulkImport(session.sessionId);
  console.log('Success:', result.success);
  console.log('Total records:', result.totalRecords);
  console.log('Message:', result.message);
}
```
```typescript
interface EndBulkImportResult {
  success: boolean;
  totalRecords: number;
  message: string;
}
```
Cancel a session without saving uncommitted data:
```typescript
import { AbortBulkImportResult } from 'gqldb-nodejs';

async function abortBulkImportExample(client: GqldbClient) {
  const session = await client.startBulkImport('myGraph');

  try {
    // ... insert data ...

    // someErrorCondition is a placeholder for your own validation logic
    if (someErrorCondition) {
      const result: AbortBulkImportResult = await client.abortBulkImport(session.sessionId);
      console.log('Abort success:', result.success);
      console.log('Message:', result.message);
      return;
    }

    await client.endBulkImport(session.sessionId);
  } catch (error) {
    await client.abortBulkImport(session.sessionId);
    throw error;
  }
}
```
```typescript
interface AbortBulkImportResult {
  success: boolean;
  message: string;
}
```
Get the current status of a bulk import session:
```typescript
import { BulkImportStatus } from 'gqldb-nodejs';

async function checkStatusExample(client: GqldbClient) {
  const session = await client.startBulkImport('myGraph');

  // ... insert some data ...

  const status: BulkImportStatus = await client.getBulkImportStatus(session.sessionId);
  console.log('Is active:', status.isActive);
  console.log('Graph name:', status.graphName);
  console.log('Record count:', status.recordCount);
  console.log('Last checkpoint count:', status.lastCheckpointCount);
  console.log('Created at:', new Date(status.createdAt));
  console.log('Last activity:', new Date(status.lastActivity));
}
```
```typescript
interface BulkImportStatus {
  isActive: boolean;
  graphName: string;
  recordCount: number;
  lastCheckpointCount: number;
  createdAt: number;     // Timestamp in milliseconds
  lastActivity: number;  // Timestamp in milliseconds
}
```
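Because `createdAt` and `lastActivity` are millisecond timestamps, the status can also be used to estimate throughput for a long-running import. A small illustrative helper (`logImportProgress` is not part of the driver; it only reads the fields above):

```typescript
import { GqldbClient } from 'gqldb-nodejs';

// Illustrative helper: derive rough throughput figures from a session's status.
async function logImportProgress(client: GqldbClient, sessionId: string) {
  const status = await client.getBulkImportStatus(sessionId);
  const elapsedSeconds = (status.lastActivity - status.createdAt) / 1000;
  const recordsPerSecond = elapsedSeconds > 0 ? status.recordCount / elapsedSeconds : 0;

  console.log(
    `${status.graphName}: ${status.recordCount} records in ${elapsedSeconds.toFixed(1)}s ` +
    `(~${Math.round(recordsPerSecond)} records/s), ` +
    `${status.lastCheckpointCount} since last checkpoint`
  );
}
```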
Choose appropriate batch sizes for optimal performance:
```typescript
const OPTIMAL_BATCH_SIZE = 10000; // Adjust based on your data

async function efficientBulkImport(client: GqldbClient, allNodes: NodeData[]) {
  const session = await client.startBulkImport('myGraph', { checkpointEvery: 100000 });

  try {
    // Process in batches
    for (let i = 0; i < allNodes.length; i += OPTIMAL_BATCH_SIZE) {
      const batch = allNodes.slice(i, i + OPTIMAL_BATCH_SIZE);
      await client.insertNodes('myGraph', batch, { bulkImportSessionId: session.sessionId });

      // Progress logging
      if ((i + OPTIMAL_BATCH_SIZE) % 100000 === 0) {
        console.log(`Processed ${i + OPTIMAL_BATCH_SIZE} nodes`);
      }
    }

    await client.endBulkImport(session.sessionId);
  } catch (error) {
    await client.abortBulkImport(session.sessionId);
    throw error;
  }
}
```
Implement checkpoint-based recovery:
```typescript
async function robustBulkImport(client: GqldbClient, data: NodeData[][]) {
  const session = await client.startBulkImport('myGraph');
  let processedBatches = 0;

  try {
    for (const batch of data) {
      await client.insertNodes('myGraph', batch, { bulkImportSessionId: session.sessionId });
      processedBatches++;

      // Checkpoint every 10 batches for recovery
      if (processedBatches % 10 === 0) {
        await client.checkpoint(session.sessionId);
        console.log(`Checkpoint at batch ${processedBatches}`);
      }
    }

    await client.endBulkImport(session.sessionId);
  } catch (error) {
    console.error(`Error at batch ${processedBatches}:`, error.message);

    // Data up to the last checkpoint is safe
    const status = await client.getBulkImportStatus(session.sessionId);
    console.log(`Saved records: ${status.recordCount - status.lastCheckpointCount}`);

    await client.abortBulkImport(session.sessionId);
    throw error;
  }
}
```
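The complete example below ties everything together: session setup with options, batched node and edge inserts, a status check, a manual checkpoint, the final endBulkImport(), and verification with a GQL count query.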
```typescript
import { GqldbClient, createConfig, NodeData, EdgeData } from 'gqldb-nodejs';

async function main() {
  const client = new GqldbClient(createConfig({ hosts: ['192.168.1.100:9000'] }));

  try {
    await client.login('admin', 'password');

    // Create graph for bulk import
    await client.createGraph('bulkDemo');

    // Start bulk import session
    const session = await client.startBulkImport('bulkDemo', {
      checkpointEvery: 10000,
      estimatedNodes: 100000,
      estimatedEdges: 500000
    });
    console.log('Started bulk import session:', session.sessionId);

    // Generate and insert nodes
    for (let batch = 0; batch < 10; batch++) {
      const nodes: NodeData[] = [];
      for (let i = 0; i < 10000; i++) {
        const id = batch * 10000 + i;
        nodes.push({
          id: `user${id}`,
          labels: ['User'],
          properties: { name: `User ${id}`, index: id }
        });
      }
      await client.insertNodes('bulkDemo', nodes, { bulkImportSessionId: session.sessionId });
      console.log(`Inserted batch ${batch + 1}/10`);
    }

    // Check status
    const status = await client.getBulkImportStatus(session.sessionId);
    console.log('Current status:', status);

    // Manual checkpoint
    const checkpoint = await client.checkpoint(session.sessionId);
    console.log('Checkpoint:', checkpoint);

    // Generate and insert edges
    const edges: EdgeData[] = [];
    for (let i = 0; i < 50000; i++) {
      edges.push({
        id: `edge${i}`,
        label: 'Knows',
        fromNodeId: `user${i}`,
        toNodeId: `user${(i + 1) % 100000}`,
        properties: {}
      });
    }
    await client.insertEdges('bulkDemo', edges, { bulkImportSessionId: session.sessionId });

    // End bulk import
    const result = await client.endBulkImport(session.sessionId);
    console.log('Bulk import completed:', result);

    // Verify
    await client.useGraph('bulkDemo');
    const countResponse = await client.gql('MATCH (n) RETURN count(n)');
    console.log('Total nodes:', countResponse.singleNumber());

    // Clean up
    await client.dropGraph('bulkDemo');
  } finally {
    await client.close();
  }
}

main().catch(console.error);
```