TEMEL
CRUD & Bulk İşlemleri
Elasticsearch'te tek doküman işlemleri basittir ama production'da Bulk API kritik öneme sahiptir — binlerce dokümanı tek HTTP request'te indexleyebilirsiniz.
Kod örneği tercihiBu sayfadaki istemci örneklerini birlikte değiştirir.
Karar Rehberi
| Durum | Öneri | Örnek veya gerekçe |
|---|---|---|
| Single index | Uygun: Gerçek zamanlı event | Kullanıcı arama log'u |
| Bulk API | Uygun: Toplu veri yükleme (>100 doc) | Ürün kataloğu sync |
| _update_by_query | Uygun: Koşullu toplu güncelleme | Fiyat artışı: category=X |
| _reindex | Uygun: Mapping değişikliği, index migration | v1 → v2 geçişi |
| _delete_by_query | Uygun: Koşullu toplu silme | GDPR silme |
| refresh: wait_for | Uygun: Write sonrası okuma gerekli | Sipariş → arama |
Bulk API (Production Pattern)
# Bulk indexleme (NDJSON format)
curl -X POST "http://localhost:9200/_bulk" -H "Content-Type: application/x-ndjson" -d'
{"index":{"_index":"products","_id":"1"}}
{"name":"Nike Air Max","category":"spor-ayakkabi","price":3499.99,"stock":42}
{"index":{"_index":"products","_id":"2"}}
{"name":"Adidas Ultraboost","category":"spor-ayakkabi","price":4299.99,"stock":28}
{"index":{"_index":"products","_id":"3"}}
{"name":"Puma RS-X","category":"sneaker","price":2799.99,"stock":55}
{"delete":{"_index":"products","_id":"old-1"}}
{"update":{"_index":"products","_id":"2"}}
{"doc":{"stock":25}}
'
# Bulk performans optimizasyonu (initial load)
curl -X PUT "http://localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": {
"refresh_interval": "-1",
"number_of_replicas": 0
}
}'
# ... bulk import ...
# Ayarları geri al
curl -X PUT "http://localhost:9200/products/_settings" -H "Content-Type: application/json" -d'
{
"index": {
"refresh_interval": "1s",
"number_of_replicas": 1
}
}'
# Force merge (initial load sonrası)
curl -X POST "http://localhost:9200/products/_forcemerge?max_num_segments=1"
public class BulkIndexingService
{
private readonly ElasticsearchClient _client;
private const int BatchSize = 2000;
public BulkIndexingService(ElasticsearchClient client) => _client = client;
public async Task<BulkIndexResult> BulkIndexAsync(IEnumerable<Product> products)
{
var allProducts = products.ToList();
var totalIndexed = 0;
var errors = new List<string>();
// Disable refresh during bulk
await _client.Indices.PutSettingsAsync("products", s => s
.IndexSettings(i => i.RefreshInterval(-1)));
try
{
foreach (var batch in allProducts.Chunk(BatchSize))
{
var response = await _client.BulkAsync(b =>
{
foreach (var product in batch)
{
b.Index<Product>(op => op
.Index("products")
.Id(product.Id)
.Document(product));
}
return b;
});
if (response.Errors)
{
foreach (var item in response.ItemsWithErrors)
errors.Add(item.Id + ": " + item.Error?.Reason);
}
totalIndexed += batch.Length;
}
}
finally
{
await _client.Indices.PutSettingsAsync("products", s => s
.IndexSettings(i => i.RefreshInterval(TimeSpan.FromSeconds(1))));
await _client.Indices.RefreshAsync("products");
}
return new BulkIndexResult(totalIndexed, errors);
}
public async Task UpdatePricesByCategoryAsync(string category, decimal multiplier)
{
await _client.UpdateByQueryAsync("products", u => u
.Query(q => q
.Term(t => t
.Field(f => f.Category)
.Value(category)))
.Script(s => s
.Source("ctx._source.price *= " + multiplier)
.Lang("painless")));
}
public async Task ReindexAsync(string sourceIndex, string targetIndex)
{
var response = await _client.ReindexAsync(r => r
.Source(s => s.Index(sourceIndex))
.Dest(d => d.Index(targetIndex))
.WaitForCompletion(false));
var taskId = response.Task;
}
}
public record BulkIndexResult(int TotalIndexed, List<string> Errors);
Bulk boyut limiti: Tek bir bulk request 5-15 MB arasında olmalıdır. Çok büyük batch'ler (>50MB) memory pressure yaratır ve 429 Too Many Requests alırsınız. Çok küçük batch'ler (<100 doc) ise HTTP overhead'den dolayı yavaştır.
Örnek: Bir e-ticaret platformunda gece 02:00'de ürün kataloğu ERP'den sync edilir: 500K ürün bulk API ile indexlenir. refresh_interval=-1 yapılır, 2000'lik batch'ler halinde gönderilir, sonunda replica'lar açılır ve force merge yapılır. Toplam süre: ~3 dakika (vs tek tek indexleme: ~45 dakika).