最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 正文

netty源代码解析(2)——客户端流程

来源:动视网 责编:小OO 时间:2025-09-29 21:40:33
文档

netty源代码解析(2)——客户端流程

前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下 Java代码  1.ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.  4.        bootstrap
推荐度:
导读前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下 Java代码  1.ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.  4.        bootstrap
前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下 

Java代码  

1.ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  

2.                Executors.newCachedThreadPool()));  

3.  

4.        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {  

5.  

6.            @Override  

7.            public ChannelPipeline getPipeline() throws Exception {  

8.                ChannelPipeline pipleline = pipeline();  

9.                pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));  

10.                pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));  

11.                pipleline.addLast("handler", handler);  

12.                return pipleline;  

13.            }  

14.        });  

15.  

16.        bootstrap.setOption("receiveBufferSize", 1048576 * );  

17.        bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法  

18.        //tcp定期发送心跳包 比如IM里边定期探测对方是否下线  

19.        //只有tcp长连接下才有意义  

20.//      bootstrap.setOption("child.keepAlive", true);  

21.        ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));  

22.        Channel channel = future.awaitUninterruptibly().getChannel();  

客户端事件处理顺序如下: 

UpStream.ChannelState.OPEN(已经open)–>DownStream.ChannelState.BOUND(需要绑定)——>DownStream.CONNECTED(需要连接)—–>UpStream.ChannelState.BOUND(已经绑定)——->UpStream.CONNECTED(连接成功) 

在connect的时候做了如下处理 

Java代码  

1.public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {  

2.  

3.        if (remoteAddress == null) {  

4.            throw new NullPointerException("remoteAddress");  

5.        }  

6.  

7.        ChannelPipeline pipeline;  

8.        try {  

9.            pipeline = getPipelineFactory().getPipeline();  

10.        } catch (Exception e) {  

11.            throw new ChannelPipelineException("Failed to initialize a pipeline.", e);  

12.        }  

13.  

14.        // Set the options.先创建Channel  

15.        Channel ch = getFactory().newChannel(pipeline);  

16.        ch.getConfig().setOptions(getOptions());  

17.  

18.        // Bind.  

19.        if (localAddress != null) {  

20.            ch.bind(localAddress);  

21.        }  

22.  

23.        // Connect. 再进行连接  

24.        return ch.connect(remoteAddress);  

25.    }  

首先要创建出Channel 

Java代码  

1.NioClientSocketChannel(  

2.            ChannelFactory factory, ChannelPipeline pipeline,  

3.            ChannelSink sink, NioWorker worker) {  

4.  

5.        super(null, factory, pipeline, sink, newSocket(), worker);  

6.        fireChannelOpen(this);  

7.    }  

紧接着会fire一个ChannelOpen事件, 

Java代码  

1.if (channel.getParent() != null) {  

2.            fireChildChannelStateChanged(channel.getParent(), channel);  

3.        }  

4.  

5.        channel.getPipeline().sendUpstream(  

6.                new UpstreamChannelStateEvent(  

7.                        channel, ChannelState.OPEN, Boolean.TRUE));  

这样会出发Upstream的ChannelState.OPEN事件。 

接下来要继续connect了 

Java代码  

1.if (remoteAddress == null) {  

2.          throw new NullPointerException("remoteAddress");  

3.      }  

4.      ChannelFuture future = future(channel, true);  

5.      channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(  

6.              channel, future, ChannelState.CONNECTED, remoteAddress));  

7.      return future;  

这样就会出发Downstream的ChannelState.CONNECTED事件。 

接下来就要由NioClientSocketPipelineSink来进行处理了 

Java代码  

1.switch (state) {  

2.            case OPEN:  

3.                if (Boolean.FALSE.equals(value)) {  

4.                    channel.worker.close(channel, future);  

5.                }  

6.                break;  

7.            case BOUND:  

8.                if (value != null) {  

9.                    bind(channel, future, (SocketAddress) value);  

10.                } else {  

11.                    channel.worker.close(channel, future);  

12.                }  

13.                break;  

14.            case CONNECTED:  

15.                if (value != null) {  

16.                    connect(channel, future, (SocketAddress) value);  

17.                } else {  

18.                    channel.worker.close(channel, future);  

19.                }  

20.                break;  

21.            case INTEREST_OPS:  

22.                channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());  

23.                break;  

下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的 

所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select 

Java代码  

