LLM-Powered Workflows
Overview
Large language models are good at extracting structure from text, but they have no memory between calls and no native way to query what they already know. GraphForge bridges that gap: the LLM produces structured data, GraphForge stores and indexes it, and queries against the graph feed fresh, precise context back into the next LLM call.
The pattern looks like this:
      Text corpus
           │
           ▼
┌──────────────────────┐
│    LLM extraction    │   entity + relationship extraction
└──────────┬───────────┘
           │  structured data
           ▼
┌──────────────────────┐
│      GraphForge      │   MERGE, SET, CREATE
│  (persistent graph)  │◄── provenance metadata
└──────────┬───────────┘
           │  Cypher queries
           ▼
┌──────────────────────┐
│   Context builder    │   n-hop neighbourhood, aggregations
└──────────┬───────────┘
           │  context string
           ▼
┌──────────────────────┐
│    LLM synthesis     │   Q&A, summarisation, refinement
└──────────────────────┘
GraphForge is a good fit here because it runs embedded in the same Python process as your LLM library, requires zero infrastructure, and speaks openCypher, so queries stay readable.
1. Schema: Documents, Entities, and Relationships
A minimal schema that covers most extraction pipelines:
- Document — the source text (URL, file path, or an internal id)
- Entity — a named thing: person, organisation, concept, product, …
- Relationship — a typed, directed edge between two entities, with provenance
from graphforge import GraphForge
db = GraphForge("knowledge.db") # persistent SQLite; use GraphForge() for in-memory
# ── Documents ──────────────────────────────────────────────────────────────────
db.execute("""
CREATE (:Document {
id: 'doc-001',
title: 'GraphForge release notes',
source: 'https://example.com/release-notes',
ingested: datetime()
})
""")
# ── Entities ───────────────────────────────────────────────────────────────────
# Use MERGE so that re-running extraction never creates duplicates.
db.execute("""
MERGE (:Entity:Person {name: 'Alice Chen', canonical: 'alice-chen'})
MERGE (:Entity:Project {name: 'GraphForge', canonical: 'graphforge'})
MERGE (:Entity:Org {name: 'DecisionNerd', canonical: 'decisionnerd'})
""")
# ── Relationships between entities ─────────────────────────────────────────────
db.execute("""
MATCH (a:Person {canonical: 'alice-chen'}),
(p:Project {canonical: 'graphforge'})
MERGE (a)-[:CONTRIBUTES_TO {since: '2024-01', confidence: 0.92}]->(p)
""")
# ── Provenance: which document mentioned this fact ─────────────────────────────
db.execute("""
MATCH (doc:Document {id: 'doc-001'}),
(a:Person {canonical: 'alice-chen'}),
(p:Project {canonical: 'graphforge'})
MERGE (doc)-[:MENTIONS]->(a)
MERGE (doc)-[:MENTIONS]->(p)
""")
2. Entity Deduplication with MERGE
When you process many documents the same real-world entity will appear under slightly different surface forms. The safest strategy is to normalise to a canonical key before writing and always use MERGE on that key.
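The normalisation step can be as simple as a slug function. The sketch below is one possible approach, not part of GraphForge: `canonicalise` is a hypothetical helper that strips accents, lowercases, and collapses punctuation and whitespace into hyphens, which yields keys in the same style as `alice-chen` used throughout this guide.

```python
import re
import unicodedata


def canonicalise(name: str) -> str:
    """Turn a surface form into a lowercase, hyphen-separated key (hypothetical helper)."""
    # Decompose accented characters, then drop the combining marks.
    decomposed = unicodedata.normalize("NFKD", name)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    # Collapse every run of non-alphanumeric characters into a single hyphen.
    slug = re.sub(r"[^a-z0-9]+", "-", ascii_only.lower()).strip("-")
    if not slug:
        raise ValueError(f"Cannot derive a canonical key from {name!r}")
    return slug


print(canonicalise("Alice Chen"))        # alice-chen
print(canonicalise("  ALICE  chen. "))   # alice-chen
print(canonicalise("DecisionNerd"))      # decisionnerd
```

A slug only merges spelling and casing variants. Aliases such as "A. Chen" versus "Alice Chen" still need a separate resolution step (an alias table or an LLM pass) before the key is computed.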
Label safety: openCypher cannot parameterise labels or relationship types; they must be interpolated into the query string. When these values come from LLM extraction output, validate them before interpolation to prevent Cypher injection as well as parse errors from unexpected characters.
import re
_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def safe_identifier(value: str) -> str:
"""Validate a label or relationship type before Cypher interpolation."""
if not _SAFE_IDENTIFIER.match(value):
raise ValueError(f"Unsafe Cypher identifier: {value!r}")
return value
def store_entity(db, label: str, name: str, canonical: str, **extra):
"""Idempotent upsert — safe to call repeatedly."""
db.execute(
f"MERGE (e:Entity:{safe_identifier(label)} {{canonical: $canonical}}) "
"ON CREATE SET e.name = $name, e.created = datetime() "
"ON MATCH SET e.name = $name" # keep name fresh in case of corrections
# Each extra property needs its own leading comma; keys are interpolated, so validate them too.
+ "".join(f", e.{safe_identifier(k)} = ${k}" for k in extra),
{"canonical": canonical, "name": name, **extra},
)
def store_relationship(db, src_canonical: str, dst_canonical: str,
rel_type: str, confidence: float, model: str):
"""Store a typed relationship with provenance."""
db.execute(
"MATCH (src:Entity {canonical: $src}), (dst:Entity {canonical: $dst}) "
f"MERGE (src)-[r:{safe_identifier(rel_type)}]->(dst) "
"ON CREATE SET r.confidence = $conf, r.model = $model, r.created = datetime() "
"ON MATCH SET r.confidence = $conf, r.model = $model, r.updated = datetime()",
{"src": src_canonical, "dst": dst_canonical, "conf": confidence, "model": model},
)
Running the same extraction twice over the same document will not create duplicate nodes or edges.
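Before any of these helpers run, it can help to filter the raw LLM output so that one malformed record does not abort a whole batch. The sketch below is an assumption-laden example, not a GraphForge API: `parse_extraction` is a hypothetical function, and the JSON shape (`entities` with `name`/`label`/`canonical`, `relationships` with `src`/`dst`/`type`/`confidence`) is the one used by `mock_extract` in section 7. It drops records with unsafe identifiers, out-of-range confidences, or endpoints missing from the same extraction.

```python
import json
import re

_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def parse_extraction(raw: str) -> dict:
    """Parse LLM JSON output and keep only records that are safe to store."""
    data = json.loads(raw)
    entities = [
        e for e in data.get("entities", [])
        if isinstance(e, dict)
        and _SAFE_IDENTIFIER.fullmatch(str(e.get("label", "")))
        and e.get("name") and e.get("canonical")
    ]
    known = {e["canonical"] for e in entities}
    relationships = []
    for r in data.get("relationships", []):
        if not isinstance(r, dict):
            continue
        conf = r.get("confidence")
        if (
            _SAFE_IDENTIFIER.fullmatch(str(r.get("type", "")))
            and r.get("src") in known      # assumption: endpoints must appear
            and r.get("dst") in known      # in this same extraction
            and isinstance(conf, (int, float))
            and 0.0 <= conf <= 1.0
        ):
            relationships.append(r)
    return {"entities": entities, "relationships": relationships}


raw = json.dumps({
    "entities": [
        {"name": "Alice Chen", "label": "Person", "canonical": "alice-chen"},
        {"name": "GraphForge", "label": "Project", "canonical": "graphforge"},
        {"name": "Mallory", "label": "Person) DETACH DELETE (n", "canonical": "mallory"},
    ],
    "relationships": [
        {"src": "alice-chen", "dst": "graphforge", "type": "CONTRIBUTES_TO", "confidence": 0.94},
        {"src": "alice-chen", "dst": "mallory", "type": "KNOWS", "confidence": 0.9},
        {"src": "alice-chen", "dst": "graphforge", "type": "USES", "confidence": 1.7},
    ],
})
result = parse_extraction(raw)
print(len(result["entities"]), len(result["relationships"]))  # 2 1
```

Requiring both endpoints in the same extraction is a deliberately strict choice; relax it if relationships may point at entities that are already in the graph.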
3. Tracking Extraction Provenance
Every fact in your graph should record where it came from and how reliable it is:
# Attach provenance directly to the relationship
db.execute("""
MATCH (src:Entity {canonical: $src}),
(dst:Entity {canonical: $dst})
MERGE (src)-[r:RELATED_TO]->(dst)
ON CREATE SET
r.confidence = $conf,
r.model = $model,
r.extracted = datetime(),
r.source_doc = $doc_id
ON MATCH SET
r.confidence = $conf,
r.model = $model,
r.updated = datetime()
""", {
"src": "graphforge",
"dst": "decisionnerd",
"conf": 0.88,
"model": "gpt-4o-mini",
"doc_id": "doc-001",
})
Query provenance later to audit the graph or filter by quality:
rows = db.to_dicts("""
MATCH (src:Entity)-[r]->(dst:Entity)
WHERE r.confidence >= 0.85
RETURN src.name AS subject,
type(r) AS predicate,
dst.name AS object,
r.model AS model,
r.confidence AS conf
ORDER BY r.confidence DESC
LIMIT 20
""")
for row in rows:
print(
f"{row['subject']!r:25s} "
f"--{row['predicate']}--> "
f"{row['object']!r:25s} "
f"[{row['conf']:.2f} via {row['model']}]"
)
4. Hybrid Retrieval: n-Hop Neighbourhood
The most useful context for an LLM is often the subgraph around an entity — its immediate neighbours and their neighbours. The Cypher variable-length path syntax handles this directly.
def get_neighbourhood(db, canonical: str, hops: int = 2) -> list[dict]:
"""Return all entities reachable within `hops` steps from `canonical`."""
return db.to_dicts(
"MATCH (seed:Entity {canonical: $canonical})"
# The hop bound cannot be a parameter, so coerce it to int before interpolating.
"-[*1.." + str(int(hops)) + "]-(neighbour:Entity) "
"WHERE neighbour.canonical <> $canonical "
"RETURN DISTINCT "
" neighbour.name AS name, "
" neighbour.canonical AS canonical, "
" labels(neighbour) AS labels",
{"canonical": canonical},
)
neighbours = get_neighbourhood(db, "graphforge", hops=2)
# [{'name': 'Alice Chen', 'canonical': 'alice-chen', 'labels': ['Entity', 'Person']}, ...]
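The rows returned by `get_neighbourhood` still need to be turned into prompt text. A minimal sketch, assuming rows shaped like the sample comment above (`name`, `canonical`, `labels`); `format_neighbours` is a hypothetical helper, not part of GraphForge:

```python
def format_neighbours(neighbours: list[dict], limit: int = 20) -> str:
    """Render neighbourhood rows as one prompt line per entity."""
    lines = []
    for row in neighbours[:limit]:
        # Drop the shared 'Entity' label so only the specific type remains.
        kinds = [lbl for lbl in row["labels"] if lbl != "Entity"]
        kind = "/".join(sorted(kinds)) or "Entity"
        lines.append(f"- {row['name']} ({kind})")
    return "\n".join(lines)


sample = [
    {"name": "Alice Chen", "canonical": "alice-chen", "labels": ["Entity", "Person"]},
    {"name": "DecisionNerd", "canonical": "decisionnerd", "labels": ["Org", "Entity"]},
]
print(format_neighbours(sample))
# - Alice Chen (Person)
# - DecisionNerd (Org)
```

The `limit` argument keeps the context bounded when a high-degree seed entity pulls in hundreds of neighbours.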
You can also find the shortest path between two entities, which is useful for explaining
connections. shortestPath() is not yet supported (#468); as a workaround for graphs up to
~2K nodes, match a bounded variable-length path and keep the shortest result:
rows = db.execute(
"MATCH path = (a:Entity {canonical: $src})-[*1..5]-(b:Entity {canonical: $dst}) "
"RETURN length(path) AS hops "
"ORDER BY hops ASC LIMIT 1",
{"src": "alice-chen", "dst": "decisionnerd"},
)
if rows:
print(f"Connected by {rows[0]['hops'].value} hops")
Note: This workaround enumerates all paths up to max_hops (the upper bound of the variable-length pattern, 5 in the query above) before returning the shortest.
On dense graphs (high-degree nodes) with a large max_hops, this can be slow. Use max_hops ≤ 3 for graphs above 1K nodes.
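When path enumeration becomes too slow, one alternative is to pull the edge list once and run a true breadth-first search in Python, which stops at the first hit instead of enumerating every path. The sketch below works on plain `(src, dst)` pairs of canonical keys. How the pairs are fetched is an assumption: a query along the lines of `MATCH (a:Entity)-[]->(b:Entity) RETURN a.canonical AS src, b.canonical AS dst` via `db.to_dicts` should produce them. `shortest_hops` is a hypothetical helper.

```python
from collections import deque


def shortest_hops(edges, src, dst, max_hops=5):
    """Breadth-first search over an undirected edge list.

    Returns the hop count from src to dst, or None if dst is not
    reachable within max_hops.
    """
    adjacency = {}
    for a, b in edges:
        adjacency.setdefault(a, set()).add(b)
        adjacency.setdefault(b, set()).add(a)

    if src == dst:
        return 0
    seen = {src}
    queue = deque([(src, 0)])
    while queue:
        node, depth = queue.popleft()
        if depth == max_hops:
            continue  # do not expand beyond the hop limit
        for nxt in adjacency.get(node, ()):
            if nxt == dst:
                return depth + 1
            if nxt not in seen:
                seen.add(nxt)
                queue.append((nxt, depth + 1))
    return None


edges = [
    ("alice-chen", "graphforge"),
    ("alice-chen", "decisionnerd"),
    ("decisionnerd", "graphforge"),
    ("bob-lim", "graphforge"),
]
print(shortest_hops(edges, "bob-lim", "decisionnerd"))  # 2
```

Because each node is visited at most once, the cost grows with the number of edges rather than the number of distinct paths.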
5. Building a Q&A Context String
Given a natural-language question, extract the key entity, pull its neighbourhood from the graph, then hand the resulting context to the LLM.
def build_context_for_question(db, entity_canonical: str, max_facts: int = 30) -> str:
"""
Build a compact context string from the graph for use in an LLM prompt.
Returns a bullet-list of (subject, predicate, object) triples.
"""
# Fetch outbound and inbound edges via UNION.
# ORDER BY after UNION only sorts the second branch — sort globally in Python.
rows = db.to_dicts(
"MATCH (src:Entity {canonical: $canonical})-[r]->(dst:Entity) "
"RETURN src.name AS subject, type(r) AS predicate, dst.name AS object, "
" r.confidence AS confidence "
"UNION "
"MATCH (src:Entity)-[r]->(dst:Entity {canonical: $canonical}) "
"RETURN src.name AS subject, type(r) AS predicate, dst.name AS object, "
" r.confidence AS confidence",
{"canonical": entity_canonical},
)
if not rows:
return "No relevant facts found in the knowledge graph."
# Global sort by confidence, then cap at max_facts
rows = sorted(rows, key=lambda r: r["confidence"], reverse=True)[:max_facts]
lines = ["Relevant facts from the knowledge graph:\n"]
for row in rows:
subj = row["subject"]
pred = row["predicate"].replace("_", " ").lower()
obj = row["object"]
conf = row["confidence"]
lines.append(f" - {subj} {pred} {obj} (confidence: {conf:.0%})")
return "\n".join(lines)
context = build_context_for_question(db, "graphforge")
# Pass to LLM
prompt = f"""
You are a helpful assistant. Use the facts below to answer the question.
{context}
Question: Who contributes to GraphForge and what organisation are they from?
"""
# response = llm.complete(prompt)
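The first step, picking the key entity out of the question, is not shown above. A minimal sketch, assuming the entity names and canonical keys have already been loaded from the graph into a list of dicts; `find_entity` is a hypothetical helper, and plain substring matching is only a stand-in for an LLM or embedding-based lookup:

```python
from typing import Optional


def find_entity(question: str, entities: list[dict]) -> Optional[str]:
    """Return the canonical key of the longest entity name found in the question."""
    text = question.lower()
    # Try longer names first so a specific name wins over a shorter one it contains.
    for ent in sorted(entities, key=lambda e: len(e["name"]), reverse=True):
        if ent["name"].lower() in text:
            return ent["canonical"]
    return None


known = [
    {"name": "GraphForge", "canonical": "graphforge"},
    {"name": "Alice Chen", "canonical": "alice-chen"},
]
print(find_entity("Who contributes to GraphForge?", known))  # graphforge
print(find_entity("What is the weather today?", known))      # None
```

The returned key can be passed straight to `build_context_for_question`; a `None` result is the cue to fall back to a broader search or to answer without graph context.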
6. Iterative Knowledge Refinement
After the LLM processes graph data you can write its assessments back. This lets you build a feedback loop where the graph becomes progressively more accurate.
def update_confidence(db, src_canonical: str, dst_canonical: str,
rel_type: str, new_confidence: float, reviewer: str):
"""Update an edge's confidence after LLM or human review."""
db.execute(
f"MATCH (src:Entity {{canonical: $src}})"
f"-[r:{safe_identifier(rel_type)}]->(dst:Entity {{canonical: $dst}}) "
"SET r.confidence = $conf, r.reviewed_by = $reviewer, r.reviewed_at = datetime()",
{
"src": src_canonical,
"dst": dst_canonical,
"conf": new_confidence,
"reviewer": reviewer,
},
)
def mark_entity_verified(db, canonical: str, verified_by: str):
"""Flag an entity as verified after manual or LLM review."""
db.execute(
"MATCH (e:Entity {canonical: $canonical}) "
"SET e.verified = true, e.verified_by = $by, e.verified_at = datetime()",
{"canonical": canonical, "by": verified_by},
)
# Demote a low-quality edge found by LLM review
update_confidence(db, "alice-chen", "graphforge", "CONTRIBUTES_TO",
new_confidence=0.45, reviewer="gpt-4o")
# Promote a confirmed entity
mark_entity_verified(db, "graphforge", verified_by="human")
You can then filter your context queries to include only high-quality, verified data when stakes are high:
rows = db.execute("""
MATCH (src:Entity)-[r]->(dst:Entity)
WHERE r.confidence >= 0.8
AND src.verified = true
RETURN src.name AS subject, type(r) AS predicate, dst.name AS object
ORDER BY r.confidence DESC
""")
7. Complete Mini-Pipeline
The example below simulates an end-to-end extraction pipeline without calling a real LLM API. Swap mock_extract for your actual LLM client.
Transaction tip: Wrap the entire batch ingestion in a single
begin()/commit() rather than one transaction per document. Per-document transactions add ~240% overhead because begin() deep-copies the graph state on each call. A single batch transaction costs the same as no transaction while giving full rollback on failure.
from __future__ import annotations
import re
from graphforge import GraphForge
_SAFE_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
def safe_identifier(value: str) -> str:
if not _SAFE_IDENTIFIER.match(value):
raise ValueError(f"Unsafe Cypher identifier: {value!r}")
return value
# ── Mock extraction function ───────────────────────────────────────────────────
# Replace this with a real LLM call (OpenAI, Anthropic, etc.)
def mock_extract(text: str) -> dict:
"""
Simulates LLM structured extraction.
Returns {'entities': [...], 'relationships': [...]}.
"""
if "Alice" in text and "GraphForge" in text:
return {
"entities": [
{"name": "Alice Chen", "label": "Person", "canonical": "alice-chen"},
{"name": "GraphForge", "label": "Project", "canonical": "graphforge"},
{"name": "DecisionNerd", "label": "Org", "canonical": "decisionnerd"},
],
"relationships": [
{"src": "alice-chen", "dst": "graphforge", "type": "CONTRIBUTES_TO", "confidence": 0.94},
{"src": "alice-chen", "dst": "decisionnerd", "type": "WORKS_AT", "confidence": 0.89},
{"src": "decisionnerd", "dst": "graphforge", "type": "MAINTAINS", "confidence": 0.97},
],
}
if "Bob" in text:
return {
"entities": [
{"name": "Bob Lim", "label": "Person", "canonical": "bob-lim"},
{"name": "GraphForge", "label": "Project", "canonical": "graphforge"},
],
"relationships": [
{"src": "bob-lim", "dst": "graphforge", "type": "USES", "confidence": 0.78},
],
}
return {"entities": [], "relationships": []}
# ── Pipeline ───────────────────────────────────────────────────────────────────
def ingest_documents(db: GraphForge, documents: list[dict], model: str = "mock-v1"):
"""Process a list of documents, extract entities and relationships, store all results."""
for doc in documents:
# 1. Upsert the document node
db.execute(
"MERGE (d:Document {id: $id}) "
"ON CREATE SET d.title = $title, d.ingested = datetime() "
"ON MATCH SET d.title = $title",
{"id": doc["id"], "title": doc["title"]},
)
# 2. Run extraction
extraction = mock_extract(doc["text"])
# 3. Store entities (idempotent)
for ent in extraction["entities"]:
db.execute(
f"MERGE (e:Entity:{safe_identifier(ent['label'])} {{canonical: $canonical}}) "
"ON CREATE SET e.name = $name, e.created = datetime() "
"ON MATCH SET e.name = $name",
{"canonical": ent["canonical"], "name": ent["name"]},
)
# Link entity → document
db.execute(
"MATCH (d:Document {id: $doc_id}), (e:Entity {canonical: $canonical}) "
"MERGE (d)-[:MENTIONS]->(e)",
{"doc_id": doc["id"], "canonical": ent["canonical"]},
)
# 4. Store relationships with provenance
for rel in extraction["relationships"]:
db.execute(
"MATCH (src:Entity {canonical: $src}), (dst:Entity {canonical: $dst}) "
f"MERGE (src)-[r:{safe_identifier(rel['type'])}]->(dst) "
"ON CREATE SET r.confidence = $conf, r.model = $model, r.created = datetime() "
"ON MATCH SET r.confidence = $conf, r.model = $model, r.updated = datetime()",
{
"src": rel["src"],
"dst": rel["dst"],
"conf": rel["confidence"],
"model": model,
},
)
def summarise_graph(db: GraphForge) -> str:
"""Query the graph and produce a human-readable summary."""
# Entity counts by label
entity_rows = db.to_dicts("""
MATCH (e:Entity)
UNWIND labels(e) AS lbl
WITH e, lbl
WHERE lbl <> 'Entity'
RETURN lbl AS label, count(e) AS n
ORDER BY n DESC
""")
# High-confidence relationships
rel_rows = db.to_dicts("""
MATCH (src:Entity)-[r]->(dst:Entity)
WHERE r.confidence >= 0.8
RETURN src.name AS subject,
type(r) AS predicate,
dst.name AS object,
r.confidence AS conf
ORDER BY r.confidence DESC
LIMIT 10
""")
lines = ["=== Knowledge Graph Summary ===\n"]
lines.append("Entity counts:")
for row in entity_rows:
lines.append(f" {row['label']}: {row['n']}")
lines.append("\nHigh-confidence facts (≥ 0.80):")
for row in rel_rows:
subj = row["subject"]
pred = row["predicate"].replace("_", " ").lower()
obj = row["object"]
conf = row["conf"]
lines.append(f" {subj} {pred} {obj} [{conf:.0%}]")
return "\n".join(lines)
# ── Run the pipeline ───────────────────────────────────────────────────────────
if __name__ == "__main__":
db = GraphForge() # in-memory for this example
corpus = [
{
"id": "doc-001",
"title": "About GraphForge",
"text": "Alice Chen at DecisionNerd created GraphForge.",
},
{
"id": "doc-002",
"title": "Community update",
"text": "Bob Lim uses GraphForge in his research pipeline.",
},
# Re-processing doc-001 should not create duplicates
{
"id": "doc-001",
"title": "About GraphForge (updated)",
"text": "Alice Chen at DecisionNerd created GraphForge.",
},
]
# Wrap the whole batch in one transaction — cheap (one deepcopy on empty graph)
db.begin()
try:
ingest_documents(db, corpus)
db.commit()
except Exception:
db.rollback()
raise
print(summarise_graph(db))
Expected output:
=== Knowledge Graph Summary ===
Entity counts:
Person: 2
Project: 1
Org: 1
High-confidence facts (≥ 0.80):
DecisionNerd maintains GraphForge [97%]
Alice Chen contributes to GraphForge [94%]
Alice Chen works at DecisionNerd [89%]
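The begin/commit/rollback block at the end of the script can be packaged as a context manager so that every batch gets the same handling. This is a sketch, not a GraphForge feature: `transaction` is a hypothetical helper that relies only on the `begin()`, `commit()`, and `rollback()` methods used above, and `FakeDB` is a stand-in that records calls so the example runs without a database.

```python
from contextlib import contextmanager


@contextmanager
def transaction(db):
    """Commit on success; roll back and re-raise on any exception."""
    db.begin()
    try:
        yield db
    except Exception:
        db.rollback()
        raise
    else:
        db.commit()


class FakeDB:
    """Stand-in that records calls; not part of GraphForge."""

    def __init__(self):
        self.calls = []

    def begin(self):
        self.calls.append("begin")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


fake = FakeDB()
with transaction(fake):
    pass  # ingest_documents(db, corpus) would go here
print(fake.calls)  # ['begin', 'commit']
```

With a real database the call site becomes `with transaction(db): ingest_documents(db, corpus)`, which keeps the whole batch inside one transaction.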
8. Integration Tips
LangChain
Use GraphForge as a retriever inside a LangChain chain:
from langchain_core.documents import Document as LCDocument # langchain >= 0.1
from graphforge import GraphForge
db = GraphForge("knowledge.db")
def graphforge_retriever(query_entity: str) -> list[LCDocument]:
rows = db.to_dicts(
"MATCH (e:Entity {canonical: $canonical})-[r]->(other:Entity) "
"RETURN e.name AS subject, type(r) AS predicate, other.name AS object",
{"canonical": query_entity},
)
return [
LCDocument(
page_content=(
f"{row['subject']} "
f"{row['predicate'].replace('_', ' ').lower()} "
f"{row['object']}"
)
)
for row in rows
]
LlamaIndex
Expose the graph as a custom retriever:
from llama_index.core import QueryBundle
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore, TextNode
from graphforge import GraphForge
class GraphForgeRetriever(BaseRetriever):
def __init__(self, db: GraphForge):
self._db = db
super().__init__()
def _retrieve(self, query_bundle: QueryBundle) -> list[NodeWithScore]:
# Simple keyword → canonical lookup: find an entity whose name appears in the query text;
# replace with embedding search as needed
rows = self._db.to_dicts(
"MATCH (e:Entity) WHERE toLower($q) CONTAINS toLower(e.name) "
"RETURN e.canonical AS canonical LIMIT 1",
{"q": query_bundle.query_str},
)
if not rows:
return []
context = build_context_for_question(self._db, rows[0]["canonical"])
return [NodeWithScore(node=TextNode(text=context), score=1.0)]
Any Python LLM Library
GraphForge works with any library that accepts a Python string as context — OpenAI, Anthropic, Cohere, Ollama, etc. The pattern is always the same:
context = build_context_for_question(db, entity_canonical)
messages = [
{"role": "system", "content": "Answer using only the facts provided."},
{"role": "user", "content": f"{context}\n\nQuestion: {question}"},
]
# client.chat.completions.create(model="...", messages=messages)
9. Hybrid Retrieval with db.search
Once your knowledge graph is populated, db.search provides hybrid retrieval — combining FTS5
text search and vector cosine similarity — so you can find entities by natural language rather
than exact Cypher predicates.
Indexing Entities for Text Search
# After extracting entities, index their text for FTS retrieval
rows = db.execute("MATCH (e:Entity) RETURN id(e) AS nid, e.name AS name, e.description AS desc")
for row in rows:
text = f"{row['name'].value} {row['desc'].value or ''}".strip()
db.search.index_node(row["nid"].value, text)
Storing Vector Embeddings (bring-your-own)
GraphForge stores and queries vectors but does not generate them. Use any embedding model:
import openai
client = openai.OpenAI()
rows = db.execute("MATCH (e:Entity) RETURN id(e) AS nid, e.description AS desc")
for row in rows:
text = row["desc"].value or ""
if text:
vec = client.embeddings.create(input=text, model="text-embedding-3-small").data[0].embedding
db.search.set_node_vector(row["nid"].value, vec, space="text-embedding-3-small")
Hybrid Retrieval
# Embed the user query with the same model
query = "renewable energy policy framework"
query_vec = client.embeddings.create(input=query, model="text-embedding-3-small").data[0].embedding
# Hybrid search — text and vector signals fused via RRF
from graphforge import SearchHit
results = db.search(query, vector=query_vec, top_k=10)
for hit in results:
print(f"{hit.ref.properties['name'].value} score={hit.score:.3f} via={hit.sources}")
# sources is ("text",), ("vector",), or ("text", "vector")
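For intuition about how the two signals are combined, the sketch below shows the textbook form of reciprocal rank fusion (RRF): each result list contributes 1 / (k + rank) to an item's score. This illustrates the general technique only; whether GraphForge uses this exact formula or the conventional k = 60 is an assumption not confirmed by this guide, and `reciprocal_rank_fusion` is a hypothetical function.

```python
def reciprocal_rank_fusion(rankings, k=60):
    """Fuse ranked id lists: score(id) = sum over lists of 1 / (k + rank)."""
    scores = {}
    for ranking in rankings:
        for rank, item in enumerate(ranking, start=1):
            scores[item] = scores.get(item, 0.0) + 1.0 / (k + rank)
    # Highest fused score first; ties broken by id for a stable order.
    return sorted(scores.items(), key=lambda kv: (-kv[1], kv[0]))


text_hits = ["n1", "n2", "n3"]     # ranked by text relevance
vector_hits = ["n3", "n1", "n4"]   # ranked by cosine similarity
fused = reciprocal_rank_fusion([text_hits, vector_hits])
for node_id, score in fused:
    print(node_id, round(score, 4))
```

Here `n1` and `n3` come out ahead of `n2` and `n4` because they appear in both lists. RRF uses ranks rather than raw scores, so the text and vector scores never need to be put on a common scale.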
Using Results in Subsequent Queries
Every SearchHit.ref.id is a live graph node ID — use it directly in Cypher:
top_id = results[0].ref.id
context = db.execute("""
MATCH (e)-[r]-(related)
WHERE id(e) = $nid
RETURN e.name AS entity, type(r) AS rel, related.name AS neighbour
""", {"nid": top_id})
n-Hop Context Building with recipes
graphforge.recipes.neighbourhood() expands from a named entity outward, returning plain dicts
ready to paste into an LLM prompt:
from graphforge.recipes import neighbourhood
# 2-hop neighbourhood around a search result
anchor_name = results[0].ref.properties["name"].value
context_nodes = neighbourhood(db, anchor_name, hops=2, label="Entity", canonical_prop="name")
prompt_context = "\n".join(str(n) for n in context_nodes)
Summary
| Task | GraphForge feature |
|---|---|
| Idempotent entity upsert | MERGE … ON CREATE SET … ON MATCH SET |
| n-hop neighbourhood query | MATCH (e)-[*1..2]-(n) or recipes.neighbourhood() |
| Shortest path (workaround) | MATCH path = (a)-[*1..N]-(b) RETURN length(path) ORDER BY … LIMIT 1 (see #468 for native support) |
| Provenance on edges | Properties on relationships (r.model, r.confidence) |
| Iterative refinement | SET r.confidence = … after LLM review |
| List expansion | UNWIND for multi-value properties |
| Temporal tracking | datetime(), duration.between() |
| Safe identifier validation | safe_identifier() before f-string label/rel_type interpolation |
| Semantic retrieval | db.search(query, vector=vec) → list[SearchHit] |
GraphForge's embedded design means the graph lives in the same process as your LLM calls — no network hop, no server, no configuration. That makes it well-suited to iterative, notebook-style workflows where you extract, store, query, and refine in a tight loop.