Skip to main content

Batch Sending

Send multiple documents efficiently using GoRoute's batch API.

Batch API​

Send Multiple Documents​

import requests

def send_batch(documents: list[dict]) -> dict:
"""Send multiple documents in a single API call."""

response = requests.post(
"https://app.goroute.ai/peppol-api/api/v1/send/batch",
headers={
"X-API-Key": "your_api_key",
"Content-Type": "application/json"
},
json={
"documents": documents
}
)

return response.json()


# Example: Send 3 invoices
documents = [
{
"id": "inv-001",
"xml": open("invoices/invoice-001.xml").read()
},
{
"id": "inv-002",
"xml": open("invoices/invoice-002.xml").read()
},
{
"id": "inv-003",
"xml": open("invoices/invoice-003.xml").read()
}
]

result = send_batch(documents)
print(f"Accepted: {result['accepted_count']}/{result['total_count']}")

Batch Response​

{
"batch_id": "batch_abc123",
"total_count": 3,
"accepted_count": 2,
"rejected_count": 1,
"results": [
{
"id": "inv-001",
"status": "accepted",
"transaction_id": "txn_xyz001"
},
{
"id": "inv-002",
"status": "accepted",
"transaction_id": "txn_xyz002"
},
{
"id": "inv-003",
"status": "rejected",
"error": {
"code": "VALIDATION_ERROR",
"message": "Missing required element: InvoiceTypeCode"
}
}
]
}

Batch Limits​

LimitValue
Max documents per batch100
Max batch size50 MB
Max single document size15 MB
Rate limit10 batches/minute

Best Practices​

1. Pre-Validate Documents​

Validate before batching to avoid partial failures:

def send_batch_with_validation(documents: list[dict]) -> dict:
"""Validate all documents before sending."""

# Validate each document
valid_docs = []
invalid_docs = []

for doc in documents:
validation = validate_document(doc["xml"])

if validation["valid"]:
valid_docs.append(doc)
else:
invalid_docs.append({
"id": doc["id"],
"errors": validation["errors"]
})

# Only send valid documents
if valid_docs:
result = send_batch(valid_docs)
result["pre_rejected"] = invalid_docs
return result
else:
return {
"error": "All documents failed validation",
"invalid_docs": invalid_docs
}

2. Chunk Large Batches​

def send_in_chunks(documents: list[dict], chunk_size: int = 100) -> list[dict]:
"""Send documents in chunks to respect batch limits."""

results = []

for i in range(0, len(documents), chunk_size):
chunk = documents[i:i + chunk_size]
result = send_batch(chunk)
results.append(result)

# Respect rate limits
time.sleep(6) # 10 batches per minute

return results

3. Track Batch Status​

def get_batch_status(batch_id: str) -> dict:
"""Get the status of all transactions in a batch."""

response = requests.get(
f"https://app.goroute.ai/peppol-api/api/v1/batches/{batch_id}",
headers={"X-API-Key": "your_api_key"}
)

return response.json()


# Check status after sending
status = get_batch_status("batch_abc123")

print(f"Batch {status['batch_id']}:")
print(f" Delivered: {status['delivered_count']}")
print(f" Pending: {status['pending_count']}")
print(f" Failed: {status['failed_count']}")

4. Handle Partial Failures​

def handle_batch_result(result: dict):
"""Process batch results and handle failures."""

successful = []
failed = []

for item in result["results"]:
if item["status"] == "accepted":
successful.append(item)
else:
failed.append(item)

# Log success
for item in successful:
logger.info(f"Document {item['id']} accepted: {item['transaction_id']}")

# Handle failures
for item in failed:
logger.error(f"Document {item['id']} rejected: {item['error']['message']}")

# Queue for retry or manual review
if is_retriable(item["error"]["code"]):
retry_queue.add(item["id"])
else:
manual_review_queue.add(item["id"])

Async Batch Processing​

For very large batches, use async processing:

