美文网首页
I/O-手写简单netty

I/O-手写简单netty

作者: 麦大大吃不胖 | 来源:发表于2020-11-30 12:27 被阅读0次

by shihang.mai

1. 概述

netty原理

一共有3个类:

MainThread:这个类负责程序入口,不做任何的业务

SelectorThreadGroup:如上图,会有多个Selector,故用这个类管理

SelectorThread:核心类,每个实例持有一个Selector,并在自己的线程里循环处理该Selector上的accept/read事件

2. 混杂版

不区分boss,worker线程

MainThread

public class MainThread {
    public static void main(String[] args) {
        SelectorThreadGroup stg = new SelectorThreadGroup(3);
        stg.bind(9999);
    }
}

SelectorThreadGroup

/**
 * Owns a fixed set of {@link SelectorThread}s and distributes channels across
 * them in round-robin order ("mixed" variant: the same threads accept and read).
 */
public class SelectorThreadGroup {
    /** The selector threads managed by this group. */
    SelectorThread[] sts;

    /** The most recently bound listening channel. */
    ServerSocketChannel server;

    /** Round-robin counter used to pick the next SelectorThread. */
    AtomicInteger xid = new AtomicInteger(0);

    /**
     * Creates {@code num} selector threads and starts each one immediately.
     *
     * @param num number of selector threads; must be positive
     * @throws IllegalArgumentException if {@code num} is not positive
     */
    public SelectorThreadGroup(int num) {
        if (num <= 0) {
            // A zero-length array would only fail later, in next(), with an ArithmeticException.
            throw new IllegalArgumentException("num must be positive: " + num);
        }
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            // Each selector thread keeps a reference back to this group.
            sts[i] = new SelectorThread(this);
            // The thread starts right away and blocks inside selector.select().
            new Thread(sts[i]).start();
        }
    }

    /**
     * Opens a non-blocking listening channel on {@code port} and hands it to one
     * of the selector threads. I/O failures are printed, not rethrown.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(port));
            server = channel;
            // Pick a selector thread for the listening channel.
            nextSelector(channel);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak the channel when configuring or binding failed.
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }

    /**
     * Hands {@code c} to the next selector thread in round-robin order.
     *
     * <p>The channel is passed through the thread's queue instead of being
     * registered here. The original author's reasoning: calling wakeup() before
     * register lets the selector thread fall back into select() before the
     * registration happens, and calling register first can block while the
     * selector thread is inside select(), so the wakeup() is never reached.
     * Queueing lets the selector thread perform the registration itself.
     *
     * @param c the channel to register (listening or client channel)
     */
    public void nextSelector(Channel c){
        SelectorThread st = next();
        // Enqueue first, then wake the selector so its loop reaches the queue.
        st.lbq.add(c);
        st.selector.wakeup();
    }

    /** Returns the next selector thread in round-robin order. */
    private SelectorThread next() {
        // floorMod keeps the index non-negative after the counter overflows.
        int index = Math.floorMod(xid.incrementAndGet(), sts.length);
        return sts[index];
    }
}

SelectorThread

/**
 * Event loop around a single {@link Selector}. Channels are handed to it through
 * {@link #lbq}; the loop registers them with its selector and then dispatches
 * accept and read events.
 */
public class SelectorThread implements  Runnable{

    /** Selector driven by {@link #run()}; other code calls wakeup() on it. */
    Selector selector = null;

    /** Channels waiting to be registered with {@link #selector}. */
    LinkedBlockingDeque<Channel> lbq = new LinkedBlockingDeque<>();

    /** Group used to pick the selector thread for newly accepted clients. */
    SelectorThreadGroup stg;

