Spring Kafka Adding Custom Header to Kafka Message Example

摘要: In this tutorial we demonstrate how to add custom headers to, and read them from, a Kafka message using Spring Kafka. We start by adding headers using either Message<?> or ProducerRecord<String, String>. We then read the values inside the KafkaListener using the @Header annotation and the MessageHeaders class.

In this tutorial we demonstrate how to add custom headers to, and read them from, a Kafka message using Spring Kafka. We start by adding headers using either Message<?> or ProducerRecord<String, String>. We then read the values inside the KafkaListener using the @Header annotation and the MessageHeaders class.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path. Important: We need to include the com.fasterxml.jackson.core:jackson-databind dependency for working with rich header values.

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Kafka message-headers example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>message-headers</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <name>Spring Kafka - ${project.artifactId}</name>

    <!-- Inherits from the Spring Boot starter parent POM. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <!-- spring-kafka.version is referenced by both spring-kafka artifacts below. -->
    <!-- NOTE(review): spring-boot.version is not referenced anywhere in this POM;
         the parent version above is written literally. Confirm whether it is needed. -->
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
        <spring-boot.version>2.0.0.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <!-- No explicit version: presumably managed by the Spring Boot parent; confirm. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- for serializing/deserializing complex headers -->
        <!-- No explicit version: presumably managed by the Spring Boot parent; confirm. -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- Pinned to the same version as spring-kafka via the shared property. -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot's Maven plugin; no version or configuration is declared here. -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

The 0.11.0.0 Apache Kafka client introduced support for headers in messages. Spring for Apache Kafka supports mapping these headers to/from MessageHeaders since version 2.0.

Sending Custom Headers with Spring Kafka

Let’s start by adding custom header values to a Kafka message. We can add headers to a Kafka message using either Message<?> or ProducerRecord<String, String>, as shown in the following code.

package com.memorynotfound.kafka.producer;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

/**
 * Publishes the example payload to the two demo topics, attaching a custom
 * {@code X-Custom-Header} header to every outgoing record.
 *
 * <p>{@link #sendFoo(String)} builds a Spring {@link Message}, while
 * {@link #sendBar(String)} builds a raw Kafka {@link ProducerRecord}; both are
 * handed to the injected {@link KafkaTemplate}.
 */
@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    /** Name of the custom header added to every record. */
    private static final String CUSTOM_HEADER_NAME = "X-Custom-Header";

    /** Text carried by the custom header. */
    private static final String CUSTOM_HEADER_VALUE = "Sending Custom Header with Spring Kafka";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topicFoo;

    @Value("${app.topic.bar}")
    private String topicBar;

    /**
     * Sends {@code data} to the "foo" topic as a Spring {@link Message}.
     *
     * <p>The topic, the message key ({@code "999"}), the partition ({@code 0})
     * and the custom header are all supplied as message headers.
     *
     * @param data payload of the message
     */
    public void sendFoo(String data) {

        Message<String> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topicFoo)
                .setHeader(KafkaHeaders.MESSAGE_KEY, "999")
                .setHeader(KafkaHeaders.PARTITION_ID, 0)
                .setHeader(CUSTOM_HEADER_NAME, CUSTOM_HEADER_VALUE)
                .build();

        LOG.info("sending message='{}' to topic='{}'", data, topicFoo);
        kafkaTemplate.send(message);
    }

    /**
     * Sends {@code data} to the "bar" topic as a Kafka {@link ProducerRecord}
     * with partition {@code 0}, key {@code "111"} and the custom header.
     *
     * @param data payload of the record
     */
    public void sendBar(String data) {

        List<Header> headers = new ArrayList<>();
        // Encode with an explicit charset: the no-arg getBytes() uses the JVM's
        // platform default, which would make the bytes on the wire machine-dependent.
        headers.add(new RecordHeader(
                CUSTOM_HEADER_NAME,
                CUSTOM_HEADER_VALUE.getBytes(java.nio.charset.StandardCharsets.UTF_8)));

        ProducerRecord<String, String> bar = new ProducerRecord<>(topicBar, 0, "111", data, headers);
        LOG.info("sending message='{}' to topic='{}'", data, topicBar);

        kafkaTemplate.send(bar);
    }
}

