Performance Optimization
Maximize throughput and minimize latency in your Peppol integration.
Batch Processing
Batch Sending
Send multiple documents in one API call:
def send_batch(invoices: list, batch_size: int = 50, timeout: float = 30.0) -> list:
    """Send invoices to the batch endpoint in chunks of ``batch_size``.

    Args:
        invoices: Documents to send; each chunk is posted as
            ``{"documents": [...]}``.
        batch_size: Maximum number of documents per request (must be >= 1).
        timeout: Seconds to wait on each HTTP request before giving up.

    Returns:
        The ``"results"`` lists of all batch responses, concatenated in
        request order. An empty ``invoices`` list yields ``[]`` without any
        request being made.

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        requests.HTTPError: If a batch request returns a 4xx/5xx status.
            Results of batches sent before the failure are not returned.
    """
    # A negative step would make range() empty and silently send nothing.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    results = []
    for start in range(0, len(invoices), batch_size):
        batch = invoices[start:start + batch_size]
        response = requests.post(
            "https://app.goroute.ai/peppol-api/api/v1/batch",
            headers={"X-API-Key": api_key},
            json={"documents": batch},
            timeout=timeout,
        )
        # Surface HTTP errors explicitly instead of failing later on a
        # missing "results" key in an error payload.
        response.raise_for_status()
        results.extend(response.json()["results"])
    return results
Async Batch Processing
import asyncio
import aiohttp
async def send_invoice_async(session, invoice):
    """POST one invoice through ``session`` and return the decoded JSON body.

    ``session`` must provide an ``aiohttp``-style ``post`` that can be used
    as an async context manager; the API key is read from the module-level
    ``api_key``.
    """
    endpoint = "https://app.goroute.ai/peppol-api/api/v1/send"
    auth_headers = {"X-API-Key": api_key}
    async with session.post(endpoint, json=invoice, headers=auth_headers) as response:
        payload = await response.json()
    return payload
async def send_batch_async(invoices: list, concurrency: int = 10):
    """Send invoices concurrently, with at most ``concurrency`` in flight.

    Args:
        invoices: Invoice payloads, each passed to ``send_invoice_async``.
        concurrency: Maximum number of simultaneous requests (must be >= 1).

    Returns:
        A list with one entry per invoice: the value returned by
        ``send_invoice_async`` or, because results are gathered with
        ``return_exceptions=True``, the exception that send raised.
        Returns ``[]`` for empty input without opening a client session.

    Raises:
        ValueError: If ``concurrency`` is less than 1.
    """
    # Semaphore(0) is legal but never admits a task, so every send would
    # wait forever; reject it up front.
    if concurrency < 1:
        raise ValueError(f"concurrency must be >= 1, got {concurrency}")
    if not invoices:
        return []

    semaphore = asyncio.Semaphore(concurrency)

    async def send_with_limit(session, invoice):
        # Hold a semaphore slot for the full duration of the request.
        async with semaphore:
            return await send_invoice_async(session, invoice)

    # One shared session so all requests reuse the same connection pool.
    async with aiohttp.ClientSession() as session:
        tasks = [send_with_limit(session, inv) for inv in invoices]
        return await asyncio.gather(*tasks, return_exceptions=True)
# Usage
# `results` has one entry per invoice: either the value returned for it or the
# exception it raised (send_batch_async gathers with return_exceptions=True),
# so check each entry before treating it as a response.
results = asyncio.run(send_batch_async(invoices, concurrency=10))
Connection Management
HTTP Session Pooling
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session() -> requests.Session:
    """Build a ``requests.Session`` with pooled connections and retries.

    The same adapter is mounted for both ``https://`` and ``http://``.
    """
    # Retry up to 3 times, with a 0.5 backoff factor, on throttling (429)
    # and server-side (5xx) statuses.
    retries = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    pooled_adapter = HTTPAdapter(
        max_retries=retries,
        pool_connections=20,  # Connection pools
        pool_maxsize=50,      # Connections per pool
        pool_block=False,     # Don't block on pool exhaustion
    )

    http = requests.Session()
    for prefix in ("https://", "http://"):
        http.mount(prefix, pooled_adapter)
    return http
# Reuse session across requests
session = create_session()

# Send endpoint used by the other send examples in this guide.
SEND_URL = "https://app.goroute.ai/peppol-api/api/v1/send"


