基于MQTT的消息推送

作者: Mark_Guan | 来源:发表于2016-11-15 17:18 被阅读2891次

背景


因为前段时间公司做了一个基于路由器的项目,在APP客户端可以动态的显示连接到路由器的设备列表以及每台设备所消耗的流量,客户端所采取的方式是采用HTTP轮询的方式来动态的获取上线设备以及其它信息,我们都知道HTTP轮询的方式实时性比较差,只有时间到了才会向服务器查看是否有新的数据。如果两次请求之间的时间间隔过大,则失去了即时推送的意义。但如果设置的时间间隔较短,又会耗费流量。而且因为是和路由器进行通信,所以有一个大前提那就是必须连接到路由器才能够获取到这些信息。这几天产品提出了一个方案,就是在外网的情况下也可以看到上线设备以及动态的获取路由器那边的相关信息,上网找了好久,感觉MQTT可以帮助我们实现这个方案,在这里记录下,以便于之后用到。

MQTT简介


MQTT是IBM开发的一个即时通讯协议,该协议支持所有的平台,几乎可以把所有联网的物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。

MQTT的特点:
  1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP.
    对负载内容屏蔽的消息传输。
  2. 使用TCP/IP提供网络连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
  3. 三种消息传输方式QoS:
    • 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • 1代表“至少一次”,确保消息到达,但消息重复可能会发生。
  • 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1

简单说明下,如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了(客户端登录的时候要指明支持的QoS级别,同时发送消息的时候也要指明这条消息支持的QoS级别),如果需要客户端保证能接收消息,需要指定QoS为1,如果同时需要假如客户端不在线也要能接收到消息,那么客户端登录的时候要指定session的有效性,接收离线消息需要指定服务端要保留客户端的session状态。

MQTT是基于订阅者模型架构的,客户端如果互相通信,必须是在同一订阅主题下,即都订阅了同一个topic,客户端之间是没办法直接通讯的。订阅模型显而易见的好处是群发消息的话只需要发布到topic,所有订阅了这个topic的客户端就可以接收到消息了。

发送消息必须发送到某个topic,重点说明的是不管客户端是否订阅了该topic,它都可以向该topic发送消息,还有如果客户端订阅了该主题,那么自己发送的消息也会被接收到。

基于MQTT-Client的iOS客户端的实现


1. 导入框架
  platform :ios, '9.0'
  target 'MQTTTest' do
  pod 'MQTTClient'
  end

MQTTClient:

2. 使用框架

我们做个小小的Demo来测试下,在StoryBoard中我们拖入两个TextView,一个用来显示从路由那边收到的信息,一个用来向路由器那边发送消息,如下图:

111.png
#define kMQTTServerHost @"homecc.cc"
#define kReceiveTopic @"kReceiveTopic"
#define kPostTopic @"kPostTopic"
#import "ViewController.h"

#import <MQTTClient/MQTTClient.h>

@interface ViewController ()<MQTTSessionDelegate>
    
@property (nonatomic, strong) MQTTSession *mySession;
    
@property (weak, nonatomic) IBOutlet UITextField *lblReceiveMsg;//收到的消息

@property (weak, nonatomic) IBOutlet UITextField *lblPostMsg;//发送的消息

@property (weak, nonatomic) IBOutlet UIButton *btnPost;
    
- (IBAction)btnPostClick:(id)sender;
    
@end

  - (void)viewDidLoad {
[super viewDidLoad];

MQTTCFSocketTransport *transport = [[MQTTCFSocketTransport alloc] init];//初始化对象

transport.host = kMQTTServerHost;//设置MQTT服务器的地址

transport.port = 1883;//设置MQTT服务器的端口(默认是1883,当然,你也可以和你的后台好基友协商~)

self.mySession = [[MQTTSession alloc] init];//初始化MQTTSession对象

self.mySession.transport = transport;//给mySession对象设置基本信息

self.mySession.userName=@"root";//用户名
self.mySession.password=@"admin";//密码

self.mySession.delegate = self;//设置mySession的代理为APPDelegate

[self.mySession connectAndWaitTimeout:30];//设定超时时长,如果超时则认为是连接失败,如果设为0则是一直连接。


[self.mySession subscribeToTopic:kReceiveTopic atLevel:2 subscribeHandler:^(NSError *error, NSArray*gQoss) {//Topic则表示要订阅的主题,Level(qosLevel)表示消息等级。
    if (error) {
        NSLog(@"Subscription failed %@", error.localizedDescription);
    } else {
        NSLog(@"Subscription sucessfull! Granted Qos: %@", gQoss);
    }
}];  
}
// 这个是代理回调方法,接收到的数据可以在这里进行处理。
- (void)newMessage:(MQTTSession *)session data:(NSData*)data onTopic:(NSString*)topic qos:(MQTTQosLevel)qos retained:(BOOL)retained mid:(unsigned int)mid
    {
        NSString * receivtData= [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding];
        self.lblReceiveMsg.text=[NSString stringWithFormat:@"内容:%@,topic:%@",receivtData,topic];
    }
    //发送的消息
- (IBAction)btnPostClick:(id)sender {
    
    NSString* postMsg = self.lblPostMsg.text;
    //publishData:要发送的消息体,topic:要往哪个topic发送消息  qos:消息的级别  retain:
    [self.mySession publishData:[postMsg dataUsingEncoding:NSUTF8StringEncoding]  onTopic:kPostTopic retain:YES qos:2];
    NSLog(@"推送内容:%@",postMsg);
}

    
    //注册观察者,记得在离开页面时移除观察者
- (void)viewWillAppear:(BOOL)animated {
    [super viewWillAppear:animated];
    
    [self.mySession addObserver:self
                     forKeyPath:@"status"
                        options:NSKeyValueObservingOptionInitial | NSKeyValueObservingOptionNew
                        context:nil];
}
    
    
- (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context {
    
    switch (self.mySession.status) {
        
        case MQTTSessionStatusClosed: //连接已经关闭
        break;
        case MQTTSessionStatusDisconnecting://将要断开连接
        break;
        case MQTTSessionStatusConnected: //已经连接
        break;
        case MQTTSessionStatusConnecting: //正在连接中
        break;
        case MQTTSessionStatusError: //异常
        break;
        case MQTTSessionStatusCreated:
        default:
        break;
    }
}
    
-(void)dealloc
{
    [[NSNotificationCenter defaultCenter] removeObserver:self];
}
@end

谢谢大家,欢迎讨论。

参考资料:http://targe.me/2016/01/02/mqtt-three/

相关文章

网友评论

    本文标题:基于MQTT的消息推送

    本文链接:https://www.haomeiwen.com/subject/egflpttx.html