Mina-IO线程模型

Accept事件处理

  • 继承关系:NioSocketAcceptor -> AbstractPollingIoAcceptor -> AbstractIoAcceptor -> AbstractIoService

NioSocketAcceptor

  • selector

  • init():初始化NIO.selector

  • open():将Accept事件注册到selector
  • select():执行NIO.selector.select()

AbstractPollingIoAcceptor

  • Acceptor -> Runnable:run()
  • SimpleIoProcessorPool -> IoProcessor

  • bindInternal():调用startupAcceptor(),执行Accept.run();

  • Accept.run():调用open()、select(),将select()返回的SelectionKey通过accept(IoProcessor)初始化为NioSocketSession,随后初始化session,并将session绑定到processor(最终绑定processor的逻辑在SimpleIoProcessorPool.add(),该方法调用NioProcessor数组中某个NioProcessor的add方法将session与其绑定,第一次调用NioProcessor.add时将启动processor线程)

Read/Write事件处理

  • 继承关系:NioProcessor -> AbstractPollingIoProcessor

SimpleIoProcessorPool

  • NioProcessor[]:Processor数组,默认大小为CPU核心数量+1;

NioProcessor

  • selector

  • init();打开selector;

  • select():调用selector的select();
  • selectedSessions():返回selector.selectedKeys()方法;
  • registerNewSelector():处理当程序掉入nio坑里时,将老selector中所有keys注册到新selector上面;

AbstractPollingIoProcessor

  • Processor -> Runnable

  • add():将新session加入到Queue,随后检查如果第一次被调用的话则启动startupProcessor()方法执行Processor.run();

  • Processor.run():执行select(),判断是否掉到nio坑里,随后处理newSession,初始化newSession并绑定Read/Write事件到selector;然后调用process开始处理活跃的session,再调用flush(),将需要发送的数据交给TCP
  • process():遍历活跃的session集合调用process(session)处理每个session;
  • process(session):判断如果是可读状态的session则调用read(),如果可写状态则将session加入flushingSessions队列;
  • read():将数据读取后调用IoFilterChain.fireMessageReceived方法,此时数据将通过Mina的filters处理数据,最终交给handler处理。
  • flush():遍历flushingSession取出session,调用flushNow(session),如果还有未发送的数据则需要调用scheduleFlush(session),“//Kernel buffer is full.”;
  • flushNow(session):取出session中的writeRequestQueue,发送队列里面的数据;
  • scheduleFlush(session):将session重新加入到flushingSession中;

session

NioSocketSession -> NioSession -> AbstractIoSession