ORTA
.NET Entegrasyonu
.NET'ten RabbitMQ'ya bağlanmanın iki yolu var:
- RabbitMQ.Client — Doğrudan kontrol. Her şeyi kendin yazarsın (exchange, queue, binding declare, publish, consume). Basit senaryolarda veya tam kontrol istediğinde tercih edilir.
- MassTransit — Üst seviye framework. Queue/exchange isimlendirme, retry, hata yönetimi otomatik. Karmaşık projelerde zaman kazandırır ama "sihir" yapar — ne yaptığını bilmen gerekir.
Doğru seçim, projenin karmaşıklığına ve ekibin RabbitMQ deneyimine bağlıdır.
Karar Tablosu: Raw Client vs MassTransit
| Senaryo | Raw Client | MassTransit | Neden |
|---|---|---|---|
| Basit publisher/consumer, az queue | — | Overhead gereksiz, öğrenme eğrisi düşük | |
| Mevcut topology'yi birebir kontrol | — | MassTransit convention'ları kısıtlayıcı olabilir | |
| Saga/Orchestration pattern | — | Built-in state machine, persistence | |
| Retry + circuit breaker + outbox | — | Kendi implement etmek hata-prone | |
| Çok sayıda microservice, standarizasyon | — | Convention-based, tutarlı topology | |
| Performance-critical, ultra-low latency | — | MassTransit overhead (minimal ama var) |
MassTransit Topology: MassTransit varsayılan olarak her message type için bir exchange ve consumer type için bir queue oluşturur. Bu convention'ı anlamadan kullanmak, beklenmedik exchange/queue proliferation'a yol açar. Topology'yi
cfg.Message<T>(x => x.SetEntityName(...))ile özelleştirin.
// Program.cs
services.AddMassTransit(cfg =>
{
// Consumer'ları register et
cfg.AddConsumer<OrderCreatedConsumer>();
cfg.AddConsumer<PaymentCompletedConsumer>();
cfg.UsingRabbitMq((context, rabbit) =>
{
rabbit.Host("rabbitmq-cluster.internal", 5672, "/", h =>
{
h.Username("app_service");
h.Password(context.GetRequiredService<IConfiguration>()["RabbitMQ:Password"]);
});
// Retry policy (tüm consumer'lar için)
rabbit.UseMessageRetry(r => r.Exponential(
retryLimit: 3,
minInterval: TimeSpan.FromSeconds(1),
maxInterval: TimeSpan.FromSeconds(30),
intervalDelta: TimeSpan.FromSeconds(5)));
// Circuit breaker
rabbit.UseCircuitBreaker(cb =>
{
cb.TrackingPeriod = TimeSpan.FromMinutes(1);
cb.TripThreshold = 15;
cb.ActiveThreshold = 10;
cb.ResetInterval = TimeSpan.FromMinutes(5);
});
// Quorum queues by default
// ⚠️ API doğrulama: MassTransit v8.x'te SetQuorumQueue(replicaCount) olarak değişmiş olabilir.
// Güncel API için: https://masstransit.io/documentation/configuration/transports/rabbitmq
rabbit.SetQuorumQueue(); // parametre almayabilir — MassTransit version'ınıza göre kontrol edin
rabbit.ConfigureEndpoints(context);
});
});
// Consumer
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var order = context.Message;
await _orderService.ProcessAsync(order);
// Ack otomatik (exception yoksa)
}
}
// Singleton connection (DI)
services.AddSingleton<IConnection>(sp =>
{
var factory = new ConnectionFactory
{
HostName = "rabbitmq-cluster.internal",
AutomaticRecoveryEnabled = true,
TopologyRecoveryEnabled = true,
NetworkRecoveryInterval = TimeSpan.FromSeconds(10)
};
// Multiple endpoints for failover
return factory.CreateConnectionAsync(new[]
{
new AmqpTcpEndpoint("rabbit-node1", 5672),
new AmqpTcpEndpoint("rabbit-node2", 5672),
new AmqpTcpEndpoint("rabbit-node3", 5672)
}).GetAwaiter().GetResult();
});
// Background consumer service
public class OrderConsumerService : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
var channel = await _connection.CreateChannelAsync(ct);
await channel.BasicQosAsync(0, 20, false);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (_, ea) =>
{
try
{
var order = JsonSerializer.Deserialize<OrderEvent>(ea.Body.Span);
await _processor.HandleAsync(order!, ct);
await channel.BasicAckAsync(ea.DeliveryTag, false);
}
catch (Exception ex)
{
_logger.LogError(ex, "Order processing failed");
await channel.BasicNackAsync(ea.DeliveryTag, false, requeue: true);
}
};
await channel.BasicConsumeAsync("order-processing", false, consumer, ct);
await Task.Delay(Timeout.Infinite, ct);
}
}
MassTransit Saga State Machine
Saga, birden fazla servisi kapsayan uzun ömürlü iş akışlarını (sipariş süreci, ödeme → kargo → bildirim) state machine olarak modellemeni sağlar. Her adım bir event ile tetiklenir, state DB'de persist edilir.
.NET — Order Saga State Machine
// State instance (EF Core ile persist)
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; } = "";
public DateTime OrderDate { get; set; }
public decimal Amount { get; set; }
public int RetryCount { get; set; }
}
// State Machine tanımı
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; } = null!;
public State PaymentProcessing { get; private set; } = null!;
public State Completed { get; private set; } = null!;
public State Faulted { get; private set; } = null!;
public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; } = null!;
public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; } = null!;
public Event<PaymentFailed> PaymentFailedEvent { get; private set; } = null!;
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => OrderSubmittedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentCompletedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));
Event(() => PaymentFailedEvent, x => x.CorrelateById(ctx => ctx.Message.OrderId));
Initially(
When(OrderSubmittedEvent)
.Then(ctx =>
{
ctx.Saga.OrderDate = ctx.Message.Timestamp;
ctx.Saga.Amount = ctx.Message.Amount;
})
.PublishAsync(ctx => ctx.Init<ProcessPayment>(new
{
OrderId = ctx.Saga.CorrelationId,
ctx.Saga.Amount
}))
.TransitionTo(PaymentProcessing));
During(PaymentProcessing,
When(PaymentCompletedEvent)
.PublishAsync(ctx => ctx.Init<ShipOrder>(new
{
OrderId = ctx.Saga.CorrelationId
}))
.TransitionTo(Completed),
When(PaymentFailedEvent)
.Then(ctx => ctx.Saga.RetryCount++)
.If(ctx => ctx.Saga.RetryCount < 3,
binder => binder
.PublishAsync(ctx => ctx.Init<ProcessPayment>(new
{
OrderId = ctx.Saga.CorrelationId,
ctx.Saga.Amount
})))
.If(ctx => ctx.Saga.RetryCount >= 3,
binder => binder.TransitionTo(Faulted)));
}
}
// DI Registration
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.EntityFrameworkRepository(r =>
{
r.ConcurrencyMode = ConcurrencyMode.Optimistic;
r.AddDbContext<DbContext, OrderSagaDbContext>((provider, builder) =>
{
builder.UseNpgsql(connectionString, m =>
{
m.MigrationsAssembly(typeof(OrderSagaDbContext).Assembly.FullName);
});
});
});
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq-cluster.internal");
cfg.ConfigureEndpoints(context);
});
});
.NET — Transactional Outbox Pattern
// Outbox: iş mantığı + mesaj publish AYNI DB transaction'ında
// Mesaj önce DB'ye yazılır, sonra background job ile RabbitMQ'ya gönderilir
// → DB commit olursa mesaj kesin gider, rollback olursa mesaj da silinir
services.AddMassTransit(x =>
{
x.AddEntityFrameworkOutbox<AppDbContext>(o =>
{
o.UsePostgres(); // veya UseSqlServer()
o.UseBusOutbox(); // Bus-level outbox (publish + send)
o.QueryDelay = TimeSpan.FromSeconds(1); // Polling interval
o.DuplicateDetectionWindow = TimeSpan.FromMinutes(5);
});
x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("rabbitmq-cluster.internal");
cfg.ConfigureEndpoints(context);
});
});
// Kullanım: Normal publish — Outbox otomatik araya girer
public class OrderService
{
private readonly AppDbContext _db;
private readonly IPublishEndpoint _publisher;
public async Task CreateOrderAsync(CreateOrderRequest request)
{
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = request.CustomerId,
Amount = request.Amount,
Status = OrderStatus.Created
};
_db.Orders.Add(order);
// Bu publish, DB transaction commit edilene kadar RabbitMQ'ya GİTMEZ
// Outbox middleware mesajı OutboxMessage tablosuna yazar
await _publisher.Publish(new OrderCreated
{
OrderId = order.Id,
Amount = order.Amount,
Timestamp = DateTime.UtcNow
});
await _db.SaveChangesAsync();
// Transaction commit → background delivery service mesajı RabbitMQ'ya gönderir
}
}
// DbContext'e Outbox tablolarını ekle
public class AppDbContext : DbContext
{
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
// MassTransit Outbox tabloları (InboxState, OutboxState, OutboxMessage)
modelBuilder.AddInboxStateEntity();
modelBuilder.AddOutboxStateEntity();
modelBuilder.AddOutboxMessageEntity();
}
}
Outbox vs Publisher Confirms: İkisi farklı problemi çözer. Publisher Confirms = "mesaj broker'a ulaştı mı?" garantisi. Outbox = "DB commit ve mesaj publish atomik olsun" garantisi. İkisini birlikte kullanmak en güvenli yoldur (MassTransit Outbox altında confirms otomatik aktiftir).
Gerçek hayat senaryosu: Bir SaaS platformunda 12 microservice MassTransit kullanır. Ortak retry policy ve circuit breaker config'i shared NuGet package'dan gelir. Yeni bir consumer eklemek = 1 class implement etmek. Topology otomatik oluşturulur, exchange/queue naming convention tutarlıdır. Saga ile sipariş akışı (submit → payment → ship → complete) 5 servis arasında koordine edilir; bir adım başarısız olursa compensating action tetiklenir.