Kafka Storm Integration using Kafka Spout

15,205

Solution 1

I had the same problem. Finally resolved it, and I put the complete running example up on github.

You are welcome to check it out here > https://github.com/buildlackey/cep

(click on the storm+kafka directory for a sample program that should get you up and running).

Solution 2

We had a similar issue.

Our solution:

  1. Open pom.xml

  2. Change scope from provided to <scope>compile</scope>

If you want to know more about dependency scopes check the maven docu: Maven docu - dependency scopes

Share:
15,205

Related videos on Youtube

Yavar
Author by

Yavar

I design Algorithms and construct Data Structures for Search, Information Retrieval &amp; Machine Learning.

Updated on September 14, 2022

Comments

  • Yavar
    Yavar over 1 year

    I am using KafkaSpout. Please find the test program below.

    I am using Storm 0.8.1. Multischeme class is there in Storm 0.8.2. I will be using that. I just want to know how were the earlier versions working just by instantiating the StringScheme() class? Where can I download earlier versions of Kafka Spout? But I doubt that would be a correct alternative than to work on Storm 0.8.2. ??? (Confused)

    When I run the code (given below) on storm cluster (i.e. when I push my topology) I get the following error (This happens when the Scheme part is commented else of course I will get compiler error as the class is not there in 0.8.1):

    java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
            at storm.kafka.TestTopology.main(TestTopology.java:37)
    Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme
    

    In the code given below you may find the spoutConfig.scheme=new StringScheme(); part commented. I was getting compiler error if I don't comment that line which is but natural as there are no constructors in there. Also when I instantiate MultiScheme I get error as I dont have that class in 0.8.1.

    public class TestTopology {
        public static class PrinterBolt extends BaseBasicBolt {
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
    
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                System.out.println(tuple.toString());
            }
        }
    
        public static void main(String [] args) throws Exception {
            List<HostPort> hosts = new ArrayList<HostPort>();
            hosts.add(new HostPort("127.0.0.1",9092));
            LocalCluster cluster = new LocalCluster();
            TopologyBuilder builder = new TopologyBuilder();
            SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
            spoutConfig.zkServers=ImmutableList.of("localhost");
            spoutConfig.zkPort=2181;
            //spoutConfig.scheme=new StringScheme();
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
            builder.setSpout("spout",new KafkaSpout(spoutConfig));
            builder.setBolt("printer", new PrinterBolt())
                    .shuffleGrouping("spout");
            Config config = new Config();
    
            cluster.submitTopology("kafka-test", config, builder.createTopology());
    
            Thread.sleep(600000);
        }