Spring Boot Embedded ActiveMQ Configuration Example

摘要: ActiveMQ is the most popular and powerful open source messaging and integration pattern server. In this tutorial we demonstrate how to configure an Embedded ActiveMQ server with Spring Boot using either Java -or XML Configuration.

ActiveMQ is the most popular and powerful open source messaging and integration pattern server. In this tutorial we demonstrate how to configure an Embedded ActiveMQ server with Spring Boot using either Java -or XML Configuration.

Project Structure

Let’s start by looking at the project structure.

Maven Dependencies

We use Apache Maven to manage our project dependencies. Make sure the following dependencies reside on the class-path.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                             http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.memorynotfound.integration.jms.activemq</groupId>
    <artifactId>embedded-activemq</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <url>https://memorynotfound.com</url>
    <name>Spring Integration + ActiveMQ - ${project.artifactId}</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.7.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-broker</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

Order Object

In this example we are producing and consuming objects of type Order to and from a ActiveMQ queue. The Order object can have any any attributes. For simplicity we only have a few.

package com.memorynotfound.integration;

import java.io.Serializable;
import java.util.Date;

public class Order implements Serializable {

    private String content;
    private Date timestamp;

    public Order() {
    }

    public Order(String content, Date timestamp) {
        this.content = content;
        this.timestamp = timestamp;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    public Date getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Date timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Order{" +
                "content='" + content + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}

Spring Boot Embedded ActiveMQ Configuration

Spring Boot can automatically configure a ConnectionFactory when it detects that ActiveMQ is available on the class-path. If the broker is present, an embedded broker is started and configured automatically (as long as no broker URL is specified through configuration).

You can configure spring using the application.yml file or by using an application.properties file. We prefer the first.

application.yml

The application.yml file is located in the src/main/resources/ folder. This configuration file creates and configures an embedded ActiveMQ broker.

spring:

  # Embedded ActiveMQ Configuration Example
  activemq:
      broker-url: vm://embedded?broker.persistent=false,useShutdownHook=false
      in-memory: true
      non-blocking-redelivery: true
      packages:
        trust-all: false
        trusted: com.memorynotfound
      pool:
        block-if-full: true
        block-if-full-timeout: -1
        create-connection-on-startup: true
        enabled: false
        expiry-timeout: 0
        idle-timeout: 30000
        max-connections: 1
        maximum-active-session-per-connection: 500
        reconnect-on-exception: true
        time-between-expiration-check: -1
        use-anonymous-producers: true

  # Spring JMS Settings
  jms:
    listener:
      acknowledge-mode: auto
      auto-startup: true
      concurrency: 5
      max-concurrency: 10
    pub-sub-domain: false
    template:
      default-destination:
      delivery-mode: non_persistent
      priority: 100
      qos-enabled: true
      receive-timeout: 1000
      time-to-live: 36000

# Logging configuration print only messages for tutorial purposes
logging:
  pattern:
    console: "%msg%n"
  level:
      - ".=info"
      - "com.memorynotfound=debug"
      - "org.springframework=info"
application.properties

The application.properties file is located in the src/main/resources/ folder. This configuration file creates and configures an embedded ActiveMQ broker.

# Embedded ActiveMQ Configuration Example
spring.activemq.broker-url=vm://embedded?broker.persistent=false,useShutdownHook=false
spring.activemq.close-timeout=15000
spring.activemq.in-memory=true
spring.activemq.non-blocking-redelivery=false
spring.activemq.password=admin
spring.activemq.user=admin
spring.activemq.send-timeout=0
spring.activemq.packages.trust-all=false
spring.activemq.packages.trusted=com.memorynotfound
spring.activemq.pool.block-if-full=true
spring.activemq.pool.block-if-full-timeout=-1
spring.activemq.pool.create-connection-on-startup=true
spring.activemq.pool.enabled=false
spring.activemq.pool.expiry-timeout=0
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.max-connections=1
spring.activemq.pool.maximum-active-session-per-connection=500
spring.activemq.pool.reconnect-on-exception=true
spring.activemq.pool.time-between-expiration-check=-1
spring.activemq.pool.use-anonymous-producers=true

# Spring JMS Settings
spring.jms.jndi-name=
spring.jms.listener.acknowledge-mode=auto
spring.jms.listener.auto-startup=true
spring.jms.listener.concurrency=5
spring.jms.listener.max-concurrency=10
spring.jms.pub-sub-domain=false
spring.jms.template.default-destination=empty
spring.jms.template.delivery-mode=non_persistent
spring.jms.template.priority=100
spring.jms.template.qos-enabled=true
spring.jms.template.receive-timeout=1000
spring.jms.template.time-to-live=36000
        
# Logging configuration print only messages for tutorial purposes
logging.pattern.console=%msg%n
logging.level.=info
logging.level.com.memorynotfound=debug
logging.level.org.springframework=info

Spring ActiveMQ Configuration

The @EnableJms enables JMS listener annotated endpoints that are created under the cover by JmsListenerContainerFactory. The JmsListenerContainerFactory is responsible to create the listener container responsible for a particular endpoint. The @EnableJms annotation also enables detection of JmsListener annotations on any Spring-managed beans in the container. The MappingJackson2MessageConverter uses Jackson to convert messages to and from JSON. We create a DefaultJmsListenerContainerFactory and assign the previously created MessageConverter.

Java Configuration
package com.memorynotfound.integration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@EnableJms
@Configuration
public class ActiveMQConfig {

    public static final String ORDER_QUEUE = "order-queue";

    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setMessageConverter(messageConverter());
        return factory;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

}
XML Configuration

If you cant to configure an embedded ActiveMQ server using Spring XML Configuration, you can use the following.

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jms="http://www.springframework.org/schema/jms"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
	                       http://www.springframework.org/schema/beans/spring-beans.xsd
	                       http://www.springframework.org/schema/jms
	                       http://www.springframework.org/schema/jms/spring-jms.xsd
                           http://www.springframework.org/schema/context
                           http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- applciation component scan -->
    <context:component-scan base-package="com.memorynotfound" />

    <!-- enable the configuration of jms on annotations -->
    <jms:annotation-driven />
    
    <!-- configure listener container -->
    <jms:listener-container message-converter="messageConverter" />
    
    <!-- jackson message converter -->
    <bean id="messageConverter" class="org.springframework.jms.support.converter.MappingJackson2MessageConverter">
        <property name="targetType" value="TEXT"/>
        <property name="typeIdPropertyName" value="_type"/>
    </bean>

</beans>

Sending Messages to JMS Queue

Now we have configured the Embedded ActiveMQ message broker, we can start sending/producing messages to an ActiveMQ Queue. We use the JmsTemplate to send/publish JMS messages on the queue. We simply need to pass in a destination and message arguments and the JmsTemplate handles the rest.

package com.memorynotfound.integration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;

import static com.memorynotfound.integration.ActiveMQConfig.ORDER_QUEUE;

@Service
public class OrderSender {

    private static Logger log = LoggerFactory.getLogger(OrderSender.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    public void send(Order myMessage) {
        log.info("sending with convertAndSend() to queue <" + myMessage + ">");
        jmsTemplate.convertAndSend(ORDER_QUEUE, myMessage);
    }
}

Consuming Messages from JMS Queue

The @JmsListener annotation marks a method to be the target of a JMS message listener on the specified destination. Annotated JMS listener methods are allowed to have flexible signatures.

  • javax.jms.Session to get access to the JMS session.
  • javax.jms.Message or one of its subclasses to get access to the raw JMS message.
  • org.springframework.messaging.Message to use Spring’s messaging abstraction counterpart.
  • org.springframework.messaging.handler.annotation.Payload @Payload-annotated method arguments, including support for validation.
  • org.springframework.messaging.handler.annotation.Header @Header – annotated method arguments to extract specific header values, including standard JMS headers defined by org.springframework.jms.support.JmsHeaders.
  • org.springframework.messaging.handler.annotation.Headers @Headers – annotated method argument that must also be assignable to java.util.Map for obtaining access to all headers.
  • org.springframework.messaging.MessageHeaders arguments for obtaining access to all headers.
  • org.springframework.messaging.support.MessageHeaderAccessor or org.springframework.jms.support.JmsMessageHeaderAccessor for convenient access to all method arguments.

Annotated methods may have a non-void return type.When they do, the result of the method invocation is sent as a JMS reply to the destination defined by the JMSReplyTo header of the incoming message. If this header is not set, a default destination can be provided by adding org.springframework.messaging.handler.annotation.SendTo @SendTo to the method declared.

package com.memorynotfound.integration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import javax.jms.Session;

import static com.memorynotfound.integration.ActiveMQConfig.ORDER_QUEUE;

@Component
public class OrderConsumer {

    private static Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    @JmsListener(destination = ORDER_QUEUE)
    public void receiveMessage(@Payload Order order,
                               @Headers MessageHeaders headers,
                               Message message, Session session) {
        log.info("received <" + order + ">");

        log.info("- - - - - - - - - - - - - - - - - - - - - - - -");
        log.info("######          Message Details           #####");
        log.info("- - - - - - - - - - - - - - - - - - - - - - - -");
        log.info("headers: " + headers);
        log.info("message: " + message);
        log.info("session: " + session);
        log.info("- - - - - - - - - - - - - - - - - - - - - - - -");
    }

}

Bootstrap Spring Application

We bootstrap the application using Spring Boot. When the application is initialized, we simply send a couple of messages to a JMS queue and print the output to the console.

package com.memorynotfound.integration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.Date;
import java.util.concurrent.TimeUnit;

// Enable if you want to configure an embedded activeMQ server using Spring XML Configuration
// @Configuration
// @ImportResource(value = "classpath:spring-activemq-config.xml") 
@SpringBootApplication
public class Run implements ApplicationRunner {

    private static Logger log = LoggerFactory.getLogger(Run.class);

    @Autowired
    private OrderSender orderSender;

    @Override
    public void run(ApplicationArguments applicationArguments) throws Exception {
        log.info("Spring Boot Embedded ActiveMQ Configuration Example");

        for (int i = 0; i < 5; i++){
            Order myMessage = new Order(i + " - Sending JMS Message using Embedded activeMQ", new Date());
            orderSender.send(myMessage);
        }

        log.info("Waiting for all ActiveMQ JMS Messages to be consumed");
        TimeUnit.SECONDS.sleep(3);
        System.exit(-1);
    }

    public static void main(String[] args) throws Exception {
        SpringApplication.run(Run.class, args);
    }
}

Example Output

The previous application prints the following output to the console

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.7.RELEASE)

...
JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
Apache ActiveMQ 5.14.5 (embedded, ID:darwin-13.local-65407-1507540267788-0:1) is starting
Apache ActiveMQ 5.14.5 (embedded, ID:darwin-13.local-65407-1507540267788-0:1) started
For help or more information please see: http://activemq.apache.org
Temporary Store limit is 51200 mb (current store usage is 0 mb). 
Connector vm://embedded started
Spring Boot Embedded ActiveMQ Configuration Example
sending with convertAndSend() to queue <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
sending with convertAndSend() to queue <Order{content='1 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
sending with convertAndSend() to queue <Order{content='2 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
sending with convertAndSend() to queue <Order{content='3 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
sending with convertAndSend() to queue <Order{content='4 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
Waiting for all ActiveMQ JMS Messages to be consumed
received <Order{content='3 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
received <Order{content='2 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
received <Order{content='4 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
received <Order{content='0 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
received <Order{content='1 - Sending JMS Message using Embedded activeMQ', timestamp=Mon Oct 09 11:48:36 CEST 2017}>
Apache ActiveMQ 5.14.5 (embedded, ID:darwin-13.local-65407-1507540267788-0:1) is shutting down
Closing org.spring[email protected]25be7b63: startup date [Mon Oct 09 11:11:05 CEST 2017]; root of context hierarchy
Stopping beans in phase 2147483647
Connector vm://embedded stopped
Setup of JMS message listener invoker failed for destination 'order-queue' - trying to recover. Cause: peer (vm://embedded#1) stopped.
Unregistering JMX-exposed beans on shutdown
Apache ActiveMQ 5.14.5 (embedded, ID:darwin-13.local-65407-1507540267788-0:1) uptime 3.668 seconds
Apache ActiveMQ 5.14.5 (embedded, ID:darwin-13.local-65407-1507540267788-0:1) is shutdown

Download

上一篇: Spring Boot ActiveMQ Publish Subscribe Topic Configuration Example
下一篇: Spring Security + Spring LDAP Authentication Configuration Example
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号