虽然我强烈推荐使用EMQ,但是Mosquitto也是非常有必要了解的。
Mosquitto的英文意思是 蚊子 。
由于Mosquitto的安装比较简单,本文不会具体描述,本文测试介绍Mosquitto的概念、客户端以及如何使用Java代码跑通Mosquitto的功能。
在文章的最后,也会说明Mosquitto存在的一些问题以及我不选择它的一些理由。
Mosquitto介绍
Eclipse Mosquitto是一个开源(EPL / EDL许可)消息代理,它实现了MQTT协议版本3.1和3.1.1。Mosquitto重量轻,适用于从低功耗单板计算机到完整服务器的所有设备。
MQTT协议提供了一种使用发布/订阅模型执行消息传递的轻量级方法。这使其适用于物联网消息传递,例如低功率传感器或移动设备,如电话,嵌入式计算机或微控制器。
Mosquitto项目还提供了用于实现MQTT客户端的C库,以及非常流行的mosquitto_pub和mosquitto_sub命令行MQTT客户端。
Mosquitto是Eclipse Foundation的一部分,是一个iot.eclipse.org项目。
官网: http://mosquitto.org/




正常来说,如果是Java项目我们可以引入如下依赖
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0
Mosquitto Java 客户端实现
Mosquitto 消息发送主要分为三个类:
PubMsg 客服端发布消息
PushCallback 消息回调
SubMsg 订阅消息
下面我们一起来看这个示例程序:
PubMsg
package com.example.mqtt.publish;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* 发送消息到MQTT
*/
public class PubMsg {
private static int qos = 2; //只有一次
private static String broker = "tcp://XXXXXXXXX:1883";
private static String userName = "admin";
private static String passWord = "admin";
private static MqttClient connect(String clientId, String userName,
String password) throws MqttException {
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
connOpts.setConnectionTimeout(10);// 设置超时时间
connOpts.setKeepAliveInterval(20); // 设置会话心跳时间
// String[] uris = {"tcp://10.100.124.206:1883","tcp://10.100.124.207:1883"};
// connOpts.setServerURIs(uris); //起到负载均衡和高可用的作用
// broker,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
/* // MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
MqttTopic topic = mqttClient.getTopic("test-topic");
// setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
options.setWill(topic, "close".getBytes(), 2, true);*/
mqttClient.setCallback(new PushCallback("test"));
mqttClient.connect(connOpts);
return mqttClient;
}
private static void pub(MqttClient sampleClient, String msg, String topic)
throws MqttPersistenceException, MqttException {
MqttMessage message = new MqttMessage("Hello Charles!".getBytes());
message.setQos(qos);
message.setRetained(false);
sampleClient.publish(topic, message);
}
private static void publish(String str, String clientId, String topic) throws MqttException {
MqttClient mqttClient = connect(clientId, userName, passWord);
if (mqttClient != null) {
pub(mqttClient, str, topic);
System.out.println("pub-->" + str);
}
if (mqttClient != null) {
mqttClient.disconnect();
}
}
public static void main(String[] args) throws MqttException {
publish("message content", "client-id-0", "test-topic");
}
}PushCallback
package com.example.mqtt.publish;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 发布消息的回调类
*
* 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。
* 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 在回调中,将它用来标识已经启动了该回调的哪个实例。
* 必须在回调类中实现三个方法:
*
* public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。
*
* public void connectionLost(Throwable cause)在断开连接时调用。
*
* public void deliveryComplete(MqttDeliveryToken token)) 接收到已经发布的 QoS 1 或 QoS 2
* 消息的传递令牌时调用。 由 MqttClient.connect 激活此回调。
*
*/
class PushCallback implements MqttCallback {
private String threadId;
public PushCallback(String threadId) {
this.threadId = threadId;
}
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
System.out.println("-------messageArrived-------"+threadId + " " + msg);
}
}SubMsg
package com.example.mqtt.subscribe;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
public class SubMsg {
private static int qos = 2;
private static String broker = "tcp://XXXXXXX:1883";
private static MqttClient connect(String clientId) throws MqttException{
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(false);
connOpts.setConnectionTimeout(10);
connOpts.setKeepAliveInterval(20);
connOpts.setUserName("admin");
connOpts.setPassword("admin".toCharArray());
MqttClient mqttClient = new MqttClient(broker, clientId, persistence);
mqttClient.connect(connOpts);
return mqttClient;
}
public static void sub(MqttClient mqttClient,String topic) throws MqttException{
int[] Qos = {qos};
String[] topics = {topic};
mqttClient.subscribe(topics, Qos);
}
private static void runsub(String clientId, String topic) throws MqttException{
MqttClient mqttClient = connect(clientId);
if(mqttClient != null){
sub(mqttClient,topic);
}
mqttClient.subscribe(topic,2, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
});
}
public static void main(String[] args) throws MqttException{
runsub("testSub", "test-topic");
}
}运行结果
启动SubMsg订阅消息,再启动PubMsg发送消息
1)PubMsg显示发送成功:

2)SubMsg也会收到信息:

为什么线上环境不考虑使用Mosquitto?
线上环境需要使用集群模式,虽然官方可以使用bridge功能,连接多个mqtt broker,但是还是存在一些问题。原因主要有如下几个:
Mosquitto半年多没维护了,版本也不是最新的
如果增加bridge节点,需要修改多个节点的配置,并且重启,维护成本高
集群模式如果主节点down机,所有从服务器会变成孤立的节点
down机节点重新启动以后,容易造成负载不均衡
我们有更好的方案EMQ