Event-Driven Architecture: Kafka vs RabbitMQ vs AWS SNS/SQS

Как построить масштабируемую систему с асинхронной коммуникацией между сервисами? В этом руководстве мы разберем Event-Driven Architecture и сравним три популярных решения: Apache Kafka, RabbitMQ и AWS SNS/SQS. Вы узнаете про throughput, latency, ordering, Event Sourcing, CQRS и примеры реализации.

📋 Содержание

  1. Что такое Event-Driven Architecture?
  2. Apache Kafka: distributed streaming
  3. RabbitMQ: traditional message broker
  4. AWS SNS/SQS: managed services
  5. Детальное сравнение
  6. Event Sourcing pattern
  7. CQRS (Command Query Responsibility Segregation)
  8. Примеры реализации
  9. Best Practices
  10. FAQ: Часто задаваемые вопросы

📡 Что такое Event-Driven Architecture?

Event-Driven Architecture (EDA) — это архитектурный паттерн где компоненты системы взаимодействуют через события (events). Producers публикуют события, Consumers подписываются и реагируют асинхронно.

✅ Преимущества Event-Driven Architecture

Event-Driven Architecture Flow

Producer (Order Service)
    ↓
Publish Event: "OrderCreated"
    ↓
Event Bus (Kafka / RabbitMQ / SNS)
    ↓
    ├─→ Consumer 1 (Payment Service) → Process payment
    ├─→ Consumer 2 (Inventory Service) → Reserve stock
    ├─→ Consumer 3 (Email Service) → Send confirmation
    └─→ Consumer 4 (Analytics Service) → Track metrics

Асинхронная обработка, каждый Consumer независим

Request-Response vs Event-Driven

Критерий Request-Response (REST) Event-Driven (EDA)
Coupling Tight (сервисы знают друг о друге) Loose (знают только про события)
Communication Synchronous (блокирующий) Asynchronous (не блокирующий)
Failure Handling Immediate error (timeout) Retry, dead letter queue
Scalability Сложнее (нужен load balancer) Проще (consumers масштабируются)
Latency Lower (прямой вызов) Higher (через event bus)
Use Case CRUD operations, queries Notifications, workflows, streaming

🌊 Apache Kafka: Distributed Streaming Platform

Apache Kafka — это distributed event streaming platform, изначально созданный LinkedIn. Kafka — это не просто message broker, это distributed commit log.

Kafka: Ключевые концепции

Kafka Architecture

Topic: "orders" (3 partitions, replication factor 2)

Partition 0:  [msg1] [msg2] [msg3] [msg4] ...
              ↓ replicated to Broker 2
Partition 1:  [msg5] [msg6] [msg7] [msg8] ...
              ↓ replicated to Broker 3
Partition 2:  [msg9] [msg10] [msg11] [msg12] ...
              ↓ replicated to Broker 1

Consumer Group "payment-service" (3 consumers):
  Consumer 1 → reads Partition 0
  Consumer 2 → reads Partition 1
  Consumer 3 → reads Partition 2

Parallel processing, каждый consumer читает свою partition

Kafka: Преимущества

✅ Почему Kafka?

Kafka: Недостатки

❌ Сложности Kafka

Kafka Producer (Node.js)

// kafka-producer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094']
});

const producer = kafka.producer();

async function publishOrderCreated(order) {
  await producer.connect();

  // Publish event to "orders" topic
  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: order.id.toString(),  // Key для partitioning
        value: JSON.stringify({
          eventType: 'OrderCreated',
          orderId: order.id,
          userId: order.userId,
          items: order.items,
          total: order.total,
          timestamp: new Date().toISOString()
        }),
        headers: {
          'correlation-id': order.correlationId
        }
      }
    ]
  });

  console.log(`✅ Event published: OrderCreated ${order.id}`);
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await producer.disconnect();
});

module.exports = { publishOrderCreated };

Kafka Consumer (Node.js)

// kafka-consumer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'payment-service',
  brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094']
});

