Is it possible to access message headers with Kafka Streams?


Record headers are accessible since version 2.0.0 (cf. KIP-244 for details).

You can access record metadata via the Processor API (i.e., via transform(), transformValues(), or process()) through the provided "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
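
As a rough sketch of this route (assuming Kafka Streams 2.0.0 or later, String keys and values, and a header named "trace-id"; the class name, helper method, and header name are made up for illustration), a ValueTransformer can read the headers from its context:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical transformer: prefixes each value with the "trace-id" header, if present.
class TraceIdValueTransformer implements ValueTransformer<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep the context; it reflects the record currently being processed.
        this.context = context;
    }

    @Override
    public String transform(String value) {
        // context.headers() returns the headers of the current input record.
        return traceId(context.headers()) + ":" + value;
    }

    @Override
    public void close() { }

    // Reads the last "trace-id" header as UTF-8 text, or "unknown" when it is absent.
    static String traceId(Headers headers) {
        Header header = headers.lastHeader("trace-id");
        if (header == null || header.value() == null) {
            return "unknown";
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }
}
```

In a DSL application on a release that still offers transformValues(), this could be plugged in with something like stream.transformValues(TraceIdValueTransformer::new).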

Update

As of the 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with a process(Record) method instead of process(K, V). In this case, headers (and record metadata) are accessible via the Record class.

This new feature is not yet available in the "PAPI methods" of the DSL though (e.g., KStream#process(), KStream#transform(), and siblings).
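
A minimal sketch of the newer api.Processor, assuming Kafka Streams 2.7.0 or later and String keys and values (the class name, the tag() helper, and the "trace-id" header are hypothetical and only serve as illustration):

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor: prefixes each value with the "trace-id" header and forwards it.
class TraceIdProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // The record itself carries key, value, timestamp, and headers.
        context.forward(tag(record));
    }

    // Builds a copy of the record whose value is prefixed with the header text.
    static Record<String, String> tag(Record<String, String> record) {
        Header header = record.headers().lastHeader("trace-id");
        String traceId = (header == null || header.value() == null)
                ? "unknown"
                : new String(header.value(), StandardCharsets.UTF_8);
        // withValue() keeps the key, timestamp, and headers of the original record.
        return record.withValue(traceId + ":" + record.value());
    }
}
```

Since the DSL does not accept this type in those versions, it would be wired in through Topology#addProcessor(), for example topology.addProcessor("tag", TraceIdProcessor::new, "source"), where "tag" and "source" are placeholder node names.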

+++++

Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers, which are in fact dropped by Streams on read in those older versions.

Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL via KIP-159.


Author: Nathan Myles

Updated on June 12, 2022

Comments

  • Nathan Myles
    Nathan Myles almost 2 years

    With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream it provides arguments of the key and the value of the record but no way I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.

    ex.

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> stream = kStreamBuilder.stream("some-topic");
    stream
        .map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
        ... 
    

    something like this would work:

    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> stream = kStreamBuilder.stream("some-topic");
    stream
        .map((record) -> {
            record.headers();
            record.key();
            record.value();
        })
        ...
    
  • miguno
    miguno over 6 years
    To clarify what Matthias said: Yes, the Processor API in Kafka Streams gives you access to record metadata such as topic name, partition number, offset, etc. The DSL in Kafka Streams does not give you access. But because you can combine the Processor API and the DSL, you can still write a DSL-based stream processing application that accesses the record metadata by using the DSL's transform() or transformValues() function, which allows you to pass in a Processor/Transformer from the Processor API.
  • Nathan Myles
    Nathan Myles over 6 years
    Thanks for the information guys, I'll keep an eye out for when the metadata is added to the DSL level so that this answer can be updated.
  • Nathan Myles
    Nathan Myles over 6 years
    @MatthiasJ.Sax and @MichaelG.Noll: in cwiki.apache.org/confluence/display/KAFKA/…, for the RecordContext proposal, it doesn't seem to have the headers exposed. Is that something that will be added?
  • Matthias J. Sax
    Matthias J. Sax over 6 years
    There are no plans to extend {{RecordContext}} via KIP-159 -- when we add header support, it's TBD what this would look like, but I would assume we would add new methods to {{RecordContext}} for this. Watch the Jira if you are interested in details :)
  • Vassilis
    Vassilis over 5 years
    @MatthiasJ.Sax Still not 100% clear to me: Does this mean that with Streams 1.0.1 there is no way to access the headers of a message, either via the DSL or via the Processor API? I am asking because, checking ProcessorContext (kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/…), I cannot locate the headers of the currently processed message.
  • Matthias J. Sax
    Matthias J. Sax over 5 years
    Accessible metadata in 0.11, 1.0 and 1.1 does not include headers. You will need to use Kafka Streams 2.0 to get header access: cwiki.apache.org/confluence/display/KAFKA/… -- I'll update the answer.
  • Matthias J. Sax
    Matthias J. Sax about 5 years
    Using KStream#to(TopicNameExtractor) you get access to the record header, too.
  • Konrad
    Konrad almost 3 years
    What a nice coincidence: I saw this just a day after you added the info about the 2.7.0 upgrade. I took a chance and tried out api.Processor from 2.8.0, and all looks fine apart from one major issue: it's not compatible with KStream. Or am I missing something?
  • Matthias J. Sax
    Matthias J. Sax almost 3 years
    Are you referring to the KStream#process() method? As I tried to point out, the KIP only changed the PAPI, i.e., Topology, but the DSL changes are not implemented yet. Cf. issues.apache.org/jira/browse/KAFKA-8410
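
The KStream#to(TopicNameExtractor) route mentioned in the comments above could look roughly like this, assuming Kafka Streams 2.0.0 or later. The class name, the "target-topic" header, and the "default-topic" fallback are invented for illustration:

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

// Hypothetical extractor: routes each record to the topic named in its "target-topic" header.
class HeaderTopicExtractor implements TopicNameExtractor<String, String> {

    @Override
    public String extract(String key, String value, RecordContext recordContext) {
        // RecordContext exposes the headers of the record being written.
        return topicFor(recordContext.headers());
    }

    // Returns the header text as the topic name, or a fallback when the header is absent.
    static String topicFor(Headers headers) {
        Header header = headers.lastHeader("target-topic");
        if (header == null || header.value() == null) {
            return "default-topic";
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }
}
```

It would be passed to the DSL as stream.to(new HeaderTopicExtractor()).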