Outbox Pattern: надежная доставка событий из БД

Как гарантировать доставку событий в Kafka при сохранении данных в базу? В этом руководстве мы разберем Outbox Pattern — решение проблемы dual writes (БД + message broker). Вы узнаете про Transactional Outbox, Debezium CDC, Polling и примеры реализации для PostgreSQL + Kafka.

📋 Содержание

  1. Проблема Dual Writes
  2. Что такое Outbox Pattern?
  3. Polling Outbox Table
  4. Change Data Capture (CDC)
  5. Debezium для Outbox Pattern
  6. Polling vs CDC сравнение
  7. Примеры реализации
  8. Best Practices
  9. FAQ: Часто задаваемые вопросы

❌ Проблема Dual Writes

Dual writes — это когда нужно сделать два независимых write операции (например, INSERT в БД + publish в Kafka). Проблема: если первый write успешен, а второй fails, данные становятся inconsistent.

❌ Проблема: БД + Kafka в разных транзакциях

Сценарий: Order Service создает заказ и должен опубликовать событие OrderCreated в Kafka.

// ❌ BAD: Two separate transactions (dual writes)
async function createOrder(order) {
  // 1. Save to database
  const savedOrder = await db.orders.create(order);

  // 2. Publish event to Kafka
  await kafka.publish('orders', {
    eventType: 'OrderCreated',
    orderId: savedOrder.id,
    ...savedOrder
  });

  return savedOrder;
}

Что может пойти не так:

  1. DB success, Kafka fails: Заказ в БД, но событие не опубликовано → другие сервисы не узнают
  2. DB fails, Kafka success: Событие опубликовано, но заказа нет в БД → inconsistency
  3. Kafka timeout: Неизвестно, опубликовано событие или нет
  4. Partial failure: Network split во время publish

❌ Почему нельзя использовать БД транзакцию?

Database transactions работают только внутри одной БД. Kafka — это внешняя система, она не участвует в БД транзакции.

// ❌ Это НЕ РАБОТАЕТ
await db.transaction(async (trx) => {
  await trx('orders').insert(order);
  await kafka.publish('orders', event);  // ← Kafka не в транзакции!

  // Если Kafka fails, ROLLBACK не отменит publish
});

✅ Что такое Outbox Pattern?

Outbox Pattern — это паттерн который решает проблему dual writes. События сначала сохраняются в outbox таблицу в той же БД транзакции, затем асинхронно отправляются в Kafka.

✅ Решение: Outbox Table

Как работает:

  1. В БД создается таблица outbox для хранения событий
  2. При создании заказа: INSERT в orders + INSERT в outbox в одной транзакции
  3. Отдельный процесс читает outbox таблицу и публикует события в Kafka
  4. После успешной публикации событие помечается как processed

Гарантия: Если транзакция успешна → событие обязательно попадет в outbox → будет опубликовано в Kafka (eventually).

Outbox Pattern Flow

1. Application Transaction:
   BEGIN TRANSACTION
     INSERT INTO orders (id, user_id, total) VALUES (123, 1, 100);
     INSERT INTO outbox (aggregate_id, event_type, payload)
       VALUES ('order-123', 'OrderCreated', '{"orderId": 123, ...}');
   COMMIT
   ✅ Атомарная операция — или оба успешны, или оба rollback

2. Outbox Publisher (separate process):
   SELECT * FROM outbox WHERE processed = false ORDER BY created_at
   ↓
   Для каждого события:
     - Publish to Kafka
     - UPDATE outbox SET processed = true WHERE id = ?
   ↓
   ✅ Событие доставлено в Kafka

3. Consumer:
   Kafka Consumer ← "OrderCreated" event
   ↓
   Process event (Payment Service, Inventory, etc.)

Outbox Table Schema

