Setting and Reading Spring JMS Message Header Properties Example

In this tutorial we demonstrate how to set and read Spring JMS message header properties. We show several ways to access header information. The @Header annotation obtains a single header attribute, while the @Headers annotation injects all headers into a Map<String, Object>. We can also access header information through the MessageHeaders and JmsMessageHeaderAccessor classes.

Project Structure

Let’s start by looking at the project structure.

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies are on the classpath.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.integration.jms.activemq</groupId>
    <artifactId>accessing-headers</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://memorynotfound.com</url>
    <name>Spring Integration + ActiveMQ - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Spring Boot ActiveMQ Configuration

Spring Boot automatically configures a ConnectionFactory when it detects ActiveMQ on the classpath. If the broker is present, an embedded broker is started and configured automatically (as long as no broker URL is specified through configuration). In this example we configure the embedded ActiveMQ broker explicitly with a vm:// broker URL. The application.yml file is located in the src/main/resources/ folder.

spring:

  # Embedded ActiveMQ Configuration
  activemq:
      broker-url: vm://embedded?broker.persistent=false,useShutdownHook=false
      in-memory: true
      non-blocking-redelivery: true
      packages:
        trust-all: false
        trusted: com.memorynotfound
      pool:
        block-if-full: true
        block-if-full-timeout: -1
        create-connection-on-startup: true
        enabled: false
        expiry-timeout: 0
        idle-timeout: 30000
        max-connections: 1
        maximum-active-session-per-connection: 500
        reconnect-on-exception: true
        time-between-expiration-check: -1
        use-anonymous-producers: true

  # Spring JMS Settings
  jms:
    listener:
      acknowledge-mode: auto
      auto-startup: true
      concurrency: 2
      max-concurrency: 2
    pub-sub-domain: false
    template:
      default-destination:
      delivery-mode: non_persistent
      priority: 100  # JMS priorities range from 0 to 9; in the example output this value arrives as 9
      qos-enabled: true
      receive-timeout: 1000
      time-to-live: 36000

# Logging configuration: print only the log message for tutorial purposes
logging:
  pattern:
    console: "%msg%n"
  level:
    root: info
    com.memorynotfound: debug
    org.springframework: info

Spring JMS Configuration

The @EnableJms annotation enables detection of @JmsListener annotations on any Spring-managed bean in the container. The annotated endpoints are created under the covers by a JmsListenerContainerFactory, which creates the listener container for each endpoint. The MappingJackson2MessageConverter uses Jackson to convert messages to and from JSON. It stores a type identifier for the payload in the message property named by setTypeIdPropertyName (here _type), so the receiver knows which class to deserialize.

package com.memorynotfound.integration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms
@Configuration
public class ActiveMQConfig {

    public static final String ORDER_QUEUE = "order-queue";

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

}

Order Object

In this example we send and receive objects of type Order to and from an ActiveMQ queue.

package com.memorynotfound.integration;

import java.io.Serializable;
import java.math.BigDecimal;

public class Order implements Serializable {

    private String from;
    private String to;
    private BigDecimal amount;

    public Order() {
    }

    public Order(String from, String to, BigDecimal amount) {
        this.from = from;
        this.to = to;
        this.amount = amount;
    }

    public String getFrom() {
        return from;
    }

    public void setFrom(String from) {
        this.from = from;
    }

    public String getTo() {
        return to;
    }

    public void setTo(String to) {
        this.to = to;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{" +
                "from='" + from + '\'' +
                ", to='" + to + '\'' +
                ", amount=" + amount +
                '}';
    }
}

Setting JMS Message Header Properties

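Now that we have configured the JMS message broker, we can start sending messages to a JMS queue. We use the JmsTemplate to send JMS messages to the queue. We simply pass in a destination and a message, and the JmsTemplate handles the rest.

We can also pass in an optional MessagePostProcessor. It gives us access to the underlying JMS Message object, on which we can set the standard JMS headers or add custom properties. Note that the JMS provider assigns several standard headers itself when the message is sent (JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID and JMSTimestamp), so values set for them in the post-processor are overwritten. The standard JMS headers are: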

  • JMSDestination – the destination to which the message is sent.
  • JMSReplyTo – the JMS destination where the reply message should be sent.
  • JMSDeliveryMode – the delivery mode of the message. It can be one of the following:
    • PERSISTENT – messages are stored and forwarded.
    • NON_PERSISTENT – messages are not stored and may be lost due to failures in transmission.
  • JMSMessageID – the unique ID of the message.
  • JMSTimestamp – the time a message was handed off to a JMS provider to be sent, expressed in milliseconds since the epoch.
  • JMSExpiration – the expiration time of the message (the send time plus the time-to-live; 0 means the message does not expire).
  • JMSRedelivered – set by the JMS provider when it has delivered the message at least once before.
  • JMSPriority – the priority of the message. Priority is a value from 0 to 9. Higher numbers signify a higher priority (that is, 9 is a higher priority than 8).
  • JMSCorrelationID – this ID links a response message with its related request message. In a reply message it is usually the message ID of the request message.
  • JMSType – the JMS provider-supplied string to describe the type of the message. Some JMS providers use this property to define messages in the provider’s repository. See the JMS provider documentation for more information about the use of this field.
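To make the relationship between JMSTimestamp, the time-to-live and JMSExpiration concrete, here is a minimal plain-Java sketch. It assumes the rule from the JMS specification that the expiration is the send time plus the time-to-live, with 0 meaning the message never expires, and that priorities 0-4 count as normal and 5-9 as expedited. The class and method names (JmsHeaderMath, expiration, isExpired, isExpedited) are hypothetical and are not part of any JMS or Spring API.

```java
// Hypothetical helper, not part of the JMS API: it models header arithmetic only.
final class JmsHeaderMath {

    private JmsHeaderMath() {
    }

    // JMSExpiration = send time + time-to-live; a time-to-live of 0 means "never expires".
    static long expiration(long sentAtMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : sentAtMillis + timeToLiveMillis;
    }

    // An expiration of 0 never expires; otherwise compare against the given time.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    // JMS treats priorities 0-4 as normal and 5-9 as expedited.
    static boolean isExpedited(int priority) {
        if (priority < 0 || priority > 9) {
            throw new IllegalArgumentException("JMS priority must be between 0 and 9: " + priority);
        }
        return priority >= 5;
    }

    public static void main(String[] args) {
        long sentAt = System.currentTimeMillis();
        long expiresAt = expiration(sentAt, 36_000);
        System.out.println("expires in ms: " + (expiresAt - sentAt));
        System.out.println("expired at send time: " + isExpired(expiresAt, sentAt));
        System.out.println("priority 9 expedited: " + isExpedited(9));
    }
}
```

With the time-to-live of 36000 ms configured in application.yml, a message sent at 1507703581645 gets an expiration of 1507703617645.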

package com.memorynotfound.integration;

import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import javax.jms.DeliveryMode;
import javax.jms.Message;
import java.util.UUID;

import static com.memorynotfound.integration.ActiveMQConfig.ORDER_QUEUE;

@Service
public class OrderSender {

    private static Logger log = LoggerFactory.getLogger(OrderSender.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendQueue(Order order) {
        log.info("sending with convertAndSend() to " + ORDER_QUEUE + " <" + order + ">");
        jmsTemplate.convertAndSend(ORDER_QUEUE, order, m -> {

            log.info("setting standard JMS headers before sending");
            // These three headers are set by the client and reach the consumer as-is.
            m.setJMSCorrelationID(UUID.randomUUID().toString());
            m.setJMSReplyTo(new ActiveMQQueue(ORDER_QUEUE));
            m.setJMSType("type");

            // The JMS provider assigns the headers below when the message is sent,
            // so the values set here are overwritten (compare the example output).
            m.setJMSExpiration(1000);
            m.setJMSMessageID("message-id");
            m.setJMSDestination(new ActiveMQQueue(ORDER_QUEUE));
            m.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
            m.setJMSPriority(Message.DEFAULT_PRIORITY);
            m.setJMSTimestamp(System.currentTimeMillis()); // epoch milliseconds, not nanoTime()

            log.info("setting custom JMS headers before sending");
            m.setStringProperty("jms-custom-header", "this is a custom jms property");
            m.setBooleanProperty("jms-custom-property", true);
            m.setDoubleProperty("jms-custom-property-price", 0.0);

            return m;
        });

    }

}

Accessing JMS Header Information

The @JmsListener annotation marks a method as the target of a JMS message listener on the specified destination. We can access the JMS message headers in any of the following ways:

  • @Header-annotated method arguments to extract specific header values, including standard JMS headers defined by JmsHeaders.
  • An @Headers-annotated method argument, which must be assignable to Map, for access to all headers.
  • A MessageHeaders argument for access to all headers.
  • A MessageHeaderAccessor or JmsMessageHeaderAccessor argument for convenient access to the headers through typed accessor methods.

package com.memorynotfound.integration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.support.JmsHeaders;
import org.springframework.jms.support.JmsMessageHeaderAccessor;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;

import static com.memorynotfound.integration.ActiveMQConfig.ORDER_QUEUE;

@Component
public class OrderConsumer {

    private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @JmsListener(destination = ORDER_QUEUE)
    public void receiveMessage(@Payload Order order,
                               @Header(JmsHeaders.CORRELATION_ID) String correlationId,
                               @Header(name = "jms-header-not-exists", defaultValue = "default") String nonExistingHeader,
                               @Headers Map<String, Object> headers,
                               MessageHeaders messageHeaders,
                               JmsMessageHeaderAccessor jmsMessageHeaderAccessor) {

        log.info("received <" + order + ">");

        log.info("\n# Spring JMS accessing single header property");
        log.info("- jms_correlationId=" + correlationId);
        log.info("- jms-header-not-exists=" + nonExistingHeader);

        log.info("\n# Spring JMS retrieving all header properties using Map<String, Object>");
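        // Note: this reads the boolean "jms-custom-property" header; the label is kept to match the example output.
        log.info("- jms-custom-header=" + String.valueOf(headers.get("jms-custom-property")));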

        log.info("\n# Spring JMS retrieving all header properties MessageHeaders");
        log.info("- jms-custom-property-price=" + messageHeaders.get("jms-custom-property-price", Double.class));

        log.info("\n# Spring JMS retrieving all header properties JmsMessageHeaderAccessor");
        log.info("- jms_destination=" + jmsMessageHeaderAccessor.getDestination());
        log.info("- jms_priority=" + jmsMessageHeaderAccessor.getPriority());
        log.info("- jms_timestamp=" + jmsMessageHeaderAccessor.getTimestamp());
        log.info("- jms_type=" + jmsMessageHeaderAccessor.getType());
        log.info("- jms_redelivered=" + jmsMessageHeaderAccessor.getRedelivered());
        log.info("- jms_replyTo=" + jmsMessageHeaderAccessor.getReplyTo());
        log.info("- jms_correlationId=" + jmsMessageHeaderAccessor.getCorrelationId());
        log.info("- jms_contentType=" + jmsMessageHeaderAccessor.getContentType());
        log.info("- jms_expiration=" + jmsMessageHeaderAccessor.getExpiration());
        log.info("- jms_messageId=" + jmsMessageHeaderAccessor.getMessageId());
        log.info("- jms_deliveryMode=" + jmsMessageHeaderAccessor.getDeliveryMode() + "\n");

    }

}
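The lookup behaviour the listener relies on can be modelled with plain Java collections. The sketch below is only an illustration and does not use Spring classes: headerOrDefault imitates the fallback of @Header(name = ..., defaultValue = ...), and typedHeader imitates a typed lookup in the style of MessageHeaders.get(key, type), which returns null for a missing header and rejects a value of the wrong type. The class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java model of header lookups; it does not use Spring classes.
final class HeaderLookupSketch {

    private HeaderLookupSketch() {
    }

    // Imitates @Header(name = ..., defaultValue = ...): fall back when the header is absent.
    static String headerOrDefault(Map<String, Object> headers, String name, String defaultValue) {
        Object value = headers.get(name);
        return value != null ? value.toString() : defaultValue;
    }

    // Imitates a typed lookup: null when absent, an exception when the type does not match.
    static <T> T typedHeader(Map<String, Object> headers, String name, Class<T> type) {
        Object value = headers.get(name);
        if (value == null) {
            return null;
        }
        if (!type.isInstance(value)) {
            throw new IllegalArgumentException("Header '" + name + "' is a "
                    + value.getClass().getName() + ", not a " + type.getName());
        }
        return type.cast(value);
    }

    public static void main(String[] args) {
        // The same custom properties that OrderSender sets in this tutorial.
        Map<String, Object> headers = new HashMap<>();
        headers.put("jms-custom-header", "this is a custom jms property");
        headers.put("jms-custom-property", true);
        headers.put("jms-custom-property-price", 0.0);

        System.out.println(headerOrDefault(headers, "jms-header-not-exists", "default")); // prints "default"
        System.out.println(typedHeader(headers, "jms-custom-property-price", Double.class)); // prints "0.0"
    }
}
```

Requesting a header with the wrong type, for example typedHeader(headers, "jms-custom-property", String.class), fails fast with an IllegalArgumentException instead of silently returning a mismatched value.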

Bootstrap Spring Application

We bootstrap the application using Spring Boot. When the application is initialized, we send a message to the JMS queue, wait a few seconds for it to be consumed, and print the output to the console.

package com.memorynotfound.integration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class Run implements ApplicationRunner {

    private static Logger log = LoggerFactory.getLogger(Run.class);

    @Autowired
    private OrderSender orderSender;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        log.info("Setting and Reading Spring JMS Message Header Properties Example");

        orderSender.sendQueue(new Order("me", "you", new BigDecimal(12)));

        log.info("Waiting for all ActiveMQ JMS Messages to be consumed");
        TimeUnit.SECONDS.sleep(3);
        System.exit(0); // exit normally; a non-zero status would signal a failure
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Run.class, args);
    }
}
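The runner above waits a fixed three seconds before exiting. As an alternative, the sending side could wait until the listener reports that the expected number of messages has been processed. The following is a minimal sketch of that idea using java.util.concurrent.CountDownLatch; the ConsumptionLatch class and its methods are hypothetical and are not part of the tutorial's code, and the thread in main only stands in for the JMS listener thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: lets the sending side wait until a known number of messages was consumed.
final class ConsumptionLatch {

    private final CountDownLatch latch;

    ConsumptionLatch(int expectedMessages) {
        this.latch = new CountDownLatch(expectedMessages);
    }

    // Call this at the end of the listener method, once per processed message.
    void messageConsumed() {
        latch.countDown();
    }

    // Returns true if all expected messages were consumed before the timeout elapsed.
    boolean awaitAll(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ConsumptionLatch consumed = new ConsumptionLatch(1);

        // Stand-in for the JMS listener thread.
        Thread listener = new Thread(consumed::messageConsumed);
        listener.start();

        boolean done = consumed.awaitAll(3, TimeUnit.SECONDS);
        System.out.println("all messages consumed: " + done);
    }
}
```

In the tutorial's setup, such a latch would have to be shared between Run and OrderConsumer, for example as a Spring bean. The timeout keeps the application from hanging if a message is never delivered.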

Example Output

The previous application prints the following output to the console.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.7.RELEASE)

...
Connector vm://embedded started
Setting and Reading Spring JMS Message Header Properties Example 
sending with convertAndSend() to order-queue <Order{from='me', to='you', amount=12}>
setting standard JMS headers before sending
setting custom JMS headers before sending
Waiting for all ActiveMQ JMS Messages to be consumed
received <Order{from='me', to='you', amount=12}>

# Spring JMS accessing single header property
- jms_correlationId=0cf5a6a2-02fd-443d-92df-42d50efa06ad
- jms-header-not-exists=default

# Spring JMS retrieving all header properties using Map<String, Object>
- jms-custom-header=true

# Spring JMS retrieving all header properties MessageHeaders
- jms-custom-property-price=0.0

# Spring JMS retrieving all header properties JmsMessageHeaderAccessor
- jms_destination=queue://order-queue
- jms_priority=9
- jms_timestamp=1507703581645
- jms_type=type
- jms_redelivered=false
- jms_replyTo=queue://order-queue
- jms_correlationId=0cf5a6a2-02fd-443d-92df-42d50efa06ad
- jms_contentType=null
- jms_expiration=1507703617645
- jms_messageId=ID:darwin-13.local-57502-1507703581282-4:2:1:1:1
- jms_deliveryMode=1

Connector vm://embedded stopped
...