const consumer = kafka.consumer({
  groupId: 'payment-service-group',
  // Retry configuration
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
});

async function startConsumer() {
  await consumer.connect();

  // Subscribe to "orders" topic
  await consumer.subscribe({
    topic: 'orders',
    fromBeginning: false  // Start from latest offset
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const event = JSON.parse(message.value.toString());

      console.log(`📨 Received event: ${event.eventType} from partition ${partition}`);

      try {
        if (event.eventType === 'OrderCreated') {
          await processPayment(event);
        }

        // Commit offset after successful processing
        // (auto-commit is enabled by default)

      } catch (error) {
        console.error(`❌ Error processing event: ${error.message}`);
        // Event will be retried or sent to DLQ
        throw error;
      }
    }
  });
}

async function processPayment(event) {
  console.log(`💳 Processing payment for order ${event.orderId}`);

  // Reserve payment
  const payment = await db.payments.create({
    order_id: event.orderId,
    user_id: event.userId,
    amount: event.total,
    status: 'RESERVED'
  });

  // Publish PaymentReserved event
  await publishPaymentReserved(payment);
}

startConsumer().catch(console.error);

🐰 RabbitMQ: Traditional Message Broker

RabbitMQ — это traditional message broker, реализующий AMQP (Advanced Message Queuing Protocol). Отлично подходит для task queues и routing.

RabbitMQ: Ключевые концепции

RabbitMQ Architecture

Producer → Exchange (type: topic) → Queues → Consumers

Exchange "orders" (topic):
    ↓ routing key: "order.created"
    ├─→ Queue "payment-service" → Consumer 1
    ├─→ Queue "inventory-service" → Consumer 2
    └─→ Queue "email-service" → Consumer 3

    ↓ routing key: "order.cancelled"
    ├─→ Queue "payment-service" → Consumer 1
    └─→ Queue "email-service" → Consumer 3

Flexible routing, push-based delivery

RabbitMQ: Преимущества

✅ Почему RabbitMQ?

RabbitMQ: Недостатки

❌ Ограничения RabbitMQ

RabbitMQ Producer (Node.js)

// rabbitmq-producer.js
const amqp = require('amqplib');

let connection, channel;

async function connect() {
  connection = await amqp.connect('amqp://localhost:5672');
  channel = await connection.createChannel();

  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });

  console.log('✅ Connected to RabbitMQ');
}

async function publishOrderCreated(order) {
  const event = {
    eventType: 'OrderCreated',
    orderId: order.id,
    userId: order.userId,
    items: order.items,
    total: order.total,
    timestamp: new Date().toISOString()
  };

  // Publish to exchange with routing key
  channel.publish(
    'orders',                    // exchange
    'order.created',             // routing key
    Buffer.from(JSON.stringify(event)),
    {
      persistent: true,          // Save to disk
      contentType: 'application/json',
      headers: {
        'correlation-id': order.correlationId
      }
    }
  );

  console.log(`✅ Event published: OrderCreated ${order.id}`);
}

// Graceful shutdown
process.on('SIGTERM', async () => {
  await channel.close();
  await connection.close();
});

connect().catch(console.error);

module.exports = { publishOrderCreated };

RabbitMQ Consumer (Node.js)

// rabbitmq-consumer.js
const amqp = require('amqplib');

async function startConsumer() {
  const connection = await amqp.connect('amqp://localhost:5672');
  const channel = await connection.createChannel();

  // Declare exchange
  await channel.assertExchange('orders', 'topic', { durable: true });

  // Declare queue
  const queue = 'payment-service-queue';
  await channel.assertQueue(queue, { durable: true });

  // Bind queue to exchange with routing key pattern
  await channel.bindQueue(queue, 'orders', 'order.*');

  // Set prefetch (how many messages to process in parallel)
  channel.prefetch(10);

  console.log('✅ Waiting for messages...');

  // Consume messages
  channel.consume(queue, async (msg) => {
    if (msg !== null) {
      const event = JSON.parse(msg.content.toString());

      console.log(`📨 Received event: ${event.eventType}`);

      try {
        if (event.eventType === 'OrderCreated') {
          await processPayment(event);
        }

        // Acknowledge message (remove from queue)
        channel.ack(msg);

      } catch (error) {
        console.error(`❌ Error processing event: ${error.message}`);

        // Reject and requeue (or send to DLQ)
        channel.nack(msg, false, false);  // false = don't requeue
      }
    }
  });
}

