美文网首页程序员
long polling 实现B/S架构的实时消息推送

long polling 实现B/S架构的实时消息推送

作者: 天涯笑笑生 | 来源:发表于2017-10-20 11:04 被阅读0次

comet 4j 实现消息推送

整体方案:

PS:该方案的消息推送速度基本和socket相差无疑,至于和借助于rabbitmq、rocketmq等构建的long polling以及websocket,此处不讨论,该方案最大的优点在于可以忽略所有浏览器、jdk、Tomcat、spring等版本兼容问题
响应时间 本人的服务器接收到消息,到把该消息推送到客户端仅需100ms左右,如果轮询失败,响应速度在20ms以内(没有过于复杂的消息处理),至于连接数量的影响,只测了大概20个客户端同时连接的情况,耗时上面基本波动很小

客户端:

使用jQuery的ajax方法,在请求成功的基础上进行递归调用,实现客户端对服务器的不间断少量数据请求。
注:jQuery 最大程度避免浏览器不兼容问题
ajax 少量数据请求,提高效率

服务器端:

  • 创建一个响应long polling 线程的管理工具类,并使用Hashtable 存储这些线程,创建响应的方法
  • 在Controller 中,在处理long polling 请求的方法中,收到请求则添加到到线程列表,并让该线程睡眠,睡眠时间也就是轮询间隔时间(在不触发条件的情况下)
  • 为了测试,在服务器端写了一个socket的服务器端,当socket服务器端收到消息后,就调用long polling 线程的管理工具类的响应方法,查找线程,并进行打断该线程,Controller中的线程被打断,则去消息队列读取最新的消息,然后返回并在列表中移除该响应线程
  • 在客户端成功收到响应后,立即进行下一次long polling 请求,以此构成整个系统的实时响应

注:本人使用Maven搭建springMVC的应用

Hashtable 自身线程安全,省去很多麻烦,当然也可根据实际情况选用其他的集合类,该系统的key值是来自客户端的标示,只要提供一个唯一的long polling 请求标示就行,也许有同时出发多个请求线程的需求,就需要使用其他集合类,以及构建相应的逻辑。

Controller springMVC中默认是单例多线程的,本系统也是基于此基础,如果更改,同样需要重新构建整个框架

线程打断 使用了线程的interrupte方法,该方法具有一定缺陷,执行后打断sleep,同时会触发InterruptedException,追求完美的可以选择其他线程管理方案。

线程睡眠时间 如果没有新消息触发该线程,则会一直睡到设置的时间结束,返回一个值,该时间也是轮询连接的最大时间,一般浏览器会对持续时间连接有限制,所以此处建议30s~60s

消息队列 可以自己建一个,如果整个系统流畅,处理速度足够快,或者服务器端接收消息间隔不是很小,可以随时接收随时处理,加上消息队列主要起个缓冲的作用,当然使用者要构建完善的管理逻辑,线程安全是重中之重。

源码

以下是一部分源码,只是大概的逻辑流程

Javaweb 服务器端

Controller

 @RequestMapping(value="/msg", method = RequestMethod.POST)
    @ResponseBody
    public String msg(String param){

       System.out.println("long polling: tag="+param);

        //参数作为当前线程的标志
        if (!sharedPollingThread.addPollingThreadToList(param,Thread.currentThread())) {
            return "falled";
        }

        try {
            Thread.sleep(1000*30);

        } catch (InterruptedException e) {
//            e.printStackTrace();
            return "daduan";
        }finally {
            sharedPollingThread.removePollingThread(param);
        }

//        String msg = null;
//        if (msgListened.msg() != null){
//            return "new msg";
//        }

        return "server msg"+param;
    }

PollingUtil

package com.jony.socket;

import java.util.Enumeration;
import java.util.Hashtable;

/**
 * Manage the polling thread
 * Created by jony on 17/10/19.
 */
public class PollingUtil {

    //单例模式
    private PollingUtil(){

    }

    private static final PollingUtil shareadPollingUtil = new PollingUtil();
    public static PollingUtil getInstance(){
        return shareadPollingUtil;
    }

    //polling thread list
    //hashtable相对于hashmap线程安全,不需要再去为保证线程安全而做工作
    Hashtable<String, Thread> pollingThreadList = new Hashtable<String, Thread>();

    //add thread to list
    public boolean addPollingThreadToList(String tag, Thread pollingThread){

        if (findPollingThread(tag) == null) {
            pollingThreadList.put(tag, pollingThread);
            return true;
        }else {
            return false;
        }
//        pollingThreadList.put(tag, pollingThread);
//        return true;
    }

    //interrupte and remove
    //在收到消息是调用改方法
    public void interruptePollingThread(String tag){

//        System.out.println("find ...... ");

        Thread pollingThread = findPollingThread(tag);
        if (pollingThread != null){

//            System.out.println("interrupt ...... ");
            pollingThread.interrupt();

            //移除放在返回睡眠
//            removePollingThread(tag);
        }
    }

    //find thread in list
    private Thread findPollingThread(String tag){
        Enumeration<String> e = pollingThreadList.keys();

//        System.out.println("find count : "+pollingThreadList.size());

        while (e.hasMoreElements()){

            String key = e.nextElement();

//            System.out.println("find: "+key+" ,tag: "+tag);

            if (tag.equals(key)){

//                System.out.println("find success!");
                return pollingThreadList.get(key);
            }
        }

        return null;
    }

