Как обеспечить consistency данных в микросервисной архитектуре? ACID транзакции не работают между сервисами? В этом руководстве мы разберем Saga Pattern — решение для распределенных транзакций. Вы узнаете про Choreography vs Orchestration, compensating transactions и примеры реализации для Node.js, Python, Go.
📋 Содержание
❌ Проблема распределенных транзакций
В монолитном приложении транзакции просты — база данных гарантирует ACID свойства:
-- Monolithic: одна БД, одна транзакция
BEGIN TRANSACTION;
INSERT INTO orders (user_id, total) VALUES (1, 100);
UPDATE inventory SET stock = stock - 1 WHERE product_id = 123;
INSERT INTO payments (order_id, amount) VALUES (LAST_INSERT_ID(), 100);
COMMIT;
-- Если что-то пошло не так → ROLLBACK, все изменения отменяются
❌ В микросервисах это не работает
Проблема: Каждый микросервис имеет свою базу данных. Нет способа сделать ACID транзакцию между разными БД.
Пример: E-commerce заказ
- Order Service (PostgreSQL) — создает заказ
- Payment Service (MySQL) — списывает деньги
- Inventory Service (MongoDB) — уменьшает количество товара
- Shipping Service (PostgreSQL) — создает доставку
Что может пойти не так:
- Order Service создал заказ ✅
- Payment Service списал деньги ✅
- Inventory Service упал ❌ (недостаточно товара)
- Проблема: Деньги списаны, но заказ не выполнен! Как откатить платеж?
Почему 2PC (Two-Phase Commit) не работает?
❌ Проблемы Two-Phase Commit в микросервисах
- Блокировки: 2PC блокирует ресурсы на время транзакции → снижает availability
- Single Point of Failure: Coordinator падает → все транзакции заморожены
- NoSQL не поддерживает: MongoDB, Cassandra не поддерживают 2PC
- Не масштабируется: Чем больше участников, тем выше latency
- Timeout issues: Что если один сервис не отвечает 30 секунд?
🔄 Что такое Saga Pattern?
Saga Pattern — это паттерн для управления распределенными транзакциями, который разбивает длинную транзакцию на последовательность локальных транзакций.
✅ Как работает Saga
Каждый шаг Saga:
- Выполняет локальную транзакцию в своей БД
- Публикует событие о завершении
- Следующий сервис слушает событие и выполняет свой шаг
Если шаг fails:
- Saga выполняет Compensating Transactions (компенсирующие транзакции)
- Отменяет изменения всех предыдущих успешных шагов
- Возвращает систему в consistent state
Пример Saga: E-commerce заказ
✅ Happy Path (все успешно):
1. Create Order → order_id=123 ✅
2. Reserve Payment → payment reserved ✅
3. Reserve Inventory → stock decreased ✅
4. Create Shipping → shipping created ✅
✅ SUCCESS
❌ Failure Path (шаг 3 failed):
1. Create Order → order_id=123 ✅
2. Reserve Payment → payment reserved ✅
3. Reserve Inventory → FAILED (out of stock) ❌
→ Trigger Compensating Transactions:
4. Cancel Payment → payment released 🔄
5. Cancel Order → order cancelled 🔄
❌ SAGA ABORTED (consistent state restored)
🎭 Choreography Saga (Decentralized)
Choreography (Хореография) — децентрализованный подход, где каждый сервис знает, что делать при получении события. Нет центрального координатора.
Как работает Choreography
- Каждый сервис слушает события (event bus: Kafka, RabbitMQ)
- Когда получает событие → выполняет локальную транзакцию
- Публикует новое событие о результате
- Нет центрального Orchestrator
✅ Преимущества:
- Decoupling — сервисы независимы
- Нет single point of failure
- Простая для простых workflows (2-4 шага)
❌ Недостатки:
- Сложно понять flow (нет единого места)
- Cyclic dependencies между сервисами
- Сложный rollback для длинных Sagas
Choreography Flow Example
Order Service → [OrderCreated event]
↓
Payment Service → Listen to OrderCreated
↓ Reserve payment
↓
[PaymentReserved event]
↓
Inventory Service → Listen to PaymentReserved
↓ Reserve stock
↓
[StockReserved event]
↓
Shipping Service → Listen to StockReserved
↓ Create shipping
↓
[ShippingCreated event]
↓
✅ SAGA COMPLETE
--- Если Inventory fails ---
Inventory Service → [StockReservationFailed event]
↓
Payment Service → Listen to failed event
↓ Release payment (compensate)
↓
[PaymentCancelled event]
↓
Order Service → Listen to cancelled event
↓ Cancel order (compensate)
↓
❌ SAGA ABORTED
Реализация Choreography с Kafka (Node.js)
// order-service.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'order-service' });
// 1. Create Order
async function createOrder(userId, items, total) {
await producer.connect();
// Локальная транзакция
const order = await db.orders.create({
user_id: userId,
items,
total,
status: 'PENDING'
});
// Публикуем событие
await producer.send({
topic: 'order-created',
messages: [{
key: order.id.toString(),
value: JSON.stringify({
orderId: order.id,
userId,
items,
total
})
}]
});
console.log(`✅ Order ${order.id} created`);
return order;
}
// Compensating transaction: Cancel order
async function cancelOrder(orderId) {
await db.orders.update(
{ status: 'CANCELLED' },
{ where: { id: orderId } }
);
console.log(`🔄 Order ${orderId} cancelled (compensated)`);
}
// Listen to compensation events
await consumer.connect();
await consumer.subscribe({ topic: 'payment-failed' });
await consumer.run({
eachMessage: async ({ topic, message }) => {
const event = JSON.parse(message.value.toString());
if (topic === 'payment-failed') {
await cancelOrder(event.orderId);
}
}
});
module.exports = { createOrder };
// payment-service.js
const { Kafka } = require('kafkajs');
const kafka = new Kafka({ brokers: ['localhost:9092'] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'payment-service' });
await consumer.connect();
await producer.connect();
// Listen to order-created events
await consumer.subscribe({ topic: 'order-created' });
await consumer.run({
eachMessage: async ({ message }) => {
const order = JSON.parse(message.value.toString());
try {
// Reserve payment
const payment = await reservePayment(order.userId, order.total);
// Publish success event
await producer.send({
topic: 'payment-reserved',
messages: [{
key: order.orderId.toString(),
value: JSON.stringify({
orderId: order.orderId,
paymentId: payment.id,
amount: order.total
})
}]
});
console.log(`✅ Payment reserved for order ${order.orderId}`);
} catch (error) {
// Publish failure event
await producer.send({
topic: 'payment-failed',
messages: [{
key: order.orderId.toString(),
value: JSON.stringify({
orderId: order.orderId,
reason: error.message
})
}]
});
console.log(`❌ Payment failed for order ${order.orderId}`);
}
}
});
async function reservePayment(userId, amount) {
// Check balance
const wallet = await db.wallets.findOne({ where: { user_id: userId } });
if (wallet.balance < amount) {
throw new Error('Insufficient funds');
}
// Reserve (lock) the amount
return await db.payments.create({
user_id: userId,
amount,
status: 'RESERVED'
});
}
// Compensating transaction
async function cancelPayment(paymentId) {
await db.payments.update(
{ status: 'CANCELLED' },
{ where: { id: paymentId } }
);
}
// Listen to compensation events
await consumer.subscribe({ topic: 'inventory-failed' });
// ... handle compensation
🎼 Orchestration Saga (Centralized)
Orchestration (Оркестрация) — централизованный подход, где есть Orchestrator который управляет всей Saga и вызывает сервисы.
Как работает Orchestration
- Есть центральный Orchestrator (workflow engine)
- Orchestrator знает весь flow Saga
- Вызывает сервисы по очереди (sync или async)
- Отслеживает состояние каждого шага
- При ошибке вызывает compensating transactions
✅ Преимущества:
- Centralized control — легко понять flow
- Простой rollback (Orchestrator знает все шаги)
- Легко добавить новые шаги
- Visibility всего процесса
❌ Недостатки:
- Single point of failure (Orchestrator)
- Coupling — сервисы зависят от Orchestrator
- Orchestrator может стать bottleneck
Orchestration Flow Example
Orchestrator (Order Saga)
↓
1. Call Order Service → createOrder()
↓ Success
2. Call Payment Service → reservePayment()
↓ Success
3. Call Inventory Service → reserveStock()
↓ FAILED (out of stock)
Orchestrator triggers compensation:
4. Call Payment Service → cancelPayment() 🔄
5. Call Order Service → cancelOrder() 🔄
↓
❌ SAGA ABORTED
Orchestrator:
- Знает весь flow
- Отслеживает state каждого шага
- Вызывает compensation при ошибке
- Может retry шаги
Реализация Orchestration с Temporal.io (Node.js)
Temporal.io — workflow orchestration platform для распределенных систем.
# Установка Temporal
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity
// activities.ts (сервисные вызовы)
import { Context } from '@temporalio/activity';
export async function createOrder(userId: number, items: any[], total: number) {
console.log(`Creating order for user ${userId}`);
const order = await db.orders.create({
user_id: userId,
items,
total,
status: 'PENDING'
});
return { orderId: order.id };
}
export async function reservePayment(userId: number, amount: number) {
console.log(`Reserving payment: $${amount}`);
const wallet = await db.wallets.findOne({ where: { user_id: userId } });
if (wallet.balance < amount) {
throw new Error('Insufficient funds');
}
const payment = await db.payments.create({
user_id: userId,
amount,
status: 'RESERVED'
});
return { paymentId: payment.id };
}
export async function reserveInventory(items: any[]) {
console.log(`Reserving inventory for ${items.length} items`);
for (const item of items) {
const product = await db.inventory.findOne({ where: { product_id: item.productId } });
if (product.stock < item.quantity) {
throw new Error(`Insufficient stock for product ${item.productId}`);
}
await db.inventory.decrement('stock', {
by: item.quantity,
where: { product_id: item.productId }
});
}
return { success: true };
}
export async function createShipping(orderId: number, address: string) {
console.log(`Creating shipping for order ${orderId}`);
const shipping = await db.shipping.create({
order_id: orderId,
address,
status: 'PENDING'
});
return { shippingId: shipping.id };
}
// Compensating transactions
export async function cancelOrder(orderId: number) {
console.log(`🔄 Cancelling order ${orderId}`);
await db.orders.update({ status: 'CANCELLED' }, { where: { id: orderId } });
}
export async function cancelPayment(paymentId: number) {
console.log(`🔄 Cancelling payment ${paymentId}`);
await db.payments.update({ status: 'CANCELLED' }, { where: { id: paymentId } });
}
export async function releaseInventory(items: any[]) {
console.log(`🔄 Releasing inventory`);
for (const item of items) {
await db.inventory.increment('stock', {
by: item.quantity,
where: { product_id: item.productId }
});
}
}
// workflow.ts (Saga Orchestrator)
import {
proxyActivities,
defineSignal,
defineQuery,
setHandler,
} from '@temporalio/workflow';
import type * as activities from './activities';
const {
createOrder,
reservePayment,
reserveInventory,
createShipping,
cancelOrder,
cancelPayment,
releaseInventory,
} = proxyActivities({
startToCloseTimeout: '30 seconds',
retry: {
maximumAttempts: 3,
},
});
export async function orderSaga(
userId: number,
items: any[],
total: number,
address: string
): Promise {
let orderId: number | null = null;
let paymentId: number | null = null;
let inventoryReserved = false;
try {
// Step 1: Create Order
const order = await createOrder(userId, items, total);
orderId = order.orderId;
console.log(`✅ Step 1: Order ${orderId} created`);
// Step 2: Reserve Payment
const payment = await reservePayment(userId, total);
paymentId = payment.paymentId;
console.log(`✅ Step 2: Payment ${paymentId} reserved`);
// Step 3: Reserve Inventory
await reserveInventory(items);
inventoryReserved = true;
console.log(`✅ Step 3: Inventory reserved`);
// Step 4: Create Shipping
const shipping = await createShipping(orderId, address);
console.log(`✅ Step 4: Shipping ${shipping.shippingId} created`);
return {
success: true,
orderId,
paymentId,
shippingId: shipping.shippingId
};
} catch (error) {
console.error(`❌ Saga failed: ${error.message}`);
// Compensation (rollback в обратном порядке)
if (inventoryReserved) {
await releaseInventory(items);
console.log(`🔄 Compensated: Inventory released`);
}
if (paymentId) {
await cancelPayment(paymentId);
console.log(`🔄 Compensated: Payment cancelled`);
}
if (orderId) {
await cancelOrder(orderId);
console.log(`🔄 Compensated: Order cancelled`);
}
throw new Error(`Order Saga failed: ${error.message}`);
}
}
// Query для отслеживания состояния Saga
export const sagaState = defineQuery('sagaState');
setHandler(sagaState, () => 'running');
// worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities';
async function run() {
const worker = await Worker.create({
workflowsPath: require.resolve('./workflow'),
activities,
taskQueue: 'order-saga',
});
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});
// client.ts (запуск Saga)
import { Connection, WorkflowClient } from '@temporalio/client';
import { orderSaga } from './workflow';
async function main() {
const connection = await Connection.connect();
const client = new WorkflowClient({ connection });
// Запускаем Order Saga
const handle = await client.start(orderSaga, {
taskQueue: 'order-saga',
workflowId: `order-saga-${Date.now()}`,
args: [
1, // userId
[{ productId: 123, quantity: 2 }], // items
200, // total
'123 Main St' // address
],
});
console.log(`Started workflow ${handle.workflowId}`);
// Ждем результата
const result = await handle.result();
console.log('Saga result:', result);
}
main();
🔄 Compensating Transactions
Compensating Transaction — это операция которая отменяет изменения сделанные предыдущим шагом Saga.
Ключевые принципы Compensating Transactions:
- Semantic rollback: Не database ROLLBACK, а бизнес-логика отмены
- Idempotent: Можно вызвать несколько раз без side effects
- Best effort: Compensation может тоже fails (нужен retry)
- Порядок: Compensation в обратном порядке выполнения
| Forward Transaction | Compensating Transaction |
|---|---|
| Create Order | Cancel Order (set status = CANCELLED) |
| Reserve Payment | Release Payment (unlock funds) |
| Decrease Inventory | Increase Inventory (restore stock) |
| Send Email | Send Cancellation Email |
| Charge Credit Card | Refund Credit Card |
Пример Compensating Transactions
✅ Forward Path
- Create Order: INSERT INTO orders → order_id=123
- Reserve Payment: UPDATE wallets SET balance = balance - 100
- Decrease Stock: UPDATE inventory SET stock = stock - 1
- Create Shipping: INSERT INTO shipping → shipping_id=456
🔄 Compensation Path (если шаг 4 failed)
- Cancel Shipping: — (шаг не выполнился)
- Restore Stock: UPDATE inventory SET stock = stock + 1
- Release Payment: UPDATE wallets SET balance = balance + 100
- Cancel Order: UPDATE orders SET status = 'CANCELLED'
📊 Choreography vs Orchestration сравнение
| Критерий | Choreography | Orchestration |
|---|---|---|
| Координация | Децентрализованная (event-driven) | Централизованная (Orchestrator) |
| Сложность | Проще для 2-4 шагов | Лучше для 5+ шагов |
| Coupling | Low (сервисы независимы) | Medium (зависят от Orchestrator) |
| Visibility | Сложно отследить flow | Легко (все в Orchestrator) |
| Rollback | Сложный (нужно отслеживать события) | Простой (Orchestrator знает все шаги) |
| Single Point of Failure | Нет | Да (Orchestrator) |
| Retry Logic | Каждый сервис сам | Centralized в Orchestrator |
| Инструменты | Kafka, RabbitMQ, AWS SNS/SQS | Temporal.io, Camunda, AWS Step Functions |
| Testing | Сложно (distributed) | Проще (isolated Orchestrator) |
| Monitoring | Distributed tracing нужен | Built-in в Orchestrator |
| Когда использовать | Простые workflows, event-driven | Сложные workflows, нужен контроль |
🎯 Когда что использовать?
Используйте Choreography если:
- Простой workflow (2-4 шага)
- Event-driven архитектура уже есть
- Важна decoupling между сервисами
- Нет сложных условных ветвлений
Используйте Orchestration если:
- Сложный workflow (5+ шагов)
- Нужна централизованная логика
- Важна visibility всего процесса
- Нужен легкий rollback
- Есть условная логика (if/else)
- Нужны timeout, retry, circuit breaker
💻 Примеры реализации
Python: Orchestration с AWS Step Functions
# lambda_functions.py
import boto3
import json
def create_order_handler(event, context):
"""Step 1: Create Order"""
user_id = event['userId']
items = event['items']
total = event['total']
# Create order in DynamoDB
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('orders')
order_id = str(uuid.uuid4())
table.put_item(Item={
'order_id': order_id,
'user_id': user_id,
'items': items,
'total': total,
'status': 'PENDING'
})
return {
'orderId': order_id,
'userId': user_id,
'items': items,
'total': total
}
def reserve_payment_handler(event, context):
"""Step 2: Reserve Payment"""
user_id = event['userId']
total = event['total']
# Check balance and reserve
# ... (similar logic)
payment_id = str(uuid.uuid4())
return {
**event,
'paymentId': payment_id
}
def reserve_inventory_handler(event, context):
"""Step 3: Reserve Inventory"""
items = event['items']
# Check stock and reserve
for item in items:
# ... check stock logic
if stock < item['quantity']:
raise Exception(f"Insufficient stock for {item['productId']}")
return event
# Compensating functions
def cancel_order_handler(event, context):
"""Compensation: Cancel Order"""
order_id = event['orderId']
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('orders')
table.update_item(
Key={'order_id': order_id},
UpdateExpression='SET #status = :status',
ExpressionAttributeNames={'#status': 'status'},
ExpressionAttributeValues={':status': 'CANCELLED'}
)
return event
def cancel_payment_handler(event, context):
"""Compensation: Cancel Payment"""
# ... release payment logic
return event
// step-functions-definition.json
{
"Comment": "Order Saga Orchestration",
"StartAt": "CreateOrder",
"States": {
"CreateOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:CreateOrder",
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "SagaFailed"
}],
"Next": "ReservePayment"
},
"ReservePayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:ReservePayment",
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "CancelOrder"
}],
"Next": "ReserveInventory"
},
"ReserveInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:ReserveInventory",
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "CompensatePayment"
}],
"Next": "CreateShipping"
},
"CreateShipping": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:CreateShipping",
"Catch": [{
"ErrorEquals": ["States.ALL"],
"Next": "CompensateInventory"
}],
"End": true
},
"CompensateInventory": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:ReleaseInventory",
"Next": "CompensatePayment"
},
"CompensatePayment": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:CancelPayment",
"Next": "CancelOrder"
},
"CancelOrder": {
"Type": "Task",
"Resource": "arn:aws:lambda:...:function:CancelOrder",
"Next": "SagaFailed"
},
"SagaFailed": {
"Type": "Fail",
"Error": "SagaExecutionFailed",
"Cause": "One or more steps in the saga failed"
}
}
}
✅ Best Practices для Saga Pattern
1. Idempotency обязательна
- Каждый шаг Saga должен быть idempotent
- Можно вызвать несколько раз → тот же результат
- Используйте idempotency keys (UUID)
// Idempotent payment reservation
async function reservePayment(idempotencyKey, userId, amount) {
// Check if already processed
const existing = await db.payments.findOne({
where: { idempotency_key: idempotencyKey }
});
if (existing) {
return existing; // Already processed, return same result
}
// Process payment
const payment = await db.payments.create({
idempotency_key: idempotencyKey,
user_id: userId,
amount,
status: 'RESERVED'
});
return payment;
}
2. Timeout для каждого шага
- Каждый шаг должен иметь timeout
- Если timeout → trigger compensation
- Не дайте Saga зависнуть бесконечно
3. Мониторинг и observability
- Distributed tracing для Choreography (Jaeger, Zipkin)
- Saga state tracking для Orchestration
- Алерты на failed Sagas
- Metrics: success rate, duration, compensation rate
4. Retry с exponential backoff
- Temporary failures могут решиться retry
- Exponential backoff: 1s, 2s, 4s, 8s...
- Max retries: 3-5
- После max retries → trigger compensation
5. Eventual Consistency приемлема
- Saga не гарантирует ACID
- Гарантирует eventual consistency
- UI должен показывать промежуточные состояния
- Пример: "Order pending", "Payment processing"
🔍 FAQ: Часто задаваемые вопросы
Зачем нужен:
- ACID транзакции не работают между микросервисами (разные БД)
- 2PC (Two-Phase Commit) блокирует ресурсы и не масштабируется
- Saga обеспечивает eventual consistency через compensating transactions
- Сервисы слушают события и реагируют
- Нет центрального координатора
- Лучше для простых workflows (2-4 шага)
- Есть Orchestrator который управляет всей транзакцией
- Вызывает сервисы по очереди
- Лучше для сложных workflows (5+ шагов)
- Легкий rollback, visibility всего процесса
Примеры:
- Reserve Payment → Release Payment (compensation)
- Decrease Stock → Increase Stock (compensation)
- Send Email → Send Cancellation Email (compensation)
- Блокировка ресурсов: 2PC блокирует БД на время транзакции → снижает availability
- Single Point of Failure: Coordinator падает → все транзакции заморожены
- NoSQL не поддерживает: MongoDB, Cassandra не поддерживают 2PC
- Плохо масштабируется: Чем больше участников, тем выше latency
- Timeout issues: Что если один сервис не отвечает 30 секунд?
- Простых workflows (2-4 шага)
- Event-driven архитектуры
- Когда важна decoupling между сервисами
- Сложных workflows (5+ шагов)
- Когда нужна централизованная логика
- Visibility всего процесса
- Легкий rollback
- Условная логика (if/else)
- Apache Kafka — distributed event streaming
- RabbitMQ — message broker
- AWS SNS/SQS — managed pub/sub
- Temporal.io — modern workflow orchestration (рекомендуется)
- Camunda — BPMN-based workflow engine
- Netflix Conductor — microservices orchestration
- AWS Step Functions — managed workflows
Тестируйте Saga Pattern без риска
LightBox API — создайте Mock API для тестирования распределенных транзакций
Начать бесплатно →📝 Выводы
В этой статье мы рассмотрели Saga Pattern для распределенных транзакций в микросервисах:
- Проблема: ACID транзакции не работают между микросервисами, 2PC не масштабируется
- Решение: Saga разбивает транзакцию на локальные шаги с compensating transactions
- Choreography: Децентрализованный, event-driven, для простых workflows
- Orchestration: Централизованный, для сложных workflows, легкий rollback
- Compensating Transactions: Semantic rollback для отмены изменений
- Инструменты: Kafka/RabbitMQ для Choreography, Temporal.io/Camunda для Orchestration
🎯 Главное:
- Saga Pattern необходим для distributed transactions в микросервисах
- Eventual consistency вместо ACID
- Choreography — для простых workflows (2-4 шага)
- Orchestration — для сложных workflows (5+ шагов)
- Idempotency и compensating transactions обязательны
- Используйте Temporal.io для production Orchestration
- Distributed tracing критичен для мониторинга
Related Articles
- Service Mesh: Istio vs Linkerd vs Consul
- Circuit Breaker Pattern: защита от каскадных сбоев
- API Horizontal Scaling: 100K RPS