Data Ingestion

The ingestion namespace provides methods for uploading and processing data from various sources. It supports file uploads, URL ingestion, and batch processing with comprehensive metadata support.

Upload Methods

Upload from URL

Process documents directly from web URLs:

from functor_sdk import FunctorClient
client = FunctorClient()
# Basic URL upload
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)
print(f"Job ID: {result.job_id}")
print(f"Message: {result.message}")

Upload Local File

Upload files from your local filesystem:

# Upload a local file
result = client.ingestion.upload_file(
    file_path="/path/to/document.pdf",
    kg_name="KG_Universal",
    source_name="Research Paper"
)
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")

Unified Ingestion

The unified ingestion endpoint provides advanced features including smart processing modes, quality checks, deduplication, domain detection, and comprehensive analytics. It supports three content sources: direct text content, document URLs, and file uploads.

Basic Usage

Ingest content using the unified endpoint with automatic mode selection:

from functor_sdk import FunctorClient
client = FunctorClient()
# Ingest from direct content
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your text content here...",
    source_name="Research Article"
)
print(f"Success: {result.success}")
print(f"Entities extracted: {result.entities_extracted}")
print(f"Quality score: {result.quality_score}")

Content Sources

The unified ingestion supports three ways to provide content:

1. Direct Text Content

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Diabetes is a chronic disease affecting millions...",
    source_name="Medical Text",
    mode="comprehensive"
)

2. Document URL

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    document_url="https://example.com/research-paper.pdf",
    source_name="Research Paper",
    mode="comprehensive"
)

3. File Upload

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    file_path="/path/to/document.pdf",
    source_name="Local Document",
    mode="comprehensive"
)

Processing Modes

Choose the processing mode based on your needs:

Auto Mode (Default)

# Automatically selects optimal mode based on content
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="auto"  # Default
)

Fast Mode

# Optimized for speed with basic processing
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="fast",
    enable_relation_extraction=False,  # Skip relation extraction
    enable_quality_checks=False        # Skip quality checks
)

Comprehensive Mode

# Full processing with all features enabled
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="comprehensive",
    enable_entity_extraction=True,
    enable_relation_extraction=True,
    enable_quality_checks=True,
    enable_deduplication=True,
    auto_domain_detection=True
)

Domain-Specific Mode

# Specialized processing for specific domains
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    mode="domain_specific",
    domains=["healthcare", "medical"],
    enable_entity_extraction=True,
    enable_relation_extraction=True
)

Advanced Features

Quality Checks and Deduplication

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    enable_quality_checks=True,
    enable_deduplication=True,
    similarity_threshold=0.9  # Deduplication threshold
)
print(f"Quality score: {result.quality_score}")
print(f"Duplicates detected: {result.duplicates_detected}")
print(f"Duplicates removed: {result.duplicates_removed}")

Domain Detection

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    auto_domain_detection=True
)
print(f"Detected domains: {result.detected_domains}")

Chunking Configuration

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content...",
    chunking_strategy="semantic",
    chunk_size=1000,
    chunk_overlap=200
)

Async Processing

# Process asynchronously for large documents
result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    document_url="https://example.com/large-document.pdf",
    async_processing=True,
    priority="high",
    webhook_url="https://your-app.com/ingestion-callback"
)
# Returns immediately with job_id
print(f"Job ID: {result.job_id}")
print(f"Status: {result.message}")

Response Format

result = client.ingestion.ingest_unified(
    kg_name="KG_Universal",
    content="Your content..."
)
# Access comprehensive response data
print(f"Success: {result.success}")
print(f"Message: {result.message}")
print(f"Job ID: {result.job_id}")
# Processing statistics
print(f"Chunks created: {result.chunks_created}")
print(f"Entities extracted: {result.entities_extracted}")
print(f"Relations extracted: {result.relations_extracted}")
# Quality metrics
print(f"Quality score: {result.quality_score}")
print(f"Duplicates removed: {result.duplicates_removed}")
# Storage information
print(f"SQL stored: {result.storage_info.get('sql_stored')}")
print(f"Vector stored: {result.storage_info.get('vector_stored')}")
# Performance metrics
print(f"Processing time: {result.processing_time_ms}ms")
print(f"Extraction time: {result.extraction_time_ms}ms")
print(f"Embedding time: {result.embedding_time_ms}ms")
# Domain detection
if result.detected_domains:
    print(f"Detected domains: {result.detected_domains}")
# Warnings and errors
if result.warnings:
    print(f"Warnings: {result.warnings}")
if result.errors:
    print(f"Errors: {result.errors}")

ingest_unified() Parameter Reference

result = client.ingestion.ingest_unified(
    # Content source (provide exactly one)
    content="Your text content...",              # OR
    document_url="https://example.com/doc.pdf",  # OR
    file_path="/path/to/file.pdf",
    # Required
    kg_name="KG_Universal",
    # Processing options
    mode="auto",         # auto, fast, comprehensive, domain_specific
    source_type="auto",  # auto, pdf, csv, text, url, json
    # Chunking
    chunking_strategy="semantic",
    chunk_size=1000,
    chunk_overlap=200,
    # Extraction
    enable_entity_extraction=True,
    enable_relation_extraction=True,
    extraction_confidence_threshold=0.7,
    # Embedding
    enable_text_embedding=True,
    enable_graph_embedding=True,
    # Storage
    enable_sql_storage=True,
    enable_vector_storage=True,
    enable_kg_integration=True,
    # Quality
    enable_quality_checks=True,
    enable_deduplication=True,
    similarity_threshold=0.9,
    # Domain
    domains=["healthcare"],
    auto_domain_detection=True,
    # Metadata
    source_name="Custom Source",
    author="Author Name",
    custom_tags=["tag1", "tag2"],
    custom_fields={"key": "value"},
    # Processing control
    priority="normal",  # low, normal, high, urgent
    async_processing=False,
    webhook_url=None
)

Async Version

import asyncio
from functor_sdk import FunctorClient
async def unified_ingest_async():
    async with FunctorClient() as client:
        result = await client.ingestion.ingest_unified_async(
            kg_name="KG_Universal",
            content="Your content...",
            mode="comprehensive"
        )
        print(f"Success: {result.success}")
        print(f"Entities: {result.entities_extracted}")

asyncio.run(unified_ingest_async())

Upload Method Parameter Reference

upload_url() Parameters

result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",  # Required: URL to process
    kg_name="KG_Universal",                  # Required: Target knowledge graph
    source_name="Custom Source Name",        # Optional: Custom source name
    custom_metadata={                        # Optional: Additional metadata
        "domain": "health",
        "priority": "high",
        "author": "Dr. Smith"
    }
)

upload_file() Parameters

result = client.ingestion.upload_file(
    file_path="/path/to/document.pdf",  # Required: Local file path
    kg_name="KG_Universal",             # Required: Target knowledge graph
    source_name="Custom Source Name",   # Optional: Custom source name
    custom_metadata={                   # Optional: Additional metadata
        "domain": "research",
        "version": "1.0",
        "tags": ["AI", "machine learning"]
    }
)

Supported File Types

Document Formats

  • PDF - Portable Document Format
  • DOCX - Microsoft Word documents
  • TXT - Plain text files
  • MD - Markdown files
  • HTML - Web pages and HTML documents
  • RTF - Rich Text Format

Data Formats

  • JSON - Structured data
  • CSV - Comma-separated values
  • XML - Extensible Markup Language
  • YAML - YAML Ain't Markup Language
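
When sweeping a directory for upload, it can help to skip unsupported files up front. The helper below is a sketch based on the format lists above; SUPPORTED_EXTENSIONS and filter_supported_files are illustrative names, not part of the SDK, and the extension set may not exactly match what the service accepts.

from pathlib import Path

# Illustrative mapping of the formats listed above to file extensions.
SUPPORTED_EXTENSIONS = {
    ".pdf", ".docx", ".txt", ".md", ".html", ".rtf",  # document formats
    ".json", ".csv", ".xml", ".yaml", ".yml",         # data formats
}

def filter_supported_files(directory_path):
    # Return only files whose extension appears in the supported set.
    return [
        path for path in Path(directory_path).iterdir()
        if path.is_file() and path.suffix.lower() in SUPPORTED_EXTENSIONS
    ]

# Usage: upload everything that passed the filter
for path in filter_supported_files("/path/to/documents"):
    result = client.ingestion.upload_file(
        file_path=str(path),
        kg_name="KG_Universal",
        source_name=path.stem
    )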

Response Format

IngestionResult Object

result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)
# Access response properties
print(f"Job ID: {result.job_id}")
print(f"Status: {result.status}")
print(f"Message: {result.message}")
print(f"Source ID: {result.source_id}")
# Check if successful
if result.status == "success":
    print("Upload completed successfully!")
else:
    print(f"Upload failed: {result.message}")

Advanced Usage Patterns

Batch Upload from URLs

# Batch upload multiple URLs
urls = [
    "https://example.com/doc1.pdf",
    "https://example.com/doc2.pdf",
    "https://example.com/doc3.pdf"
]
results = []
for i, url in enumerate(urls):
    result = client.ingestion.upload_url(
        url=url,
        kg_name="KG_Universal",
        source_name=f"Document {i+1}",
        custom_metadata={"batch_id": "batch_001"}
    )
    results.append(result)
# Check results
for i, result in enumerate(results):
    print(f"Document {i+1}: {result.status}")

Async Batch Processing

import asyncio
from functor_sdk import FunctorClient
async def batch_upload():
    async with FunctorClient() as client:
        urls = [
            "https://example.com/doc1.pdf",
            "https://example.com/doc2.pdf",
            "https://example.com/doc3.pdf"
        ]
        # Upload concurrently
        tasks = [
            client.ingestion.upload_url_async(
                url=url,
                kg_name="KG_Universal",
                source_name=f"Document {i+1}"
            )
            for i, url in enumerate(urls)
        ]
        results = await asyncio.gather(*tasks)
        for i, result in enumerate(results):
            print(f"Document {i+1}: {result.status}")

asyncio.run(batch_upload())

Upload with Progress Tracking

import time
from functor_sdk import FunctorClient
def upload_with_progress():
    client = FunctorClient()
    # Start upload
    result = client.ingestion.upload_url(
        url="https://example.com/large-document.pdf",
        kg_name="KG_Universal",
        source_name="Large Document"
    )
    print(f"Upload started. Job ID: {result.job_id}")
    # Poll for completion (simplified example)
    max_attempts = 30
    for attempt in range(max_attempts):
        time.sleep(10)  # Wait 10 seconds
        # Check job status (you'd implement this based on your API)
        # status = client.jobs.get_status(result.job_id)
        # if status == "completed":
        #     print("Upload completed!")
        #     break
        # elif status == "failed":
        #     print("Upload failed!")
        #     break
        print(f"Attempt {attempt + 1}: Still processing...")
    return result
result = upload_with_progress()

Metadata Management

Custom Metadata

# Upload with rich metadata
result = client.ingestion.upload_url(
    url="https://medical-journal.com/research.pdf",
    kg_name="KG_Medical",
    source_name="Diabetes Research 2024",
    custom_metadata={
        "domain": "medical",
        "category": "research",
        "year": 2024,
        "authors": ["Dr. Smith", "Dr. Jones"],
        "keywords": ["diabetes", "treatment", "research"],
        "priority": "high",
        "review_status": "peer_reviewed",
        "doi": "10.1000/example.doi",
        "journal": "Medical Research Journal"
    }
)

Metadata Best Practices

  • Use consistent keys: Standardize metadata field names
  • Include domain information: Help with query routing
  • Add timestamps: Track when documents were processed
  • Include source information: Author, publication, etc.
  • Use tags: Enable better categorization and search
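
One way to follow these practices is to build metadata through a single helper, so every upload uses the same keys. The sketch below is illustrative; build_metadata and its field names are example conventions, not a schema required by the SDK.

from datetime import datetime, timezone

def build_metadata(domain, author=None, tags=None, **extra):
    # Assemble a consistently keyed metadata dict (illustrative helper).
    metadata = {
        "domain": domain,                                       # aids query routing
        "ingested_at": datetime.now(timezone.utc).isoformat(),  # processing timestamp
        "tags": tags or [],                                     # categorization and search
    }
    if author:
        metadata["author"] = author                             # source information
    metadata.update(extra)
    return metadata

# Usage
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal",
    custom_metadata=build_metadata(
        domain="research",
        author="Dr. Smith",
        tags=["AI", "machine learning"]
    )
)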

Error Handling

Common Error Scenarios

from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorValidationError,
    FunctorNotFoundError
)

client = FunctorClient()

try:
    result = client.ingestion.upload_url(
        url="https://example.com/document.pdf",
        kg_name="KG_Universal"
    )
except FunctorValidationError as e:
    print(f"Validation error: {e.message}")
    print("Check your URL format and parameters")
except FunctorNotFoundError:
    print("Knowledge graph not found")
    print("Available KGs:", client.knowledge_graphs.list())
except FunctorAPIError as e:
    print(f"API error {e.status_code}: {e.message}")
except Exception as e:
    print(f"Unexpected error: {e}")

File Upload Error Handling

import os
from functor_sdk import FunctorClient, FunctorAPIError
client = FunctorClient()
def safe_upload_file(file_path, kg_name):
    # Check if file exists
    if not os.path.exists(file_path):
        print(f"File not found: {file_path}")
        return None
    # Check file size (example: 50MB limit)
    file_size = os.path.getsize(file_path)
    if file_size > 50 * 1024 * 1024:  # 50MB
        print(f"File too large: {file_size} bytes")
        return None
    try:
        result = client.ingestion.upload_file(
            file_path=file_path,
            kg_name=kg_name
        )
        return result
    except FunctorAPIError as e:
        print(f"Upload failed: {e.message}")
        return None

# Usage
result = safe_upload_file("/path/to/document.pdf", "KG_Universal")
if result:
    print(f"Upload successful: {result.job_id}")

Performance Optimization

Upload Optimization Tips

  • Use async for batch operations: Upload multiple files concurrently
  • Compress large files: Reduce upload time and storage
  • Use appropriate KG names: Target specific knowledge graphs
  • Include meaningful metadata: Help with processing and retrieval
  • Monitor job status: Track long-running uploads

Concurrent Upload Example

import asyncio
from pathlib import Path
from functor_sdk import FunctorClient
async def upload_directory(directory_path, kg_name):
    async with FunctorClient() as client:
        directory = Path(directory_path)
        files = list(directory.glob("*.pdf"))
        # Upload files concurrently
        tasks = [
            client.ingestion.upload_file_async(
                file_path=str(file),
                kg_name=kg_name,
                source_name=file.stem
            )
            for file in files
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Process results
        successful = 0
        failed = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Failed to upload {files[i].name}: {result}")
                failed += 1
            else:
                print(f"Successfully uploaded {files[i].name}")
                successful += 1
        print(f"Upload complete: {successful} successful, {failed} failed")

# Usage
asyncio.run(upload_directory("/path/to/documents", "KG_Universal"))

Integration Examples

Web Scraping Integration

import requests
from functor_sdk import FunctorClient
def scrape_and_upload(urls, kg_name):
    client = FunctorClient()
    for url in urls:
        try:
            # Fetch the page to confirm it is reachable
            response = requests.get(url)
            response.raise_for_status()
            # Upload to knowledge graph
            result = client.ingestion.upload_url(
                url=url,
                kg_name=kg_name,
                source_name=f"Scraped: {url}",
                custom_metadata={
                    "source_type": "web_scraping",
                    "scraped_at": "2024-01-01",
                    "content_length": len(response.content)
                }
            )
            print(f"Uploaded {url}: {result.job_id}")
        except Exception as e:
            print(f"Failed to process {url}: {e}")

# Usage
urls = [
    "https://example.com/article1.html",
    "https://example.com/article2.html"
]
scrape_and_upload(urls, "KG_Universal")

Database Integration

import sqlite3
from functor_sdk import FunctorClient
def upload_from_database(db_path, kg_name):
    client = FunctorClient()
    # Connect to database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # Get documents from database
    cursor.execute("SELECT id, title, content, url FROM documents")
    documents = cursor.fetchall()
    for doc_id, title, content, url in documents:
        try:
            if url:
                # Upload from URL
                result = client.ingestion.upload_url(
                    url=url,
                    kg_name=kg_name,
                    source_name=title,
                    custom_metadata={
                        "db_id": doc_id,
                        "source": "database",
                        "title": title
                    }
                )
            else:
                # Upload content directly (if supported)
                # This would require a different method
                print(f"Skipping {title} - no URL provided")
                continue
            print(f"Uploaded {title}: {result.job_id}")
        except Exception as e:
            print(f"Failed to upload {title}: {e}")
    conn.close()

# Usage
upload_from_database("documents.db", "KG_Universal")

Monitoring and Status

Job Status Tracking

import time
from functor_sdk import FunctorClient

client = FunctorClient()

def track_upload_progress(job_id):
    # This is a conceptual example - implement based on your API
    max_attempts = 60  # 10 minutes with 10-second intervals
    for attempt in range(max_attempts):
        try:
            # Check job status (implement this method)
            # status = client.jobs.get_status(job_id)
            # if status == "completed":
            #     print("Upload completed successfully!")
            #     return True
            # elif status == "failed":
            #     print("Upload failed!")
            #     return False
            # elif status == "processing":
            #     print(f"Still processing... (attempt {attempt + 1})")
            time.sleep(10)  # Wait 10 seconds
        except Exception as e:
            print(f"Error checking status: {e}")
            break
    print("Timeout waiting for upload completion")
    return False

# Usage
result = client.ingestion.upload_url(
    url="https://example.com/document.pdf",
    kg_name="KG_Universal"
)
track_upload_progress(result.job_id)

Next Steps