Elasticsearch Handbook

UZMAN

Logstash & Data Pipeline

Logstash, farklı kaynaklardan veri toplayıp dönüştüren ve Elasticsearch'e gönderen ETL aracıdır.

Kod örneği tercihi: Bu sayfadaki istemci örneklerini birlikte değiştirir.

Seviye: Uzman — Bu bölüm production deneyimi gerektirir.

INPUT: Kafka · Beats (Filebeat) · HTTP / TCP · JDBC (DB poll) · S3 / Azure Blob · Dead Letter Queue
FILTER (Transform): grok (regex parse) · date (timestamp parse) · mutate (rename/remove) · geoip (IP → location) · fingerprint (dedup) · ruby (custom logic)
OUTPUT: Elasticsearch (ILM + pipeline) · Kafka (re-route) · S3 (archive) · HTTP (webhook) · stdout (debug)
MONITORING: Pipeline metrics · Events in/out/filtered · Queue backpressure · Worker utilization · DLQ overflow

Karar Rehberi

| Durum | Öneri | Örnek veya gerekçe |
| --- | --- | --- |
| Logstash | Uygun: Karmaşık dönüşüm, çoklu kaynak | Kafka → enrich → ES |
| Filebeat (direct) | Uygun: Basit dosya → ES | Nginx access log |
| Elastic Agent | Uygun: Unified agent, policy-based | Fleet-managed monitoring |
| Ingest Pipeline (ES) | Uygun: Hafif dönüşüm, ES-native | Geoip, date parse |
| Kafka + Logstash | Uygun: Yüksek hacim, buffer gerekli | Event sourcing |
| Dead Letter Queue | Uygun: Parse error recovery | Malformed JSON retry |
# logstash.conf — Production pipeline
# Event sources for this pipeline: a Kafka consumer and a TLS-enabled Beats listener.
input {
  # Consume the "app-logs" and "audit-logs" topics as consumer group
  # "logstash-prod", decoding every record as JSON.
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["app-logs", "audit-logs"]
    group_id => "logstash-prod"
    codec => json
    # Four consumer threads for this input.
    # NOTE(review): threads beyond the total partition count would sit idle —
    # confirm against the partition counts of both topics.
    consumer_threads => 4
  }

  # Accept Beats connections on port 5044 over TLS, presenting the
  # certificate/key pair below.
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
}

# Parse, normalise and enrich events before they reach the outputs.
filter {
  if [type] == "app-log" {
    # Expected line shape: "<ISO8601 timestamp> <LEVEL> <service> - <free text>".
    grok {
      match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{DATA:service} - %{GREEDYDATA:msg}" }
      # A filter's remove_field is applied only when the filter succeeds, so the
      # raw line is kept (tagged _grokparsefailure) when the pattern does not
      # match instead of being discarded.
      remove_field => ["message"]
    }
    date {
      match => [ "timestamp", "ISO8601" ]
      target => "@timestamp"
      # Dropped only after a successful parse; an unparsable value stays on the
      # event for troubleshooting.
      remove_field => ["timestamp"]
    }
    mutate {
      add_field => { "environment" => "production" }
    }
    # Tag errors so the output stage can route them to the alert webhook.
    if [level] == "ERROR" {
      mutate { add_tag => ["alert"] }
    }
  }

  # Only attempt a GeoIP lookup when the event actually carries a client IP, so
  # events without one are not sent through the lookup (and failure-tagged).
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geo"
    }
  }

  # Deduplication key derived from service + message text.
  fingerprint {
    source => ["service", "msg"]
    # Without this, multiple sources are NOT combined: the target ends up
    # holding the fingerprint of the last listed field ("msg") only.
    concatenate_sources => true
    target => "dedup_id"
    method => "SHA256"
  }
}

