GitHub

Knowledge Graphs

The knowledge graphs namespace provides methods for listing, exploring, and managing knowledge graphs. It offers comprehensive statistics and metadata about your knowledge base structure.

Basic Operations

List All Knowledge Graphs

from functor_sdk import FunctorClient

client = FunctorClient()

# Fetch every knowledge graph and print its name next to its display name
graphs = client.knowledge_graphs.list()
print(f"Found {len(graphs)} knowledge graphs:")
for graph in graphs:
    print(f"- {graph.name}: {graph.display_name}")

List with Statistics

# Ask the listing call to attach statistics to each knowledge graph
graphs_with_stats = client.knowledge_graphs.list(include_stats=True)
for graph in graphs_with_stats:
    print(f"\n{graph.name} ({graph.display_name})")
    print(f" Entities: {graph.entities_count}")
    print(f" Relations: {graph.relations_count}")
    print(f" Sources: {graph.sources_count}")
    print(f" Last updated: {graph.last_updated}")

Creating Data

Create new entities, relations, and chunks in your knowledge graphs. These operations allow you to programmatically build and extend your knowledge base.

Create Entity

POST

Create a new entity in a knowledge graph.

# Describe the entity first, then create it in the target knowledge graph
kg_name = "KG_Universal"
entity_properties = {
    "severity": "high",
    "prevalence": "common",
    "description": "A chronic metabolic disorder",
}
entity = client.knowledge_graphs.create_entity(
    kg_name=kg_name,
    label="Diabetes",
    entity_type="DISEASE",
    properties=entity_properties,
    sync_to_qdrant=True,
    sync_to_neo4j=False,
)

# Echo back the fields of the created entity
print(f"Created entity ID: {entity.id}")
print(f"Label: {entity.label}")
print(f"Type: {entity.entity_type}")
print(f"Properties: {entity.properties}")

Create Relation

POST

Create a relation between two entities.

# Create a relation between entities
kg_name = "KG_Universal"
relation = client.knowledge_graphs.create_relation(
    kg_name=kg_name,
    relation_type="TREATS",
    # Placeholder IDs; substitute the IDs of entities that exist in your graph
    source_entity_id=1,
    target_entity_id=2,
    confidence_score=0.95,
    # Extra attributes for the relation (read back below via relation.properties
    # in the RelationResponse example)
    properties={
        "evidence": "clinical_trial",
        "effectiveness": "high",
    },
    sync_to_neo4j=False,
)

# Echo back the fields of the created relation
print(f"Created relation ID: {relation.id}")
print(f"Type: {relation.relation_type}")
print(f"From entity {relation.source_entity_id} to {relation.target_entity_id}")
print(f"Confidence: {relation.confidence_score}")

Create Chunk

POST

Create a text chunk for semantic search.

# Create a text chunk
kg_name = "KG_Universal"
chunk = client.knowledge_graphs.create_chunk(
    kg_name=kg_name,
    content="Diabetes is a chronic metabolic disorder characterized by high blood sugar levels.",
    # Descriptive metadata for the chunk (printed below via chunk.metadata)
    metadata={
        "source": "medical_textbook",
        "page": 42,
        "section": "metabolic_disorders",
    },
    # No entities are linked to this chunk in the example
    entity_ids=[],
    sync_to_qdrant=True,
)

# Echo back the fields of the created chunk
print(f"Created chunk ID: {chunk.id}")
print(f"Content length: {len(chunk.content)} characters")
print(f"Metadata: {chunk.metadata}")

Data Management

Update Entity

PUT

Update properties of an existing entity.

# Change the description and two properties of entity 123 in "my_kg"
kg_name, entity_id = "my_kg", 123
property_changes = {"confidence": 0.95, "source": "manual_review"}
updates = {
    "description": "Updated description",
    "properties": property_changes,
}
result = client.knowledge_graphs.update_entity(kg_name, entity_id, updates)
print(f"Updated: {result}")

