RRabbitMQ Handbook

ORTA

.NET Entegrasyonu

.NET'ten RabbitMQ'ya bağlanmanın iki yolu var:

  • RabbitMQ.Client — Doğrudan kontrol. Her şeyi kendin yazarsın (exchange, queue, binding declare, publish, consume). Basit senaryolarda veya tam kontrol istediğinde tercih edilir.
  • MassTransit — Üst seviye framework. Queue/exchange isimlendirme, retry, hata yönetimi otomatik. Karmaşık projelerde zaman kazandırır ama "sihir" yapar — ne yaptığını bilmen gerekir.

Doğru seçim, projenin karmaşıklığına ve ekibin RabbitMQ deneyimine bağlıdır.

RabbitMQ.Client (Raw) Manuel kontrol, tam esneklik ConnectionFactory IChannel (per-thread) Exchange Declare Queue Declare Bind BasicPublishAsync + Confirms BasicConsumeAsync + Manual Ack ❌ Retry, Outbox, Saga, Scheduling: KENDİN YAZ MassTransit (Abstraction) Convention-based, batteries included AddMassTransit(cfg => ...) Auto topology creation IPublishEndpoint IConsumer<T> ISaga / IStateMachine Retry built-in Circuit Breaker Outbox Pattern ✅ Scheduling, Saga, Middleware, Observability: HAZIR

Karar Tablosu: Raw Client vs MassTransit

Senaryo Raw Client MassTransit Neden
Basit publisher/consumer, az queue Overhead gereksiz, öğrenme eğrisi düşük
Mevcut topology'yi birebir kontrol MassTransit convention'ları kısıtlayıcı olabilir
Saga/Orchestration pattern Built-in state machine, persistence
Retry + circuit breaker + outbox Kendi implement etmek hata-prone
Çok sayıda microservice, standarizasyon Convention-based, tutarlı topology
Performance-critical, ultra-low latency MassTransit overhead (minimal ama var)

MassTransit Topology: MassTransit varsayılan olarak her message type için bir exchange ve consumer type için bir queue oluşturur. Bu convention'ı anlamadan kullanmak, beklenmedik exchange/queue proliferation'a yol açar. Topology'yi cfg.Message<T>(x => x.SetEntityName(...)) ile özelleştirin.

// Program.cs
services.AddMassTransit(cfg =>
{
    // Consumer'ları register et
    cfg.AddConsumer<OrderCreatedConsumer>();
    cfg.AddConsumer<PaymentCompletedConsumer>();

    cfg.UsingRabbitMq((context, rabbit) =>
    {
        rabbit.Host("rabbitmq-cluster.internal", 5672, "/", h =>
        {
            h.Username("app_service");
            h.Password(context.GetRequiredService<IConfiguration>()["RabbitMQ:Password"]);
        });

        // Retry policy (tüm consumer'lar için)
        rabbit.UseMessageRetry(r => r.Exponential(
            retryLimit: 3,
            minInterval: TimeSpan.FromSeconds(1),
            maxInterval: TimeSpan.FromSeconds(30),
            intervalDelta: TimeSpan.FromSeconds(5)));

        // Circuit breaker
        rabbit.UseCircuitBreaker(cb =>
        {
            cb.TrackingPeriod = TimeSpan.FromMinutes(1);
            cb.TripThreshold = 15;
            cb.ActiveThreshold = 10;
            cb.ResetInterval = TimeSpan.FromMinutes(5);
        });

        // Quorum queues by default
        // ⚠️ API doğrulama: MassTransit v8.x'te SetQuorumQueue(replicaCount) olarak değişmiş olabilir.
        // Güncel API için: https://masstransit.io/documentation/configuration/transports/rabbitmq
        rabbit.SetQuorumQueue();  // parametre almayabilir — MassTransit version'ınıza göre kontrol edin

        rabbit.ConfigureEndpoints(context);
    });
});

// Consumer
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var order = context.Message;
        await _orderService.ProcessAsync(order);
        // Ack otomatik (exception yoksa)
    }
}
// Singleton connection (DI)
services.AddSingleton<IConnection>(sp =>
{
    var factory = new ConnectionFactory
    {
        HostName = "rabbitmq-cluster.internal",
        AutomaticRecoveryEnabled = true,
        TopologyRecoveryEnabled = true,
        NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
    };
    // Multiple endpoints for failover
    return factory.CreateConnectionAsync(new[]
    {
        new AmqpTcpEndpoint("rabbit-node1", 5672),
        new AmqpTcpEndpoint("rabbit-node2", 5672),
        new AmqpTcpEndpoint("rabbit-node3", 5672)
    }).GetAwaiter().GetResult();
});

// Background consumer service
public class OrderConsumerService : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var channel = await _connection.CreateChannelAsync(ct);
        await channel.BasicQosAsync(0, 20, false);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (_, ea) =>
        {
            try
            {
                var order = JsonSerializer.Deserialize<OrderEvent>(ea.Body.Span);
                await _processor.HandleAsync(order!, ct);
                await channel.BasicAckAsync(ea.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Order processing failed");
                await channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
            }
        };

        await channel.BasicConsumeAsync("order-processing", false, consumer, ct);
        await Task.Delay(Timeout.Infinite, ct);
    }
}

MassTransit Saga State Machine

Saga, birden fazla servisi kapsayan uzun ömürlü iş akışlarını (sipariş süreci, ödeme → kargo → bildirim) state machine olarak modellemeni sağlar. Her adım bir event ile tetiklenir, state DB'de persist edilir.