    //remove thread
    public boolean removePollingThread(String tag){

        Thread pollingThread = findPollingThread(tag);

        if (pollingThread != null){
            pollingThreadList.remove(tag);
            return true;
        }else {
            return false;
        }
    }


    //view list infomation
    public void viewListInfo(){

        System.out.println("List infomation: count: "+pollingThreadList.size());
    }

}

**SocketUtil **

package com.jony.socket;

import java.net.DatagramSocket;
import java.net.SocketException;

/**
 * Created by jony on 17/10/19.
 * 单类模式
 */
public class SocketUtil {

    private static final int INPORT = 5000;
    private static DatagramSocket serverSocket = null;
    private static UDPServer udpServer = null;

    static {
        //初始化数据
        try {
            serverSocket = new DatagramSocket(INPORT);
        } catch (SocketException e) {
            e.printStackTrace();
        }

        if (serverSocket != null){
            udpServer = new UDPServer(serverSocket);

        }else {
            System.out.println("Server is null !");
        }
    }

    //私有化构造方法
    private SocketUtil(){
        startReceivingUDPMessages();
    }

    //创建对象并提供外部方法
    private static final SocketUtil socketUtil = new SocketUtil();
    public static SocketUtil getInstance(){
        return socketUtil;
    }

    //开启本地接收线程
    private  void  startReceivingUDPMessages(){

        if (serverSocket != null){
            new Thread(udpServer).start();
        }else {
            System.out.println("UDPServer open error !");
        }

    }
    //停止接收udp消息
    private void stopReceivingUDPMessages(){
        serverSocket.close();

    }

    //发送udp消息
    public void sendUDPMessages(String msg){

    }

}

UDPServer

package com.jony.socket;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;

/**
 * Created by jony on 17/10/19.
 */
public class UDPServer implements Runnable{

    //轮询线程管理类
    private PollingUtil sharedPollingUtil = PollingUtil.getInstance();

    private DatagramSocket serverSocket;
    private static final int dataLength = 1024;
    private byte[] recvBuf = new byte[dataLength];
    private DatagramPacket packet = new DatagramPacket(recvBuf, recvBuf.length);

    public UDPServer(DatagramSocket serverSocket) {
        this.serverSocket = serverSocket;
    }

    @Override
    public void run() {

        System.out.println("Server Start receiving !");

        while (true){
            try {
                serverSocket.receive(packet);
                final String recvString = new String(packet.getData(), 0, packet.getLength());
                System.out.println("UDP receviving: "+recvString);

                if (recvString.equals("761399")){
                    sharedPollingUtil.viewListInfo();
                    continue;
                }

                //通知
                sharedPollingUtil.interruptePollingThread(recvString);

//                new MsgListened(){
//                    @Override
//                    public String msg() {
//                        return recvString;
//                    }
//                };

            } catch (IOException e) {
                e.printStackTrace();

                System.out.println("UDP received thread error !");
                serverSocket.close();
                return;

            }

        }
    }
}

客户端

jsp页面

<%--
  Created by IntelliJ IDEA.
  User: jony
  Date: 17/10/17
  Time: 下午3:17
  To change this template use File | Settings | File Templates.
--%>
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<html>
<head>
    <title>长轮询测试</title>

    <script src="<%=request.getContextPath()%>/js/jquery-3.2.1.js" type="text/javascript" language="JavaScript"></script>
    <script src="<%=request.getContextPath()%>/js/msgtest.js" type="text/javascript" language="JavaScript"></script>
</head>
<body>

<p>
    长轮询测试页面
</p>

<p>
    <textarea rows="1" cols="10" id="tag"></textarea>
    <button type="button" id="start">开始轮询</button>
</p>
<p id="dataShow">轮询过程:</p>
</body>
</html>

js

/**
 * Created by jony on 17/10/17.
 */

//记录轮询次数
var count = 0;

//记录轮询开始时间

//轮询标示
var tag;


$(Document).ready(function(){

    $("#start").click(function(){
        tag = $("#tag").val();
        getMsg();
    });

});


function getMsg() {

    count++;
    var currentTime = (new Date()).getTime();

    $.ajax({
        url:"/polling/msg",
        type:"post",
        global:true, //默认值,会触发全局的ajax
        async:true,
        data:{"param":tag},
        success:function(data)
        {
            // if(data != null && data!="")
            //     alertShow(data.msg);

            var intervalTime = (new Date()).getTime() - currentTime;

            $("#dataShow").append("<p>第"+count+"次, data: "+data+",interval time:"+intervalTime+"</p>");
            if (data == "falled"){
                alert("轮询失败,该标识已被使用!");
                return;

            }
            getMsg();
        }
    });
}

$.ajaxError(function () {
    alert("ajax error !");
});

setInterval()

至于pom、springMVC配置等其余代码就不贴了
该文章只是个大概方案流程,还有很多不完善的地方,仅供参考,敬请指正。

相关文章

网友评论

    本文标题:long polling 实现B/S架构的实时消息推送

    本文链接:https://www.haomeiwen.com/subject/tpvmuxtx.html