1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.nio;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelPromise;
28 import io.netty.channel.ConnectTimeoutException;
29 import io.netty.channel.EventLoop;
30 import io.netty.channel.IoEvent;
31 import io.netty.channel.IoEventLoop;
32 import io.netty.channel.IoEventLoopGroup;
33 import io.netty.channel.IoRegistration;
34 import io.netty.util.ReferenceCountUtil;
35 import io.netty.util.ReferenceCounted;
36 import io.netty.util.concurrent.Future;
37 import io.netty.util.internal.ObjectUtil;
38 import io.netty.util.internal.logging.InternalLogger;
39 import io.netty.util.internal.logging.InternalLoggerFactory;
40
41 import java.io.IOException;
42 import java.net.SocketAddress;
43 import java.nio.channels.CancelledKeyException;
44 import java.nio.channels.ClosedChannelException;
45 import java.nio.channels.ConnectionPendingException;
46 import java.nio.channels.SelectableChannel;
47 import java.nio.channels.SelectionKey;
48 import java.util.concurrent.TimeUnit;
49
50
51
52
53 public abstract class AbstractNioChannel extends AbstractChannel {
54
55 private static final InternalLogger logger =
56 InternalLoggerFactory.getInstance(AbstractNioChannel.class);
57
58 private final SelectableChannel ch;
59 protected final int readInterestOp;
60 protected final NioIoOps readOps;
61 volatile IoRegistration registration;
62 boolean readPending;
63 private final Runnable clearReadPendingRunnable = new Runnable() {
64 @Override
65 public void run() {
66 clearReadPending0();
67 }
68 };
69
70
71
72
73
74 private ChannelPromise connectPromise;
75 private Future<?> connectTimeoutFuture;
76 private SocketAddress requestedRemoteAddress;
77
78
79
80
81
82
83
84
85 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readOps) {
86 this(parent, ch, NioIoOps.valueOf(readOps));
87 }
88
89 protected AbstractNioChannel(Channel parent, SelectableChannel ch, NioIoOps readOps) {
90 super(parent);
91 this.ch = ch;
92 this.readInterestOp = ObjectUtil.checkNotNull(readOps, "readOps").value;
93 this.readOps = readOps;
94 try {
95 ch.configureBlocking(false);
96 } catch (IOException e) {
97 try {
98 ch.close();
99 } catch (IOException e2) {
100 logger.warn(
101 "Failed to close a partially initialized socket.", e2);
102 }
103
104 throw new ChannelException("Failed to enter non-blocking mode.", e);
105 }
106 }
107
108 protected void addAndSubmit(NioIoOps addOps) {
109 int interestOps = selectionKey().interestOps();
110 if (!addOps.isIncludedIn(interestOps)) {
111 try {
112 registration().submit(NioIoOps.valueOf(interestOps).with(addOps));
113 } catch (Exception e) {
114 throw new ChannelException(e);
115 }
116 }
117 }
118
119 protected void removeAndSubmit(NioIoOps removeOps) {
120 int interestOps = selectionKey().interestOps();
121 if (removeOps.isIncludedIn(interestOps)) {
122 try {
123 registration().submit(NioIoOps.valueOf(interestOps).without(removeOps));
124 } catch (Exception e) {
125 throw new ChannelException(e);
126 }
127 }
128 }
129
130 @Override
131 public boolean isOpen() {
132 return ch.isOpen();
133 }
134
135 @Override
136 public NioUnsafe unsafe() {
137 return (NioUnsafe) super.unsafe();
138 }
139
140 protected SelectableChannel javaChannel() {
141 return ch;
142 }
143
144
145
146
147
148
149 @Deprecated
150 protected SelectionKey selectionKey() {
151 return registration().attachment();
152 }
153
154 @SuppressWarnings("unchecked")
155 protected IoRegistration registration() {
156 assert registration != null;
157 return registration;
158 }
159
160
161
162
163
164 @Deprecated
165 protected boolean isReadPending() {
166 return readPending;
167 }
168
169
170
171
172
173 @Deprecated
174 protected void setReadPending(final boolean readPending) {
175 if (isRegistered()) {
176 EventLoop eventLoop = eventLoop();
177 if (eventLoop.inEventLoop()) {
178 setReadPending0(readPending);
179 } else {
180 eventLoop.execute(new Runnable() {
181 @Override
182 public void run() {
183 setReadPending0(readPending);
184 }
185 });
186 }
187 } else {
188
189
190
191 this.readPending = readPending;
192 }
193 }
194
195
196
197
198 protected final void clearReadPending() {
199 if (isRegistered()) {
200 EventLoop eventLoop = eventLoop();
201 if (eventLoop.inEventLoop()) {
202 clearReadPending0();
203 } else {
204 eventLoop.execute(clearReadPendingRunnable);
205 }
206 } else {
207
208
209
210 readPending = false;
211 }
212 }
213
214 private void setReadPending0(boolean readPending) {
215 this.readPending = readPending;
216 if (!readPending) {
217 ((AbstractNioUnsafe) unsafe()).removeReadOp();
218 }
219 }
220
221 private void clearReadPending0() {
222 readPending = false;
223 ((AbstractNioUnsafe) unsafe()).removeReadOp();
224 }
225
226
227
228
229 public interface NioUnsafe extends Unsafe {
230
231
232
233 SelectableChannel ch();
234
235
236
237
238 void finishConnect();
239
240
241
242
243 void read();
244
245 void forceFlush();
246 }
247
248 protected abstract class AbstractNioUnsafe extends AbstractUnsafe implements NioUnsafe, NioIoHandle {
249 @Override
250 public void close() {
251 close(voidPromise());
252 }
253
254 @Override
255 public SelectableChannel selectableChannel() {
256 return ch();
257 }
258
259 Channel channel() {
260 return AbstractNioChannel.this;
261 }
262
263 protected final void removeReadOp() {
264 IoRegistration registration = registration();
265
266
267
268 if (!registration.isValid()) {
269 return;
270 }
271 removeAndSubmit(readOps);
272 }
273
274 @Override
275 public final SelectableChannel ch() {
276 return javaChannel();
277 }
278
279 @Override
280 public final void connect(
281 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
282
283
284 if (promise.isDone() || !ensureOpen(promise)) {
285 return;
286 }
287
288 try {
289 if (connectPromise != null) {
290
291 throw new ConnectionPendingException();
292 }
293
294 boolean wasActive = isActive();
295 if (doConnect(remoteAddress, localAddress)) {
296 fulfillConnectPromise(promise, wasActive);
297 } else {
298 connectPromise = promise;
299 requestedRemoteAddress = remoteAddress;
300
301
302 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
303 if (connectTimeoutMillis > 0) {
304 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
305 @Override
306 public void run() {
307 ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
308 if (connectPromise != null && !connectPromise.isDone()
309 && connectPromise.tryFailure(new ConnectTimeoutException(
310 "connection timed out after " + connectTimeoutMillis + " ms: " +
311 remoteAddress))) {
312 close(voidPromise());
313 }
314 }
315 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
316 }
317
318 promise.addListener(new ChannelFutureListener() {
319 @Override
320 public void operationComplete(ChannelFuture future) {
321
322
323 if (future.isCancelled()) {
324 if (connectTimeoutFuture != null) {
325 connectTimeoutFuture.cancel(false);
326 }
327 connectPromise = null;
328 close(voidPromise());
329 }
330 }
331 });
332 }
333 } catch (Throwable t) {
334 promise.tryFailure(annotateConnectException(t, remoteAddress));
335 closeIfClosed();
336 }
337 }
338
339 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
340 if (promise == null) {
341
342 return;
343 }
344
345
346
347 boolean active = isActive();
348
349
350 boolean promiseSet = promise.trySuccess();
351
352
353
354 if (!wasActive && active) {
355 pipeline().fireChannelActive();
356 }
357
358
359 if (!promiseSet) {
360 close(voidPromise());
361 }
362 }
363
364 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
365 if (promise == null) {
366
367 return;
368 }
369
370
371 promise.tryFailure(cause);
372 closeIfClosed();
373 }
374
375 @Override
376 public final void finishConnect() {
377
378
379
380 assert eventLoop().inEventLoop();
381
382 try {
383 boolean wasActive = isActive();
384 doFinishConnect();
385 fulfillConnectPromise(connectPromise, wasActive);
386 } catch (Throwable t) {
387 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
388 } finally {
389
390
391 if (connectTimeoutFuture != null) {
392 connectTimeoutFuture.cancel(false);
393 }
394 connectPromise = null;
395 }
396 }
397
398 @Override
399 protected final void flush0() {
400
401
402
403 if (!isFlushPending()) {
404 super.flush0();
405 }
406 }
407
408 @Override
409 public final void forceFlush() {
410
411 super.flush0();
412 }
413
414 private boolean isFlushPending() {
415 IoRegistration registration = registration();
416 return registration.isValid() && NioIoOps.WRITE.isIncludedIn((
417 (SelectionKey) registration.attachment()).interestOps());
418 }
419
420 @Override
421 public void handle(IoRegistration registration, IoEvent event) {
422 try {
423 NioIoEvent nioEvent = (NioIoEvent) event;
424 NioIoOps nioReadyOps = nioEvent.ops();
425
426
427 if (nioReadyOps.contains(NioIoOps.CONNECT)) {
428
429
430 removeAndSubmit(NioIoOps.CONNECT);
431
432 unsafe().finishConnect();
433 }
434
435
436 if (nioReadyOps.contains(NioIoOps.WRITE)) {
437
438
439 forceFlush();
440 }
441
442
443
444 if (nioReadyOps.contains(NioIoOps.READ_AND_ACCEPT) || nioReadyOps.equals(NioIoOps.NONE)) {
445 read();
446 }
447 } catch (CancelledKeyException ignored) {
448 close(voidPromise());
449 }
450 }
451 }
452
453 @Override
454 protected boolean isCompatible(EventLoop loop) {
455 return loop instanceof IoEventLoop && ((IoEventLoopGroup) loop).isCompatible(AbstractNioUnsafe.class);
456 }
457
458 @SuppressWarnings("unchecked")
459 @Override
460 protected void doRegister(ChannelPromise promise) {
461 assert registration == null;
462 ((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe()).addListener(f -> {
463 if (f.isSuccess()) {
464 registration = (IoRegistration) f.getNow();
465 promise.setSuccess();
466 } else {
467 promise.setFailure(f.cause());
468 }
469 });
470 }
471
472 @Override
473 protected void doDeregister() throws Exception {
474 IoRegistration registration = this.registration;
475 if (registration != null) {
476 this.registration = null;
477 registration.cancel();
478 }
479 }
480
481 @Override
482 protected void doBeginRead() throws Exception {
483
484 IoRegistration registration = this.registration;
485 if (registration == null || !registration.isValid()) {
486 return;
487 }
488
489 readPending = true;
490
491 addAndSubmit(readOps);
492 }
493
494
495
496
497 protected abstract boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception;
498
499
500
501
502 protected abstract void doFinishConnect() throws Exception;
503
504
505
506
507
508
509 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
510 final int readableBytes = buf.readableBytes();
511 if (readableBytes == 0) {
512 ReferenceCountUtil.safeRelease(buf);
513 return Unpooled.EMPTY_BUFFER;
514 }
515
516 final ByteBufAllocator alloc = alloc();
517 if (alloc.isDirectBufferPooled()) {
518 ByteBuf directBuf = alloc.directBuffer(readableBytes);
519 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
520 ReferenceCountUtil.safeRelease(buf);
521 return directBuf;
522 }
523
524 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
525 if (directBuf != null) {
526 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
527 ReferenceCountUtil.safeRelease(buf);
528 return directBuf;
529 }
530
531
532 return buf;
533 }
534
535
536
537
538
539
540
541 protected final ByteBuf newDirectBuffer(ReferenceCounted holder, ByteBuf buf) {
542 final int readableBytes = buf.readableBytes();
543 if (readableBytes == 0) {
544 ReferenceCountUtil.safeRelease(holder);
545 return Unpooled.EMPTY_BUFFER;
546 }
547
548 final ByteBufAllocator alloc = alloc();
549 if (alloc.isDirectBufferPooled()) {
550 ByteBuf directBuf = alloc.directBuffer(readableBytes);
551 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
552 ReferenceCountUtil.safeRelease(holder);
553 return directBuf;
554 }
555
556 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
557 if (directBuf != null) {
558 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
559 ReferenceCountUtil.safeRelease(holder);
560 return directBuf;
561 }
562
563
564 if (holder != buf) {
565
566 buf.retain();
567 ReferenceCountUtil.safeRelease(holder);
568 }
569
570 return buf;
571 }
572
573 @Override
574 protected void doClose() throws Exception {
575 ChannelPromise promise = connectPromise;
576 if (promise != null) {
577
578 promise.tryFailure(new ClosedChannelException());
579 connectPromise = null;
580 }
581
582 Future<?> future = connectTimeoutFuture;
583 if (future != null) {
584 future.cancel(false);
585 connectTimeoutFuture = null;
586 }
587 }
588 }