Logstash with multiple kafka inputs


Solution 1

It's a very late reply, but if you want to consume multiple topics and write each one to its own topic on another Kafka cluster, you can do something like this:

input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    # Adds the source topic, partition, offset, etc. to each event under [@metadata][kafka]
    decorate_events => true
    group_id => "logstash-multi-topic-consumers"
    consumer_threads => 5
  }
}

output {
  # Route each event by the topic it was read from.
  # Older plugin versions added a top-level [kafka] field instead; there, use [kafka][topic].
  if [@metadata][kafka][topic] == "topic1" {
    kafka {
      codec => "json"
      topic_id => "new_topic1"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
  else if [@metadata][kafka][topic] == "topic2" {
    kafka {
      codec => "json"
      topic_id => "new_topic2"
      bootstrap_servers => "output-kafka-1:9092"
    }
  }
}

Be careful when specifying bootstrap_servers: use the host names on which your Kafka brokers advertise their listeners (advertised.listeners).

Ref-1: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-group_id

Ref-2: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events
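If every input topic maps to an output topic by a fixed naming pattern, the conditionals can be dropped entirely. This is only a sketch: it assumes the kafka output interpolates %{...} field references in topic_id per event, and that your plugin version stores the decorate_events data under [@metadata][kafka]. The "new_" prefix and the broker addresses are just the example values used above.

```conf
input {
  kafka {
    topics => ["topic1", "topic2"]
    codec => "json"
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092,kafka-broker-3:9092"
    group_id => "logstash-multi-topic-consumers"
    # Needed so that [@metadata][kafka][topic] is populated on each event
    decorate_events => true
  }
}

output {
  kafka {
    codec => "json"
    # Assumption: topic_id is resolved per event, so events from topic1
    # go to new_topic1 and events from topic2 go to new_topic2
    topic_id => "new_%{[@metadata][kafka][topic]}"
    bootstrap_servers => "output-kafka-1:9092"
  }
}
```

The trade-off is that every topic matched by the input gets forwarded, so the conditional version is still the safer choice when only some topics should be routed or when the output names don't follow a pattern.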

Solution 2

The previous answer didn't work for me: it seems the conditional statements in its output section are not recognized. Here is a config that is correct and works, at least in my case. I define tags in the input of each Kafka consumer, and the documents (logs, in my case) are ingested into separate indexes according to the topic they were consumed from.

input {
  kafka {
    group_id => "35834"
    topics => ["First-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    # Tag every event with the topic it was consumed from
    tags => ["First-Topic"]
  }
  kafka {
    group_id => "35834"
    topics => ["Second-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["Second-Topic"]
  }
}

filter {

}

output {
  # Route on the tags set by each input
  if "Second-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "logger"
    }
    stdout { codec => rubydebug }
  }
  else if "First-Topic" in [tags] {
    elasticsearch {
      hosts => ["localhost:9200"]
      document_type => "_doc"
      index => "saga"
    }
    stdout { codec => rubydebug }
  }
}
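A variation on the same idea, sketched under the assumption that only the index name differs between the two topics: each input attaches its target index to the event with add_field under @metadata (the field name [@metadata][target_index] is hypothetical), and a single elasticsearch output reads it back with a %{...} field reference. Fields under @metadata are not sent by outputs, so the helper field does not end up in the indexed documents. document_type is left out here because it is deprecated in recent versions of the elasticsearch output.

```conf
input {
  kafka {
    group_id => "35834"
    topics => ["First-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["First-Topic"]
    # Hypothetical helper field: the index this topic should be written to
    add_field => { "[@metadata][target_index]" => "saga" }
  }
  kafka {
    group_id => "35834"
    topics => ["Second-Topic"]
    bootstrap_servers => "localhost:9092"
    codec => json
    tags => ["Second-Topic"]
    add_field => { "[@metadata][target_index]" => "logger" }
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    # Resolved per event from the field set by the matching input
    index => "%{[@metadata][target_index]}"
  }
  stdout { codec => rubydebug }
}
```

Adding a third topic then only means adding another input block; the output section stays the same.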


Author: Abhijeet

Updated on June 04, 2022

Comments

  • Abhijeet
    Abhijeet almost 2 years

    I am trying to filter Kafka events from multiple topics, but once all events from one topic have been filtered, Logstash is not able to fetch events from the other Kafka topic. I am using topics with 3 partitions and a replication factor of 2. Here is my Logstash config file:

    input {
        kafka {
            auto_offset_reset => "smallest"
            consumer_id => "logstashConsumer1"
            topic_id => "unprocessed_log1"
            zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
            type => "kafka_type_1"
        }
        kafka {
            auto_offset_reset => "smallest"
            consumer_id => "logstashConsumer1"
            topic_id => "unprocessed_log2"
            zk_connect => "192.42.79.67:2181,192.41.85.48:2181,192.10.13.14:2181"
            type => "kafka_type_2"
        }
    }
    filter {
        if [type] == "kafka_type_1" {
            csv {
                separator => " "
                source => "data"
            }
        }
        if [type] == "kafka_type_2" {
            csv {
                separator => " "
                source => "data"
            }
        }
    }
    output {
        stdout { codec => rubydebug { metadata => true } }
    }
    
    • Val
      Val over 7 years
      Try using a different consumer_id (e.g. logstashConsumer2) in your second kafka input.
  • Jim Hoagland
    Jim Hoagland over 5 years
    Will this end up with 5 consumer threads per topic? Or 5 threads that read from both topics? Or 2 with one topic and 3 with another?
  • Jim Hoagland
    Jim Hoagland over 5 years
    Answering my own question: looking at the source, it seems each thread reads from both topics.
  • Lunatic
    Lunatic almost 3 years
    The suggested config doesn't seem to work: Logstash doesn't seem to understand the conditional statements. I defined tags inside the inputs and changed the conditional statements accordingly, and it works now. I have also added my config as an answer.
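For anyone landing on the question with a current Logstash release: its config uses the old ZooKeeper-based options (zk_connect, topic_id, consumer_id, auto_offset_reset => "smallest"), which current versions of the kafka input plugin no longer support. Below is a rough sketch of the same setup with the newer option names. The broker addresses on port 9092 and the group name are assumptions (the question only lists ZooKeeper hosts on port 2181), client_id is used as the closest counterpart of consumer_id (giving each input its own ID, in the spirit of Val's suggestion), and the two identical csv filters are merged into one. Given Jim Hoagland's observation that every thread of one input reads all of that input's topics, using one input per topic is also the simplest way to dedicate threads to each topic.

```conf
input {
  kafka {
    # Hypothetical broker addresses: the newer consumer connects to the brokers, not ZooKeeper
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092"
    topics => ["unprocessed_log1"]
    group_id => "logstash"
    client_id => "logstashConsumer1"
    # "earliest" is the newer name for "smallest"
    auto_offset_reset => "earliest"
    # One thread per partition (the question's topics have 3 partitions)
    consumer_threads => 3
    type => "kafka_type_1"
  }
  kafka {
    bootstrap_servers => "kafka-broker-1:9092,kafka-broker-2:9092"
    topics => ["unprocessed_log2"]
    group_id => "logstash"
    client_id => "logstashConsumer2"
    auto_offset_reset => "earliest"
    consumer_threads => 3
    type => "kafka_type_2"
  }
}

filter {
  # Both types were parsed identically, so one csv filter covers them
  csv {
    separator => " "
    source => "data"
  }
}

output {
  stdout { codec => rubydebug { metadata => true } }
}
```

Setting consumer_threads higher than the topic's partition count gains nothing, since each partition is consumed by at most one consumer in the group.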