1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.io.UncheckedIOException;
49 import java.net.InetSocketAddress;
50 import java.net.SocketAddress;
51 import java.nio.ByteBuffer;
52 import java.nio.channels.AlreadyConnectedException;
53 import java.nio.channels.ClosedChannelException;
54 import java.nio.channels.ConnectionPendingException;
55 import java.nio.channels.NotYetConnectedException;
56 import java.nio.channels.UnresolvedAddressException;
57 import java.util.concurrent.TimeUnit;
58
59 import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_IN_MASK;
60 import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_OUT_MASK;
61 import static io.netty.channel.epoll.EpollIoOps.EPOLL_RDHUP_MASK;
62 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
63 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
64 import static io.netty.util.internal.ObjectUtil.checkNotNull;
65
66 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
67 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68 protected final LinuxSocket socket;
69 private final EpollIoOps initial;
70
71
72
73
74
75 private ChannelPromise connectPromise;
76 private Future<?> connectTimeoutFuture;
77 private SocketAddress requestedRemoteAddress;
78 private volatile SocketAddress local;
79 private volatile SocketAddress remote;
80
81 private IoRegistration registration;
82 boolean inputClosedSeenErrorOnRead;
83 private EpollIoOps ops;
84
85 protected volatile boolean active;
86
/**
 * Creates a channel on top of the given socket.
 *
 * @param parent     parent channel passed to the superclass constructor
 * @param fd         socket backing this channel; must not be {@code null}
 * @param active     whether the channel starts out active
 * @param initialOps interest ops the channel starts with (and is reset to on deregistration)
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    initial = initialOps;
    ops = initialOps;
    this.active = active;
    if (!active) {
        return;
    }
    // The channel starts active: cache both endpoint addresses straight away.
    local = fd.localAddress();
    remote = fd.remoteAddress();
}
100
/**
 * Creates an already-active channel whose remote address is supplied by the caller.
 *
 * @param parent     parent channel passed to the superclass constructor
 * @param fd         socket backing this channel; must not be {@code null}
 * @param remote     remote address to cache as-is (it is not queried from the socket)
 * @param initialOps interest ops the channel starts with (and is reset to on deregistration)
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    initial = initialOps;
    ops = initialOps;
    active = true;
    // Only the local address is read from the socket; the remote one comes from the caller.
    this.remote = remote;
    local = fd.localAddress();
}
112
113 static boolean isSoErrorZero(Socket fd) {
114 try {
115 return fd.getSoError() == 0;
116 } catch (IOException e) {
117 throw new ChannelException(e);
118 }
119 }
120
121 protected void setFlag(int flag) throws IOException {
122 if (ops.contains(flag)) {
123
124 return;
125 }
126 ops = ops.with(EpollIoOps.valueOf(flag));
127 if (isRegistered()) {
128 IoRegistration registration = registration();
129 registration.submit(ops);
130 } else {
131 ops = ops.with(EpollIoOps.valueOf(flag));
132 }
133 }
134
135 void clearFlag(int flag) throws IOException {
136 IoRegistration registration = registration();
137 if (!ops.contains(flag)) {
138
139 return;
140 }
141 ops = ops.without(EpollIoOps.valueOf(flag));
142 registration.submit(ops);
143 }
144
145 protected final IoRegistration registration() {
146 assert registration != null;
147 return registration;
148 }
149
150 boolean isFlagSet(int flag) {
151 return (ops.value & flag) != 0;
152 }
153
154 @Override
155 public final FileDescriptor fd() {
156 return socket;
157 }
158
159 @Override
160 public abstract EpollChannelConfig config();
161
162 @Override
163 public boolean isActive() {
164 return active;
165 }
166
167 @Override
168 public ChannelMetadata metadata() {
169 return METADATA;
170 }
171
172 @Override
173 protected void doClose() throws Exception {
174 active = false;
175
176
177 inputClosedSeenErrorOnRead = true;
178 try {
179 ChannelPromise promise = connectPromise;
180 if (promise != null) {
181
182 promise.tryFailure(new ClosedChannelException());
183 connectPromise = null;
184 }
185
186 Future<?> future = connectTimeoutFuture;
187 if (future != null) {
188 future.cancel(false);
189 connectTimeoutFuture = null;
190 }
191
192 if (isRegistered()) {
193
194
195
196
197 EventLoop loop = eventLoop();
198 if (loop.inEventLoop()) {
199 doDeregister();
200 } else {
201 loop.execute(new Runnable() {
202 @Override
203 public void run() {
204 try {
205 doDeregister();
206 } catch (Throwable cause) {
207 pipeline().fireExceptionCaught(cause);
208 }
209 }
210 });
211 }
212 }
213 } finally {
214 socket.close();
215 }
216 }
217
218 void resetCachedAddresses() {
219 local = socket.localAddress();
220 remote = socket.remoteAddress();
221 }
222
223 @Override
224 protected void doDisconnect() throws Exception {
225 doClose();
226 }
227
228 @Override
229 public boolean isOpen() {
230 return socket.isOpen();
231 }
232
233 @Override
234 protected void doDeregister() throws Exception {
235 IoRegistration registration = this.registration;
236 if (registration != null) {
237 ops = initial;
238 registration.cancel();
239 }
240 }
241
242 @Override
243 protected boolean isCompatible(EventLoop loop) {
244 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
245 }
246
247 @Override
248 protected void doBeginRead() throws Exception {
249
250 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
251 unsafe.readPending = true;
252
253
254
255
256 setFlag(Native.EPOLLIN);
257 }
258
259 final boolean shouldBreakEpollInReady(ChannelConfig config) {
260 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
261 }
262
263 private static boolean isAllowHalfClosure(ChannelConfig config) {
264 if (config instanceof EpollDomainSocketChannelConfig) {
265 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
266 }
267 return config instanceof SocketChannelConfig &&
268 ((SocketChannelConfig) config).isAllowHalfClosure();
269 }
270
271 final void clearEpollIn() {
272
273 if (isRegistered()) {
274 final EventLoop loop = eventLoop();
275 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
276 if (loop.inEventLoop()) {
277 unsafe.clearEpollIn0();
278 } else {
279
280 loop.execute(new Runnable() {
281 @Override
282 public void run() {
283 if (!unsafe.readPending && !config().isAutoRead()) {
284
285 unsafe.clearEpollIn0();
286 }
287 }
288 });
289 }
290 } else {
291
292
293 ops = ops.without(EpollIoOps.EPOLLIN);
294 }
295 }
296
297 @Override
298 protected void doRegister(ChannelPromise promise) {
299 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
300 if (f.isSuccess()) {
301 registration = (IoRegistration) f.getNow();
302 if (isActive()) {
303
304 submitCurrentOps();
305 }
306 promise.setSuccess();
307 } else {
308 promise.setFailure(f.cause());
309 }
310 });
311 }
312
313 @Override
314 protected abstract AbstractEpollUnsafe newUnsafe();
315
316
317
318
319 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
320 return newDirectBuffer(buf, buf);
321 }
322
323
324
325
326
327
328 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
329 final int readableBytes = buf.readableBytes();
330 if (readableBytes == 0) {
331 ReferenceCountUtil.release(holder);
332 return Unpooled.EMPTY_BUFFER;
333 }
334
335 final ByteBufAllocator alloc = alloc();
336 if (alloc.isDirectBufferPooled()) {
337 return newDirectBuffer0(holder, buf, alloc, readableBytes);
338 }
339
340 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
341 if (directBuf == null) {
342 return newDirectBuffer0(holder, buf, alloc, readableBytes);
343 }
344
345 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
346 ReferenceCountUtil.safeRelease(holder);
347 return directBuf;
348 }
349
350 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
351 final ByteBuf directBuf = alloc.directBuffer(capacity);
352 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
353 ReferenceCountUtil.safeRelease(holder);
354 return directBuf;
355 }
356
357 protected static void checkResolvable(InetSocketAddress addr) {
358 if (addr.isUnresolved()) {
359 throw new UnresolvedAddressException();
360 }
361 }
362
363
364
365
366 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
367 int writerIndex = byteBuf.writerIndex();
368 int localReadAmount;
369 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
370 if (byteBuf.hasMemoryAddress()) {
371 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
372 } else {
373 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
374 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
375 }
376 if (localReadAmount > 0) {
377 byteBuf.writerIndex(writerIndex + localReadAmount);
378 }
379 return localReadAmount;
380 }
381
382 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
383 if (buf.hasMemoryAddress()) {
384 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
385 if (localFlushedAmount > 0) {
386 in.removeBytes(localFlushedAmount);
387 return 1;
388 }
389 } else {
390 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
391 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
392 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
393 if (localFlushedAmount > 0) {
394 nioBuf.position(nioBuf.position() + localFlushedAmount);
395 in.removeBytes(localFlushedAmount);
396 return 1;
397 }
398 }
399 return WRITE_STATUS_SNDBUF_FULL;
400 }
401
402
403
404
405
406 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
407 throws IOException {
408 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
409 if (data.hasMemoryAddress()) {
410 long memoryAddress = data.memoryAddress();
411 if (remoteAddress == null) {
412 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
413 }
414 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
415 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
416 }
417
418 if (data.nioBufferCount() > 1) {
419 IovArray array = ((NativeArrays) registration.attachment()).cleanIovArray();
420 array.add(data, data.readerIndex(), data.readableBytes());
421 int cnt = array.count();
422 assert cnt != 0;
423
424 if (remoteAddress == null) {
425 return socket.writevAddresses(array.memoryAddress(0), cnt);
426 }
427 return socket.sendToAddresses(array.memoryAddress(0), cnt,
428 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
429 }
430
431 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
432 if (remoteAddress == null) {
433 return socket.send(nioData, nioData.position(), nioData.limit());
434 }
435 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
436 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
437 }
438
439 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
440 boolean readPending;
441 private EpollRecvByteAllocatorHandle allocHandle;
442
443 Channel channel() {
444 return AbstractEpollChannel.this;
445 }
446
447 @Override
448 public FileDescriptor fd() {
449 return AbstractEpollChannel.this.fd();
450 }
451
452 @Override
453 public void close() {
454 close(voidPromise());
455 }
456
457 @Override
458 public void handle(IoRegistration registration, IoEvent event) {
459 EpollIoEvent epollEvent = (EpollIoEvent) event;
460 int ops = epollEvent.ops().value;
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475 if ((ops & EPOLL_ERR_OUT_MASK) != 0) {
476
477 epollOutReady();
478 }
479
480
481
482
483
484
485 if ((ops & EPOLL_ERR_IN_MASK) != 0) {
486
487 epollInReady();
488 }
489
490
491
492
493 if ((ops & EPOLL_RDHUP_MASK) != 0) {
494 epollRdHupReady();
495 }
496 }
497
498
499
500
501 abstract void epollInReady();
502
503 final boolean shouldStopReading(ChannelConfig config) {
504
505
506
507
508
509
510 return !readPending && !config.isAutoRead();
511 }
512
513
514
515
516 final void epollRdHupReady() {
517
518 recvBufAllocHandle().receivedRdHup();
519
520 if (isActive()) {
521
522
523
524 epollInReady();
525 } else {
526
527 shutdownInput(false);
528 }
529
530
531 clearEpollRdHup();
532 }
533
534
535
536
537 private void clearEpollRdHup() {
538 try {
539 clearFlag(Native.EPOLLRDHUP);
540 } catch (IOException e) {
541 pipeline().fireExceptionCaught(e);
542 close(voidPromise());
543 }
544 }
545
546
547
548
549 void shutdownInput(boolean allDataRead) {
550 if (!socket.isInputShutdown()) {
551 if (isAllowHalfClosure(config())) {
552 try {
553 socket.shutdown(true, false);
554 } catch (IOException ignored) {
555
556
557 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
558 return;
559 } catch (NotYetConnectedException ignore) {
560
561
562 }
563 if (shouldStopReading(config())) {
564 clearEpollIn0();
565 }
566 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
567 } else {
568 close(voidPromise());
569 return;
570 }
571 }
572
573 if (allDataRead && !inputClosedSeenErrorOnRead) {
574 inputClosedSeenErrorOnRead = true;
575 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
576 }
577 }
578
579 private void fireEventAndClose(Object evt) {
580 pipeline().fireUserEventTriggered(evt);
581 close(voidPromise());
582 }
583
584 @Override
585 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
586 if (allocHandle == null) {
587 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
588 }
589 return allocHandle;
590 }
591
592
593
594
595
596 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
597 return new EpollRecvByteAllocatorHandle(handle);
598 }
599
600 @Override
601 protected final void flush0() {
602
603
604
605 if (!isFlagSet(Native.EPOLLOUT)) {
606 super.flush0();
607 }
608 }
609
610
611
612
613 final void epollOutReady() {
614 if (connectPromise != null) {
615
616 finishConnect();
617 } else if (!socket.isOutputShutdown()) {
618
619 super.flush0();
620 }
621 }
622
623 protected final void clearEpollIn0() {
624 assert eventLoop().inEventLoop();
625 try {
626 readPending = false;
627 if (!ops.contains(EpollIoOps.EPOLLIN)) {
628 return;
629 }
630 ops = ops.without(EpollIoOps.EPOLLIN);
631 IoRegistration registration = registration();
632 registration.submit(ops);
633 } catch (UncheckedIOException e) {
634
635
636 pipeline().fireExceptionCaught(e);
637 unsafe().close(unsafe().voidPromise());
638 }
639 }
640
641 @Override
642 public void connect(
643 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
644
645
646 if (promise.isDone() || !ensureOpen(promise)) {
647 return;
648 }
649
650 try {
651 if (connectPromise != null) {
652 throw new ConnectionPendingException();
653 }
654
655 boolean wasActive = isActive();
656 if (doConnect(remoteAddress, localAddress)) {
657 fulfillConnectPromise(promise, wasActive);
658 } else {
659 connectPromise = promise;
660 requestedRemoteAddress = remoteAddress;
661
662
663 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
664 if (connectTimeoutMillis > 0) {
665 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
666 @Override
667 public void run() {
668 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
669 if (connectPromise != null && !connectPromise.isDone()
670 && connectPromise.tryFailure(new ConnectTimeoutException(
671 "connection timed out after " + connectTimeoutMillis + " ms: " +
672 remoteAddress))) {
673 close(voidPromise());
674 }
675 }
676 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
677 }
678
679 promise.addListener(new ChannelFutureListener() {
680 @Override
681 public void operationComplete(ChannelFuture future) {
682
683
684 if (future.isCancelled()) {
685 if (connectTimeoutFuture != null) {
686 connectTimeoutFuture.cancel(false);
687 }
688 connectPromise = null;
689 close(voidPromise());
690 }
691 }
692 });
693 }
694 } catch (Throwable t) {
695 closeIfClosed();
696 promise.tryFailure(annotateConnectException(t, remoteAddress));
697 }
698 }
699
700 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
701 if (promise == null) {
702
703 return;
704 }
705 active = true;
706
707
708 submitCurrentOps();
709
710
711
712 boolean active = isActive();
713
714
715 boolean promiseSet = promise.trySuccess();
716
717
718
719 if (!wasActive && active) {
720 pipeline().fireChannelActive();
721 }
722
723
724 if (!promiseSet) {
725 close(voidPromise());
726 }
727 }
728
729 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
730 if (promise == null) {
731
732 return;
733 }
734
735
736 promise.tryFailure(cause);
737 closeIfClosed();
738 }
739
740 private void finishConnect() {
741
742
743
744 assert eventLoop().inEventLoop();
745
746 boolean connectStillInProgress = false;
747 try {
748 boolean wasActive = isActive();
749 if (!doFinishConnect()) {
750 connectStillInProgress = true;
751 return;
752 }
753 fulfillConnectPromise(connectPromise, wasActive);
754 } catch (Throwable t) {
755 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
756 } finally {
757 if (!connectStillInProgress) {
758
759
760 if (connectTimeoutFuture != null) {
761 connectTimeoutFuture.cancel(false);
762 }
763 connectPromise = null;
764 }
765 }
766 }
767
768
769
770
771 private boolean doFinishConnect() throws Exception {
772 if (socket.finishConnect()) {
773 clearFlag(Native.EPOLLOUT);
774 if (requestedRemoteAddress instanceof InetSocketAddress) {
775 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
776 }
777 requestedRemoteAddress = null;
778
779 return true;
780 }
781 setFlag(Native.EPOLLOUT);
782 return false;
783 }
784 }
785
786 @Override
787 protected void doBind(SocketAddress local) throws Exception {
788 if (local instanceof InetSocketAddress) {
789 checkResolvable((InetSocketAddress) local);
790 }
791 socket.bind(local);
792 this.local = socket.localAddress();
793 }
794
795
796
797
798 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
799 if (localAddress instanceof InetSocketAddress) {
800 checkResolvable((InetSocketAddress) localAddress);
801 }
802
803 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
804 ? (InetSocketAddress) remoteAddress : null;
805 if (remoteSocketAddr != null) {
806 checkResolvable(remoteSocketAddr);
807 }
808
809 if (remote != null) {
810
811
812
813 throw new AlreadyConnectedException();
814 }
815
816 if (localAddress != null) {
817 socket.bind(localAddress);
818 }
819
820 boolean connected = doConnect0(remoteAddress);
821 if (connected) {
822 remote = remoteSocketAddr == null ?
823 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
824 }
825
826
827
828 local = socket.localAddress();
829 return connected;
830 }
831
832 boolean doConnect0(SocketAddress remote) throws Exception {
833 boolean success = false;
834 try {
835 boolean connected = socket.connect(remote);
836 if (!connected) {
837 setFlag(Native.EPOLLOUT);
838 }
839 success = true;
840 return connected;
841 } finally {
842 if (!success) {
843 doClose();
844 }
845 }
846 }
847
848 final void submitCurrentOps() {
849 IoRegistration registration = registration();
850 registration.submit(ops);
851 }
852
853 @Override
854 protected SocketAddress localAddress0() {
855 return local;
856 }
857
858 @Override
859 protected SocketAddress remoteAddress0() {
860 return remote;
861 }
862 }