ORTA
Pub/Sub
Fire-and-forget mesajlaşma. Subscriber yoksa mesaj kaybolur.
Kod örneği görünümü
Bu sayfadaki eşleşen örnekleri seçilen istemciye göre gösterir.
Ne Zaman Pub/Sub Kullan / Kullanma
| Kullan | Kullanma | Gerçek Hayat |
|---|---|---|
| Anlık bildirim (mesaj kaybı tolere edilebilir) | Mesaj kaybı kabul edilemez (ödeme, sipariş) | Chat app: Online kullanıcılara anlık mesaj — offline'daysa zaten göremez |
| Cache invalidation broadcast (tüm instance'lara) | Guaranteed delivery gerekli | Microservice: 8 pod'a "ürün güncellendi, cache'i temizle" sinyali |
| Realtime dashboard, live feed | Consumer geçici olarak kapanabiliyorsa | Monitoring: CPU/memory metrik'lerini dashboard'a push |
| SignalR backplane (multi-instance) | Queue semantiği (FIFO, retry, DLQ) | E-ticaret: Stok değişiklik bildirimi — kayıpsa sorun yok, 2s sonra refresh gelir |
KRİTİK: Pub/Sub at-most-once garantisi verir. Subscriber bağlı değilse mesaj kalıcı olarak kaybolur. Ödeme tamamlandı eventi, sipariş oluşturuldu — bunlar için asla Pub/Sub kullanma, Redis Streams veya RabbitMQ/Kafka tercih et.
Gerçek hayat senaryosu — Canlı bildirim sistemi: Kullanıcı online → Pub/Sub ile anlık push. Kullanıcı offline → mesajı DB'ye yaz + Streams'e ekle. Online'a döndüğünde Streams'ten unread mesajları çek. Hibrit mimari: Pub/Sub (hız) + Streams (güvenilirlik).
# Terminal 1 (subscriber):
SUBSCRIBE notifications:user:1001
PSUBSCRIBE notifications:* # pattern
# Terminal 2 (publisher):
PUBLISH notifications:user:1001 '{"type":"order_shipped","orderId":5432}'
public class NotificationService
{
private readonly ISubscriber _subscriber;
private readonly IDatabase _redis;
public NotificationService(IConnectionMultiplexer mux)
{
_subscriber = mux.GetSubscriber();
_redis = mux.GetDatabase();
}
// Publish
public async Task PublishAsync(string userId, object notification)
{
var channel = RedisChannel.Literal($"notifications:{userId}");
var message = JsonSerializer.Serialize(notification);
await _subscriber.PublishAsync(channel, message);
}
// Subscribe (genellikle BackgroundService'de)
public async Task SubscribeAsync(string userId, Action<string> handler)
{
var channel = RedisChannel.Literal($"notifications:{userId}");
await _subscriber.SubscribeAsync(channel, (ch, msg) =>
{
if (msg.HasValue)
handler(msg.ToString());
});
}
// Pattern subscribe
public async Task SubscribePatternAsync(Action<string, string> handler)
{
var pattern = RedisChannel.Pattern("notifications:*");
await _subscriber.SubscribeAsync(pattern, (ch, msg) =>
{
handler(ch.ToString(), msg.ToString());
});
}
public async Task UnsubscribeAsync(string userId)
{
var channel = RedisChannel.Literal($"notifications:{userId}");
await _subscriber.UnsubscribeAsync(channel);
}
}
// BackgroundService ile consumer
public class NotificationConsumer : BackgroundService
{
private readonly IConnectionMultiplexer _mux;
private readonly ILogger<NotificationConsumer> _logger;
public NotificationConsumer(IConnectionMultiplexer mux, ILogger<NotificationConsumer> logger)
{
_mux = mux;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
var sub = _mux.GetSubscriber();
await sub.SubscribeAsync(RedisChannel.Pattern("events:*"), (channel, message) =>
{
_logger.LogInformation("Received {Channel}: {Message}", channel, message);
// Process message...
});
await Task.Delay(Timeout.Infinite, ct);
}
}
Pub/Sub at-most-once. Mesaj kaybı kabul edilemezse → Redis Streams kullan.