美文网首页前沿Android技术Android Socket开发IM
Android开发之使用Netty进行Socket编程(三)

Android开发之使用Netty进行Socket编程(三)

作者: 天才木木 | 来源:发表于2016-05-15 22:39 被阅读6411次

Android开发之使用Netty进行Socket编程(二) 主要介绍了Netty框架内的主要使用的类以及 在客户端基本的建立连接并接收消息 的简单示例。众所周知,在Android应用开发中,一般是发起网络请求,拿到数据后异步更新UI,那么本文就介绍在开发过程中,如何封装Netty,方便开发者请求服务端的数据并异步地更新UI。

1 基本功能

  1. 与服务器建立TCP连接,TCP是面向连接的协议,只能用于点对点的通讯,所以对服务器的定位是IP地址+端口号。
  2. 发送消息给服务器,相当于发起请求,这个消息里面应该包含 与后台协议好的校验部分 以及 提供给服务器索引数据的参数部分
  3. 理想状态下,TCP连接一旦建立,在通信双方中的任何一方主动关闭连接前,TCP 连接都将被一直保持下去,所以需要定时不间断地给服务器发送一个后台定义的消息段,即心跳包,让对方知道自己“在线”。
  4. 接收服务器返回的数据,并且拿到这些数据异步地去更新程序页面,将获取到的JSON文本解析成POJO。

2 接口的定义

利用Java的回调机制,在子线程建立连接并发出请求,在接收服务器返回的数据后告诉主线程更新UI。还包括一些连接异常的处理都通过回调接口实现。

public interface INettyClient {
    void connect(String host, int port);//1. 建立连接
    void sendMessage(int mt, String msg, long delayed);//2. 发送消息
    void addDataReceiveListener(OnDataReceiveListener listener);//3. 为不同的请求添加监听器

    interface OnDataReceiveListener {
        void onDataReceive(int mt, String json);//接收到数据时触发
    }

    interface OnConnectStatusListener {
        void onDisconnected();//连接异常时触发
    }
}

3 对服务器返回数据的处理

对数据的处理主要是在ChannelHandler中完成(详见Android开发之使用Netty进行Socket编程(二) )。在这里我们主要继承了ChannelInboundHandlerAdapter,并提供了回调接口供主线程更新UI。

public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private final String TAG = "Netty";
private INettyClient.OnConnectStatusListener statusListener;
private List<INettyClient.OnDataReceiveListener> listeners = new ArrayList<>();

 @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//channelActive()方法将会在连接被建立并且准备进行通信时被调用。
        Log.d(TAG, "channel active");
        super.channelActive(ctx);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
//channelRead()方法是在数据被接收的时候调用。
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
//verify(String body)方法对服务器返回的数据进行校验,并取出数据部分。
//具体校验的方法需要与后台同事进行协议。
        body = verify(body);

        Log.d(TAG, "verify : " + body);
        if (null != body)
            parseJson(body);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用,
//即当Netty由于IO错误或者处理器在处理事件时抛出的异常时。
//在大部分情况下,捕获的异常应该被记录下来并且把关联的channel给关闭掉。
        ctx.close();
        Log.e(TAG, "Unexpected exception from downstream : "
                + cause.getMessage());
        if (statusListener != null)//连接异常时触发onDisconnected()
            statusListener.onDisconnected();
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        LogUtils.d(TAG, "channelReadComplete");
    }

//对数据进行解析,拿出区分不同请求的 flag字段,再根据不同的flag字段去触发相对应的监听器
    private void parseJson(String json) {
        try {
            JSONObject jObject = new JSONObject(json);
            int msgType = jObject.getInt(Constant.FLAG_MT);
            Log.d(TAG, "parseJson message type: " + msgType + "  json: " + json);
            callListeners(msgType, json);
        } catch (Exception e) {
            LogUtils.e(TAG, "parseJson exception: " + e.getMessage());
            e.printStackTrace();
        }
    }

