Spring Kafka Batch Listener Example

摘要: In the following tutorial we demonstrate how to setup a batch listener using Spring Kafka, Spring Boot and Maven. We start by configuring the BatchListener. You can optionally configure a BatchErrorHandler. We also demonstrate how to set the upper limit of batch size messages. When we receive messages we also have the possibility of grabbing header values for individual messages.

In the following tutorial we demonstrate how to setup a batch listener using Spring Kafka, Spring Boot and Maven. We start by configuring the BatchListener. You can optionally configure a BatchErrorHandler. We also demonstrate how to set the upper limit of batch size messages. When we receive messages we also have the possibility of grabbing header values for individual messages.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>batch-listener</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <description>Spring Kafka - Batch Listener Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Sending Messages to Kafka

In a previous tutorial we saw how to produce and consume messages using Spring Kafka. The Sender and SenderConfig are identical. In the following example we show how to batch receive messages using a BatchListener.

Configuring a Batch Listener

Starting with version 1.1, @KafkaListener methods can be configured to receive the entire batch of consumer records received from the consumer poll. To configure the listener container factory to create batch listeners, set the batchListener property of the ConcurrentKafkaListenerContainerFactory to true.

We can optionally create a BatchErrorHandler by using the ConcurrentKafkaListenerContainerFactory#getContainerProperties().setBatchErrorHandler() and providing your Batch Error Handler.

We can configure Spring Kafka to set an upper limit for the batch size by setting the ConsumerConfig.MAX_POLL_RECORDS_CONFIG to a value that suits you. By default, the number of records received in each batch is dynamically calculated. In the following example we configured the upper limit to 5.

package com.memorynotfound.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.BatchLoggingErrorHandler;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class ListenerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batch");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "5");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.getContainerProperties().setBatchErrorHandler(new BatchLoggingErrorHandler());
        return factory;
    }

}

Batch Receive Kafka Messages using a Batch Listener

Since we are receiving batch messages we need to update the receive() method to accept a List of messages. Alternatively you can receive a List of Message<?> or ConsumerRecord<?,?&glt; objects with each offset, etc in each message, but it must be the only parameter (aside from an optional Acknowledgment when using manual commits) defined on the method.

While receiving batch messages it’s also possible to receive the complementary headers of individual messages. You’ll also need to accept a List of headers you want to get.

package com.memorynotfound.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

import java.util.List;

@Service
public class Listener {

    private static final Logger LOG = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "batch-listener", topics = "${app.topic.batch}")
    public void receive(@Payload List<String> messages,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {

        LOG.info("- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -");
        LOG.info("beginning to consume batch messages");

        for (int i = 0; i < messages.size(); i++) {

            LOG.info("received message='{}' with partition-offset='{}'",
                    messages.get(i), partitions.get(i) + "-" + offsets.get(i));

        }
        LOG.info("all batch messages consumed");
    }

}

Configure Application with application.yml

We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.

spring:
  kafka:
    bootstrap-servers: localhost:9092

app:
  topic:
    foo: foo.t

logging:
  level:
    root: ERROR
    org.springframework.web: ERROR
    com.memorynotfound: DEBUG

Running the Application

Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.

package com.memorynotfound.kafka;

import com.memorynotfound.kafka.producer.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        for (int i = 1; i < 13; i++){
            sender.send("message-" + i);
        }
    }
}

Demo

When we run the application we receive the following output.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
sending message='message-1' to topic='batch.t'
sending message='message-2' to topic='batch.t'
sending message='message-3' to topic='batch.t'
sending message='message-4' to topic='batch.t'
sending message='message-5' to topic='batch.t'
sending message='message-6' to topic='batch.t'
sending message='message-7' to topic='batch.t'
sending message='message-8' to topic='batch.t'
sending message='message-9' to topic='batch.t'
sending message='message-10' to topic='batch.t'
sending message='message-11' to topic='batch.t'
sending message='message-12' to topic='batch.t'
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-1' with partition-offset='0-295'
received message='message-2' with partition-offset='0-296'
received message='message-3' with partition-offset='0-297'
received message='message-4' with partition-offset='0-298'
received message='message-5' with partition-offset='0-299'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-6' with partition-offset='0-300'
received message='message-7' with partition-offset='0-301'
received message='message-8' with partition-offset='0-302'
received message='message-9' with partition-offset='0-303'
received message='message-10' with partition-offset='0-304'
all batch messages consumed
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
beginning to consume batch messages
received message='message-11' with partition-offset='0-305'
received message='message-12' with partition-offset='0-306'
all batch messages consumed

Download

上一篇: Spring Kafka and Spring Boot Configuration Example
下一篇: Spring Kafka JSON Serializer and Deserializer Example
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号