我的第一个Netty应用

作者: 日更专用小马甲 | 来源:发表于2019-03-20 21:59 被阅读1次

这是《手写RPC框架,我学会了什么?》系列的第07篇

基于之前实现的序列化/反序列化、编码/解码,这一次把它们组合起来实现一个server。server中存有一个已注册服务的map,key是服务名称,value暂时只放一个占位字符串。client端发送服务名,如果该服务已经注册过,则返回"registered"(已注册),否则返回"unregistered"(未注册)。

在这个基础上,只需要再多加一点点功能就相当于把RPC框架重写了。

client端通过短连接的形式和服务端通信。也就是说,每次发送数据,都需要与服务端重新建立连接。缺点是通信的开销增大,优点是服务端不用一直维持连接从而占用系统资源。

需要实现1个ServerHandler、1个ClientHandler、1个Server以及1个Client。

ServerHandler

/**
 * Inbound handler that replies to every received message with the value
 * stored for it in a small lookup table, or {@code "unregistered"} when the
 * table has no such key.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Service name mapped to the value sent back when that name is requested. */
    private final Map<String, Object> registered = new HashMap<>();

    /** Creates the handler and seeds the table with its single known entry. */
    public ServerHandler() {
        registered.put("foo", "registered");
    }

    /**
     * Logs the incoming message, looks it up and queues the reply.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message, used directly as the lookup key
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("[server] received message: " + msg);
        Object status = registered.getOrDefault(msg, "unregistered");
        String reply = (String) status;
        // Only queued here; the flush happens in channelReadComplete.
        ctx.write(reply);
    }

    /**
     * Flushes whatever {@link #channelRead} queued during this read pass.
     *
     * @param ctx context of the channel that finished reading
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ClientHandler

/**
 * Client-side handler that sends one request as soon as the channel becomes
 * active, stores the first reply, and then releases a latch so the calling
 * thread can pick the reply up.
 *
 * <p>The latch is also released when the channel fails or goes inactive, so a
 * caller waiting on it is never left blocked; in that case
 * {@code getResponse()} may still be {@code null}.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class ClientHandler extends SimpleChannelInboundHandler<Object> {

    /** Payload written when the connection is established. */
    private final String request;

    /**
     * Reply received from the peer, or {@code null} if none arrived.
     * Written before {@code latch.countDown()}, so a thread returning from
     * {@code latch.await()} is guaranteed to see it.
     */
    @Getter
    private String response;

    /** Released once the exchange has finished, successfully or not. */
    private final CountDownLatch latch;

    /**
     * @param request message to send once the channel is active
     * @param latch   latch counted down when a reply arrives or the channel fails
     */
    public ClientHandler(String request, CountDownLatch latch) {
        this.request = request;
        this.latch = latch;
    }

    /**
     * Sends the request as soon as the connection is up.
     *
     * @param ctx context of the newly active channel
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(request);
    }

    /**
     * Stores the reply and only then wakes the waiting caller.
     *
     * @param ctx context of the channel the reply arrived on
     * @param msg the reply; expected to be a {@link String}
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        response = (String) msg;
        System.out.println("[client] received message: " + response);
        // Must come after the assignment: counting down first lets the
        // waiting thread read the field while it is still null.
        latch.countDown();
    }

    /**
     * Releases the waiting caller if the connection ends without a reply.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        latch.countDown();
        ctx.fireChannelInactive();
    }

    /**
     * Prints the failure, closes the channel and releases the waiting caller.
     *
     * @param ctx   context of the failing channel
     * @param cause the error that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
        latch.countDown();
    }
}

Server

/**
 * Minimal Netty server: binds to a port and installs an
 * {@code RpcDecoder}, an {@code RpcEncoder} and a {@code ServerHandler} on
 * every accepted connection.
 *
 * @author nathan
 * @date 2019/3/7
 */
