最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 正文

netty源代码解析(1)——服务端流程

来源:动视网 责编:小OO 时间:2025-09-29 21:41:46
文档

netty源代码解析(1)——服务端流程

今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。 netty服务端启动代码如下 Java代码  1.ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.        boot
推荐度:
导读今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。 netty服务端启动代码如下 Java代码  1.ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.        boot
今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。 

netty服务端启动代码如下 

Java代码  

1.ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),  

2.                Executors.newCachedThreadPool()));  

3.        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  

4.  

5.            @Override  

6.            public ChannelPipeline getPipeline() {  

7.                ChannelPipeline pipleline = pipeline();  

8.                //默认最大传输帧大小为16M  

9.                pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));  

10.                pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));  

11.                pipleline.addLast("handler", handler);  

12.                return pipleline;  

13.            }  

14.  

15.        });  

16.        //设置缓冲区为M  

17.        bootstrap.setOption("receiveBufferSize", 1048576 * );  

18.        bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法  

19.        //tcp定期发送心跳包 比如IM里边定期探测对方是否下线  

20.        //只有tcp长连接下才有意义  

21.//      bootstrap.setOption("child.keepAlive", true);   

22.        bootstrap.bind(new InetSocketAddress(port));  

服务端事件处理顺序如下: 

UpStream.ChannelState.OPEN—–>DownStream.ChannelState.BOUND(需要绑定) 

——–>UpStream.ChannelState.BOUND(已经绑定)——>DownStream.CONNECTED(需要连接)——->UpStream.CONNECTED(连接成功) 

在bind的时候做了如下处理 

Java代码  

1.public Channel bind(final SocketAddress localAddress) {  

2.        if (localAddress == null) {  

3.            throw new NullPointerException("localAddress");  

4.        }  

5.  

6.        final BlockingQueue futureQueue =  

7.            new LinkedBlockingQueue();  

8.  

9.        ChannelHandler binder = new Binder(localAddress, futureQueue);  

10.        ChannelHandler parentHandler = getParentHandler();  

这里创建了一个Binder,它继承了SimpleChannelUpstreamHandler。先说说UpStreamHandler和DownStreamHandler,一般来说,UpStream类型的事件主要是由网络底层反馈给Netty的,比如messageReceived,channelConnected等事件,而DownStream类型的事件是由框架自己发起的,比如bind,write,connect,close等事件。 

接着 

Java代码  

1.ChannelPipeline bossPipeline = pipeline();  

2.        bossPipeline.addLast("binder", binder);  

3.        if (parentHandler != null) {  

4.            bossPipeline.addLast("userHandler", parentHandler);  

5.        }  

6.  

7.        Channel channel = getFactory().newChannel(bossPipeline);  

这里创建出一个channel,每一个channel都是由一个tcp四元组组成。channel由ChannelFactory创建而成。在创建完NioServerSocketChannel后,会调用 

fireChannelOpen(this);这是发出一个ChannelState.OPEN事件,前面注册的BinderHandler会处理这个事件。我们来看看Binder的代码 

Java代码  

1.@Override  

2.       public void channelOpen(  

3.               ChannelHandlerContext ctx,  

4.               ChannelStateEvent evt) {  

5.  

6.           try {  

7.               evt.getChannel().getConfig().setPipelineFactory(getPipelineFactory());  

8.  

9.               // Split options into two categories: parent and child.  

10.               Map allOptions = getOptions();  

11.               Map parentOptions = new HashMap();  

12.               for (Entry e: allOptions.entrySet()) {  

13.                   if (e.getKey().startsWith("child.")) {  

14.                       childOptions.put(  

15.                               e.getKey().substring(6),  

16.                               e.getValue());  

17.                   } else if (!e.getKey().equals("pipelineFactory")) {  

18.                       parentOptions.put(e.getKey(), e.getValue());  

19.                   }  

20.               }  

21.  

22.               // Apply parent options.  

23.               evt.getChannel().getConfig().setOptions(parentOptions);  

24.           } finally {  

25.               ctx.sendUpstream(evt);  

26.           }  

27.  

28.           boolean finished = futureQueue.offer(evt.getChannel().bind(localAddress)); //这里发出bind事件,return Channels.bind(this, localAddress)  

29.           assert finished;  

30.       }  

bind就触发了一个DownStream的ChannelState.BOUND事件。表明需要将该Channel绑定至指定的地址。 

Java代码  

1.public void sendDownstream(ChannelEvent e) {  

2.       DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);  

3.       if (tail == null) {  

4.           try {  

5.               getSink().eventSunk(this, e);  

6.               return;  

7.           } catch (Throwable t) {  

8.               notifyHandlerException(e, t);  

9.               return;  

10.           }  

11.       }  

12.  

13.       sendDownstream(tail, e);  

14.   }  

接着就要看NioServerSocketPipelineSink了,这个主要关注于具体传输数据的处理,同时也包括其他方面的内容,比如异常处理等等。执行eventSunk方法。 

Java代码  

1.public void eventSunk(  

2.           ChannelPipeline pipeline, ChannelEvent e) throws Exception {  

3.       Channel channel = e.getChannel();  

4.       if (channel instanceof NioServerSocketChannel) {  

5.           handleServerSocket(e);  

6.       } else if (channel instanceof NioSocketChannel) {  

7.           handleAcceptedSocket(e);  

8.       }  

9.   }  

nio方式ChannelSink一般会有1个boss实例(implements Runnable),以及若干个worker实例(不设置默认为cpu cores*2),它将channel分为 ServerSocketChannel和SocketChannel分开处理。这主要原因是boss线程accept()一个新的连接生成一个 SocketChannel交给worker进行数据接收。 