Delete Entity

DELETE

Delete an entity and its associated relations.

# Remove entity 123 from "my_kg" and report what the call returned
kg_name, entity_id = "my_kg", 123
result = client.knowledge_graphs.delete_entity(kg_name, entity_id)
print(f"Deleted: {result}")

Update Relation

PUT

Update properties of an existing relation.

# Change the type of relation 456 in "my_kg"
kg_name, relation_id = "my_kg", 456
updates = {"relation_type": "UPDATED_TYPE"}
result = client.knowledge_graphs.update_relation(kg_name, relation_id, updates)
print(f"Updated: {result}")

Delete Relation

DELETE

Delete a relation between entities.

# Remove relation 456 from "my_kg" and report what the call returned
kg_name, relation_id = "my_kg", 456
result = client.knowledge_graphs.delete_relation(kg_name, relation_id)
print(f"Deleted: {result}")

Delete Chunk

DELETE

Delete a text chunk from the knowledge graph.

# Remove chunk 789 from "my_kg" and report what the call returned
kg_name, chunk_id = "my_kg", 789
result = client.knowledge_graphs.delete_chunk(kg_name, chunk_id)
print(f"Deleted: {result}")

Bulk Operations

Perform bulk operations for efficient management of multiple entities or relations in a single request.

Bulk Delete Entities

DELETE

Delete multiple entities in a single operation.

# Delete several entities with a single request
kg_name = "KG_Universal"
entity_ids = [123, 456, 789, 101112]
result = client.knowledge_graphs.delete_entities_bulk(
    kg_name=kg_name,
    entity_ids=entity_ids,
)

# Summarise the outcome; failed IDs are listed only when there are any
print(f"Success: {result.success}")
print(f"Deleted: {result.deleted_count}")
print(f"Failed: {result.failed_count}")
if result.failed_ids:
    print(f"Failed IDs: {result.failed_ids}")
print(f"Message: {result.message}")

Bulk Delete Relations

DELETE

Delete multiple relations in a single operation.

# Delete several relations with a single request
kg_name = "KG_Universal"
relation_ids = [456, 789, 101112, 131415]
result = client.knowledge_graphs.delete_relations_bulk(
    kg_name=kg_name,
    relation_ids=relation_ids,
)

# Summarise the outcome; failed IDs are listed only when there are any
print(f"Success: {result.success}")
print(f"Deleted: {result.deleted_count}")
print(f"Failed: {result.failed_count}")
if result.failed_ids:
    print(f"Failed IDs: {result.failed_ids}")
print(f"Message: {result.message}")

Complete CRUD Workflow

Here's a complete example showing create, update, and delete operations:

from functor_sdk import FunctorClient

client = FunctorClient()
kg_name = "KG_Universal"

# Step 1: create the two entities the relation will connect
disease = client.knowledge_graphs.create_entity(
    kg_name=kg_name, label="Diabetes", entity_type="DISEASE"
)
treatment = client.knowledge_graphs.create_entity(
    kg_name=kg_name, label="Metformin", entity_type="MEDICATION"
)
print(f"Created disease entity: {disease.id}")
print(f"Created treatment entity: {treatment.id}")

# Step 2: link them (the medication TREATS the disease)
relation = client.knowledge_graphs.create_relation(
    kg_name=kg_name,
    relation_type="TREATS",
    source_entity_id=treatment.id,
    target_entity_id=disease.id,
    confidence_score=0.95,
)
print(f"Created relation: {relation.id}")

# Step 3: enrich the relation with supporting details
relation_details = {
    "properties": {
        "evidence_level": "high",
        "studies": ["study_1", "study_2"],
    }
}
client.knowledge_graphs.update_relation(
    kg_name=kg_name,
    relation_id=relation.id,
    updates=relation_details,
)