async function processPayment(event) {
  console.log(`💳 Processing payment for order ${event.orderId}`);

  // Reserve payment
  const payment = await db.payments.create({
    order_id: event.orderId,
    user_id: event.userId,
    amount: event.total,
    status: 'RESERVED'
  });
}

startConsumer().catch(console.error);

☁️ AWS SNS/SQS: Managed Services

AWS SNS (Simple Notification Service) — pub/sub для fan-out сообщений.
AWS SQS (Simple Queue Service) — managed message queue.

AWS SNS/SQS: Ключевые концепции

AWS SNS → SQS Architecture

Producer (Lambda / API)
    ↓
SNS Topic "orders"
    ↓
    ├─→ SQS Queue "payment-service" → Lambda Consumer
    ├─→ SQS Queue "inventory-service" → Lambda Consumer
    └─→ SQS Queue "email-service" → Lambda Consumer

Fan-out pattern: 1 message → N queues

AWS SNS/SQS: Преимущества

✅ Почему AWS SNS/SQS?

AWS SNS/SQS: Недостатки

❌ Ограничения AWS SNS/SQS

AWS SNS Producer (Node.js)

// aws-sns-producer.js
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');

const snsClient = new SNSClient({ region: 'us-east-1' });

async function publishOrderCreated(order) {
  const event = {
    eventType: 'OrderCreated',
    orderId: order.id,
    userId: order.userId,
    items: order.items,
    total: order.total,
    timestamp: new Date().toISOString()
  };

  const command = new PublishCommand({
    TopicArn: 'arn:aws:sns:us-east-1:123456789012:orders',
    Message: JSON.stringify(event),
    MessageAttributes: {
      eventType: {
        DataType: 'String',
        StringValue: 'OrderCreated'
      }
    }
  });

  const response = await snsClient.send(command);
  console.log(`✅ Event published to SNS: ${response.MessageId}`);
}

module.exports = { publishOrderCreated };

AWS SQS Consumer (Node.js)

// aws-sqs-consumer.js
const { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');

const sqsClient = new SQSClient({ region: 'us-east-1' });
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/payment-service-queue';

async function pollMessages() {
  while (true) {
    try {
      // Long polling (wait up to 20 seconds)
      const command = new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 10,
        WaitTimeSeconds: 20,
        VisibilityTimeout: 30
      });

      const response = await sqsClient.send(command);

      if (response.Messages) {
        for (const message of response.Messages) {
          await processMessage(message);
        }
      }

    } catch (error) {
      console.error(`❌ Error polling SQS: ${error.message}`);
      await new Promise(resolve => setTimeout(resolve, 5000));
    }
  }
}

async function processMessage(message) {
  try {
    const snsMessage = JSON.parse(message.Body);
    const event = JSON.parse(snsMessage.Message);

    console.log(`📨 Received event: ${event.eventType}`);

    if (event.eventType === 'OrderCreated') {
      await processPayment(event);
    }

    // Delete message from queue
    await sqsClient.send(new DeleteMessageCommand({
      QueueUrl: queueUrl,
      ReceiptHandle: message.ReceiptHandle
    }));

    console.log(`✅ Message processed and deleted`);

  } catch (error) {
    console.error(`❌ Error processing message: ${error.message}`);
    // Message will become visible again after VisibilityTimeout
  }
}

async function processPayment(event) {
  console.log(`💳 Processing payment for order ${event.orderId}`);
  // ... payment logic
}

pollMessages().catch(console.error);

📊 Kafka vs RabbitMQ vs AWS SNS/SQS: Детальное сравнение