def send_invoice(invoice, url=SEND_URL, timeout=30.0):
    """Send one invoice through the shared session and return the JSON body.

    Args:
        invoice: JSON-serialisable invoice payload.
        url: Endpoint to POST to; defaults to the send endpoint.
        timeout: Seconds to wait for the request before giving up.

    Returns:
        The decoded JSON response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status, so
            callers that treat "no exception" as success do not record a
            failed send as successful.
    """
    response = session.post(url, json=invoice, timeout=timeout)
    response.raise_for_status()
    return response.json()
Keep-Alive
# Ensure keep-alive headers
# These are set on the session itself, so requests made through `session`
# do not need to pass the API key individually.
# NOTE(review): requests is believed to send "Connection: keep-alive" by
# default - confirm whether setting it explicitly is still needed.
session.headers.update({
    "Connection": "keep-alive",
    "X-API-Key": api_key
})
Caching Strategies
SMP Lookup Cache
import redis
import json
from functools import lru_cache
# Module-level Redis client shared by the Redis-backed cache helpers.
# NOTE(review): built with no arguments, so it relies on the redis client's
# default connection settings - confirm host/port/db for your deployment.
redis_client = redis.Redis()
def lookup_participant_cached(scheme: str, identifier: str) -> dict:
    """Look up a participant, caching successful responses in Redis.

    Args:
        scheme: Participant identifier scheme, sent as the ``scheme`` query
            parameter.
        identifier: Participant identifier, sent as ``identifier``.

    Returns:
        The decoded JSON lookup result, from Redis when a cached entry
        exists and otherwise from the API.

    Raises:
        requests.HTTPError: If the lookup returns a 4xx/5xx status. Nothing
            is cached in that case, so a transient error is not replayed
            for the next hour.
    """
    cache_key = f"peppol:participant:{scheme}:{identifier}"

    # Check cache first
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Make API call
    response = session.get(
        "https://app.goroute.ai/peppol-api/api/v1/participants/lookup",
        params={"scheme": scheme, "identifier": identifier},
    )
    # Only successful lookups may be cached.
    response.raise_for_status()
    result = response.json()

    # Cache for 1 hour (SMP data is relatively stable)
    redis_client.setex(cache_key, 3600, json.dumps(result))
    return result
# For in-memory caching (smaller deployments)
@lru_cache(maxsize=1000)
def lookup_participant_memory(scheme: str, identifier: str) -> dict:
    """Look up a participant, memoising up to 1000 results in memory.

    Cached entries have no expiry; they are only evicted when the cache is
    full or cleared. A cache hit returns the same dict object as the
    original call, so treat the result as read-only.

    Args:
        scheme: Participant identifier scheme, sent as the ``scheme`` query
            parameter.
        identifier: Participant identifier, sent as ``identifier``.

    Returns:
        The decoded JSON lookup result.

    Raises:
        requests.HTTPError: If the lookup returns a 4xx/5xx status. Raised
            calls are not stored by ``lru_cache``, so an error response is
            never memoised.
    """
    response = session.get(
        "https://app.goroute.ai/peppol-api/api/v1/participants/lookup",
        params={"scheme": scheme, "identifier": identifier},
    )
    response.raise_for_status()
    return response.json()
Validation Cache
Cache validation results for repeated invoices:
import hashlib
def validate_cached(invoice_xml: str) -> dict:
    """Validate an invoice, caching the result by content hash in Redis.

    Args:
        invoice_xml: The invoice document as an XML string.

    Returns:
        The decoded JSON validation result, from Redis when the same
        content was validated before and otherwise from the API.

    Raises:
        requests.HTTPError: If the validate call returns a 4xx/5xx status.
            Nothing is cached in that case.
    """
    # Encode once so the bytes that are hashed are exactly the bytes sent,
    # rather than leaving the encoding of a str body to the HTTP stack.
    payload = invoice_xml.encode("utf-8")
    content_hash = hashlib.sha256(payload).hexdigest()
    cache_key = f"validation:{content_hash}"

    # Check cache
    cached = redis_client.get(cache_key)
    if cached:
        return json.loads(cached)

    # Validate via API
    response = session.post(
        "https://app.goroute.ai/peppol-api/api/v1/validate",
        data=payload,
        headers={"Content-Type": "application/xml"},
    )
    # Only successful validations may be cached.
    response.raise_for_status()
    result = response.json()

    # Cache for 24 hours (validation rules don't change often)
    redis_client.setex(cache_key, 86400, json.dumps(result))
    return result