-- Outbox table для PostgreSQL
CREATE TABLE outbox (
  id BIGSERIAL PRIMARY KEY,
  aggregate_id VARCHAR(255) NOT NULL,     -- Идентификатор aggregate (order-123)
  event_type VARCHAR(255) NOT NULL,        -- Тип события (OrderCreated)
  payload JSONB NOT NULL,                  -- Данные события
  created_at TIMESTAMP DEFAULT NOW(),      -- Когда создано
  processed_at TIMESTAMP,                  -- Когда опубликовано в Kafka
  processed BOOLEAN DEFAULT FALSE,         -- Опубликовано или нет
  INDEX idx_outbox_processed (processed, created_at)
);

-- Пример записи
INSERT INTO outbox (aggregate_id, event_type, payload)
VALUES (
  'order-123',
  'OrderCreated',
  '{"orderId": 123, "userId": 1, "items": [...], "total": 100}'::jsonb
);

🔄 Polling Outbox Table

Polling — это простой подход: отдельный процесс периодически проверяет outbox таблицу и публикует события.

Polling Outbox: Как работает

  1. Scheduled job (cron, setInterval) каждые N секунд
  2. SELECT * FROM outbox WHERE processed = false ORDER BY created_at LIMIT 100
  3. Для каждого события: publish to Kafka
  4. UPDATE outbox SET processed = true, processed_at = NOW()

✅ Преимущества:

❌ Недостатки:

Polling Implementation (Node.js)

// outbox-publisher.js (Polling approach)
const { Kafka } = require('kafkajs');
const db = require('./db');  // PostgreSQL client

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

const POLLING_INTERVAL = 5000;  // 5 seconds
const BATCH_SIZE = 100;

async function startOutboxPublisher() {
  await producer.connect();
  console.log('✅ Outbox publisher started');

  // Poll every 5 seconds
  setInterval(async () => {
    await publishPendingEvents();
  }, POLLING_INTERVAL);
}

async function publishPendingEvents() {
  const client = await db.getClient();

  try {
    // Get pending events (with row-level lock to prevent duplicate processing)
    const result = await client.query(`
      SELECT * FROM outbox
      WHERE processed = false
      ORDER BY created_at ASC
      LIMIT $1
      FOR UPDATE SKIP LOCKED
    `, [BATCH_SIZE]);

    const events = result.rows;

    if (events.length === 0) {
      return;  // No events to publish
    }

    console.log(`📨 Publishing ${events.length} events...`);

    // Publish each event to Kafka
    for (const event of events) {
      try {
        await producer.send({
          topic: 'orders',
          messages: [{
            key: event.aggregate_id,
            value: JSON.stringify({
              eventType: event.event_type,
              payload: event.payload,
              timestamp: event.created_at
            })
          }]
        });

        // Mark as processed
        await client.query(`
          UPDATE outbox
          SET processed = true, processed_at = NOW()
          WHERE id = $1
        `, [event.id]);

        console.log(`✅ Published: ${event.event_type} (${event.aggregate_id})`);

      } catch (error) {
        console.error(`❌ Failed to publish event ${event.id}:`, error.message);
        // Event will be retried in next poll
      }
    }

  } catch (error) {
    console.error('❌ Outbox publisher error:', error.message);
  } finally {
    client.release();
  }
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await producer.disconnect();
  await db.end();
  process.exit(0);
});

startOutboxPublisher().catch(console.error);

Application Code с Outbox

// order-service.js
async function createOrder(userId, items, total) {
  const client = await db.getClient();

  try {
    await client.query('BEGIN');

    // 1. Insert order
    const orderResult = await client.query(`
      INSERT INTO orders (user_id, items, total, status)
      VALUES ($1, $2, $3, 'PENDING')
      RETURNING *
    `, [userId, JSON.stringify(items), total]);

    const order = orderResult.rows[0];

    // 2. Insert event to outbox (same transaction!)
    await client.query(`
      INSERT INTO outbox (aggregate_id, event_type, payload)
      VALUES ($1, $2, $3)
    `, [
      `order-${order.id}`,
      'OrderCreated',
      JSON.stringify({
        orderId: order.id,
        userId: order.user_id,
        items: order.items,
        total: order.total,
        status: order.status
      })
    ]);

    await client.query('COMMIT');

    console.log(`✅ Order ${order.id} created (event in outbox)`);
    return order;

  } catch (error) {
    await client.query('ROLLBACK');
    console.error('❌ Failed to create order:', error.message);
    throw error;
  } finally {
    client.release();
  }
}