Критерий Apache Kafka RabbitMQ AWS SNS/SQS
Тип Distributed log Message broker Managed pub/sub + queue
Throughput 1M+ msg/sec ~50K msg/sec ~100K msg/sec
Latency 10-50ms (batch) 1-10ms (push) 50-200ms
Delivery Pull-based Push-based Pull-based (SQS)
Ordering Per partition Per queue FIFO queue only
Retention Unlimited (configurable) До ACK (ephemeral) Max 14 days
Replay ✅ Да (time travel) ❌ Нет ❌ Нет
Durability Replication + disk Disk persistence Redundant storage
Scalability Horizontal (partitions) Vertical + clustering Auto-scaling
Complexity High (ZooKeeper, brokers) Medium Low (managed)
Operational Cost High (self-managed) Medium Low (pay-per-use)
Use Case Event streaming, logs, analytics Task queues, RPC Serverless, AWS ecosystem

🎯 Когда что использовать?

Используйте Kafka если:

Используйте RabbitMQ если:

Используйте AWS SNS/SQS если:

📜 Event Sourcing Pattern

Event Sourcing — это паттерн где состояние системы хранится как последовательность событий, а не current state. Все изменения записываются как events в append-only log.

✅ Преимущества Event Sourcing

Event Sourcing Example: Bank Account

Traditional (current state only):
Account { id: 123, balance: 1000 }

Event Sourcing (event log):
1. AccountCreated    { accountId: 123, initialBalance: 0 }
2. MoneyDeposited    { accountId: 123, amount: 500 }
3. MoneyDeposited    { accountId: 123, amount: 1000 }
4. MoneyWithdrawn    { accountId: 123, amount: 500 }

Current Balance = sum of all events = 1000

Можно восстановить баланс на любой момент времени!

Event Sourcing Implementation (Node.js + Kafka)

// event-store.js (Event Sourcing with Kafka)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();

// Append event to event store (Kafka topic)
async function appendEvent(aggregateId, eventType, eventData) {
  await producer.send({
    topic: 'account-events',  // Event store topic
    messages: [{
      key: aggregateId,
      value: JSON.stringify({
        aggregateId,
        eventType,
        eventData,
        timestamp: new Date().toISOString(),
        version: await getNextVersion(aggregateId)
      })
    }]
  });
}

// Replay events to rebuild state
async function replayEvents(aggregateId) {
  const consumer = kafka.consumer({ groupId: `replay-${Date.now()}` });
  await consumer.connect();
  await consumer.subscribe({ topic: 'account-events', fromBeginning: true });

  let state = { balance: 0 };

  await consumer.run({
    eachMessage: async ({ message }) => {
      const event = JSON.parse(message.value.toString());

      if (event.aggregateId === aggregateId) {
        state = applyEvent(state, event);
      }
    }
  });

  return state;
}

function applyEvent(state, event) {
  switch (event.eventType) {
    case 'AccountCreated':
      return { ...state, balance: event.eventData.initialBalance };
    case 'MoneyDeposited':
      return { ...state, balance: state.balance + event.eventData.amount };
    case 'MoneyWithdrawn':
      return { ...state, balance: state.balance - event.eventData.amount };
    default:
      return state;
  }
}

// Usage
await appendEvent('account-123', 'AccountCreated', { initialBalance: 0 });
await appendEvent('account-123', 'MoneyDeposited', { amount: 500 });
await appendEvent('account-123', 'MoneyWithdrawn', { amount: 200 });

const currentState = await replayEvents('account-123');
console.log(currentState);  // { balance: 300 }

🔀 CQRS (Command Query Responsibility Segregation)

CQRS — это паттерн разделения write model (commands) и read model (queries). Часто используется вместе с Event Sourcing.

CQRS Architecture

Write Side (Commands):
Client → Command → Command Handler → Event Store (Kafka)
                                          ↓
                                    Publish Event
                                          ↓
Read Side (Queries):                      ↓
Event Consumer ← Event ← Event Store (Kafka)
      ↓