# Step 4: add a text chunk carrying related information
chunk = client.knowledge_graphs.create_chunk(
    kg_name=kg_name,
    content="Metformin is commonly used to treat type 2 diabetes.",
    metadata={"source": "medical_guide"},
)
print(f"Created chunk: {chunk.id}")

# Step 5: later, clean up both entities with one bulk delete
entity_ids_to_delete = [disease.id, treatment.id]
delete_result = client.knowledge_graphs.delete_entities_bulk(
    kg_name=kg_name,
    entity_ids=entity_ids_to_delete,
)
print(f"Bulk deleted {delete_result.deleted_count} entities")

Response Format

KnowledgeGraph Object

graphs = client.knowledge_graphs.list(include_stats=True)
for graph in graphs:
    # Identification
    print(f"Name: {graph.name}")
    print(f"Display Name: {graph.display_name}")
    print(f"Description: {graph.description}")

    # Counters (requested with include_stats=True)
    print(f"Entities: {graph.entities_count}")
    print(f"Relations: {graph.relations_count}")
    print(f"Sources: {graph.sources_count}")
    print(f"Chunks: {graph.chunks_count}")

    # Lifecycle metadata
    print(f"Created: {graph.created_at}")
    print(f"Updated: {graph.last_updated}")
    print(f"Status: {graph.status}")

EntityResponse Object

entity = client.knowledge_graphs.create_entity(
    kg_name="KG_Universal", label="Diabetes", entity_type="DISEASE"
)

# Fields exposed on the returned entity
print(f"ID: {entity.id}")
print(f"Label: {entity.label}")
print(f"Type: {entity.entity_type}")
print(f"Properties: {entity.properties}")
print(f"Created: {entity.created_at}")
print(f"Updated: {entity.updated_at}")
print(f"KG Name: {entity.kg_name}")
print(f"Synced to Qdrant: {entity.synced_to_qdrant}")
print(f"Synced to Neo4j: {entity.synced_to_neo4j}")

RelationResponse Object

relation = client.knowledge_graphs.create_relation(
    kg_name="KG_Universal",
    relation_type="TREATS",
    source_entity_id=1,
    target_entity_id=2,
)

# Fields exposed on the returned relation
print(f"ID: {relation.id}")
print(f"Type: {relation.relation_type}")
print(f"Source: {relation.source_entity_id}")
print(f"Target: {relation.target_entity_id}")
print(f"Confidence: {relation.confidence_score}")
print(f"Properties: {relation.properties}")
print(f"Synced to Neo4j: {relation.synced_to_neo4j}")
print(f"Created: {relation.created_at}")

ChunkResponse Object

chunk = client.knowledge_graphs.create_chunk(
    kg_name="KG_Universal", content="Your content here..."
)

# Fields exposed on the returned chunk
print(f"ID: {chunk.id}")
print(f"Content: {chunk.content[:100]}...")  # only the first 100 characters
print(f"Metadata: {chunk.metadata}")
print(f"Entity IDs: {chunk.entity_ids}")
print(f"Content Hash: {chunk.content_hash}")
print(f"Synced to Qdrant: {chunk.synced_to_qdrant}")
print(f"Created: {chunk.created_at}")

BulkDeleteResponse Object

result = client.knowledge_graphs.delete_entities_bulk(
    kg_name="KG_Universal", entity_ids=[1, 2, 3]
)

# Fields exposed on the bulk delete result
print(f"Success: {result.success}")
print(f"Deleted count: {result.deleted_count}")
print(f"Failed count: {result.failed_count}")
print(f"Failed IDs: {result.failed_ids}")
print(f"Message: {result.message}")

Advanced Usage Patterns

Filter Knowledge Graphs

# Fetch once, with statistics, then slice the result locally
all_kgs = client.knowledge_graphs.list(include_stats=True)

