美文网首页
Android Mqtt 订阅接收消息

Android Mqtt 订阅接收消息

作者: satisfying | 来源:发表于2019-01-10 10:40 被阅读0次

添加引用

        compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
        compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

功能封装,我这里订阅的topic 和clientId一样,有需要的可以自行修改

  class PushClient : MqttCallback {

    private val TAG = "PushClient"


    //mqtt连接client
    var mClient: MqttAndroidClient? = null
    //mqtt连接参数设置
    var options: MqttConnectOptions? = null;
    /*连接成功之后设置连接断开的缓冲配置*/
    var disconnectedBufferOptions: DisconnectedBufferOptions? = null
    var context: Context? = null
    var countDownTimer: CountDownTimer? = null//倒计时重连
    var exit = false//是否退出

    constructor(host: String, port: Int, context: Context, clientId: String) {

        this.context = context;
        exit = false
        var uri = "tcp://$host:$port"
//        var uri = "ssl://$host:$port"
        mClient = MqttAndroidClient(context, uri, clientId)

        mClient?.setCallback(this)

        //mqtt连接参数设置
        options = MqttConnectOptions();
        //设置自动重连
        options?.isAutomaticReconnect = false;
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options?.isCleanSession = false;
        // 设置超时时间 单位为秒
        options?.connectionTimeout = 5;
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
        options?.keepAliveInterval = 20;


        /*连接成功之后设置连接断开的缓冲配置*/
        disconnectedBufferOptions = DisconnectedBufferOptions();
        //开启
        disconnectedBufferOptions?.isBufferEnabled = true;
        //离线后最多缓存100调
        disconnectedBufferOptions?.bufferSize = 100;
        //不一直持续留存
        disconnectedBufferOptions?.isPersistBuffer = false;
        //删除旧消息
        disconnectedBufferOptions?.isDeleteOldestMessages = false;
    }

    companion object {
        fun create(host: String, port: Int, context: Context, clientId: String): PushClient {
            return PushClient(host = host, port = port, context = context, clientId = clientId)
        }
    }

    /**
     * 链接
     */
    fun connect() {

        Log.e(TAG, "链接${mClient?.serverURI}")
        mClient?.connect(options, null, object : IMqttActionListener {
            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                Log.e(TAG, "链接失败")
                resetConnect()
            }

            override fun onSuccess(asyncActionToken: IMqttToken?) {

                Log.e(TAG, "链接成功")
                if (countDownTimer != null) {
                    countDownTimer?.cancel()
                    countDownTimer = null
                }
                subscribeToTopic()
            }
        })
    }
    

    /**
     * 3s后重新链接
     */
    fun resetConnect() {
        countDownTimer = object : CountDownTimer(3000, 1000) {
            override fun onTick(millisUntilFinished: Long) {
                Log.e(TAG, "等待重新链接  $millisUntilFinished")
            }

            override fun onFinish() {
                Log.e(TAG, "重新链接")
                connect()
            }
        }.start()
    }

    override fun messageArrived(topic: String?, message: MqttMessage?) {
        Log.e(TAG, "messageArrived")
    }

    /**
     * @desc 连接断开回调
     * 可在这里做一些重连等操作
     */
    override fun connectionLost(cause: Throwable?) {
        Log.e(TAG, "connectionLost")
        if (!exit) {
            resetConnect()
        }
    }

    override fun deliveryComplete(token: IMqttDeliveryToken?) {
        Log.e(TAG, "deliveryComplete")
    }


    /**
     * 断开链接
     */
    fun disConnect() {
        exit = true
        if (countDownTimer != null) {
            countDownTimer?.cancel()
            countDownTimer = null
        }
        Log.e(TAG, "断开链接")
        mClient?.disconnect(null, object : IMqttActionListener {
            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                Log.e(TAG, "断开链接失败")

            }

            override fun onSuccess(asyncActionToken: IMqttToken?) {
                Log.e(TAG, "断开链接成功")
            }

        })
    }


    /**
     * 订阅主题
     */
    var set = false

    fun subscribeToTopic() {
        Log.e(TAG, "消息订阅${mClient?.clientId}")

        mClient?.setBufferOpts(disconnectedBufferOptions)

        //主题、QOS、context,订阅监听,消息监听
        var token = mClient?.subscribe(mClient?.clientId, 2, null, object : IMqttActionListener {
            override fun onSuccess(asyncActionToken: IMqttToken?) {
                Log.e(TAG, "订阅成功")
            }

            override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                Log.e(TAG, "订阅失败")
            }
        }) { topic, message ->

            Log.e(TAG, "$topic  消息到达 $message")
            showMessage(message = message.toString())
        }
        Log.e(TAG, "token ${token?.isComplete}")
    }

    /**
     * 是否链接
     */
    fun isConnected(): Boolean {
        return if (mClient == null) {
            false
        } else {
            mClient!!.isConnected
        }
    }

    private fun showMessage(message: String) {
//项目之前有极光推送,为了方便就直接极光本地推送
        val jsonObject = JSONObject(message)
        val ln = JPushLocalNotification()
        ln.extras = message
        ln.builderId = 0
        ln.content = jsonObject.getString("content")
//        ln.extras = " {\"content\":\"您收到一个案件处理任务,请及时处理\",\"messageId\":215,\"realId\":285,\"summary\":\"\",\"title\":\"处理案件提醒\",\"type\":200,\"url\":\"\"}";
        ln.title = jsonObject.getString("title")
        ln.notificationId = 11111111
        ln.broadcastTime = System.currentTimeMillis() + 1000 * 1
        JPushInterface.addLocalNotification(context, ln)
    }

}

相关文章

  • Android Mqtt 订阅接收消息

    添加引用 功能封装,我这里订阅的topic 和clientId一样,有需要的可以自行修改

  • 微信小程序+MQTT+esp8266温湿度

    删帖测试 第一、原理讲解 esp8266 通过mqtt发布消息,微信小程序通过mqtt 订阅消息,小程序订阅后,就...

  • iOS MQTTClient

    什么是MQTT MQTT(消息队列遥测传输),基于发布/订阅的消息协议。MQTT工作在TCP/IP协议族上,是为硬...

  • 广播概述

    Android应用可以从Android系统和其他Android应用发送或接收广播消息,类似于发布 - 订阅设计模式...

  • 物流网首选协议,关于 MQTT 你需要了解这些

    MQTT 协议简介 概览 MQTT[https://mqtt.org/] 是一种基于发布/订阅模式的轻量级消息传输...

  • Android广播机制详解

    Android应用程序可以从Android系统和其他Android应用程序发送或接收广播消息,类似于发布-订阅设计...

  • MQTT协议总结

    MQTT Protocol MQTT协议特性 一句话总结:MQTT是一个简单,轻量的消息发布/订阅协议。 MQTT...

  • MQTT在 iOS工程中的使用

    1、MQTT MQTT 是轻量的(Lightweight)、发布订阅模式(PubSub) 的物联网消息协议。 两种...

  • MQTT深入浅出系列(一)

    mqtt介绍与使用 mqtt协议是轻量级的消息订阅和发布(publish/subscribe)协议,建立在TCP/...

  • MQTT消息格式之SUBACK消息分析

    在上一节中(MQTT消息格式之SUBSCRIBE(消息订阅)消息分析),客户端发送了订阅的消息,这个时候,服务器端...

网友评论

      本文标题:Android Mqtt 订阅接收消息

      本文链接:https://www.haomeiwen.com/subject/amuorqtx.html