module.exports = { createOrder };

🔍 Change Data Capture (CDC)

Change Data Capture (CDC) — это технология для отслеживания изменений в базе данных в реальном времени. CDC читает database transaction log вместо polling таблицы.

✅ Как работает CDC

Преимущества CDC:

CDC Architecture

Application
    ↓
PostgreSQL Transaction:
  BEGIN
    INSERT INTO orders ...
    INSERT INTO outbox ...
  COMMIT
    ↓
  Write to WAL (transaction log)
    ↓
Debezium CDC Connector
  ← Reads WAL in real-time
    ↓
  Publish to Kafka (automatically)
    ↓
Kafka Topic "outbox.events"
    ↓
Consumer processes event

Zero polling, near-instant delivery!

🚀 Debezium для Outbox Pattern

Debezium — это open-source CDC platform на основе Kafka Connect. Поддерживает PostgreSQL, MySQL, MongoDB, SQL Server, Oracle.

Debezium Setup для PostgreSQL

# docker-compose.yml
version: '3'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orders
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # Enable logical replication for CDC
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092

  debezium:
    image: debezium/connect:latest
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: debezium_configs
      OFFSET_STORAGE_TOPIC: debezium_offsets
      STATUS_STORAGE_TOPIC: debezium_statuses

Debezium Connector Configuration

// Register Debezium PostgreSQL connector
POST http://localhost:8083/connectors
Content-Type: application/json

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orders",
    "database.server.name": "orderdb",
    "table.include.list": "public.outbox",
    "plugin.name": "pgoutput",

    // Outbox Event Router (transform outbox events)
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.timestamp": "created_at",
    "transforms.outbox.route.topic.replacement": "orders.${routedByValue}"
  }
}

Outbox Table для Debezium

-- Outbox table optimized for Debezium
CREATE TABLE outbox (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_type VARCHAR(255) NOT NULL,   -- Entity type (Order, Payment)
  aggregate_id VARCHAR(255) NOT NULL,      -- Entity ID
  event_type VARCHAR(255) NOT NULL,        -- Event type (OrderCreated)
  payload JSONB NOT NULL,                  -- Event data
  created_at TIMESTAMP DEFAULT NOW()
);

-- Index for Debezium performance
CREATE INDEX idx_outbox_created_at ON outbox(created_at);

-- Example: Insert event
INSERT INTO outbox (aggregate_type, aggregate_id, event_type, payload)
VALUES (
  'Order',
  'order-123',
  'OrderCreated',
  '{"orderId": 123, "userId": 1, "total": 100}'::jsonb
);

Debezium Consumer (Node.js)

// debezium-consumer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'order-service' });

async function startDebeziumConsumer() {
  await consumer.connect();

  // Subscribe to Debezium outbox topic
  await consumer.subscribe({
    topic: 'orders.OrderCreated',  // Transformed by EventRouter
    fromBeginning: false
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      // Debezium event structure
      const event = JSON.parse(message.value.toString());

      console.log(`📨 Received event from Debezium:`);
      console.log(`  Type: ${event.eventType}`);
      console.log(`  Payload:`, event.payload);

      // Process event
      await processOrderCreated(event.payload);
    }
  });
}

async function processOrderCreated(order) {
  console.log(`💳 Processing order ${order.orderId}`);
  // Business logic here
}

startDebeziumConsumer().catch(console.error);

📊 Polling vs CDC (Debezium) сравнение

