GraphForge Architecture Overview¶
Version: 0.1.1
Last Updated: 2026-01-30
Status: Phase 4 Complete (TCK Compliant Query Engine)
Executive Summary¶
GraphForge is an embedded, openCypher-compatible graph engine designed as a "graph workbench" for research, investigative, and analytical workflows. The architecture prioritizes:
- Correctness over performance (strict openCypher TCK compliance)
- Developer experience (Python-first, zero-config, notebook-friendly)
- Simplicity (embedded, single-file storage, no server)
- Inspectability (observable query plans, transparent storage)
High-Level Architecture¶
┌─────────────────────────────────────────────────────────────┐
│ GraphForge API │
│ (src/graphforge/api.py) │
│ │
│ db = GraphForge("my-graph.db") │
│ results = db.execute("MATCH (n:Person) RETURN n") │
└─────────────────────────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────┐
│ Query Processing Pipeline │
└────────────────────────────────────────┘
│
┌────────────────────┼─────────────────────┐
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌──────────┐
│ Parser │ │ Planner │ │ Executor │
└────────┘ └─────────┘ └──────────┘
│ │ │
▼ ▼ ▼
┌────────┐ ┌─────────┐ ┌──────────┐
│ AST │ ────▶ │ Logical │ ────▶ │ Result │
│ │ │ Plan │ │ Rows │
└────────┘ └─────────┘ └──────────┘
│
▼
┌──────────────┐
│ Graph │
│ Storage │
└──────────────┘
│
▼
┌──────────────┐
│ SQLite │
│ (WAL Mode) │
└──────────────┘
Component Details¶
1. GraphForge API (src/graphforge/api.py)¶
Purpose: High-level user-facing interface
Responsibilities:
- Database lifecycle management (open/close)
- Query execution orchestration
- Error handling and exception mapping
Example:
from graphforge import GraphForge
# In-memory graph (current implementation)
db = GraphForge()
# Durable graph (future: Phase 5)
db = GraphForge("analysis.db")
# Execute queries
results = db.execute("""
MATCH (p:Person)-[:KNOWS]->(f:Person)
WHERE p.age > 30
RETURN p.name, f.name
ORDER BY p.name
LIMIT 10
""")
for row in results:
print(row["p.name"], row["f.name"])
2. Parser (src/graphforge/parser/)¶
Purpose: Convert Cypher query strings to Abstract Syntax Tree (AST)
Implementation: Lark parser with EBNF grammar
Files:
- parser.py - Lark transformer that builds AST nodes
- cypher.lark - openCypher grammar definition
Supported Syntax (as of Phase 4):
- MATCH patterns (nodes, relationships, directionality)
- WHERE clause (boolean logic, comparisons, property access)
- RETURN clause with aliasing (AS keyword)
- ORDER BY clause (single/multi-key, ASC/DESC)
- SKIP and LIMIT
- Aggregation functions (COUNT, SUM, AVG, MIN, MAX)
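The flow inside the parser is grammar → Lark parse tree → Transformer → AST. A minimal, self-contained sketch of that pipeline (the grammar fragment and start rule below are illustrative stand-ins, not the actual cypher.lark contents):

from lark import Lark

# Toy grammar for illustration only; the real rules live in cypher.lark.
GRAMMAR = r"""
    query: "MATCH" pattern
    pattern: "(" NAME (":" NAME)? ")"
    NAME: /[A-Za-z_][A-Za-z0-9_]*/
    %import common.WS
    %ignore WS
"""

parser = Lark(GRAMMAR, start="query")
tree = parser.parse("MATCH (n:Person)")
print(tree.pretty())  # parser.py's Transformer maps a tree like this to AST nodes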
Example AST:
# Query: MATCH (n:Person) WHERE n.age > 30 RETURN n.name AS name
CypherQuery(
clauses=[
MatchClause(
patterns=[[
NodePattern(
variable='n',
labels=['Person'],
properties={}
)
]]
),
WhereClause(
predicate=Comparison(
left=PropertyAccess(variable='n', property='age'),
op='>',
right=Literal(value=CypherInt(30))
)
),
ReturnClause(
items=[
ReturnItem(
expression=PropertyAccess(variable='n', property='name'),
alias='name'
)
]
)
]
)
See: docs/open_cypher_ast_logical_plan_spec_v_1.md
3. Planner (src/graphforge/planner/)¶
Purpose: Convert AST to executable logical plan
Implementation: Rule-based planner (no cost-based optimization)
Files:
- planner.py - AST → Logical Plan conversion
- operators.py - Logical plan operator definitions
Logical Operators:
from dataclasses import dataclass
from typing import Any

# OrderByItem, ReturnItem, and FunctionCall are AST node types (see the
# parser section); they are referenced here for alias and aggregate handling.

@dataclass
class ScanNodes:
"""Scan all nodes or nodes by label."""
variable: str
labels: list[str] | None
@dataclass
class ExpandEdges:
"""Traverse relationships."""
src_var: str
edge_var: str | None
dst_var: str
edge_types: list[str]
direction: str # 'OUT', 'IN', 'UNDIRECTED'
@dataclass
class Filter:
"""Apply WHERE predicate."""
predicate: Any # Expression AST node
@dataclass
class Sort:
"""Sort by expressions (ORDER BY)."""
items: list[OrderByItem]
return_items: list[ReturnItem] | None # For alias resolution
@dataclass
class Aggregate:
"""Group and aggregate (implicit GROUP BY)."""
grouping_exprs: list[Any]
agg_exprs: list[FunctionCall]
return_items: list[ReturnItem]
@dataclass
class Project:
"""Evaluate RETURN expressions."""
items: list[ReturnItem]
@dataclass
class Skip:
"""Skip first N rows."""
count: int
@dataclass
class Limit:
"""Limit to N rows."""
count: int
Operator Ordering (Critical for Semantics):
1. MATCH (ScanNodes, ExpandEdges)
2. WHERE (Filter)
3. ORDER BY (Sort) - placed before RETURN so it can access all bound variables
4. RETURN (Project or Aggregate)
5. SKIP/LIMIT
Example Plan:
# Query: MATCH (n:Person) WHERE n.age > 30 RETURN n.name LIMIT 5
[
ScanNodes(variable='n', labels=['Person']),
Filter(predicate=Comparison(...)),
Project(items=[ReturnItem(...)]),
Limit(count=5)
]
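The conversion itself is a straightforward walk over the clause list. A condensed sketch, assuming the AST and operator classes shown above (the real planner.py also emits ExpandEdges, Sort, Aggregate, Skip, and Limit, and detects aggregation in RETURN items):

def plan_query(query: CypherQuery) -> list:
    """Map each clause to its logical operator, in clause order."""
    operators = []
    for clause in query.clauses:
        if isinstance(clause, MatchClause):
            node = clause.patterns[0][0]  # first node pattern in the first path
            operators.append(ScanNodes(variable=node.variable, labels=node.labels))
        elif isinstance(clause, WhereClause):
            operators.append(Filter(predicate=clause.predicate))
        elif isinstance(clause, ReturnClause):
            operators.append(Project(items=clause.items))
    return operators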
See: docs/open_cypher_ast_logical_plan_spec_v_1.md
4. Executor (src/graphforge/executor/)¶
Purpose: Execute logical plans against graph storage
Implementation: Pipeline architecture with streaming rows
Files:
- executor.py - Main execution engine
- evaluator.py - Expression evaluation with NULL propagation
Execution Model:
class QueryExecutor:
def execute(self, operators: list) -> list[dict]:
"""Stream rows through operator pipeline."""
rows = [ExecutionContext()] # Start with empty context
for op in operators:
rows = self._execute_operator(op, rows)
return rows
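The _execute_operator dispatch is presumably a type switch over the operator dataclasses; a plausible sketch (the handler names follow the examples below, and the Limit case is an assumption):

def _execute_operator(self, op, rows):
    """Route one logical operator to its handler."""
    if isinstance(op, ScanNodes):
        return self._execute_scan(op, rows)
    if isinstance(op, Filter):
        return self._execute_filter(op, rows)
    if isinstance(op, Sort):
        return self._execute_sort(op, rows)
    if isinstance(op, Limit):
        return rows[: op.count]
    raise NotImplementedError(f"Unsupported operator: {type(op).__name__}")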
ExecutionContext:
class ExecutionContext:
    """Variable bindings during query execution."""
    def __init__(self) -> None:
        # Variable name -> CypherValue | NodeRef | EdgeRef
        self.bindings: dict[str, Any] = {}
def bind(self, name: str, value: Any) -> None:
self.bindings[name] = value
def get(self, name: str) -> Any:
return self.bindings.get(name)
Operator Execution Examples:
def _execute_scan(self, op: ScanNodes, input_rows):
"""Scan nodes and bind to variable."""
result = []
for ctx in input_rows:
nodes = self.graph.get_nodes_by_label(op.labels[0]) if op.labels else self.graph.get_all_nodes()
for node in nodes:
new_ctx = ExecutionContext()
new_ctx.bindings = dict(ctx.bindings)
new_ctx.bind(op.variable, node)
result.append(new_ctx)
return result
def _execute_filter(self, op: Filter, input_rows):
"""Filter rows by predicate."""
result = []
for ctx in input_rows:
value = evaluate_expression(op.predicate, ctx)
if isinstance(value, CypherBool) and value.value:
result.append(ctx)
return result
def _execute_sort(self, op: Sort, input_rows):
"""Sort rows by expressions (with NULL handling)."""
# Pre-evaluate RETURN aliases for ORDER BY reference
extended_rows = []
for ctx in input_rows:
extended_ctx = ExecutionContext()
extended_ctx.bindings = dict(ctx.bindings)
if op.return_items:
for return_item in op.return_items:
if return_item.alias and not isinstance(return_item.expression, FunctionCall):
value = evaluate_expression(return_item.expression, ctx)
extended_ctx.bind(return_item.alias, value)
extended_rows.append(extended_ctx)
# Sort using multi-key comparison
sorted_rows = sorted(extended_rows, key=cmp_to_key(multi_key_compare))
# Map back to original contexts
return [original_context_for(row) for row in sorted_rows]
NULL Handling:
- Three-valued logic (TRUE, FALSE, NULL)
- Comparisons involving NULL return NULL
- WHERE filters out rows whose predicate evaluates to NULL (only TRUE passes)
- Sorting: ASC puts NULLs last, DESC puts NULLs first (see the sketch below)
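A small sketch of those rules with illustrative helpers (not the actual evaluator.py API):

def compare_gt(left, right):
    """'>' under three-valued logic: any NULL operand yields NULL."""
    if isinstance(left, CypherNull) or isinstance(right, CypherNull):
        return CypherNull()
    return CypherBool(left.value > right.value)

def sort_key_asc(value):
    """ASC ordering key: the leading flag pushes NULLs after all values."""
    return (isinstance(value, CypherNull), getattr(value, "value", None))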
See: docs/runtime_value_model_graph_execution_v_1.md
5. Graph Storage (src/graphforge/storage/)¶
Current Implementation: In-memory storage (Phase 1-4)
Future Implementation: SQLite backend (Phase 5)
In-Memory Graph (Current)¶
File: storage/memory.py
class Graph:
"""In-memory graph storage with adjacency lists."""
def __init__(self):
self.nodes: dict[int, NodeRef] = {}
self.edges: dict[int, EdgeRef] = {}
self.adjacency_out: dict[int, list[int]] = {} # node_id → [edge_ids]
self.adjacency_in: dict[int, list[int]] = {}
self.label_index: dict[str, set[int]] = {}
def add_node(self, node: NodeRef) -> None:
self.nodes[node.id] = node
for label in node.labels:
self.label_index.setdefault(label, set()).add(node.id)
def get_outgoing_edges(self, node_id: int) -> list[EdgeRef]:
edge_ids = self.adjacency_out.get(node_id, [])
return [self.edges[eid] for eid in edge_ids]
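The excerpt above omits edge insertion; a sketch of the corresponding add_edge, assuming the EdgeRef type from the data model section (storage/memory.py presumably does the equivalent):

def add_edge(self, edge: EdgeRef) -> None:
    """Store the edge and index it in both adjacency lists."""
    self.edges[edge.id] = edge
    self.adjacency_out.setdefault(edge.src.id, []).append(edge.id)
    self.adjacency_in.setdefault(edge.dst.id, []).append(edge.id)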
SQLite Backend (Phase 5 - Planned)¶
Architecture Decision: Use SQLite for persistence (see docs/storage-architecture-analysis.md)
Schema Design:
-- Enable WAL mode for concurrency
PRAGMA journal_mode=WAL;
PRAGMA synchronous=FULL;
PRAGMA foreign_keys=ON;
-- Nodes table
CREATE TABLE nodes (
id INTEGER PRIMARY KEY,
    labels BLOB,     -- MessagePack-encoded label list, e.g. ['Person', 'Employee']
    properties BLOB  -- MessagePack-encoded property map, e.g. {'name': 'Alice', 'age': 30}
);
-- Edges table
CREATE TABLE edges (
id INTEGER PRIMARY KEY,
type TEXT NOT NULL,
src_id INTEGER NOT NULL,
dst_id INTEGER NOT NULL,
properties BLOB,
FOREIGN KEY (src_id) REFERENCES nodes(id),
FOREIGN KEY (dst_id) REFERENCES nodes(id)
);
-- Adjacency lists (explicit storage for graph-native traversal)
CREATE TABLE adjacency_out (
node_id INTEGER NOT NULL,
edge_id INTEGER NOT NULL,
PRIMARY KEY (node_id, edge_id),
FOREIGN KEY (node_id) REFERENCES nodes(id),
FOREIGN KEY (edge_id) REFERENCES edges(id)
);
CREATE TABLE adjacency_in (
node_id INTEGER NOT NULL,
edge_id INTEGER NOT NULL,
PRIMARY KEY (node_id, edge_id),
FOREIGN KEY (node_id) REFERENCES nodes(id),
FOREIGN KEY (edge_id) REFERENCES edges(id)
);
-- Indexes for performance
CREATE INDEX idx_nodes_labels ON nodes(labels);
CREATE INDEX idx_edges_type ON edges(type);
CREATE INDEX idx_edges_src ON edges(src_id);
CREATE INDEX idx_edges_dst ON edges(dst_id);
Storage Backend Interface:
from typing import Protocol

class StorageBackend(Protocol):
"""Storage backend interface (replaceable internals)."""
def add_node(self, node: NodeRef) -> None: ...
def add_edge(self, edge: EdgeRef) -> None: ...
def get_node(self, node_id: int) -> NodeRef | None: ...
def get_all_nodes(self) -> list[NodeRef]: ...
def get_nodes_by_label(self, label: str) -> list[NodeRef]: ...
def get_outgoing_edges(self, node_id: int) -> list[EdgeRef]: ...
def get_incoming_edges(self, node_id: int) -> list[EdgeRef]: ...
# Transaction support
def begin_transaction(self) -> None: ...
def commit_transaction(self) -> None: ...
def rollback_transaction(self) -> None: ...
SQLite Implementation:
import sqlite3
from pathlib import Path

import msgpack

class SQLiteBackend(StorageBackend):
    def __init__(self, db_path: Path):
        self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA journal_mode=WAL")
self._init_schema()
def add_node(self, node: NodeRef) -> None:
labels_blob = msgpack.packb(list(node.labels))
props_blob = msgpack.packb({k: serialize_cypher_value(v) for k, v in node.properties.items()})
self.conn.execute(
"INSERT INTO nodes (id, labels, properties) VALUES (?, ?, ?)",
(node.id, labels_blob, props_blob)
)
def get_outgoing_edges(self, node_id: int) -> list[EdgeRef]:
cursor = self.conn.execute("""
SELECT e.id, e.type, e.src_id, e.dst_id, e.properties
FROM edges e
JOIN adjacency_out a ON e.id = a.edge_id
WHERE a.node_id = ?
""", (node_id,))
return [self._edge_from_row(row) for row in cursor.fetchall()]
Concurrency Model:
- SQLite WAL mode: single writer, multiple concurrent readers
- Readers see a consistent snapshot (snapshot isolation)
- Writers don't block readers (demonstrated in the sketch below)
- Automatic crash recovery
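A minimal, GraphForge-independent demonstration of the snapshot behavior using Python's sqlite3 module (the file name is arbitrary):

import sqlite3

# isolation_level=None: we control transactions explicitly
writer = sqlite3.connect("demo.db", isolation_level=None)
writer.execute("PRAGMA journal_mode=WAL")
writer.execute("CREATE TABLE IF NOT EXISTS t (x INTEGER)")

reader = sqlite3.connect("demo.db", isolation_level=None)
reader.execute("BEGIN")  # deferred; the snapshot is taken at the first read
before = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]

writer.execute("INSERT INTO t VALUES (1)")  # commits immediately; the open reader does not block it

after = reader.execute("SELECT COUNT(*) FROM t").fetchone()[0]
assert before == after  # the read transaction still sees its snapshot
reader.execute("COMMIT")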
See: docs/storage-architecture-analysis.md
Data Model¶
CypherValue Types (src/graphforge/types/values.py)¶
@dataclass
class CypherInt(CypherValue):
value: int
@dataclass
class CypherFloat(CypherValue):
value: float
@dataclass
class CypherString(CypherValue):
value: str
@dataclass
class CypherBool(CypherValue):
value: bool
@dataclass
class CypherNull(CypherValue):
pass
@dataclass
class CypherList(CypherValue):
value: list[CypherValue]
@dataclass
class CypherMap(CypherValue):
value: dict[str, CypherValue]
Operations:
- Comparison: less_than(), equals()
- Arithmetic: add(), subtract(), multiply(), divide()
- NULL propagation throughout (see the sketch below)
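A sketch of how NULL propagation might look in the arithmetic helpers (illustrative; values.py may structure this differently):

def add(left: CypherValue, right: CypherValue) -> CypherValue:
    """Addition under openCypher semantics: any NULL operand yields NULL."""
    if isinstance(left, CypherNull) or isinstance(right, CypherNull):
        return CypherNull()
    if isinstance(left, CypherInt) and isinstance(right, CypherInt):
        return CypherInt(left.value + right.value)
    if isinstance(left, (CypherInt, CypherFloat)) and isinstance(right, (CypherInt, CypherFloat)):
        return CypherFloat(float(left.value) + float(right.value))
    raise TypeError(f"Cannot add {type(left).__name__} and {type(right).__name__}")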
Graph Elements (src/graphforge/types/graph.py)¶
@dataclass(frozen=True)
class NodeRef:
"""Node reference with identity semantics."""
id: int
labels: frozenset[str]
properties: dict[str, CypherValue]
def __hash__(self) -> int:
return hash(self.id) # Identity by ID
@dataclass(frozen=True)
class EdgeRef:
"""Edge reference with identity semantics."""
id: int
type: str
src: NodeRef
dst: NodeRef
properties: dict[str, CypherValue]
def __hash__(self) -> int:
return hash(self.id) # Identity by ID
Design: Immutable, hashable, identity-based equality
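A usage sketch of the identity semantics, assuming the dataclasses above: references with the same ID hash identically, so nodes deduplicate in sets and can key dicts during pattern matching.

alice = NodeRef(id=1, labels=frozenset({"Person"}),
                properties={"name": CypherString("Alice")})
same_node = NodeRef(id=1, labels=frozenset({"Person"}),
                    properties={"name": CypherString("Alice")})
assert hash(alice) == hash(same_node)
assert len({alice, same_node}) == 1  # deduplicated in a set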
Query Execution Flow (Example)¶
Query:
MATCH (p:Person)-[:KNOWS]->(f:Person)
WHERE p.age > 30 AND f.age < p.age
RETURN p.name AS person, f.name AS friend, p.age - f.age AS age_diff
ORDER BY age_diff DESC
LIMIT 5
1. Parse → AST:
CypherQuery(
clauses=[
MatchClause(...),
WhereClause(...),
ReturnClause(...),
OrderByClause(...),
LimitClause(...)
]
)
2. Plan → Logical Operators:
[
ScanNodes(variable='p', labels=['Person']),
ExpandEdges(src_var='p', edge_var=None, dst_var='f', edge_types=['KNOWS'], direction='OUT'),
Filter(predicate=And(...)),
Sort(items=[OrderByItem(...)], return_items=[...]),
Project(items=[ReturnItem(...), ReturnItem(...), ReturnItem(...)]),
Limit(count=5)
]
3. Execute → Results:
# Step 1: ScanNodes - Bind all Person nodes to 'p'
[ExecutionContext(bindings={'p': NodeRef(id=1, ...)}),
ExecutionContext(bindings={'p': NodeRef(id=2, ...)}),
...]
# Step 2: ExpandEdges - Traverse KNOWS edges, bind 'f'
[ExecutionContext(bindings={'p': NodeRef(1), 'f': NodeRef(2)}),
ExecutionContext(bindings={'p': NodeRef(1), 'f': NodeRef(3)}),
...]
# Step 3: Filter - Keep rows where p.age > 30 AND f.age < p.age
[ExecutionContext(bindings={'p': NodeRef(1), 'f': NodeRef(3)}),
...]
# Step 4: Sort - Sort by p.age - f.age DESC
[...sorted rows...]
# Step 5: Project - Evaluate RETURN expressions
[{'person': CypherString('Alice'), 'friend': CypherString('Charlie'), 'age_diff': CypherInt(10)},
...]
# Step 6: Limit - Take first 5
[...5 rows...]
Testing Strategy¶
Test Categories¶
Unit Tests (tests/unit/)
- Individual component testing
- Parser, AST, planner, executor, evaluator
- Fast (<1s total)
Integration Tests (tests/integration/)
- End-to-end query execution
- Multi-component interactions
- Moderate speed (<10s total)
TCK Tests (tests/tck/)
- openCypher TCK compliance
- Semantic correctness validation
- 17 tests, 100% passing
Property Tests (Hypothesis)
- Fuzzing and edge cases
- Configured but not yet implemented (a possible shape is sketched below)
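One possible shape for such a test, shown as a hypothetical sketch: it assumes the import path from the data model section and that less_than() on non-NULL integers returns a CypherBool.

from hypothesis import given, strategies as st

from graphforge.types.values import CypherBool, CypherInt

@given(st.integers(), st.integers())
def test_less_than_matches_python_ordering(a, b):
    # Property: CypherInt comparison agrees with Python's integer ordering.
    result = CypherInt(a).less_than(CypherInt(b))
    assert isinstance(result, CypherBool)
    assert result.value == (a < b)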
Current Coverage¶
- 267 tests passing (100%)
- Test coverage: ~89%
- TCK compliance: 100% for implemented features
Test Execution:
# All tests
pytest
# By category
pytest -m unit
pytest -m integration
pytest -m tck
# With coverage
pytest --cov=src/graphforge --cov-report=html
Design Principles Applied¶
1. Spec-Driven Correctness¶
Implementation:
- 17 TCK compliance tests passing
- Three-valued NULL logic (TRUE, FALSE, NULL)
- ORDER BY can reference RETURN aliases (a non-trivial semantic fix)
- Proper NULL handling in sorting, comparisons, and aggregation
Trade-off:
- Performance is secondary to correctness
- No unsafe optimizations
2. Embedded & Zero-Config¶
Implementation:
- No server or daemon
- SQLite storage (single file)
- Zero configuration needed
- Works in notebooks and scripts
Example (a minimal sketch, mirroring the API usage shown earlier):
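from graphforge import GraphForge

db = GraphForge("analysis.db")  # single file on disk (Phase 5); in-memory today
db.execute("MATCH (n) RETURN n LIMIT 10")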
3. Graph-Native Execution¶
Implementation:
- Adjacency-list traversal (no joins)
- Pattern-matching operators (ScanNodes, ExpandEdges)
- Direct graph semantics
Schema:
-- Adjacency lists explicitly stored
CREATE TABLE adjacency_out (node_id, edge_id, PRIMARY KEY (node_id, edge_id));
4. Inspectable¶
Implementation:
- Observable query plans (future: db.explain())
- SQLite storage (inspectable with sqlite3 CLI)
- Clear operator pipeline
- Comprehensive logging
5. Replaceable Internals¶
Implementation:
- Storage backend interface (Protocol)
- SQLite can be swapped for a custom backend
- Parser, planner, and executor are decoupled
- Stable Python API
Performance Characteristics¶
Current (In-Memory)¶
Query Execution:
- Simple MATCH: < 1ms (10K nodes)
- Pattern matching: < 10ms (10K nodes, 50K edges)
- Aggregation: < 50ms (10K nodes)
Test Suite:
- 267 tests: ~1.3 seconds total
- Unit tests: ~0.5 seconds
- Integration tests: ~0.5 seconds
- TCK tests: ~0.2 seconds
Expected (SQLite Backend)¶
Based on SQLite performance characteristics:
- Node inserts: 50K-100K/sec (with transactions)
- Point queries: 100K-500K/sec
- Pattern matching: 1K-10K/sec
- Aggregation: 10K-50K/sec
Target Scale:
- ~10^6 nodes
- ~10^7 edges
- Query latency: < 100ms typical
- Load time: < 10 seconds for a full graph
Future Roadmap¶
Phase 5: SQLite Persistence (In Progress)¶
- SQLite backend implementation
- Transaction support (BEGIN/COMMIT/ROLLBACK)
- Crash recovery
- Concurrent readers
Estimated: 30 hours
Phase 6: Mutation Clauses¶
- CREATE clause (node/edge insertion)
- SET clause (property updates)
- DELETE clause (node/edge removal)
- MERGE clause (upsert semantics)
Estimated: 60 hours
Phase 7: Advanced Features¶
- OPTIONAL MATCH (left outer join)
- Variable-length paths ([:KNOWS*1..3])
- WITH clause (query composition)
- String functions
- List operations
Estimated: 80 hours
References¶
Requirements & Specifications¶
- docs/0-requirements.md - System requirements and scope
- docs/open_cypher_ast_logical_plan_spec_v_1.md - AST and planner spec
- docs/runtime_value_model_graph_execution_v_1.md - Execution semantics
- docs/storage-architecture-analysis.md - Storage backend decision analysis
Feature Documentation¶
- docs/feature-return-aliasing.md - RETURN AS clause
- docs/feature-order-by.md - ORDER BY clause
- docs/feature-aggregation-functions.md - Aggregation functions
- docs/tck-compliance.md - TCK test suite
Project Status¶
- docs/project-status-and-roadmap.md - Implementation roadmap
- docs/phase-1-complete.md - Core data model completion
- docs/phase-2-complete.md - Parser completion
- docs/phase-3-complete.md - Execution engine completion