how to use Kafka 0.8 Log4j appender

23,387

Solution 1

I think Jonas has identified the problem, that is the Kafka producer logging is also getting logged to the Kafka appender causing an infinite loop and eventual stack overflow (no pun intended) You can configure all Kafka logs to go to a different appender. The following shows sending the output to stdout:

log4j.logger.kafka=INFO, stdout

So you should end up with the following in your log4j.properties

log4j.rootLogger=INFO, stdout, KAFKA
log4j.logger.kafka=INFO, stdout
log4j.logger.HelloWorld=INFO, KAFKA

Solution 2

I have been able to generate events via log4j in Kafka 0.8.2.2. Here is my log4j configuration:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">

   <appender name="console" class="org.apache.log4j.ConsoleAppender">
      <param name="Target" value="System.out" />
      <layout class="org.apache.log4j.PatternLayout">
         <param name="ConversionPattern" value="%-5p %c{1} - %m%n" />
      </layout>
   </appender>
   <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender">
      <param name="Threshold" value="INFO" />
      <param name="MaxBackupIndex" value="100" />
      <param name="File" value="/tmp/agna-LogFile.log" />
      <layout class="org.apache.log4j.PatternLayout">
         <param name="ConversionPattern" value="%d  %-5p  [%c{1}] %m %n" />
      </layout>
   </appender>
   <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender">
      <param name="Topic" value="kafkatopic" />
      <param name="BrokerList" value="localhost:9092" />
      <param name="syncSend" value="true" />
      <layout class="org.apache.log4j.PatternLayout">
         <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" />
      </layout>
   </appender>
   <logger name="org.apache.kafka">
      <level value="error" />
      <appender-ref ref="console" />
   </logger>
   <logger name="com.example.kafkaLogger">
      <level value="debug" />
      <appender-ref ref="kafkaAppender" />
   </logger>
   <root>
      <priority value="debug" />
      <appender-ref ref="console" />
   </root>
</log4j:configuration>

Here is the source code:

package com.example;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JsonProducer {
    static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class);
    static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger");

    public static void main(String args[]) {

        JsonProducer obj = new JsonProducer();

        String str = obj.getJsonObjAsString();

        // Use the logger
        kafkaLogger.info(str);

        try {
            // Construct and send message
            obj.constructAndSendMessage();
        } catch (InterruptedException e) {
            defaultLogger.error("Caught interrupted exception " + e);
        } catch (ExecutionException e) {
            defaultLogger.error("Caught execution exception " + e);
        }   
    }

    private String getJsonObjAsString() {
        JSONObject obj = new JSONObject();
        obj.put("name", "John");
        obj.put("age", new Integer(55));
        obj.put("address", "123 MainSt, Palatine, IL");

        JSONArray list = new JSONArray();
        list.add("msg 1");
        list.add("msg 2");
        list.add("msg 3");

        obj.put("messages", list);

        return obj.toJSONString();
    }

    private void constructAndSendMessage() throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        boolean sync = false;
        String topic = "kafkatopic";
        String key = "mykey";
        String value = "myvalue1 mayvalue2 myvalue3";
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value);
        if (sync) {
            producer.send(producerRecord).get();
        } else {
            producer.send(producerRecord);
        }
        producer.close();
    }
}

The whole project is a available under the following link:

https://github.com/ypant/kafka-json-producer.git

Solution 3

Try to set the appender async, like this: log4j.appender.KAFKA.ProducerType=async

Seems reasonable that it goes in to an infinite loop because the kafka producer has logging in itself..

Share:
23,387
Admin
Author by

Admin

Updated on July 09, 2022

Comments

  • Admin
    Admin almost 2 years

    I am trying to run Kafka-0.8 Log4j appender and I am unable to make it. I want my application to send log directly to kafka via Log4j appender.

    Here is my log4j.properties. I couldn`t find any proper encoder, so I just configure it to use default encoder. (e.g I commented the line.)

    log4j.rootLogger=INFO, stdout, KAFKA
    
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
    
    log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender
    log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout
    log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n
    log4j.appender.KAFKA.BrokerList=hnode01:9092
    log4j.appender.KAFKA.Topic=DKTestEvent
    
    #log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder
    

    And this is my sample application.

    import org.apache.log4j.Logger;
    import org.apache.log4j.BasicConfigurator;
    import org.apache.log4j.PropertyConfigurator;
    
    public class HelloWorld {
    
            static Logger logger = Logger.getLogger(HelloWorld.class.getName());
    
            public static void main(String[] args) {
                PropertyConfigurator.configure(args[0]);
    
                logger.info("Entering application.");
                logger.debug("Debugging!.");
                logger.info("Exiting application.");
            }
    }
    

    I used maven for compiling. I included kafka_2.8.2-0.8.0 and log4j_1.2.17 in my pom.xml

    And I am getting these error:

    INFO [main] (Logging.scala:67) - Verifying properties
    INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092
    INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder
    INFO [main] (HelloWorld.java:14) - Entering application.
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent)
    .
    .
    .
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent)
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent)
    .
    .
    .
    INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent)
    ERROR [main] (Logging.scala:67) - 
    ERROR [main] (Logging.scala:67) - 
    ERROR [main] (Logging.scala:67) - 
    ERROR [main] (Logging.scala:67) - 
    ERROR [main] (Logging.scala:67) - 
    ERROR [main] (Logging.scala:67) - 
    java.lang.StackOverflowError
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:323)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:268)
    at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87)
    at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413)
    at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313)
    at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.error(Category.java:322)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105)
    at kafka.utils.Utils$.swallow(Utils.scala:189)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.info(Category.java:666)
    at kafka.utils.Logging$class.info(Logging.scala:67)
    at kafka.client.ClientUtils$.info(ClientUtils.scala:31)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:187)
    at kafka.utils.Logging$class.swallowError(Logging.scala:105)
    at kafka.utils.Utils$.swallowError(Utils.scala:46)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.Producer.send(Producer.scala:76)
    at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    .
    .
    .
    

    I am getting above error continuously if i don`t terminate the program.

    If I miss something, kindly let me know.