Saga Pattern: распределенные транзакции в микросервисах

Как обеспечить consistency данных в микросервисной архитектуре? ACID транзакции не работают между сервисами? В этом руководстве мы разберем Saga Pattern — решение для распределенных транзакций. Вы узнаете про Choreography vs Orchestration, compensating transactions и примеры реализации для Node.js, Python, Go.

📋 Содержание

  1. Проблема распределенных транзакций
  2. Что такое Saga Pattern?
  3. Choreography Saga (Event-driven)
  4. Orchestration Saga (Centralized)
  5. Compensating Transactions
  6. Choreography vs Orchestration
  7. Примеры реализации
  8. Best Practices
  9. FAQ: Часто задаваемые вопросы

❌ Проблема распределенных транзакций

В монолитном приложении транзакции просты — база данных гарантирует ACID свойства:

-- Monolithic: одна БД, одна транзакция
BEGIN TRANSACTION;
  INSERT INTO orders (user_id, total) VALUES (1, 100);
  UPDATE inventory SET stock = stock - 1 WHERE product_id = 123;
  INSERT INTO payments (order_id, amount) VALUES (LAST_INSERT_ID(), 100);
COMMIT;

-- Если что-то пошло не так → ROLLBACK, все изменения отменяются

❌ В микросервисах это не работает

Проблема: Каждый микросервис имеет свою базу данных. Нет способа сделать ACID транзакцию между разными БД.

Пример: E-commerce заказ

Что может пойти не так:

  1. Order Service создал заказ ✅
  2. Payment Service списал деньги ✅
  3. Inventory Service упал ❌ (недостаточно товара)
  4. Проблема: Деньги списаны, но заказ не выполнен! Как откатить платеж?

Почему 2PC (Two-Phase Commit) не работает?

❌ Проблемы Two-Phase Commit в микросервисах

🔄 Что такое Saga Pattern?

Saga Pattern — это паттерн для управления распределенными транзакциями, который разбивает длинную транзакцию на последовательность локальных транзакций.

✅ Как работает Saga

Каждый шаг Saga:

  1. Выполняет локальную транзакцию в своей БД
  2. Публикует событие о завершении
  3. Следующий сервис слушает событие и выполняет свой шаг

Если шаг fails:

Пример Saga: E-commerce заказ

✅ Happy Path (все успешно):
1. Create Order      → order_id=123 ✅
2. Reserve Payment   → payment reserved ✅
3. Reserve Inventory → stock decreased ✅
4. Create Shipping   → shipping created ✅
   ✅ SUCCESS

❌ Failure Path (шаг 3 failed):
1. Create Order      → order_id=123 ✅
2. Reserve Payment   → payment reserved ✅
3. Reserve Inventory → FAILED (out of stock) ❌
   → Trigger Compensating Transactions:
   4. Cancel Payment   → payment released 🔄
   5. Cancel Order     → order cancelled 🔄
   ❌ SAGA ABORTED (consistent state restored)

🎭 Choreography Saga (Decentralized)

Choreography (Хореография) — децентрализованный подход, где каждый сервис знает, что делать при получении события. Нет центрального координатора.

Как работает Choreography

✅ Преимущества:

❌ Недостатки:

Choreography Flow Example

Order Service  →  [OrderCreated event]
                      ↓
Payment Service  →  Listen to OrderCreated
                      ↓ Reserve payment
                      ↓
                   [PaymentReserved event]
                      ↓
Inventory Service → Listen to PaymentReserved
                      ↓ Reserve stock
                      ↓
                   [StockReserved event]
                      ↓
Shipping Service  → Listen to StockReserved
                      ↓ Create shipping
                      ↓
                   [ShippingCreated event]
                      ↓
                   ✅ SAGA COMPLETE

--- Если Inventory fails ---
Inventory Service → [StockReservationFailed event]
                      ↓
Payment Service   → Listen to failed event
                      ↓ Release payment (compensate)
                      ↓
                   [PaymentCancelled event]
                      ↓
Order Service     → Listen to cancelled event
                      ↓ Cancel order (compensate)
                      ↓
                   ❌ SAGA ABORTED

Реализация Choreography с Kafka (Node.js)

// order-service.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'order-service' });

// 1. Create Order
async function createOrder(userId, items, total) {
  await producer.connect();

  // Локальная транзакция
  const order = await db.orders.create({
    user_id: userId,
    items,
    total,
    status: 'PENDING'
  });

  // Публикуем событие
  await producer.send({
    topic: 'order-created',
    messages: [{
      key: order.id.toString(),
      value: JSON.stringify({
        orderId: order.id,
        userId,
        items,
        total
      })
    }]
  });

  console.log(`✅ Order ${order.id} created`);
  return order;
}

