固然上面说到NIO一个线程就可以支持全部的IO处理赏罚。可是瓶颈也是显而易见的!我们看一个客户端的环境,假如这个客户端多次举办哀求,假如在Handler中的处理赏罚速率较慢,那么后续的客户端哀求城市被积存,导致相应变慢!以是引入了Reactor多线程模子!
Reactor多线程模子

Reactor多线程模子就是将Handler中的IO操纵和非IO操纵分隔,操纵IO的线程称为IO线程,非IO操纵的线程称为事变线程!这样的话,客户端的哀求会直接被丢到线程池中,客户端发送哀求就不会堵塞!
可是当用户进一步增进的时辰,Reactor会呈现瓶颈!由于Reactor既要处理赏罚IO操纵哀求,又要相应毗连哀求!为了分管Reactor的承担,以是引入了主从Reactor模子!
主从Reactor模子

主Reactor用于相应毗连哀求,从Reactor用于处理赏罚IO操纵哀求!
Netty
Netty是一个高机能NIO框架,其是对Reactor模子的一个实现!
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(workerGroup);
- b.channel(NioSocketChannel.class);
- b.option(ChannelOption.SO_KEEPALIVE, true);
- b.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new TimeClientHandler());
- }
- });
-
- ChannelFuture f = b.connect(host, port).sync();
-
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- }
- public class TimeClientHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ByteBuf m = (ByteBuf) msg;
- try {
- long currentTimeMillis =
- (m.readUnsignedInt() - 2208988800L) * 1000L;
- System.out.println(new Date(currentTimeMillis));
- ctx.close();
- } finally {
- m.release();
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new TimeServerHandler());
- }
- })
- .option(ChannelOption.SO_BACKLOG, 128)
- .childOption(ChannelOption.SO_KEEPALIVE, true);
- // Bind and start to accept incoming connections.
- ChannelFuture f = b.bind(port).sync();
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- public class TimeServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelActive(final ChannelHandlerContext ctx) {
- final ByteBuf time = ctx.alloc().buffer(4);
- time.writeInt((int)
- (System.currentTimeMillis() / 1000L + 2208988800L));
-
- final ChannelFuture f = ctx.writeAndFlush(time);
- f.addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) {
- assert f == future;
- ctx.close();
- }
- });
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx,
- Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
(编辑:湖南网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|