这是《手写RPC框架,我学会了什么?》系列的第07篇
基于之前实现的序列化/反序列化、编码/解码,这一次把它们组合起来实现一个server。server中存有一个已注册服务的map,key是服务名称,value暂时不实现(先用占位字符串代替)。client端发送服务名,如果该服务已经注册过,则返回"registered"(已注册),否则返回"unregistered"(未注册)。
在这个基础上,只需要再多加一点点功能就相当于把RPC框架重写了。
client端通过短连接的形式和服务端通信。也就是说,每次发送数据,都需要与服务端重新建立连接。缺点是通信的开销增大,优点是服务端不用一直维持连接从而占用系统资源。
需要实现1个ServerHandler、1个ClientHandler、1个Server以及1个Client。
ServerHandler
/**
 * Server-side inbound handler that answers service-name lookups.
 *
 * <p>Each inbound message is used as a key into a small registry; the reply is
 * the value stored under that key, or {@code "unregistered"} when the key is absent.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /** Registry of known services, keyed by service name. */
    private final Map<String, Object> registered = new HashMap<>();

    /** Creates a handler whose registry contains the single service {@code "foo"}. */
    public ServerHandler() {
        registered.put("foo", "registered");
    }

    /**
     * Logs the inbound message, looks it up in the registry and queues the reply.
     * The reply is only written here; it is flushed in {@link #channelReadComplete}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("[server] received message: " + msg);
        Object status = registered.getOrDefault(msg, "unregistered");
        String reply = (String) status;
        ctx.write(reply);
    }

    /** Flushes everything queued by {@link #channelRead} once the current read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /** Prints the failure and closes the connection. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
ClientHandler
/**
 * Client-side handler that sends one request when the channel becomes active and
 * captures the single reply.
 *
 * <p>The supplied {@link CountDownLatch} is counted down once the reply has been
 * stored, or once the pipeline reports an error, so that a thread blocked on the
 * latch is always released. After an error {@link #getResponse()} returns
 * {@code null}.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class ClientHandler extends SimpleChannelInboundHandler<Object> {

    /** Payload written to the server as soon as the connection is active. */
    private final String request;

    /** Reply received from the server; {@code null} until one arrives. */
    @Getter
    private String response;

    /** Released when a reply has been stored or the exchange has failed. */
    private final CountDownLatch latch;

    /**
     * @param request the message to send once connected
     * @param latch   latch the caller awaits; counted down when the exchange finishes
     */
    public ClientHandler(String request, CountDownLatch latch) {
        this.request = request;
        this.latch = latch;
    }

    /** Sends the request as soon as the channel is connected. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(request);
    }

    /**
     * Stores the reply and then releases the waiting caller.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
        // Assign BEFORE countDown(): the latch establishes happens-before only for
        // writes made prior to countDown(), so the awaiting thread must not be
        // released until the response field has been set.
        response = (String) msg;
        System.out.println("[client] received message: " + response);
        latch.countDown();
    }

    /**
     * Prints the failure, closes the connection and releases the waiting caller so
     * it does not block forever on a reply that will never arrive.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
        latch.countDown();
    }
}
Server
/**
 * Netty server that decodes/encodes {@code String} messages and delegates each
 * connection to a fresh {@link ServerHandler}.
 *
 * @author nathan
 * @date 2019/3/7
 */
public class Server {

    /** TCP port the server binds to. */
    private final int port;

    /**
     * @param port the TCP port to listen on
     */
    public Server(int port) {
        this.port = port;
    }

