mosquitto发送接收离线消息(fusesource客户端java版)

摘要: MQTT协议就不多说了,百度一下很多,官网 mqtt.org上也有很多说明. 今天记录的是在物联网设备上连接mosquitto, 发布消息。 JAVA写的应用程序订阅设备发送过来的消息。就本身应用来说是很简单的,与通常用的MQ没多大差别。我关注的重点是 mosquitto 对离线消息的处理。通常网上的例子是没有这些的. 看了下官网文档。还有fusesource客户端mqtt-client API,发现是可以很简单实现的。记录下过程。

MQTT协议就不多说了,百度一下很多,官网 mqtt.org上也有很多说明. 今天记录的是在物联网设备上连接mosquitto, 发布消息。 JAVA写的应用程序订阅设备发送过来的消息。就本身应用来说是很简单的,与通常用的MQ没多大差别。我关注的重点是 mosquitto 对离线消息的处理。通常网上的例子是没有这些的. 看了下官网文档。还有fusesource客户端mqtt-client API,发现是可以很简单实现的。记录下过程。

1. 修改配置文件 mosquitto.conf

persistence true
persistence_file mosquitto.db
persistence_location d:/tmp/mosquitto
max_queued_messages 100000


2. 客户端publish消息时需要设置clientID以及clean session设置为false
public static void main(String[] args) throws Exception {
		MQTT mqtt = new MQTT();
        mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
        
        mqtt.setCleanSession(false);
        mqtt.setClientId("AH-JAVA-CLIENT-PUB-001");
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        
        CountDownLatch count = new CountDownLatch(88);

        for(int i=0; i<88; i++) {
        	String message = "Message_Test_" + i;        	
            connection.publish(MQTTConstants.TOPIC_001, message.getBytes(), QoS.AT_LEAST_ONCE, false);
            System.out.println("publish: " + message);
            count.countDown();
            //Thread.sleep(10000);
        }  
        count.await();

        connection.disconnect();
	}


3. 客户端订阅消息时需要设置clientID以及clean session设置为false
public static void main(String[] args) throws Exception {
		  MQTT mqtt = new MQTT();
		
		  mqtt.setClientId("AH-JAVA-CLIENT-100"); 
          mqtt.setCleanSession(false);
		 
		  mqtt.setHost(MQTTConstants.HOST, MQTTConstants.PORT);
	      BlockingConnection connection = mqtt.blockingConnection();
	      connection.connect();

	      Topic[] topics = {new Topic(MQTTConstants.TOPIC_001, QoS.AT_LEAST_ONCE), new Topic("$SYS/broker/clients/total", QoS.AT_LEAST_ONCE)};
	      connection.subscribe(topics);
	      int i = 0;
	      while(true){
	    	  Message message = connection.receive();
	    	  if(message!=null){
	    		  System.out.println("Received messages. topic: " + message.getTopic() + "---" + new String(message.getPayload()));
	    		  message.ack();
	    		  i++;
	    		  if (i>= 50) {
	    			  break;
	    		  }
	    	  }
		      
	      }
	}


用的JAVA写的测试,但真实场景设备那边是C语言的API, 应该类似处理就可以。java, pom包如下:

            org.fusesource.mqtt-client
            mqtt-client
            1.12
        

上一篇: 接到了一个Secure Key Box侵权的通知
下一篇: [转]Redis配置注意事项
 评论 ( What Do You Think )
名称
邮箱
网址
评论
验证
   
 

 


  • 微信公众号

  • 我的微信

站点声明:

1、一号门博客CMS,由Python, MySQL, Nginx, Wsgi 强力驱动

2、部分文章或者资源来源于互联网, 有时候很难判断是否侵权, 若有侵权, 请联系邮箱:summer@yihaomen.com, 同时欢迎大家注册用户,主动发布无版权争议的 文章/资源.

3、鄂ICP备14001754号-3, 鄂公网安备 42280202422812号