public class Server {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port TCP port to listen on
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds the server and blocks until its channel is closed; both event
     * loop groups are shut down on the way out.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void start() throws Exception {

        EventLoopGroup acceptors = new NioEventLoopGroup();
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    // Handlers are appended in this order: decoder, encoder, business handler.
                    ch.pipeline().addLast(new RpcDecoder(String.class));
                    ch.pipeline().addLast(new RpcEncoder(String.class));
                    ch.pipeline().addLast(new ServerHandler());
                }
            });
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture bound = bootstrap.bind(port).sync();

            System.out.println("[server] started...");

            // Park the calling thread until the server channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
            acceptors.shutdownGracefully();
        }
    }
}

Client

/**
 * Short-connection client: every {@link #send(String)} call opens a new
 * connection, sends one message, waits for the reply and closes the
 * connection again.
 *
 * @author nathan
 * @date 2019/3/16
 */
public class Client {

    /** Host name or address of the server. */
    private final String host;
    /** TCP port of the server. */
    private final int port;

    /**
     * @param host host name or address of the server
     * @param port TCP port of the server
     */
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sends {@code data} over a fresh connection and returns the reply.
     *
     * @param data the message to send
     * @return the reply stored by the handler, or {@code null} if the exchange
     *         failed or the calling thread was interrupted
     */
    public Object send(String data) {

        CountDownLatch latch = new CountDownLatch(1);
        ClientHandler handler = new ClientHandler(data, latch);

        // One event loop group per call, torn down in the finally block.
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        Object res = null;
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.remoteAddress(new InetSocketAddress(host, port));
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new RpcEncoder(String.class))
                            .addLast(new RpcDecoder(String.class))
                            .addLast(handler);
                }
            });

            ChannelFuture cf = b.connect().sync();
            System.out.println("[client] connected...");

            // Block until the handler releases the latch.
            latch.await();

            // Actually close the connection. closeFuture() alone only returns
            // a future that completes on close; it never initiates the close.
            cf.channel().close().sync();

            res = handler.getResponse();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can see the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
        return res;
    }
}

测试用例

/**
 * End-to-end check: starts a {@code Server} on a background thread and
 * verifies the replies a {@code Client} gets for a registered and an
 * unregistered service name.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class FrameworkTest {

    /** Upper bound on how long to wait for the server port to open. */
    private static final long STARTUP_TIMEOUT_MILLIS = 5_000L;
    /** Pause between two connection probes while waiting for startup. */
    private static final long STARTUP_POLL_MILLIS = 50L;

    private final int port = 8080;
    private final String ip = "127.0.0.1";

    /**
     * Starts the server in the background and waits until it accepts
     * connections, so the test cannot race the bind.
     *
     * @throws InterruptedException if interrupted while waiting for startup
     */
    @BeforeTest
    public void before() throws InterruptedException {
        Server server = new Server(port);
        // start() blocks until the server channel closes, so it needs its own thread.
        new Thread(() -> {
            try {
                server.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        awaitServerReady();
    }

    /** Sends one known and one unknown service name and checks both replies. */
    @Test
    public void test() {

        Client client = new Client(ip, port);

        Assert.assertEquals(client.send("foo"), "registered");

        Assert.assertEquals(client.send("bar"), "unregistered");
    }

    /**
     * Probes the server port with plain sockets until a connection succeeds.
     *
     * @throws InterruptedException  if interrupted between probes
     * @throws IllegalStateException if the port is still closed after the timeout
     */
    private void awaitServerReady() throws InterruptedException {
        long deadline = System.nanoTime() + STARTUP_TIMEOUT_MILLIS * 1_000_000L;
        while (true) {
            try (java.net.Socket probe = new java.net.Socket(ip, port)) {
                return;
            } catch (java.io.IOException notYetListening) {
                if (System.nanoTime() >= deadline) {
                    throw new IllegalStateException(
                            "server did not start listening on " + ip + ":" + port, notYetListening);
                }
                Thread.sleep(STARTUP_POLL_MILLIS);
            }
        }
    }
}

相关文章

网友评论

    本文标题:我的第一个Netty应用

    本文链接:https://www.haomeiwen.com/subject/cfyxvqtx.html