Kafka Java Producer with kerberos

32,864

Solution 1

I don't know what mistake did first time, below things I did again, and it works fine.

First give all access to topic:

bin/kafka-acls.sh --add --allow-principals user:ctadmin --operation ALL --topic marchTesting --authorizer-properties zookeeper.connect={hostname}:2181

create jass file: kafka-jaas.conf

KafkaClient {
 com.sun.security.auth.module.Krb5LoginModule required
 doNotPrompt=true
 useTicketCache=true
 principal="[email protected]"
 useKeyTab=true
 serviceName="kafka"
 keyTab="/etc/security/keytabs/ctadmin.keytab"
 client=true;
};

Java Program:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer {

    public static void main(String[] args) {
        String topic = args[0];

        Properties props = new Properties();
        props.put("metadata.broker.list", "{Hostname}:6667");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        props.put("security.protocol", "PLAINTEXTSASL");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        for (int i = 0; i < 10; i++){
            producer.send(new KeyedMessage<String, String>(topic, "Test Date: " + new Date()));
        }
    }
}

Run application:

java -Djava.security.auth.login.config=/home/ctadmin/kafka-jaas.conf -Djava.security.krb5.conf=/etc/krb5.conf -Djavax.security.auth.useSubjectCredsOnly=true -cp kafka-testing-0.0.1-jar-with-dependencies.jar com.ct.test.kafka.KafkaProducer

Solution 2

The error is in a semicolon you have in your jaas file as you can see in this piece of output:

Line 6: expected [controlFlag]

This line cannot have the semicolon:

principal="ctadmin/[email protected]";

it can only exist in the last line:

Share:
32,864
Kalpesh
Author by

Kalpesh

Working as a Hadoop Developer. Mostly on Spark Framework with Scala. Having more than 8.5 yrs of experience in IT industry. Currently working in Healthcare domain, also experienced in Retail and digital media domains.

Updated on July 09, 2022

