{
"eventType": "OrderPlaced",
"eventId": "evt-a1b2c3d4",
"timestamp": "2025-06-28T10:00:00Z",
"version": "1.0",
"payload": {
"orderId": "ord-456",
"customerId": "cust-789",
"items": [{"sku": "WIDGET-01", "quantity": 2}],
"total": 99.99,
"currency": "USD"
}
}

┌──────────┐   "OrderPlaced"    ┌──────────┐
│ Producer │ ─────────────────→ │ Consumer │
└──────────┘                    └────┬─────┘
                                     │
                                     ▼ Query for order details
                                ┌──────────┐
                                │   API    │
                                └──────────┘

┌──────────┐    {full order data}    ┌──────────┐
│ Producer │ ──────────────────────→ │ Consumer │
└──────────┘                         │  stores  │
                                     │ locally  │
                                     └──────────┘

Commands → Domain Logic → Events → Event Store
                                        │
                                        ▼
                               ┌─────────────────┐
                               │   Projections   │
                               │  (Read Models)  │
                               └─────────────────┘

Commands ──→ Write Model ──→ Events ──→ Read Model ←── Queries
              (Domain)                (Denormalized)

| Technology | Strengths | Best For |
|---|---|---|
| Apache Kafka | High throughput, replay, partitioning | Event streaming, high volume |
| RabbitMQ | Flexible routing, multiple protocols | Traditional messaging, complex routing |
| AWS EventBridge | Serverless, AWS integration, rules | AWS-native applications |
| Redis Streams | Low latency, simple setup | Real-time, moderate volume |
| NATS | Lightweight, fast, simple | Microservices, edge computing |
// Avro schema example
{
"type": "record",
"name": "OrderPlaced",
"namespace": "com.example.events",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "total", "type": "double"},
{"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}}
]
}

/**
 * Processes an event unless its ID has already been recorded as processed.
 *
 * Looks up `event.eventId` in `db.processedEvents`. If a record is found, a
 * message is logged and the event is skipped. Otherwise `processEvent` and the
 * insert of the processed-event record both run inside one `db.transaction`
 * callback.
 *
 * @param event - The incoming event; its `eventId` is the deduplication key.
 * @returns A promise that resolves once the event has been skipped or processed.
 */
async function handleEvent(event: Event): Promise<void> {
  // Check if already processed.
  // NOTE(review): this lookup runs outside the transaction below, so two
  // concurrent deliveries of the same event could both pass it. Presumably a
  // unique constraint on processedEvents.eventId is expected to reject the
  // second insert — confirm against the schema.
  const exists = await db.processedEvents.findOne({
    eventId: event.eventId
  });
  if (exists) {
    console.log(`Event ${event.eventId} already processed, skipping`);
    return;
  }

  // Run the handler and record the event ID inside the same transaction callback.
  await db.transaction(async (tx) => {
    await processEvent(event, tx);
    await tx.processedEvents.insert({
      eventId: event.eventId,
      processedAt: new Date()
    });
  });
}

BEGIN TRANSACTION;
-- Business operation: insert the order row.
INSERT INTO orders (id, customer_id, total)
VALUES ('ord-123', 'cust-456', 99.99);
-- Store event in outbox: the OrderPlaced event is written as a row in the
-- outbox table, inside the same transaction as the order insert above.
-- NOTE: the payload literal is abbreviated ("...") for illustration and is
-- not valid JSON as written.
INSERT INTO outbox (id, event_type, payload, created_at)
VALUES ('evt-789', 'OrderPlaced', '{"orderId":"ord-123"...}', NOW());
-- Commit the order row and the outbox row together.
COMMIT;
-- Separate process polls outbox and publishes to broker
-- Then marks events as published

// Use sequence numbers for ordering
/** Event envelope that carries a per-aggregate sequence number for ordering. */
interface OrderedEvent {
  /** Identifier of this event. */
  eventId: string;

  /** Identifier of the aggregate whose sequence this event belongs to. */
  aggregateId: string;

  /** Monotonically increasing per aggregate. */
  sequenceNumber: number;

  /** Event body; typed `unknown`, so it must be narrowed before use. */
  payload: unknown;
}
// Consumer tracks last processed sequence
/**
 * Routes an event according to its sequence number relative to the last
 * sequence returned by `getLastSequence` for the event's aggregate.
 *
 * - At or below the last sequence: dropped (already processed or duplicate).
 * - More than one ahead of the last sequence: handed to `bufferForReordering`.
 * - Anything else: passed to `processEvent`.
 *
 * @param event - The event to route; `aggregateId` and `sequenceNumber` are read.
 */
async function handleOrderedEvent(event: OrderedEvent) {
  const lastSeen = await getLastSequence(event.aggregateId);
  const incoming = event.sequenceNumber;

  // Already processed or duplicate
  const alreadyHandled = incoming <= lastSeen;
  if (alreadyHandled) {
    return;
  }

  const arrivedEarly = incoming > lastSeen + 1;
  if (!arrivedEarly) {
    await processEvent(event);
    return;
  }

  // Gap detected - events arrived out of order
  await bufferForReordering(event);
}

- Dead letter queues for failed events
- Exponential backoff retry strategies
- Alerting on DLQ accumulation
- Manual intervention tools for poison messages
- Circuit breakers for downstream failures
Building Event-Driven Systems?
We help organizations design and implement event-driven architectures that scale. From initial design through production deployment, our team brings deep distributed systems expertise.
Discuss Your Architecture →
