# Job Queues
When processing invoices at scale, synchronous API calls aren't always practical. This guide covers best practices for implementing job queues and async processing patterns with GoRoute.
## Why Use Job Queues?
| Challenge | Solution |
|---|---|
| Rate limits on API calls | Queue and throttle requests |
| Network timeouts | Retry failed jobs automatically |
| Batch processing thousands of invoices | Process in parallel with workers |
| System outages | Persist jobs and resume later |
| Audit trail | Track every job's status |
## Architecture Pattern
```
┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│  Your App   │─────▶│  Job Queue  │─────▶│   Workers   │
│ (Producer)  │      │ (Redis/SQS) │      │ (Consumers) │
└─────────────┘      └─────────────┘      └──────┬──────┘
                                                 │
                                                 ▼
                                          ┌─────────────┐
                                          │ GoRoute API │
                                          └─────────────┘
```
## Implementation Examples
### Python with Celery
```python
# tasks.py
from celery import Celery
from goroute import GoRouteClient
import os

app = Celery('invoice_tasks', broker=os.environ['REDIS_URL'])
client = GoRouteClient(api_key=os.environ['GOROUTE_API_KEY'])

@app.task(
    bind=True,
    max_retries=5,
    default_retry_delay=60,
    autoretry_for=(Exception,),
    retry_backoff=True
)
def send_invoice(self, invoice_xml: str, metadata: dict):
    """Send an invoice via GoRoute with automatic retries."""
    try:
        result = client.documents.send(
            document=invoice_xml,
            metadata=metadata
        )
        # Store result for tracking
        store_result(metadata['invoice_id'], result)
        return {
            'status': 'success',
            'transaction_id': result.transaction_id
        }
    except client.RateLimitError as e:
        # Respect rate limits - retry after the suggested delay
        raise self.retry(countdown=e.retry_after)
    except client.ValidationError as e:
        # Don't retry validation errors - they won't succeed
        return {
            'status': 'failed',
            'error': str(e),
            'retryable': False
        }

# Producer code
def queue_invoices(invoices: list):
    """Queue multiple invoices for async processing."""
    for invoice in invoices:
        send_invoice.delay(
            invoice_xml=invoice.to_xml(),
            metadata={
                'invoice_id': invoice.id,
                'customer_id': invoice.customer_id
            }
        )
```
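The task above calls a `store_result` helper that isn't defined here; what it does depends on your persistence layer. A minimal sketch, assuming the same Redis instance doubles as the tracking store:

```python
# tracking.py - minimal sketch; assumes Redis doubles as the tracking store
import os

import redis

tracker = redis.Redis.from_url(os.environ['REDIS_URL'])

def store_result(invoice_id: str, result) -> None:
    """Record the outcome of a send so every job leaves an audit trail."""
    tracker.hset(f'invoice:{invoice_id}', mapping={
        'status': 'sent',
        'transaction_id': result.transaction_id,
    })
```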
### Node.js with BullMQ
```typescript
// queue.ts
import { Queue, Worker } from 'bullmq';
import { GoRouteClient } from '@goroute/peppol-sdk';
import Redis from 'ioredis';

// BullMQ requires maxRetriesPerRequest: null on blocking connections
const connection = new Redis(process.env.REDIS_URL, { maxRetriesPerRequest: null });
const client = new GoRouteClient({ apiKey: process.env.GOROUTE_API_KEY });

// Create the queue
export const invoiceQueue = new Queue('invoices', { connection });

// Create the worker
const worker = new Worker('invoices', async (job) => {
  const { invoiceXml, metadata } = job.data;

  try {
    const result = await client.documents.send({
      document: invoiceXml,
      ...metadata
    });
    return {
      status: 'success',
      transactionId: result.transactionId
    };
  } catch (error) {
    if (error.status === 429) {
      // Rate limited - throw to trigger retry
      throw error;
    }
    if (error.status === 400) {
      // Validation error - don't retry
      return {
        status: 'failed',
        error: error.message,
        retryable: false
      };
    }
    throw error;
  }
}, {
  connection,
  concurrency: 10,   // Process 10 jobs in parallel
  limiter: {
    max: 100,        // Max 100 jobs
    duration: 60000  // Per minute
  }
});

// Handle events
worker.on('completed', (job, result) => {
  console.log(`Invoice ${job.data.metadata.invoiceId} sent:`, result);
});

worker.on('failed', (job, error) => {
  console.error(`Invoice ${job.data.metadata.invoiceId} failed:`, error);
});
```
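The worker above only consumes jobs; producing them is a separate `Queue.add` call. A minimal producer sketch — the job name `send` and the retry options are illustrative, and note that in BullMQ retry attempts are configured on the job, not the worker:

```typescript
// producer.ts - minimal sketch for enqueuing invoices
import { invoiceQueue } from './queue';

export async function queueInvoice(invoiceXml: string, metadata: Record<string, string>) {
  await invoiceQueue.add('send', { invoiceXml, metadata }, {
    attempts: 5,                                     // retry up to 5 times
    backoff: { type: 'exponential', delay: 60000 },  // 1 min base delay, doubling
  });
}
```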
### C# with Hangfire
```csharp
// InvoiceJobs.cs
using Hangfire;
using GoRoute.Peppol;
using Microsoft.Extensions.Logging;

public class InvoiceJobs
{
    private readonly IGoRouteClient _client;
    private readonly ILogger<InvoiceJobs> _logger;

    public InvoiceJobs(IGoRouteClient client, ILogger<InvoiceJobs> logger)
    {
        _client = client;
        _logger = logger;
    }

    [AutomaticRetry(Attempts = 5, DelaysInSeconds = new[] { 60, 300, 900, 3600, 7200 })]
    public async Task<SendResult> SendInvoiceAsync(string invoiceXml, InvoiceMetadata metadata)
    {
        try
        {
            var result = await _client.Documents.SendAsync(new SendDocumentRequest
            {
                Document = invoiceXml,
                Metadata = metadata.ToDictionary()
            });

            _logger.LogInformation(
                "Invoice {InvoiceId} sent successfully. Transaction: {TransactionId}",
                metadata.InvoiceId, result.TransactionId);

            return new SendResult { Success = true, TransactionId = result.TransactionId };
        }
        catch (RateLimitException ex)
        {
            _logger.LogWarning("Rate limited. Retry after {Seconds}s", ex.RetryAfter);
            throw; // Will be retried by Hangfire
        }
        catch (ValidationException ex)
        {
            _logger.LogError("Validation failed for {InvoiceId}: {Error}",
                metadata.InvoiceId, ex.Message);
            // Return failure without throwing - don't retry
            return new SendResult { Success = false, Error = ex.Message };
        }
    }
}

// Usage
public class InvoiceService
{
    public void QueueInvoice(Invoice invoice)
    {
        BackgroundJob.Enqueue<InvoiceJobs>(
            x => x.SendInvoiceAsync(invoice.ToXml(), invoice.Metadata));
    }

    public void QueueBatch(IEnumerable<Invoice> invoices)
    {
        foreach (var invoice in invoices)
        {
            BackgroundJob.Enqueue<InvoiceJobs>(
                x => x.SendInvoiceAsync(invoice.ToXml(), invoice.Metadata));
        }
    }
}
```
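Hangfire also needs storage and a processing server registered at startup. A minimal sketch for ASP.NET Core, assuming SQL Server storage and a connection string named `Hangfire`; how `IGoRouteClient` is constructed is an assumption — consult the SDK docs:

```csharp
// Program.cs - minimal sketch, assuming ASP.NET Core with SQL Server storage
using Hangfire;
using GoRoute.Peppol;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddHangfire(config =>
    config.UseSqlServerStorage(builder.Configuration.GetConnectionString("Hangfire")));
builder.Services.AddHangfireServer();   // runs the background job processing server

// Register the client that InvoiceJobs receives via constructor injection
// (constructor shape is an assumption - check the GoRoute SDK)
builder.Services.AddSingleton<IGoRouteClient>(sp =>
    new GoRouteClient(builder.Configuration["GoRoute:ApiKey"]));

var app = builder.Build();
app.Run();
```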
## Queue Design Patterns
### Priority Queues
Process urgent invoices first:
```python
# High priority for due-date invoices
@app.task(queue='high_priority')
def send_urgent_invoice(invoice_xml, metadata):
    return send_invoice(invoice_xml, metadata)

@app.task(queue='default')
def send_normal_invoice(invoice_xml, metadata):
    return send_invoice(invoice_xml, metadata)

# Queue based on urgency
def queue_invoice(invoice):
    if invoice.days_until_due < 3:
        send_urgent_invoice.delay(invoice.xml, invoice.metadata)
    else:
        send_normal_invoice.delay(invoice.xml, invoice.metadata)

# Priority only takes effect when dedicated workers consume each queue, e.g.:
#   celery -A tasks worker -Q high_priority
#   celery -A tasks worker -Q default
```
### Dead Letter Queues
Handle permanently failed jobs:
```typescript
// Configure DLQ for failed jobs
const invoiceQueue = new Queue('invoices', {
  connection,
  defaultJobOptions: {
    attempts: 5,
    backoff: {
      type: 'exponential',
      delay: 60000
    },
    removeOnComplete: 100,
    removeOnFail: false // Keep failed jobs for analysis
  }
});

// Separate queue holding jobs that await human follow-up
const manualReviewQueue = new Queue('manual-review', { connection });

// Process DLQ periodically
async function processDLQ() {
  const failedJobs = await invoiceQueue.getFailed(0, 100);

  for (const job of failedJobs) {
    // Notify customer service
    await notifyFailure(job.data.metadata);

    // Move to manual review queue
    await manualReviewQueue.add('review', {
      originalJob: job.data,
      error: job.failedReason,
      attempts: job.attemptsMade
    });

    // Remove from failed queue
    await job.remove();
  }
}
```
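`processDLQ` has to run on a schedule. One option is a repeatable BullMQ job, so the sweep itself survives restarts; a sketch assuming BullMQ's cron-style `repeat.pattern` option (the 15-minute cadence is arbitrary):

```typescript
// Sweep the failed set every 15 minutes via a repeatable job
const dlqSweeper = new Queue('dlq-sweeper', { connection });

async function scheduleDLQSweep() {
  await dlqSweeper.add('sweep', {}, { repeat: { pattern: '*/15 * * * *' } });
}

new Worker('dlq-sweeper', async () => { await processDLQ(); }, { connection });
```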
### Batch Processing
Group invoices for efficient processing:
```python
import logging

from celery import group

logger = logging.getLogger(__name__)

def send_invoice_batch(invoices: list, batch_size: int = 50):
    """Send invoices in batches with controlled parallelism."""
    # Create groups of tasks
    for i in range(0, len(invoices), batch_size):
        batch = invoices[i:i + batch_size]

        # Create a group of parallel tasks
        job_group = group(
            send_invoice.s(inv.xml, inv.metadata)
            for inv in batch
        )

        # Execute batch and wait for results
        result = job_group.apply_async()

        # Wait for the batch to complete before starting the next one -
        # this prevents overwhelming the API
        results = result.get(timeout=300)

        # Log batch results
        success = sum(1 for r in results if r['status'] == 'success')
        logger.info(f"Batch {i // batch_size + 1}: {success}/{len(batch)} successful")
```
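Blocking on `result.get()` ties the producer up for the whole batch. If that's a problem, a Celery chord can run a callback once every task in the batch finishes; a minimal sketch, where `log_batch` is a hypothetical callback task and chords require a configured result backend:

```python
from celery import chord

@app.task
def log_batch(results):
    """Callback invoked with the list of results once the batch completes."""
    success = sum(1 for r in results if r['status'] == 'success')
    logger.info(f"Batch complete: {success}/{len(results)} successful")

def send_invoice_batch_async(invoices: list):
    # chord(header)(callback): the callback receives all header results
    chord(send_invoice.s(inv.xml, inv.metadata) for inv in invoices)(log_batch.s())
```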