We configure the KafkaTemplate inside the SenderConfig class. For simplicity we used a StringSerializer for both key and value fields.

package com.memorynotfound.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Producer-side Spring configuration: exposes the producer properties, the
 * {@link ProducerFactory} built from them, and the {@link KafkaTemplate}
 * built from that factory.
 */
@Configuration
public class SenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Producer properties: the injected bootstrap servers plus
     * {@link StringSerializer} for both key and value.
     *
     * @return a new mutable map of producer settings
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return settings;
    }

    /**
     * @return a producer factory configured with {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        final Map<String, Object> settings = producerConfigs();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        final ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

}

Reading Custom Header Values with Spring Kafka

Previously we saw how to send custom header values. Now we are going to read those values. We have a couple of options. We can inject each header individually using the @Header annotation. Or we can inject the MessageHeaders which you can use to iterate over each header. You can use whichever you find more suitable.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

/**
 * Consumes the two demo topics and logs the payload together with the
 * headers of each received record.
 *
 * <p>The "foo" listener has each header injected individually through
 * {@link Header}; the "bar" listener receives the whole {@link MessageHeaders}
 * map and iterates over it.
 */
@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    /** Name of the custom header that gets special handling when logged. */
    private static final String CUSTOM_HEADER_NAME = "X-Custom-Header";

    /**
     * Handles records from the topic configured as {@code app.topic.foo},
     * logging the payload and every injected header value.
     *
     * @param data          record payload
     * @param offset        value of the {@link KafkaHeaders#OFFSET} header
     * @param consumer      value of the {@link KafkaHeaders#CONSUMER} header
     * @param timestampType value of the {@link KafkaHeaders#TIMESTAMP_TYPE} header
     * @param topic         value of the {@link KafkaHeaders#RECEIVED_TOPIC} header
     * @param partitionId   value of the {@link KafkaHeaders#RECEIVED_PARTITION_ID} header
     * @param messageKey    value of the {@link KafkaHeaders#RECEIVED_MESSAGE_KEY} header
     * @param timestamp     value of the {@link KafkaHeaders#RECEIVED_TIMESTAMP} header
     * @param customHeader  value of the {@code X-Custom-Header} header
     */
    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String data,
                        @Header(KafkaHeaders.OFFSET) Long offset,
                        @Header(KafkaHeaders.CONSUMER) KafkaConsumer<String, String> consumer,
                        @Header(KafkaHeaders.TIMESTAMP_TYPE) String timestampType,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partitionId,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
                        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) Long timestamp,
                        @Header(CUSTOM_HEADER_NAME) String customHeader) {

        LOG.info("- - - - - - - - - - - - - - -");
        LOG.info("received message='{}'", data);
        LOG.info("consumer: {}", consumer);
        LOG.info("topic: {}", topic);
        LOG.info("message key: {}", messageKey);
        LOG.info("partition id: {}", partitionId);
        LOG.info("offset: {}", offset);
        LOG.info("timestamp type: {}", timestampType);
        LOG.info("timestamp: {}", timestamp);
        LOG.info("custom header: {}", customHeader);
    }

    /**
     * Handles records from the topic configured as {@code app.topic.bar},
     * logging the payload and then every entry of {@code messageHeaders}.
     *
     * <p>When the {@code X-Custom-Header} value is a {@code byte[]} it is
     * decoded as UTF-8 text before logging; any other value is logged as-is.
     *
     * @param data           record payload
     * @param messageHeaders all headers of the received message
     */
    @KafkaListener(topics = "${app.topic.bar}")
    public void receive(@Payload String data,
                        @Headers MessageHeaders messageHeaders) {

        LOG.info("- - - - - - - - - - - - - - -");
        LOG.info("received message='{}'", data);
        messageHeaders.forEach((key, value) -> {
            // Guard the cast: an unconditional (byte[]) cast would throw
            // ClassCastException if the header arrives as any other type.
            if (CUSTOM_HEADER_NAME.equals(key) && value instanceof byte[]) {
                // Decode with an explicit charset rather than the platform default.
                LOG.info("{}: {}", key,
                        new String((byte[]) value, java.nio.charset.StandardCharsets.UTF_8));
            } else {
                LOG.info("{}: {}", key, value);
            }
        });

    }
}