# Submit batch for async processing
response = requests.post(
"https://app.goroute.ai/peppol-api/api/v1/send/batch?async=true",
headers={
"X-API-Key": "your_api_key",
"Content-Type": "application/json"
},
json={"documents": documents}
)

result = response.json()
batch_id = result["batch_id"]

# Returns immediately with batch ID
# {
# "batch_id": "batch_abc123",
# "status": "processing",
# "message": "Batch queued for processing"
# }

# Poll for completion or use webhook
while True:
status = get_batch_status(batch_id)

if status["status"] == "completed":
break

time.sleep(5)

Webhooks for Batch Events​

Configure webhooks to receive batch updates:

# Webhook events for batches
BATCH_EVENTS = [
"batch.created",
"batch.processing",
"batch.completed",
"batch.failed"
]

# In your webhook handler
@app.post("/webhooks/goroute")
async def handle_webhook(request: Request):
event = await request.json()

if event["type"] == "batch.completed":
batch = event["data"]

print(f"Batch {batch['batch_id']} completed:")
print(f" Delivered: {batch['delivered_count']}")
print(f" Failed: {batch['failed_count']}")

# Process failed items
if batch["failed_count"] > 0:
handle_failed_documents(batch["failed_items"])

return {"status": "ok"}

Complete Example​

import requests
import time
from pathlib import Path
from typing import Generator

class BatchSender:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://app.goroute.ai/peppol-api"

def load_invoices(self, directory: str) -> Generator[dict, None, None]:
"""Load invoice files from a directory."""
for xml_file in Path(directory).glob("*.xml"):
yield {
"id": xml_file.stem,
"xml": xml_file.read_text(encoding="utf-8")
}

def send_all(self, directory: str, chunk_size: int = 100) -> dict:
"""Send all invoices from a directory."""

invoices = list(self.load_invoices(directory))
print(f"Found {len(invoices)} invoices to send")

results = {
"total": len(invoices),
"accepted": 0,
"rejected": 0,
"batches": []
}

# Send in chunks
for i in range(0, len(invoices), chunk_size):
chunk = invoices[i:i + chunk_size]
print(f"Sending batch {i // chunk_size + 1} ({len(chunk)} documents)")

batch_result = self._send_batch(chunk)
results["batches"].append(batch_result)
results["accepted"] += batch_result["accepted_count"]
results["rejected"] += batch_result["rejected_count"]

# Rate limit: wait between batches
if i + chunk_size < len(invoices):
time.sleep(6)

return results

def _send_batch(self, documents: list[dict]) -> dict:
"""Send a single batch of documents."""
response = requests.post(
f"{self.base_url}/api/v1/send/batch",
headers={
"X-API-Key": self.api_key,
"Content-Type": "application/json"
},
json={"documents": documents}
)
response.raise_for_status()
return response.json()

def wait_for_delivery(self, batch_id: str, timeout: int = 300) -> dict:
"""Wait for all documents in a batch to be delivered."""
start = time.time()

while time.time() - start < timeout:
status = self._get_batch_status(batch_id)

pending = status.get("pending_count", 0)
if pending == 0:
return status

print(f" Waiting... {pending} still pending")
time.sleep(5)

raise TimeoutError("Batch did not complete in time")

def _get_batch_status(self, batch_id: str) -> dict:
response = requests.get(
f"{self.base_url}/api/v1/batches/{batch_id}",
headers={"X-API-Key": self.api_key}
)
response.raise_for_status()
return response.json()


# Usage
sender = BatchSender("your_api_key")

# Send all invoices from a folder
results = sender.send_all("./invoices/pending/")

print(f"\nResults:")
print(f" Total: {results['total']}")
print(f" Accepted: {results['accepted']}")
print(f" Rejected: {results['rejected']}")

# Wait for delivery of each batch
for batch_result in results["batches"]:
batch_id = batch_result["batch_id"]
final_status = sender.wait_for_delivery(batch_id)
print(f" Batch {batch_id}: {final_status['delivered_count']} delivered")

Next Steps​