1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.ConnectTimeoutException;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.IoEvent;
31 import io.netty.channel.IoEventLoop;
32 import io.netty.channel.IoEventLoopGroup;
33 import io.netty.channel.IoRegistration;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.ReferenceCounted;
36 import io.netty.util.concurrent.Future;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.SocketAddress;
43 import java.nio.channels.CancelledKeyException;
44 import java.nio.channels.ClosedChannelException;
45 import java.nio.channels.ConnectionPendingException;
46 import java.nio.channels.SelectableChannel;
47 import java.nio.channels.SelectionKey;
48 import java.util.concurrent.TimeUnit;
49
50
51
52
53 public abstract class AbstractNioChannel extends AbstractChannel {
54
55 private static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
57
58 private final SelectableChannel ch;
59 protected final int readInterestOp;
60 protected final NioIoOps readOps;
61 volatile NioIoRegistration registration;
62 boolean readPending;
63 private final Runnable clearReadPendingRunnable = new Runnable() {
64 @Override
65 public void run() {
66 clearReadPending0();
67 }
68 };
69
70
71
72
73
74 private ChannelPromise connectPromise;
75 private Future<?> connectTimeoutFuture;
76 private SocketAddress requestedRemoteAddress;
77
78
79
80
81
82
83
84
85 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readOps) {
86 this(parent, ch, NioIoOps.valueOf(readOps));
87 }
88
89 protected AbstractNioChannel(Channel parent, SelectableChannel ch, NioIoOps readOps) {
90 super(parent);
91 this.ch = ch;
92 this.readInterestOp = ObjectUtil.checkNotNull(readOps, "readOps").value;
93 this.readOps = readOps;
94 try {
95 ch.configureBlocking(false);
96 } catch (IOException e) {
97 try {
98 ch.close();
99 } catch (IOException e2) {
100 logger.warn(
101 "Failed to close a partially initialized socket.", e2);
102 }
103
104 throw new ChannelException("Failed to enter non-blocking mode.", e);
105 }
106 }
107
108 protected void addAndSubmit(NioIoOps addOps) {
109 int interestOps = selectionKey().interestOps();
110 if (!addOps.isIncludedIn(interestOps)) {
111 try {
112 registration().submit(NioIoOps.valueOf(interestOps).with(addOps));
113 } catch (Exception e) {
114 throw new ChannelException(e);
115 }
116 }
117 }
118
119 protected void removeAndSubmit(NioIoOps removeOps) {
120 int interestOps = selectionKey().interestOps();
121 if (removeOps.isIncludedIn(interestOps)) {
122 try {
123 registration().submit(NioIoOps.valueOf(interestOps).without(removeOps));
124 } catch (Exception e) {
125 throw new ChannelException(e);
126 }
127 }
128 }
129
130 @Override
131 public boolean isOpen() {
132 return ch.isOpen();
133 }
134
135 @Override
136 public NioUnsafe unsafe() {
137 return (NioUnsafe) super.unsafe();
138 }
139
140 protected SelectableChannel javaChannel() {
141 return ch;
142 }
143
144
145
146
147
148
149 @Deprecated
150 protected SelectionKey selectionKey() {
151 return registration().selectionKey();
152 }
153
154 @SuppressWarnings("unchecked")
155 protected NioIoRegistration registration() {
156 assert registration != null;
157 return registration;
158 }
159
160
161
162
163
164 @Deprecated
165 protected boolean isReadPending() {
166 return readPending;
167 }
168
169
170
171
172
173 @Deprecated
174 protected void setReadPending(final boolean readPending) {
175 if (isRegistered()) {
176 EventLoop eventLoop = eventLoop();
177 if (eventLoop.inEventLoop()) {
178 setReadPending0(readPending);
179 } else {
180 eventLoop.execute(new Runnable() {
181 @Override
182 public void run() {
183 setReadPending0(readPending);
184 }
185 });
186 }
187 } else {
188
189
190
191 this.readPending = readPending;
192 }
193 }
194
195
196
197
198 protected final void clearReadPending() {
199 if (isRegistered()) {
200 EventLoop eventLoop = eventLoop();
201 if (eventLoop.inEventLoop()) {
202 clearReadPending0();
203 } else {
204 eventLoop.execute(clearReadPendingRunnable);
205 }
206 } else {
207
208
209
210 readPending = false;
211 }
212 }
213
214 private void setReadPending0(boolean readPending) {
215 this.readPending = readPending;
216 if (!readPending) {
217 ((AbstractNioUnsafe) unsafe()).removeReadOp();
218 }
219 }
220
221 private void clearReadPending0() {
222 readPending = false;
223 ((AbstractNioUnsafe) unsafe()).removeReadOp();
224 }
225
226
227
228
229 public interface NioUnsafe extends Unsafe {
230
231
232
233 SelectableChannel ch();
234
235
236
237
238 void finishConnect();
239
240
241
242
243 void read();
244
245 void forceFlush();
246 }
247
248 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe, NioIoHandle {
249 @Override
250 public void close() {
251 close(voidPromise());
252 }
253
254 @Override
255 public SelectableChannel selectableChannel() {
256 return ch();
257 }
258
259 Channel channel() {
260 return AbstractNioChannel.this;
261 }
262
263 protected final void removeReadOp() {
264 NioIoRegistration registration = registration();
265
266
267
268 if (!registration.isValid()) {
269 return;
270 }
271 removeAndSubmit(readOps);
272 }
273
274 @Override
275 public final SelectableChannel ch() {
276 return javaChannel();
277 }
278
279 @Override
280 public final void connect(
281 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
282
283
284 if (promise.isDone() || !ensureOpen(promise)) {
285 return;
286 }
287
288 try {
289 if (connectPromise != null) {
290
291 throw new ConnectionPendingException();
292 }
293
294 boolean wasActive = isActive();
295 if (doConnect(remoteAddress, localAddress)) {
296 fulfillConnectPromise(promise, wasActive);
297 } else {
298 connectPromise = promise;
299 requestedRemoteAddress = remoteAddress;
300
301
302 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
303 if (connectTimeoutMillis > 0) {
304 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
305 @Override
306 public void run() {
307 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
308 if (connectPromise != null && !connectPromise.isDone()
309 && connectPromise.tryFailure(new ConnectTimeoutException(
310 "connection timed out after " + connectTimeoutMillis + " ms: " +
311 remoteAddress))) {
312 close(voidPromise());
313 }
314 }
315 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
316 }
317
318 promise.addListener(new ChannelFutureListener() {
319 @Override
320 public void operationComplete(ChannelFuture future) {
321
322
323 if (future.isCancelled()) {
324 if (connectTimeoutFuture != null) {
325 connectTimeoutFuture.cancel(false);
326 }
327 connectPromise = null;
328 close(voidPromise());
329 }
330 }
331 });
332 }
333 } catch (Throwable t) {
334 promise.tryFailure(annotateConnectException(t, remoteAddress));
335 closeIfClosed();
336 }
337 }
338
339 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
340 if (promise == null) {
341
342 return;
343 }
344
345
346
347 boolean active = isActive();
348
349
350 boolean promiseSet = promise.trySuccess();
351
352
353
354 if (!wasActive && active) {
355 pipeline().fireChannelActive();
356 }
357
358
359 if (!promiseSet) {
360 close(voidPromise());
361 }
362 }
363
364 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
365 if (promise == null) {
366
367 return;
368 }
369
370
371 promise.tryFailure(cause);
372 closeIfClosed();
373 }
374
375 @Override
376 public final void finishConnect() {
377
378
379
380 assert eventLoop().inEventLoop();
381
382 try {
383 boolean wasActive = isActive();
384 doFinishConnect();
385 fulfillConnectPromise(connectPromise, wasActive);
386 } catch (Throwable t) {
387 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
388 } finally {
389
390
391 if (connectTimeoutFuture != null) {
392 connectTimeoutFuture.cancel(false);
393 }
394 connectPromise = null;
395 }
396 }
397
398 @Override
399 protected final void flush0() {
400
401
402
403 if (!isFlushPending()) {
404 super.flush0();
405 }
406 }
407
408 @Override
409 public final void forceFlush() {
410
411 super.flush0();
412 }
413
414 private boolean isFlushPending() {
415 NioIoRegistration registration = registration();
416 return registration.isValid() && NioIoOps.WRITE.isIncludedIn(registration.selectionKey().interestOps());
417 }
418
419 @Override
420 public void handle(IoRegistration registration, IoEvent event) {
421 try {
422 NioIoEvent nioEvent = (NioIoEvent) event;
423 NioIoOps nioReadyOps = nioEvent.ops();
424
425
426 if (nioReadyOps.contains(NioIoOps.CONNECT)) {
427
428
429 removeAndSubmit(NioIoOps.CONNECT);
430
431 unsafe().finishConnect();
432 }
433
434
435 if (nioReadyOps.contains(NioIoOps.WRITE)) {
436
437
438 forceFlush();
439 }
440
441
442
443 if (nioReadyOps.contains(NioIoOps.READ_AND_ACCEPT) || nioReadyOps.equals(NioIoOps.NONE)) {
444 read();
445 }
446 } catch (CancelledKeyException ignored) {
447 close(voidPromise());
448 }
449 }
450 }
451
452 @Override
453 protected boolean isCompatible(EventLoop loop) {
454 return loop instanceof IoEventLoop && ((IoEventLoopGroup) loop).isCompatible(AbstractNioUnsafe.class);
455 }
456
457 @SuppressWarnings("unchecked")
458 @Override
459 protected void doRegister(ChannelPromise promise) {
460 assert registration == null;
461 ((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe()).addListener(f -> {
462 if (f.isSuccess()) {
463 registration = (NioIoRegistration) f.getNow();
464 promise.setSuccess();
465 } else {
466 promise.setFailure(f.cause());
467 }
468 });
469 }
470
471 @Override
472 protected void doDeregister() throws Exception {
473 NioIoRegistration registration = this.registration;
474 if (registration != null) {
475 this.registration = null;
476 registration.cancel();
477 }
478 }
479
480 @Override
481 protected void doBeginRead() throws Exception {
482
483 NioIoRegistration registration = this.registration;
484 if (registration == null || !registration.isValid()) {
485 return;
486 }
487
488 readPending = true;
489
490 addAndSubmit(readOps);
491 }
492
493
494
495
496 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
497
498
499
500
501 protected abstract void doFinishConnect() throws Exception;
502
503
504
505
506
507
508 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
509 final int readableBytes = buf.readableBytes();
510 if (readableBytes == 0) {
511 ReferenceCountUtil.safeRelease(buf);
512 return Unpooled.EMPTY_BUFFER;
513 }
514
515 final ByteBufAllocator alloc = alloc();
516 if (alloc.isDirectBufferPooled()) {
517 ByteBuf directBuf = alloc.directBuffer(readableBytes);
518 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
519 ReferenceCountUtil.safeRelease(buf);
520 return directBuf;
521 }
522
523 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
524 if (directBuf != null) {
525 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
526 ReferenceCountUtil.safeRelease(buf);
527 return directBuf;
528 }
529
530
531 return buf;
532 }
533
534
535
536
537
538
539
540 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
541 final int readableBytes = buf.readableBytes();
542 if (readableBytes == 0) {
543 ReferenceCountUtil.safeRelease(holder);
544 return Unpooled.EMPTY_BUFFER;
545 }
546
547 final ByteBufAllocator alloc = alloc();
548 if (alloc.isDirectBufferPooled()) {
549 ByteBuf directBuf = alloc.directBuffer(readableBytes);
550 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
551 ReferenceCountUtil.safeRelease(holder);
552 return directBuf;
553 }
554
555 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
556 if (directBuf != null) {
557 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
558 ReferenceCountUtil.safeRelease(holder);
559 return directBuf;
560 }
561
562
563 if (holder != buf) {
564
565 buf.retain();
566 ReferenceCountUtil.safeRelease(holder);
567 }
568
569 return buf;
570 }
571
572 @Override
573 protected void doClose() throws Exception {
574 ChannelPromise promise = connectPromise;
575 if (promise != null) {
576
577 promise.tryFailure(new ClosedChannelException());
578 connectPromise = null;
579 }
580
581 Future<?> future = connectTimeoutFuture;
582 if (future != null) {
583 future.cancel(false);
584 connectTimeoutFuture = null;
585 }
586 }
587 }