# Each filter is an independent pass over the same list
# NOTE(review): the date filter compares last_updated with a string, so it
# presumably expects ISO-formatted string timestamps - confirm against the SDK.
active_kgs = [kg for kg in all_kgs if kg.status == "active"]
large_kgs = [kg for kg in all_kgs if kg.entities_count > 1000]
recent_kgs = [kg for kg in all_kgs if kg.last_updated > "2024-01-01"]

print(f"Active KGs: {len(active_kgs)}")
print(f"Large KGs (>1000 entities): {len(large_kgs)}")
print(f"Recently updated: {len(recent_kgs)}")

Async Operations

import asyncio

from functor_sdk import FunctorClient


async def get_kg_info():
    """List knowledge graphs with statistics through the async API.

    Prints one "<name>: <n> entities" line per graph and returns the list
    produced by ``list_async``.
    """
    async with FunctorClient() as client:
        graphs = await client.knowledge_graphs.list_async(include_stats=True)
        for graph in graphs:
            print(f"{graph.name}: {graph.entities_count} entities")
        return graphs


# Drive the coroutine from synchronous code
kgs = asyncio.run(get_kg_info())

Knowledge Graph Analysis

def analyze_knowledge_graphs():
    """Print aggregate statistics across all knowledge graphs.

    Lists the graphs with statistics, prints the totals for entities,
    relations and sources, and, when at least one graph exists, names the
    graph with the most entities and the one with the most sources.

    Returns:
        The list returned by ``client.knowledge_graphs.list``.
    """
    client = FunctorClient()
    kgs = client.knowledge_graphs.list(include_stats=True)

    # Calculate totals (all zero when there are no graphs)
    total_entities = sum(kg.entities_count for kg in kgs)
    total_relations = sum(kg.relations_count for kg in kgs)
    total_sources = sum(kg.sources_count for kg in kgs)

    print("Knowledge Graph Analysis:")
    print(f"Total KGs: {len(kgs)}")
    print(f"Total Entities: {total_entities:,}")
    print(f"Total Relations: {total_relations:,}")
    print(f"Total Sources: {total_sources:,}")

    # max() raises ValueError on an empty sequence, so only rank the graphs
    # when there is something to rank.
    if kgs:
        # Find largest KG
        largest_kg = max(kgs, key=lambda kg: kg.entities_count)
        print(f"\nLargest KG: {largest_kg.name} ({largest_kg.entities_count:,} entities)")

        # "Most active" is measured by number of sources
        most_active = max(kgs, key=lambda kg: kg.sources_count)
        print(f"Most Active KG: {most_active.name} ({most_active.sources_count} sources)")

    return kgs


kgs = analyze_knowledge_graphs()

Integration Examples

Dashboard Integration

def create_kg_dashboard():
    """Collect knowledge graph statistics into a plain dict for a dashboard.

    Returns:
        A dict with overall totals ("total_kgs", "total_entities",
        "total_relations", "total_sources") and a "kgs" list holding one
        summary dict per knowledge graph.
    """
    client = FunctorClient()
    graphs = client.knowledge_graphs.list(include_stats=True)

    # One summary row per graph, in listing order
    rows = [
        {
            "name": graph.name,
            "display_name": graph.display_name,
            "entities": graph.entities_count,
            "relations": graph.relations_count,
            "sources": graph.sources_count,
            "status": graph.status,
            "last_updated": graph.last_updated,
        }
        for graph in graphs
    ]

    return {
        "total_kgs": len(graphs),
        "total_entities": sum(graph.entities_count for graph in graphs),
        "total_relations": sum(graph.relations_count for graph in graphs),
        "total_sources": sum(graph.sources_count for graph in graphs),
        "kgs": rows,
    }


# Usage
dashboard = create_kg_dashboard()
print(f"Dashboard: {dashboard['total_kgs']} KGs, {dashboard['total_entities']:,} entities")

Monitoring and Alerts