// Compensating transaction: Cancel order
async function cancelOrder(orderId) {
  await db.orders.update(
    { status: 'CANCELLED' },
    { where: { id: orderId } }
  );
  console.log(`🔄 Order ${orderId} cancelled (compensated)`);
}

// Listen to compensation events
await consumer.connect();
await consumer.subscribe({ topic: 'payment-failed' });

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    const event = JSON.parse(message.value.toString());

    if (topic === 'payment-failed') {
      await cancelOrder(event.orderId);
    }
  }
});

module.exports = { createOrder };
// payment-service.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'payment-service' });

await consumer.connect();
await producer.connect();

// Listen to order-created events
await consumer.subscribe({ topic: 'order-created' });

await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value.toString());

    try {
      // Reserve payment
      const payment = await reservePayment(order.userId, order.total);

      // Publish success event
      await producer.send({
        topic: 'payment-reserved',
        messages: [{
          key: order.orderId.toString(),
          value: JSON.stringify({
            orderId: order.orderId,
            paymentId: payment.id,
            amount: order.total
          })
        }]
      });

      console.log(`✅ Payment reserved for order ${order.orderId}`);

    } catch (error) {
      // Publish failure event
      await producer.send({
        topic: 'payment-failed',
        messages: [{
          key: order.orderId.toString(),
          value: JSON.stringify({
            orderId: order.orderId,
            reason: error.message
          })
        }]
      });

      console.log(`❌ Payment failed for order ${order.orderId}`);
    }
  }
});

async function reservePayment(userId, amount) {
  // Check balance
  const wallet = await db.wallets.findOne({ where: { user_id: userId } });

  if (wallet.balance < amount) {
    throw new Error('Insufficient funds');
  }

  // Reserve (lock) the amount
  return await db.payments.create({
    user_id: userId,
    amount,
    status: 'RESERVED'
  });
}

// Compensating transaction
async function cancelPayment(paymentId) {
  await db.payments.update(
    { status: 'CANCELLED' },
    { where: { id: paymentId } }
  );
}

// Listen to compensation events
await consumer.subscribe({ topic: 'inventory-failed' });
// ... handle compensation

🎼 Orchestration Saga (Centralized)

Orchestration (Оркестрация) — централизованный подход, где есть Orchestrator который управляет всей Saga и вызывает сервисы.

Как работает Orchestration

✅ Преимущества:

❌ Недостатки:

Orchestration Flow Example

Orchestrator (Order Saga)
    ↓
1. Call Order Service → createOrder()
    ↓ Success
2. Call Payment Service → reservePayment()
    ↓ Success
3. Call Inventory Service → reserveStock()
    ↓ FAILED (out of stock)

Orchestrator triggers compensation:
4. Call Payment Service → cancelPayment() 🔄
5. Call Order Service → cancelOrder() 🔄
    ↓
❌ SAGA ABORTED

Orchestrator:
- Знает весь flow
- Отслеживает state каждого шага
- Вызывает compensation при ошибке
- Может retry шаги

Реализация Orchestration с Temporal.io (Node.js)

Temporal.io — workflow orchestration platform для распределенных систем.

# Установка Temporal
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
// activities.ts (сервисные вызовы)
import { Context } from '@temporalio/activity';

export async function createOrder(userId: number, items: any[], total: number) {
  console.log(`Creating order for user ${userId}`);

  const order = await db.orders.create({
    user_id: userId,
    items,
    total,
    status: 'PENDING'
  });

  return { orderId: order.id };
}

export async function reservePayment(userId: number, amount: number) {
  console.log(`Reserving payment: $${amount}`);

  const wallet = await db.wallets.findOne({ where: { user_id: userId } });

  if (wallet.balance < amount) {
    throw new Error('Insufficient funds');
  }

  const payment = await db.payments.create({
    user_id: userId,
    amount,
    status: 'RESERVED'
  });

  return { paymentId: payment.id };
}

export async function reserveInventory(items: any[]) {
  console.log(`Reserving inventory for ${items.length} items`);

  for (const item of items) {
    const product = await db.inventory.findOne({ where: { product_id: item.productId } });

    if (product.stock < item.quantity) {
      throw new Error(`Insufficient stock for product ${item.productId}`);
    }

    await db.inventory.decrement('stock', {
      by: item.quantity,
      where: { product_id: item.productId }
    });
  }

  return { success: true };
}