Критерий Polling Outbox CDC (Debezium)
Latency Polling interval (5-30 sec) Near real-time (<100ms)
DB Overhead SELECT queries every N sec No polling (reads WAL)
Complexity Low (100 lines code) Medium (setup Debezium)
Scalability Hard (coordination needed) Easy (Kafka Connect scales)
Ordering Best-effort (ORDER BY) Guaranteed (from WAL)
Infrastructure Simple (app + DB) Complex (Kafka Connect, Debezium)
Ops Complexity Low Medium-High
Performance Lower (polling overhead) Higher (no overhead)
When to use Low throughput, simple cases High throughput, production

🎯 Когда что использовать?

Используйте Polling если:

Используйте CDC (Debezium) если:

💻 Примеры реализации

Python: Outbox Pattern с SQLAlchemy

# models.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime, JSON
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime

Base = declarative_base()

class Order(Base):
    __tablename__ = 'orders'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, nullable=False)
    items = Column(JSON, nullable=False)
    total = Column(Integer, nullable=False)
    status = Column(String(50), default='PENDING')
    created_at = Column(DateTime, default=datetime.now)

class Outbox(Base):
    __tablename__ = 'outbox'

    id = Column(Integer, primary_key=True)
    aggregate_id = Column(String(255), nullable=False)
    event_type = Column(String(255), nullable=False)
    payload = Column(JSON, nullable=False)
    created_at = Column(DateTime, default=datetime.now)
    processed = Column(Boolean, default=False)
    processed_at = Column(DateTime, nullable=True)
# order_service.py
from sqlalchemy.orm import Session
from models import Order, Outbox
import json

def create_order(db: Session, user_id: int, items: list, total: int):
    """Create order with outbox pattern"""
    try:
        # 1. Create order
        order = Order(
            user_id=user_id,
            items=items,
            total=total,
            status='PENDING'
        )
        db.add(order)
        db.flush()  # Get order.id without committing

        # 2. Create outbox event (same transaction)
        event = Outbox(
            aggregate_id=f'order-{order.id}',
            event_type='OrderCreated',
            payload={
                'orderId': order.id,
                'userId': order.user_id,
                'items': order.items,
                'total': order.total,
                'status': order.status
            }
        )
        db.add(event)

        # 3. Commit both together
        db.commit()

        print(f"✅ Order {order.id} created (event in outbox)")
        return order

    except Exception as e:
        db.rollback()
        print(f"❌ Failed to create order: {e}")
        raise
# outbox_publisher.py (Polling)
from sqlalchemy.orm import Session
from kafka import KafkaProducer
from models import Outbox
import json
import time

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

def publish_outbox_events(db: Session):
    """Poll outbox and publish to Kafka"""

    # Get unprocessed events
    events = db.query(Outbox)\
        .filter(Outbox.processed == False)\
        .order_by(Outbox.created_at)\
        .limit(100)\
        .with_for_update(skip_locked=True)\
        .all()

    if not events:
        return

    print(f"📨 Publishing {len(events)} events...")

    for event in events:
        try:
            # Publish to Kafka
            producer.send(
                topic='orders',
                key=event.aggregate_id.encode('utf-8'),
                value={
                    'eventType': event.event_type,
                    'payload': event.payload,
                    'timestamp': event.created_at.isoformat()
                }
            )
            producer.flush()

            # Mark as processed
            event.processed = True
            event.processed_at = datetime.now()
            db.commit()

            print(f"✅ Published: {event.event_type} ({event.aggregate_id})")

        except Exception as e:
            print(f"❌ Failed to publish event {event.id}: {e}")
            db.rollback()

# Main loop
while True:
    with get_db_session() as db:
        publish_outbox_events(db)
    time.sleep(5)  # Poll every 5 seconds

✅ Best Practices для Outbox Pattern

1. Ordering Events

Для сохранения порядка событий одного aggregate:

2. Cleanup Processed Events

Что делать с обработанными событиями:

-- Archive old events
INSERT INTO outbox_archive SELECT * FROM outbox
WHERE processed = true AND processed_at < NOW() - INTERVAL '30 days';

DELETE FROM outbox
WHERE processed = true AND processed_at < NOW() - INTERVAL '30 days';

3. Idempotency

