1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ClosedChannelException;
53 import java.nio.channels.ConnectionPendingException;
54 import java.nio.channels.NotYetConnectedException;
55 import java.nio.channels.UnresolvedAddressException;
56 import java.util.concurrent.TimeUnit;
57
58 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
59 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
60 import static io.netty.util.internal.ObjectUtil.checkNotNull;
61
62 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
/** Metadata returned by {@link #metadata()}; constructed with {@code false}. */
private static final ChannelMetadata METADATA = new ChannelMetadata(false);
/** The socket wrapped by this channel; also exposed through {@link #fd()}. */
protected final LinuxSocket socket;

/** Promise of the connect attempt that is currently pending, or {@code null} when there is none. */
private ChannelPromise connectPromise;
/** Scheduled connect-timeout task, or {@code null} when no timeout has been scheduled. */
private Future<?> connectTimeoutFuture;
/** Remote address handed to the pending connect; reset to {@code null} once the connect finishes. */
private SocketAddress requestedRemoteAddress;
/** Cached local address, returned by {@code localAddress0()}. */
private volatile SocketAddress local;
/** Cached remote address, returned by {@code remoteAddress0()}. */
private volatile SocketAddress remote;

/** Registration assigned in {@code doRegister} once registering with the event loop succeeded. */
private EpollIoRegistration registration;
/** Set by {@code doClose()} and by {@code shutdownInput(false)} when the input is already shut down. */
boolean inputClosedSeenErrorOnRead;
/** Cleared by {@code doRegister} and by the unsafe's {@code epollInReadyRunnable}. */
boolean epollInReadyRunnablePending;
/** The epoll interest ops currently wanted for this channel. */
private EpollIoOps ops;
/** Snapshot of {@link #ops} taken at registration and restored by {@code doDeregister()} (name is spelled "inital"). */
private EpollIoOps inital;

