Data Sources
The sources namespace provides CRUD operations for managing data sources within knowledge graphs: listing, statistics, deletion, and updates, all with detailed metadata support.
Basic Operations
List All Knowledge Graphs with Sources
```python
from functor_sdk import FunctorClient

client = FunctorClient()

# List all KGs with their sources
kgs = client.sources.list_kgs()

print(f"Found {len(kgs)} knowledge graphs with sources:")
for kg in kgs:
    print(f"\n{kg.name}:")
    print(f"  Sources: {kg.sources_count}")
    print(f"  Total chunks: {kg.total_chunks}")
```
List Sources for Specific KG
```python
# List sources for a specific knowledge graph
sources = client.sources.list_for_kg("KG_Universal")

print(f"Found {len(sources)} sources in KG_Universal:")
for source in sources:
    print(f"\n{source.source_name}")
    print(f"  ID: {source.id}")
    print(f"  Type: {source.source_type}")
    print(f"  Chunks: {source.chunks_count}")
    print(f"  Entities: {source.entities_count}")
```
Complete Parameter Reference
list_kgs() Parameters
```python
kgs = client.sources.list_kgs(
    include_artifacts=True  # Optional: Include artifact information
)
```
list_for_kg() Parameters
```python
sources = client.sources.list_for_kg(
    kg_name="KG_Universal",  # Required: Knowledge graph name
    include_stats=True       # Optional: Include detailed statistics
)
```
get_stats() Parameters
```python
stats = client.sources.get_stats(
    source_id="source_123",    # Required: Source ID
    include_vector_stats=True  # Optional: Include vector statistics
)
```
delete() Parameters
```python
result = client.sources.delete(
    kg_name="KG_Universal",      # Required: Knowledge graph name
    source_id="source_123",      # Required: Source ID
    delete_from_vector_db=True,  # Optional: Delete from vector database
    delete_visualizations=True,  # Optional: Delete visualizations
    delete_kg_files=False,       # Optional: Delete KG files
    force=False                  # Optional: Force deletion
)
```
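Deletion is destructive, so a common pattern is to review what a source contains before removing it. Below is a minimal sketch built only from the `get_stats` and `delete` calls documented on this page; the interactive confirmation is illustrative, not an SDK feature.

```python
def delete_with_review(client, kg_name, source_id):
    # Look up the source's contents before deleting it
    stats = client.sources.get_stats(source_id)
    print(f"About to delete '{stats.source_name}': "
          f"{stats.chunks_count} chunks, {stats.entities_count} entities")
    if input("Proceed? [y/N] ").strip().lower() != "y":
        print("Aborted")
        return None
    return client.sources.delete(
        kg_name=kg_name,
        source_id=source_id,
        delete_from_vector_db=True,
        delete_visualizations=True,
    )
```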
update() Parameters
```python
result = client.sources.update(
    kg_name="KG_Universal",  # Required: Knowledge graph name
    source_id="source_123",  # Required: Source ID
    force_reprocess=False,   # Optional: Force reprocessing
    update_metadata={        # Optional: Metadata fields to update
        "priority": "high",
        "tags": ["updated", "important"]
    }
)
```
Response Formats
Source Object
```python
sources = client.sources.list_for_kg("KG_Universal", include_stats=True)

for source in sources:
    # Basic properties
    print(f"ID: {source.id}")
    print(f"Name: {source.source_name}")
    print(f"Type: {source.source_type}")
    print(f"KG: {source.kg_name}")

    # Statistics (when include_stats=True)
    print(f"Chunks: {source.chunks_count}")
    print(f"Entities: {source.entities_count}")
    print(f"Relations: {source.relations_count}")

    # Metadata
    print(f"Created: {source.created_at}")
    print(f"Updated: {source.updated_at}")
    print(f"Status: {source.status}")
```
SourceStats Object
```python
stats = client.sources.get_stats("source_123", include_vector_stats=True)

# Basic statistics
print(f"Source: {stats.source_name}")
print(f"Chunks: {stats.chunks_count}")
print(f"Entities: {stats.entities_count}")
print(f"Relations: {stats.relations_count}")

# Vector statistics (when include_vector_stats=True)
print(f"Vector points: {stats.vector_points_count}")
print(f"Vector dimensions: {stats.vector_dimensions}")

# Processing information
print(f"Processing time: {stats.processing_time_ms}ms")
print(f"Last processed: {stats.last_processed}")
```
Advanced Usage Patterns
Source Management Workflow
```python
def manage_sources(kg_name):
    client = FunctorClient()

    # 1. List all sources
    sources = client.sources.list_for_kg(kg_name, include_stats=True)
    print(f"Found {len(sources)} sources in {kg_name}")

    # 2. Analyze sources
    for source in sources:
        print(f"\nSource: {source.source_name}")
        print(f"  Status: {source.status}")
        print(f"  Chunks: {source.chunks_count}")
        print(f"  Entities: {source.entities_count}")

        # 3. Get detailed stats for large sources
        if source.chunks_count > 100:
            stats = client.sources.get_stats(source.id, include_vector_stats=True)
            print(f"  Vector points: {stats.vector_points_count}")

    return sources

# Usage
sources = manage_sources("KG_Universal")
```
Batch Operations
```python
def batch_source_operations(kg_name, source_ids):
    client = FunctorClient()
    results = []

    for source_id in source_ids:
        try:
            # Get stats
            stats = client.sources.get_stats(source_id)
            print(f"Source {source_id}: {stats.chunks_count} chunks")

            # Update metadata
            result = client.sources.update(
                kg_name=kg_name,
                source_id=source_id,
                update_metadata={"batch_processed": True}
            )
            results.append({"source_id": source_id, "status": "success"})
        except Exception as e:
            print(f"Failed to process {source_id}: {e}")
            results.append({"source_id": source_id, "status": "failed", "error": str(e)})

    return results

# Usage
source_ids = ["source_1", "source_2", "source_3"]
results = batch_source_operations("KG_Universal", source_ids)
```
Async Operations
```python
import asyncio

from functor_sdk import FunctorClient

async def async_source_management():
    async with FunctorClient() as client:
        # List sources
        sources = await client.sources.list_for_kg_async("KG_Universal")

        # Get stats for all sources concurrently
        tasks = [
            client.sources.get_stats_async(source.id)
            for source in sources
        ]
        stats_results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for i, stats in enumerate(stats_results):
            if isinstance(stats, Exception):
                print(f"Failed to get stats for {sources[i].source_name}: {stats}")
            else:
                print(f"{sources[i].source_name}: {stats.chunks_count} chunks")

# Usage
asyncio.run(async_source_management())
```
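`asyncio.gather` fires every request at once; for a KG with hundreds of sources you may want to bound concurrency. A minimal sketch using `asyncio.Semaphore`, meant to run inside the same `async with FunctorClient() as client` context as above (the limit of 10 concurrent requests is an assumption, not an SDK default):

```python
async def bounded_stats(client, sources, limit=10):
    semaphore = asyncio.Semaphore(limit)

    async def fetch(source):
        async with semaphore:  # At most `limit` requests in flight
            return await client.sources.get_stats_async(source.id)

    return await asyncio.gather(
        *(fetch(source) for source in sources),
        return_exceptions=True,
    )
```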
Source Lifecycle Management
Source Health Monitoring
```python
def monitor_source_health(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    health_report = {
        "total_sources": len(sources),
        "healthy": 0,
        "warnings": 0,
        "errors": 0,
        "issues": []
    }

    for source in sources:
        issues = []

        # Check for empty sources
        if source.chunks_count == 0:
            issues.append("No chunks processed")
            health_report["errors"] += 1
        # Check for stale sources (fixed example cutoff; string comparison
        # works because timestamps are ISO-8601 strings)
        elif source.updated_at < "2024-01-01":
            issues.append("Not updated recently")
            health_report["warnings"] += 1
        # Check for processing issues
        elif source.status != "completed":
            issues.append(f"Status: {source.status}")
            health_report["warnings"] += 1
        else:
            health_report["healthy"] += 1

        if issues:
            health_report["issues"].append({
                "source_name": source.source_name,
                "source_id": source.id,
                "issues": issues
            })

    return health_report

# Usage
health = monitor_source_health("KG_Universal")
print(f"Health Report: {health['healthy']} healthy, "
      f"{health['warnings']} warnings, {health['errors']} errors")
```
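The issues list in the returned report keeps enough context to print a per-source summary, for example:

```python
# Render the collected issues into a readable summary
for item in health["issues"]:
    print(f"{item['source_name']} ({item['source_id']}):")
    for issue in item["issues"]:
        print(f"  - {issue}")
```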
Source Cleanup
```python
from datetime import datetime, timedelta

def cleanup_old_sources(kg_name, days_threshold=30):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name)

    cutoff_date = datetime.now() - timedelta(days=days_threshold)

    sources_to_delete = []
    for source in sources:
        if source.updated_at < cutoff_date.isoformat():
            sources_to_delete.append(source)

    print(f"Found {len(sources_to_delete)} sources older than {days_threshold} days")

    deleted_count = 0
    for source in sources_to_delete:
        try:
            result = client.sources.delete(
                kg_name=kg_name,
                source_id=source.id,
                delete_from_vector_db=True,
                delete_visualizations=True,
                force=False
            )
            if result.success:
                print(f"Deleted: {source.source_name}")
                deleted_count += 1
            else:
                print(f"Failed to delete: {source.source_name}")
        except Exception as e:
            print(f"Error deleting {source.source_name}: {e}")

    print(f"Successfully deleted {deleted_count} sources")
    return deleted_count

# Usage
deleted = cleanup_old_sources("KG_Universal", days_threshold=90)
```
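Because the deletions above cannot be undone, it can help to preview the candidates first. A dry-run sketch (a hypothetical helper, built only from `list_for_kg` and the same date comparison):

```python
from datetime import datetime, timedelta

def preview_old_sources(kg_name, days_threshold=30):
    # Report what cleanup_old_sources would delete, without deleting anything
    client = FunctorClient()
    cutoff = (datetime.now() - timedelta(days=days_threshold)).isoformat()
    stale = [s for s in client.sources.list_for_kg(kg_name)
             if s.updated_at < cutoff]
    for source in stale:
        print(f"Would delete: {source.source_name} (updated {source.updated_at})")
    return stale
```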
Error Handling
Common Error Scenarios
```python
from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorNotFoundError,
    FunctorValidationError
)

client = FunctorClient()

try:
    sources = client.sources.list_for_kg("KG_Universal")
except FunctorNotFoundError:
    print("Knowledge graph not found")
    print("Available KGs:", client.knowledge_graphs.list())
except FunctorValidationError as e:
    print(f"Validation error: {e.message}")
except FunctorAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
Robust Source Operations
```python
import time

from functor_sdk import FunctorAPIError, FunctorClient

def safe_source_operation(operation, *args, **kwargs):
    client = FunctorClient()
    max_retries = 3

    for attempt in range(max_retries):
        try:
            if operation == "list":
                return client.sources.list_for_kg(*args, **kwargs)
            elif operation == "stats":
                return client.sources.get_stats(*args, **kwargs)
            elif operation == "delete":
                return client.sources.delete(*args, **kwargs)
            elif operation == "update":
                return client.sources.update(*args, **kwargs)
            else:
                raise ValueError(f"Unknown operation: {operation}")
        except FunctorAPIError as e:
            # Retry transient server errors with exponential backoff
            if e.status_code >= 500 and attempt < max_retries - 1:
                print(f"Server error, retrying... (attempt {attempt + 1})")
                time.sleep(2 ** attempt)
            else:
                raise
        except Exception as e:
            print(f"Operation failed: {e}")
            return None

    return None

# Usage
sources = safe_source_operation("list", "KG_Universal", include_stats=True)
if sources:
    print(f"Retrieved {len(sources)} sources")
else:
    print("Failed to retrieve sources")
```
Performance Optimization
Optimization Tips
- Use `include_stats` sparingly: request detailed statistics only when you need them
- Batch operations: process multiple sources together
- Use async for concurrent operations: when processing many sources
- Cache source lists: sources change infrequently (see the caching sketch after this list)
- Filter by status: only process relevant sources
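A minimal caching sketch, assuming sources change infrequently. The module-level dict and the 300-second TTL are illustrative choices, not SDK features.

```python
import time

_source_cache = {}  # kg_name -> (fetched_at, sources)

def cached_list_for_kg(client, kg_name, ttl_seconds=300):
    now = time.monotonic()
    entry = _source_cache.get(kg_name)
    if entry is not None and now - entry[0] < ttl_seconds:
        return entry[1]  # Cache hit: reuse the previous listing
    sources = client.sources.list_for_kg(kg_name)
    _source_cache[kg_name] = (now, sources)
    return sources
```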
Efficient Source Processing
```python
def efficient_source_processing(kg_name):
    client = FunctorClient()

    # Get basic source list first
    sources = client.sources.list_for_kg(kg_name, include_stats=False)

    # Filter sources of interest
    active_sources = [s for s in sources if s.status == "completed"]
    large_sources = [s for s in active_sources if s.chunks_count > 100]

    print(f"Processing {len(large_sources)} large sources")

    # Get detailed stats only for filtered sources
    detailed_stats = []
    for source in large_sources:
        try:
            stats = client.sources.get_stats(source.id, include_vector_stats=True)
            detailed_stats.append(stats)
        except Exception as e:
            print(f"Failed to get stats for {source.source_name}: {e}")

    return detailed_stats

# Usage
stats = efficient_source_processing("KG_Universal")
print(f"Retrieved detailed stats for {len(stats)} sources")
```
Integration Examples
Source Analytics Dashboard
```python
def create_source_dashboard(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    dashboard = {
        "kg_name": kg_name,
        "total_sources": len(sources),
        "total_chunks": sum(s.chunks_count for s in sources),
        "total_entities": sum(s.entities_count for s in sources),
        "sources_by_type": {},
        "sources_by_status": {},
        "top_sources": []
    }

    # Group by type and status
    for source in sources:
        # By type
        if source.source_type not in dashboard["sources_by_type"]:
            dashboard["sources_by_type"][source.source_type] = 0
        dashboard["sources_by_type"][source.source_type] += 1

        # By status
        if source.status not in dashboard["sources_by_status"]:
            dashboard["sources_by_status"][source.status] = 0
        dashboard["sources_by_status"][source.status] += 1

    # Top sources by chunks
    dashboard["top_sources"] = sorted(
        sources,
        key=lambda s: s.chunks_count,
        reverse=True
    )[:10]

    return dashboard

# Usage
dashboard = create_source_dashboard("KG_Universal")
print(f"Dashboard for {dashboard['kg_name']}:")
print(f"  Total sources: {dashboard['total_sources']}")
print(f"  Total chunks: {dashboard['total_chunks']:,}")
print(f"  Total entities: {dashboard['total_entities']:,}")
```
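The grouped counts and the top-sources list can be rendered the same way:

```python
# Render the grouped breakdowns collected above
for source_type, count in dashboard["sources_by_type"].items():
    print(f"  {source_type}: {count} sources")
for status, count in dashboard["sources_by_status"].items():
    print(f"  {status}: {count} sources")
for source in dashboard["top_sources"][:3]:
    print(f"  Top source: {source.source_name} ({source.chunks_count} chunks)")
```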
Source Backup and Restore
```python
from datetime import datetime, timezone

def backup_source_metadata(kg_name):
    client = FunctorClient()
    sources = client.sources.list_for_kg(kg_name, include_stats=True)

    backup_data = {
        "backup_timestamp": datetime.now(timezone.utc).isoformat(),
        "kg_name": kg_name,
        "sources": []
    }

    for source in sources:
        source_data = {
            "id": source.id,
            "name": source.source_name,
            "type": source.source_type,
            "status": source.status,
            "chunks_count": source.chunks_count,
            "entities_count": source.entities_count,
            "created_at": source.created_at,
            "updated_at": source.updated_at
        }
        backup_data["sources"].append(source_data)

    return backup_data

def restore_source_metadata(backup_data):
    client = FunctorClient()
    kg_name = backup_data["kg_name"]

    # Get current sources
    current_sources = client.sources.list_for_kg(kg_name)
    current_ids = {s.id for s in current_sources}

    # Find missing sources
    backup_ids = {s["id"] for s in backup_data["sources"]}
    missing_ids = backup_ids - current_ids

    print(f"Found {len(missing_ids)} missing sources")

    # Note: an actual restore would require re-uploading the source data;
    # this only compares metadata.
    return missing_ids

# Usage
backup = backup_source_metadata("KG_Universal")
missing = restore_source_metadata(backup)
print(f"Backup created for {len(backup['sources'])} sources")
```
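Since `backup_source_metadata` returns a plain dict, persisting it is a matter of `json.dump`. A short sketch (the file name is arbitrary; `default=str` guards against timestamps that are datetime objects rather than ISO strings):

```python
import json

backup = backup_source_metadata("KG_Universal")
with open("kg_universal_sources_backup.json", "w") as f:
    json.dump(backup, f, indent=2, default=str)

# Later: load the backup and diff it against the live KG
with open("kg_universal_sources_backup.json") as f:
    missing = restore_source_metadata(json.load(f))
```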
Next Steps
- Queries Namespace - Query your sources
- Ingestion Namespace - Add new sources
- Visualizations Namespace - Visualize your sources
- Error Handling - Comprehensive error management