//遍历监听器List,触发拥有正确msgType 的OnDataReceiveListener,
//回调 void onDataReceive(int mt, String json);方法
     private void callListeners(final int msgType , final String json) {
        for (final INettyClient.OnDataReceiveListener listener : listeners)
            if (listener != null)
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {//主线程中进行
                        listener.onDataReceive(mt, json);
                    }
                });
    }

//绑定OnDataReceiveListener 
    public void addDataReceiveListener(INettyClient.OnDataReceiveListener listener) {
        if (!listeners.contains(listener))
            listeners.add(listener);
    }
//绑定OnConnectStatusListener
    public void setConnectStatusListener(INettyClient.OnConnectStatusListener listener) {   
         statusListener = listener;
    }
}```
以上,就是一个供Android客户端使用的``ChannelHandler``,可以通过实现具体的``OnDataReceiveListener ``来异步地获得服务器返回的 数据。

## 4 NettyClient的实现
以上仅仅是展示了如何处理服务器返回的数据。建立连接、发送消息以及心跳包的功能还没进行封装。在2.接口的定义 里面已经定义好了NettyClient应该具备哪些行为,现在进行具体的实现。
主要的实现思路是:
1. 构建Bootstrap,其中包括设置好ChannelHandler来处理将来接收到的数据(详见[Android开发之使用Netty进行Socket编程(二)](http://www.jianshu.com/p/db74e673e43c) )。由Boostrap建立连接。通过``   channel.writeAndFlush(constructMessage(sendMsg)).sync()``发送消息。这些工作都在子线程完成。
2. 在子线程 建立连接并向服务器发送请求,这里采用了`HanlderThread`+`Handler`的方案。通过`Looper`依次从`Handler`的队列中获取信息,逐个进行处理,保证安全,不会出现混乱。
3. 心跳包的发送通过``handleMessage(Message msg)``中的死循环进行不间断地发送。
4. `NettyClientHandler `的实现中我们已经知道,当Netty异常时会触发`statusListener.onDisconnected();`,NettyClient中,onDisconnected()方法会进行重连操作。
接收到服务器返回的消息时,会在主线程中触发``onDataReceiveListener .onDataReceive(mt, json);``。
5. 外部通过单例模式进行调用。

public class NettyClient implements INettyClient {
private final String TAG = NettyClient.class.getSimpleName();
private static NettyClient mInstance;
private Bootstrap bootstrap;
private Channel channel;
private String host;
private int port;
private HandlerThread workThread = null;
private Handler mWorkHandler = null;
private NettyClientHandler nettyClientHandler;

private final String ACTION_SEND_TYPE = "action_send_type";
private final String ACTION_SEND_MSG = "action_send_msg";
private final int MESSAGE_INIT = 0x1;
private final int MESSAGE_CONNECT = 0x2;
private final int MESSAGE_SEND = 0x3;
private Handler.Callback mWorkHandlerCallback = new Handler.Callback() {

    @Override
    public boolean handleMessage(Message msg) {
        switch (msg.what) {
            case MESSAGE_INIT: {
                NioEventLoopGroup  group = new NioEventLoopGroup();
                bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
                        pipeline.addLast(new LineBasedFrameDecoder(Integer.MAX_VALUE));
                        pipeline.addLast(nettyClientHandler);
                    }
                });
                break;
            }
            case MESSAGE_CONNECT: {
                try {
                    if (TextUtils.isEmpty(host) || port == 0) {
                        Exception exception = new Exception("Netty host | port is invalid");
                        throw exception;
                    }
                    channel = bootstrap.connect(new InetSocketAddress(host,
                            port)).sync().channel();
                } catch (Exception e) {
                    LogUtils.e(TAG, "connect failed  " + e.getMessage() + "  reconnect delay: " + Constant.DELAY_CONNECT);
                    ToastUtils.showOnOtherThread("服务器连接失败");
                    e.printStackTrace();
                    sendReconnectMessage();
                }
                break;
            }
            case MESSAGE_SEND: {
                String sendMsg = msg.getData().getString(ACTION_SEND_MSG);
                int mt = msg.getData().getInt(ACTION_SEND_TYPE);
                try {
                    if (channel != null && channel.isOpen()) {
                        channel.writeAndFlush(constructMessage(sendMsg)).sync();
                        Log.d(TAG, "send succeed " + constructMessage(sendMsg));
                    } else {
                        throw new Exception("channel is null | closed");
                    }
                } catch (Exception e) {
                    LogUtils.e(TAG, "send failed " + e.getMessage());
                    sendReconnectMessage();
                    e.printStackTrace();
                } finally {
                    if (Constant.MT.HAND_SHAKE.getType() == mt)
                        sendMessage(mt, sendMsg, Constant.DELAY_HAND_SHAKE);
                }

                break;
            }
        }
        return true;
    }
};

private NettyClient() {
    init();
}

public synchronized static NettyClient getInstance() {
    if (mInstance == null)
        mInstance = new NettyClient();
    return mInstance;
}

private void init() {
    workThread = new HandlerThread(NettyClient.class.getName());
    workThread.start();
    mWorkHandler = new Handler(workThread.getLooper(), mWorkHandlerCallback);
    nettyClientHandler = new NettyClientHandler();
    nettyClientHandler.setConnectStatusListener(new OnConnectStatusListener() {
        @Override
        public void onDisconnected() {
            sendReconnectMessage();
        }
    });
    mWorkHandler.sendEmptyMessage(MESSAGE_INIT);
}

@Override
public void connect(String host, int port) {
    this.host = host;
    this.port = port;
    mWorkHandler.sendEmptyMessage(MESSAGE_CONNECT);
}

@Override
public void addDataReceiveListener(OnDataReceiveListener listener) {
    if (nettyClientHandler != null)
        nettyClientHandler.addDataReceiveListener(listener);
}

private void sendReconnectMessage() {
   mWorkHandler.sendEmptyMessageDelayed(MESSAGE_CONNECT, Constant.DELAY_CONNECT);
}

@Override
public void sendMessage(int mt, String msg, long delayed) {
    if (TextUtils.isEmpty(msg))
        return;
    Message message = new Message();
    Bundle bundle = new Bundle();
    message.what = MESSAGE_SEND;
    bundle.putString(ACTION_SEND_MSG, msg);
    bundle.putInt(ACTION_SEND_TYPE, mt);
    message.setData(bundle);
    mWorkHandler.sendMessageDelayed(message, delayed);
}

private String constructMessage(String json) {
    String message;
    //与后台协议好,如何设置校验部分,然后和json一起发给服务器
    return message;
}

}


## 5 NettyClient的使用
NettyClient采用了全局单例的模式。
1. 在Activity或者Fragment中为NettyClient绑定相应的数据接收监听器:
  NettyClient.getInstance().addDataReceiveListener(new INettyClient.OnDataReceiveListener() {
        @Override
        public void onDataReceive(int mt, String json) {

//这些逻辑运行在主线程
if (mt == Constant.MSG_TYPE) {
UserModel user=new Gson().fromJson(json, UserModel .class);
textView.setText(user.getName);
}
}

2. 在Activity或者Fragment中发起请求:
```NettyClient.getInstance().sendMessage(Constant.MSG_TYPE, "发向服务器的消息", 0);```
可以看到使用起来十分简单方便。

相关文章

网友评论

  • hyk888:大神,请问下,在NettyClientHandler类里面重写的userEventTriggered触发心跳机制的方法为什么没有执行
  • hyk888:在网上看了很多别人写的案例,觉得你的写得不错,能否发一份完整的代码研究研究,最近也在做socket编程,有很多不懂的地方,希望能够学习下
  • 1f49e0c77db2:大神,可以把项目代码发给我么?现在项目需要Socket通讯
  • 3af2f29c8562:你好,看了您的文章对我帮助很大,能发下源码吗 kingkadienm@163.com
    天才木木:@羁絆那淩亂的回憶 建议github搜索 netty android
  • aa0ed98d99d7:楼主,刚接触socket通信,能把github地址公布下么
    天才木木:@乘风_16a7 github搜索 netty android
  • 啊哦买嘎的::+1: 最近正在研究 netty ,大神的思路清晰简单易懂。赞!赞!赞!
    天才木木:很水,自己回头看博客都觉得比较混乱。没有误导你们已经是万幸
  • ghjjjhghh:demo啊
    天才木木:@2fb2f6cf4922 github上搜搜netty android,最近没空~
  • 3cf4933d1229:大神Github上有Demo吗?正在做socket这方面的想学习下
  • 爺珍爱那颗心:楼主,我最近也在弄netty,方便提供下github地址么
  • 0_oHuanyu:现在的安卓开发都这么懒了吗?百度能搜到的东西在这里问博主……
  • 繁华落寞的世界:我把netty jar包 导入进来感觉太大了,2M多,但其实我们用到的就只有几个类,关联的类就算加起来其实也不大。你有简化后的nettyjar包吗?如果有的话,发我一份,感谢至极!
  • 687c1694d039:大神,上传到了github上了么?
  • 繁华落寞的世界:Netty 底层封装了心跳包检测功能吗?
    41125b8d02d2:这个也是我考虑的到的事情,netty的jar包 包含了服务器的整个封装实现,作为客户端确实没必要弄这么大的,有没有兴趣一起用nio 搞个比较完善的Android nio客户端出来玩玩
    繁华落寞的世界:@天才木木 我把netty jar包 导入进来感觉太大了,2M多,但其实我们用到的就只有几个类,关联的类就算加起来其实也不大。你有简化后的nettyjar包吗?如果有的话,发我一份,感谢至极!
    天才木木:@繁华落寞的世界 自己定时发送一条简短的消息即可
  • 简单1104:大神,传到github了吗?求github地址啊! :grin:
  • 9665a07a4d9c:您好,大神,传到github了没有啊
  • 6937b18671c3:顶,什么时候传到github啊?
  • a466ebd9f82c:写得挺好的.能提供下完整的demo 就更好拉.希望版主能快些把代码放到github上.
  • 73801abd1826:写的是什么鬼,新手完全看不懂,而且里面很多几个类还没上传!我不是说什么写的不好,你吧代码上传直接看代码就好了
  • 9fe9d62085c4:表示有些疑惑:在Android组件提供了service作为后台长期运行;朋友的NettyClient代码没有放倒service里面运行,那么生命周期是多久?应用退出后会保持长连接么???只知道4大组件的生命周期和回收优先级,而NettyClient的呢?
    9fe9d62085c4:@天才木木 单例+多线程 与service+多线程的区别在于,系统资源紧张回收后,service只要在资源充足的情况下,再次开启。还有其他的么
    9fe9d62085c4:@天才木木 不懂
    天才木木:@子晓_believe 没有放在Service中,当客户端与服务端建立连接后,我们只需要在保持一个全局单例的NettyClient便可持续地接收到服务端的信息。生命周期就是Java中静态变量的生命周期:在进程创建的时候就被创建了,且其生命周期同该进程的生命周期一样长。
  • 9fe9d62085c4:朋友,一只比较纳闷,既然activity可以开线程,做耗时处理,还要service干啥。。。。。或者给些参考资料吧
    9fe9d62085c4:@天才木木 我查到的是,防止Activity销毁失去thread的控制;而service可以被所有的activity绑定调用;请问还有其它更多的消息么。。。。
    天才木木:@子晓_believe 把你的问题百度一下,基本上能够解除你的疑惑了
  • 宁静致远灬:写的很有耐心,谢谢大神!
    天才木木:@路漫漫其修远兮灬 没有没有,我也是小白一个,相互学习:relaxed:
  • 458732213d2d:你好,Demo传到github了吗
  • 最最最最醉人:写的挺好的,如果得把整个项目的代码提供下载学习就更好了,最近正在做socket,很多不懂的。
    刘启敏:@天才木木 能把项目代码给我学习下吗
    最最最最醉人:@天才木木 没事,你更新了@ 一下我吧,我最近也看了一下mina
    天才木木:@这种重中之重重中之重 有空会上传到github,如果你急需,记得提醒我一下 :smiley:

本文标题:Android开发之使用Netty进行Socket编程(三)

本文链接:https://www.haomeiwen.com/subject/xegvrttx.html