def monitor_kgs():
    """Scan all knowledge graphs and report simple health alerts.

    A graph produces an alert when it has no entities, when its
    ``last_updated`` value compares below "2024-01-01", or when it holds more
    than 100000 entities. The alerts are printed and also returned.

    Returns:
        The list of alert strings (empty when nothing was flagged).
    """
    client = FunctorClient()
    findings = []

    for graph in client.knowledge_graphs.list(include_stats=True):
        # Empty graph
        if graph.entities_count == 0:
            findings.append(f"WARNING: {graph.name} has no entities")
        # Stale graph
        if graph.last_updated < "2024-01-01":
            findings.append(f"WARNING: {graph.name} hasn't been updated recently")
        # Very large graph (potential performance issues)
        if graph.entities_count > 100000:
            findings.append(f"INFO: {graph.name} is very large ({graph.entities_count:,} entities)")

    if not findings:
        print("All knowledge graphs are healthy!")
        return findings

    print("Knowledge Graph Alerts:")
    for finding in findings:
        print(f" - {finding}")
    return findings


# Usage
alerts = monitor_kgs()

Error Handling

Common Error Scenarios

from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorAuthenticationError,
    FunctorConnectionError,
)

client = FunctorClient()

# Handlers are tried top to bottom: the specific SDK errors come first and the
# catch-all Exception handler comes last.
try:
    kgs = client.knowledge_graphs.list(include_stats=True)
except FunctorAuthenticationError:
    print("Authentication failed - check your API key")
except FunctorConnectionError:
    print("Connection failed - check your network and base URL")
except FunctorAPIError as api_error:
    print(f"API error {api_error.status_code}: {api_error.message}")
except Exception as unexpected:
    print(f"Unexpected error: {unexpected}")

Robust Error Handling

def safe_list_kgs(max_retries=3):
    """List knowledge graphs, retrying on connection errors.

    Connection errors are retried up to ``max_retries`` attempts, sleeping
    1, 2, 4, ... seconds between attempts. API errors and any other exception
    are not retried.

    Args:
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The list of knowledge graphs on success, or an empty list on any
        failure. An empty list is therefore ambiguous: it can also mean the
        call succeeded and there are no knowledge graphs.
    """
    # Imported here so the example is self-contained: the function needs
    # time.sleep and nothing earlier on this page imports time.
    import time

    client = FunctorClient()
    for attempt in range(max_retries):
        try:
            return client.knowledge_graphs.list(include_stats=True)
        except FunctorConnectionError as e:
            print(f"Connection error (attempt {attempt + 1}): {e}")
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # Exponential backoff
            else:
                print("Max retries reached")
                return []
        except FunctorAPIError as e:
            print(f"API error: {e.message}")
            return []
        except Exception as e:
            print(f"Unexpected error: {e}")
            return []
    # Reached only when the loop never ran (max_retries <= 0)
    return []


# Usage
kgs = safe_list_kgs()
if kgs:
    print(f"Successfully retrieved {len(kgs)} knowledge graphs")
else:
    print("Failed to retrieve knowledge graphs")

Performance Optimization

Optimization Tips

  • Use include_stats sparingly: Only when you need statistics
  • Cache results: Knowledge graphs don't change frequently
  • Use async for concurrent operations: When combining with other operations
  • Filter results: Process only the KGs you need

Caching Example

