Data Ingestion
The ingestion namespace provides methods for uploading and processing data from various sources. It supports file uploads, URL ingestion, and batch processing with comprehensive metadata support.
Upload Methods
Upload from URL
Process documents directly from web URLs:
```python
from functor_sdk import FunctorClient

client = FunctorClient()

# Basic URL upload
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)

print(f"Job ID: {result.job_id}")
print(f"Message: {result.message}")
```
Upload Local File
Upload files from your local filesystem:
```python
# Upload a local file
result = client.ingestion.upload_file(
    file_path="/path/to/document.pdf",
    kg_name="KG_Universal",
    source_name="Research Paper"
)

print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
```
Unified Ingestion
The unified ingestion endpoint provides advanced features including smart processing modes, quality checks, deduplication, domain detection, and comprehensive analytics. It supports three content sources: direct text content, document URLs, and file uploads.
Basic Usage
Ingest content using the unified endpoint with automatic mode selection:
```python
from functor_sdk import FunctorClient

client = FunctorClient()

# Ingest from direct content
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your text content here...",
    source_name="Research Article"
)

print(f"Success: {result.success}")
print(f"Entities extracted: {result.entities_extracted}")
print(f"Quality score: {result.quality_score}")
```
Content Sources
The unified ingestion supports three ways to provide content:
1. Direct Text Content
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Diabetes is a chronic disease affecting millions...",
    source_name="Medical Text",
    mode="comprehensive"
)
```
2. Document URL
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    document_url="https://example.com/research-paper.pdf",
    source_name="Research Paper",
    mode="comprehensive"
)
```
3. File Upload
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    file_path="/path/to/document.pdf",
    source_name="Local Document",
    mode="comprehensive"
)
```
Processing Modes
Choose the processing mode based on your needs:
Auto Mode (Default)
```python
# Automatically selects the optimal mode based on content
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="auto"  # Default
)
```
Fast Mode
```python
# Optimized for speed with basic processing
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="fast",
    enable_relation_extraction=False,  # Skip relation extraction
    enable_quality_checks=False        # Skip quality checks
)
```
Comprehensive Mode
```python
# Full processing with all features enabled
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="comprehensive",
    enable_entity_extraction=True,
    enable_relation_extraction=True,
    enable_quality_checks=True,
    enable_deduplication=True,
    auto_domain_detection=True
)
```
Domain-Specific Mode
```python
# Specialized processing for specific domains
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="domain_specific",
    domains=["healthcare", "medical"],
    enable_entity_extraction=True,
    enable_relation_extraction=True
)
```
Advanced Features
Quality Checks and Deduplication
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    enable_quality_checks=True,
    enable_deduplication=True,
    similarity_threshold=0.9  # Deduplication threshold
)

print(f"Quality score: {result.quality_score}")
print(f"Duplicates detected: {result.duplicates_detected}")
print(f"Duplicates removed: {result.duplicates_removed}")
```
Domain Detection
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    auto_domain_detection=True
)

print(f"Detected domains: {result.detected_domains}")
```
Chunking Configuration
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    chunking_strategy="semantic",
    chunk_size=1000,
    chunk_overlap=200
)
```
Async Processing
```python
# Process asynchronously for large documents
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    document_url="https://example.com/large-document.pdf",
    async_processing=True,
    priority="high",
    webhook_url="https://your-app.com/ingestion-callback"
)

# Returns immediately with a job_id
print(f"Job ID: {result.job_id}")
print(f"Status: {result.message}")
```
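When async_processing is enabled with a webhook_url, your application needs an endpoint to receive the completion callback. Below is a minimal sketch of such a receiver using Flask. The payload shape (job_id and status fields) is an assumption for illustration; verify it against the actual callback schema.

```python
# Hypothetical webhook receiver for ingestion callbacks.
# The payload field names below are assumptions, not a documented schema.
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route("/ingestion-callback", methods=["POST"])
def ingestion_callback():
    payload = request.get_json(silent=True) or {}
    job_id = payload.get("job_id")   # assumed field
    status = payload.get("status")   # assumed field
    print(f"Ingestion job {job_id} finished with status: {status}")
    return jsonify({"received": True}), 200

if __name__ == "__main__":
    app.run(port=8000)
```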
Response Format
```python
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content..."
)

# Access comprehensive response data
print(f"Success: {result.success}")
print(f"Message: {result.message}")
print(f"Job ID: {result.job_id}")

# Processing statistics
print(f"Chunks created: {result.chunks_created}")
print(f"Entities extracted: {result.entities_extracted}")
print(f"Relations extracted: {result.relations_extracted}")

# Quality metrics
print(f"Quality score: {result.quality_score}")
print(f"Duplicates removed: {result.duplicates_removed}")

# Storage information
print(f"SQL stored: {result.storage_info.get('sql_stored')}")
print(f"Vector stored: {result.storage_info.get('vector_stored')}")

# Performance metrics
print(f"Processing time: {result.processing_time_ms}ms")
print(f"Extraction time: {result.extraction_time_ms}ms")
print(f"Embedding time: {result.embedding_time_ms}ms")

# Domain detection
if result.detected_domains:
    print(f"Detected domains: {result.detected_domains}")

# Warnings and errors
if result.warnings:
    print(f"Warnings: {result.warnings}")
if result.errors:
    print(f"Errors: {result.errors}")
```
Complete Parameter Reference
```python
result = client.ingestion.ingest_unified(
    # Content source (provide exactly one)
    content="Your text content...",              # OR
    document_url="https://example.com/doc.pdf",  # OR
    file_path="/path/to/file.pdf",

    # Required
    kg_name="KG_Universal",

    # Processing options
    mode="auto",         # auto, fast, comprehensive, domain_specific
    source_type="auto",  # auto, pdf, csv, text, url, json

    # Chunking
    chunking_strategy="semantic",
    chunk_size=1000,
    chunk_overlap=200,

    # Extraction
    enable_entity_extraction=True,
    enable_relation_extraction=True,
    extraction_confidence_threshold=0.7,

    # Embedding
    enable_text_embedding=True,
    enable_graph_embedding=True,

    # Storage
    enable_sql_storage=True,
    enable_vector_storage=True,
    enable_kg_integration=True,

    # Quality
    enable_quality_checks=True,
    enable_deduplication=True,
    similarity_threshold=0.9,

    # Domain
    domains=["healthcare"],
    auto_domain_detection=True,

    # Metadata
    source_name="Custom Source",
    author="Author Name",
    custom_tags=["tag1", "tag2"],
    custom_fields={"key": "value"},

    # Processing control
    priority="normal",  # low, normal, high, urgent
    async_processing=False,
    webhook_url=None
)
```
Async Version
```python
import asyncio
from functor_sdk import FunctorClient

async def unified_ingest_async():
    async with FunctorClient() as client:
        result = await client.ingestion.ingest_unified_async(
            kg_name="KG_Universal",
            content="Your content...",
            mode="comprehensive"
        )
        print(f"Success: {result.success}")
        print(f"Entities: {result.entities_extracted}")

asyncio.run(unified_ingest_async())
```
Upload Method Parameter Reference
upload_url() Parameters
```python
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",  # Required: URL to process
    kg_name="KG_Universal",                  # Required: Target knowledge graph
    source_name="Custom Source Name",        # Optional: Custom source name
    custom_metadata={                        # Optional: Additional metadata
        "domain": "health",
        "priority": "high",
        "author": "Dr. Smith"
    }
)
```
upload_file() Parameters
```python
result = client.ingestion.upload_file(
    file_path="/path/to/document.pdf",  # Required: Local file path
    kg_name="KG_Universal",             # Required: Target knowledge graph
    source_name="Custom Source Name",   # Optional: Custom source name
    custom_metadata={                   # Optional: Additional metadata
        "domain": "research",
        "version": "1.0",
        "tags": ["AI", "machine learning"]
    }
)
```
Supported File Types
Document Formats
- PDF - Portable Document Format
- DOCX - Microsoft Word documents
- TXT - Plain text files
- MD - Markdown files
- HTML - Web pages and HTML documents
- RTF - Rich Text Format
Data Formats
- JSON - Structured data
- CSV - Comma-separated values
- XML - Extensible Markup Language
- YAML - YAML Ain't Markup Language
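Before uploading in bulk, you can screen files against the formats listed above on the client side. A minimal sketch, assuming the extension list above is exhaustive (the server's actual accepted-type validation may differ):

```python
from pathlib import Path

# Extensions drawn from the supported-formats list above (assumed exhaustive).
SUPPORTED_EXTENSIONS = {
    ".pdf", ".docx", ".txt", ".md", ".html", ".rtf",  # document formats
    ".json", ".csv", ".xml", ".yaml",                 # data formats
}

def filter_supported(paths):
    """Split paths into (supported, skipped) by file extension."""
    supported, skipped = [], []
    for path in map(Path, paths):
        bucket = supported if path.suffix.lower() in SUPPORTED_EXTENSIONS else skipped
        bucket.append(path)
    return supported, skipped

supported, skipped = filter_supported(["report.pdf", "data.csv", "archive.zip"])
print(f"Uploading: {supported}, skipping: {skipped}")
```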
Upload Response Format
IngestionResult Object
```python
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)

# Access response properties
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
print(f"Message: {result.message}")
print(f"Source ID: {result.source_id}")

# Check if successful
if result.status == "success":
    print("Upload completed successfully!")
else:
    print(f"Upload failed: {result.message}")
```
Advanced Usage Patterns
Batch Upload from URLs
```python
# Batch upload multiple URLs
urls = [
    "https://example.com/doc1.pdf",
    "https://example.com/doc2.pdf",
    "https://example.com/doc3.pdf"
]

results = []
for i, url in enumerate(urls):
    result = client.ingestion.upload_url(
        url=url,
        kg_name="KG_Universal",
        source_name=f"Document {i+1}",
        custom_metadata={"batch_id": "batch_001"}
    )
    results.append(result)

# Check results
for i, result in enumerate(results):
    print(f"Document {i+1}: {result.status}")
```
Async Batch Processing
```python
import asyncio
from functor_sdk import FunctorClient

async def batch_upload():
    async with FunctorClient() as client:
        urls = [
            "https://example.com/doc1.pdf",
            "https://example.com/doc2.pdf",
            "https://example.com/doc3.pdf"
        ]

        # Upload concurrently
        tasks = [
            client.ingestion.upload_url_async(
                url=url,
                kg_name="KG_Universal",
                source_name=f"Document {i+1}"
            )
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)

        for i, result in enumerate(results):
            print(f"Document {i+1}: {result.status}")

asyncio.run(batch_upload())
```
Upload with Progress Tracking
```python
import time
from functor_sdk import FunctorClient

def upload_with_progress():
    client = FunctorClient()

    # Start upload
    result = client.ingestion.upload_url(
        url="https://example.com/large-document.pdf",
        kg_name="KG_Universal",
        source_name="Large Document"
    )
    print(f"Upload started. Job ID: {result.job_id}")

    # Poll for completion (simplified example)
    max_attempts = 30
    for attempt in range(max_attempts):
        time.sleep(10)  # Wait 10 seconds

        # Check job status (implement this based on your API)
        # status = client.jobs.get_status(result.job_id)
        # if status == "completed":
        #     print("Upload completed!")
        #     break
        # elif status == "failed":
        #     print("Upload failed!")
        #     break

        print(f"Attempt {attempt + 1}: Still processing...")

    return result

result = upload_with_progress()
```
Metadata Management
Custom Metadata
```python
# Upload with rich metadata
result = client.ingestion.upload_url(
    url="https://medical-journal.com/research.pdf",
    kg_name="KG_Medical",
    source_name="Diabetes Research 2024",
    custom_metadata={
        "domain": "medical",
        "category": "research",
        "year": 2024,
        "authors": ["Dr. Smith", "Dr. Jones"],
        "keywords": ["diabetes", "treatment", "research"],
        "priority": "high",
        "review_status": "peer_reviewed",
        "doi": "10.1000/example.doi",
        "journal": "Medical Research Journal"
    }
)
```
Metadata Best Practices
- Use consistent keys: Standardize metadata field names
- Include domain information: Help with query routing
- Add timestamps: Track when documents were processed
- Include source information: Author, publication, etc.
- Use tags: Enable better categorization and search (see the helper sketch below)
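The helper below applies these practices when building custom_metadata. It is a minimal sketch; the field names (domain, ingested_at, author, tags) are conventions of this example, not fields required by the API.

```python
from datetime import datetime, timezone

def build_metadata(domain, author=None, tags=None, **extra):
    """Assemble custom_metadata with consistent keys, a timestamp,
    source information, and tags. Field names are illustrative
    conventions, not API requirements."""
    metadata = {
        "domain": domain,  # domain information aids query routing
        "ingested_at": datetime.now(timezone.utc).isoformat(),  # processing timestamp
    }
    if author:
        metadata["author"] = author  # source information
    if tags:
        metadata["tags"] = tags      # categorization and search
    metadata.update(extra)           # any additional standardized fields
    return metadata

result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal",
    custom_metadata=build_metadata("research", author="Dr. Smith", tags=["AI"])
)
```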
Error Handling
Common Error Scenarios
```python
from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorValidationError,
    FunctorNotFoundError
)

client = FunctorClient()

try:
    result = client.ingestion.upload_url(
        url="https://example.com/document.pdf",
        kg_name="KG_Universal"
    )
except FunctorValidationError as e:
    print(f"Validation error: {e.message}")
    print("Check your URL format and parameters")
except FunctorNotFoundError:
    print("Knowledge graph not found")
    print("Available KGs:", client.knowledge_graphs.list())
except FunctorAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")
```
File Upload Error Handling
```python
import os
from functor_sdk import FunctorClient, FunctorAPIError

client = FunctorClient()

def safe_upload_file(file_path, kg_name):
    # Check if file exists
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return None

    # Check file size (example: 50MB limit)
    file_size = os.path.getsize(file_path)
    if file_size > 50 * 1024 * 1024:  # 50MB
        print(f"File too large: {file_size} bytes")
        return None

    try:
        result = client.ingestion.upload_file(
            file_path=file_path,
            kg_name=kg_name
        )
        return result
    except FunctorAPIError as e:
        print(f"Upload failed: {e.message}")
        return None

# Usage
result = safe_upload_file("/path/to/document.pdf", "KG_Universal")
if result:
    print(f"Upload successful: {result.job_id}")
```
Performance Optimization
Upload Optimization Tips
- Use async for batch operations: Upload multiple files concurrently
- Compress large files: Reduce upload time and storage
- Use appropriate KG names: Target specific knowledge graphs
- Include meaningful metadata: Help with processing and retrieval
- Monitor job status: Track long-running uploads
Concurrent Upload Example
```python
import asyncio
from pathlib import Path
from functor_sdk import FunctorClient

async def upload_directory(directory_path, kg_name):
    async with FunctorClient() as client:
        directory = Path(directory_path)
        files = list(directory.glob("*.pdf"))

        # Upload files concurrently
        tasks = [
            client.ingestion.upload_file_async(
                file_path=str(file),
                kg_name=kg_name,
                source_name=file.stem
            )
            for file in files
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        successful = 0
        failed = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Failed to upload {files[i].name}: {result}")
                failed += 1
            else:
                print(f"Successfully uploaded {files[i].name}")
                successful += 1

        print(f"Upload complete: {successful} successful, {failed} failed")

# Usage
asyncio.run(upload_directory("/path/to/documents", "KG_Universal"))
```
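For very large directories, an unbounded gather can open too many concurrent requests. A minimal variant that caps in-flight uploads with an asyncio.Semaphore (the limit of 5 is an arbitrary example, and upload_file_async is the method shown above):

```python
import asyncio
from pathlib import Path
from functor_sdk import FunctorClient

async def upload_directory_bounded(directory_path, kg_name, max_concurrent=5):
    semaphore = asyncio.Semaphore(max_concurrent)  # cap in-flight uploads

    async with FunctorClient() as client:
        async def upload_one(file):
            async with semaphore:  # wait for a free slot before uploading
                return await client.ingestion.upload_file_async(
                    file_path=str(file),
                    kg_name=kg_name,
                    source_name=file.stem
                )

        files = list(Path(directory_path).glob("*.pdf"))
        return await asyncio.gather(
            *(upload_one(f) for f in files), return_exceptions=True
        )

asyncio.run(upload_directory_bounded("/path/to/documents", "KG_Universal"))
```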
Integration Examples
Web Scraping Integration
```python
import requests
from functor_sdk import FunctorClient

def scrape_and_upload(urls, kg_name):
    client = FunctorClient()

    for url in urls:
        try:
            # Verify the page is reachable; the response is used only for metadata
            response = requests.get(url)
            response.raise_for_status()

            # Upload to knowledge graph
            result = client.ingestion.upload_url(
                url=url,
                kg_name=kg_name,
                source_name=f"Scraped: {url}",
                custom_metadata={
                    "source_type": "web_scraping",
                    "scraped_at": "2024-01-01",
                    "content_length": len(response.content)
                }
            )
            print(f"Uploaded {url}: {result.job_id}")
        except Exception as e:
            print(f"Failed to process {url}: {e}")

# Usage
urls = [
    "https://example.com/article1.html",
    "https://example.com/article2.html"
]
scrape_and_upload(urls, "KG_Universal")
```
Database Integration
```python
import sqlite3
from functor_sdk import FunctorClient

def upload_from_database(db_path, kg_name):
    client = FunctorClient()

    # Connect to database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Get documents from database
    cursor.execute("SELECT id, title, content, url FROM documents")
    documents = cursor.fetchall()

    for doc_id, title, content, url in documents:
        try:
            if url:
                # Upload from URL
                result = client.ingestion.upload_url(
                    url=url,
                    kg_name=kg_name,
                    source_name=title,
                    custom_metadata={
                        "db_id": doc_id,
                        "source": "database",
                        "title": title
                    }
                )
            else:
                # Rows without a URL could instead be sent through
                # ingest_unified(content=...), shown earlier
                print(f"Skipping {title} - no URL provided")
                continue

            print(f"Uploaded {title}: {result.job_id}")
        except Exception as e:
            print(f"Failed to upload {title}: {e}")

    conn.close()

# Usage
upload_from_database("documents.db", "KG_Universal")
```
Monitoring and Status
Job Status Tracking
```python
import time
from functor_sdk import FunctorClient

client = FunctorClient()

def track_upload_progress(job_id):
    # This is a conceptual example - implement based on your API
    max_attempts = 60  # 10 minutes with 10-second intervals

    for attempt in range(max_attempts):
        try:
            # Check job status (implement this method)
            # status = client.jobs.get_status(job_id)
            # if status == "completed":
            #     print("Upload completed successfully!")
            #     return True
            # elif status == "failed":
            #     print("Upload failed!")
            #     return False
            # elif status == "processing":
            #     print(f"Still processing... (attempt {attempt + 1})")

            time.sleep(10)  # Wait 10 seconds
        except Exception as e:
            print(f"Error checking status: {e}")
            break

    print("Timeout waiting for upload completion")
    return False

# Usage
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)
track_upload_progress(result.job_id)
```
Next Steps
- Sources Namespace - Manage uploaded sources and get statistics
- Queries Namespace - Query your uploaded data
- Knowledge Graphs Namespace - Manage your knowledge graphs
- Error Handling - Comprehensive error management