export async function createShipping(orderId: number, address: string) {
  console.log(`Creating shipping for order ${orderId}`);

  const shipping = await db.shipping.create({
    order_id: orderId,
    address,
    status: 'PENDING'
  });

  return { shippingId: shipping.id };
}

// Compensating transactions
export async function cancelOrder(orderId: number) {
  console.log(`🔄 Cancelling order ${orderId}`);
  await db.orders.update({ status: 'CANCELLED' }, { where: { id: orderId } });
}

export async function cancelPayment(paymentId: number) {
  console.log(`🔄 Cancelling payment ${paymentId}`);
  await db.payments.update({ status: 'CANCELLED' }, { where: { id: paymentId } });
}

export async function releaseInventory(items: any[]) {
  console.log(`🔄 Releasing inventory`);
  for (const item of items) {
    await db.inventory.increment('stock', {
      by: item.quantity,
      where: { product_id: item.productId }
    });
  }
}
// workflow.ts (Saga Orchestrator)
import {
  proxyActivities,
  defineSignal,
  defineQuery,
  setHandler,
} from '@temporalio/workflow';
import type * as activities from './activities';

const {
  createOrder,
  reservePayment,
  reserveInventory,
  createShipping,
  cancelOrder,
  cancelPayment,
  releaseInventory,
} = proxyActivities({
  startToCloseTimeout: '30 seconds',
  retry: {
    maximumAttempts: 3,
  },
});

export async function orderSaga(
  userId: number,
  items: any[],
  total: number,
  address: string
): Promise {
  let orderId: number | null = null;
  let paymentId: number | null = null;
  let inventoryReserved = false;

  try {
    // Step 1: Create Order
    const order = await createOrder(userId, items, total);
    orderId = order.orderId;
    console.log(`✅ Step 1: Order ${orderId} created`);

    // Step 2: Reserve Payment
    const payment = await reservePayment(userId, total);
    paymentId = payment.paymentId;
    console.log(`✅ Step 2: Payment ${paymentId} reserved`);

    // Step 3: Reserve Inventory
    await reserveInventory(items);
    inventoryReserved = true;
    console.log(`✅ Step 3: Inventory reserved`);

    // Step 4: Create Shipping
    const shipping = await createShipping(orderId, address);
    console.log(`✅ Step 4: Shipping ${shipping.shippingId} created`);

    return {
      success: true,
      orderId,
      paymentId,
      shippingId: shipping.shippingId
    };

  } catch (error) {
    console.error(`❌ Saga failed: ${error.message}`);

    // Compensation (rollback в обратном порядке)
    if (inventoryReserved) {
      await releaseInventory(items);
      console.log(`🔄 Compensated: Inventory released`);
    }

    if (paymentId) {
      await cancelPayment(paymentId);
      console.log(`🔄 Compensated: Payment cancelled`);
    }

    if (orderId) {
      await cancelOrder(orderId);
      console.log(`🔄 Compensated: Order cancelled`);
    }

    throw new Error(`Order Saga failed: ${error.message}`);
  }
}

// Query для отслеживания состояния Saga
export const sagaState = defineQuery('sagaState');
setHandler(sagaState, () => 'running');
// worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities';

async function run() {
  const worker = await Worker.create({
    workflowsPath: require.resolve('./workflow'),
    activities,
    taskQueue: 'order-saga',
  });

  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});
// client.ts (запуск Saga)
import { Connection, WorkflowClient } from '@temporalio/client';
import { orderSaga } from './workflow';

async function main() {
  const connection = await Connection.connect();
  const client = new WorkflowClient({ connection });

  // Запускаем Order Saga
  const handle = await client.start(orderSaga, {
    taskQueue: 'order-saga',
    workflowId: `order-saga-${Date.now()}`,
    args: [
      1,  // userId
      [{ productId: 123, quantity: 2 }],  // items
      200,  // total
      '123 Main St'  // address
    ],
  });

  console.log(`Started workflow ${handle.workflowId}`);

  // Ждем результата
  const result = await handle.result();
  console.log('Saga result:', result);
}

main();

🔄 Compensating Transactions

Compensating Transaction — это операция которая отменяет изменения сделанные предыдущим шагом Saga.

Ключевые принципы Compensating Transactions:

Forward Transaction Compensating Transaction
Create Order Cancel Order (set status = CANCELLED)
Reserve Payment Release Payment (unlock funds)
Decrease Inventory Increase Inventory (restore stock)
Send Email Send Cancellation Email
Charge Credit Card Refund Credit Card

