首先要明白,MQTT通信是客户端和客户端之间的相互订阅,MQTT中只存在一个服务器,作为消息的中转站,其余客户端之间通过订阅对应的Topic来扮演客户端和服务端的角色,服务器的搭建比较麻烦,有兴趣的可以了解下
public class Client {
private static final String HOST = "tcp://119.11.111.111:1111";//服务器地址,自己搭建
private String TOPIC;//订阅的主题
private String CLIENT_ID;
public Client(String clientId, String topic) {
this.CLIENT_ID = clientId;
this.TOPIC = topic;
}
private String username = "admin";
private String password = "123456";
public void start() {
try {
//MqttAsyncClient
MqttClient mqttClient = new MqttClient(HOST, CLIENT_ID, new MemoryPersistence());
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
mqttConnectOptions.setCleanSession(false);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
mqttConnectOptions.setKeepAliveInterval(20);
mqttClient.setCallback(new MyMqttCallback());
MqttTopic topic = mqttClient.getTopic(TOPIC);
//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
//遗嘱
// options.setWill(topic, "close".getBytes(), 2, true);
mqttClient.connect(mqttConnectOptions);
int[] Qos = {2};//消息等级最高2最低1
String[] topic1 = {TOPIC};
Log.e("xxx", "topic1[0]=" + topic1[0] + "Qos=" + Arrays.toString(Qos));
mqttClient.subscribe(topic1, Qos);
} catch (MqttException e) {
Log.e("xxx", "createClientError" + e.getMessage());
e.printStackTrace();
}
}
}
以上是客户端,需要注意的是不要重复订阅,订阅之前先判断Top是否为空,不为空且则订阅
下面是服务端
class Service {
private static final String HOST = "tcp://119.11.111.111:1111";
private static final String TOPIC = "192.111.1.1111";//发布到那个主题,所有订阅该主题的客户端都能收到
private String CLIENT_ID;
private MqttMessage message;
private MqttClient client;
private MqttTopic topic11;
private Service service = null;
Service(String clientId) throws MqttException {
this.CLIENT_ID = clientId;
client = new MqttClient(HOST, clientId, new MemoryPersistence());
connect();
}
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
String username = "admin";
options.setUserName(username);
String password = "123456";
options.setPassword(password.toCharArray());
// 设置超时时间
options.setConnectionTimeout(20);
// 设置会话心跳时间
options.setKeepAliveInterval(10);
try {
client.setCallback(new MyMqttCallback());
client.connect(options);
//创建MQTT相关的主题
topic11 = client.getTopic(TOPIC);
Log.e("xxx", "topic11=" + topic11);
} catch (Exception e) {
Log.e("xxx", e.getMessage());
e.printStackTrace();
}
}
private void publish(MqttTopic topic, MqttMessage mqttMessage) throws MqttException {
Log.e("xxx", "topic=" + topic);
MqttDeliveryToken token = topic.publish(mqttMessage);
token.waitForCompletion();
Log.e("xxx", "消息发布成功" + token.isComplete());
}
void startService() {
new Thread(() -> {
Service service;
try {
service = new Service(CLIENT_ID);
service.message = new MqttMessage();
service.message.setQos(2);
//设置是否在服务器中保存消息体
service.message.setRetained(true);
int cc = 0;
//noinspection InfiniteLoopStatement
while (true) {
cc++;
String s = "这是推送消息的内容" + cc;
Log.e("xxx", "准备发送消息" + s);
service.message.setPayload(s.getBytes());
Log.e("xxx", "service=" + service + "message=" + service.message);
service.publish(service.topic11, service.message);
Log.e("xxx", "Retained状态=" + service.message.isRetained());
Thread.sleep(3000);
}
} catch (MqttException | InterruptedException e) {
e.printStackTrace();
}
}).start();
}
下面是回调,注册监听就好了
public class MyMqttCallback implements MqttCallback {
@Override
public void connectionLost(Throwable throwable) {
Log.e("xxx", "connectionLost:" + throwable.getMessage());
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
Log.e("xxx", "消息主题=" + s + "=======接收消息Qos=" + mqttMessage.getQos() + "=======接受消息内容=" + new String(mqttMessage.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
Log.e("xxx", "deliveryComplete:" + iMqttDeliveryToken.isComplete());
}
}
具体演示的话建一个Activity 加量按钮,一个发,一个收就好了。其他的没啥了重复订阅会造成一开始就一定会受到消息。还有需要注意的是Qos级别根据需求设置我这里都是2。
网友评论