The DefaultKafkaHeaderMapper maps each Kafka header key to the corresponding MessageHeaders header name. To support rich content, it automatically converts objects to JSON. You can optionally initialize the DefaultKafkaHeaderMapper with your own ObjectMapper and patterns.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.support.DefaultKafkaHeaderMapper;

import java.util.HashMap;
import java.util.Map;

/**
 * Consumer-side Spring configuration: exposes the consumer properties, the
 * {@link ConsumerFactory} built from them, the listener container factory and
 * a {@link DefaultKafkaHeaderMapper} bean.
 */
@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    /**
     * Consumer properties: the injected bootstrap servers,
     * {@link StringDeserializer} for key and value, group id {@code "foo"}
     * and auto offset reset {@code "earliest"}.
     *
     * @return a new mutable map of consumer settings
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        final Map<String, Object> settings = new HashMap<>();
        settings.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        settings.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        settings.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        return settings;
    }

    /**
     * @return a consumer factory configured with {@link #consumerConfigs()}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        final Map<String, Object> settings = consumerConfigs();
        return new DefaultKafkaConsumerFactory<>(settings);
    }

    /**
     * @return a concurrent listener container factory that uses {@link #consumerFactory()}
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, String> containerFactory =
                new ConcurrentKafkaListenerContainerFactory<>();
        final ConsumerFactory<String, String> consumers = consumerFactory();
        containerFactory.setConsumerFactory(consumers);
        return containerFactory;
    }

    /**
     * @return a header mapper created with its no-arg constructor
     */
    @Bean
    public DefaultKafkaHeaderMapper headerMapper() {
        final DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
        return mapper;
    }

}

Configure with application.yml

We also create an application.yml properties file, located in the src/main/resources folder. These properties are injected into the configuration classes by Spring Boot.

# Broker address; read via ${spring.kafka.bootstrap-servers} in SenderConfig and ListenerConfig.
spring:
  kafka:
    bootstrap-servers: localhost:9092

# Topic names; read via ${app.topic.foo} / ${app.topic.bar} in Sender and Listener.
app:
  topic:
    foo: foo.t
    bar: bar.t

# Log levels: WARN by default, DEBUG for the example's own packages.
# NOTE(review): no web dependency is declared in the example's POM, so the
# org.springframework.web entry may have no effect; confirm before keeping it.
logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG

Running with Spring Boot

Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point for the example. Once the application has started,
 * {@link #run(String...)} sends the demo payload through both
 * {@link Sender} methods.
 */
@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    /** Payload sent to both topics. */
    private static final String PAYLOAD = "Spring Kafka Custom Header Example";

    @Autowired
    private Sender sender;

    /**
     * Boots the Spring application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    /**
     * Sends the demo payload first via {@code sendFoo} and then via {@code sendBar}.
     *
     * @param args command-line arguments (unused)
     */
    @Override
    public void run(String... args) throws Exception {
        sender.sendFoo(PAYLOAD);
        sender.sendBar(PAYLOAD);
    }
}

Output

When we run the application we receive the following output.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
No active profile set, falling back to default profiles: default
sending message='Spring Kafka Custom Header Example' to topic='foo.t'
sending message='Spring Kafka Custom Header Example' to topic='bar.t'
- - - - - - - - - - - - - - -
received message='Spring Kafka Custom Header Example'
X-Custom-Header: Sending Custom Header with Spring Kafka
kafka_offset: 49
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@<hash>
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: 111
kafka_receivedPartitionId: 0
kafka_receivedTopic: bar.t
kafka_receivedTimestamp: 1520322866725
- - - - - - - - - - - - - - -
received message='Spring Kafka Custom Header Example'
consumer: org.apache.kafka.clients.consumer.KafkaConsumer@<hash>
topic: foo.t
message key: 999
partition id: 0
offset: 137
timestamp type: CREATE_TIME
timestamp: 1520322866701
custom header: Sending Custom Header with Spring Kafka

Download

上一篇: Spring Kafka JSON Serializer and Deserializer Example
下一篇: Spring Kafka Consumer and Producer Example
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号