.NET — Order Saga State Machine
// State instance (EF Core ile persist)
public class OrderState : SagaStateMachineInstance
{
    public Guid CorrelationId { get; set; }
    public string CurrentState { get; set; } = "";
    public DateTime OrderDate { get; set; }
    public decimal Amount { get; set; }
    public int RetryCount { get; set; }
}

// State Machine tanımı
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    public State Submitted { get; private set; } = null!;
    public State PaymentProcessing { get; private set; } = null!;
    public State Completed { get; private set; } = null!;
    public State Faulted { get; private set; } = null!;

    public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; } = null!;
    public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; } = null!;
    public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => OrderSubmittedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentCompletedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));
        Event(() => PaymentFailedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));

        Initially(
            When(OrderSubmittedEvent)
                .Then(ctx =>
                {
                    ctx.Saga.OrderDate = ctx.Message.Timestamp;
                    ctx.Saga.Amount = ctx.Message.Amount;
                })
                .PublishAsync(ctx => ctx.Init<ProcessPayment>(new
                {
                    OrderId = ctx.Saga.CorrelationId,
                    ctx.Saga.Amount
                }))
                .TransitionTo(PaymentProcessing));

        During(PaymentProcessing,
            When(PaymentCompletedEvent)
                .PublishAsync(ctx => ctx.Init<ShipOrder>(new
                {
                    OrderId = ctx.Saga.CorrelationId
                }))
                .TransitionTo(Completed),
            When(PaymentFailedEvent)
                .Then(ctx => ctx.Saga.RetryCount++)
                .If(ctx => ctx.Saga.RetryCount < 3,
                    binder => binder
                        .PublishAsync(ctx => ctx.Init<ProcessPayment>(new
                        {
                            OrderId = ctx.Saga.CorrelationId,
                            ctx.Saga.Amount
                        })))
                .If(ctx => ctx.Saga.RetryCount >= 3,
                    binder => binder.TransitionTo(Faulted)));
    }
}

// DI Registration
services.AddMassTransit(x =>
{
    x.AddSagaStateMachine<OrderStateMachine, OrderState>()
        .EntityFrameworkRepository(r =>
        {
            r.ConcurrencyMode = ConcurrencyMode.Optimistic;
            r.AddDbContext<DbContext, OrderSagaDbContext>((provider, builder) =>
            {
                builder.UseNpgsql(connectionString, m =>
                {
                    m.MigrationsAssembly(typeof(OrderSagaDbContext).Assembly.FullName);
                });
            });
        });

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq-cluster.internal");
        cfg.ConfigureEndpoints(context);
    });
});
.NET — Transactional Outbox Pattern
// Outbox: iş mantığı + mesaj publish AYNI DB transaction'ında
// Mesaj önce DB'ye yazılır, sonra background job ile RabbitMQ'ya gönderilir
// → DB commit olursa mesaj kesin gider, rollback olursa mesaj da silinir

services.AddMassTransit(x =>
{
    x.AddEntityFrameworkOutbox<AppDbContext>(o =>
    {
        o.UsePostgres();             // veya UseSqlServer()
        o.UseBusOutbox();            // Bus-level outbox (publish + send)
        o.QueryDelay = TimeSpan.FromSeconds(1);  // Polling interval
        o.DuplicateDetectionWindow = TimeSpan.FromMinutes(5);
    });

    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq-cluster.internal");
        cfg.ConfigureEndpoints(context);
    });
});

// Kullanım: Normal publish — Outbox otomatik araya girer
public class OrderService
{
    private readonly AppDbContext _db;
    private readonly IPublishEndpoint _publisher;

    public async Task CreateOrderAsync(CreateOrderRequest request)
    {
        var order = new Order
        {
            Id = Guid.NewGuid(),
            CustomerId = request.CustomerId,
            Amount = request.Amount,
            Status = OrderStatus.Created
        };

        _db.Orders.Add(order);

        // Bu publish, DB transaction commit edilene kadar RabbitMQ'ya GİTMEZ
        // Outbox middleware mesajı OutboxMessage tablosuna yazar
        await _publisher.Publish(new OrderCreated
        {
            OrderId = order.Id,
            Amount = order.Amount,
            Timestamp = DateTime.UtcNow
        });

        await _db.SaveChangesAsync();
        // Transaction commit → background delivery service mesajı RabbitMQ'ya gönderir
    }
}

// DbContext'e Outbox tablolarını ekle
public class AppDbContext : DbContext
{
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // MassTransit Outbox tabloları (InboxState, OutboxState, OutboxMessage)
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
    }
}

Outbox vs Publisher Confirms: İkisi farklı problemi çözer. Publisher Confirms = "mesaj broker'a ulaştı mı?" garantisi. Outbox = "DB commit ve mesaj publish atomik olsun" garantisi. İkisini birlikte kullanmak en güvenli yoldur (MassTransit Outbox altında confirms otomatik aktiftir).

Gerçek hayat senaryosu: Bir SaaS platformunda 12 microservice MassTransit kullanır. Ortak retry policy ve circuit breaker config'i shared NuGet package'dan gelir. Yeni bir consumer eklemek = 1 class implement etmek. Topology otomatik oluşturulur, exchange/queue naming convention tutarlıdır. Saga ile sipariş akışı (submit → payment → ship → complete) 5 servis arasında koordine edilir; bir adım başarısız olursa compensating action tetiklenir.