1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel;
17
18 import io.netty.buffer.ByteBufAllocator;
19 import io.netty.util.DefaultAttributeMap;
20 import io.netty.util.ReferenceCountUtil;
21 import io.netty.util.internal.EmptyArrays;
22 import io.netty.util.internal.OneTimeTask;
23 import io.netty.util.internal.PlatformDependent;
24 import io.netty.util.internal.logging.InternalLogger;
25 import io.netty.util.internal.logging.InternalLoggerFactory;
26
27 import java.net.ConnectException;
28 import java.net.InetSocketAddress;
29 import java.net.NoRouteToHostException;
30 import java.net.SocketAddress;
31 import java.net.SocketException;
32 import java.nio.channels.ClosedChannelException;
33 import java.nio.channels.NotYetConnectedException;
34 import java.util.concurrent.Executor;
35 import java.util.concurrent.RejectedExecutionException;
36
37
38
39
40 public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
41
42 private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractChannel.class);
43
44 static final ClosedChannelException CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
45 static final NotYetConnectedException NOT_YET_CONNECTED_EXCEPTION = new NotYetConnectedException();
46
47 static {
48 CLOSED_CHANNEL_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
49 NOT_YET_CONNECTED_EXCEPTION.setStackTrace(EmptyArrays.EMPTY_STACK_TRACE);
50 }
51
52 private MessageSizeEstimator.Handle estimatorHandle;
53
54 private final Channel parent;
55 private final ChannelId id;
56 private final Unsafe unsafe;
57 private final DefaultChannelPipeline pipeline;
58 private final ChannelFuture succeededFuture = new SucceededChannelFuture(this, null);
59 private final VoidChannelPromise voidPromise = new VoidChannelPromise(this, true);
60 private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(this, false);
61 private final CloseFuture closeFuture = new CloseFuture(this);
62
63 private volatile SocketAddress localAddress;
64 private volatile SocketAddress remoteAddress;
65 private volatile PausableChannelEventLoop eventLoop;
66 private volatile boolean registered;
67
68
69 private boolean strValActive;
70 private String strVal;
71
72
73
74
75
76
77
78 protected AbstractChannel(Channel parent) {
79 this.parent = parent;
80 id = DefaultChannelId.newInstance();
81 unsafe = newUnsafe();
82 pipeline = new DefaultChannelPipeline(this);
83 }
84
85
86
87
88
89
90
91 protected AbstractChannel(Channel parent, ChannelId id) {
92 this.parent = parent;
93 this.id = id;
94 unsafe = newUnsafe();
95 pipeline = new DefaultChannelPipeline(this);
96 }
97
98 @Override
99 public final ChannelId id() {
100 return id;
101 }
102
103 @Override
104 public boolean isWritable() {
105 ChannelOutboundBuffer buf = unsafe.outboundBuffer();
106 return buf != null && buf.isWritable();
107 }
108
109 @Override
110 public Channel parent() {
111 return parent;
112 }
113
114 @Override
115 public ChannelPipeline pipeline() {
116 return pipeline;
117 }
118
119 @Override
120 public ByteBufAllocator alloc() {
121 return config().getAllocator();
122 }
123
124 @Override
125 public final EventLoop eventLoop() {
126 EventLoop eventLoop = this.eventLoop;
127 if (eventLoop == null) {
128 throw new IllegalStateException("channel not registered to an event loop");
129 }
130 return eventLoop;
131 }
132
133 @Override
134 public SocketAddress localAddress() {
135 SocketAddress localAddress = this.localAddress;
136 if (localAddress == null) {
137 try {
138 this.localAddress = localAddress = unsafe().localAddress();
139 } catch (Throwable t) {
140
141 return null;
142 }
143 }
144 return localAddress;
145 }
146
147 protected void invalidateLocalAddress() {
148 localAddress = null;
149 }
150
151 @Override
152 public SocketAddress remoteAddress() {
153 SocketAddress remoteAddress = this.remoteAddress;
154 if (remoteAddress == null) {
155 try {
156 this.remoteAddress = remoteAddress = unsafe().remoteAddress();
157 } catch (Throwable t) {
158
159 return null;
160 }
161 }
162 return remoteAddress;
163 }
164
165
166
167
168 protected void invalidateRemoteAddress() {
169 remoteAddress = null;
170 }
171
172 @Override
173 public boolean isRegistered() {
174 return registered;
175 }
176
177 @Override
178 public ChannelFuture bind(SocketAddress localAddress) {
179 return pipeline.bind(localAddress);
180 }
181
182 @Override
183 public ChannelFuture connect(SocketAddress remoteAddress) {
184 return pipeline.connect(remoteAddress);
185 }
186
187 @Override
188 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
189 return pipeline.connect(remoteAddress, localAddress);
190 }
191
192 @Override
193 public ChannelFuture disconnect() {
194 return pipeline.disconnect();
195 }
196
197 @Override
198 public ChannelFuture close() {
199 return pipeline.close();
200 }
201
202 @Override
203 public ChannelFuture deregister() {
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224 eventLoop.rejectNewTasks();
225 return pipeline.deregister();
226 }
227
228 @Override
229 public Channel flush() {
230 pipeline.flush();
231 return this;
232 }
233
234 @Override
235 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
236 return pipeline.bind(localAddress, promise);
237 }
238
239 @Override
240 public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
241 return pipeline.connect(remoteAddress, promise);
242 }
243
244 @Override
245 public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
246 return pipeline.connect(remoteAddress, localAddress, promise);
247 }
248
249 @Override
250 public ChannelFuture disconnect(ChannelPromise promise) {
251 return pipeline.disconnect(promise);
252 }
253
254 @Override
255 public ChannelFuture close(ChannelPromise promise) {
256 return pipeline.close(promise);
257 }
258
259 @Override
260 public ChannelFuture deregister(ChannelPromise promise) {
261 eventLoop.rejectNewTasks();
262 return pipeline.deregister(promise);
263 }
264
265 @Override
266 public Channel read() {
267 pipeline.read();
268 return this;
269 }
270
271 @Override
272 public ChannelFuture write(Object msg) {
273 return pipeline.write(msg);
274 }
275
276 @Override
277 public ChannelFuture write(Object msg, ChannelPromise promise) {
278 return pipeline.write(msg, promise);
279 }
280
281 @Override
282 public ChannelFuture writeAndFlush(Object msg) {
283 return pipeline.writeAndFlush(msg);
284 }
285
286 @Override
287 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
288 return pipeline.writeAndFlush(msg, promise);
289 }
290
291 @Override
292 public ChannelPromise newPromise() {
293 return new DefaultChannelPromise(this);
294 }
295
296 @Override
297 public ChannelProgressivePromise newProgressivePromise() {
298 return new DefaultChannelProgressivePromise(this);
299 }
300
301 @Override
302 public ChannelFuture newSucceededFuture() {
303 return succeededFuture;
304 }
305
306 @Override
307 public ChannelFuture newFailedFuture(Throwable cause) {
308 return new FailedChannelFuture(this, null, cause);
309 }
310
311 @Override
312 public ChannelFuture closeFuture() {
313 return closeFuture;
314 }
315
316 @Override
317 public Unsafe unsafe() {
318 return unsafe;
319 }
320
321
322
323
324 protected abstract AbstractUnsafe newUnsafe();
325
326
327
328
329 @Override
330 public final int hashCode() {
331 return id.hashCode();
332 }
333
334
335
336
337
338 @Override
339 public final boolean equals(Object o) {
340 return this == o;
341 }
342
343 @Override
344 public final int compareTo(Channel o) {
345 if (this == o) {
346 return 0;
347 }
348
349 return id().compareTo(o.id());
350 }
351
352
353
354
355
356
357
358 @Override
359 public String toString() {
360 boolean active = isActive();
361 if (strValActive == active && strVal != null) {
362 return strVal;
363 }
364
365 SocketAddress remoteAddr = remoteAddress();
366 SocketAddress localAddr = localAddress();
367 if (remoteAddr != null) {
368 SocketAddress srcAddr;
369 SocketAddress dstAddr;
370 if (parent == null) {
371 srcAddr = localAddr;
372 dstAddr = remoteAddr;
373 } else {
374 srcAddr = remoteAddr;
375 dstAddr = localAddr;
376 }
377
378 StringBuilder buf = new StringBuilder(96)
379 .append("[id: 0x")
380 .append(id.asShortText())
381 .append(", ")
382 .append(srcAddr)
383 .append(active? " => " : " :> ")
384 .append(dstAddr)
385 .append(']');
386 strVal = buf.toString();
387 } else if (localAddr != null) {
388 StringBuilder buf = new StringBuilder(64)
389 .append("[id: 0x")
390 .append(id.asShortText())
391 .append(", ")
392 .append(localAddr)
393 .append(']');
394 strVal = buf.toString();
395 } else {
396 StringBuilder buf = new StringBuilder(16)
397 .append("[id: 0x")
398 .append(id.asShortText())
399 .append(']');
400 strVal = buf.toString();
401 }
402
403 strValActive = active;
404 return strVal;
405 }
406
407 @Override
408 public final ChannelPromise voidPromise() {
409 return voidPromise;
410 }
411
412 final MessageSizeEstimator.Handle estimatorHandle() {
413 if (estimatorHandle == null) {
414 estimatorHandle = config().getMessageSizeEstimator().newHandle();
415 }
416 return estimatorHandle;
417 }
418
419
420
421
422 protected abstract class AbstractUnsafe implements Unsafe {
423
424 private ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);
425 private RecvByteBufAllocator.Handle recvHandle;
426 private boolean inFlush0;
427
428 private boolean neverRegistered = true;
429
430 @Override
431 public RecvByteBufAllocator.Handle recvBufAllocHandle() {
432 if (recvHandle == null) {
433 recvHandle = config().getRecvByteBufAllocator().newHandle();
434 }
435 return recvHandle;
436 }
437
438 @Override
439 public final ChannelHandlerInvoker invoker() {
440
441 return ((PausableChannelEventExecutor) eventLoop().asInvoker()).unwrapInvoker();
442 }
443
444 @Override
445 public final ChannelOutboundBuffer outboundBuffer() {
446 return outboundBuffer;
447 }
448
449 @Override
450 public final SocketAddress localAddress() {
451 return localAddress0();
452 }
453
454 @Override
455 public final SocketAddress remoteAddress() {
456 return remoteAddress0();
457 }
458
459 @Override
460 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
461 if (eventLoop == null) {
462 throw new NullPointerException("eventLoop");
463 }
464 if (promise == null) {
465 throw new NullPointerException("promise");
466 }
467 if (isRegistered()) {
468 promise.setFailure(new IllegalStateException("registered to an event loop already"));
469 return;
470 }
471 if (!isCompatible(eventLoop)) {
472 promise.setFailure(
473 new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
474 return;
475 }
476
477
478
479 if (AbstractChannel.this.eventLoop == null) {
480 AbstractChannel.this.eventLoop = new PausableChannelEventLoop(eventLoop);
481 } else {
482 AbstractChannel.this.eventLoop.unwrapped = eventLoop;
483 }
484
485 if (eventLoop.inEventLoop()) {
486 register0(promise);
487 } else {
488 try {
489 eventLoop.execute(new OneTimeTask() {
490 @Override
491 public void run() {
492 register0(promise);
493 }
494 });
495 } catch (Throwable t) {
496 logger.warn(
497 "Force-closing a channel whose registration task was not accepted by an event loop: {}",
498 AbstractChannel.this, t);
499 closeForcibly();
500 closeFuture.setClosed();
501 safeSetFailure(promise, t);
502 }
503 }
504 }
505
506 private void register0(ChannelPromise promise) {
507 try {
508
509
510 if (!promise.setUncancellable() || !ensureOpen(promise)) {
511 return;
512 }
513 boolean firstRegistration = neverRegistered;
514 doRegister();
515 neverRegistered = false;
516 registered = true;
517 eventLoop.acceptNewTasks();
518 safeSetSuccess(promise);
519 pipeline.fireChannelRegistered();
520
521
522 if (firstRegistration && isActive()) {
523 pipeline.fireChannelActive();
524 }
525 } catch (Throwable t) {
526
527 closeForcibly();
528 closeFuture.setClosed();
529 safeSetFailure(promise, t);
530 }
531 }
532
533 @Override
534 public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
535 if (!promise.setUncancellable() || !ensureOpen(promise)) {
536 return;
537 }
538
539
540 if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
541 localAddress instanceof InetSocketAddress &&
542 !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
543 !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
544
545
546 logger.warn(
547 "A non-root user can't receive a broadcast packet if the socket " +
548 "is not bound to a wildcard address; binding to a non-wildcard " +
549 "address (" + localAddress + ") anyway as requested.");
550 }
551
552 boolean wasActive = isActive();
553 try {
554 doBind(localAddress);
555 } catch (Throwable t) {
556 safeSetFailure(promise, t);
557 closeIfClosed();
558 return;
559 }
560
561 if (!wasActive && isActive()) {
562 invokeLater(new OneTimeTask() {
563 @Override
564 public void run() {
565 pipeline.fireChannelActive();
566 }
567 });
568 }
569
570 safeSetSuccess(promise);
571 }
572
573 @Override
574 public final void disconnect(final ChannelPromise promise) {
575 if (!promise.setUncancellable()) {
576 return;
577 }
578
579 boolean wasActive = isActive();
580 try {
581 doDisconnect();
582 } catch (Throwable t) {
583 safeSetFailure(promise, t);
584 closeIfClosed();
585 return;
586 }
587
588 if (wasActive && !isActive()) {
589 invokeLater(new OneTimeTask() {
590 @Override
591 public void run() {
592 pipeline.fireChannelInactive();
593 }
594 });
595 }
596
597 safeSetSuccess(promise);
598 closeIfClosed();
599 }
600
601 @Override
602 public final void close(final ChannelPromise promise) {
603 if (!promise.setUncancellable()) {
604 return;
605 }
606
607 if (inFlush0) {
608 invokeLater(new OneTimeTask() {
609 @Override
610 public void run() {
611 close(promise);
612 }
613 });
614 return;
615 }
616
617 if (outboundBuffer == null) {
618
619 closeFuture.addListener(new ChannelFutureListener() {
620 @Override
621 public void operationComplete(ChannelFuture future) throws Exception {
622 promise.setSuccess();
623 }
624 });
625 return;
626 }
627
628 if (closeFuture.isDone()) {
629
630 safeSetSuccess(promise);
631 return;
632 }
633
634 final boolean wasActive = isActive();
635 final ChannelOutboundBuffer buffer = outboundBuffer;
636 outboundBuffer = null;
637 Executor closeExecutor = closeExecutor();
638 if (closeExecutor != null) {
639 closeExecutor.execute(new OneTimeTask() {
640 @Override
641 public void run() {
642 Throwable cause = null;
643 try {
644 doClose();
645 } catch (Throwable t) {
646 cause = t;
647 }
648 final Throwable error = cause;
649
650 invokeLater(new OneTimeTask() {
651 @Override
652 public void run() {
653 closeAndDeregister(buffer, wasActive, promise, error);
654 }
655 });
656 }
657 });
658 } else {
659 Throwable error = null;
660 try {
661 doClose();
662 } catch (Throwable t) {
663 error = t;
664 }
665 closeAndDeregister(buffer, wasActive, promise, error);
666 }
667 }
668
669 private void closeAndDeregister(ChannelOutboundBuffer outboundBuffer, final boolean wasActive,
670 ChannelPromise promise, Throwable error) {
671
672 try {
673 outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
674 outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);
675 } finally {
676 if (wasActive && !isActive()) {
677 invokeLater(new OneTimeTask() {
678 @Override
679 public void run() {
680 pipeline.fireChannelInactive();
681 deregister(voidPromise());
682 }
683 });
684 } else {
685 invokeLater(new OneTimeTask() {
686 @Override
687 public void run() {
688 deregister(voidPromise());
689 }
690 });
691 }
692
693
694 closeFuture.setClosed();
695 if (error != null) {
696 safeSetFailure(promise, error);
697 } else {
698 safeSetSuccess(promise);
699 }
700 }
701 }
702
703 @Override
704 public final void closeForcibly() {
705 try {
706 doClose();
707 } catch (Exception e) {
708 logger.warn("Failed to close a channel.", e);
709 }
710 }
711
712
713
714
715
716
717
718
719 @Override
720 public final void deregister(final ChannelPromise promise) {
721 if (!promise.setUncancellable()) {
722 return;
723 }
724
725 if (!registered) {
726 safeSetSuccess(promise);
727 return;
728 }
729
730 try {
731 doDeregister();
732 } catch (Throwable t) {
733 safeSetFailure(promise, t);
734 logger.warn("Unexpected exception occurred while deregistering a channel.", t);
735 } finally {
736 if (registered) {
737 registered = false;
738 safeSetSuccess(promise);
739 pipeline.fireChannelUnregistered();
740 } else {
741
742
743
744 safeSetSuccess(promise);
745 }
746 }
747 }
748
749 @Override
750 public final void beginRead() {
751 if (!isActive()) {
752 return;
753 }
754
755 try {
756 doBeginRead();
757 } catch (final Exception e) {
758 invokeLater(new OneTimeTask() {
759 @Override
760 public void run() {
761 pipeline.fireExceptionCaught(e);
762 }
763 });
764 close(voidPromise());
765 }
766 }
767
768 @Override
769 public final void write(Object msg, ChannelPromise promise) {
770 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
771 if (outboundBuffer == null) {
772
773
774
775
776 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
777
778 ReferenceCountUtil.release(msg);
779 return;
780 }
781
782 int size;
783 try {
784 msg = filterOutboundMessage(msg);
785 size = estimatorHandle().size(msg);
786 if (size < 0) {
787 size = 0;
788 }
789 } catch (Throwable t) {
790 safeSetFailure(promise, t);
791 ReferenceCountUtil.release(msg);
792 return;
793 }
794
795 outboundBuffer.addMessage(msg, size, promise);
796 }
797
798 @Override
799 public final void flush() {
800 ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
801 if (outboundBuffer == null) {
802 return;
803 }
804
805 outboundBuffer.addFlush();
806 flush0();
807 }
808
809 protected void flush0() {
810 if (inFlush0) {
811
812 return;
813 }
814
815 final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
816 if (outboundBuffer == null || outboundBuffer.isEmpty()) {
817 return;
818 }
819
820 inFlush0 = true;
821
822
823 if (!isActive()) {
824 try {
825 if (isOpen()) {
826 outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);
827 } else {
828 outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);
829 }
830 } finally {
831 inFlush0 = false;
832 }
833 return;
834 }
835
836 try {
837 doWrite(outboundBuffer);
838 } catch (Throwable t) {
839 outboundBuffer.failFlushed(t);
840 } finally {
841 inFlush0 = false;
842 }
843 }
844
845 @Override
846 public final ChannelPromise voidPromise() {
847 return unsafeVoidPromise;
848 }
849
850 protected final boolean ensureOpen(ChannelPromise promise) {
851 if (isOpen()) {
852 return true;
853 }
854
855 safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
856 return false;
857 }
858
859
860
861
862 protected final void safeSetSuccess(ChannelPromise promise) {
863 if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
864 logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
865 }
866 }
867
868
869
870
871 protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
872 if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
873 logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
874 }
875 }
876
877 protected final void closeIfClosed() {
878 if (isOpen()) {
879 return;
880 }
881 close(voidPromise());
882 }
883
884 private void invokeLater(Runnable task) {
885 try {
886
887
888
889
890
891
892
893
894
895
896
897 eventLoop().unwrap().execute(task);
898 } catch (RejectedExecutionException e) {
899 logger.warn("Can't invoke task later as EventLoop rejected it", e);
900 }
901 }
902
903
904
905
906 protected final Throwable annotateConnectException(Throwable cause, SocketAddress remoteAddress) {
907 if (cause instanceof ConnectException) {
908 Throwable newT = new ConnectException(cause.getMessage() + ": " + remoteAddress);
909 newT.setStackTrace(cause.getStackTrace());
910 cause = newT;
911 } else if (cause instanceof NoRouteToHostException) {
912 Throwable newT = new NoRouteToHostException(cause.getMessage() + ": " + remoteAddress);
913 newT.setStackTrace(cause.getStackTrace());
914 cause = newT;
915 } else if (cause instanceof SocketException) {
916 Throwable newT = new SocketException(cause.getMessage() + ": " + remoteAddress);
917 newT.setStackTrace(cause.getStackTrace());
918 cause = newT;
919 }
920
921 return cause;
922 }
923
924
925
926
927
928
929 protected Executor closeExecutor() {
930 return null;
931 }
932 }
933
934
935
936
937 protected abstract boolean isCompatible(EventLoop loop);
938
939
940
941
942 protected abstract SocketAddress localAddress0();
943
944
945
946
947 protected abstract SocketAddress remoteAddress0();
948
949
950
951
952
953
954 protected void doRegister() throws Exception {
955
956 }
957
958
959
960
961 protected abstract void doBind(SocketAddress localAddress) throws Exception;
962
963
964
965
966 protected abstract void doDisconnect() throws Exception;
967
968
969
970
971 protected abstract void doClose() throws Exception;
972
973
974
975
976
977
978 protected void doDeregister() throws Exception {
979
980 }
981
982
983
984
985 protected abstract void doBeginRead() throws Exception;
986
987
988
989
990 protected abstract void doWrite(ChannelOutboundBuffer in) throws Exception;
991
992
993
994
995
996 protected Object filterOutboundMessage(Object msg) throws Exception {
997 return msg;
998 }
999
1000 static final class CloseFuture extends DefaultChannelPromise {
1001
1002 CloseFuture(AbstractChannel ch) {
1003 super(ch);
1004 }
1005
1006 @Override
1007 public ChannelPromise setSuccess() {
1008 throw new IllegalStateException();
1009 }
1010
1011 @Override
1012 public ChannelPromise setFailure(Throwable cause) {
1013 throw new IllegalStateException();
1014 }
1015
1016 @Override
1017 public boolean trySuccess() {
1018 throw new IllegalStateException();
1019 }
1020
1021 @Override
1022 public boolean tryFailure(Throwable cause) {
1023 throw new IllegalStateException();
1024 }
1025
1026 boolean setClosed() {
1027 return super.trySuccess();
1028 }
1029 }
1030
1031 private final class PausableChannelEventLoop
1032 extends PausableChannelEventExecutor implements EventLoop {
1033
1034 volatile boolean isAcceptingNewTasks = true;
1035 volatile EventLoop unwrapped;
1036
1037 PausableChannelEventLoop(EventLoop unwrapped) {
1038 this.unwrapped = unwrapped;
1039 }
1040
1041 @Override
1042 public void rejectNewTasks() {
1043 isAcceptingNewTasks = false;
1044 }
1045
1046 @Override
1047 public void acceptNewTasks() {
1048 isAcceptingNewTasks = true;
1049 }
1050
1051 @Override
1052 public boolean isAcceptingNewTasks() {
1053 return isAcceptingNewTasks;
1054 }
1055
1056 @Override
1057 public EventLoopGroup parent() {
1058 return unwrap().parent();
1059 }
1060
1061 @Override
1062 public EventLoop next() {
1063 return unwrap().next();
1064 }
1065
1066 @Override
1067 public EventLoop unwrap() {
1068 return unwrapped;
1069 }
1070
1071 @Override
1072 public ChannelHandlerInvoker asInvoker() {
1073 return this;
1074 }
1075
1076 @Override
1077 public ChannelFuture register(Channel channel) {
1078 return unwrap().register(channel);
1079 }
1080
1081 @Override
1082 public ChannelFuture register(Channel channel, ChannelPromise promise) {
1083 return unwrap().register(channel, promise);
1084 }
1085
1086 @Override
1087 Channel channel() {
1088 return AbstractChannel.this;
1089 }
1090
1091 @Override
1092 ChannelHandlerInvoker unwrapInvoker() {
1093 return unwrapped.asInvoker();
1094 }
1095 }
1096 }