Пример Compensating Transactions

✅ Forward Path

  1. Create Order: INSERT INTO orders → order_id=123
  2. Reserve Payment: UPDATE wallets SET balance = balance - 100
  3. Decrease Stock: UPDATE inventory SET stock = stock - 1
  4. Create Shipping: INSERT INTO shipping → shipping_id=456

🔄 Compensation Path (если шаг 4 failed)

  1. Cancel Shipping: — (шаг не выполнился)
  2. Restore Stock: UPDATE inventory SET stock = stock + 1
  3. Release Payment: UPDATE wallets SET balance = balance + 100
  4. Cancel Order: UPDATE orders SET status = 'CANCELLED'

📊 Choreography vs Orchestration сравнение

Критерий Choreography Orchestration
Координация Децентрализованная (event-driven) Централизованная (Orchestrator)
Сложность Проще для 2-4 шагов Лучше для 5+ шагов
Coupling Low (сервисы независимы) Medium (зависят от Orchestrator)
Visibility Сложно отследить flow Легко (все в Orchestrator)
Rollback Сложный (нужно отслеживать события) Простой (Orchestrator знает все шаги)
Single Point of Failure Нет Да (Orchestrator)
Retry Logic Каждый сервис сам Centralized в Orchestrator
Инструменты Kafka, RabbitMQ, AWS SNS/SQS Temporal.io, Camunda, AWS Step Functions
Testing Сложно (distributed) Проще (isolated Orchestrator)
Monitoring Distributed tracing нужен Built-in в Orchestrator
Когда использовать Простые workflows, event-driven Сложные workflows, нужен контроль

🎯 Когда что использовать?

Используйте Choreography если:

Используйте Orchestration если:

💻 Примеры реализации

Python: Orchestration с AWS Step Functions

# lambda_functions.py
import boto3
import json

def create_order_handler(event, context):
    """Step 1: Create Order"""
    user_id = event['userId']
    items = event['items']
    total = event['total']

    # Create order in DynamoDB
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('orders')

    order_id = str(uuid.uuid4())
    table.put_item(Item={
        'order_id': order_id,
        'user_id': user_id,
        'items': items,
        'total': total,
        'status': 'PENDING'
    })

    return {
        'orderId': order_id,
        'userId': user_id,
        'items': items,
        'total': total
    }

def reserve_payment_handler(event, context):
    """Step 2: Reserve Payment"""
    user_id = event['userId']
    total = event['total']

    # Check balance and reserve
    # ... (similar logic)

    payment_id = str(uuid.uuid4())
    return {
        **event,
        'paymentId': payment_id
    }

def reserve_inventory_handler(event, context):
    """Step 3: Reserve Inventory"""
    items = event['items']

    # Check stock and reserve
    for item in items:
        # ... check stock logic
        if stock < item['quantity']:
            raise Exception(f"Insufficient stock for {item['productId']}")

    return event

# Compensating functions
def cancel_order_handler(event, context):
    """Compensation: Cancel Order"""
    order_id = event['orderId']

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table('orders')
    table.update_item(
        Key={'order_id': order_id},
        UpdateExpression='SET #status = :status',
        ExpressionAttributeNames={'#status': 'status'},
        ExpressionAttributeValues={':status': 'CANCELLED'}
    )

    return event

def cancel_payment_handler(event, context):
    """Compensation: Cancel Payment"""
    # ... release payment logic
    return event
// step-functions-definition.json
{
  "Comment": "Order Saga Orchestration",
  "StartAt": "CreateOrder",
  "States": {
    "CreateOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:CreateOrder",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "SagaFailed"
      }],
      "Next": "ReservePayment"
    },
    "ReservePayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:ReservePayment",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "CancelOrder"
      }],
      "Next": "ReserveInventory"
    },
    "ReserveInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:ReserveInventory",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "CompensatePayment"
      }],
      "Next": "CreateShipping"
    },
    "CreateShipping": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:CreateShipping",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "CompensateInventory"
      }],
      "End": true
    },

    "CompensateInventory": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:ReleaseInventory",
      "Next": "CompensatePayment"
    },
    "CompensatePayment": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:CancelPayment",
      "Next": "CancelOrder"
    },
    "CancelOrder": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:...:function:CancelOrder",
      "Next": "SagaFailed"
    },
    "SagaFailed": {
      "Type": "Fail",
      "Error": "SagaExecutionFailed",
      "Cause": "One or more steps in the saga failed"
    }
  }
}