Comments

  • Kalpesh
    Kalpesh almost 2 years

    Getting error while sending message to kafka topic in kerberosed enviornment. We have cluster on hdp 2.3

    I followed this http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

    But for sending messages, I have to do kinit explicitly first, then only I am able to send message to kafka topic. I tried to do knit through java class but that also doesn't work. PFB code:

    package com.ct.test.kafka;
    
    import java.util.Date;
    import java.util.Properties;
    import java.util.Random;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    public class TestProducer {
    
        public static void main(String[] args) {
    
            String principalName = "ctadmin";
            String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
            boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);
    
            if (!authStatus) {
                System.out.println("Authntication fails, try something else  "  + authStatus);
            } else {
                System.out.println("Authntication successfull " + authStatus);
            }
    
            System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
            System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            System.setProperty("sun.security.krb5.debug", "true");
    
            try {
                long events = Long.parseLong("3");
                Random rnd = new Random();
    
                Properties props = new Properties();
                System.out.println("After broker list- " + args[0]);
    
                props.put("metadata.broker.list", args[0]);
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                props.put("request.required.acks", "1");
                props.put("security.protocol", "PLAINTEXTSASL");
    
                //props.put("partitioner.class", "com.ct.test.kafka.SimplePartitioner");
    
    
                System.out.println("After config prop -1");
    
                ProducerConfig config = new ProducerConfig(props);
    
                System.out.println("After config prop -2 config" + config);
    
                Producer<String, String> producer = new Producer<String, String>(config);
    
                System.out.println("After config prop -3");
    
                for (long nEvents = 0L; nEvents < events; nEvents += 1L) {
                    Date runtime = new Date();
                    String ip = "192.168.2" + rnd.nextInt(255);
                    String msg = runtime + " www.example.com, " + ip;
                    KeyedMessage<String, String> data = new KeyedMessage<String, String>("test_march4", ip, msg);
    
                    System.out.println("After config prop -1 data" + data);
    
                    producer.send(data);
                }
                producer.close();
    
            } catch (Throwable th) {
                th.printStackTrace();
    
            }
        }
    }
    

    Pom.xml : All dependency downloaded from hortonworks repo.

            <dependencies>
                <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka_2.10</artifactId>
                    <version>0.9.0.2.3.4.0-3485</version>
                </dependency>
    
                <dependency>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                    <version>0.9.0.2.3.4.0-3485</version>
                </dependency>
    
                <dependency>
                    <groupId>org.jasypt</groupId>
                    <artifactId>jasypt-spring31</artifactId>
                    <version>1.9.2</version>
                    <scope>compile</scope>
                </dependency>
    
                <dependency>
                    <groupId>org.apache.hadoop</groupId>
                    <artifactId>hadoop-common</artifactId>
                    <version>2.7.1.2.3.4.0-3485</version>
                </dependency>
    
            </dependencies>
    

    Error : Case1 : when I specify myuser kafka_jass.conf

    log4j:WARN No appenders could be found for logger (kafka.utils.VerifiableProperties).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    After config prop -2 configkafka.producer.ProducerConfig@643293ae
    java.lang.SecurityException: Configuration Error:
            Line 6: expected [controlFlag]
            at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:110)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
            at java.lang.Class.newInstance(Class.java:379)
            at javax.security.auth.login.Configuration$2.run(Configuration.java:258)
            at javax.security.auth.login.Configuration$2.run(Configuration.java:250)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:249)
            at org.apache.kafka.common.security.kerberos.Login.login(Login.java:291)
            at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
            at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
            at kafka.producer.Producer.<init>(Producer.scala:50)
            at kafka.producer.Producer.<init>(Producer.scala:73)
            at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
            at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
    Caused by: java.io.IOException: Configuration Error:
            Line 6: expected [controlFlag]
            at com.sun.security.auth.login.ConfigFile.match(ConfigFile.java:563)
            at com.sun.security.auth.login.ConfigFile.parseLoginEntry(ConfigFile.java:413)
            at com.sun.security.auth.login.ConfigFile.readConfig(ConfigFile.java:383)
            at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:283)
            at com.sun.security.auth.login.ConfigFile.init(ConfigFile.java:219)
            at com.sun.security.auth.login.ConfigFile.<init>(ConfigFile.java:108)
    

    MyUser_Kafka_jass.conf

    KafkaClient {
       com.sun.security.auth.module.Krb5LoginModule required
       doNotPrompt=true
       useTicketCache=true
       renewTicket=true
       principal="ctadmin/[email protected]";
       useKeyTab=true
       serviceName="kafka"
       keyTab="/etc/security/keytabs/ctadmin.keytab"
       client=true;
    };
    Client {
       com.sun.security.auth.module.Krb5LoginModule required
       useKeyTab=true
       keyTab="/etc/security/keytabs/ctadmin.keytab"
       storeKey=true
       useTicketCache=true
       serviceName="zookeeper"
       principal="ctadmin/[email protected]";
    };
    

    case2 : When I specify Kafkas own jaas file

    Java config name: /etc/krb5.conf
    Loaded from Java config
    javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. Make sure -Djava.security.auth.login.config property passed to JVM and the client is configured to use a ticket cache (using the JAAS configuration setting 'useTicketCache=true)'. Make sure you are using FQDN of the Kafka broker you are trying to connect to. not available to garner  authentication information from the user
            at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)
            at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)
            at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:606)
            at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)
            at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)
            at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)
            at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)
            at java.security.AccessController.doPrivileged(Native Method)
            at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)
            at javax.security.auth.login.LoginContext.login(LoginContext.java:595)
            at org.apache.kafka.common.security.kerberos.Login.login(Login.java:298)
            at org.apache.kafka.common.security.kerberos.Login.<init>(Login.java:104)
            at kafka.common.security.LoginManager$.init(LoginManager.scala:36)
            at kafka.producer.Producer.<init>(Producer.scala:50)
            at kafka.producer.Producer.<init>(Producer.scala:73)
            at kafka.javaapi.producer.Producer.<init>(Producer.scala:26)
            at com.ct.test.kafka.TestProducer.main(TestProducer.java:51)
    

    This works fine, if I do kinit before running this app, else it will through above error. I cant do this in my production environment, if there is any way to do this by our app itself then please help me out. Please let me know if you need any more details.

    Thanks:)