Spring Kafka JSON Serializer and Deserializer Example

摘要: The following tutorial demonstrates how to send and receive a Java Object as a JSON byte[] to and from Apache Kafka using Spring Kafka, Spring Boot and Maven. We’ll send a Java Object as JSON byte[] to a Kafka Topic using a JsonSerializer. Afterwards we’ll configure how to receive a JSON byte[] and automatically convert it to a Java Object using a JsonDeserializer.

The following tutorial demonstrates how to send and receive a Java Object as a JSON byte[] to and from Apache Kafka using Spring Kafka, Spring Boot and Maven. We’ll send a Java Object as JSON byte[] to a Kafka Topic using a JsonSerializer. Afterwards we’ll configure how to receive a JSON byte[] and automatically convert it to a Java Object using a JsonDeserializer.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path. Since we are working with JSON, we need to include the Jackson JSON library com.fasterxml.jackson.core:ackson-databind.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>message-conversion-json</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com/spring-kafka-json-serializer-deserializer-example</url>
    <description>Spring Kafka - JSON Serializer Deserializer Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- json support -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Simple POJO to Serialize/Deserialize

In this example we’ll send and receive a Foo object to and from a Kafka topic.

package com.memorynotfound.kafka;

public class Foo {

    private String name;
    private String description;

    public Foo() {
    }

    public Foo(String name, String description) {
        this.name = name;
        this.description = description;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Foo{" +
                "name='" + name + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}

Apache Kafka stores and transports bye[]. There are a number of built in serializers and deserializers but it doesn’t include any for JSON. Spring Kafka created a JsonSerializer and JsonDeserializer which we can use to convert Java Objects to and from JSON.

Producing JSON messages with Spring Kafka

Let’s start by sending a Foo object to a Kafka Topic. Notice: we created a KafkaTemplate<String, Foo> since we are sending Java Objects to the Kafka topic that’ll automatically be transformed in a JSON byte[]. In this example we created a Message<Foo> using the MessageBuilder. It’s important to add the topic where we are going to send the message to.

package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class FooSender {

    private static final Logger LOG = LoggerFactory.getLogger(FooSender.class);

    @Autowired
    private KafkaTemplate<String, Foo> kafkaTemplate;

    @Value("${app.topic.example}")
    private String topic;

    public void send(Foo data){
        LOG.info("sending data='{}' to topic='{}'", data, topic);

        Message<Foo> message = MessageBuilder
                .withPayload(data)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .build();
        
        kafkaTemplate.send(message);
    }
}

Starting with version 2.1, type information can be conveyed in record Headers, allowing the handling of multiple types. In addition, the serializer/deserializer can be configured using Kafka Properties.

  • JsonSerializer.ADD_TYPE_INFO_HEADERS (default true); set to false to disable this feature.
  • JsonSerializer.DEFAULT_KEY_TYPE; fallback type for deserialization of keys if no header information is present.
  • JsonSerializer.DEFAULT_VALUE_TYPE; fallback type for deserialization of values if no header information is present.
  • JsonSerializer.TRUSTED_PACKAGES (default java.util, java.lang); comma-delimited list of packages patterns allowed for deserialization; * means deserialize all.

We need to configure the correct Serializer to support JSON types. We can register this by setting the ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to the JsonSerializer class. Finally, we need to set the correct value type for our ProducerFactory and KafkaTemplate to the Foo object.

package com.memorynotfound.kafka.producer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class FooSenderConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, Foo> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Foo> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

Consuming JSON Messages with Spring Kafka

Next, we’ll look at how we can receive JSON messages. In the FooListener we simply need to add the Foo Java Object as a parameter in our method.

package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class FooListener {

    private static final Logger LOG = LoggerFactory.getLogger(FooListener.class);

    @KafkaListener(topics = "${app.topic.example}")
    public void receive(@Payload Foo data,
                        @Headers MessageHeaders headers) {
        LOG.info("received data='{}'", data);

        headers.keySet().forEach(key -> {
            LOG.info("{}: {}", key, headers.get(key));
        });
    }

}

The FooListenerConfig is a bit more complex. First we need to add the appropriate Deserializer which can convert JSON byte[] into a Java Object. To do this, we need to set the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG with the JsonDeserializer class. Next we need to create a ConsumerFactory and pass the consumer configuration, the key deserializer and the typed JsonDeserializer<>(Foo.class). Finally, we need to make sure the ConsumerFactory and the ConcurrentKafkaListenerContainerFactory all have the correct value type of Foo.

package com.memorynotfound.kafka.consumer;

import com.memorynotfound.kafka.Foo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class FooListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public ConsumerFactory<String, Foo> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                consumerConfigs(),
                new StringDeserializer(),
                new JsonDeserializer<>(Foo.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Foo> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Foo> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

Configure with application.yml

We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    example: example.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG

Running with Spring Boot

Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.FooSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private FooSender sender;

    @Override
    public void run(String... strings) throws Exception {
        Foo foo = new Foo("Spring Kafka", "sending and receiving JSON messages");
        sender.send(foo);
    }
}

Output

When we run the application we receive the following output.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
No active profile set, falling back to default profiles: default
sending data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}' to topic='example.t'
received data='Foo{name='Spring Kafka', description='sending and receiving JSON messages'}'
kafka_offset: 18
kafka_consumer: org.apac[email protected]
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: example.t
kafka_receivedTimestamp: 1520332684097
__TypeId__: [99, 111, 109, 46, 109, 101, 109, 111, 114, 121, 110, 111, 116, 102, 111, 117, 110, 100, 46, 107, 97, 102, 107, 97, 46, 70, 111, 111]

Download

上一篇: Spring Kafka Batch Listener Example
下一篇: Spring Kafka Adding Custom Header to Kafka Message Example
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号