✅ Best Practices для Saga Pattern

1. Idempotency обязательна

// Idempotent payment reservation
async function reservePayment(idempotencyKey, userId, amount) {
  // Check if already processed
  const existing = await db.payments.findOne({
    where: { idempotency_key: idempotencyKey }
  });

  if (existing) {
    return existing;  // Already processed, return same result
  }

  // Process payment
  const payment = await db.payments.create({
    idempotency_key: idempotencyKey,
    user_id: userId,
    amount,
    status: 'RESERVED'
  });

  return payment;
}

2. Timeout для каждого шага

3. Мониторинг и observability

4. Retry с exponential backoff

5. Eventual Consistency приемлема

🔍 FAQ: Часто задаваемые вопросы

❓ Что такое Saga Pattern и зачем он нужен?
Saga Pattern — это паттерн для управления распределенными транзакциями в микросервисной архитектуре. Разбивает длинную транзакцию на последовательность локальных транзакций, каждая из которых обновляет данные в своем сервисе и публикует событие для следующего шага.

Зачем нужен:
  • ACID транзакции не работают между микросервисами (разные БД)
  • 2PC (Two-Phase Commit) блокирует ресурсы и не масштабируется
  • Saga обеспечивает eventual consistency через compensating transactions
❓ В чем разница между Choreography и Orchestration Saga?
Choreography (Хореография) — децентрализованный подход:
  • Сервисы слушают события и реагируют
  • Нет центрального координатора
  • Лучше для простых workflows (2-4 шага)
Orchestration (Оркестрация) — централизованный подход:
  • Есть Orchestrator который управляет всей транзакцией
  • Вызывает сервисы по очереди
  • Лучше для сложных workflows (5+ шагов)
  • Легкий rollback, visibility всего процесса
❓ Что такое Compensating Transaction?
Compensating Transaction (компенсирующая транзакция) — это операция которая отменяет изменения сделанные предыдущим шагом Saga.

Примеры:
  • Reserve Payment → Release Payment (compensation)
  • Decrease Stock → Increase Stock (compensation)
  • Send Email → Send Cancellation Email (compensation)
Важно: Compensation должна быть idempotent (можно вызвать несколько раз).
❓ Почему 2PC (Two-Phase Commit) не работает для микросервисов?
Проблемы 2PC в распределенных системах:
  • Блокировка ресурсов: 2PC блокирует БД на время транзакции → снижает availability
  • Single Point of Failure: Coordinator падает → все транзакции заморожены
  • NoSQL не поддерживает: MongoDB, Cassandra не поддерживают 2PC
  • Плохо масштабируется: Чем больше участников, тем выше latency
  • Timeout issues: Что если один сервис не отвечает 30 секунд?
Saga Pattern решает эти проблемы через асинхронные, не блокирующие операции.
❓ Когда использовать Choreography, а когда Orchestration?
Choreography — для:
  • Простых workflows (2-4 шага)
  • Event-driven архитектуры
  • Когда важна decoupling между сервисами
Orchestration — для:
  • Сложных workflows (5+ шагов)
  • Когда нужна централизованная логика
  • Visibility всего процесса
  • Легкий rollback
  • Условная логика (if/else)
❓ Какие инструменты использовать для Saga Pattern?
Choreography (event streaming):
  • Apache Kafka — distributed event streaming
  • RabbitMQ — message broker
  • AWS SNS/SQS — managed pub/sub
Orchestration (workflow engines):
  • Temporal.io — modern workflow orchestration (рекомендуется)
  • Camunda — BPMN-based workflow engine
  • Netflix Conductor — microservices orchestration
  • AWS Step Functions — managed workflows

Тестируйте Saga Pattern без риска

LightBox API — создайте Mock API для тестирования распределенных транзакций

Начать бесплатно →

📝 Выводы

В этой статье мы рассмотрели Saga Pattern для распределенных транзакций в микросервисах:

  1. Проблема: ACID транзакции не работают между микросервисами, 2PC не масштабируется
  2. Решение: Saga разбивает транзакцию на локальные шаги с compensating transactions
  3. Choreography: Децентрализованный, event-driven, для простых workflows
  4. Orchestration: Централизованный, для сложных workflows, легкий rollback
  5. Compensating Transactions: Semantic rollback для отмены изменений
  6. Инструменты: Kafka/RabbitMQ для Choreography, Temporal.io/Camunda для Orchestration

🎯 Главное:

Related Articles

← Вернуться к статьям