Spring Kafka Forwarding Listener Results using @SendTo

摘要: This tutorial demonstrates how to forward listener results using the @SendTo annotation using Spring Kafka, Spring Boot and Maven. We can use static typed topics, runtime expressions or application initialization expressions. Take a look at the following example.

This tutorial demonstrates how to forward listener results using the @SendTo annotation using Spring Kafka, Spring Boot and Maven. We can use static typed topics, runtime expressions or application initialization expressions. Take a look at the following example.

Project Setup

  • Spring Kafka: 2.1.4.RELEASE
  • Spring Boot: 2.0.0.RELEASE
  • Apache Kafka: kafka_2.11-1.0.0
  • Maven: 3.5

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.spring.kafka</groupId>
    <artifactId>send-to</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>http://memorynotfound.com</url>
    <description>Spring Kafka - forwarding listener results using @SendTo</description>
    <name>Spring Kafka - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring-kafka.version>2.1.4.RELEASE</spring-kafka.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>${spring-kafka.version}</version>
        </dependency>

        <!-- testing -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>${spring-kafka.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Sending Messages to Kafka Topic

We use the Constants object as a placeholder for the Kafka topics. This way we can easily use constants to specify which topic we are sending to.

package com.memorynotfound.kafka;

public class Constants {

    public static final String FOO_TOPIC = "foo.t";
    public static final String BAR_TOPIC = "bar.t";
    
}

Spring Boot auto-configures the KafkaTemplate using properties from the application.yml property file. We use it to send a message to a Kafka topic.

package com.memorynotfound.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import static com.memorynotfound.kafka.Constants.FOO_TOPIC;

@Service
public class Sender {

    private static final Logger LOG = LoggerFactory.getLogger(Sender.class);

    @Autowired
    private KafkaTemplate<String, Double> kafkaTemplate;

    public void send(Double data){
        LOG.info("sending data='{}' to topic='{}'", data, FOO_TOPIC);
        kafkaTemplate.send(FOO_TOPIC, data);
    }
}

Forwarding Listener Results using @SendTo

Starting from version 2.0, if you also annotate a @KafkaListener with a @SendTo annotation and the method invocation returns a result, the result will be forwarded to the topic specified by the @SendTo annotation.

The @SendTo value argument can accept several forms:

  • @SendTo("someTopic") specifies a static topic to rout to.
  • @SendTo("#{someExpression}") specifies an application initialization expression. This expression is evaluated once during application context initialization and’ll be forwarded to the result.
  • @SendTo("!{someExpression}") specifies a runtime expression. This expression is evaluated at runtime. The #root object for the evaluation has 3 properties:
    • request – the inbound ConsumerRecord (or ConsumerRecords object for a batch listener)
    • source – the Message<?> converted from the request.
    • result – the method return result.
  • @SendTo() (no properties) is treaded as !{source.headers["kafka_replyTopic']} (since version 2.1.3).
The result of the expression evaluation must be a String representing the topic name.

package com.memorynotfound.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Service;

import static com.memorynotfound.kafka.Constants.BAR_TOPIC;
import static com.memorynotfound.kafka.Constants.FOO_TOPIC;

@Service
public class Receiver {

    private static final Logger LOG = LoggerFactory.getLogger(Receiver.class);

    @SendTo(BAR_TOPIC)
    @KafkaListener(topics = FOO_TOPIC)
    public Double calculate(Double data) {
        LOG.info("calculating square root from='{}'", data);
        return Math.sqrt(data);
    }

    @KafkaListener(topics = BAR_TOPIC)
    public void result(Double data) {
        LOG.info("received square root='{}'", data);
    }

}

Configure Application with application.yml

Spring Boot tries to automatically configure your application with sensible defaults based on the specified dependencies inside your pom.xml file. We haven’t configured any Consumer, Producer or KafkaTemplate beans, spring boot will auto-configure them using spring boot default values. These values can be overridden using the application.yml property file. You can find more information about Spring Boot Kafka Properties.

We also create a application.yml properties file which is located in the src/main/resources folder. These properties are injected in the configuration classes by spring boot.

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
      value-deserializer: org.apache.kafka.common.serialization.DoubleDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.DoubleSerializer
      value-serializer: org.apache.kafka.common.serialization.DoubleSerializer

logging:
  level:
    root: WARN
    org.springframework.web: INFO
    com.memorynotfound: DEBUG

Running the Application

Finally, we wrote a simple Spring Boot application to demonstrate the application. In order for this demo to work, we need a Kafka Server running on localhost on port 9092, which is the default configuration of Kafka.

package com.memorynotfound.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaApplication.class, args);
    }

    @Autowired
    private Sender sender;

    @Override
    public void run(String... strings) throws Exception {
        sender.send(123.123);
    }
}

Demo

When we run the application we receive the following output.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::       (v2.0.0.RELEASE)

Running with Spring Boot v2.0.0.RELEASE, Spring v5.0.4.RELEASE
Started SpringKafkaApplication in 2.334 seconds (JVM running for 3.186)
Started SpringKafkaApplication in 3.272 seconds (JVM running for 4.534)
sending data='123.123' to topic='foo.t'
calculating square root from='123.123'
received square root='11.096080389038285'

Download

Download it – [email protected]

上一篇: Spring Boot + Spring Security + Thymeleaf Form Login Example
下一篇: Spring Kafka and Spring Boot Configuration Example
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号