1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.ConnectTimeoutException;
29 import io.netty.channel.EventLoop;
30 import io.netty.util.ReferenceCountUtil;
31 import io.netty.util.ReferenceCounted;
32 import io.netty.util.concurrent.Future;
33 import io.netty.util.internal.logging.InternalLogger;
34 import io.netty.util.internal.logging.InternalLoggerFactory;
35
36 import java.io.IOException;
37 import java.net.SocketAddress;
38 import java.nio.channels.CancelledKeyException;
39 import java.nio.channels.ClosedChannelException;
40 import java.nio.channels.ConnectionPendingException;
41 import java.nio.channels.SelectableChannel;
42 import java.nio.channels.SelectionKey;
43 import java.util.concurrent.TimeUnit;
44
45
46
47
48 public abstract class AbstractNioChannel extends AbstractChannel {
49
50 private static final InternalLogger logger =
51 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
52
53 private final SelectableChannel ch;
54 protected final int readInterestOp;
55 volatile SelectionKey selectionKey;
56 boolean readPending;
57 private final Runnable clearReadPendingRunnable = new Runnable() {
58 @Override
59 public void run() {
60 clearReadPending0();
61 }
62 };
63
64
65
66
67
68 private ChannelPromise connectPromise;
69 private Future<?> connectTimeoutFuture;
70 private SocketAddress requestedRemoteAddress;
71
72
73
74
75
76
77
78
79 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
80 super(parent);
81 this.ch = ch;
82 this.readInterestOp = readInterestOp;
83 try {
84 ch.configureBlocking(false);
85 } catch (IOException e) {
86 try {
87 ch.close();
88 } catch (IOException e2) {
89 logger.warn(
90 "Failed to close a partially initialized socket.", e2);
91 }
92
93 throw new ChannelException("Failed to enter non-blocking mode.", e);
94 }
95 }
96
97 @Override
98 public boolean isOpen() {
99 return ch.isOpen();
100 }
101
102 @Override
103 public NioUnsafe unsafe() {
104 return (NioUnsafe) super.unsafe();
105 }
106
107 protected SelectableChannel javaChannel() {
108 return ch;
109 }
110
111 @Override
112 public NioEventLoop eventLoop() {
113 return (NioEventLoop) super.eventLoop();
114 }
115
116
117
118
119 protected SelectionKey selectionKey() {
120 assert selectionKey != null;
121 return selectionKey;
122 }
123
124
125
126
127
128 @Deprecated
129 protected boolean isReadPending() {
130 return readPending;
131 }
132
133
134
135
136
137 @Deprecated
138 protected void setReadPending(final boolean readPending) {
139 if (isRegistered()) {
140 EventLoop eventLoop = eventLoop();
141 if (eventLoop.inEventLoop()) {
142 setReadPending0(readPending);
143 } else {
144 eventLoop.execute(new Runnable() {
145 @Override
146 public void run() {
147 setReadPending0(readPending);
148 }
149 });
150 }
151 } else {
152
153
154
155 this.readPending = readPending;
156 }
157 }
158
159
160
161
162 protected final void clearReadPending() {
163 if (isRegistered()) {
164 EventLoop eventLoop = eventLoop();
165 if (eventLoop.inEventLoop()) {
166 clearReadPending0();
167 } else {
168 eventLoop.execute(clearReadPendingRunnable);
169 }
170 } else {
171
172
173
174 readPending = false;
175 }
176 }
177
178 private void setReadPending0(boolean readPending) {
179 this.readPending = readPending;
180 if (!readPending) {
181 ((AbstractNioUnsafe) unsafe()).removeReadOp();
182 }
183 }
184
185 private void clearReadPending0() {
186 readPending = false;
187 ((AbstractNioUnsafe) unsafe()).removeReadOp();
188 }
189
190
191
192
193 public interface NioUnsafe extends Unsafe {
194
195
196
197 SelectableChannel ch();
198
199
200
201
202 void finishConnect();
203
204
205
206
207 void read();
208
209 void forceFlush();
210 }
211
212 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe {
213
214 protected final void removeReadOp() {
215 SelectionKey key = selectionKey();
216
217
218
219 if (!key.isValid()) {
220 return;
221 }
222 int interestOps = key.interestOps();
223 if ((interestOps & readInterestOp) != 0) {
224
225 key.interestOps(interestOps & ~readInterestOp);
226 }
227 }
228
229 @Override
230 public final SelectableChannel ch() {
231 return javaChannel();
232 }
233
234 @Override
235 public final void connect(
236 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
237
238
239 if (promise.isDone() || !ensureOpen(promise)) {
240 return;
241 }
242
243 try {
244 if (connectPromise != null) {
245
246 throw new ConnectionPendingException();
247 }
248
249 boolean wasActive = isActive();
250 if (doConnect(remoteAddress, localAddress)) {
251 fulfillConnectPromise(promise, wasActive);
252 } else {
253 connectPromise = promise;
254 requestedRemoteAddress = remoteAddress;
255
256
257 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
258 if (connectTimeoutMillis > 0) {
259 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
260 @Override
261 public void run() {
262 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
263 if (connectPromise != null && !connectPromise.isDone()
264 && connectPromise.tryFailure(new ConnectTimeoutException(
265 "connection timed out after " + connectTimeoutMillis + " ms: " +
266 remoteAddress))) {
267 close(voidPromise());
268 }
269 }
270 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
271 }
272
273 promise.addListener(new ChannelFutureListener() {
274 @Override
275 public void operationComplete(ChannelFuture future) {
276
277
278 if (future.isCancelled()) {
279 if (connectTimeoutFuture != null) {
280 connectTimeoutFuture.cancel(false);
281 }
282 connectPromise = null;
283 close(voidPromise());
284 }
285 }
286 });
287 }
288 } catch (Throwable t) {
289 promise.tryFailure(annotateConnectException(t, remoteAddress));
290 closeIfClosed();
291 }
292 }
293
294 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
295 if (promise == null) {
296
297 return;
298 }
299
300
301
302 boolean active = isActive();
303
304
305 boolean promiseSet = promise.trySuccess();
306
307
308
309 if (!wasActive && active) {
310 pipeline().fireChannelActive();
311 }
312
313
314 if (!promiseSet) {
315 close(voidPromise());
316 }
317 }
318
319 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
320 if (promise == null) {
321
322 return;
323 }
324
325
326 promise.tryFailure(cause);
327 closeIfClosed();
328 }
329
330 @Override
331 public final void finishConnect() {
332
333
334
335 assert eventLoop().inEventLoop();
336
337 try {
338 boolean wasActive = isActive();
339 doFinishConnect();
340 fulfillConnectPromise(connectPromise, wasActive);
341 } catch (Throwable t) {
342 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
343 } finally {
344
345
346 if (connectTimeoutFuture != null) {
347 connectTimeoutFuture.cancel(false);
348 }
349 connectPromise = null;
350 }
351 }
352
353 @Override
354 protected final void flush0() {
355
356
357
358 if (!isFlushPending()) {
359 super.flush0();
360 }
361 }
362
363 @Override
364 public final void forceFlush() {
365
366 super.flush0();
367 }
368
369 private boolean isFlushPending() {
370 SelectionKey selectionKey = selectionKey();
371 return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
372 }
373 }
374
375 @Override
376 protected boolean isCompatible(EventLoop loop) {
377 return loop instanceof NioEventLoop;
378 }
379
380 @Override
381 protected void doRegister() throws Exception {
382 boolean selected = false;
383 for (;;) {
384 try {
385 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
386 return;
387 } catch (CancelledKeyException e) {
388 if (!selected) {
389
390
391 eventLoop().selectNow();
392 selected = true;
393 } else {
394
395
396 throw e;
397 }
398 }
399 }
400 }
401
402 @Override
403 protected void doDeregister() throws Exception {
404 eventLoop().cancel(selectionKey());
405 }
406
407 @Override
408 protected void doBeginRead() throws Exception {
409
410 final SelectionKey selectionKey = this.selectionKey;
411 if (!selectionKey.isValid()) {
412 return;
413 }
414
415 readPending = true;
416
417 final int interestOps = selectionKey.interestOps();
418 if ((interestOps & readInterestOp) == 0) {
419 selectionKey.interestOps(interestOps | readInterestOp);
420 }
421 }
422
423
424
425
426 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
427
428
429
430
431 protected abstract void doFinishConnect() throws Exception;
432
433
434
435
436
437
438 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
439 final int readableBytes = buf.readableBytes();
440 if (readableBytes == 0) {
441 ReferenceCountUtil.safeRelease(buf);
442 return Unpooled.EMPTY_BUFFER;
443 }
444
445 final ByteBufAllocator alloc = alloc();
446 if (alloc.isDirectBufferPooled()) {
447 ByteBuf directBuf = alloc.directBuffer(readableBytes);
448 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
449 ReferenceCountUtil.safeRelease(buf);
450 return directBuf;
451 }
452
453 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
454 if (directBuf != null) {
455 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
456 ReferenceCountUtil.safeRelease(buf);
457 return directBuf;
458 }
459
460
461 return buf;
462 }
463
464
465
466
467
468
469
470 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
471 final int readableBytes = buf.readableBytes();
472 if (readableBytes == 0) {
473 ReferenceCountUtil.safeRelease(holder);
474 return Unpooled.EMPTY_BUFFER;
475 }
476
477 final ByteBufAllocator alloc = alloc();
478 if (alloc.isDirectBufferPooled()) {
479 ByteBuf directBuf = alloc.directBuffer(readableBytes);
480 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
481 ReferenceCountUtil.safeRelease(holder);
482 return directBuf;
483 }
484
485 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
486 if (directBuf != null) {
487 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
488 ReferenceCountUtil.safeRelease(holder);
489 return directBuf;
490 }
491
492
493 if (holder != buf) {
494
495 buf.retain();
496 ReferenceCountUtil.safeRelease(holder);
497 }
498
499 return buf;
500 }
501
502 @Override
503 protected void doClose() throws Exception {
504 ChannelPromise promise = connectPromise;
505 if (promise != null) {
506
507 promise.tryFailure(new ClosedChannelException());
508 connectPromise = null;
509 }
510
511 Future<?> future = connectTimeoutFuture;
512 if (future != null) {
513 future.cancel(false);
514 connectTimeoutFuture = null;
515 }
516 }
517 }