Is it possible to access message headers with Kafka Streams?
Record headers are accessible since version 2.0.0 (cf. KIP-244 for details).
You can access record metadata via the Processor API (i.e., via transform(), transformValues(), or process()) through the given "context" object (cf. https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-processor-context).
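As a rough illustration of the context-based approach, here is a minimal sketch assuming Kafka Streams 2.0 through 3.x (transformValues() is deprecated in later 3.x releases). The class name HeaderAwareTransformer, the header key "trace-id", the fallback value "none", and the output topic name are hypothetical and chosen only for this example.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical transformer that prefixes each value with the "trace-id" header.
final class HeaderAwareTransformer implements ValueTransformer<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        // Keep the context; it exposes the headers of the record being processed.
        this.context = context;
    }

    @Override
    public String transform(String value) {
        return traceIdOf(context.headers()) + ":" + value;
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }

    // Returns the last "trace-id" header as a string, or "none" if it is absent.
    static String traceIdOf(Headers headers) {
        Header header = headers.lastHeader("trace-id");
        if (header == null || header.value() == null) {
            return "none";
        }
        return new String(header.value(), StandardCharsets.UTF_8);
    }

    // Plugs the transformer into a DSL topology (assumes default String serdes are configured).
    static void wire(StreamsBuilder builder) {
        KStream<String, String> stream = builder.stream("some-topic");
        ValueTransformerSupplier<String, String> supplier = HeaderAwareTransformer::new;
        stream.transformValues(supplier).to("some-output-topic");
    }
}
```

The header lookup is kept in a static helper so it can be exercised without a running topology.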
Update
As of the 2.7.0 release, the Processor API was improved (cf. KIP-478), adding a new type-safe api.Processor class with a process(Record) method instead of process(K, V). In this case, headers (and record metadata) are accessible via the Record class.
This new feature is not yet available in the "PAPI methods" of the DSL, though (e.g., KStream#process(), KStream#transform(), and siblings).
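For the newer type-safe API, a minimal sketch could look as follows, assuming Kafka Streams 2.7 or later and a plain Topology rather than the DSL. The class name HeaderTaggingProcessor, the node names, the header key "trace-id", the fallback value "none", and the output topic are hypothetical.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical processor that prefixes each value with the "trace-id" header.
final class HeaderTaggingProcessor implements Processor<String, String, String, String> {

    private ProcessorContext<String, String> context;

    @Override
    public void init(ProcessorContext<String, String> context) {
        this.context = context;
    }

    @Override
    public void process(Record<String, String> record) {
        // Headers travel with the Record itself; forward a copy with the new value.
        context.forward(record.withValue(tag(record)));
    }

    // Builds "<trace-id>:<value>", using "none" when the header is absent.
    static String tag(Record<String, String> record) {
        Header header = record.headers().lastHeader("trace-id");
        String traceId = (header == null || header.value() == null)
                ? "none"
                : new String(header.value(), StandardCharsets.UTF_8);
        return traceId + ":" + record.value();
    }

    // Wires source -> processor -> sink (assumes default String serdes are configured).
    static Topology buildTopology() {
        ProcessorSupplier<String, String, String, String> supplier = HeaderTaggingProcessor::new;
        Topology topology = new Topology();
        topology.addSource("source", "some-topic");
        topology.addProcessor("tagger", supplier, "source");
        topology.addSink("sink", "some-output-topic", "tagger");
        return topology;
    }
}
```

The supplier is assigned to a typed variable so that the call resolves to the new-API overload of addProcessor().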
+++++
Prior to 2.0, the context only exposes topic, partition, offset, and timestamp, but not headers, which are in fact dropped by Streams on read in those older versions.
Metadata is not available at DSL level though. However, there is also work in progress to extend the DSL via KIP-159.
Nathan Myles
Updated on June 12, 2022
Comments
-
Nathan Myles almost 2 years
With the addition of Headers to the records (ProducerRecord & ConsumerRecord) in Kafka 0.11, is it possible to get these headers when processing a topic with Kafka Streams? When calling methods like map on a KStream, it provides arguments of the key and the value of the record, but no way I can see to access the headers. It would be nice if we could just map over the ConsumerRecords.
ex.
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> stream = kStreamBuilder.stream("some-topic");
    stream
        .map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
        ...
something like this would work:
    KStreamBuilder kStreamBuilder = new KStreamBuilder();
    KStream<String, String> stream = kStreamBuilder.stream("some-topic");
    stream
        .map((record) -> {
            record.headers();
            record.key();
            record.value();
        })
        ...
-
miguno over 6 years
To clarify what Matthias said: Yes, the Processor API in Kafka Streams gives you access to record metadata such as topic name, partition number, offset, etc. The DSL in Kafka Streams does not give you access. But because you can combine the Processor API and the DSL, you can still write a DSL-based stream processing application that accesses the record metadata by using the DSL's transform() or transformValues() function, which allows you to pass in a Processor/Transformer from the Processor API.
-
Nathan Myles over 6 years
Thanks for the information guys, I'll keep an eye out for when the metadata is added to the DSL level so that this answer can be updated.
-
Nathan Myles over 6 years
@MatthiasJ.Sax and @MichaelG.Noll: in cwiki.apache.org/confluence/display/KAFKA/…, for the RecordContext proposal, it doesn't seem to have the headers exposed. Is that something that will be added?
-
Matthias J. Sax over 6 years
There are no plans to extend {{RecordContext}} via KIP-159 -- when we add header support, it's TBD what this would look like, but I would assume we would add new methods to {{RecordContext}} for this. Watch the Jira if you are interested in details :)
-
Vassilis over 5 years
@MatthiasJ.Sax Still not 100% clear to me: Does this mean that with Streams 1.0.1 there is no way to access the headers of a message, either via the DSL or via the Processor API? I am asking because, checking ProcessorContext (kafka.apache.org/10/javadoc/org/apache/kafka/streams/processor/…), I cannot locate the headers of the currently processed message.
-
Matthias J. Sax over 5 years
Accessible metadata in 0.11, 1.0 and 1.1 does not include headers. You will need to use Kafka Streams 2.0 to get header access: cwiki.apache.org/confluence/display/KAFKA/… -- I'll update the answer.
-
Matthias J. Sax about 5 years
Using KStream#to(TopicNameExtractor) you get access to the record header, too.
-
Konrad almost 3 years
What a nice coincidence, I saw this just a day after you added info about the 2.7.0 upgrade. I took a chance and tried out api.Processor from 2.8.0, and all looks fine apart from one major issue: it's not compatible with KStream. Or am I missing something?
-
Matthias J. Sax almost 3 yearsAre you referring to
KStream#process()
method? As I tried to point out, the KIP only changed the PAPI, ie.,Topology
, but the DSL changes are not implemented yet. Cf. issues.apache.org/jira/browse/KAFKA-8410