看下ServerSocketChannel的处理 

Java代码  

1.private void handleServerSocket(ChannelEvent e) {  

2.        if (!(e instanceof ChannelStateEvent)) {  

3.            return;  

4.        }  

5.  

6.        ChannelStateEvent event = (ChannelStateEvent) e;  

7.        NioServerSocketChannel channel =  

8.            (NioServerSocketChannel) event.getChannel();  

9.        ChannelFuture future = event.getFuture();  

10.        ChannelState state = event.getState();  

11.        Object value = event.getValue();  

12.  

13.        switch (state) {  

14.        case OPEN:  

15.            if (Boolean.FALSE.equals(value)) {  

16.                close(channel, future);  

17.            }  

18.            break;  

19.        case BOUND:  

20.            if (value != null) {  

21.                bind(channel, future, (SocketAddress) value);  

22.            } else {  

23.                close(channel, future);  

24.            }  

25.            break;  

26.        }  

27.    }  

主要是处理bind事件, 

Java代码  

1.private void bind(  

2.            NioServerSocketChannel channel, ChannelFuture future,  

3.            SocketAddress localAddress) {  

4.  

5.        boolean bound = false;  

6.        boolean bossStarted = false;  

7.        try {  

8.            channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());  

9.            bound = true;  

10.  

11.            future.setSuccess();  

12.            fireChannelBound(channel, channel.getLocalAddress());  

13.  

14.            //取出一个boss线程,然后交给Boss类去处理。  

15.            Executor bossExecutor =  

16.                ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;  

17.            DeadLockProofWorker.start(  

18.                    bossExecutor,  

19.                    new ThreadRenamingRunnable(  

20.                            new Boss(channel),  

21.                            "New I/O server boss #" + id + " (" + channel + ')'));  

22.            bossStarted = true;  

23.        } catch (Throwable t) {  

24.            future.setFailure(t);  

25.            fireExceptionCaught(channel, t);  

26.        } finally {  

27.            if (!bossStarted && bound) {  

28.                close(channel, future);  

29.            }  

30.        }  

31.    }  

看下Boss类,它实现了Runnable接口 

Java代码  

1.private final Selector selector;  

2.        private final NioServerSocketChannel channel;  

3.  

4.        Boss(NioServerSocketChannel channel) throws IOException {  

5.            this.channel = channel;  

6.  

7.            selector = Selector.open();  

8.  

9.            boolean registered = false;  

10.            try {  

11.                channel.socket.register(selector, SelectionKey.OP_ACCEPT);  

12.                registered = true;  

13.            } finally {  

14.                if (!registered) {  

15.                    closeSelector();  

16.                }  

17.            }  

18.  

19.            channel.selector = selector;  

代码是不是有点熟悉,没错,是nio里的代码,需要注意的是,ServerSocketChannel只注册OP_ACCEPT事件。 

再看下Boss类的run方法 

Java代码  

1.public void run() {  

2.            final Thread currentThread = Thread.currentThread();  

3.  

4.            channel.shutdownLock.lock();  

5.            try {  

6.                for (;;) {  

7.                    try {  

8.                        if (selector.select(1000) > 0) {  

9.                            selector.selectedKeys().clear();  

10.                        }  

11.  

12.                        SocketChannel acceptedSocket = channel.socket.accept();  

13.                        if (acceptedSocket != null) {  

14.                            registerAcceptedChannel(acceptedSocket, currentThread);  

15.                        }  

16.                    } catch (SocketTimeoutException e) {  

17.                        // Thrown every second to get ClosedChannelException  

18.                        // raised.  

19.                    } catch (CancelledKeyException e) {  

20.                        // Raised by accept() when the server socket was closed.  

21.                    } catch (ClosedSelectorException e) {  

22.                        // Raised by accept() when the server socket was closed.  

23.                    } catch (ClosedChannelException e) {  

24.                        // Closed as requested.  

25.                        break;  

26.                    } catch (Throwable e) {  

27.                        logger.warn(  

28.                                "Failed to accept a connection.", e);  

29.                        try {  

30.                            Thread.sleep(1000);  

31.                        } catch (InterruptedException e1) {  

32.                            // Ignore  

33.                        }  

34.                    }  

35.                }  

36.            } finally {  

37.                channel.shutdownLock.unlock();  

38.                closeSelector();  

39.            }  

40.        }  

这里会调用registerAcceptedChannel(acceptedSocket, currentThread);方法 

Java代码  

1.private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {  

2.            try {  

3.                ChannelPipeline pipeline =  

4.                    channel.getConfig().getPipelineFactory().getPipeline();  

5.                NioWorker worker = nextWorker(); //获取一个NioWorker  

6.                //将Channel注册到NioWorker上去  

7.                worker.register(new NioAcceptedSocketChannel(  

8.                        channel.getFactory(), pipeline, channel,  

9.                        NioServerSocketPipelineSink.this, acceptedSocket,  

10.                        worker, currentThread), null);  

11.            } catch (Exception e) {  

12.                logger.warn(  

13.                        "Failed to initialize an accepted socket.", e);  

14.                try {  

15.                    acceptedSocket.close();  

16.                } catch (IOException e2) {  

17.                    logger.warn(  

18.                            "Failed to close a partially accepted socket.",  

19.                            e2);  

20.                }  

21.            }  

22.        }  

当有新的连接建立,会交给NioWorker的线程池去处理,boss只负责accept到新的连接,新的SocketChannel会被注册到一个work线程中去。 

文档

netty源代码解析(1)——服务端流程

今天看了下netty代码,对代码做了个流程分析,netty的代码写的真是漂亮。 netty服务端启动代码如下 Java代码  1.ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.        boot
推荐度:
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top