UZMAN
Testing & Verification
RabbitMQ entegrasyonlarını test etmek "çalışıyor gibi görünüyor" ile "production'da kesin çalışır" arasındaki farktır. Testcontainers ile gerçek RabbitMQ instance'ı üzerinde integration test yapmak, mock'lardan çok daha güvenilirdir.
Seviye: Uzman — Integration test altyapısı ve CI/CD pipeline deneyimi gerektirir.
Ne Zaman Hangi Test
| Test Tipi | Ne Test Eder | Araç | Hız |
|---|---|---|---|
| Unit | Serialization, routing key generation, retry logic | xUnit + Moq | ⚡ ms |
| Integration | Publish → queue → consume → ack/nack akışı | Testcontainers + gerçek RabbitMQ | 🐢 2-5s (container startup) |
| DLX Flow | Rejected mesajın DLX'e düşmesi, retry count | Testcontainers | 🐢 3-10s |
| Idempotency | Aynı mesajı 2x gönder, 1x işlendiğini doğrula | Testcontainers + DB | 🐢 5s |
| Load/Stress | Throughput limitleri, prefetch etkileri | k6 / bombardier + gerçek cluster | 🐌 dakikalar |
.NET — Testcontainers ile Integration Test
// NuGet: Testcontainers.RabbitMq, xUnit
public class RabbitMqIntegrationTests : IAsyncLifetime
{
private readonly RabbitMqContainer _container = new RabbitMqBuilder()
.WithImage("rabbitmq:4.3-management")
.Build();
private IConnection _connection = null!;
private IChannel _channel = null!;
public async Task InitializeAsync()
{
await _container.StartAsync();
var factory = new ConnectionFactory
{
Uri = new Uri(_container.GetConnectionString())
};
_connection = await factory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
}
[Fact]
public async Task Publish_And_Consume_ShouldDeliverMessage()
{
// Arrange
await _channel.QueueDeclareAsync("test-queue", durable: false,
exclusive: false, autoDelete: true);
var receivedMessage = new TaskCompletionSource<string>();
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, ea) =>
{
var body = Encoding.UTF8.GetString(ea.Body.ToArray());
receivedMessage.SetResult(body);
await _channel.BasicAckAsync(ea.DeliveryTag, false);
};
await _channel.BasicConsumeAsync("test-queue", false, consumer);
// Act
var body = Encoding.UTF8.GetBytes("hello-rabbitmq");
await _channel.BasicPublishAsync("", "test-queue", body: body);
// Assert
var result = await receivedMessage.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal("hello-rabbitmq", result);
}
[Fact]
public async Task Nack_WithoutRequeue_ShouldRouteToDeadLetter()
{
// Arrange: Main queue with DLX
await _channel.ExchangeDeclareAsync("test-dlx", ExchangeType.Direct);
await _channel.QueueDeclareAsync("test-dead-letters", durable: false,
exclusive: false, autoDelete: true);
await _channel.QueueBindAsync("test-dead-letters", "test-dlx", "rejected");
var args = new Dictionary<string, object?>
{
["x-dead-letter-exchange"] = "test-dlx",
["x-dead-letter-routing-key"] = "rejected"
};
await _channel.QueueDeclareAsync("test-main", durable: false,
exclusive: false, autoDelete: true, arguments: args);
// DLX consumer
var deadLetterReceived = new TaskCompletionSource<string>();
var dlxConsumer = new AsyncEventingBasicConsumer(_channel);
dlxConsumer.ReceivedAsync += async (_, ea) =>
{
deadLetterReceived.SetResult(Encoding.UTF8.GetString(ea.Body.ToArray()));
await _channel.BasicAckAsync(ea.DeliveryTag, false);
};
await _channel.BasicConsumeAsync("test-dead-letters", false, dlxConsumer);
// Main consumer: reject
var mainConsumer = new AsyncEventingBasicConsumer(_channel);
mainConsumer.ReceivedAsync += async (_, ea) =>
{
await _channel.BasicNackAsync(ea.DeliveryTag, false, requeue: false);
};
await _channel.BasicConsumeAsync("test-main", false, mainConsumer);
// Act
await _channel.BasicPublishAsync("", "test-main",
body: Encoding.UTF8.GetBytes("fail-me"));
// Assert: mesaj DLX queue'da olmalı
var dlxResult = await deadLetterReceived.Task.WaitAsync(TimeSpan.FromSeconds(5));
Assert.Equal("fail-me", dlxResult);
}
public async Task DisposeAsync()
{
await _channel.DisposeAsync();
await _connection.DisposeAsync();
await _container.DisposeAsync();
}
}
.NET — Idempotency Integration Test
[Fact]
public async Task DuplicateMessage_ShouldBeProcessedOnlyOnce()
{
// Arrange
var messageId = Guid.NewGuid().ToString();
var processCount = 0;
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (_, ea) =>
{
var msgId = ea.BasicProperties.MessageId;
// Simulate idempotency check
if (!_processedIds.Contains(msgId))
{
_processedIds.Add(msgId);
Interlocked.Increment(ref processCount);
}
await _channel.BasicAckAsync(ea.DeliveryTag, false);
};
await _channel.BasicConsumeAsync("idempotency-test", false, consumer);
// Act: Aynı mesajı 3 kez gönder
var props = new BasicProperties { MessageId = messageId };
for (int i = 0; i < 3; i++)
{
await _channel.BasicPublishAsync("", "idempotency-test",
basicProperties: props, body: Encoding.UTF8.GetBytes("payment"));
}
await Task.Delay(1000); // Consumer'ın işlemesini bekle
// Assert: Sadece 1 kez işlendi
Assert.Equal(1, processCount);
}
Testcontainers Docker gerektirir: CI/CD pipeline'ında Docker-in-Docker veya Docker socket mount gerekir. GitHub Actions'da
services: dockerile çalışır. Testler gerçek RabbitMQ başlattığı için unit test'lerden yavaştır (~2-5s container startup).
Edge Case Test Senaryoları
Aşağıdaki senaryolar "happy path çalışıyor ama production'da patladı" durumlarını yakalar:
| Senaryo | Ne Test Eder | Nasıl Simüle Edersin |
|---|---|---|
| Broker restart mid-publish | Publisher confirm timeout handling | _container.StopAsync() → publish → assert retry |
| Consumer crash mid-processing | Unacked mesaj requeue davranışı | Exception throw in handler → mesajın tekrar geldiğini doğrula |
| Oversized message | Max message size rejection | 128MB+ body gönder → channel exception assert |
| TTL expiry → DLX | Zamanında işlenmeyen mesaj DLX'e düşer | x-message-ttl=100ms + slow consumer → DLX'te bul |
| Network partition (split-brain) | pause_minority davranışı | Testcontainers network disconnect → connection recovery doğrula |
| Duplicate delivery | At-least-once altında idempotency | BasicNack(requeue: true) → aynı mesajın 2x geldiğini doğrula, 1x işlendiğini assert |
| Queue full (max-length) | Overflow policy davranışı | x-max-length=5, 10 mesaj gönder → reject-publish veya drop-head doğrula |
.NET — Broker Restart Recovery Test
[Fact]
public async Task PublisherConfirm_AfterBrokerRestart_ShouldRecover()
{
// Arrange
await _channel.ConfirmSelectAsync();
await _channel.QueueDeclareAsync("recovery-test", durable: true,
exclusive: false, autoDelete: false);
// Act: Publish bir mesaj (confirm al)
await _channel.BasicPublishAsync("", "recovery-test",
mandatory: true, body: Encoding.UTF8.GetBytes("before-restart"));
await _channel.WaitForConfirmsOrDieAsync(TimeSpan.FromSeconds(5));
// Broker'ı restart et
await _container.StopAsync();
await _container.StartAsync();
// Connection recovery bekle (AutomaticRecoveryEnabled=true)
await Task.Delay(TimeSpan.FromSeconds(10));
// Assert: Yeni connection ile durable queue'daki mesaj hâlâ orada
var factory = new ConnectionFactory { Uri = new Uri(_container.GetConnectionString()) };
await using var newConn = await factory.CreateConnectionAsync();
await using var newChannel = await newConn.CreateChannelAsync();
var result = await newChannel.BasicGetAsync("recovery-test", autoAck: true);
Assert.NotNull(result);
Assert.Equal("before-restart", Encoding.UTF8.GetString(result.Body.ToArray()));
}
Gerçek hayat senaryosu: Fintech ödeme sistemi CI pipeline: Her PR'da Testcontainers ile 4 integration test çalışır — publish/consume, DLX flow, idempotency, publisher confirms. Test süresi toplam 15s. Bu testler sayesinde "consumer'ı refactor ettik ama ack'ı unuttuk" gibi hatalar production'a çıkmadan yakalanır.
Contract Testing (AsyncAPI & Pact)
Integration test'ler tek servisin doğru çalıştığını doğrular. Ancak servisler arası mesaj kontratı değiştiğinde (field ekleme/silme, type değişikliği) downstream consumer'lar kırılır. Contract testing bunu CI'da yakalar.
| Araç | Ne Yapar | Ne Zaman |
|---|---|---|
| AsyncAPI | Mesaj şemasını (schema) dokümante eder, schema validation | Event-driven API documentation + schema registry |
| Pact (Message) | Consumer-driven contracts: consumer beklentisini tanımlar, producer doğrular | Microservice'ler arası mesaj uyumluluğu |
| Schema Registry | Avro/JSON Schema ile runtime validation | Büyük ölçekli event-driven sistemler |
.NET — Pact Message Contract Test (Consumer Side)
// NuGet: PactNet
// Consumer tarafı: "Ben bu mesajı bu formatta bekliyorum" tanımı
public class OrderCreatedConsumerPactTest
{
private readonly IMessagePactBuilderV4 _pact;
public OrderCreatedConsumerPactTest()
{
var config = new PactConfig { PactDir = "../../../pacts" };
_pact = Pact.V4("OrderConsumer", "OrderService", config)
.WithMessageInteractions();
}
[Fact]
public async Task Expects_OrderCreated_WithRequiredFields()
{
await _pact
.ExpectsToReceive("an OrderCreated event")
.WithJsonContent(new
{
orderId = Match.Type(Guid.NewGuid()),
amount = Match.Decimal(99.99m),
currency = Match.Regex("TRY", "^[A-Z]{3}$"),
timestamp = Match.Type(DateTime.UtcNow),
items = Match.MinType(new { sku = "ABC-123", quantity = 1 }, 1)
})
.VerifyAsync<OrderCreatedEvent>(message =>
{
// Consumer handler'ının bu mesajı işleyebildiğini doğrula
Assert.NotEqual(Guid.Empty, message.OrderId);
Assert.True(message.Amount > 0);
Assert.NotEmpty(message.Currency);
});
}
}
AsyncAPI — Mesaj Kontratı Tanımı
# asyncapi.yaml — Order Service event kontratı
asyncapi: '2.6.0'
info:
title: Order Service Events
version: '1.0.0'
channels:
order.created:
subscribe:
operationId: onOrderCreated
message:
name: OrderCreated
contentType: application/json
payload:
type: object
required: [orderId, amount, currency, timestamp]
properties:
orderId:
type: string
format: uuid
amount:
type: number
minimum: 0.01
currency:
type: string
pattern: '^[A-Z]{3}$'
timestamp:
type: string
format: date-time
items:
type: array
minItems: 1
items:
type: object
required: [sku, quantity]
properties:
sku:
type: string
quantity:
type: integer
minimum: 1
Breaking change detection: Pact broker'a publish edilen contract'lar
can-i-deploykomutu ile CI'da kontrol edilir. Producer mesaj şemasını değiştirirse ve consumer contract'ını kırarsa → pipeline FAIL. Bu sayede "field adını değiştirdik, 3 downstream servis kırıldı" senaryosu production'a çıkmaz.
Gerçek hayat senaryosu: E-ticaret platformu: Order Service
OrderCreatedevent'ini 5 farklı consumer'a gönderir (notification, invoice, loyalty, analytics, warehouse). Pact ile her consumer kendi contract'ını tanımlar. Order Service yeni bir field eklediğinde → sorun yok (additive). Mevcut field'ı sildiğinde → Pact broker 3 consumer'ın kırıldığını bildirir → PR merge edilemez.