Consumers должны быть idempotent — at-least-once delivery может привести к дубликатам.

4. Monitoring

-- Monitor outbox lag
SELECT
  COUNT(*) as unprocessed_count,
  MIN(created_at) as oldest_event,
  AGE(NOW(), MIN(created_at)) as lag
FROM outbox
WHERE processed = false;

🔍 FAQ: Часто задаваемые вопросы

❓ Что такое Outbox Pattern и зачем он нужен?
Outbox Pattern — это паттерн для надежной доставки событий из базы данных в message broker (Kafka, RabbitMQ).

Проблема: Dual writes — сохранение в БД и публикация события в разных транзакциях может привести к inconsistency.

Решение: События сохраняются в outbox таблицу в той же БД транзакции, затем асинхронно публикуются в Kafka.

Гарантия: Если транзакция успешна → событие обязательно попадет в Kafka (eventually).
❓ В чем проблема dual writes?
Dual writes — это когда нужно сделать два независимых write (например, INSERT в БД + publish в Kafka).

Проблемы:
  • DB success, Kafka fails → событие не опубликовано
  • DB fails, Kafka success → событие опубликовано для несуществующего entity
  • Kafka timeout → неизвестно, опубликовано или нет
  • Невозможно использовать database transaction для обоих writes
Outbox Pattern решает эту проблему через единую БД транзакцию.
❓ Debezium vs Polling Outbox — что выбрать?
Debezium (CDC):
  • Читает database transaction log (WAL) в реальном времени
  • Zero-latency (<100ms), no polling overhead
  • Лучше для production, high throughput
  • Сложнее setup (Kafka Connect, Debezium)
Polling:
  • Периодически SELECT из outbox таблицы
  • Higher latency (5-30 sec), polling overhead
  • Проще setup (100 строк кода)
  • Подходит для MVP, low throughput
Выбор: Debezium для production, Polling для простых случаев.
❓ Что такое Change Data Capture (CDC)?
Change Data Capture (CDC) — технология для отслеживания изменений в БД в реальном времени.

Как работает:
  • Читает database transaction log (PostgreSQL WAL, MySQL binlog)
  • Отслеживает INSERT, UPDATE, DELETE
  • Публикует изменения в Kafka в реальном времени
Debezium — популярная open-source CDC платформа на основе Kafka Connect.

Преимущества: zero polling overhead, near real-time, ordering guarantee.
❓ Как обрабатывать порядок событий в Outbox Pattern?
Для сохранения ordering:
  • Используйте aggregate_id как Kafka partition key → события одного aggregate в одной partition
  • Добавьте sequence_number в outbox для ordering внутри aggregate
  • Polling: ORDER BY created_at, id при чтении outbox
  • Debezium: автоматически сохраняет порядок из transaction log
Важно: Ordering гарантирован только в пределах одной partition в Kafka.
❓ Что делать с обработанными событиями в outbox таблице?
Три подхода:

1. Soft delete (рекомендуется):
  • UPDATE outbox SET processed = true
  • Периодическая архивация в cold storage
  • Полный audit trail
2. Hard delete:
  • DELETE FROM outbox WHERE processed = true
  • Экономия места, но риск потери audit trail
3. Partition by date:
  • Table partitioning по created_at
  • DROP старых partitions (fast cleanup)
Рекомендация: soft delete + архивация после 30 дней.

Тестируйте Outbox Pattern без риска

LightBox API — создайте Mock API для тестирования event-driven архитектуры

Начать бесплатно →

📝 Выводы

В этой статье мы рассмотрели Outbox Pattern для надежной доставки событий:

  1. Проблема: Dual writes (БД + Kafka) приводят к inconsistency
  2. Решение: Outbox таблица + асинхронная публикация
  3. Polling: Простой подход, higher latency
  4. CDC (Debezium): Real-time, zero overhead, production-ready
  5. Best Practices: ordering, cleanup, idempotency, monitoring

🎯 Главное:

Related Articles

← Вернуться к статьям