# Ship every event to Elasticsearch; additionally notify Slack for alert-tagged events.
output {
  elasticsearch {
    hosts => ["https://es-prod:9200"]
    # Secret is resolved from the Logstash keystore or the environment rather
    # than being committed with the config. Value format: "id:api_key".
    api_key => "${ES_API_KEY}"
    # No `index` setting: with ILM enabled the plugin writes through
    # ilm_rollover_alias, which takes precedence over `index`, so a dated index
    # pattern here would be silently ignored.
    pipeline => "enrich-pipeline"
    ilm_enabled => true
    ilm_rollover_alias => "logs-write"
    ilm_policy => "logs-policy"
  }

  # Events tagged "alert" by the filter stage (level == ERROR) are also posted
  # to a Slack incoming webhook as {"text": "ALERT: <service> - <msg>"}.
  if "alert" in [tags] {
    http {
      # The webhook URL embeds a secret token; keep it out of the config file.
      url => "${SLACK_WEBHOOK_URL}"
      http_method => "post"
      format => "json"
      mapping => { "text" => "ALERT: %{service} - %{msg}" }
    }
  }
}
/// <summary>
/// Elasticsearch ingest pipeline used as a lightweight alternative to Logstash:
/// registers the "log-enrichment" pipeline and bulk-indexes log events through it.
/// </summary>
public class IngestPipelineService
{
    // Shared by pipeline creation and bulk indexing so the two cannot drift apart.
    private const string PipelineId = "log-enrichment";
    private const string WriteAlias = "logs-write";

    private readonly ElasticsearchClient _client;

    /// <summary>Creates the service around an existing Elasticsearch client.</summary>
    /// <param name="client">Client used for all requests.</param>
    /// <exception cref="ArgumentNullException"><paramref name="client"/> is null.</exception>
    public IngestPipelineService(ElasticsearchClient client) =>
        _client = client ?? throw new ArgumentNullException(nameof(client));

    /// <summary>
    /// Creates (or replaces) the "log-enrichment" ingest pipeline with GeoIP,
    /// date-based index naming, user-agent parsing and removal of the raw
    /// user-agent string.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Elasticsearch did not accept the pipeline definition.
    /// </exception>
    public async Task CreateLogPipelineAsync()
    {
        // NOTE(review): the date_index_name processor rewrites the target index
        // to a "logs-" + day name, which would bypass the "logs-write" alias used
        // by IndexWithPipelineAsync — confirm this is intended.
        var response = await _client.Ingest.PutPipelineAsync(PipelineId, p => p
            .Description("Log enrichment: geoip + timestamp + user-agent")
            .Processors(proc => proc
                .GeoIp(g => g.Field("client_ip").TargetField("geo"))
                .DateIndexName(d => d
                    .Field("@timestamp")
                    .IndexNamePrefix("logs-")
                    .DateRounding("d"))
                .UserAgent(ua => ua.Field("user_agent_string"))
                .Remove(r => r.Field("user_agent_string"))));

        // Indexing through a pipeline that was never created fails later and
        // less obviously, so surface the failure here.
        if (!response.IsValidResponse)
        {
            throw new InvalidOperationException(
                $"Creating ingest pipeline '{PipelineId}' failed: {response.DebugInformation}");
        }
    }

    /// <summary>
    /// Bulk-indexes the given events into the "logs-write" alias, running each
    /// document through the "log-enrichment" pipeline. Failures are reported on
    /// the console (best effort); the method does not throw for rejected items.
    /// </summary>
    /// <param name="events">Events to index; an empty sequence is a no-op.</param>
    /// <exception cref="ArgumentNullException"><paramref name="events"/> is null.</exception>
    public async Task IndexWithPipelineAsync(IEnumerable<LogEvent> events)
    {
        ArgumentNullException.ThrowIfNull(events);

        // Materialise once: avoids enumerating the caller's sequence twice and
        // lets us skip the round trip for an empty batch, which Elasticsearch
        // rejects as an invalid bulk request.
        var batch = new List<LogEvent>(events);
        if (batch.Count == 0)
        {
            return;
        }

        var response = await _client.BulkAsync(b =>
        {
            b.Pipeline(PipelineId);
            foreach (var e in batch)
            {
                b.Index<LogEvent>(op => op.Index(WriteAlias).Document(e));
            }

            return b;
        });

        if (response.IsValidResponse)
        {
            return;
        }

        var reportedItemFailure = false;
        foreach (var item in response.ItemsWithErrors)
        {
            Console.WriteLine("Failed: " + item.Error?.Reason);
            reportedItemFailure = true;
        }

        // No per-item errors means the request itself failed (transport, auth,
        // malformed request); previously that case was silently ignored.
        if (!reportedItemFailure)
        {
            Console.WriteLine("Bulk request failed: " + response.DebugInformation);
        }
    }
}

Örnek: Fintech'te Kafka → Logstash → ES pipeline'ı saniyede 50K event işler. Logstash; grok ile parse, geoip ile zenginleştirme ve fingerprint ile dedup yapar, ERROR seviyesindeki loglar için Slack'e alert gönderir.