传送门
《Netty 实战》第1,2,3章
《Netty 实战》第4,5,6章
《Netty 实战》第7,8,9章
《Netty 实战》第10,11章
《Netty 实战》第12,13章
第7章 EventLoop和线程模型
-
线程模型概述
线程模型确定了代码的执行方式,并发方式.在java5中,提供了线程池化的Executor API. -
EventLoop接口: netty事件循环
eventloop-class.png
a. Netty4中I/O和时间处理,都由EventLoop中那个指定Thread处理
b. Netty3中只保证入站在EventLoop中,出站在调用线程中,这样就造成了出站事件的同步问题. -
任务调度
a. JDk API Executor类工厂方法,性能/高负载情况下,线程调度开销大.
b. EventLoop 也提供了类似的方法(Schedule/ScheduleAtFixedRate) -
细节
a. 当执行Tread的身份可以确定是,直接在EventLoop任务队列中.
b. 线程分配.
异步传输:
NIO-dispatch.png
分配方式,由EventLoopGroup创建分配.
阻塞传输:
OIO-dispatch.png
分配方式,由Executor创建分配,和Channel数量一一对应,因而一个channle有一个Thread,切换开销大.
实际使用Netty,并不需要关心EventLoop的具体细节,毕竟这个最复杂的工作,是由Netty框架完成的.
第8章 引导BootStrap
组织PipeLne,Handler,EventloopGroup几大要素,使得应用得以运行.
-
BootStrap类
TODO:继承图
bootstap-class.png
类声明: AbstractBootStrap<B extends AbstractBootStrap<B,C>, C extend channel>
为父类型参数,这样可以做到链式继承. -
引导客户端-Bootstrap
a. 客户端流式(stream)语法
public void bootstrap() {
// 创建Nio的selector
EventLoopGroup group = new NioEventLooGroup();
// 创建客户端BootStrap
Bootstrap bootstrap = new Bootstrap();
// 流水调用
bootstrap.group(group)
// 这个需要和NioEventLooGroup配套
.channel(NioSocketChannel.class)
// 直接使用匿名Handler
.handler(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
}
});
// 进行连接动作
ChannelFuture future =
bootstrap.connect(
new InetSocketAddress("www.manning.com", 80));
future.addListener(new ChannelFutureListener() {
// 异步监听器,在连接建立时候判断channelFutrue状态.
@Override
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
});
}
这里需要注意的是EventLoopGroup和Channel的配套(第4章)
-
引导服务端-ServerBootstrap
服务端和客户端声明的区别不大,区别在于一个用connect(),一个用bind(). -
从Channel引导客户端(中继)
一般可以共享EventLoop,避免额外的线程创建,
TODO:8-4 -
在引导过程添加多个Channel
使用一个继承ChannelInitializer类完成->override其中的initChannel()方法
public void bootstrap() throws InterruptedException {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(new NioEventLoopGroup(), new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializerImpl());
ChannelFuture future = bootstrap.bind(new InetSocketAddress(8080));
future.sync();
}
// 覆盖initChannel()方法
final class ChannelInitializerImpl extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
// 先从Channel中获取Pipeline,这个本来就是一一对应的,而且线程安全.
ChannelPipeline pipeline = ch.pipeline();
// 多次调用addLast,都是在队列尾加
pipeline.addLast(new HttpClientCodec());
pipeline.addLast(new HttpObjectAggregator(Integer.MAX_VALUE));
}
}
-
使用Netty的ChannelOption
ChannelOption可以一次配置,并应用与改引导创建的所有channel -
引导DatagramChannel(UDP协议) -- 13章
a. 注意OioDatagramChannel和OioEventLoopGroup的配对.
b. Handle处理的是DatagramPacket而非ByteBuf -
优雅的关闭
// 调用关闭方法-shutdownGracefully(),返回一个Futre
Future<?> future = group.shutdownGracefully();
// 同步等待future的处理结束,当然也可以异步返回,在其他地方判断.
future.syncUninterruptibly();
第9章 单元测试-EmbeddedChannel
- EmbededChannel概述
a. 将入站,出站数据写入EmbededChannel中,然后在Pipeline判断到达了什么,是否符合预期,提供了一个测试业务逻辑(Handler)的方法.
b. WriteInbound-> readInbound->测试入站处理
c. Writeoutbound-> readOutbound->测试出站处理 - 使用EmbeddedChannel --看源码吧
a. 测试入站信息. 解码器类FixedLengthFrameDecoder,测试类FixedLengthFrameDecoderTest
b. 测试入站信息.
编码器类AbsIntegerEncoder,测试类AbsIntegerEncoderTest
c. 测试异常 - 相对复杂一点
需要注意的是EmbededChannel的构造方法,传入的值,是一个不定长的handlers
public EmbeddedChannel(ChannelHandler... handlers) {
this(EmbeddedChannelId.INSTANCE, handlers);
}















网友评论