import time
from functools import lru_cache
from functor_sdk import FunctorClient
class KGCache:
    """Time-limited in-memory cache around ``client.knowledge_graphs.list``.

    Results are cached separately for each ``include_stats`` value and reused
    until they are ``cache_duration`` seconds old.
    """

    def __init__(self, client):
        """Wrap *client*; the cache starts empty.

        Args:
            client: Object exposing ``knowledge_graphs.list(include_stats=...)``.
        """
        self.client = client
        self._cache = {}  # cache key -> cached listing
        self._cache_time = {}  # cache key -> monotonic time of the fetch
        self.cache_duration = 300  # 5 minutes

    def get_kgs(self, include_stats=False, force_refresh=False):
        """Return the knowledge graph listing, from cache when still fresh.

        Args:
            include_stats: Forwarded to ``knowledge_graphs.list``; also part of
                the cache key.
            force_refresh: When true, skip the cache and fetch again.

        Returns:
            The listing returned by the client. A cache hit returns the very
            same object that was stored, not a copy.
        """
        cache_key = f"kgs_{include_stats}"
        # A monotonic clock measures elapsed time correctly even if the system
        # wall clock is adjusted while entries are cached.
        now = time.monotonic()

        # Check cache
        if not force_refresh and cache_key in self._cache:
            if now - self._cache_time[cache_key] < self.cache_duration:
                print("Using cached knowledge graphs")
                return self._cache[cache_key]

        # Fetch fresh data
        print("Fetching fresh knowledge graphs")
        kgs = self.client.knowledge_graphs.list(include_stats=include_stats)

        # Update cache
        self._cache[cache_key] = kgs
        self._cache_time[cache_key] = now
        return kgs
# Usage
client = FunctorClient()
kg_cache = KGCache(client)

# 1) Nothing cached yet, so this call goes to the API
kgs1 = kg_cache.get_kgs(include_stats=True)

# 2) Same arguments within the cache window: served from the cache
kgs2 = kg_cache.get_kgs(include_stats=True)

# 3) Bypass the cache explicitly
kgs3 = kg_cache.get_kgs(include_stats=True, force_refresh=True)

Common Use Cases

System Health Check

def health_check():
    """Summarise knowledge base health as a status dict.

    Returns:
        A dict with "status" and "message" keys. Status is "warning" when no
        knowledge graphs exist or none of them has entities, "error" when the
        listing call raises, and "healthy" otherwise (then the dict also
        carries "kgs" and "entities" counts).
    """
    client = FunctorClient()
    try:
        graphs = client.knowledge_graphs.list(include_stats=True)
        if not graphs:
            return {"status": "warning", "message": "No knowledge graphs found"}

        entity_total = sum(graph.entities_count for graph in graphs)
        if entity_total == 0:
            return {"status": "warning", "message": "No entities in any knowledge graph"}

        graph_total = len(graphs)
        return {
            "status": "healthy",
            "message": f"{graph_total} knowledge graphs with {entity_total:,} total entities",
            "kgs": graph_total,
            "entities": entity_total,
        }
    except Exception as failure:
        # Any failure is reported as an error status rather than raised
        return {"status": "error", "message": str(failure)}


# Usage
health = health_check()
print(f"System status: {health['status']}")
print(f"Message: {health['message']}")

Data Export Preparation

def prepare_export():
    """Build an export manifest describing every knowledge graph.

    Returns:
        A dict with "export_timestamp" (current UTC time formatted as
        ``YYYY-MM-DDTHH:MM:SSZ``), "total_knowledge_graphs", and a
        "knowledge_graphs" list holding name, description, statistics and
        metadata for each graph.
    """
    # Local import keeps the example self-contained
    from datetime import datetime, timezone

    client = FunctorClient()
    kgs = client.knowledge_graphs.list(include_stats=True)

    export_data = {
        # Stamp the export with the actual time it was prepared instead of a
        # fixed placeholder date.
        "export_timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "total_knowledge_graphs": len(kgs),
        "knowledge_graphs": [],
    }

    for kg in kgs:
        kg_info = {
            "name": kg.name,
            "display_name": kg.display_name,
            "description": kg.description,
            "statistics": {
                "entities": kg.entities_count,
                "relations": kg.relations_count,
                "sources": kg.sources_count,
                "chunks": kg.chunks_count,
            },
            "metadata": {
                "created_at": kg.created_at,
                "last_updated": kg.last_updated,
                "status": kg.status,
            },
        }
        export_data["knowledge_graphs"].append(kg_info)

    return export_data


# Usage
export_data = prepare_export()
print(f"Prepared export for {export_data['total_knowledge_graphs']} knowledge graphs")

Next Steps