Job Queues

When processing invoices at scale, synchronous API calls aren't always practical. This guide covers best practices for implementing job queues and async processing patterns with GoRoute.

Why Use Job Queues?

| Challenge | Solution |
|---|---|
| Rate limits on API calls | Queue and throttle requests |
| Network timeouts | Retry failed jobs automatically |
| Batch processing thousands of invoices | Process in parallel with workers |
| System outages | Persist jobs and resume later |
| Audit trail | Track every job's status |

Architecture Pattern

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Your App   │────▶│  Job Queue  │────▶│   Workers   │
│ (Producer)  │     │ (Redis/SQS) │     │ (Consumers) │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
                                               ▼
                                        ┌─────────────┐
                                        │ GoRoute API │
                                        └─────────────┘

Implementation Examples

Python with Celery

# tasks.py
from celery import Celery
from goroute import GoRouteClient
import os

app = Celery('invoice_tasks', broker=os.environ['REDIS_URL'])
client = GoRouteClient(api_key=os.environ['GOROUTE_API_KEY'])

@app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True
)
def send_invoice(self, invoice_xml: str, metadata: dict):
    """Send an invoice via GoRoute with automatic retries."""
    try:
        result = client.documents.send(
            document=invoice_xml,
            metadata=metadata
        )

        # Store result for tracking
        store_result(metadata['invoice_id'], result)

        return {
            'status': 'success',
            'transaction_id': result.transaction_id
        }

    except client.RateLimitError as e:
        # Respect rate limits - retry after the suggested delay
        raise self.retry(countdown=e.retry_after)

    except client.ValidationError as e:
        # Don't retry validation errors - they won't succeed
        return {
            'status': 'failed',
            'error': str(e),
            'retryable': False
        }

# Producer code
def queue_invoices(invoices: list):
    """Queue multiple invoices for async processing."""
    for invoice in invoices:
        send_invoice.delay(
            invoice_xml=invoice.to_xml(),
            metadata={
                'invoice_id': invoice.id,
                'customer_id': invoice.customer_id
            }
        )
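
The task above calls store_result, which this guide leaves undefined. A minimal sketch backed by Redis (an assumption; any durable store works) might be:

import json
import os
import redis

r = redis.Redis.from_url(os.environ['REDIS_URL'])

def store_result(invoice_id: str, result) -> None:
    # Persist the transaction reference so the audit trail can find it later
    r.set(f'invoice:result:{invoice_id}', json.dumps({
        'transaction_id': result.transaction_id
    }))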

Node.js with BullMQ

// queue.ts
import { Queue, Worker } from 'bullmq';
import { GoRouteClient } from '@goroute/peppol-sdk';
import Redis from 'ioredis';

// BullMQ requires maxRetriesPerRequest: null on connections used by workers
const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null });
const client = new GoRouteClient({ apiKey: process.env.GOROUTE_API_KEY });

// Create the queue
export const invoiceQueue = new Queue('invoices', { connection });

// Create the worker
const worker = new Worker('invoices', async (job) => {
  const { invoiceXml, metadata } = job.data;

  try {
    const result = await client.documents.send({
      document: invoiceXml,
      ...metadata
    });

    return {
      status: 'success',
      transactionId: result.transactionId
    };

  } catch (error) {
    if (error.status === 429) {
      // Rate limited - throw to trigger retry
      throw error;
    }

    if (error.status === 400) {
      // Validation error - don't retry
      return {
        status: 'failed',
        error: error.message,
        retryable: false
      };
    }

    throw error;
  }
}, {
  connection,
  concurrency: 10, // Process 10 jobs in parallel
  limiter: {
    max: 100,       // Max 100 jobs
    duration: 60000 // Per minute
  }
});

// Handle events
worker.on('completed', (job, result) => {
  console.log(`Invoice ${job.data.metadata.invoiceId} sent:`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Invoice ${job.data.metadata.invoiceId} failed:`, error);
});

C# with Hangfire

// InvoiceJobs.cs
using Hangfire;
using GoRoute.Peppol;
using Microsoft.Extensions.Logging;

public class InvoiceJobs
{
    private readonly IGoRouteClient _client;
    private readonly ILogger<InvoiceJobs> _logger;

    public InvoiceJobs(IGoRouteClient client, ILogger<InvoiceJobs> logger)
    {
        _client = client;
        _logger = logger;
    }

    [AutomaticRetry(Attempts = 5, DelaysInSeconds = new[] { 60, 300, 900, 3600, 7200 })]
    public async Task<SendResult> SendInvoiceAsync(string invoiceXml, InvoiceMetadata metadata)
    {
        try
        {
            var result = await _client.Documents.SendAsync(new SendDocumentRequest
            {
                Document = invoiceXml,
                Metadata = metadata.ToDictionary()
            });

            _logger.LogInformation(
                "Invoice {InvoiceId} sent successfully. Transaction: {TransactionId}",
                metadata.InvoiceId, result.TransactionId);

            return new SendResult { Success = true, TransactionId = result.TransactionId };
        }
        catch (RateLimitException ex)
        {
            _logger.LogWarning("Rate limited. Retry after {Seconds}s", ex.RetryAfter);
            throw; // Will be retried by Hangfire
        }
        catch (ValidationException ex)
        {
            _logger.LogError("Validation failed for {InvoiceId}: {Error}",
                metadata.InvoiceId, ex.Message);

            // Return failure without throwing - don't retry
            return new SendResult { Success = false, Error = ex.Message };
        }
    }
}

// Usage
public class InvoiceService
{
    public void QueueInvoice(Invoice invoice)
    {
        BackgroundJob.Enqueue<InvoiceJobs>(
            x => x.SendInvoiceAsync(invoice.ToXml(), invoice.Metadata));
    }

    public void QueueBatch(IEnumerable<Invoice> invoices)
    {
        foreach (var invoice in invoices)
        {
            BackgroundJob.Enqueue<InvoiceJobs>(
                x => x.SendInvoiceAsync(invoice.ToXml(), invoice.Metadata));
        }
    }
}

Queue Design Patterns

Priority Queues

Process urgent invoices first:

# High priority for due-date invoices
@app.task(queue='high_priority')
def send_urgent_invoice(invoice_xml, metadata):
    return send_invoice(invoice_xml, metadata)

@app.task(queue='default')
def send_normal_invoice(invoice_xml, metadata):
    return send_invoice(invoice_xml, metadata)

# Queue based on urgency
def queue_invoice(invoice):
    if invoice.days_until_due < 3:
        send_urgent_invoice.delay(invoice.xml, invoice.metadata)
    else:
        send_normal_invoice.delay(invoice.xml, invoice.metadata)
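
Note that Celery workers only consume the queues they subscribe to, so start a dedicated worker for each one (for example, celery -A tasks worker -Q high_priority); otherwise jobs routed to high_priority will sit unconsumed.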

Dead Letter Queues

Handle permanently failed jobs:

// Configure DLQ for failed jobs
const invoiceQueue = new Queue('invoices', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 60000
    },
    removeOnComplete: 100,
    removeOnFail: false // Keep failed jobs for analysis
  }
});

// Process DLQ periodically
// (manualReviewQueue is a second Queue, defined elsewhere)
async function processDLQ() {
  const failedJobs = await invoiceQueue.getFailed(0, 100);

  for (const job of failedJobs) {
    // Notify customer service
    await notifyFailure(job.data.metadata);

    // Move to manual review queue
    await manualReviewQueue.add('review', {
      originalJob: job.data,
      error: job.failedReason,
      attempts: job.attemptsMade
    });

    // Remove from failed queue
    await job.remove();
  }
}
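
Run processDLQ on a schedule (a cron job or a BullMQ repeatable job both work) so failed invoices surface for review instead of accumulating silently.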

Batch Processing

Group invoices for efficient processing:

import logging

from celery import group

logger = logging.getLogger(__name__)

def send_invoice_batch(invoices: list, batch_size: int = 50):
    """Send invoices in batches with controlled parallelism."""

    # Create groups of tasks
    for i in range(0, len(invoices), batch_size):
        batch = invoices[i:i + batch_size]

        # Create a group of parallel tasks
        job_group = group(
            send_invoice.s(inv.xml, inv.metadata)
            for inv in batch
        )

        # Execute batch and wait for results
        result = job_group.apply_async()

        # Wait for batch to complete before next batch
        # This prevents overwhelming the API
        results = result.get(timeout=300)

        # Log batch results
        success = sum(1 for r in results if r['status'] == 'success')
        logger.info(f"Batch {i//batch_size + 1}: {success}/{len(batch)} successful")

Rate Limiting

Respecting API Limits

GoRoute has rate limits to ensure fair usage:

| Tier | Requests/min | Concurrent |
|---|---|---|
| Standard | 100 | 10 |
| Professional | 500 | 50 |
| Enterprise | Custom | Custom |
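
Configure any client-side limiter below your tier's ceiling, and remember the budget is shared across all of your workers rather than applying per process.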

Implementing Rate Limiting

from ratelimit import limits, sleep_and_retry

# Limit to 100 calls per minute
@sleep_and_retry
@limits(calls=100, period=60)
def rate_limited_send(invoice_xml, metadata):
    return client.documents.send(document=invoice_xml, metadata=metadata)

@app.task
def send_invoice(invoice_xml, metadata):
    return rate_limited_send(invoice_xml, metadata)
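
Note that the ratelimit decorator throttles a single process. If you run several workers, divide the budget between them or use a shared limiter such as the token bucket pattern below.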

Token Bucket Pattern

import Bottleneck from 'bottleneck';

const limiter = new Bottleneck({
  reservoir: 100,                      // Initial tokens
  reservoirRefreshAmount: 100,
  reservoirRefreshInterval: 60 * 1000, // Refill every minute
  maxConcurrent: 10,
  minTime: 100                         // Min 100ms between requests
});

async function sendWithRateLimit(invoice: Invoice) {
  return limiter.schedule(() =>
    client.documents.send(invoice)
  );
}

Monitoring

Job Metrics

Track queue health:

from prometheus_client import Counter, Histogram, Gauge

jobs_total = Counter('invoice_jobs_total', 'Total jobs processed', ['status'])
job_duration = Histogram('invoice_job_duration_seconds', 'Job processing time')
queue_depth = Gauge('invoice_queue_depth', 'Current queue depth')

@app.task
def send_invoice(invoice_xml, metadata):
    with job_duration.time():
        try:
            result = client.documents.send(document=invoice_xml, metadata=metadata)
            jobs_total.labels(status='success').inc()
            return result
        except Exception:
            jobs_total.labels(status='error').inc()
            raise
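
The queue_depth gauge above is declared but never updated. A minimal updater, assuming the Celery broker is Redis and the default queue key celery (both assumptions; adjust for your setup), can run on a timer:

import os
import redis

broker = redis.Redis.from_url(os.environ['REDIS_URL'])

def update_queue_depth():
    # LLEN on the broker's list approximates the number of pending jobs
    queue_depth.set(broker.llen('celery'))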

Alerting

Set up alerts for queue issues:

# prometheus-alerts.yaml
groups:
  - name: invoice_queue
    rules:
      - alert: HighQueueDepth
        expr: invoice_queue_depth > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Invoice queue depth is high"

      - alert: HighFailureRate
        expr: rate(invoice_jobs_total{status="error"}[5m]) > 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High invoice job failure rate"

Best Practices

1. Idempotency

Ensure jobs can be safely retried:

@app.task
def send_invoice(invoice_xml, metadata):
    # Check if already sent (idempotency)
    existing = get_transaction(metadata['invoice_id'])
    if existing:
        return {'status': 'already_sent', 'transaction_id': existing.id}

    result = client.documents.send(document=invoice_xml, metadata=metadata)

    # Store result atomically
    store_transaction(metadata['invoice_id'], result)

    return result
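
get_transaction and store_transaction are again left undefined here. A Redis-backed sketch (an assumption, as before) could use SET NX so only the first writer records a transaction:

import os
import redis
from types import SimpleNamespace

store = redis.Redis.from_url(os.environ['REDIS_URL'])

def get_transaction(invoice_id: str):
    txn_id = store.get(f'invoice:txn:{invoice_id}')
    return SimpleNamespace(id=txn_id.decode()) if txn_id else None

def store_transaction(invoice_id: str, result) -> None:
    # nx=True: the write succeeds only if nothing was recorded yet,
    # so two racing workers cannot both claim the send
    store.set(f'invoice:txn:{invoice_id}', result.transaction_id, nx=True)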

2. Graceful Shutdown

Handle shutdown without losing jobs:

# Note: Celery workers already perform a warm shutdown on SIGTERM;
# this pattern is for hand-rolled workers that need it wired up manually.
import signal
import sys

def shutdown_handler(signum, frame):
    logger.info("Shutting down gracefully...")
    worker.shutdown()  # Stop accepting new jobs
    worker.wait()      # Wait for current jobs to finish
    sys.exit(0)

signal.signal(signal.SIGTERM, shutdown_handler)

3. Job Serialization

Keep job payloads small:

# Good - store a reference, fetch the payload in the worker
@app.task
def send_invoice(invoice_id: str):
    invoice = Invoice.get(invoice_id)  # Fetch from DB
    return client.documents.send(invoice.xml)

# Avoid - large payloads in the queue
@app.task
def send_invoice(invoice_xml: str):  # Could be 100KB+
    return client.documents.send(invoice_xml)

4. Correlation IDs

Track jobs across systems:

import uuid

def queue_invoice(invoice):
    correlation_id = str(uuid.uuid4())

    send_invoice.apply_async(
        args=[invoice.xml],
        kwargs={'correlation_id': correlation_id},
        task_id=correlation_id  # Use same ID for tracing
    )

    return correlation_id
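
Because the task_id doubles as the correlation ID, job state can be looked up later with no extra mapping. A sketch using Celery's AsyncResult and the app object defined earlier:

from celery.result import AsyncResult

def get_invoice_job_status(correlation_id: str) -> str:
    # Returns e.g. 'PENDING', 'RETRY', 'SUCCESS' or 'FAILURE'
    return AsyncResult(correlation_id, app=app).state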

Next Steps