Payload Optimization
Compression
import gzip
def send_compressed(invoice_xml: str) -> dict:
    """Gzip the invoice XML, POST it to the send endpoint, return the JSON."""
    gzip_headers = {
        "Content-Encoding": "gzip",
        "Content-Type": "application/xml",
    }
    body = gzip.compress(invoice_xml.encode())
    reply = session.post(
        "https://app.goroute.ai/peppol-api/api/v1/send",
        data=body,
        headers=gzip_headers,
    )
    return reply.json()
Minimize Payload Size
def optimize_invoice_data(invoice: dict) -> dict:
    """Return a trimmed copy of ``invoice`` holding only the fields to send.

    Keeps ``scheme``/``identifier`` for the sender and receiver plus the
    ``document``; ``document_type`` is carried over only when it is present
    and truthy. Missing required keys raise ``KeyError``.
    """
    def _party(role):
        # Reduce a party entry to its two addressing fields.
        party = invoice[role]
        return {"scheme": party["scheme"], "identifier": party["identifier"]}

    slim = {
        "sender": _party("sender"),
        "receiver": _party("receiver"),
        "document": invoice["document"],
    }

    document_type = invoice.get("document_type")
    if document_type:
        slim["document_type"] = document_type
    return slim
Parallel Processing
Thread Pool
from concurrent.futures import ThreadPoolExecutor, as_completed
def send_parallel(invoices: list, max_workers: int = 10) -> list:
"""Send invoices in parallel using thread pool."""
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_invoice = {
executor.submit(send_invoice, inv): inv
for inv in invoices
}
# Collect results as they complete
for future in as_completed(future_to_invoice):
invoice = future_to_invoice[future]
try:
result = future.result()
results.append({"success": True, "result": result})
except Exception as e:
results.append({
"success": False,
"error": str(e),
"invoice_id": invoice.get("id")
})
return results
Process Pool (CPU-Bound)
from concurrent.futures import ProcessPoolExecutor
def validate_parallel(invoices: list, max_workers: int = 4) -> list:
    """Validate invoices in parallel worker processes.

    Args:
        invoices: Invoices to validate; each is passed to
            ``validate_invoice``.
        max_workers: Number of worker processes (defaults to 4).

    Returns:
        The ``validate_invoice`` results in the same order as ``invoices``.
        Empty input returns ``[]`` without creating a process pool.
    """
    if not invoices:
        return []
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in input order.
        return list(executor.map(validate_invoice, invoices))
Queue-Based Architecture
Producer-Consumer Pattern
import queue
import threading
class InvoiceProcessor:
    """Queue-based invoice processor backed by daemon worker threads.

    Invoices are queued with ``submit`` and sent by the workers via
    ``send_invoice``. Each outcome is stored under its invoice id as
    ``{"success": True, "result": ...}`` or
    ``{"success": False, "error": "..."}``. Call ``shutdown`` to stop the
    workers once the processor is no longer needed.
    """

    # Sentinel put on the queue to tell one worker to exit its loop.
    _STOP = object()

    def __init__(self, num_workers: int = 5):
        """Start ``num_workers`` worker threads.

        Raises:
            ValueError: If ``num_workers`` is less than 1; with no workers
                a submitted invoice would never be processed.
        """
        if num_workers < 1:
            raise ValueError(f"num_workers must be >= 1, got {num_workers}")
        self.queue = queue.Queue()
        self.results = {}
        self.workers = []
        self._closed = False

        for _ in range(num_workers):
            worker = threading.Thread(target=self._process_queue)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _process_queue(self):
        """Worker loop: send queued invoices until a stop sentinel arrives."""
        while True:
            item = self.queue.get()
            if item is self._STOP:
                self.queue.task_done()
                return
            invoice_id, invoice = item
            try:
                result = send_invoice(invoice)
                self.results[invoice_id] = {"success": True, "result": result}
            except Exception as e:
                self.results[invoice_id] = {"success": False, "error": str(e)}
            finally:
                # Always mark the item done so wait_complete() cannot hang
                # on a failed send.
                self.queue.task_done()

    def submit(self, invoice_id: str, invoice: dict):
        """Add invoice to processing queue.

        Raises:
            RuntimeError: If the processor has already been shut down.
        """
        if self._closed:
            raise RuntimeError("InvoiceProcessor has been shut down")
        self.queue.put((invoice_id, invoice))

    def wait_complete(self):
        """Wait for all queued items to complete."""
        self.queue.join()

    def get_result(self, invoice_id: str) -> dict:
        """Get result for a specific invoice (``None`` if not recorded)."""
        return self.results.get(invoice_id)

    def shutdown(self, wait: bool = True):
        """Stop the worker threads after already-queued invoices are sent.

        Args:
            wait: When true, block until every worker thread has exited.

        Calling ``shutdown`` more than once has no further effect.
        """
        if self._closed:
            return
        self._closed = True
        # One sentinel per worker; each worker consumes exactly one.
        for _ in self.workers:
            self.queue.put(self._STOP)
        if wait:
            for worker in self.workers:
                worker.join()
