Как гарантировать доставку событий в Kafka при сохранении данных в базу? В этом руководстве мы разберем Outbox Pattern — решение проблемы dual writes (БД + message broker). Вы узнаете про Transactional Outbox, Debezium CDC, Polling и примеры реализации для PostgreSQL + Kafka.
📋 Содержание
❌ Проблема Dual Writes
Dual writes — это когда нужно сделать два независимых write операции (например, INSERT в БД + publish в Kafka). Проблема: если первый write успешен, а второй fails, данные становятся inconsistent.
❌ Проблема: БД + Kafka в разных транзакциях
Сценарий: Order Service создает заказ и должен опубликовать событие OrderCreated в Kafka.
// ❌ BAD: Two separate transactions (dual writes)
async function createOrder(order) {
// 1. Save to database
const savedOrder = await db.orders.create(order);
// 2. Publish event to Kafka
await kafka.publish('orders', {
eventType: 'OrderCreated',
orderId: savedOrder.id,
...savedOrder
});
return savedOrder;
}
Что может пойти не так:
- DB success, Kafka fails: Заказ в БД, но событие не опубликовано → другие сервисы не узнают
- DB fails, Kafka success: Событие опубликовано, но заказа нет в БД → inconsistency
- Kafka timeout: Неизвестно, опубликовано событие или нет
- Partial failure: Network split во время publish
❌ Почему нельзя использовать БД транзакцию?
Database transactions работают только внутри одной БД. Kafka — это внешняя система, она не участвует в БД транзакции.
// ❌ Это НЕ РАБОТАЕТ
await db.transaction(async (trx) => {
await trx('orders').insert(order);
await kafka.publish('orders', event); // ← Kafka не в транзакции!
// Если Kafka fails, ROLLBACK не отменит publish
});
✅ Что такое Outbox Pattern?
Outbox Pattern — это паттерн который решает проблему dual writes. События сначала сохраняются в outbox таблицу в той же БД транзакции, затем асинхронно отправляются в Kafka.
✅ Решение: Outbox Table
Как работает:
- В БД создается таблица
outboxдля хранения событий - При создании заказа: INSERT в
orders+ INSERT вoutboxв одной транзакции - Отдельный процесс читает
outboxтаблицу и публикует события в Kafka - После успешной публикации событие помечается как processed
Гарантия: Если транзакция успешна → событие обязательно попадет в outbox → будет опубликовано в Kafka (eventually).
Outbox Pattern Flow
1. Application Transaction:
BEGIN TRANSACTION
INSERT INTO orders (id, user_id, total) VALUES (123, 1, 100);
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES ('order-123', 'OrderCreated', '{"orderId": 123, ...}');
COMMIT
✅ Атомарная операция — или оба успешны, или оба rollback
2. Outbox Publisher (separate process):
SELECT * FROM outbox WHERE processed = false ORDER BY created_at
↓
Для каждого события:
- Publish to Kafka
- UPDATE outbox SET processed = true WHERE id = ?
↓
✅ Событие доставлено в Kafka
3. Consumer:
Kafka Consumer ← "OrderCreated" event
↓
Process event (Payment Service, Inventory, etc.)
Outbox Table Schema
-- Outbox table для PostgreSQL
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_id VARCHAR(255) NOT NULL, -- Идентификатор aggregate (order-123)
event_type VARCHAR(255) NOT NULL, -- Тип события (OrderCreated)
payload JSONB NOT NULL, -- Данные события
created_at TIMESTAMP DEFAULT NOW(), -- Когда создано
processed_at TIMESTAMP, -- Когда опубликовано в Kafka
processed BOOLEAN DEFAULT FALSE, -- Опубликовано или нет
INDEX idx_outbox_processed (processed, created_at)
);
-- Пример записи
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES (
'order-123',
'OrderCreated',
'{"orderId": 123, "userId": 1, "items": [...], "total": 100}'::jsonb
);
🔄 Polling Outbox Table
Polling — это простой подход: отдельный процесс периодически проверяет outbox таблицу и публикует события.
Polling Outbox: Как работает
- Scheduled job (cron, setInterval) каждые N секунд
- SELECT * FROM outbox WHERE processed = false ORDER BY created_at LIMIT 100
- Для каждого события: publish to Kafka
- UPDATE outbox SET processed = true, processed_at = NOW()
✅ Преимущества:
- Простая реализация (100 строк кода)
- Не требует дополнительных инструментов
- Легко debugить
❌ Недостатки:
- Latency (задержка = polling interval)
- Polling overhead (лишние SELECT queries)
- Сложно масштабировать (нужно координировать несколько publishers)
Polling Implementation (Node.js)
// outbox-publisher.js (Polling approach)
const { Kafka } = require('kafkajs');
const db = require('./db'); // PostgreSQL client
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const POLLING_INTERVAL = 5000; // 5 seconds
const BATCH_SIZE = 100;
async function startOutboxPublisher() {
await producer.connect();
console.log('✅ Outbox publisher started');
// Poll every 5 seconds
setInterval(async () => {
await publishPendingEvents();
}, POLLING_INTERVAL);
}
async function publishPendingEvents() {
const client = await db.getClient();
try {
// Get pending events (with row-level lock to prevent duplicate processing)
const result = await client.query(`
SELECT * FROM outbox
WHERE processed = false
ORDER BY created_at ASC
LIMIT $1
FOR UPDATE SKIP LOCKED
`, [BATCH_SIZE]);
const events = result.rows;
if (events.length === 0) {
return; // No events to publish
}
console.log(`📨 Publishing ${events.length} events...`);
// Publish each event to Kafka
for (const event of events) {
try {
await producer.send({
topic: 'orders',
messages: [{
key: event.aggregate_id,
value: JSON.stringify({
eventType: event.event_type,
payload: event.payload,
timestamp: event.created_at
})
}]
});
// Mark as processed
await client.query(`
UPDATE outbox
SET processed = true, processed_at = NOW()
WHERE id = $1
`, [event.id]);
console.log(`✅ Published: ${event.event_type} (${event.aggregate_id})`);
} catch (error) {
console.error(`❌ Failed to publish event ${event.id}:`, error.message);
// Event will be retried in next poll
}
}
} catch (error) {
console.error('❌ Outbox publisher error:', error.message);
} finally {
client.release();
}
}
// Graceful shutdown
process.on('SIGTERM', async () => {
await producer.disconnect();
await db.end();
process.exit(0);
});
startOutboxPublisher().catch(console.error);
Application Code с Outbox
// order-service.js
async function createOrder(userId, items, total) {
const client = await db.getClient();
try {
await client.query('BEGIN');
// 1. Insert order
const orderResult = await client.query(`
INSERT INTO orders (user_id, items, total, status)
VALUES ($1, $2, $3, 'PENDING')
RETURNING *
`, [userId, JSON.stringify(items), total]);
const order = orderResult.rows[0];
// 2. Insert event to outbox (same transaction!)
await client.query(`
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES ($1, $2, $3)
`, [
`order-${order.id}`,
'OrderCreated',
JSON.stringify({
orderId: order.id,
userId: order.user_id,
items: order.items,
total: order.total,
status: order.status
})
]);
await client.query('COMMIT');
console.log(`✅ Order ${order.id} created (event in outbox)`);
return order;
} catch (error) {
await client.query('ROLLBACK');
console.error('❌ Failed to create order:', error.message);
throw error;
} finally {
client.release();
}
}
module.exports = { createOrder };
🔍 Change Data Capture (CDC)
Change Data Capture (CDC) — это технология для отслеживания изменений в базе данных в реальном времени. CDC читает database transaction log вместо polling таблицы.
✅ Как работает CDC
- PostgreSQL: Читает WAL (Write-Ahead Log)
- MySQL: Читает binlog (binary log)
- MongoDB: Читает oplog (operations log)
Преимущества CDC:
- Zero latency: События публикуются сразу после commit
- No polling overhead: Не нужно постоянно SELECT
- Scalable: CDC connector масштабируется независимо
- Ordering guarantee: Порядок событий из transaction log
CDC Architecture
Application
↓
PostgreSQL Transaction:
BEGIN
INSERT INTO orders ...
INSERT INTO outbox ...
COMMIT
↓
Write to WAL (transaction log)
↓
Debezium CDC Connector
← Reads WAL in real-time
↓
Publish to Kafka (automatically)
↓
Kafka Topic "outbox.events"
↓
Consumer processes event
Zero polling, near-instant delivery!
🚀 Debezium для Outbox Pattern
Debezium — это open-source CDC platform на основе Kafka Connect. Поддерживает PostgreSQL, MySQL, MongoDB, SQL Server, Oracle.
Debezium Setup для PostgreSQL
# docker-compose.yml
version: '3'
services:
postgres:
image: postgres:15
environment:
POSTGRES_DB: orders
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
command:
- "postgres"
- "-c"
- "wal_level=logical" # Enable logical replication for CDC
ports:
- "5432:5432"
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
debezium:
image: debezium/connect:latest
depends_on:
- kafka
- postgres
ports:
- "8083:8083"
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: debezium_configs
OFFSET_STORAGE_TOPIC: debezium_offsets
STATUS_STORAGE_TOPIC: debezium_statuses
Debezium Connector Configuration
// Register Debezium PostgreSQL connector
POST http://localhost:8083/connectors
Content-Type: application/json
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "orders",
"database.server.name": "orderdb",
"table.include.list": "public.outbox",
"plugin.name": "pgoutput",
// Outbox Event Router (transform outbox events)
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.field.event.timestamp": "created_at",
"transforms.outbox.route.topic.replacement": "orders.${routedByValue}"
}
}
Outbox Table для Debezium
-- Outbox table optimized for Debezium
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(255) NOT NULL, -- Entity type (Order, Payment)
aggregate_id VARCHAR(255) NOT NULL, -- Entity ID
event_type VARCHAR(255) NOT NULL, -- Event type (OrderCreated)
payload JSONB NOT NULL, -- Event data
created_at TIMESTAMP DEFAULT NOW()
);
-- Index for Debezium performance
CREATE INDEX idx_outbox_created_at ON outbox(created_at);
-- Example: Insert event
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
'Order',
'order-123',
'OrderCreated',
'{"orderId": 123, "userId": 1, "total": 100}'::jsonb
);
Debezium Consumer (Node.js)
// debezium-consumer.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'order-service' });
async function startDebeziumConsumer() {
await consumer.connect();
// Subscribe to Debezium outbox topic
await consumer.subscribe({
topic: 'orders.OrderCreated', // Transformed by EventRouter
fromBeginning: false
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
// Debezium event structure
const event = JSON.parse(message.value.toString());
console.log(`📨 Received event from Debezium:`);
console.log(` Type: ${event.eventType}`);
console.log(` Payload:`, event.payload);
// Process event
await processOrderCreated(event.payload);
}
});
}
async function processOrderCreated(order) {
console.log(`💳 Processing order ${order.orderId}`);
// Business logic here
}
startDebeziumConsumer().catch(console.error);
📊 Polling vs CDC (Debezium) сравнение
| Критерий | Polling Outbox | CDC (Debezium) |
|---|---|---|
| Latency | Polling interval (5-30 sec) | Near real-time (<100ms) |
| DB Overhead | SELECT queries every N sec | No polling (reads WAL) |
| Complexity | Low (100 lines code) | Medium (setup Debezium) |
| Scalability | Hard (coordination needed) | Easy (Kafka Connect scales) |
| Ordering | Best-effort (ORDER BY) | Guaranteed (from WAL) |
| Infrastructure | Simple (app + DB) | Complex (Kafka Connect, Debezium) |
| Ops Complexity | Low | Medium-High |
| Performance | Lower (polling overhead) | Higher (no overhead) |
| When to use | Low throughput, simple cases | High throughput, production |
🎯 Когда что использовать?
Используйте Polling если:
- Low throughput (<100 events/sec)
- Latency 5-30 секунд приемлема
- Простая инфраструктура (нет Kafka Connect)
- MVP / proof of concept
Используйте CDC (Debezium) если:
- High throughput (1000+ events/sec)
- Near real-time latency критична
- Production system с высокими требованиями
- Нужна гарантия ordering событий
- Есть ресурсы для операционной поддержки
💻 Примеры реализации
Python: Outbox Pattern с SQLAlchemy
# models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class Order(Base):
__tablename__ = 'orders'
id = Column(Integer, primary_key=True)
user_id = Column(Integer, nullable=False)
items = Column(JSON, nullable=False)
total = Column(Integer, nullable=False)
status = Column(String(50), default='PENDING')
created_at = Column(DateTime, default=datetime.now)
class Outbox(Base):
__tablename__ = 'outbox'
id = Column(Integer, primary_key=True)
aggregate_id = Column(String(255), nullable=False)
event_type = Column(String(255), nullable=False)
payload = Column(JSON, nullable=False)
created_at = Column(DateTime, default=datetime.now)
processed = Column(Boolean, default=False)
processed_at = Column(DateTime, nullable=True)
# order_service.py
from sqlalchemy.orm import Session
from models import Order, Outbox
import json
def create_order(db: Session, user_id: int, items: list, total: int):
"""Create order with outbox pattern"""
try:
# 1. Create order
order = Order(
user_id=user_id,
items=items,
total=total,
status='PENDING'
)
db.add(order)
db.flush() # Get order.id without committing
# 2. Create outbox event (same transaction)
event = Outbox(
aggregate_id=f'order-{order.id}',
event_type='OrderCreated',
payload={
'orderId': order.id,
'userId': order.user_id,
'items': order.items,
'total': order.total,
'status': order.status
}
)
db.add(event)
# 3. Commit both together
db.commit()
print(f"✅ Order {order.id} created (event in outbox)")
return order
except Exception as e:
db.rollback()
print(f"❌ Failed to create order: {e}")
raise
# outbox_publisher.py (Polling)
from sqlalchemy.orm import Session
from kafka import KafkaProducer
from models import Outbox
import json
import time
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def publish_outbox_events(db: Session):
"""Poll outbox and publish to Kafka"""
# Get unprocessed events
events = db.query(Outbox)\
.filter(Outbox.processed == False)\
.order_by(Outbox.created_at)\
.limit(100)\
.with_for_update(skip_locked=True)\
.all()
if not events:
return
print(f"📨 Publishing {len(events)} events...")
for event in events:
try:
# Publish to Kafka
producer.send(
topic='orders',
key=event.aggregate_id.encode('utf-8'),
value={
'eventType': event.event_type,
'payload': event.payload,
'timestamp': event.created_at.isoformat()
}
)
producer.flush()
# Mark as processed
event.processed = True
event.processed_at = datetime.now()
db.commit()
print(f"✅ Published: {event.event_type} ({event.aggregate_id})")
except Exception as e:
print(f"❌ Failed to publish event {event.id}: {e}")
db.rollback()
# Main loop
while True:
with get_db_session() as db:
publish_outbox_events(db)
time.sleep(5) # Poll every 5 seconds
✅ Best Practices для Outbox Pattern
1. Ordering Events
Для сохранения порядка событий одного aggregate:
- Используйте
aggregate_idкак Kafka partition key - Добавьте
sequence_numberв outbox для ordering - Debezium автоматически сохраняет порядок из WAL
2. Cleanup Processed Events
Что делать с обработанными событиями:
- Soft delete:
processed = true, периодическая архивация - Hard delete: DELETE после N дней (риск потери audit trail)
- Partition by date: DROP старых partitions
-- Archive old events
INSERT INTO outbox_archive SELECT * FROM outbox
WHERE processed = true AND processed_at < NOW() - INTERVAL '30 days';
DELETE FROM outbox
WHERE processed = true AND processed_at < NOW() - INTERVAL '30 days';
3. Idempotency
Consumers должны быть idempotent — at-least-once delivery может привести к дубликатам.
- Используйте unique constraint на
event_idв consumer - Check if event already processed
4. Monitoring
- Outbox lag: Количество unprocessed events
- Publishing rate: Events/sec published to Kafka
- Failed events: Events that failed to publish
- Oldest unprocessed: Age of oldest unprocessed event
-- Monitor outbox lag
SELECT
COUNT(*) as unprocessed_count,
MIN(created_at) as oldest_event,
AGE(NOW(), MIN(created_at)) as lag
FROM outbox
WHERE processed = false;
🔍 FAQ: Часто задаваемые вопросы
Проблема: Dual writes — сохранение в БД и публикация события в разных транзакциях может привести к inconsistency.
Решение: События сохраняются в outbox таблицу в той же БД транзакции, затем асинхронно публикуются в Kafka.
Гарантия: Если транзакция успешна → событие обязательно попадет в Kafka (eventually).
Проблемы:
- DB success, Kafka fails → событие не опубликовано
- DB fails, Kafka success → событие опубликовано для несуществующего entity
- Kafka timeout → неизвестно, опубликовано или нет
- Невозможно использовать database transaction для обоих writes
- Читает database transaction log (WAL) в реальном времени
- Zero-latency (<100ms), no polling overhead
- Лучше для production, high throughput
- Сложнее setup (Kafka Connect, Debezium)
- Периодически SELECT из outbox таблицы
- Higher latency (5-30 sec), polling overhead
- Проще setup (100 строк кода)
- Подходит для MVP, low throughput
Как работает:
- Читает database transaction log (PostgreSQL WAL, MySQL binlog)
- Отслеживает INSERT, UPDATE, DELETE
- Публикует изменения в Kafka в реальном времени
Преимущества: zero polling overhead, near real-time, ordering guarantee.
- Используйте
aggregate_idкак Kafka partition key → события одного aggregate в одной partition - Добавьте
sequence_numberв outbox для ordering внутри aggregate - Polling:
ORDER BY created_at, idпри чтении outbox - Debezium: автоматически сохраняет порядок из transaction log
1. Soft delete (рекомендуется):
UPDATE outbox SET processed = true- Периодическая архивация в cold storage
- Полный audit trail
DELETE FROM outbox WHERE processed = true- Экономия места, но риск потери audit trail
- Table partitioning по
created_at DROPстарых partitions (fast cleanup)
Тестируйте Outbox Pattern без риска
LightBox API — создайте Mock API для тестирования event-driven архитектуры
Начать бесплатно →📝 Выводы
В этой статье мы рассмотрели Outbox Pattern для надежной доставки событий:
- Проблема: Dual writes (БД + Kafka) приводят к inconsistency
- Решение: Outbox таблица + асинхронная публикация
- Polling: Простой подход, higher latency
- CDC (Debezium): Real-time, zero overhead, production-ready
- Best Practices: ordering, cleanup, idempotency, monitoring
🎯 Главное:
- Outbox Pattern решает проблему dual writes через единую БД транзакцию
- Polling — для MVP и simple cases (5-30 sec latency)
- Debezium CDC — для production (<100ms latency)
- Используйте aggregate_id как partition key для ordering
- Cleanup обработанных событий обязателен
- Consumers должны быть idempotent
- Monitoring outbox lag критичен для production
Related Articles
- Event-Driven Architecture: Kafka vs RabbitMQ vs AWS
- Saga Pattern: распределенные транзакции
- Service Mesh: Istio vs Linkerd vs Consul