java 连接ACTIVE MQ 进行队列读写操作的例子

摘要: MQ 用过好多年了,不过以前用 WEBSPHERE MQ 比较多,IBM 的,商业版。其实开源的 ACTIVE MQ 也算不错了,最近刚好用到,写了个简单的测试例子,在项目中用到的测试例子而已。1. 向队列中保存消息2. 从队列中获取消息

MQ 用过好多年了,不过以前用 WEBSPHERE MQ 比较多,IBM 的,商业版。其实开源的 ACTIVE MQ 也算不错了,最近刚好用到,写了个简单的测试例子,在项目中用到的测试例子而已。
1. 向队列中保存消息
2. 从队列中获取消息

向队列中保存消息

public class SaveMsg {

    static final String BROKER_URL = "tcp://192.168.1.111:61616";
    static ActiveMQConnectionFactory factory;
    protected static final Logger log = LogManager.getLogger(SaveMsg.class);
    

    static {
        factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
        factory.setExceptionListener(
                new ExceptionListener() { 
                    @Override
                    public void onException(JMSException ex) { 
                       log.warn("监听到ACTIVEMQ JSM 异常" + ex);
                    } 
                }
        );
    }

    final ExecutorService es = Executors.newSingleThreadExecutor();
    String _DESTINATION;
    

    public static void main(String[] args) throws Exception {
        SaveMsg mi = new SaveMsg();      
        for (int i=0; i<100; i++) {            
            mi.sendRoomChat(8888, 80001, 80001, "收到来自android的测试信息" + i + "");  
            log.debug("send message,index is :" + i);
        }
        mi.conn.close();
        
      
    }

    public SaveMsg() throws Exception {
        _initConn();
       
    }
  
    public static boolean stat = true;

    private boolean isRunning() {
        return stat;
    }
    Connection conn;

    private void _initConn() throws JMSException {
        conn = factory.createConnection();
        conn.start();
    }
   
    /**
     *
     * @param roomId 房间ID
     * @param srcId 消息发送ID
     * @param toId 消息接收者
     * @param data 聊天消息
     */
    public void sendRoomChat(int roomId, int srcId, int toId, String data) {
        Session session;
        try {
            session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue("live.test.rsp");//amq.projId.mode.trans
            // 创建消息制作者
            MessageProducer producer = session.createProducer(destination);
            {//非持久
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            }
            MapMessage msg = session.createMapMessage();
            {//数据封装
                msg.setInt("relay_cmd_id", 17);//
                msg.setInt("vcbid", roomId);//房间ID
                msg.setInt("srcid", srcId);//当前用户ID
                msg.setInt("toid", toId);//目标用户ID
                msg.setInt("msgtype", 0);//文字聊天消息
                msg.setInt("isprivate", 0);//是否私聊(1为私聊)
                msg.setString("content", data);
            }
            log.debug("android发送给pc的消息内容为:"+msg);
            producer.send(msg);
            session.close();
        } catch (JMSException ex) {
            log.warn("send android message to pc error:", ex);
        }
    }

    
}


从队列中获取消息
例子如下:

import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.DestinationSource;

/**
 *
 * @author Administrator
 */
public class ReadMsg {
    
       
    public static void main(String[] args) throws JMSException {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.1.111:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();
      
        //第一种情况
        System.out.println("***********************");
        while (true) {
            Session session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("live.test.rsp");
            MessageConsumer consumer = session.createConsumer(destination);
            MapMessage message = (MapMessage) consumer.receive();
            //session.commit();
            
            System.out .println("收到消息:" +message.getString("content"));
            session.close();            
        }
       
    }
    
}



这个进程不断的从队列中获取数据, 最简单的方式实现进程间的通信。说白了,MQ 也是ESB 的核心。

上一篇: Spring4 + Quartz Scheduler 执行定时任务例子
下一篇: 测试的一个websocket 代码,利用netty
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号