美文网首页
Netty官方样例(Echo ‐ the very basic

Netty官方样例(Echo ‐ the very basic

作者: 东南枝下 | 来源:发表于2022-02-08 14:55 被阅读0次
  • 代码来源于netty官方样例

netty响应式编程模型 Reactor pattern

图片.png

服务端代码

这个样例的关键在于服务端,服务端需要建立两个线程组bossGroupworkerGroup分别用于处理连接请求和真正的客户端业务,对应的模型如上图

服务端的关键代码分别是
1. ServerBootstrap bootstrap = new ServerBootstrap();创建服务器端的启动对象
2. bootstrap.group(bossGroup, workerGroup) ...链式的配置参数
3. ChannelFuture f = bootstrap.bind(PORT).sync(); 绑定端口,启动

服务端需要一个ChannelHandler来专注于业务逻辑的实现,只需要继承官方提供的标准Handler,重写相应的方法,实现具体的逻辑即可。

此处修改官方样例,在与客户端连接成功时打印一条消息,在收到消息时将消息打印出来,并发送一条确认消息给客户端

package example.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

/**
 * @author netty
 */
public final class EchoServer {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.
        final SslContext sslCtx;
        if (SSL) {
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }

        // Configure the server.
        // 创建两个线程组bossGroup和workerGroup,bossGroup只是处理连接请求,真正和客户端业务处理的是workerGroup
        // NioEventLoopGroup的子线程数默认是CPU核数的2倍
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        final EchoServerHandler serverHandler = new EchoServerHandler();
        try {
            // 关键代码 1:创建服务器端的启动对象
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 关键代码 2:配置参数,链式
            // 设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    // 使用NioServerSocketChannel作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 初始化服务器连接队列的大小,服务器端处理客户端的连接请求是顺序处理的,所以同一时间只能处理一个客户端连接
                    // 多个客户端同时来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc()));
                            }
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            // 对workerGroup的SocketChannel绑定处理器
                            p.addLast(serverHandler);
                        }
                    });

            // Start the server.
            // 关键代码 3:绑定端口,启动
            ChannelFuture f = bootstrap.bind(PORT).sync();

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

package example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * @author netty
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * 客户端连接服务器就会触发该方法
     *
     * @param ctx 上下文,内含通道channel,管道pipeline
     * @throws Exception 异常
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // super.channelActive(ctx);
        Channel channel = ctx.channel();
        System.out.println("客户端连接通道建立完成");
    }

    /**
     * 读取客户端发送的数据
     *
     * @param ctx 上下文
     * @param msg 客户端发送的数据
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ctx.write(msg);
        ByteBuf buf = (ByteBuf) msg;
        String msgStr = buf.toString(CharsetUtil.UTF_8);
        Channel channel = ctx.channel();
        System.out.println("收到客户端发送的消息---->" + msgStr);
        channel.writeAndFlush(Unpooled.copiedBuffer("------>服务端回复,已经收到消息:" + msgStr, CharsetUtil.UTF_8));
    }

    /**
     * 数据读取完毕处理方法
     *
     * @param ctx 上下文
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        System.out.println("消息读取完毕........V");
        ctx.flush();
    }

    /**
     * 引发异常时处理
     *
     * @param ctx   上下文
     * @param cause 错误
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.out.println("异常发生........X");
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

客户端

客户端写法和服务端差不多,不同的是启动的时候使用的是connect ->ChannelFuture channelFuture = b.connect(HOST, PORT).sync();

在连接后写一个Scanner扫描控制台输入,来给客户端发消息

调整客户端的Handler,在channelActive中给服务端发送一条连接成功的消息,在channelRead中将收到的消息打印出来,并注释原本代码,免得形成了客户端发送服务端回复的死循环

package example.echo;


import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.CharsetUtil;

import java.util.Scanner;


/**
 * @author netty
 */
public final class EchoClient {

    static final boolean SSL = System.getProperty("ssl") != null;
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {
        // Configure SSL.git
        final SslContext sslCtx;
        if (SSL) {
            sslCtx = SslContextBuilder.forClient()
                    .trustManager(InsecureTrustManagerFactory.INSTANCE).build();
        } else {
            sslCtx = null;
        }

        // Configure the client.
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            if (sslCtx != null) {
                                p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
                            }
                            //p.addLast(new LoggingHandler(LogLevel.INFO));
                            p.addLast(new EchoClientHandler());
                        }
                    });

            // Start the client.
            ChannelFuture channelFuture = b.connect(HOST, PORT).sync();

            // 客户端连接后会返回一个channel,可以用此与服务端通信
            Channel channel = channelFuture.channel();
            System.out.println("=================" + channel.localAddress() + "===============");
            // 扫描控制台输入
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
            }
            // Wait until the connection is closed.
            channelFuture.channel().closeFuture().sync();

        } finally {
            // Shut down the event loop to terminate all threads.
            group.shutdownGracefully();
        }
    }
}
package example.echo;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

//    private final ByteBuf firstMessage;

//    /**
//     * Creates a client-side handler.
//     */
//    public EchoClientHandler() {
//        firstMessage = Unpooled.buffer(EchoClient.SIZE);
//        for (int i = 0; i < firstMessage.capacity(); i++) {
//            firstMessage.writeByte((byte) i);
//        }
//    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 连接成功后给客户端发送一条消息
        ByteBuf firstMessage = Unpooled.copiedBuffer("客户端连接成功发送的第一条消息......OK", CharsetUtil.UTF_8);
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf bf = (ByteBuf) msg;
        System.out.println(bf.toString(CharsetUtil.UTF_8));
//        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

实现效果

  • 客户端
=================/127.0.0.1:54400===============
------>服务端回复,已经收到消息:客户端连接成功发送的第一条消息......OK
hello
------>服务端回复,已经收到消息:hello
你好
------>服务端回复,已经收到消息:你好
  • 服务端
客户端连接通道建立完成
收到客户端发送的消息---->客户端连接成功发送的第一条消息......OK
消息读取完毕........V
收到客户端发送的消息---->hello
消息读取完毕........V
收到客户端发送的消息---->你好
消息读取完毕........V

参考学习: https://www.bilibili.com/video/BV1fA41157Ht?share_source=copy_web
netty官方文档:https://netty.io/wiki/index.html
netty官方样例:https://netty.io/4.1/xref/io/netty/example/echo/package-summary.html

相关文章

网友评论

      本文标题:Netty官方样例(Echo ‐ the very basic

      本文链接:https://www.haomeiwen.com/subject/rommkrtx.html