Java代码
1.ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
2. Executors.newCachedThreadPool()));
3.
4. bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
5.
6. @Override
7. public ChannelPipeline getPipeline() throws Exception {
8. ChannelPipeline pipleline = pipeline();
9. pipleline.addLast("encode", new ObjectEncoder(1048576 * 16));
10. pipleline.addLast("decode", new ObjectDecoder(1048576 * 16, ClassResolvers.weakCachingConcurrentResolver(null)));
11. pipleline.addLast("handler", handler);
12. return pipleline;
13. }
14. });
15.
16. bootstrap.setOption("receiveBufferSize", 1048576 * );
17. bootstrap.setOption("child.tcpNoDelay", true); //关闭Nagle算法
18. //tcp定期发送心跳包 比如IM里边定期探测对方是否下线
19. //只有tcp长连接下才有意义
20.// bootstrap.setOption("child.keepAlive", true);
21. ChannelFuture future = bootstrap.connect(new InetSocketAddress(address, port));
22. Channel channel = future.awaitUninterruptibly().getChannel();
客户端事件处理顺序如下:
UpStream.ChannelState.OPEN(已经open)–>DownStream.ChannelState.BOUND(需要绑定)——>DownStream.CONNECTED(需要连接)—–>UpStream.ChannelState.BOUND(已经绑定)——->UpStream.CONNECTED(连接成功)
在connect的时候做了如下处理
Java代码
1.public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
2.
3. if (remoteAddress == null) {
4. throw new NullPointerException("remoteAddress");
5. }
6.
7. ChannelPipeline pipeline;
8. try {
9. pipeline = getPipelineFactory().getPipeline();
10. } catch (Exception e) {
11. throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
12. }
13.
14. // Set the options.先创建Channel
15. Channel ch = getFactory().newChannel(pipeline);
16. ch.getConfig().setOptions(getOptions());
17.
18. // Bind.
19. if (localAddress != null) {
20. ch.bind(localAddress);
21. }
22.
23. // Connect. 再进行连接
24. return ch.connect(remoteAddress);
25. }
首先要创建出Channel
Java代码
1.NioClientSocketChannel(
2. ChannelFactory factory, ChannelPipeline pipeline,
3. ChannelSink sink, NioWorker worker) {
4.
5. super(null, factory, pipeline, sink, newSocket(), worker);
6. fireChannelOpen(this);
7. }
紧接着会fire一个ChannelOpen事件,
Java代码
1.if (channel.getParent() != null) {
2. fireChildChannelStateChanged(channel.getParent(), channel);
3. }
4.
5. channel.getPipeline().sendUpstream(
6. new UpstreamChannelStateEvent(
7. channel, ChannelState.OPEN, Boolean.TRUE));
这样会出发Upstream的ChannelState.OPEN事件。
接下来要继续connect了
Java代码
1.if (remoteAddress == null) {
2. throw new NullPointerException("remoteAddress");
3. }
4. ChannelFuture future = future(channel, true);
5. channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
6. channel, future, ChannelState.CONNECTED, remoteAddress));
7. return future;
这样就会出发Downstream的ChannelState.CONNECTED事件。
接下来就要由NioClientSocketPipelineSink来进行处理了
Java代码
1.switch (state) {
2. case OPEN:
3. if (Boolean.FALSE.equals(value)) {
4. channel.worker.close(channel, future);
5. }
6. break;
7. case BOUND:
8. if (value != null) {
9. bind(channel, future, (SocketAddress) value);
10. } else {
11. channel.worker.close(channel, future);
12. }
13. break;
14. case CONNECTED:
15. if (value != null) {
16. connect(channel, future, (SocketAddress) value);
17. } else {
18. channel.worker.close(channel, future);
19. }
20. break;
21. case INTEREST_OPS:
22. channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
23. break;
下面看下channel注册到worker的代码,连接的时候是在内部的一个Boss类里处理的
所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select
Java代码
1.private static final class RegisterTask implements Runnable {
2. private final Boss boss;
3. private final NioClientSocketChannel channel;
4.
5. RegisterTask(Boss boss, NioClientSocketChannel channel) {
6. this.boss = boss;
7. this.channel = channel;
8. }
9.
10. public void run() {
11. try {
12. channel.socket.register(
13. boss.selector, SelectionKey.OP_CONNECT, channel);
14. } catch (ClosedChannelException e) {
15. channel.worker.close(channel, succeededFuture(channel));
16. }
17.
18. int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
19. if (connectTimeout > 0) {
20. channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
21. }
22. }
23. }
register方法
Java代码
1.void register(NioClientSocketChannel channel) {
2. Runnable registerTask = new RegisterTask(this, channel);
3. Selector selector;
4.
5. synchronized (startStopLock) {
6. if (!started) {
7. // Open a selector if this worker didn't start yet.
8. try {
9. this.selector = selector = Selector.open();
10. } catch (Throwable t) {
11. throw new ChannelException(
12. "Failed to create a selector.", t);
13. }
14.
15. // Start the worker thread with the new Selector.
16. boolean success = false;
17. try {
18. DeadLockProofWorker.start(
19. bossExecutor,
20. new ThreadRenamingRunnable(
21. this, "New I/O client boss #" + id + '-' + subId));
22. success = true;
23. } finally {
24. if (!success) {
25. // Release the Selector if the execution fails.
26. try {
27. selector.close();
28. } catch (Throwable t) {
29. logger.warn("Failed to close a selector.", t);
30. }
31. this.selector = selector = null;
32. // The method will return to the caller at this point.
33. }
34. }
35. } else {
36. // Use the existing selector if this worker has been started.
37. selector = this.selector;
38. }
39.
40. assert selector != null && selector.isOpen();
41.
42. started = true;
43. boolean offered = registerTaskQueue.offer(registerTask);
44. assert offered;
45. }
RegisterTask,放到Boss类持有的registerTaskQueue之后,Boss类会从boss executer线程池中取出一个线程不断地处理队列、选择准备就绪的键等。
然后run方法处理感兴趣的事件
Java代码
1.public void run() {
2. boolean shutdown = false;
3. Selector selector = this.selector;
4. long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
5. for (;;) {
6. wakenUp.set(false);
7.
8. try {
9. int selectedKeyCount = selector.select(500);
10. .......
11.
12.
13. processRegisterTaskQueue();
14.
15. if (selectedKeyCount > 0) {
16. processSelectedKeys(selector.selectedKeys());
17. }
在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件
Java代码
1.private void processSelectedKeys(Set 2. for (Iterator 3. SelectionKey k = i.next(); 4. i.remove(); 5. 6. if (!k.isValid()) { 7. close(k); 8. continue; 9. } 10. 11. if (k.isConnectable()) { 12. connect(k); 13. } 14. } 15. } 将连接上的Channel注册到worker中,交给worker去注册read和write Java代码 1.private void connect(SelectionKey k) { 2. NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment(); 3. try { 4. if (ch.socket.finishConnect()) { 5. k.cancel(); 6. ch.worker.register(ch, ch.connectFuture); 7. } 8. } catch (Throwable t) { 9. ch.connectFuture.setFailure(t); 10. fireExceptionCaught(ch, t); 11. k.cancel(); // Some JDK implementations run into an infinite loop without this. 12. ch.worker.close(ch, succeededFuture(ch)); 13. } 14. } 在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了。 http://www.iteye.com/topic/1124846