    /**
     * Binds to the configured port and blocks until the server channel is closed.
     * Both event loop groups are shut down when this method exits.
     *
     * @throws Exception if binding or waiting on the server channel fails
     */
    public void start() throws Exception {
        EventLoopGroup acceptGroup = new NioEventLoopGroup();
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            // Per-connection pipeline: decoder, encoder, then the business handler.
            ChannelInitializer<SocketChannel> pipelineSetup = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline().addLast(
                            new RpcDecoder(String.class),
                            new RpcEncoder(String.class),
                            new ServerHandler());
                }
            };

            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptGroup, ioGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(pipelineSetup);
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture bound = bootstrap.bind(port).sync();
            System.out.println("[server] started...");

            // Block the calling thread until the listening channel is closed.
            bound.channel().closeFuture().sync();
        } finally {
            ioGroup.shutdownGracefully();
            acceptGroup.shutdownGracefully();
        }
    }
}
Client
/**
 * Short-connection client: every {@link #send(String)} call opens a new connection,
 * sends one message, waits for one reply and closes the connection again.
 *
 * @author nathan
 * @date 2019/3/16
 */
public class Client {

    /** Host name or IP address of the server. */
    private final String host;

    /** TCP port of the server. */
    private final int port;

    /**
     * @param host host name or IP address of the server
     * @param port TCP port of the server
     */
    public Client(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Sends {@code data} to the server over a fresh connection and returns the reply.
     *
     * @param data the message to send
     * @return the reply captured by the {@link ClientHandler}, or {@code null} if the
     *         exchange failed (connect error, connection closed before a reply
     *         arrived, or the calling thread was interrupted)
     */
    public Object send(String data) {
        CountDownLatch latch = new CountDownLatch(1);
        ClientHandler handler = new ClientHandler(data, latch);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        Object res = null;
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.remoteAddress(new InetSocketAddress(host, port));
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    ch.pipeline()
                            .addLast(new RpcEncoder(String.class))
                            .addLast(new RpcDecoder(String.class))
                            .addLast(handler);
                }
            });
            ChannelFuture cf = b.connect().sync();
            System.out.println("[client] connected...");

            // If the connection goes away before a reply arrives, release the latch
            // as well so this method returns null instead of blocking forever.
            cf.channel().closeFuture().addListener(future -> latch.countDown());

            // wait until a response arrives or the connection is closed
            latch.await();

            // Actually close the short-lived connection and wait for the close to
            // finish; closeFuture() alone only returns a future and closes nothing.
            cf.channel().close().sync();
            res = handler.getResponse();
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
        return res;
    }
}
测试用例
/**
 * End-to-end test: starts a {@link Server} on a background thread and checks the
 * replies a {@link Client} receives for a registered and an unregistered name.
 *
 * @author nathan
 * @date 2019/3/20
 */
public class FrameworkTest {

    /** Upper bound on how long {@link #before()} waits for the server port to open. */
    private static final long STARTUP_TIMEOUT_MILLIS = 5_000L;

    /** Pause between two connection probes while waiting for the server. */
    private static final long PROBE_INTERVAL_MILLIS = 50L;

    private final int port = 8080;
    private final String ip = "127.0.0.1";

    /**
     * Starts the server on its own thread (because {@code Server.start()} blocks)
     * and waits until the port accepts connections, so the test cannot race the
     * server's bind.
     */
    @BeforeTest
    public void before() {
        Server server = new Server(port);
        new Thread(() -> {
            try {
                server.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
        awaitServerReady();
    }

    @Test
    public void test() {
        Client client = new Client(ip, port);
        Assert.assertEquals(client.send("foo"), "registered");
        Assert.assertEquals(client.send("bar"), "unregistered");
    }

    /**
     * Polls the server port with plain TCP connects until one succeeds.
     *
     * @throws IllegalStateException if the port is still not reachable after
     *         {@link #STARTUP_TIMEOUT_MILLIS}, or if the thread is interrupted
     */
    private void awaitServerReady() {
        long deadline = System.currentTimeMillis() + STARTUP_TIMEOUT_MILLIS;
        while (true) {
            // Fully qualified JDK names are used so no new import lines are required.
            try (java.net.Socket probe = new java.net.Socket(ip, port)) {
                return;
            } catch (java.io.IOException notListeningYet) {
                if (System.currentTimeMillis() >= deadline) {
                    throw new IllegalStateException(
                            "server did not start listening on " + ip + ":" + port, notListeningYet);
                }
            }
            try {
                Thread.sleep(PROBE_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for the server to start", e);
            }
        }
    }
}
网友评论