Async vs Sync

The Functor SDK provides both synchronous and asynchronous interfaces for all operations. Choose the interface that fits your application's structure, performance requirements, and concurrency needs.

Interface Overview

Synchronous Interface

Simple, blocking operations that are easy to understand and debug:

from functor_sdk import FunctorClient

# Synchronous usage
client = FunctorClient(api_key="your-key")

# All operations block until completion
result = client.queries.execute("What is machine learning?")
print(result.answer)

# Upload a file
upload_result = client.ingestion.upload_file(
    file_path="/path/to/document.pdf",
    kg_name="KG_Universal"
)
print(f"Uploaded: {upload_result.job_id}")

# List knowledge graphs
kgs = client.knowledge_graphs.list()
for kg in kgs:
    print(f"{kg.name}: {kg.entities_count} entities")

Asynchronous Interface

Non-blocking operations that enable concurrent execution:

import asyncio
from functor_sdk import FunctorClient

async def async_example():
    # Asynchronous usage with context manager
    async with FunctorClient(api_key="your-key") as client:
        # All operations are non-blocking
        result = await client.queries.execute_async("What is machine learning?")
        print(result.answer)

        # Upload a file
        upload_result = await client.ingestion.upload_file_async(
            file_path="/path/to/document.pdf",
            kg_name="KG_Universal"
        )
        print(f"Uploaded: {upload_result.job_id}")

        # List knowledge graphs
        kgs = await client.knowledge_graphs.list_async()
        for kg in kgs:
            print(f"{kg.name}: {kg.entities_count} entities")

# Run the async function
asyncio.run(async_example())

When to Use Each Interface

Use Synchronous When:

  • Simple scripts: One-off operations or simple automation
  • Sequential operations: Operations that must happen in order
  • Debugging: Easier to debug and understand execution flow
  • Learning: Getting started with the SDK
  • Blocking is acceptable: When waiting for results is not a problem

Use Asynchronous When:

  • Web applications: Server applications that handle multiple requests (see the sketch after this list)
  • Concurrent operations: Multiple operations that can run simultaneously
  • High throughput: Applications that need to process many requests
  • Real-time applications: Applications that need to remain responsive
  • Batch processing: Processing multiple items concurrently
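
For instance, the async interface drops straight into an async web framework, where each request handler can await SDK calls without blocking other requests. A minimal sketch, assuming FastAPI (not part of the SDK) and a hypothetical /ask endpoint:

from fastapi import FastAPI
from functor_sdk import FunctorClient

app = FastAPI()

@app.get("/ask")
async def ask(question: str):
    # Each request awaits the SDK call; other requests keep being served
    async with FunctorClient(api_key="your-key") as client:
        result = await client.queries.execute_async(question)
    return {"answer": result.answer}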

Performance Comparison

Sequential Operations

import time
from functor_sdk import FunctorClient

def sync_sequential_operations():
    client = FunctorClient()
    start_time = time.time()

    # Execute queries sequentially
    queries = [
        "What is machine learning?",
        "What is deep learning?",
        "What is natural language processing?"
    ]
    results = []
    for query in queries:
        result = client.queries.execute(query)
        results.append(result)

    end_time = time.time()
    print(f"Sync sequential: {end_time - start_time:.2f} seconds")
    return results

# Usage
results = sync_sequential_operations()

Concurrent Operations

import asyncio
import time
from functor_sdk import FunctorClient

async def async_concurrent_operations():
    async with FunctorClient() as client:
        start_time = time.time()

        # Execute queries concurrently
        queries = [
            "What is machine learning?",
            "What is deep learning?",
            "What is natural language processing?"
        ]
        tasks = [
            client.queries.execute_async(query)
            for query in queries
        ]
        results = await asyncio.gather(*tasks)

        end_time = time.time()
        print(f"Async concurrent: {end_time - start_time:.2f} seconds")
        return results

# Usage
results = asyncio.run(async_concurrent_operations())

Advanced Async Patterns

Async Context Manager

import asyncio
from functor_sdk import FunctorClient

async def async_context_example():
    # Automatic resource management
    async with FunctorClient(api_key="your-key") as client:
        # Multiple operations
        result1 = await client.queries.execute_async("What is AI?")
        result2 = await client.queries.execute_async("What is ML?")

        # Upload operations
        upload_result = await client.ingestion.upload_url_async(
            url="https://example.com/doc.pdf",
            kg_name="KG_Universal"
        )

        print(f"Query 1: {result1.answer[:100]}...")
        print(f"Query 2: {result2.answer[:100]}...")
        print(f"Upload: {upload_result.job_id}")

    # Client automatically cleaned up

asyncio.run(async_context_example())

Async with Error Handling

import asyncio
from functor_sdk import (
    FunctorClient,
    FunctorAPIError,
    FunctorTimeoutError
)

async def async_with_error_handling():
    async with FunctorClient() as client:
        queries = [
            "What is machine learning?",
            "What is deep learning?",
            "What is quantum computing?"
        ]

        # Execute with error handling
        tasks = []
        for query in queries:
            task = client.queries.execute_async(query)
            tasks.append(task)

        # Gather results with error handling
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                print(f"Query {i+1} failed: {result}")
            else:
                print(f"Query {i+1}: {result.answer[:100]}...")

asyncio.run(async_with_error_handling())

Async Batch Processing

import asyncio
from functor_sdk import FunctorClient

async def async_batch_processing():
    async with FunctorClient() as client:
        # Batch upload multiple files
        files = [
            {"path": "/path/to/doc1.pdf", "kg": "KG_Universal"},
            {"path": "/path/to/doc2.pdf", "kg": "KG_Universal"},
            {"path": "/path/to/doc3.pdf", "kg": "KG_Medical"}
        ]

        # Upload all files concurrently
        upload_tasks = [
            client.ingestion.upload_file_async(
                file_path=file["path"],
                kg_name=file["kg"]
            )
            for file in files
        ]
        upload_results = await asyncio.gather(*upload_tasks)

        # Process upload results
        for i, result in enumerate(upload_results):
            print(f"File {i+1}: {result.job_id}")

        # Wait a bit for processing
        await asyncio.sleep(10)

        # Query all knowledge graphs concurrently
        kg_tasks = [
            client.knowledge_graphs.list_async(),
            client.sources.list_kgs_async()
        ]
        kg_results, source_results = await asyncio.gather(*kg_tasks)

        print(f"Knowledge graphs: {len(kg_results)}")
        print(f"Sources: {len(source_results)}")

asyncio.run(async_batch_processing())

Hybrid Patterns

Sync Client with Async Operations

import asyncio
from functor_sdk import FunctorClient

def sync_client_with_async_ops():
    client = FunctorClient()

    # Use sync client for simple operations
    health = client.health.check()
    print(f"System status: {health['status']}")

    # Use async for concurrent operations
    async def async_queries():
        queries = [
            "What is AI?",
            "What is ML?",
            "What is DL?"
        ]
        tasks = [
            client.queries.execute_async(query)
            for query in queries
        ]
        results = await asyncio.gather(*tasks)
        return results

    # Run async operations from sync context
    results = asyncio.run(async_queries())
    for i, result in enumerate(results):
        print(f"Query {i+1}: {result.answer[:100]}...")

sync_client_with_async_ops()

Async Client with Sync Operations

import asyncio
from functor_sdk import FunctorClient

async def async_client_with_sync_ops():
    async with FunctorClient() as client:
        # Use async for concurrent operations
        async def concurrent_uploads():
            uploads = [
                client.ingestion.upload_url_async(
                    url="https://example.com/doc1.pdf",
                    kg_name="KG_Universal"
                ),
                client.ingestion.upload_url_async(
                    url="https://example.com/doc2.pdf",
                    kg_name="KG_Universal"
                )
            ]
            return await asyncio.gather(*uploads)

        # Execute concurrent uploads
        upload_results = await concurrent_uploads()

        # Use sync operations for simple queries
        for i, upload_result in enumerate(upload_results):
            print(f"Upload {i+1}: {upload_result.job_id}")
            # Simple sync query
            result = client.queries.execute(f"What is document {i+1} about?")
            print(f"Query {i+1}: {result.answer[:100]}...")

asyncio.run(async_client_with_sync_ops())

Performance Optimization

Connection Pooling

import asyncio
from functor_sdk import FunctorClient

async def optimized_async_operations():
    # Use async context manager for connection pooling
    async with FunctorClient() as client:
        # Batch operations to minimize connection overhead
        operations = []

        # Add multiple query operations
        queries = ["What is AI?", "What is ML?", "What is DL?"]
        for query in queries:
            operations.append(client.queries.execute_async(query))

        # Add upload operations
        uploads = [
            client.ingestion.upload_url_async(
                url=f"https://example.com/doc{i}.pdf",
                kg_name="KG_Universal"
            )
            for i in range(3)
        ]
        operations.extend(uploads)

        # Execute all operations concurrently
        results = await asyncio.gather(*operations)

        # Process results
        query_results = results[:3]
        upload_results = results[3:]
        print(f"Queries completed: {len(query_results)}")
        print(f"Uploads completed: {len(upload_results)}")

asyncio.run(optimized_async_operations())

Rate Limiting

import asyncio
from functor_sdk import FunctorClient

async def rate_limited_operations():
    async with FunctorClient() as client:
        # Process operations in batches to respect rate limits
        queries = [f"What is topic {i}?" for i in range(20)]
        batch_size = 5
        results = []

        for i in range(0, len(queries), batch_size):
            batch = queries[i:i + batch_size]

            # Execute batch concurrently
            batch_tasks = [
                client.queries.execute_async(query)
                for query in batch
            ]
            batch_results = await asyncio.gather(*batch_tasks)
            results.extend(batch_results)

            # Wait between batches to respect rate limits
            if i + batch_size < len(queries):
                await asyncio.sleep(1)  # 1 second between batches

        print(f"Processed {len(results)} queries in batches")

asyncio.run(rate_limited_operations())
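
Fixed batches are not the only way to throttle. If your rate limit is really a cap on concurrent requests, asyncio.Semaphore keeps a bounded number of queries in flight while the rest wait their turn. A minimal sketch, with the limit of 5 concurrent queries chosen purely for illustration:

import asyncio
from functor_sdk import FunctorClient

async def semaphore_limited_operations():
    async with FunctorClient() as client:
        semaphore = asyncio.Semaphore(5)  # at most 5 queries in flight at once
        queries = [f"What is topic {i}?" for i in range(20)]

        async def limited_query(query):
            # Acquire a slot before calling the API, release it when done
            async with semaphore:
                return await client.queries.execute_async(query)

        results = await asyncio.gather(*(limited_query(q) for q in queries))
        print(f"Processed {len(results)} queries with a concurrency cap")

asyncio.run(semaphore_limited_operations())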

Migration Guide

From Sync to Async

import asyncio
from functor_sdk import FunctorClient

# Before: Synchronous code
def sync_example():
    client = FunctorClient()
    result1 = client.queries.execute("What is AI?")
    result2 = client.queries.execute("What is ML?")
    upload_result = client.ingestion.upload_file(
        file_path="/path/to/doc.pdf",
        kg_name="KG_Universal"
    )
    return result1, result2, upload_result

# After: Asynchronous code
async def async_example():
    async with FunctorClient() as client:
        # Execute queries concurrently
        result1_task = client.queries.execute_async("What is AI?")
        result2_task = client.queries.execute_async("What is ML?")
        upload_task = client.ingestion.upload_file_async(
            file_path="/path/to/doc.pdf",
            kg_name="KG_Universal"
        )
        # Wait for all operations to complete
        result1, result2, upload_result = await asyncio.gather(
            result1_task, result2_task, upload_task
        )
        return result1, result2, upload_result

# Usage
results = asyncio.run(async_example())

From Async to Sync

from functor_sdk import FunctorClient

# Before: Asynchronous code
async def async_example():
    async with FunctorClient() as client:
        result = await client.queries.execute_async("What is AI?")
        return result

# After: Synchronous code
def sync_example():
    client = FunctorClient()
    result = client.queries.execute("What is AI?")
    return result

# Usage
result = sync_example()

Best Practices

Async Best Practices

  • Use async context managers: Ensure proper resource cleanup
  • Batch operations: Group related operations together
  • Handle exceptions properly: Use try/except around awaited operations
  • Respect rate limits: Add delays between batches if needed
  • Use asyncio.gather(): For concurrent operations
  • Avoid blocking operations: Don't mix sync and async unnecessarily (a workaround is sketched after this list)
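
If you genuinely need a blocking sync call inside async code (as in the hybrid example above), one way to keep the event loop responsive is to push it onto a worker thread. A minimal sketch using asyncio.to_thread (Python 3.9+); whether the sync client should be shared across threads is an assumption to verify for your deployment:

import asyncio
from functor_sdk import FunctorClient

async def non_blocking_sync_call():
    client = FunctorClient()
    # Run the blocking sync method in a worker thread so the event loop stays free
    result = await asyncio.to_thread(client.queries.execute, "What is AI?")
    print(result.answer[:100])

asyncio.run(non_blocking_sync_call())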

Sync Best Practices

  • Use context managers: Ensure proper resource cleanup
  • Handle exceptions: Always wrap operations in try/except (see the sketch after this list)
  • Keep operations simple: Avoid complex nested operations
  • Use appropriate timeouts: Prevent hanging operations
  • Consider async for batch operations: When processing many items
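
A minimal sketch of wrapping a blocking query in try/except, assuming the sync interface raises the same FunctorAPIError and FunctorTimeoutError classes imported in the async error-handling example; the helper function name is hypothetical:

from functor_sdk import FunctorClient, FunctorAPIError, FunctorTimeoutError

def query_with_error_handling(question):
    client = FunctorClient()
    try:
        # Blocking call; any SDK error propagates to this frame
        result = client.queries.execute(question)
        return result.answer
    except FunctorTimeoutError:
        print("Query timed out; consider retrying or raising the timeout")
    except FunctorAPIError as exc:
        print(f"API error: {exc}")
    return None

answer = query_with_error_handling("What is machine learning?")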

Next Steps