Accept事件处理
- 继承关系:NioSocketAcceptor -> AbstractPollingIoAcceptor -> AbstractIoAcceptor -> AbstractIoService
NioSocketAcceptor
selector
init():初始化NIO.selector
- open():将Accept事件注册到selector
- select():执行NIO.selector.select()
AbstractPollingIoAcceptor
- Acceptor -> Runnable:run()
SimpleIoProcessorPool -> IoProcessor
bindInternal():调用startupAcceptor(),执行Accept.run();
- Accept.run():调用open()、select(),将select()返回的SelectionKey通过accept(IoProcessor)初始化为NioSocketSession,随后初始化session,并将session绑定到processor(最终绑定processor的逻辑在SimpleIoProcessorPool.add(),该方法调用NioProcessor数组中某个NioProcessor的add方法将session与其绑定,第一次调用NioProcessor.add时将启动processor线程)
Read/Write事件处理
- 继承关系:NioProcessor -> AbstractPollingIoProcessor
SimpleIoProcessorPool
- NioProcessor[]:Processor数组,默认大小为CPU核心数量+1;
NioProcessor
selector
init();打开selector;
- select():调用selector的select();
- selectedSessions():返回selector.selectedKeys()方法;
- registerNewSelector():处理当程序掉入nio坑里时,将老selector中所有keys注册到新selector上面;
AbstractPollingIoProcessor
Processor -> Runnable
add():将新session加入到Queue,随后检查如果第一次被调用的话则启动startupProcessor()方法执行Processor.run();
- Processor.run():执行select(),判断是否掉到nio坑里,随后处理newSession,初始化newSession并绑定Read/Write事件到selector;然后调用process开始处理活跃的session,再调用flush(),将需要发送的数据交给TCP
- process():遍历活跃的session集合调用process(session)处理每个session;
- process(session):判断如果是可读状态的session则调用read(),如果可写状态则将session加入flushingSessions队列;
- read():将数据读取后调用IoFilterChain.fireMessageReceived方法,此时数据将通过Mina的filters处理数据,最终交给handler处理。
- flush():遍历flushingSession取出session,调用flushNow(session),如果还有未发送的数据则需要调用scheduleFlush(session),“//Kernel buffer is full.”;
- flushNow(session):取出session中的writeRequestQueue,发送队列里面的数据;
- scheduleFlush(session):将session重新加入到flushingSession中;
session
NioSocketSession -> NioSession -> AbstractIoSession