Skip to main content

Performance Optimization

Maximize throughput and minimize latency in your Peppol integration.

Batch Processing​

Batch Sending​

Send multiple documents in one API call:

def send_batch(invoices: list, batch_size: int = 50) -> list:
"""Send invoices in batches."""
results = []

for i in range(0, len(invoices), batch_size):
batch = invoices[i:i + batch_size]

response = requests.post(
"https://app.goroute.ai/peppol-api/api/v1/batch",
headers={"X-API-Key": api_key},
json={"documents": batch}
)

results.extend(response.json()["results"])

return results

Async Batch Processing​

import asyncio
import aiohttp

async def send_invoice_async(session, invoice):
"""Send single invoice asynchronously."""
async with session.post(
"https://app.goroute.ai/peppol-api/api/v1/send",
json=invoice,
headers={"X-API-Key": api_key}
) as response:
return await response.json()

async def send_batch_async(invoices: list, concurrency: int = 10):
"""Send invoices with controlled concurrency."""
semaphore = asyncio.Semaphore(concurrency)

async def send_with_limit(session, invoice):
async with semaphore:
return await send_invoice_async(session, invoice)

async with aiohttp.ClientSession() as session:
tasks = [send_with_limit(session, inv) for inv in invoices]
return await asyncio.gather(*tasks, return_exceptions=True)

# Usage
results = asyncio.run(send_batch_async(invoices, concurrency=10))

Connection Management​

HTTP Session Pooling​

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def create_session() -> requests.Session:
"""Create optimized session with connection pooling."""
session = requests.Session()

# Configure retry strategy
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504]
)

# Configure adapter with connection pooling
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=20, # Connection pools
pool_maxsize=50, # Connections per pool
pool_block=False # Don't block on pool exhaustion
)

session.mount("https://", adapter)
session.mount("http://", adapter)

return session

# Reuse session across requests
session = create_session()

def send_invoice(invoice):
return session.post(url, json=invoice).json()

Keep-Alive​

# Ensure keep-alive headers
session.headers.update({
"Connection": "keep-alive",
"X-API-Key": api_key
})

Caching Strategies​

SMP Lookup Cache​

import redis
import json
from functools import lru_cache

redis_client = redis.Redis()

def lookup_participant_cached(scheme: str, identifier: str) -> dict:
"""Lookup with Redis caching."""
cache_key = f"peppol:participant:{scheme}:{identifier}"

# Check cache first
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)

# Make API call
response = session.get(
"https://app.goroute.ai/peppol-api/api/v1/participants/lookup",
params={"scheme": scheme, "identifier": identifier}
)
result = response.json()

# Cache for 1 hour (SMP data is relatively stable)
redis_client.setex(cache_key, 3600, json.dumps(result))

return result

# For in-memory caching (smaller deployments)
@lru_cache(maxsize=1000)
def lookup_participant_memory(scheme: str, identifier: str) -> dict:
"""Lookup with memory caching."""
response = session.get(
"https://app.goroute.ai/peppol-api/api/v1/participants/lookup",
params={"scheme": scheme, "identifier": identifier}
)
return response.json()

Validation Cache​

Cache validation results for repeated invoices:

import hashlib

def validate_cached(invoice_xml: str) -> dict:
"""Cache validation results by content hash."""

# Create hash of invoice content
content_hash = hashlib.sha256(invoice_xml.encode()).hexdigest()
cache_key = f"validation:{content_hash}"

# Check cache
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)

# Validate via API
response = session.post(
"https://app.goroute.ai/peppol-api/api/v1/validate",
data=invoice_xml
)
result = response.json()

# Cache for 24 hours (validation rules don't change often)
redis_client.setex(cache_key, 86400, json.dumps(result))

return result

Payload Optimization​

Compression​

import gzip

def send_compressed(invoice_xml: str) -> dict:
"""Send with gzip compression."""

compressed = gzip.compress(invoice_xml.encode())

response = session.post(
"https://app.goroute.ai/peppol-api/api/v1/send",
data=compressed,
headers={
"Content-Encoding": "gzip",
"Content-Type": "application/xml"
}
)

return response.json()

Minimize Payload Size​

def optimize_invoice_data(invoice: dict) -> dict:
"""Remove unnecessary fields before sending."""