# Usage: queue every invoice under its "id", then block until the workers
# have processed everything that was queued.
processor = InvoiceProcessor(num_workers=10)

for invoice in invoices:
    processor.submit(invoice["id"], invoice)

processor.wait_complete()
Monitoring Performance
Request Timing
import time
from contextlib import contextmanager
import logging
logger = logging.getLogger(__name__)


@contextmanager
def timed_operation(operation_name: str):
    """Log how long the wrapped ``with`` block took.

    The elapsed time is measured with ``time.perf_counter`` and logged at
    INFO level as ``"<operation_name> took <seconds>s"`` with three
    decimals. The line is logged even when the block raises; the exception
    then propagates unchanged.

    Args:
        operation_name: Label used at the start of the log message.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        # Lazy %-style arguments: the message is only formatted when INFO
        # logging is actually enabled.
        logger.info("%s took %.3fs", operation_name, duration)
# Usage: wrap the call to be measured; the duration is logged when the
# block exits.
with timed_operation("send_invoice"):
    result = send_invoice(invoice)
Metrics Collection
from prometheus_client import Counter, Histogram, start_http_server
# Define metrics
# Counter of send attempts, partitioned by a single 'status' label.
INVOICES_SENT = Counter(
    'peppol_invoices_sent_total',
    'Total invoices sent',
    ['status']
)
# Histogram of send durations; bucket upper bounds run from 0.1 to 10.0
# (seconds, per the metric name).
SEND_DURATION = Histogram(
    'peppol_send_duration_seconds',
    'Time to send invoice',
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
def send_with_metrics(invoice):
    """Send an invoice while recording its duration and outcome.

    The call is timed with ``SEND_DURATION``; ``INVOICES_SENT`` is
    incremented with status ``success`` or ``error``. Exceptions are
    counted and then re-raised to the caller.
    """
    timer = SEND_DURATION.time()
    with timer:
        try:
            response = send_invoice(invoice)
            INVOICES_SENT.labels(status='success').inc()
        except Exception:
            INVOICES_SENT.labels(status='error').inc()
            raise
        else:
            return response
# Start metrics endpoint
# Runs at module level with port 8000 passed as the argument.
# NOTE(review): confirm the port is free and reachable by your scraper.
start_http_server(8000)
Performance Benchmarks
Throughput Guidelines
| Operation | Expected Throughput | Optimization |
|---|---|---|
| Single send | 1-2 req/sec | Connection pooling |
| Batch send | 50-100 docs/request | Batch API |
| Parallel send | 10-20 req/sec | Thread pool |
| Async send | 50+ req/sec | aiohttp + semaphore |
| Validation | 5-10 req/sec | Cache results |
Latency Targets
| Operation | P50 | P99 |
|---|---|---|
| SMP Lookup | 200ms | 1s |
| Validation | 500ms | 2s |
| Send | 1s | 5s |
Quick Reference
| Technique | Benefit | When to Use |
|---|---|---|
| Session pooling | Reduce connection overhead | Always |
| Batch API | Fewer requests | >10 docs at once |
| Async/parallel | Higher throughput | High volume |
| Caching | Reduce API calls | Repeated lookups |
| Compression | Smaller payloads | Large documents |
| Queues | Smooth load | Variable input rates |