INTERMEDIATE
Redis Streams
A persistent event stream with consumer group support. Think of it as Kafka-light.
When to Use Streams / When Not To
| Use | Don't use | Real-world example |
|---|---|---|
| Message loss is unacceptable and Redis is already in the stack | Millions of events per day with long retention (>7 days) | Orders: order_created → payment → shipment event chain |
| Work distribution via consumer groups (competing consumers) | Partition-level ordering + exactly-once delivery required | Notifications: share push notification work across 3 workers |
| Event replay is needed (a new consumer should receive old events) | Multi-region replication + schema registry required | Audit: deploy a new "fraud-detection" service → replay all past orders |
| Small-to-medium scale (<100K msg/s) | High throughput (>500K msg/s) → Kafka/Pulsar | Startup: build an event-driven architecture without running a RabbitMQ/Kafka cluster |
Streams ≠ Kafka: A Redis stream lives in the memory of a single node (per shard in a cluster). If 10M+ messages pile up, memory runs out.
Trimming with `MAXLEN ~ 10000` or `MINID` is mandatory. Kafka is disk-based, so terabytes of retention are cheap.
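The trimming requirement above could be sketched in C# roughly as follows. This is a minimal sketch, not a definitive implementation: the class and method names (`StreamTrimSketch`, `MinIdForRetention`, `AddCappedAsync`, `TrimByAgeAsync`) are hypothetical, the 10,000 cap simply mirrors the number in the text, and the `MINID` trim is sent as a raw command on the assumption that the server is Redis 6.2 or newer.

```csharp
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public static class StreamTrimSketch
{
    // Stream IDs have the form "<unix-ms>-<sequence>", so "<cutoff-ms>-0"
    // can serve as the MINID threshold for a time-based retention window.
    public static string MinIdForRetention(DateTimeOffset now, TimeSpan retention)
        => $"{(now - retention).ToUnixTimeMilliseconds()}-0";

    // Cap by length while appending; corresponds to XADD <key> MAXLEN ~ 10000 * ...
    public static Task<RedisValue> AddCappedAsync(
        IDatabase db, RedisKey key, NameValueEntry[] fields)
        => db.StreamAddAsync(key, fields, maxLength: 10_000, useApproximateMaxLength: true);

    // Cap by age; corresponds to XTRIM <key> MINID ~ <id> (assumes Redis 6.2+).
    // Sent as a raw command so the sketch does not rely on a typed MINID wrapper.
    public static Task<RedisResult> TrimByAgeAsync(
        IDatabase db, RedisKey key, TimeSpan retention)
        => db.ExecuteAsync("XTRIM", key, "MINID", "~",
            MinIdForRetention(DateTimeOffset.UtcNow, retention));
}
```

The approximate form (`~`) lets Redis trim in whole internal nodes, so the stream may briefly hold somewhat more than the requested number of entries.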
Real-world scenario: order processing pipeline
`XADD orders:stream` → 3 consumer groups: (1) `payment-svc` starts the payment, (2) `inventory-svc` decrements stock, (3) `notification-svc` emails the user. Each group ACKs independently. If a worker crashes, another worker takes over its pending messages with `XAUTOCLAIM`. After 5 failed retries, a message is moved to the dead letter stream.
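Setting up the three groups from this scenario might look like the sketch below. The helper name `OrderGroupsSetup` is hypothetical; the group names come from the scenario, and position `"0"` is an assumption made so that each group starts from the beginning of the stream.

```csharp
using System.Threading.Tasks;
using StackExchange.Redis;

public static class OrderGroupsSetup
{
    // Group names taken from the scenario; each group receives every event.
    public static readonly string[] Groups =
        { "payment-svc", "inventory-svc", "notification-svc" };

    public static async Task EnsureGroupsAsync(IDatabase db, RedisKey streamKey)
    {
        foreach (var group in Groups)
        {
            try
            {
                // "0" lets a newly created group read the stream from the start.
                await db.StreamCreateConsumerGroupAsync(
                    streamKey, group, "0", createStream: true);
            }
            catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
            {
                // The group already exists; nothing to do.
            }
        }
    }
}
```

Because every group keeps its own cursor and pending list, a slow `notification-svc` does not hold back `payment-svc`.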
# Produce
XADD orders:stream * event_type "order_created" order_id "5432" user_id "1001"
# Create the consumer group
XGROUP CREATE orders:stream payment-svc 0 MKSTREAM
# Consume
XREADGROUP GROUP payment-svc worker-1 COUNT 5 BLOCK 2000 STREAMS orders:stream >
# Acknowledge
XACK orders:stream payment-svc <message-id>
# Pending (delivered but not yet ACKed)
XPENDING orders:stream payment-svc
# Claim (take over messages that have been idle past the timeout)
XAUTOCLAIM orders:stream payment-svc worker-2 60000 0
using System;
using System.Threading.Tasks;
using StackExchange.Redis;

public class OrderStreamProducer
{
    private readonly IDatabase _redis;
    private const string StreamKey = "orders:stream";

    public OrderStreamProducer(IConnectionMultiplexer mux)
        => _redis = mux.GetDatabase();

    // Appends one event to the stream and returns the ID generated by Redis
    public async Task<string> PublishOrderEventAsync(string eventType, int orderId, int userId)
    {
        var messageId = await _redis.StreamAddAsync(StreamKey, new NameValueEntry[]
        {
            new("event_type", eventType),
            new("order_id", orderId.ToString()),
            new("user_id", userId.ToString()),
            new("timestamp", DateTimeOffset.UtcNow.ToUnixTimeMilliseconds().ToString())
        });
        return messageId.ToString();
    }
}
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

public class OrderStreamConsumer : BackgroundService
{
    private readonly IConnectionMultiplexer _mux;
    private readonly ILogger<OrderStreamConsumer> _logger;
    private const string StreamKey = "orders:stream";
    private const string GroupName = "payment-svc";
    private readonly string _consumerName;

    public OrderStreamConsumer(IConnectionMultiplexer mux, ILogger<OrderStreamConsumer> logger)
    {
        _mux = mux;
        _logger = logger;
        _consumerName = $"worker-{Environment.MachineName}";
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var db = _mux.GetDatabase();

        // Create the consumer group (idempotent)
        try
        {
            await db.StreamCreateConsumerGroupAsync(StreamKey, GroupName, "0", createStream: true);
        }
        catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
        {
            // Already exists; nothing to do
        }

        while (!ct.IsCancellationRequested)
        {
            try
            {
                // ">" asks only for messages never delivered to this group
                var entries = await db.StreamReadGroupAsync(
                    StreamKey, GroupName, _consumerName,
                    position: ">", count: 10);

                if (entries.Length == 0)
                {
                    // Nothing new: wait before polling again (this loop does not use BLOCK)
                    await Task.Delay(1000, ct);
                    continue;
                }

                foreach (var entry in entries)
                {
                    try
                    {
                        await ProcessMessageAsync(entry);
                        await db.StreamAcknowledgeAsync(StreamKey, GroupName, entry.Id);
                    }
                    catch (Exception ex)
                    {
                        _logger.LogError(ex, "Failed to process {Id}", entry.Id);
                        // No ACK → the message stays in XPENDING → retry/dead-letter
                    }
                }
            }
            catch (Exception ex) when (!ct.IsCancellationRequested)
            {
                _logger.LogError(ex, "Stream read error");
                await Task.Delay(5000, ct);
            }
        }
    }

    private Task ProcessMessageAsync(StreamEntry entry)
    {
        // Flatten the field/value pairs into a dictionary for easy lookup
        var values = entry.Values.ToDictionary(v => v.Name.ToString(), v => v.Value.ToString());
        _logger.LogInformation("Processing {EventType} for order {OrderId}",
            values["event_type"], values["order_id"]);
        return Task.CompletedTask;
    }
}
| Feature | Pub/Sub | Streams |
|---|---|---|
| Persistence | No | Yes |
| Consumer groups | No | Yes |
| Replay (history) | No | Yes |
| Delivery | At-most-once | At-least-once |
| Use case | Notification, invalidation | Event sourcing, task queue |
| Blocking read | Yes | Yes |
| Back-pressure | None | XLEN + trim |
Dead Letter & Retry Pattern
If a consumer cannot process a message: check its retry count, and once the threshold is exceeded, move it to the dead letter queue.
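# Inspect the retry count of pending messages
# Syntax: XPENDING <stream> <group> <start> <end> <count> [consumer]
XPENDING orders:stream payment-svc - + 10
# Each entry: <id> <consumer> <idle-ms> <delivery-count>
# Claim messages idle for more than 60s and assign them to dlq-worker (XAUTOCLAIM, Redis 6.2+)
XAUTOCLAIM orders:stream payment-svc dlq-worker 60000 0 COUNT 10
# Note: XAUTOCLAIM filters by idle time, not by delivery count;
# the delivery-count check (3+ attempts) is done in the script below
# Move to dead letter (with Lua)
# KEYS[1] = source stream, KEYS[2] = dead letter stream, ARGV[1] = group, ARGV[2] = max deliveries
EVAL "
local pending = redis.call('XPENDING', KEYS[1], ARGV[1], '-', '+', '100')
for _, msg in ipairs(pending) do
  -- msg = { id, consumer, idle-ms, delivery-count }
  if tonumber(msg[4]) >= tonumber(ARGV[2]) then
    local data = redis.call('XRANGE', KEYS[1], msg[1], msg[1])
    if #data > 0 then
      -- copy the fields to the dead letter stream, then ACK the original
      redis.call('XADD', KEYS[2], '*', unpack(data[1][2]))
      redis.call('XACK', KEYS[1], ARGV[1], msg[1])
    end
  end
end
" 2 orders:stream orders:dead-letter payment-svc 3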
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

public class DeadLetterProcessor
{
    private readonly IDatabase _redis;
    private readonly ILogger<DeadLetterProcessor> _logger;
    private const string StreamKey = "orders:stream";
    private const string DeadLetterKey = "orders:dead-letter";
    private const string GroupName = "payment-svc";
    private const int MaxRetries = 3;
    // Same naming scheme as the worker, so claimed messages land on a real consumer
    private readonly string _consumerName = $"worker-{Environment.MachineName}";

    public DeadLetterProcessor(IConnectionMultiplexer mux,
        ILogger<DeadLetterProcessor> logger)
    {
        _redis = mux.GetDatabase();
        _logger = logger;
    }

    // Run periodically: move messages that exceeded the retry count to dead letter
    public async Task ProcessPendingAsync(CancellationToken ct)
    {
        // RedisValue.Null as consumer name = pending messages of every consumer in the group
        var pending = await _redis.StreamPendingMessagesAsync(
            StreamKey, GroupName, count: 50, consumerName: RedisValue.Null);

        foreach (var msg in pending)
        {
            ct.ThrowIfCancellationRequested();

            if (msg.DeliveryCount >= MaxRetries)
            {
                // Copy to the dead letter queue, then ACK the original
                var entries = await _redis.StreamRangeAsync(
                    StreamKey, msg.MessageId, msg.MessageId, count: 1);
                if (entries.Length > 0)
                {
                    await _redis.StreamAddAsync(DeadLetterKey, entries[0].Values);
                    await _redis.StreamAcknowledgeAsync(StreamKey, GroupName, msg.MessageId);
                    _logger.LogWarning(
                        "Message {Id} moved to dead letter after {Count} retries",
                        msg.MessageId, msg.DeliveryCount);
                }
            }
            else if (msg.IdleTimeInMilliseconds > 60_000)
            {
                // Idle for 60s+ → reassign to this worker (XCLAIM).
                // The claimed entries are returned by the call; reprocess them here or have
                // the worker read its own pending list (position "0") to retry them.
                await _redis.StreamClaimAsync(StreamKey, GroupName,
                    _consumerName, 60_000,
                    new[] { msg.MessageId });
            }
        }
    }
}
Dead letter monitoring: set up an alert if the dead letter queue keeps growing.
Export the `XLEN orders:dead-letter` value as a Prometheus metric.
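A length check along these lines could feed that alert. This is only a sketch: `DeadLetterMonitor`, `ShouldAlert`, and the threshold of 100 are hypothetical, and wiring the result into Prometheus is left to whichever metrics client you already use.

```csharp
using System.Threading.Tasks;
using StackExchange.Redis;

public static class DeadLetterMonitor
{
    // Hypothetical alert threshold; tune it to your own traffic.
    public const long AlertThreshold = 100;

    // Pure decision function, kept separate so it can be unit-tested without Redis.
    public static bool ShouldAlert(long length, long threshold = AlertThreshold)
        => length >= threshold;

    // Reads the stream length (XLEN) and reports whether the threshold is crossed.
    public static async Task<(long Length, bool Alert)> CheckAsync(
        IDatabase db, RedisKey deadLetterKey)
    {
        long length = await db.StreamLengthAsync(deadLetterKey);
        return (length, ShouldAlert(length));
    }
}
```

Calling `CheckAsync(db, "orders:dead-letter")` on a timer and publishing `Length` as a gauge would give the alerting rule a value to watch for growth.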