
Data Sources

The sources namespace provides CRUD operations for managing data sources within knowledge graphs: listing sources, retrieving statistics, deleting, and updating, with support for detailed metadata.

Basic Operations

List All Knowledge Graphs with Sources

from functor_sdk import FunctorClient

client = FunctorClient()

# List all KGs with their sources
kgs = client.sources.list_kgs()

print(f"Found {len(kgs)} knowledge graphs with sources:")
for kg in kgs:
    print(f"\n{kg.name}:")
    print(f"  Sources: {kg.sources_count}")
    print(f"  Total chunks: {kg.total_chunks}")

List Sources for Specific KG

# List sources for a specific knowledge graph
sources = client.sources.list_for_kg("KG_Universal")

print(f"Found {len(sources)} sources in KG_Universal:")
for source in sources:
    print(f"\n{source.source_name}")
    print(f"  ID: {source.id}")
    print(f"  Type: {source.source_type}")
    print(f"  Chunks: {source.chunks_count}")
    print(f"  Entities: {source.entities_count}")

Complete Parameter Reference

list_kgs() Parameters

kgs = client.sources.list_kgs(
    include_artifacts=True  # Optional: Include artifact information
)

list_for_kg() Parameters

sources = client.sources.list_for_kg(
    kg_name="KG_Universal",  # Required: Knowledge graph name
    include_stats=True       # Optional: Include detailed statistics
)

get_stats() Parameters

stats = client.sources.get_stats(
    source_id="source_123",    # Required: Source ID
    include_vector_stats=True  # Optional: Include vector statistics
)

delete() Parameters

result = client.sources.delete(
    kg_name="KG_Universal",      # Required: Knowledge graph name
    source_id="source_123",      # Required: Source ID
    delete_from_vector_db=True,  # Optional: Delete from vector database
    delete_visualizations=True,  # Optional: Delete visualizations
    delete_kg_files=False,       # Optional: Delete KG files
    force=False                  # Optional: Force deletion
)
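
As a usage sketch, the helper below inspects a source's statistics before removing it and keeps force=False, the safer default. The 1000-chunk threshold and the confirmation prompt are illustrative choices, not part of the SDK.

# A cautious delete sketch: inspect the source before removing it.
# The 1000-chunk threshold and the confirmation prompt are illustrative.
def delete_with_confirmation(client, kg_name, source_id):
    stats = client.sources.get_stats(source_id)
    print(f"About to delete {stats.source_name}: "
          f"{stats.chunks_count} chunks, {stats.entities_count} entities")
    if stats.chunks_count > 1000:
        answer = input("This is a large source. Delete anyway? [y/N] ")
        if answer.lower() != "y":
            print("Aborted")
            return None
    return client.sources.delete(
        kg_name=kg_name,
        source_id=source_id,
        delete_from_vector_db=True,
        delete_visualizations=True,
        force=False  # safer default: do not force the deletion
    )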

update() Parameters

result = client.sources.update(
    kg_name="KG_Universal",  # Required: Knowledge graph name
    source_id="source_123",  # Required: Source ID
    force_reprocess=False,   # Optional: Force reprocessing
    update_metadata={        # Optional: Update metadata
        "priority": "high",
        "tags": ["updated", "important"]
    }
)

Response Formats

Source Object

sources = client.sources.list_for_kg("KG_Universal", include_stats=True)
for source in sources:
# Basic properties
print(f"ID: {source.id}")
print(f"Name: {source.source_name}")
print(f"Type: {source.source_type}")
print(f"KG: {source.kg_name}")
# Statistics (when include_stats=True)
print(f"Chunks: {source.chunks_count}")
print(f"Entities: {source.entities_count}")
print(f"Relations: {source.relations_count}")
# Metadata
print(f"Created: {source.created_at}")
print(f"Updated: {source.updated_at}")
print(f"Status: {source.status}")

SourceStats Object

stats = client.sources.get_stats("source_123", include_vector_stats=True)
# Basic statistics
print(f"Source: {stats.source_name}")
print(f"Chunks: {stats.chunks_count}")
print(f"Entities: {stats.entities_count}")
print(f"Relations: {stats.relations_count}")
# Vector statistics (when include_vector_stats=True)
print(f"Vector points: {stats.vector_points_count}")
print(f"Vector dimensions: {stats.vector_dimensions}")
# Processing information
print(f"Processing time: {stats.processing_time_ms}ms")
print(f"Last processed: {stats.last_processed}")

Advanced Usage Patterns

Source Management Workflow

def manage_sources(kg_name):
    client = FunctorClient()

    # 1. List all sources
    sources = client.sources.list_for_kg(kg_name, include_stats=True)
    print(f"Found {len(sources)} sources in {kg_name}")

    # 2. Analyze sources
    for source in sources:
        print(f"\nSource: {source.source_name}")
        print(f"  Status: {source.status}")
        print(f"  Chunks: {source.chunks_count}")
        print(f"  Entities: {source.entities_count}")

        # 3. Get detailed stats for large sources
        if source.chunks_count > 100:
            stats = client.sources.get_stats(source.id, include_vector_stats=True)
            print(f"  Vector points: {stats.vector_points_count}")

    return sources

# Usage
sources = manage_sources("KG_Universal")

Batch Operations

def batch_source_operations(kg_name, source_ids):
    client = FunctorClient()
    results = []

    for source_id in source_ids:
        try:
            # Get stats
            stats = client.sources.get_stats(source_id)
            print(f"Source {source_id}: {stats.chunks_count} chunks")

            # Update metadata
            result = client.sources.update(
                kg_name=kg_name,
                source_id=source_id,
                update_metadata={"batch_processed": True}
            )
            results.append({"source_id": source_id, "status": "success"})
        except Exception as e:
            print(f"Failed to process {source_id}: {e}")
            results.append({"source_id": source_id, "status": "failed", "error": str(e)})

    return results

# Usage
source_ids = ["source_1", "source_2", "source_3"]
results = batch_source_operations("KG_Universal", source_ids)

Async Operations

import asyncio
from functor_sdk import FunctorClient

async def async_source_management():
    async with FunctorClient() as client:
        # List sources
        sources = await client.sources.list_for_kg_async("KG_Universal")

        # Get stats for all sources concurrently
        tasks = [
            client.sources.get_stats_async(source.id)
            for source in sources
        ]
        stats_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for i, stats in enumerate(stats_results):
            if isinstance(stats, Exception):
                print(f"Failed to get stats for {sources[i].source_name}: {stats}")
            else:
                print(f"{sources[i].source_name}: {stats.chunks_count} chunks")

# Usage
asyncio.run(async_source_management())
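
With many sources, firing every stats request at once can overwhelm the API. One option, sketched below under the assumption that the same get_stats_async method from the example above is available, is to bound concurrency with an asyncio.Semaphore:

import asyncio

# Sketch: limit concurrent get_stats_async calls with a semaphore.
# Assumes the same async client methods used in the example above;
# max_concurrency=5 is an arbitrary illustrative limit.
async def gather_stats_bounded(client, sources, max_concurrency=5):
    semaphore = asyncio.Semaphore(max_concurrency)

    async def get_one(source):
        async with semaphore:
            return await client.sources.get_stats_async(source.id)

    return await asyncio.gather(
        *(get_one(source) for source in sources),
        return_exceptions=True
    )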

Source Lifecycle Management

Source Health Monitoring

def monitor_source_health(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    health_report = {
        "total_sources": len(sources),
        "healthy": 0,
        "warnings": 0,
        "errors": 0,
        "issues": []
    }

    for source in sources:
        issues = []
        # Check for empty sources
        if source.chunks_count == 0:
            issues.append("No chunks processed")
            health_report["errors"] += 1
        # Check for stale sources (ISO timestamps compare lexicographically)
        elif source.updated_at < "2024-01-01":
            issues.append("Not updated recently")
            health_report["warnings"] += 1
        # Check for processing issues
        elif source.status != "completed":
            issues.append(f"Status: {source.status}")
            health_report["warnings"] += 1
        else:
            health_report["healthy"] += 1

        if issues:
            health_report["issues"].append({
                "source_name": source.source_name,
                "source_id": source.id,
                "issues": issues
            })

    return health_report

# Usage
health = monitor_source_health("KG_Universal")
print(f"Health Report: {health['healthy']} healthy, {health['warnings']} warnings, {health['errors']} errors")

Source Cleanup

from datetime import datetime, timedelta

def cleanup_old_sources(kg_name, days_threshold=30):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name)

    cutoff_date = datetime.now() - timedelta(days=days_threshold)

    sources_to_delete = []
    for source in sources:
        # updated_at is an ISO-format string, so compare against an ISO string
        if source.updated_at < cutoff_date.isoformat():
            sources_to_delete.append(source)

    print(f"Found {len(sources_to_delete)} sources older than {days_threshold} days")

    deleted_count = 0
    for source in sources_to_delete:
        try:
            result = client.sources.delete(
                kg_name=kg_name,
                source_id=source.id,
                delete_from_vector_db=True,
                delete_visualizations=True,
                force=False
            )
            if result.success:
                print(f"Deleted: {source.source_name}")
                deleted_count += 1
            else:
                print(f"Failed to delete: {source.source_name}")
        except Exception as e:
            print(f"Error deleting {source.source_name}: {e}")

    print(f"Successfully deleted {deleted_count} sources")
    return deleted_count

# Usage
deleted = cleanup_old_sources("KG_Universal", days_threshold=90)

Error Handling

Common Error Scenarios

from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorNotFoundError,
    FunctorValidationError
)

client = FunctorClient()

try:
    sources = client.sources.list_for_kg("KG_Universal")
except FunctorNotFoundError:
    print("Knowledge graph not found")
    print("Available KGs:", client.knowledge_graphs.list())
except FunctorValidationError as e:
    print(f"Validation error: {e.message}")
except FunctorAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")

Robust Source Operations

import time

from functor_sdk import FunctorAPIError, FunctorClient

def safe_source_operation(operation, *args, **kwargs):
    client = FunctorClient()
    max_retries = 3

    for attempt in range(max_retries):
        try:
            if operation == "list":
                return client.sources.list_for_kg(*args, **kwargs)
            elif operation == "stats":
                return client.sources.get_stats(*args, **kwargs)
            elif operation == "delete":
                return client.sources.delete(*args, **kwargs)
            elif operation == "update":
                return client.sources.update(*args, **kwargs)
            else:
                raise ValueError(f"Unknown operation: {operation}")
        except FunctorAPIError as e:
            # Retry only on server-side errors, with exponential backoff
            if e.status_code >= 500 and attempt < max_retries - 1:
                print(f"Server error, retrying... (attempt {attempt + 1})")
                time.sleep(2 ** attempt)
            else:
                raise
        except Exception as e:
            print(f"Operation failed: {e}")
            return None
    return None

# Usage
sources = safe_source_operation("list", "KG_Universal", include_stats=True)
if sources:
    print(f"Retrieved {len(sources)} sources")
else:
    print("Failed to retrieve sources")

Performance Optimization

Optimization Tips

  • Use include_stats sparingly: Only when you need detailed statistics
  • Batch operations: Process multiple sources together
  • Use async for concurrent operations: When processing many sources
  • Cache source lists: Sources don't change frequently (a minimal caching sketch follows this list)
  • Filter by status: Only process relevant sources
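
A minimal time-based cache for source lists might look like the following; the 300-second TTL is an arbitrary choice, and the cache is per-process only:

import time

# Minimal per-process cache for source lists; the 300 s TTL is arbitrary
_source_cache = {}

def cached_list_for_kg(client, kg_name, ttl_seconds=300):
    now = time.monotonic()
    cached = _source_cache.get(kg_name)
    if cached and now - cached[0] < ttl_seconds:
        return cached[1]
    sources = client.sources.list_for_kg(kg_name)
    _source_cache[kg_name] = (now, sources)
    return sources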

Efficient Source Processing

def efficient_source_processing(kg_name):
    client = FunctorClient()

    # Get the basic source list first (no per-source statistics)
    sources = client.sources.list_for_kg(kg_name, include_stats=False)

    # Filter by status, which is available without include_stats
    active_sources = [s for s in sources if s.status == "completed"]
    print(f"Fetching stats for {len(active_sources)} active sources")

    # Get detailed stats only for the filtered sources and keep the
    # large ones; chunk counts come from get_stats, since the basic
    # list was requested without statistics
    detailed_stats = []
    for source in active_sources:
        try:
            stats = client.sources.get_stats(source.id, include_vector_stats=True)
            if stats.chunks_count > 100:
                detailed_stats.append(stats)
        except Exception as e:
            print(f"Failed to get stats for {source.source_name}: {e}")

    return detailed_stats

# Usage
stats = efficient_source_processing("KG_Universal")
print(f"Retrieved detailed stats for {len(stats)} sources")

Integration Examples

Source Analytics Dashboard

def create_source_dashboard(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    dashboard = {
        "kg_name": kg_name,
        "total_sources": len(sources),
        "total_chunks": sum(s.chunks_count for s in sources),
        "total_entities": sum(s.entities_count for s in sources),
        "sources_by_type": {},
        "sources_by_status": {},
        "top_sources": []
    }

    # Group by type and status
    for source in sources:
        # By type
        if source.source_type not in dashboard["sources_by_type"]:
            dashboard["sources_by_type"][source.source_type] = 0
        dashboard["sources_by_type"][source.source_type] += 1
        # By status
        if source.status not in dashboard["sources_by_status"]:
            dashboard["sources_by_status"][source.status] = 0
        dashboard["sources_by_status"][source.status] += 1

    # Top sources by chunks
    dashboard["top_sources"] = sorted(
        sources,
        key=lambda s: s.chunks_count,
        reverse=True
    )[:10]

    return dashboard

# Usage
dashboard = create_source_dashboard("KG_Universal")
print(f"Dashboard for {dashboard['kg_name']}:")
print(f"  Total sources: {dashboard['total_sources']}")
print(f"  Total chunks: {dashboard['total_chunks']:,}")
print(f"  Total entities: {dashboard['total_entities']:,}")

Source Backup and Restore

from datetime import datetime, timezone

def backup_source_metadata(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    backup_data = {
        "backup_timestamp": datetime.now(timezone.utc).isoformat(),
        "kg_name": kg_name,
        "sources": []
    }

    for source in sources:
        source_data = {
            "id": source.id,
            "name": source.source_name,
            "type": source.source_type,
            "status": source.status,
            "chunks_count": source.chunks_count,
            "entities_count": source.entities_count,
            "created_at": source.created_at,
            "updated_at": source.updated_at
        }
        backup_data["sources"].append(source_data)

    return backup_data

def restore_source_metadata(backup_data):
    client = FunctorClient()
    kg_name = backup_data["kg_name"]

    # Get current sources
    current_sources = client.sources.list_for_kg(kg_name)
    current_ids = {s.id for s in current_sources}

    # Find sources present in the backup but missing from the KG
    backup_ids = {s["id"] for s in backup_data["sources"]}
    missing_ids = backup_ids - current_ids

    print(f"Found {len(missing_ids)} missing sources")
    # Note: an actual restore would require re-uploading the source data;
    # this only compares metadata
    return missing_ids

# Usage
backup = backup_source_metadata("KG_Universal")
missing = restore_source_metadata(backup)
print(f"Backup created for {len(backup['sources'])} sources")

Next Steps