Update Read Model (PostgreSQL / Elasticsearch)
      ↓
Client ← Query ← Read Model

Разделение: Commands изменяют state, Queries читают

✅ Преимущества CQRS

✅ Best Practices для Event-Driven Architecture

1. Idempotency обязательна

Consumers должны быть idempotent — обработка одного события несколько раз не должна менять результат.

2. Event Schema Versioning

События живут долго, schema может меняться.

3. Dead Letter Queues

Что делать с failed messages?

4. Monitoring и Observability

🔍 FAQ: Часто задаваемые вопросы

❓ Что такое Event-Driven Architecture?
Event-Driven Architecture (EDA) — это архитектурный паттерн где компоненты системы взаимодействуют через события (events). Producers публикуют события, Consumers подписываются и реагируют асинхронно.

Преимущества:
  • Loose coupling между сервисами
  • Scalability (consumers масштабируются независимо)
  • Resilience (события не теряются если consumer падает)
  • Real-time обработка
❓ В чем разница между Kafka и RabbitMQ?
Apache Kafka — distributed log для high-throughput streaming:
  • 1M+ messages/sec
  • Сохраняет события навсегда (retention)
  • Pull-based (consumers читают сами)
  • Replay events (time travel)
RabbitMQ — traditional message broker для task queues:
  • ~50K messages/sec
  • Удаляет сообщения после ACK
  • Push-based (broker отправляет consumers)
  • Lower latency, flexible routing
Выбор: Kafka для event streaming, RabbitMQ для task processing.
❓ Когда использовать AWS SNS/SQS вместо Kafka?
AWS SNS/SQS — managed services, не требуют инфраструктуры.

Используйте если:
  • Serverless architecture (AWS Lambda)
  • Throughput <100K messages/sec
  • Не нужна долгосрочная retention (>14 дней)
  • Хотите избежать операционной сложности Kafka
  • AWS ecosystem (S3, DynamoDB, etc.)
Kafka лучше если: high-throughput streaming, event sourcing, replay events.
❓ Что такое Event Sourcing?
Event Sourcing — паттерн где состояние системы хранится как последовательность событий, а не current state.

Как работает:
  • Все изменения записываются как events в append-only log
  • Current state = replay всех events
  • Можно восстановить любое прошлое состояние
Преимущества: полный audit trail, time travel debugging, event replay.
❓ Что такое CQRS и как связано с Event-Driven Architecture?
CQRS (Command Query Responsibility Segregation) — разделение write model (commands) и read model (queries).

Связь с EDA:
  • Commands генерируют events
  • Events публикуются в event bus (Kafka)
  • Event consumers обновляют read models
Преимущества: оптимизация reads и writes независимо, scalability, multiple read models.
❓ At-least-once vs exactly-once delivery — что выбрать?
At-least-once — сообщение доставлено минимум 1 раз:
  • Могут быть дубликаты
  • Проще реализовать
  • RabbitMQ, AWS SQS (standard)
Exactly-once — доставлено ровно 1 раз:
  • Нет дубликатов
  • Сложнее и дороже
  • Kafka (transactional producer/consumer)
Выбор: Если consumers idempotent → at-least-once достаточно. Если idempotency сложна → exactly-once.

Тестируйте Event-Driven Architecture без риска

LightBox API — создайте Mock API для тестирования асинхронных событий

Начать бесплатно →

📝 Выводы

В этой статье мы рассмотрели Event-Driven Architecture и сравнили три решения:

  1. Apache Kafka: High-throughput streaming (1M+ msg/sec), event sourcing, replay
  2. RabbitMQ: Task queues, low latency, flexible routing
  3. AWS SNS/SQS: Managed services, serverless, AWS ecosystem
  4. Event Sourcing: Хранение state как последовательность events
  5. CQRS: Разделение write и read models
  6. Best Practices: Idempotency, schema versioning, DLQ, monitoring

🎯 Главное:

Related Articles

← Вернуться к статьям