1.private static final class RegisterTask implements Runnable {  

2.        private final Boss boss;  

3.        private final NioClientSocketChannel channel;  

4.  

5.        RegisterTask(Boss boss, NioClientSocketChannel channel) {  

6.            this.boss = boss;  

7.            this.channel = channel;  

8.        }  

9.  

10.        public void run() {  

11.            try {  

12.                channel.socket.register(  

13.                        boss.selector, SelectionKey.OP_CONNECT, channel);  

14.            } catch (ClosedChannelException e) {  

15.                channel.worker.close(channel, succeededFuture(channel));  

16.            }  

17.  

18.            int connectTimeout = channel.getConfig().getConnectTimeoutMillis();  

19.            if (connectTimeout > 0) {  

20.                channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;  

21.            }  

22.        }  

23.    }  

register方法 

Java代码  

1.void register(NioClientSocketChannel channel) {  

2.            Runnable registerTask = new RegisterTask(this, channel);  

3.            Selector selector;  

4.  

5.            synchronized (startStopLock) {  

6.                if (!started) {  

7.                    // Open a selector if this worker didn't start yet.  

8.                    try {  

9.                        this.selector = selector =  Selector.open();  

10.                    } catch (Throwable t) {  

11.                        throw new ChannelException(  

12.                                "Failed to create a selector.", t);  

13.                    }  

14.  

15.                    // Start the worker thread with the new Selector.  

16.                    boolean success = false;  

17.                    try {  

18.                        DeadLockProofWorker.start(  

19.                                bossExecutor,  

20.                                new ThreadRenamingRunnable(  

21.                                        this, "New I/O client boss #" + id + '-' + subId));  

22.                        success = true;  

23.                    } finally {  

24.                        if (!success) {  

25.                            // Release the Selector if the execution fails.  

26.                            try {  

27.                                selector.close();  

28.                            } catch (Throwable t) {  

29.                                logger.warn("Failed to close a selector.", t);  

30.                            }  

31.                            this.selector = selector = null;  

32.                            // The method will return to the caller at this point.  

33.                        }  

34.                    }  

35.                } else {  

36.                    // Use the existing selector if this worker has been started.  

37.                    selector = this.selector;  

38.                }  

39.  

40.                assert selector != null && selector.isOpen();  

41.  

42.                started = true;  

43.                boolean offered = registerTaskQueue.offer(registerTask);  

44.                assert offered;  

45.            }  

RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。 

然后run方法处理感兴趣的事件 

Java代码  

1.public void run() {  

2.            boolean shutdown = false;  

3.            Selector selector = this.selector;  

4.            long lastConnectTimeoutCheckTimeNanos = System.nanoTime();  

5.            for (;;) {  

6.                wakenUp.set(false);  

7.  

8.                try {  

9.                    int selectedKeyCount = selector.select(500);  

10.                    .......  

11.  

12.  

13.            processRegisterTaskQueue();  

14.  

15.                    if (selectedKeyCount > 0) {  

16.                        processSelectedKeys(selector.selectedKeys());  

17.                    }  

在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件 

Java代码  

1.private void processSelectedKeys(Set selectedKeys) {  

2.            for (Iterator i = selectedKeys.iterator(); i.hasNext();) {  

3.                SelectionKey k = i.next();  

4.                i.remove();  

5.  

6.                if (!k.isValid()) {  

7.                    close(k);  

8.                    continue;  

9.                }  

10.  

11.                if (k.isConnectable()) {  

12.                    connect(k);  

13.                }  

14.            }  

15.        }  

将连接上的Channel注册到worker中,交给worker去注册read和write 

Java代码  

1.private void connect(SelectionKey k) {  

2.            NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();  

3.            try {  

4.                if (ch.socket.finishConnect()) {  

5.                    k.cancel();  

6.                    ch.worker.register(ch, ch.connectFuture);  

7.                }  

8.            } catch (Throwable t) {  

9.                ch.connectFuture.setFailure(t);  

10.                fireExceptionCaught(ch, t);  

11.                k.cancel(); // Some JDK implementations run into an infinite loop without this.  

12.                ch.worker.close(ch, succeededFuture(ch));  

13.            }  

14.        }  

在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。

http://www.iteye.com/topic/1124846

文档

netty源代码解析(2)——客户端流程

前一篇文章分析了netty的服务端流程,接下来分析一下客户端的大致流程,客户端启动代码如下 Java代码  1.ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  2.                Executors.newCachedThreadPool()));  3.  4.        bootstrap
推荐度:
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top