# Batch Sending
Send multiple documents efficiently using GoRoute's batch API.
## Batch API

### Send Multiple Documents
```python
import requests

def send_batch(documents: list[dict]) -> dict:
    """Send multiple documents in a single API call."""
    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/send/batch",
        headers={
            "X-API-Key": "your_api_key",
            "Content-Type": "application/json"
        },
        json={"documents": documents}
    )
    return response.json()

# Example: send 3 invoices
documents = [
    {"id": "inv-001", "xml": open("invoices/invoice-001.xml").read()},
    {"id": "inv-002", "xml": open("invoices/invoice-002.xml").read()},
    {"id": "inv-003", "xml": open("invoices/invoice-003.xml").read()},
]

result = send_batch(documents)
print(f"Accepted: {result['accepted_count']}/{result['total_count']}")
```
### Batch Response
```json
{
  "batch_id": "batch_abc123",
  "total_count": 3,
  "accepted_count": 2,
  "rejected_count": 1,
  "results": [
    {
      "id": "inv-001",
      "status": "accepted",
      "transaction_id": "txn_xyz001"
    },
    {
      "id": "inv-002",
      "status": "accepted",
      "transaction_id": "txn_xyz002"
    },
    {
      "id": "inv-003",
      "status": "rejected",
      "error": {
        "code": "VALIDATION_ERROR",
        "message": "Missing required element: InvoiceTypeCode"
      }
    }
  ]
}
```
## Batch Limits

| Limit | Value |
|---|---|
| Max documents per batch | 100 |
| Max batch size | 50 MB |
| Max single document size | 15 MB |
| Rate limit | 10 batches/minute |
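
The document count and payload size ceilings both apply to a single request, so chunking by count alone can still produce an oversized batch. Below is a minimal sketch of a client-side splitter that respects both limits; the helper and its defaults are illustrative, not part of the API, and documents over the 15 MB single-document limit should be filtered out beforehand.

```python
def split_into_batches(documents: list[dict],
                       max_docs: int = 100,
                       max_bytes: int = 50 * 1024 * 1024) -> list[list[dict]]:
    """Split documents into batches that stay under the count and size limits above.

    Size is approximated from the XML payloads; JSON wrapping adds a little
    overhead, so leave some headroom if batches run close to 50 MB.
    """
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_size = 0
    for doc in documents:
        doc_size = len(doc["xml"].encode("utf-8"))
        # Start a new batch if adding this document would break either limit
        if current and (len(current) >= max_docs or current_size + doc_size > max_bytes):
            batches.append(current)
            current, current_size = [], 0
        current.append(doc)
        current_size += doc_size
    if current:
        batches.append(current)
    return batches
```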
## Best Practices

### 1. Pre-Validate Documents
Validate before batching to avoid partial failures:
```python
def send_batch_with_validation(documents: list[dict]) -> dict:
    """Validate all documents before sending."""
    # Validate each document
    valid_docs = []
    invalid_docs = []
    for doc in documents:
        validation = validate_document(doc["xml"])
        if validation["valid"]:
            valid_docs.append(doc)
        else:
            invalid_docs.append({
                "id": doc["id"],
                "errors": validation["errors"]
            })

    # Only send the documents that passed validation
    if valid_docs:
        result = send_batch(valid_docs)
        result["pre_rejected"] = invalid_docs
        return result
    else:
        return {
            "error": "All documents failed validation",
            "invalid_docs": invalid_docs
        }
```
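
The example calls a `validate_document` helper that is not defined on this page. One possible sketch, assuming a standalone validation endpoint; the `/api/v1/validate` path, the XML content type, and the response shape are assumptions, so check the validation documentation for the actual contract:

```python
import requests

def validate_document(xml: str) -> dict:
    """Hypothetical helper used by send_batch_with_validation above.

    Assumes a validation endpoint at /api/v1/validate that accepts the raw XML
    and returns {"valid": bool, "errors": [...]}; adjust to the actual API.
    """
    response = requests.post(
        "https://app.goroute.ai/peppol-api/api/v1/validate",
        headers={
            "X-API-Key": "your_api_key",
            "Content-Type": "application/xml"
        },
        data=xml.encode("utf-8")
    )
    response.raise_for_status()
    return response.json()
```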
### 2. Chunk Large Batches
```python
import time

def send_in_chunks(documents: list[dict], chunk_size: int = 100) -> list[dict]:
    """Send documents in chunks to respect batch limits."""
    results = []
    for i in range(0, len(documents), chunk_size):
        chunk = documents[i:i + chunk_size]
        result = send_batch(chunk)
        results.append(result)
        # Respect the rate limit of 10 batches per minute
        time.sleep(6)
    return results
```
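
The fixed `time.sleep(6)` keeps a single sequential sender under 10 batches/minute. If several processes share one API key you can still hit the limit; the sketch below retries when a request is rejected for rate limiting. It assumes the API answers with HTTP 429 (and optionally a `Retry-After` header in seconds), which is conventional but not stated in the limits table above.

```python
import time

import requests

def send_batch_with_backoff(documents: list[dict], max_attempts: int = 5) -> dict:
    """Retry a batch when the API rejects it for rate limiting.

    Assumes rate-limited requests come back as HTTP 429; other HTTP errors
    are raised immediately.
    """
    for attempt in range(max_attempts):
        response = requests.post(
            "https://app.goroute.ai/peppol-api/api/v1/send/batch",
            headers={
                "X-API-Key": "your_api_key",
                "Content-Type": "application/json"
            },
            json={"documents": documents}
        )
        if response.status_code != 429:
            response.raise_for_status()
            return response.json()
        # Back off before retrying; honour Retry-After if the server sends one
        time.sleep(int(response.headers.get("Retry-After", 2 ** attempt)))
    raise RuntimeError("Batch still rate limited after retries")
```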
### 3. Track Batch Status
```python
def get_batch_status(batch_id: str) -> dict:
    """Get the status of all transactions in a batch."""
    response = requests.get(
        f"https://app.goroute.ai/peppol-api/api/v1/batches/{batch_id}",
        headers={"X-API-Key": "your_api_key"}
    )
    return response.json()

# Check status after sending
status = get_batch_status("batch_abc123")
print(f"Batch {status['batch_id']}:")
print(f"  Delivered: {status['delivered_count']}")
print(f"  Pending: {status['pending_count']}")
print(f"  Failed: {status['failed_count']}")
```
### 4. Handle Partial Failures
```python
import logging

logger = logging.getLogger(__name__)

def handle_batch_result(result: dict):
    """Process batch results and handle failures."""
    successful = []
    failed = []
    for item in result["results"]:
        if item["status"] == "accepted":
            successful.append(item)
        else:
            failed.append(item)

    # Log successes
    for item in successful:
        logger.info(f"Document {item['id']} accepted: {item['transaction_id']}")

    # Handle failures: queue for retry or manual review
    # (retry_queue and manual_review_queue are application-specific)
    for item in failed:
        logger.error(f"Document {item['id']} rejected: {item['error']['message']}")
        if is_retriable(item["error"]["code"]):
            retry_queue.add(item["id"])
        else:
            manual_review_queue.add(item["id"])
```
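
`is_retriable`, `retry_queue`, and `manual_review_queue` are left to the application. A minimal sketch of `is_retriable`: validation failures need a corrected document, while transient errors can be retried. Apart from `VALIDATION_ERROR`, the error codes listed here are placeholders, not a documented set.

```python
# Only VALIDATION_ERROR appears in the batch response above; the other codes
# are placeholders for whatever transient errors the API actually returns.
RETRIABLE_CODES = {"RATE_LIMITED", "TEMPORARY_FAILURE", "INTERNAL_ERROR"}

def is_retriable(error_code: str) -> bool:
    """Return True if a rejected document is worth retrying automatically."""
    if error_code == "VALIDATION_ERROR":
        return False  # the document itself has to be corrected first
    return error_code in RETRIABLE_CODES
```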
## Async Batch Processing
For very large batches, use async processing:
```python
# Submit the batch for async processing
response = requests.post(
    "https://app.goroute.ai/peppol-api/api/v1/send/batch?async=true",
    headers={
        "X-API-Key": "your_api_key",
        "Content-Type": "application/json"
    },
    json={"documents": documents}
)
result = response.json()
batch_id = result["batch_id"]

# Returns immediately with a batch ID:
# {
#   "batch_id": "batch_abc123",
#   "status": "processing",
#   "message": "Batch queued for processing"
# }

# Poll for completion, or use a webhook (see below)
while True:
    status = get_batch_status(batch_id)
    # Stop at a terminal state ("failed" mirrors the batch.failed webhook event)
    if status["status"] in ("completed", "failed"):
        break
    time.sleep(5)
```
## Webhooks for Batch Events
Configure webhooks to receive batch updates:
```python
# Assumes a FastAPI application for the webhook endpoint
from fastapi import FastAPI, Request

app = FastAPI()

# Webhook events emitted for batches
BATCH_EVENTS = [
    "batch.created",
    "batch.processing",
    "batch.completed",
    "batch.failed"
]

@app.post("/webhooks/goroute")
async def handle_webhook(request: Request):
    event = await request.json()

    if event["type"] == "batch.completed":
        batch = event["data"]
        print(f"Batch {batch['batch_id']} completed:")
        print(f"  Delivered: {batch['delivered_count']}")
        print(f"  Failed: {batch['failed_count']}")

        # Process failed items (handle_failed_documents is application-specific)
        if batch["failed_count"] > 0:
            handle_failed_documents(batch["failed_items"])

    return {"status": "ok"}
```
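
Webhook endpoints are reachable from the public internet, so verify that a request really came from GoRoute before acting on it. The sketch below checks an HMAC signature over the raw body; the `X-Webhook-Signature` header name, the shared secret, and the SHA-256 scheme are placeholders, so use whatever verification mechanism the webhooks guide actually specifies.

```python
import hashlib
import hmac

# Placeholder: load the shared secret from your GoRoute webhook settings
WEBHOOK_SECRET = b"your_webhook_secret"

def verify_signature(payload: bytes, signature_header: str | None) -> bool:
    """Hypothetical check: HMAC-SHA256 of the raw request body, hex-encoded in a header."""
    if not signature_header:
        return False
    expected = hmac.new(WEBHOOK_SECRET, payload, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)

# At the top of handle_webhook, before parsing the event:
#     raw_body = await request.body()
#     if not verify_signature(raw_body, request.headers.get("X-Webhook-Signature")):
#         return {"status": "invalid signature"}
```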
## Complete Example
```python
import requests
import time
from pathlib import Path
from typing import Generator

class BatchSender:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://app.goroute.ai/peppol-api"

    def load_invoices(self, directory: str) -> Generator[dict, None, None]:
        """Load invoice files from a directory."""
        for xml_file in Path(directory).glob("*.xml"):
            yield {
                "id": xml_file.stem,
                "xml": xml_file.read_text(encoding="utf-8")
            }

    def send_all(self, directory: str, chunk_size: int = 100) -> dict:
        """Send all invoices from a directory."""
        invoices = list(self.load_invoices(directory))
        print(f"Found {len(invoices)} invoices to send")

        results = {
            "total": len(invoices),
            "accepted": 0,
            "rejected": 0,
            "batches": []
        }

        # Send in chunks
        for i in range(0, len(invoices), chunk_size):
            chunk = invoices[i:i + chunk_size]
            print(f"Sending batch {i // chunk_size + 1} ({len(chunk)} documents)")

            batch_result = self._send_batch(chunk)
            results["batches"].append(batch_result)
            results["accepted"] += batch_result["accepted_count"]
            results["rejected"] += batch_result["rejected_count"]

            # Rate limit: wait between batches
            if i + chunk_size < len(invoices):
                time.sleep(6)

        return results

    def _send_batch(self, documents: list[dict]) -> dict:
        """Send a single batch of documents."""
        response = requests.post(
            f"{self.base_url}/api/v1/send/batch",
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            json={"documents": documents}
        )
        response.raise_for_status()
        return response.json()

    def wait_for_delivery(self, batch_id: str, timeout: int = 300) -> dict:
        """Wait for all documents in a batch to be delivered."""
        start = time.time()
        while time.time() - start < timeout:
            status = self._get_batch_status(batch_id)
            pending = status.get("pending_count", 0)
            if pending == 0:
                return status
            print(f"  Waiting... {pending} still pending")
            time.sleep(5)
        raise TimeoutError("Batch did not complete in time")

    def _get_batch_status(self, batch_id: str) -> dict:
        response = requests.get(
            f"{self.base_url}/api/v1/batches/{batch_id}",
            headers={"X-API-Key": self.api_key}
        )
        response.raise_for_status()
        return response.json()

# Usage
sender = BatchSender("your_api_key")

# Send all invoices from a folder
results = sender.send_all("./invoices/pending/")
print("\nResults:")
print(f"  Total: {results['total']}")
print(f"  Accepted: {results['accepted']}")
print(f"  Rejected: {results['rejected']}")

# Wait for delivery of each batch
for batch_result in results["batches"]:
    batch_id = batch_result["batch_id"]
    final_status = sender.wait_for_delivery(batch_id)
    print(f"  Batch {batch_id}: {final_status['delivered_count']} delivered")
```