netty服务端启动代码如下
Java代码
1.ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
2. Executors.newCachedThreadPool()));
3. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
4.
5. @Override
6. public ChannelPipeline getPipeline() {
7. ChannelPipeline pipleline = pipeline();
8. //默认最大传输帧大小为16M
9. pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
10. pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
11. pipleline.addLast("handler", handler);
12. return pipleline;
13. }
14.
15. });
16. //设置缓冲区为M
17. bootstrap.setOption("receiveBufferSize", 1048576 * );
18. bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
19. //tcp定期发送心跳包 比如IM里边定期探测对方是否下线
20. //只有tcp长连接下才有意义
21.// bootstrap.setOption("child.keepAlive", true);
22. bootstrap.bind(new InetSocketAddress(port));
服务端事件处理顺序如下:
UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定)
——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功)
在bind的时候做了如下处理
Java代码
1.public Channel bind(final SocketAddress localAddress) {
2. if (localAddress == null) {
3. throw new NullPointerException("localAddress");
4. }
5.
6. final BlockingQueue 7. new LinkedBlockingQueue 8. 9. ChannelHandler binder = new Binder(localAddress, futureQueue); 10. ChannelHandler parentHandler = getParentHandler(); 这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。 接着 Java代码 1.ChannelPipeline bossPipeline = pipeline(); 2. bossPipeline.addLast("binder", binder); 3. if (parentHandler != null) { 4. bossPipeline.addLast("userHandler", parentHandler); 5. } 6. 7. Channel channel = getFactory().newChannel(bossPipeline); 这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用 fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码 Java代码 1.@Override 2. public void channelOpen( 3. ChannelHandlerContext ctx, 4. ChannelStateEvent evt) { 5. 6. try { 7. evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory()); 8. 9. // Split options into two categories: parent and child. 10. Map 11. Map 12. for (Entry 13. if (e.getKey().startsWith("child.")) { 14. childOptions.put( 15. e.getKey().substring(6), 16. e.getValue()); 17. } else if (!e.getKey().equals("pipelineFactory")) { 18. parentOptions.put(e.getKey(), e.getValue()); 19. } 20. } 21. 22. // Apply parent options. 23. evt.getChannel().getConfig().setOptions(parentOptions); 24. } finally { 25. ctx.sendUpstream(evt); 26. } 27. 28. boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress) 29. assert finished; 30. } bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。 Java代码 1.public void sendDownstream(ChannelEvent e) { 2. DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); 3. if (tail == null) { 4. try { 5. getSink().eventSunk(this, e); 6. return; 7. } catch (Throwable t) { 8. notifyHandlerException(e, t); 9. return; 10. } 11. } 12. 13. sendDownstream(tail, e); 14. } 接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。 Java代码 1.public void eventSunk( 2. ChannelPipeline pipeline, ChannelEvent e) throws Exception { 3. Channel channel = e.getChannel(); 4. if (channel instanceof NioServerSocketChannel) { 5. handleServerSocket(e); 6. } else if (channel instanceof NioSocketChannel) { 7. handleAcceptedSocket(e); 8. } 9. } nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。 看下ServerSocketChannel的处理 Java代码 1.private void handleServerSocket(ChannelEvent e) { 2. if (!(e instanceof ChannelStateEvent)) { 3. return; 4. } 5. 6. ChannelStateEvent event = (ChannelStateEvent) e; 7. NioServerSocketChannel channel = 8. (NioServerSocketChannel) event.getChannel(); 9. ChannelFuture future = event.getFuture(); 10. ChannelState state = event.getState(); 11. Object value = event.getValue(); 12. 13. switch (state) { 14. case OPEN: 15. if (Boolean.FALSE.equals(value)) { 16. close(channel, future); 17. } 18. break; 19. case BOUND: 20. if (value != null) { 21. bind(channel, future, (SocketAddress) value); 22. } else { 23. close(channel, future); 24. } 25. break; 26. } 27. } 主要是处理bind事件, Java代码 1.private void bind( 2. NioServerSocketChannel channel, ChannelFuture future, 3. SocketAddress localAddress) { 4. 5. boolean bound = false; 6. boolean bossStarted = false; 7. try { 8. channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog()); 9. bound = true; 10. 11. future.setSuccess(); 12. fireChannelBound(channel, channel.getLocalAddress()); 13. 14. //取出一个boss线程,然后交给Boss类去处理。 15. Executor bossExecutor = 16. ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor; 17. DeadLockProofWorker.start( 18. bossExecutor, 19. new ThreadRenamingRunnable( 20. new Boss(channel), 21. "New I/O server boss #" + id + " (" + channel + ')')); 22. bossStarted = true; 23. } catch (Throwable t) { 24. future.setFailure(t); 25. fireExceptionCaught(channel, t); 26. } finally { 27. if (!bossStarted && bound) { 28. close(channel, future); 29. } 30. } 31. } 看下Boss类,它实现了Runnable接口 Java代码 1.private final Selector selector; 2. private final NioServerSocketChannel channel; 3. 4. Boss(NioServerSocketChannel channel) throws IOException { 5. this.channel = channel; 6. 7. selector = Selector.open(); 8. 9. boolean registered = false; 10. try { 11. channel.socket.register(selector, SelectionKey.OP_ACCEPT); 12. registered = true; 13. } finally { 14. if (!registered) { 15. closeSelector(); 16. } 17. } 18. 19. channel.selector = selector; 代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。 再看下Boss类的run方法 Java代码 1.public void run() { 2. final Thread currentThread = Thread.currentThread(); 3. 4. channel.shutdownLock.lock(); 5. try { 6. for (;;) { 7. try { 8. if (selector.select(1000) > 0) { 9. selector.selectedKeys().clear(); 10. } 11. 12. SocketChannel acceptedSocket = channel.socket.accept(); 13. if (acceptedSocket != null) { 14. registerAcceptedChannel(acceptedSocket, currentThread); 15. } 16. } catch (SocketTimeoutException e) { 17. // Thrown every second to get ClosedChannelException 18. // raised. 19. } catch (CancelledKeyException e) { 20. // Raised by accept() when the server socket was closed. 21. } catch (ClosedSelectorException e) { 22. // Raised by accept() when the server socket was closed. 23. } catch (ClosedChannelException e) { 24. // Closed as requested. 25. break; 26. } catch (Throwable e) { 27. logger.warn( 28. "Failed to accept a connection.", e); 29. try { 30. Thread.sleep(1000); 31. } catch (InterruptedException e1) { 32. // Ignore 33. } 34. } 35. } 36. } finally { 37. channel.shutdownLock.unlock(); 38. closeSelector(); 39. } 40. } 这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法 Java代码 1.private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) { 2. try { 3. ChannelPipeline pipeline = 4. channel.getConfig().getPipelineFactory().getPipeline(); 5. NioWorker worker = nextWorker(); //获取一个NioWorker 6. //将Channel注册到NioWorker上去 7. worker.register(new NioAcceptedSocketChannel( 8. channel.getFactory(), pipeline, channel, 9. NioServerSocketPipelineSink.this, acceptedSocket, 10. worker, currentThread), null); 11. } catch (Exception e) { 12. logger.warn( 13. "Failed to initialize an accepted socket.", e); 14. try { 15. acceptedSocket.close(); 16. } catch (IOException e2) { 17. logger.warn( 18. "Failed to close a partially accepted socket.", 19. e2); 20. } 21. } 22. } 当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。