UZMAN
Logstash & Data Pipeline
Logstash, farklı kaynaklardan veri toplayıp dönüştüren ve Elasticsearch'e gönderen ETL aracıdır.
Kod örneği tercihiBu sayfadaki istemci örneklerini birlikte değiştirir.
Seviye: Uzman — Bu bölüm production deneyimi gerektirir.
Karar Rehberi
| Durum | Öneri | Örnek veya gerekçe |
|---|---|---|
| Logstash | Uygun: Karmaşık dönüşüm, çoklu kaynak | Kafka → enrich → ES |
| Filebeat (direct) | Uygun: Basit dosya → ES | Nginx access log |
| Elastic Agent | Uygun: Unified agent, policy-based | Fleet-managed monitoring |
| Ingest Pipeline (ES) | Uygun: Hafif dönüşüm, ES-native | Geoip, date parse |
| Kafka + Logstash | Uygun: Yüksek hacim, buffer gerekli | Event sourcing |
| Dead Letter Queue | Uygun: Parse error recovery | Malformed JSON retry |
# logstash.conf — Production pipeline
input {
kafka {
bootstrap_servers => "kafka:9092"
topics => ["app-logs", "audit-logs"]
group_id => "logstash-prod"
codec => json
consumer_threads => 4
}
beats {
port => 5044
ssl => true
ssl_certificate => "/etc/logstash/certs/logstash.crt"
ssl_key => "/etc/logstash/certs/logstash.key"
}
}
filter {
if [type] == "app-log" {
grok {
match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:service} - %{GREEDYDATA:msg}" }
}
date {
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}
mutate {
remove_field => ["message", "timestamp"]
add_field => { "environment" => "production" }
}
if [level] == "ERROR" {
mutate { add_tag => ["alert"] }
}
}
geoip {
source => "client_ip"
target => "geo"
}
fingerprint {
source => ["service", "msg"]
target => "dedup_id"
method => "SHA256"
}
}
output {
elasticsearch {
hosts => ["https://es-prod:9200"]
api_key => "id:api_key_value"
index => "logs-%{+YYYY.MM.dd}"
pipeline => "enrich-pipeline"
ilm_enabled => true
ilm_rollover_alias => "logs-write"
ilm_policy => "logs-policy"
}
if "alert" in [tags] {
http {
url => "https://hooks.slack.com/services/xxx"
http_method => "post"
format => "json"
mapping => { "text" => "ALERT: %{service} - %{msg}" }
}
}
}
// ES Ingest Pipeline as lightweight Logstash alternative
public class IngestPipelineService
{
private readonly ElasticsearchClient _client;
public IngestPipelineService(ElasticsearchClient client) => _client = client;
// Create ingest pipeline (enrich + transform)
public async Task CreateLogPipelineAsync()
{
await _client.Ingest.PutPipelineAsync("log-enrichment", p => p
.Description("Log enrichment: geoip + timestamp + user-agent")
.Processors(proc => proc
.GeoIp(g => g.Field("client_ip").TargetField("geo"))
.DateIndexName(d => d
.Field("@timestamp")
.IndexNamePrefix("logs-")
.DateRounding("d"))
.UserAgent(ua => ua.Field("user_agent_string"))
.Remove(r => r.Field("user_agent_string"))));
}
// Bulk index with pipeline
public async Task IndexWithPipelineAsync(IEnumerable<LogEvent> events)
{
var response = await _client.BulkAsync(b =>
{
b.Pipeline("log-enrichment");
foreach (var e in events)
b.Index<LogEvent>(op => op.Index("logs-write").Document(e));
return b;
});
if (response.Errors)
{
foreach (var item in response.ItemsWithErrors)
Console.WriteLine("Failed: " + item.Error?.Reason);
}
}
}
Örnek: Fintech'te Kafka → Logstash → ES pipeline: saniyede 50K event işlenir. Logstash grok ile parse, geoip ile zenginleştirme, fingerprint ile dedup, ERROR seviye loglar Slack'e alert gönderir.