# Only include required fields
optimized = {
"sender": {
"scheme": invoice["sender"]["scheme"],
"identifier": invoice["sender"]["identifier"]
},
"receiver": {
"scheme": invoice["receiver"]["scheme"],
"identifier": invoice["receiver"]["identifier"]
},
"document": invoice["document"]
}

# Add optional fields only if present
if invoice.get("document_type"):
optimized["document_type"] = invoice["document_type"]

return optimized

Parallel Processing​

Thread Pool​

from concurrent.futures import ThreadPoolExecutor, as_completed

def send_parallel(invoices: list, max_workers: int = 10) -> list:
"""Send invoices in parallel using thread pool."""
results = []

with ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all tasks
future_to_invoice = {
executor.submit(send_invoice, inv): inv
for inv in invoices
}

# Collect results as they complete
for future in as_completed(future_to_invoice):
invoice = future_to_invoice[future]
try:
result = future.result()
results.append({"success": True, "result": result})
except Exception as e:
results.append({
"success": False,
"error": str(e),
"invoice_id": invoice.get("id")
})

return results

Process Pool (CPU-Bound)​

from concurrent.futures import ProcessPoolExecutor

def validate_parallel(invoices: list) -> list:
"""Validate invoices in parallel processes."""

with ProcessPoolExecutor(max_workers=4) as executor:
results = list(executor.map(validate_invoice, invoices))

return results

Queue-Based Architecture​

Producer-Consumer Pattern​

import queue
import threading

class InvoiceProcessor:
"""Queue-based invoice processor."""

def __init__(self, num_workers: int = 5):
self.queue = queue.Queue()
self.results = {}
self.workers = []

for _ in range(num_workers):
worker = threading.Thread(target=self._process_queue)
worker.daemon = True
worker.start()
self.workers.append(worker)

def _process_queue(self):
while True:
invoice_id, invoice = self.queue.get()
try:
result = send_invoice(invoice)
self.results[invoice_id] = {"success": True, "result": result}
except Exception as e:
self.results[invoice_id] = {"success": False, "error": str(e)}
finally:
self.queue.task_done()

def submit(self, invoice_id: str, invoice: dict):
"""Add invoice to processing queue."""
self.queue.put((invoice_id, invoice))

def wait_complete(self):
"""Wait for all queued items to complete."""
self.queue.join()

def get_result(self, invoice_id: str) -> dict:
"""Get result for a specific invoice."""
return self.results.get(invoice_id)

# Usage
processor = InvoiceProcessor(num_workers=10)

for invoice in invoices:
processor.submit(invoice["id"], invoice)

processor.wait_complete()

Monitoring Performance​

Request Timing​

import time
from contextlib import contextmanager
import logging

logger = logging.getLogger(__name__)

@contextmanager
def timed_operation(operation_name: str):
"""Context manager to time operations."""
start = time.perf_counter()
try:
yield
finally:
duration = time.perf_counter() - start
logger.info(f"{operation_name} took {duration:.3f}s")

# Usage
with timed_operation("send_invoice"):
result = send_invoice(invoice)

Metrics Collection​

from prometheus_client import Counter, Histogram, start_http_server

# Define metrics
INVOICES_SENT = Counter(
'peppol_invoices_sent_total',
'Total invoices sent',
['status']
)

SEND_DURATION = Histogram(
'peppol_send_duration_seconds',
'Time to send invoice',
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

def send_with_metrics(invoice):
"""Send invoice with metrics collection."""
with SEND_DURATION.time():
try:
result = send_invoice(invoice)
INVOICES_SENT.labels(status='success').inc()
return result
except Exception:
INVOICES_SENT.labels(status='error').inc()
raise

# Start metrics endpoint
start_http_server(8000)

Performance Benchmarks​

Throughput Guidelines​

OperationExpected ThroughputOptimization
Single send1-2 req/secConnection pooling
Batch send50-100 docs/requestBatch API
Parallel send10-20 req/secThread pool
Async send50+ req/secaiohttp + semaphore
Validation5-10 req/secCache results

Latency Targets​

OperationP50P99
SMP Lookup200ms1s
Validation500ms2s
Send1s5s

Quick Reference​

TechniqueBenefitWhen to Use
Session poolingReduce connection overheadAlways
Batch APIFewer requests>10 docs at once
Async/parallelHigher throughputHigh volume
CachingReduce API callsRepeated lookups
CompressionSmaller payloadsLarge documents
QueuesSmooth loadVariable input rates

Next Steps​