common:
public interface HelloService {
String hello(String message);
}
provider:
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String message) {
System.out.println(message);
return "调用成功";
}
}
server:
public class ServerBootstrap {
public static void main(String[] args) {
new NettyServer().start("127.0.0.1", 7000);
}
}
public class NettyServer {
public void start(String host, int port) {
startServer0(host, port);
}
public static void startServer0(String host, int port) {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(host, port).sync();
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 找到接口实现类
//调用
//返回结果
System.out.println("服务端接收到" + msg);
if (msg.toString().startsWith("HelloService#hello#")){
String result = new HelloServiceImpl().hello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
ctx.writeAndFlush(result);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
client:
public class ClientBootstrap {
public static void main(String[] args) {
NettyClient nettyClient = new NettyClient();
HelloService bean = (HelloService) nettyClient.getBean(HelloService.class, "HelloService#hello#");
String result = bean.hello("1233211234567");
System.out.println("result = " + result);
}
}
public class NettyClient {
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
public Object getBean(final Class<?> clazz, final String providerName) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{clazz}, ((proxy, method, args) -> {
if (client == null) {
initClient();
}
//设置发送服务端信息
client.setParam(providerName + args[0]);
return executor.submit(client).get();
}));
}
private static void initClient() {
client = new NettyClientHandler();
EventLoopGroup executors = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(executors)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(client);
}
});
try {
bootstrap.connect("127.0.0.1", 7000).sync();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
private String result;
private String param;
private ChannelHandlerContext context;
/**
调用流程:
1.首先通过channelActive给context赋值
2.setParam 设置参数
3.call方法,通过context将参数发送给服务端,并且等待
4.channelRead服务端回复消息,将服务端会送的消息赋值给result,唤醒等待线程
5.call被唤醒,返回result
*/
@Override
public synchronized Object call() throws Exception {
System.out.println("call方法被调用");
context.writeAndFlush(param);
wait();
return result;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
context = ctx;
}
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channelRead");
result = msg.toString();
notify();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
public void setParam(String param) {
this.param = param;
}
}
网友评论