Logstash with multiple kafka inputs
Solution 1
It's a very late reply, but if you want to consume from multiple topics and write to multiple topics on another Kafka cluster, you can do something like this:
input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
    consumer_threads => 5
  }
}

output {
  if [kafka][topic] == "topic1" {
    kafka {
      codec => "json"
      topic_id => "new_topic1"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
  else if [kafka][topic] == "topic2" {
    kafka {
      codec => "json"
      topic_id => "new_topic2"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
}
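Note: in recent versions of the Kafka input plugin, `decorate_events` places the topic name under [@metadata][kafka][topic] rather than [kafka][topic]. If the conditionals above never match, a variant like the following may be needed (a sketch; check the plugin documentation for your version):

```
output {
  if [@metadata][kafka][topic] == "topic1" {
    kafka {
      codec => "json"
      topic_id => "new_topic1"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
}
```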
Be careful when listing your bootstrap servers: use the hostnames on which your Kafka brokers have advertised listeners.
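For reference, a broker advertises its listener in its server.properties; the hostname below is illustrative and must match what clients such as Logstash can resolve:

```
# server.properties (illustrative hostname)
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-broker-1:9092
```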
Solution 2
The previous answer didn't work for me; Logstash did not seem to recognize the conditional statements in the output section. Here is the configuration that worked in my case: I define a tag in each Kafka input, and the documents (in my case, logs) are ingested into separate indexes according to their consumer topics.
input {
  kafka {
    group_id => "35834"
    topics => ["First-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["First-Topic"]
  }
  kafka {
    group_id => "35834"
    topics => ["Second-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["Second-Topic"]
  }
}

filter {
}

output {
  if "Second-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "logger"
    }
    stdout { codec => rubydebug }
  }
  else if "First-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "saga"
    }
    stdout { codec => rubydebug }
  }
}
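The tag-based routing in the output block can be sketched in plain Python. This is only an illustration of the conditional logic (the index names follow the config above), not of Logstash internals:

```python
def route_index(event):
    """Pick an Elasticsearch index from the event's tags,
    mirroring the conditional in the output block above."""
    tags = event.get("tags", [])
    if "Second-Topic" in tags:
        return "logger"
    elif "First-Topic" in tags:
        return "saga"
    return None  # no matching tag: the event is not indexed

# Events as they would look after each Kafka input tags them
print(route_index({"tags": ["Second-Topic"], "message": "a log line"}))  # logger
print(route_index({"tags": ["First-Topic"]}))                            # saga
```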
Abhijeet
Updated on June 04, 2022

Comments
- Abhijeet almost 2 years: I am trying to filter Kafka events from multiple topics, but once all events from one topic have been filtered, Logstash is not able to fetch events from the other Kafka topic. I am using topics with 3 partitions and 2 replicas. Here is my Logstash config file:

  input {
    kafka {
      auto_offset_reset => "smallest"
      consumer_id => "logstashConsumer1"
      topic_id => "unprocessed_log1"
      zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
      type => "kafka_type_1"
    }
    kafka {
      auto_offset_reset => "smallest"
      consumer_id => "logstashConsumer1"
      topic_id => "unprocessed_log2"
      zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
      type => "kafka_type_2"
    }
  }
  filter {
    if [type] == "kafka_type_1" {
      csv {
        separator => " "
        source => "data"
      }
    }
    if [type] == "kafka_type_2" {
      csv {
        separator => " "
        source => "data"
      }
    }
  }
  output {
    stdout { codec => rubydebug { metadata => true } }
  }
- Val over 7 years: Try to use a different consumer (e.g. logstashConsumer2) in your second kafka input.
- Jim Hoagland over 5 years: Will this end up with 5 consumer threads per topic? Or 5 threads that read from both topics? Or 2 with one topic and 3 with another?
- Jim Hoagland over 5 years: Answering my own question: looking at the source, it looks like each thread will read from both topics.
- Lunatic almost 3 years: The suggested config doesn't seem to work, and Logstash cannot understand the conditional statements. I defined tags inside the inputs and changed the conditional statements accordingly, and it works now. I have also added my config script as an answer.