    /**
     * @param stg the group this thread belongs to
     * @throws IllegalStateException if the selector cannot be opened
     */
    SelectorThread(SelectorThreadGroup stg){
        this.stg = stg;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            // Fail fast: with a null selector run() would spin on NullPointerException.
            throw new IllegalStateException("failed to open selector", e);
        }
    }

    /**
     * Runs forever: waits for events, handles every selected key, then registers
     * all channels queued in {@link #lbq}.
     */
    @Override
    public void run() {
        while(true){
            try {
                // Blocks until a channel is ready or wakeup() is called.
                selector.select();
                // Handle every selected key: a key left in the set may not be
                // counted by a later select(), so it could be starved.
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(!key.isValid()){
                        continue;
                    }
                    if(key.isAcceptable()){
                        acceptHandler(key);
                    }else if(key.isReadable()){
                        readHandler(key);
                    }else if(key.isWritable()){
                        // Write readiness is not used.
                    }
                }
                registerPendingChannels();
            } catch (Exception e) {
                // Loop boundary: report and keep the selector thread alive.
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers every queued channel with this selector. The queue is drained
     * completely because several wakeup() calls can collapse into one return
     * from select().
     */
    private void registerPendingChannels() {
        Channel c;
        while((c = lbq.poll()) != null){
            try {
                if(c instanceof ServerSocketChannel){
                    ServerSocketChannel server = (ServerSocketChannel) c;
                    server.register(selector,SelectionKey.OP_ACCEPT);
                    System.out.println(Thread.currentThread().getName()+" register listen");
                }else if(c instanceof SocketChannel){
                    SocketChannel client = (SocketChannel) c;
                    // Each client gets its own buffer, attached to its key.
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
                    client.register(selector,SelectionKey.OP_READ,byteBuffer);
                    System.out.println(Thread.currentThread().getName()+" register client:"+client.getRemoteAddress());
                }
            } catch (IOException e) {
                // One bad channel must not block the rest of the queue.
                e.printStackTrace();
            }
        }
    }

    /**
     * Echoes everything currently readable on the key's channel back to the
     * client. Closes the connection on end-of-stream or on an I/O error.
     */
    private void readHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+" read......");
        ByteBuffer buffer = (ByteBuffer)key.attachment();
        SocketChannel client=(SocketChannel)key.channel();
        buffer.clear();
        try {
            while(true){
                int num=client.read(buffer);
                if(num>0){
                    buffer.flip();
                    while(buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                }else if(num==0){
                    // Nothing more to read for now.
                    break;
                }else{
                    System.out.println("client:"+client.getRemoteAddress()+" is closed.....");
                    closeClient(key, client);
                    break;
                }
            }
        } catch (IOException e) {
            // The catch sits outside the loop so a broken connection ends the
            // read instead of retrying forever.
            e.printStackTrace();
            closeClient(key, client);
        }
    }

    /** Cancels the key and closes the channel, printing any close failure. */
    private void closeClient(SelectionKey key, SocketChannel client) {
        key.cancel();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts one pending connection and hands it to the group for assignment
     * to a selector thread.
     */
    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+" acceptHandler....");
        ServerSocketChannel server=(ServerSocketChannel)key.channel();
        try {
            SocketChannel client = server.accept();
            if(client == null){
                // Non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            stg.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

3. 正常版

区分boss,worker线程

MainThread

/**
 * Program entry point for the boss/worker variant: builds both groups, links
 * them, and binds the listening ports.
 */
public class MainThread {
    public static void main(String[] args) {
        // Boss threads accept connections; worker threads serve the clients.
        SelectorThreadGroup boss = new SelectorThreadGroup(3);
        SelectorThreadGroup worker = new SelectorThreadGroup(3);
        // The boss needs the worker group so accepted clients can be handed over.
        boss.setWorker(worker);

        int[] listenPorts = {9999, 8888, 7777, 6666};
        for (int port : listenPorts) {
            boss.bind(port);
        }
    }
}

SelectorThreadGroup

/**
 * Owns a fixed set of {@link SelectorThread}s (boss/worker variant). Listening
 * channels go to this group's own threads; client channels go to the threads of
 * the group stored in {@link #stg}.
 */
public class SelectorThreadGroup {

    /** The selector threads managed by this group. */
    SelectorThread[] sts;

    /** The most recently bound listening channel. */
    ServerSocketChannel server;

    /** Round-robin counter shared by {@link #next()} and {@link #nextV2()}. */
    AtomicInteger xid = new AtomicInteger(0);

    /** Group that serves client channels; refers to this group until setWorker is called. */
    SelectorThreadGroup stg = this;

    /**
     * Sets the group whose threads will serve accepted client channels.
     *
     * @param stg the worker group
     */
    public void setWorker(SelectorThreadGroup stg){
        this.stg = stg;
    }

    /**
     * Creates {@code num} selector threads and starts each one immediately.
     *
     * @param num number of selector threads; must be positive
     * @throws IllegalArgumentException if {@code num} is not positive
     */
    public SelectorThreadGroup(int num) {
        if (num <= 0) {
            // A zero-length array would only fail later with an ArithmeticException.
            throw new IllegalArgumentException("num must be positive: " + num);
        }
        sts = new SelectorThread[num];
        for (int i = 0; i < num; i++) {
            sts[i] = new SelectorThread(this);
            new Thread(sts[i]).start();
        }
    }

    /**
     * Opens a non-blocking listening channel on {@code port} and hands it to one
     * of this group's selector threads. I/O failures are printed, not rethrown.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        ServerSocketChannel channel = null;
        try {
            channel = ServerSocketChannel.open();
            channel.configureBlocking(false);
            channel.bind(new InetSocketAddress(port));
            server = channel;
            nextSelector(channel);
        } catch (IOException e) {
            e.printStackTrace();
            // Do not leak the channel when configuring or binding failed.
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException closeError) {
                    closeError.printStackTrace();
                }
            }
        }
    }

    /**
     * Queues {@code c} on a selector thread and wakes that thread's selector.
     * A {@link ServerSocketChannel} goes to one of this group's threads; any
     * other channel goes to a thread of the group in {@link #stg}.
     *
     * @param c the channel to register
     */
    public void nextSelector(Channel c){
        try {
            if(c instanceof  ServerSocketChannel){
                // Listening channels stay on this group's own threads.
                SelectorThread st = next();
                // Set the worker group before queueing, so the selector thread
                // cannot accept a client while still pointing at the old group.
                st.setWorker(stg);
                st.lbq.put(c);
                st.selector.wakeup();
            }else {
                // Client channels go to a thread of the worker group.
                SelectorThread st = nextV2();
                st.lbq.put(c);
                st.selector.wakeup();
            }

        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /** Returns the next thread of this group in round-robin order. */
    private SelectorThread next() {
        // floorMod keeps the index non-negative after the counter overflows.
        int index = Math.floorMod(xid.incrementAndGet(), sts.length);
        return sts[index];
    }

    /** Returns the next thread of the worker group in round-robin order. */
    private SelectorThread nextV2() {
        int index = Math.floorMod(xid.incrementAndGet(), stg.sts.length);
        // Index into the same array the length came from.
        return stg.sts[index];
    }
}

SelectorThread

/**
 * Event loop around a single {@link Selector} (boss/worker variant). Channels
 * are handed to it through {@link #lbq}; the loop registers them with its
 * selector and then dispatches accept and read events.
 *
 * <p>The class extends {@link ThreadLocal} only to obtain the initial queue.
 * {@link #lbq} is read once from {@code get()} in a field initializer, and all
 * later access goes through the field.
 */
public class SelectorThread extends ThreadLocal<LinkedBlockingDeque<Channel>> implements  Runnable{

    /** Selector driven by {@link #run()}; other code calls wakeup() on it. */
    Selector selector = null;

    /** Channels waiting to be registered with {@link #selector}. */
    LinkedBlockingDeque<Channel> lbq = get();

    /**
     * Group that receives newly accepted clients. It can be replaced through
     * {@link #setWorker}; volatile so the thread in run() sees the new value.
     */
    volatile SelectorThreadGroup stg;

    /** Supplies the queue returned by the first {@code get()} call. */
    @Override
    protected LinkedBlockingDeque<Channel> initialValue() {

        return new LinkedBlockingDeque<>();
    }

    /**
     * @param stg the group this thread initially hands accepted clients to
     * @throws IllegalStateException if the selector cannot be opened
     */
    SelectorThread(SelectorThreadGroup stg){
        this.stg = stg;
        try {
            selector = Selector.open();
        } catch (IOException e) {
            // Fail fast: with a null selector run() would spin on NullPointerException.
            throw new IllegalStateException("failed to open selector", e);
        }
    }

    /**
     * Runs forever: waits for events, handles every selected key, then registers
     * all channels queued in {@link #lbq}.
     */
    @Override
    public void run() {
        while(true){
            try {
                // Blocks until a channel is ready or wakeup() is called.
                selector.select();
                // Handle every selected key: a key left in the set may not be
                // counted by a later select(), so it could be starved.
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(!key.isValid()){
                        continue;
                    }
                    if(key.isAcceptable()){
                        acceptHandler(key);
                    }else if(key.isReadable()){
                        readHandler(key);
                    }else if(key.isWritable()){
                        // Write readiness is not used.
                    }
                }
                registerPendingChannels();
            } catch (Exception e) {
                // Loop boundary: report and keep the selector thread alive.
                e.printStackTrace();
            }
        }
    }

    /**
     * Registers every queued channel with this selector. The queue is drained
     * completely because several wakeup() calls can collapse into one return
     * from select().
     */
    private void registerPendingChannels() {
        Channel c;
        while((c = lbq.poll()) != null){
            try {
                if(c instanceof ServerSocketChannel){
                    ServerSocketChannel server = (ServerSocketChannel) c;
                    server.register(selector,SelectionKey.OP_ACCEPT);
                    System.out.println(Thread.currentThread().getName()+" register listen");
                }else if(c instanceof SocketChannel){
                    SocketChannel client = (SocketChannel) c;
                    // Each client gets its own buffer, attached to its key.
                    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
                    client.register(selector,SelectionKey.OP_READ,byteBuffer);
                    System.out.println(Thread.currentThread().getName()+" register client:"+client.getRemoteAddress());
                }
            } catch (IOException e) {
                // One bad channel must not block the rest of the queue.
                e.printStackTrace();
            }
        }
    }

    /**
     * Echoes everything currently readable on the key's channel back to the
     * client. Closes the connection on end-of-stream or on an I/O error.
     */
    private void readHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+" read......");
        ByteBuffer buffer = (ByteBuffer)key.attachment();
        SocketChannel client=(SocketChannel)key.channel();
        buffer.clear();
        try {
            while(true){
                int num=client.read(buffer);
                if(num>0){
                    buffer.flip();
                    while(buffer.hasRemaining()){
                        client.write(buffer);
                    }
                    buffer.clear();
                }else if(num==0){
                    // Nothing more to read for now.
                    break;
                }else{
                    System.out.println("client:"+client.getRemoteAddress()+" is closed.....");
                    closeClient(key, client);
                    break;
                }
            }
        } catch (IOException e) {
            // The catch sits outside the loop so a broken connection ends the
            // read instead of retrying forever.
            e.printStackTrace();
            closeClient(key, client);
        }
    }

    /** Cancels the key and closes the channel, printing any close failure. */
    private void closeClient(SelectionKey key, SocketChannel client) {
        key.cancel();
        try {
            client.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts one pending connection and hands it to the current group in
     * {@link #stg} for assignment to a selector thread.
     */
    private void acceptHandler(SelectionKey key) {
        System.out.println(Thread.currentThread().getName()+" acceptHandler....");
        ServerSocketChannel server=(ServerSocketChannel)key.channel();
        try {
            SocketChannel client = server.accept();
            if(client == null){
                // Non-blocking accept returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            stg.nextSelector(client);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Replaces the group that receives accepted clients.
     *
     * @param stgWorker the worker group
     */
    public void setWorker(SelectorThreadGroup stgWorker) {
        this.stg = stgWorker;
    }
}

相关文章

网友评论

      本文标题:I/O-手写简单netty

      本文链接:https://www.haomeiwen.com/subject/elloiktx.html