/** Whether the channel is active; returned by {@link #isActive()}. */
protected volatile boolean active;
82
/**
 * Creates a channel around an existing socket.
 *
 * @param parent     the parent channel passed to the super constructor
 * @param fd         the socket to wrap; must not be {@code null}
 * @param active     whether the channel starts out active; when {@code true} the local and remote
 *                   addresses are read from {@code fd} and cached immediately
 * @param initialOps the epoll ops the channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    this.active = active;
    ops = initialOps;
    if (active) {
        // Cache both addresses right away for a channel that is already active.
        local = fd.localAddress();
        remote = fd.remoteAddress();
    }
}
95
/**
 * Creates an already-active channel whose remote address is supplied by the caller.
 *
 * @param parent     the parent channel passed to the super constructor
 * @param fd         the socket to wrap; must not be {@code null}
 * @param remote     the remote address to cache as-is
 * @param initialOps the epoll ops the channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;
    ops = initialOps;
    // The remote address comes from the caller; only the local one is queried from the socket.
    this.remote = remote;
    local = fd.localAddress();
}
106
107 static boolean isSoErrorZero(Socket fd) {
108 try {
109 return fd.getSoError() == 0;
110 } catch (IOException e) {
111 throw new ChannelException(e);
112 }
113 }
114
115 protected void setFlag(int flag) throws IOException {
116 ops = ops.with(EpollIoOps.valueOf(flag));
117 if (isRegistered()) {
118 EpollIoRegistration registration = registration();
119 try {
120 registration.submit(ops);
121 } catch (IOException e) {
122 throw e;
123 } catch (Exception e) {
124 throw new IllegalStateException(e);
125 }
126 } else {
127 ops = ops.with(EpollIoOps.valueOf(flag));
128 }
129 }
130
131 void clearFlag(int flag) throws IOException {
132 EpollIoRegistration registration = registration();
133 ops = ops.without(EpollIoOps.valueOf(flag));
134 try {
135 registration.submit(ops);
136 } catch (IOException e) {
137 throw e;
138 } catch (Exception e) {
139 throw new IllegalStateException(e);
140 }
141 }
142
143 protected final EpollIoRegistration registration() {
144 assert registration != null;
145 return registration;
146 }
147
148 boolean isFlagSet(int flag) {
149 return (ops.value & flag) != 0;
150 }
151
152 @Override
153 public final FileDescriptor fd() {
154 return socket;
155 }
156
/** Returns the configuration of this channel, narrowed to {@link EpollChannelConfig}. */
@Override
public abstract EpollChannelConfig config();
159
160 @Override
161 public boolean isActive() {
162 return active;
163 }
164
165 @Override
166 public ChannelMetadata metadata() {
167 return METADATA;
168 }
169
170 @Override
171 protected void doClose() throws Exception {
172 active = false;
173
174
175 inputClosedSeenErrorOnRead = true;
176 try {
177 ChannelPromise promise = connectPromise;
178 if (promise != null) {
179
180 promise.tryFailure(new ClosedChannelException());
181 connectPromise = null;
182 }
183
184 Future<?> future = connectTimeoutFuture;
185 if (future != null) {
186 future.cancel(false);
187 connectTimeoutFuture = null;
188 }
189
190 if (isRegistered()) {
191
192
193
194
195 EventLoop loop = eventLoop();
196 if (loop.inEventLoop()) {
197 doDeregister();
198 } else {
199 loop.execute(new Runnable() {
200 @Override
201 public void run() {
202 try {
203 doDeregister();
204 } catch (Throwable cause) {
205 pipeline().fireExceptionCaught(cause);
206 }
207 }
208 });
209 }
210 }
211 } finally {
212 socket.close();
213 }
214 }
215
216 void resetCachedAddresses() {
217 local = socket.localAddress();
218 remote = socket.remoteAddress();
219 }
220
221 @Override
222 protected void doDisconnect() throws Exception {
223 doClose();
224 }
225
226 @Override
227 public boolean isOpen() {
228 return socket.isOpen();
229 }
230
231 @Override
232 protected void doDeregister() throws Exception {
233 EpollIoRegistration registration = this.registration;
234 if (registration != null) {
235 ops = inital;
236 registration.cancel();
237 }
238 }
239
240 @Override
241 protected boolean isCompatible(EventLoop loop) {
242 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
243 }
244
245 @Override
246 protected void doBeginRead() throws Exception {
247
248 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
249 unsafe.readPending = true;
250
251
252
253
254 setFlag(Native.EPOLLIN);
255 }
256
257 final boolean shouldBreakEpollInReady(ChannelConfig config) {
258 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
259 }
260
261 private static boolean isAllowHalfClosure(ChannelConfig config) {
262 if (config instanceof EpollDomainSocketChannelConfig) {
263 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
264 }
265 return config instanceof SocketChannelConfig &&
266 ((SocketChannelConfig) config).isAllowHalfClosure();
267 }
268
269 final void clearEpollIn() {
270
271 if (isRegistered()) {
272 final EventLoop loop = eventLoop();
273 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
274 if (loop.inEventLoop()) {
275 unsafe.clearEpollIn0();
276 } else {
277
278 loop.execute(new Runnable() {
279 @Override
280 public void run() {
281 if (!unsafe.readPending && !config().isAutoRead()) {
282
283 unsafe.clearEpollIn0();
284 }
285 }
286 });
287 }
288 } else {
289
290
291 ops = ops.without(EpollIoOps.EPOLLIN);
292 }
293 }
294
295 @Override
296 protected void doRegister(ChannelPromise promise) {
297
298
299
300 epollInReadyRunnablePending = false;
301 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
302 if (f.isSuccess()) {
303 registration = (EpollIoRegistration) f.getNow();
304 registration.submit(ops);
305 inital = ops;
306 promise.setSuccess();
307 } else {
308 promise.setFailure(f.cause());
309 }
310 });
311 }
312
/** Creates the unsafe of this channel, narrowed to {@link AbstractEpollUnsafe}. */
@Override
protected abstract AbstractEpollUnsafe newUnsafe();
315
316
317
318
319 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
320 return newDirectBuffer(buf, buf);
321 }
322
323
324
325
326
327
328 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
329 final int readableBytes = buf.readableBytes();
330 if (readableBytes == 0) {
331 ReferenceCountUtil.release(holder);
332 return Unpooled.EMPTY_BUFFER;
333 }
334
335 final ByteBufAllocator alloc = alloc();
336 if (alloc.isDirectBufferPooled()) {
337 return newDirectBuffer0(holder, buf, alloc, readableBytes);
338 }
339
340 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
341 if (directBuf == null) {
342 return newDirectBuffer0(holder, buf, alloc, readableBytes);
343 }
344
345 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
346 ReferenceCountUtil.safeRelease(holder);
347 return directBuf;
348 }
349
350 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
351 final ByteBuf directBuf = alloc.directBuffer(capacity);
352 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
353 ReferenceCountUtil.safeRelease(holder);
354 return directBuf;
355 }
356
357 protected static void checkResolvable(InetSocketAddress addr) {
358 if (addr.isUnresolved()) {
359 throw new UnresolvedAddressException();
360 }
361 }
362
363
364
365
366 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
367 int writerIndex = byteBuf.writerIndex();
368 int localReadAmount;
369 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
370 if (byteBuf.hasMemoryAddress()) {
371 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
372 } else {
373 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
374 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
375 }
376 if (localReadAmount > 0) {
377 byteBuf.writerIndex(writerIndex + localReadAmount);
378 }
379 return localReadAmount;
380 }
381
382 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
383 if (buf.hasMemoryAddress()) {
384 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
385 if (localFlushedAmount > 0) {
386 in.removeBytes(localFlushedAmount);
387 return 1;
388 }
389 } else {
390 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
391 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
392 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
393 if (localFlushedAmount > 0) {
394 nioBuf.position(nioBuf.position() + localFlushedAmount);
395 in.removeBytes(localFlushedAmount);
396 return 1;
397 }
398 }
399 return WRITE_STATUS_SNDBUF_FULL;
400 }
401
402
403
404
405
406 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
407 throws IOException {
408 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
409 if (data.hasMemoryAddress()) {
410 long memoryAddress = data.memoryAddress();
411 if (remoteAddress == null) {
412 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
413 }
414 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
415 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
416 }
417
418 if (data.nioBufferCount() > 1) {
419 IovArray array = registration().ioHandler().cleanIovArray();
420 array.add(data, data.readerIndex(), data.readableBytes());
421 int cnt = array.count();
422 assert cnt != 0;
423
424 if (remoteAddress == null) {
425 return socket.writevAddresses(array.memoryAddress(0), cnt);
426 }
427 return socket.sendToAddresses(array.memoryAddress(0), cnt,
428 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
429 }
430
431 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
432 if (remoteAddress == null) {
433 return socket.send(nioData, nioData.position(), nioData.limit());
434 }
435 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
436 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
437 }
438
439 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
440 boolean readPending;
441 private EpollRecvByteAllocatorHandle allocHandle;
442 private final Runnable epollInReadyRunnable = new Runnable() {
443 @Override
444 public void run() {
445 epollInReadyRunnablePending = false;
446 epollInReady();
447 }
448 };
449
450 Channel channel() {
451 return AbstractEpollChannel.this;
452 }
453
454 @Override
455 public FileDescriptor fd() {
456 return AbstractEpollChannel.this.fd();
457 }
458
459 @Override
460 public void close() {
461 close(voidPromise());
462 }
463
464 @Override
465 public void handle(IoRegistration registration, IoEvent event) {
466 EpollIoEvent epollEvent = (EpollIoEvent) event;
467 EpollIoOps epollOps = epollEvent.ops();
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLOUT)) {
483
484 epollOutReady();
485 }
486
487
488
489
490
491
492 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLIN)) {
493
494 epollInReady();
495 }
496
497
498
499
500 if (epollOps.contains(EpollIoOps.EPOLLRDHUP)) {
501 epollRdHupReady();
502 }
503 }
504
505
506
507
/** Called by {@code handle(...)} on {@code EPOLLIN} or {@code EPOLLERR}, and by {@code epollRdHupReady()} while active. */
abstract void epollInReady();
509
510 final boolean shouldStopReading(ChannelConfig config) {
511
512
513
514
515
516
517 return !readPending && !config.isAutoRead();
518 }
519
520
521
522
523 final void epollRdHupReady() {
524
525 recvBufAllocHandle().receivedRdHup();
526
527 if (isActive()) {
528
529
530
531 epollInReady();
532 } else {
533
534 shutdownInput(true);
535 }
536
537
538 clearEpollRdHup();
539 }
540
541
542
543
544 private void clearEpollRdHup() {
545 try {
546 clearFlag(Native.EPOLLRDHUP);
547 } catch (IOException e) {
548 pipeline().fireExceptionCaught(e);
549 close(voidPromise());
550 }
551 }
552
553
554
555
556 void shutdownInput(boolean rdHup) {
557 if (!socket.isInputShutdown()) {
558 if (isAllowHalfClosure(config())) {
559 try {
560 socket.shutdown(true, false);
561 } catch (IOException ignored) {
562
563
564 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
565 return;
566 } catch (NotYetConnectedException ignore) {
567
568
569 }
570 if (shouldStopReading(config())) {
571 clearEpollIn0();
572 }
573 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
574 } else {
575 close(voidPromise());
576 }
577 } else if (!rdHup && !inputClosedSeenErrorOnRead) {
578 inputClosedSeenErrorOnRead = true;
579 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
580 }
581 }
582
583 private void fireEventAndClose(Object evt) {
584 pipeline().fireUserEventTriggered(evt);
585 close(voidPromise());
586 }
587
588 @Override
589 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
590 if (allocHandle == null) {
591 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
592 }
593 return allocHandle;
594 }
595
596
597
598
599
600 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
601 return new EpollRecvByteAllocatorHandle(handle);
602 }
603
604 @Override
605 protected final void flush0() {
606
607
608
609 if (!isFlagSet(Native.EPOLLOUT)) {
610 super.flush0();
611 }
612 }
613
614
615
616
617 final void epollOutReady() {
618 if (connectPromise != null) {
619
620 finishConnect();
621 } else if (!socket.isOutputShutdown()) {
622
623 super.flush0();
624 }
625 }
626
627 protected final void clearEpollIn0() {
628 assert eventLoop().inEventLoop();
629 try {
630 readPending = false;
631 ops = ops.without(EpollIoOps.EPOLLIN);
632 EpollIoRegistration registration = registration();
633 registration.submit(ops);
634 } catch (Exception e) {
635
636
637 pipeline().fireExceptionCaught(e);
638 unsafe().close(unsafe().voidPromise());
639 }
640 }
641
642 @Override
643 public void connect(
644 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
645
646
647 if (promise.isDone() || !ensureOpen(promise)) {
648 return;
649 }
650
651 try {
652 if (connectPromise != null) {
653 throw new ConnectionPendingException();
654 }
655
656 boolean wasActive = isActive();
657 if (doConnect(remoteAddress, localAddress)) {
658 fulfillConnectPromise(promise, wasActive);
659 } else {
660 connectPromise = promise;
661 requestedRemoteAddress = remoteAddress;
662
663
664 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
665 if (connectTimeoutMillis > 0) {
666 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
667 @Override
668 public void run() {
669 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
670 if (connectPromise != null && !connectPromise.isDone()
671 && connectPromise.tryFailure(new ConnectTimeoutException(
672 "connection timed out after " + connectTimeoutMillis + " ms: " +
673 remoteAddress))) {
674 close(voidPromise());
675 }
676 }
677 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
678 }
679
680 promise.addListener(new ChannelFutureListener() {
681 @Override
682 public void operationComplete(ChannelFuture future) {
683
684
685 if (future.isCancelled()) {
686 if (connectTimeoutFuture != null) {
687 connectTimeoutFuture.cancel(false);
688 }
689 connectPromise = null;
690 close(voidPromise());
691 }
692 }
693 });
694 }
695 } catch (Throwable t) {
696 closeIfClosed();
697 promise.tryFailure(annotateConnectException(t, remoteAddress));
698 }
699 }
700
701 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
702 if (promise == null) {
703
704 return;
705 }
706 active = true;
707
708
709
710 boolean active = isActive();
711
712
713 boolean promiseSet = promise.trySuccess();
714
715
716
717 if (!wasActive && active) {
718 pipeline().fireChannelActive();
719 }
720
721
722 if (!promiseSet) {
723 close(voidPromise());
724 }
725 }
726
727 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
728 if (promise == null) {
729
730 return;
731 }
732
733
734 promise.tryFailure(cause);
735 closeIfClosed();
736 }
737
738 private void finishConnect() {
739
740
741
742 assert eventLoop().inEventLoop();
743
744 boolean connectStillInProgress = false;
745 try {
746 boolean wasActive = isActive();
747 if (!doFinishConnect()) {
748 connectStillInProgress = true;
749 return;
750 }
751 fulfillConnectPromise(connectPromise, wasActive);
752 } catch (Throwable t) {
753 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
754 } finally {
755 if (!connectStillInProgress) {
756
757
758 if (connectTimeoutFuture != null) {
759 connectTimeoutFuture.cancel(false);
760 }
761 connectPromise = null;
762 }
763 }
764 }
765
766
767
768
769 private boolean doFinishConnect() throws Exception {
770 if (socket.finishConnect()) {
771 clearFlag(Native.EPOLLOUT);
772 if (requestedRemoteAddress instanceof InetSocketAddress) {
773 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
774 }
775 requestedRemoteAddress = null;
776
777 return true;
778 }
779 setFlag(Native.EPOLLOUT);
780 return false;
781 }
782 }
783
784 @Override
785 protected void doBind(SocketAddress local) throws Exception {
786 if (local instanceof InetSocketAddress) {
787 checkResolvable((InetSocketAddress) local);
788 }
789 socket.bind(local);
790 this.local = socket.localAddress();
791 }
792
793
794
795
796 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
797 if (localAddress instanceof InetSocketAddress) {
798 checkResolvable((InetSocketAddress) localAddress);
799 }
800
801 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
802 ? (InetSocketAddress) remoteAddress : null;
803 if (remoteSocketAddr != null) {
804 checkResolvable(remoteSocketAddr);
805 }
806
807 if (remote != null) {
808
809
810
811 throw new AlreadyConnectedException();
812 }
813
814 if (localAddress != null) {
815 socket.bind(localAddress);
816 }
817
818 boolean connected = doConnect0(remoteAddress);
819 if (connected) {
820 remote = remoteSocketAddr == null ?
821 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
822 }
823
824
825
826 local = socket.localAddress();
827 return connected;
828 }
829
830 boolean doConnect0(SocketAddress remote) throws Exception {
831 boolean success = false;
832 try {
833 boolean connected = socket.connect(remote);
834 if (!connected) {
835 setFlag(Native.EPOLLOUT);
836 }
837 success = true;
838 return connected;
839 } finally {
840 if (!success) {
841 doClose();
842 }
843 }
844 }
845
846 @Override
847 protected SocketAddress localAddress0() {
848 return local;
849 }
850
851 @Override
852 protected SocketAddress remoteAddress0() {
853 return remote;
854 }
855 }