Spring Kafka and Spring Boot Configuration Example

In this tutorial we demonstrate how to configure Spring Kafka with Spring Boot. Spring Boot configures Spring Kafka with sensible defaults, which we can override in the application.yml property file.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

Previously we saw how to create a Spring Kafka consumer and producer by configuring the Producer and Consumer manually. In this example we'll let Spring Boot configure them automatically using sensible defaults.

Download and Install Apache Kafka

To download and install Apache Kafka, please read the official Apache Kafka documentation. This tutorial assumes that the server is started with the default configuration and that no server ports have been changed.

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies are on the classpath.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>spring-boot</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <description>Spring Kafka Spring Boot Configuration Example</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Sending Spring Kafka Messages with Spring Boot

Spring Boot automatically configures and initializes a KafkaTemplate based on the properties in the application.yml property file. By annotating the Sender class with @Service, we make it eligible for component scanning, so the Spring container discovers and registers it automatically.

package com.memorynotfound.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${app.topic.foo}")
    private String topic;

    public void send(String message){
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}
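
The @Value("${app.topic.foo}") placeholder in the Sender refers to the nested app / topic / foo key that appears in application.yml further down. As a rough illustration of that correspondence, the following plain-Java sketch flattens a nested map (the shape a YAML parser would produce) into dotted property names and resolves a placeholder against it. This is not Spring's implementation; the class PlaceholderSketch and its methods are hypothetical names introduced only for this example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: Spring performs flattening and placeholder resolution
// internally. PlaceholderSketch is a hypothetical stand-in, not a Spring API.
final class PlaceholderSketch {

    // Flattens nested maps (as a YAML parser would produce) into dotted keys.
    static Map<String, String> flatten(Map<String, Object> nested) {
        Map<String, String> flat = new LinkedHashMap<>();
        flatten("", nested, flat);
        return flat;
    }

    private static void flatten(String prefix, Map<String, Object> node, Map<String, String> out) {
        for (Map.Entry<String, Object> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> child = (Map<String, Object>) value;
                flatten(key, child, out);
            } else {
                out.put(key, String.valueOf(value));
            }
        }
    }

    // Resolves a single "${name}" expression against the flattened properties.
    static String resolve(String expression, Map<String, String> properties) {
        if (!expression.startsWith("${") || !expression.endsWith("}")) {
            return expression; // not a placeholder, return unchanged
        }
        String name = expression.substring(2, expression.length() - 1);
        String value = properties.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Could not resolve placeholder '" + name + "'");
        }
        return value;
    }

    public static void main(String[] args) {
        // Mirrors the "app: topic: foo: foo.t" entry of application.yml.
        Map<String, Object> topic = new LinkedHashMap<>();
        topic.put("foo", "foo.t");
        Map<String, Object> app = new LinkedHashMap<>();
        app.put("topic", topic);
        Map<String, Object> root = new LinkedHashMap<>();
        root.put("app", app);

        System.out.println(resolve("${app.topic.foo}", flatten(root)));
    }
}
```

Running main prints foo.t, the topic name the Sender publishes to in this tutorial.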

Receiving Kafka Messages with Spring Boot

Spring Boot also automatically configures a ConcurrentKafkaListenerContainerFactory, which creates the message listener containers. You can optionally tune these through the application.yml property file.

When a method is annotated with @KafkaListener, Spring Kafka automatically creates a message listener container for it.

package com.memorynotfound.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @KafkaListener(topics = "${app.topic.foo}")
    public void receive(@Payload String message,
                        @Headers MessageHeaders headers) {
        LOG.info("received message='{}'", message);
        headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
    }

}

Configure Application with application.yml

Spring Boot tries to automatically configure your application with sensible defaults based on the dependencies declared in your pom.xml file. Because we haven't configured any Consumer, Producer or KafkaTemplate beans, Spring Boot auto-configures them using its default values. These values can be overridden in the application.yml property file. More information is available in the Spring Boot reference documentation on Kafka properties.

We create an application.yml properties file in the src/main/resources folder. Spring Boot injects these properties into its configuration classes.

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

app:
  topic:
    foo: foo.t

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG
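
To make the spring.kafka.* keys above more concrete, here is a minimal sketch of the native Kafka client property names they correspond to, written as plain Java maps of the kind a manually configured consumer or producer factory would receive. The class name KafkaClientProperties is hypothetical, and the bootstrap.servers value is an assumption: it is not set in the YAML above and is taken from the localhost:9092 default this tutorial relies on.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; Spring Boot builds equivalent maps itself.
final class KafkaClientProperties {

    // spring.kafka.consumer.* expressed with native Kafka consumer property names.
    static Map<String, Object> consumerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed default, not in the YAML
        props.put("group.id", "foo");                      // group-id
        props.put("auto.offset.reset", "earliest");        // auto-offset-reset
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // spring.kafka.producer.* expressed with native Kafka producer property names.
    static Map<String, Object> producerProperties() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092"); // assumed default, not in the YAML
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
        producerProperties().forEach((k, v) -> System.out.println("producer " + k + " = " + v));
    }
}
```

With Spring Boot none of this code is needed; the sketch only shows which client settings the YAML entries stand for.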

Running the Application

Finally, we write a simple Spring Boot application to demonstrate the setup. For this demo to work, a Kafka server must be running on localhost on port 9092, which is Kafka's default configuration.

package com.memorynotfound.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send("Spring Kafka and Spring Boot Configuration Example");
    }
}
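
Since the demo depends on a broker listening on localhost:9092, it can be handy to check that the port is open before starting the application. The following is a minimal sketch using only the JDK; BrokerPortCheck and isReachable are hypothetical names, and the one-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check; not part of Spring Kafka or the Kafka client.
final class BrokerPortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // connection refused, timed out, or host not resolvable
        }
    }

    public static void main(String[] args) {
        boolean up = isReachable("localhost", 9092, 1000);
        System.out.println(up
                ? "Something is listening on localhost:9092"
                : "Nothing is listening on localhost:9092; start Kafka first");
    }
}
```

A successful connection only shows that some process accepts TCP connections on that port. It does not prove the process is a healthy Kafka broker.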

Demo

When we run the application we receive the following output.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186)
sending message='Spring Kafka and Spring Boot Configuration Example' to topic='foo.t'
received message='Spring Kafka and Spring Boot Configuration Example'
kafka_offset: 144
kafka_nativeHeaders: RecordHeaders(headers = [], isReadOnly = false)
kafka_consumer: org.apache.kafka.clients.consumer.KafkaConsumer@…
kafka_timestampType: CREATE_TIME
kafka_receivedMessageKey: null
kafka_receivedPartitionId: 0
kafka_receivedTopic: foo.t
kafka_receivedTimestamp: 1520508611795
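
The Receiver logs every header, but individual entries can also be read by key. As a small sketch, the helper below builds a compact description from three of the header keys shown in the output above. It accepts any Map<String, Object>; because Spring's MessageHeaders implements that interface, the headers argument of the receive method could be passed in directly. The class name HeaderSummary is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; the key names come from the demo output.
final class HeaderSummary {

    // Builds "topic-partition@offset" from the received-message headers.
    static String describe(Map<String, Object> headers) {
        Object topic = headers.get("kafka_receivedTopic");
        Object partition = headers.get("kafka_receivedPartitionId");
        Object offset = headers.get("kafka_offset");
        return topic + "-" + partition + "@" + offset;
    }

    public static void main(String[] args) {
        // Values copied from the demo output above.
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("kafka_receivedTopic", "foo.t");
        headers.put("kafka_receivedPartitionId", 0);
        headers.put("kafka_offset", 144L);

        System.out.println(describe(headers)); // prints "foo.t-0@144"
    }
}
```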
