1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.net.InetSocketAddress;
49 import java.net.SocketAddress;
50 import java.nio.ByteBuffer;
51 import java.nio.channels.AlreadyConnectedException;
52 import java.nio.channels.ClosedChannelException;
53 import java.nio.channels.ConnectionPendingException;
54 import java.nio.channels.NotYetConnectedException;
55 import java.nio.channels.UnresolvedAddressException;
56 import java.util.concurrent.TimeUnit;
57
58 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
59 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
60 import static io.netty.util.internal.ObjectUtil.checkNotNull;
61
/**
 * Base class for channels backed by a {@link LinuxSocket} whose readiness events are delivered
 * through an {@link EpollIoRegistration}.
 */
abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
    // Shared metadata instance handed out by metadata().
    private static final ChannelMetadata METADATA = new ChannelMetadata(false);
    // The socket backing this channel; the constructors reject null.
    protected final LinuxSocket socket;

    // Promise of a connect attempt that has not completed yet; null when no connect is pending.



    private ChannelPromise connectPromise;
    // Scheduled connect-timeout task, if a timeout was configured for the pending connect.
    private Future<?> connectTimeoutFuture;
    // Remote address passed to the pending connect; cleared once the connect finishes.
    private SocketAddress requestedRemoteAddress;
    // Cached local / remote addresses returned by localAddress0() / remoteAddress0().
    private volatile SocketAddress local;
    private volatile SocketAddress remote;

    // Set once the channel has been registered (see doRegister).
    private EpollIoRegistration registration;
    // Set to true by doClose() and by shutdownInput(true).
    boolean inputClosedSeenErrorOnRead;
    // The ops this channel currently wants; submitted to the registration whenever they change.
    private EpollIoOps ops;
    // Snapshot of ops taken when registration completes; restored on deregistration.
    // NOTE(review): the name looks like a misspelling of "initial".
    private EpollIoOps inital;

    // Value returned by isActive().
    protected volatile boolean active;
81
/**
 * Creates a channel around an existing socket.
 *
 * @param parent     the parent channel, handed to the super constructor
 * @param fd         the socket backing this channel; must not be {@code null}
 * @param active     whether the channel starts out active; when {@code true} the local and
 *                   remote addresses are read from {@code fd} immediately
 * @param initialOps the ops this channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    ops = initialOps;
    this.active = active;
    if (!active) {
        // Addresses are only cached up front for channels that start out active.
        return;
    }
    local = fd.localAddress();
    remote = fd.remoteAddress();
}
94
/**
 * Creates an already-active channel whose remote address is supplied by the caller.
 *
 * @param parent     the parent channel, handed to the super constructor
 * @param fd         the socket backing this channel; must not be {@code null}
 * @param remote     the remote address to cache for this channel
 * @param initialOps the ops this channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
    super(parent);
    socket = checkNotNull(fd, "fd");
    active = true;
    ops = initialOps;
    // The remote address comes from the caller; only the local one is read from the socket.
    this.remote = remote;
    local = fd.localAddress();
}
105
106 static boolean isSoErrorZero(Socket fd) {
107 try {
108 return fd.getSoError() == 0;
109 } catch (IOException e) {
110 throw new ChannelException(e);
111 }
112 }
113
114 protected void setFlag(int flag) throws IOException {
115 ops = ops.with(EpollIoOps.valueOf(flag));
116 if (isRegistered()) {
117 EpollIoRegistration registration = registration();
118 try {
119 registration.submit(ops);
120 } catch (IOException e) {
121 throw e;
122 } catch (Exception e) {
123 throw new IllegalStateException(e);
124 }
125 } else {
126 ops = ops.with(EpollIoOps.valueOf(flag));
127 }
128 }
129
130 void clearFlag(int flag) throws IOException {
131 EpollIoRegistration registration = registration();
132 ops = ops.without(EpollIoOps.valueOf(flag));
133 try {
134 registration.submit(ops);
135 } catch (IOException e) {
136 throw e;
137 } catch (Exception e) {
138 throw new IllegalStateException(e);
139 }
140 }
141
142 protected final EpollIoRegistration registration() {
143 assert registration != null;
144 return registration;
145 }
146
147 boolean isFlagSet(int flag) {
148 return (ops.value & flag) != 0;
149 }
150
151 @Override
152 public final FileDescriptor fd() {
153 return socket;
154 }
155
/**
 * Returns the configuration of this channel, narrowed to {@link EpollChannelConfig}.
 */
@Override
public abstract EpollChannelConfig config();
158
159 @Override
160 public boolean isActive() {
161 return active;
162 }
163
164 @Override
165 public ChannelMetadata metadata() {
166 return METADATA;
167 }
168
169 @Override
170 protected void doClose() throws Exception {
171 active = false;
172
173
174 inputClosedSeenErrorOnRead = true;
175 try {
176 ChannelPromise promise = connectPromise;
177 if (promise != null) {
178
179 promise.tryFailure(new ClosedChannelException());
180 connectPromise = null;
181 }
182
183 Future<?> future = connectTimeoutFuture;
184 if (future != null) {
185 future.cancel(false);
186 connectTimeoutFuture = null;
187 }
188
189 if (isRegistered()) {
190
191
192
193
194 EventLoop loop = eventLoop();
195 if (loop.inEventLoop()) {
196 doDeregister();
197 } else {
198 loop.execute(new Runnable() {
199 @Override
200 public void run() {
201 try {
202 doDeregister();
203 } catch (Throwable cause) {
204 pipeline().fireExceptionCaught(cause);
205 }
206 }
207 });
208 }
209 }
210 } finally {
211 socket.close();
212 }
213 }
214
215 void resetCachedAddresses() {
216 local = socket.localAddress();
217 remote = socket.remoteAddress();
218 }
219
220 @Override
221 protected void doDisconnect() throws Exception {
222 doClose();
223 }
224
225 @Override
226 public boolean isOpen() {
227 return socket.isOpen();
228 }
229
230 @Override
231 protected void doDeregister() throws Exception {
232 EpollIoRegistration registration = this.registration;
233 if (registration != null) {
234 ops = inital;
235 registration.cancel();
236 }
237 }
238
239 @Override
240 protected boolean isCompatible(EventLoop loop) {
241 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
242 }
243
244 @Override
245 protected void doBeginRead() throws Exception {
246
247 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
248 unsafe.readPending = true;
249
250
251
252
253 setFlag(Native.EPOLLIN);
254 }
255
256 final boolean shouldBreakEpollInReady(ChannelConfig config) {
257 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
258 }
259
260 private static boolean isAllowHalfClosure(ChannelConfig config) {
261 if (config instanceof EpollDomainSocketChannelConfig) {
262 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
263 }
264 return config instanceof SocketChannelConfig &&
265 ((SocketChannelConfig) config).isAllowHalfClosure();
266 }
267
268 final void clearEpollIn() {
269
270 if (isRegistered()) {
271 final EventLoop loop = eventLoop();
272 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
273 if (loop.inEventLoop()) {
274 unsafe.clearEpollIn0();
275 } else {
276
277 loop.execute(new Runnable() {
278 @Override
279 public void run() {
280 if (!unsafe.readPending && !config().isAutoRead()) {
281
282 unsafe.clearEpollIn0();
283 }
284 }
285 });
286 }
287 } else {
288
289
290 ops = ops.without(EpollIoOps.EPOLLIN);
291 }
292 }
293
294 @Override
295 protected void doRegister(ChannelPromise promise) {
296 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
297 if (f.isSuccess()) {
298 registration = (EpollIoRegistration) f.getNow();
299 registration.submit(ops);
300 inital = ops;
301 promise.setSuccess();
302 } else {
303 promise.setFailure(f.cause());
304 }
305 });
306 }
307
/**
 * Creates the unsafe for this channel, narrowed to {@link AbstractEpollUnsafe}.
 */
@Override
protected abstract AbstractEpollUnsafe newUnsafe();
310
311
312
313
314 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
315 return newDirectBuffer(buf, buf);
316 }
317
318
319
320
321
322
323 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
324 final int readableBytes = buf.readableBytes();
325 if (readableBytes == 0) {
326 ReferenceCountUtil.release(holder);
327 return Unpooled.EMPTY_BUFFER;
328 }
329
330 final ByteBufAllocator alloc = alloc();
331 if (alloc.isDirectBufferPooled()) {
332 return newDirectBuffer0(holder, buf, alloc, readableBytes);
333 }
334
335 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
336 if (directBuf == null) {
337 return newDirectBuffer0(holder, buf, alloc, readableBytes);
338 }
339
340 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
341 ReferenceCountUtil.safeRelease(holder);
342 return directBuf;
343 }
344
345 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
346 final ByteBuf directBuf = alloc.directBuffer(capacity);
347 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
348 ReferenceCountUtil.safeRelease(holder);
349 return directBuf;
350 }
351
352 protected static void checkResolvable(InetSocketAddress addr) {
353 if (addr.isUnresolved()) {
354 throw new UnresolvedAddressException();
355 }
356 }
357
358
359
360
361 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
362 int writerIndex = byteBuf.writerIndex();
363 int localReadAmount;
364 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
365 if (byteBuf.hasMemoryAddress()) {
366 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
367 } else {
368 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
369 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
370 }
371 if (localReadAmount > 0) {
372 byteBuf.writerIndex(writerIndex + localReadAmount);
373 }
374 return localReadAmount;
375 }
376
377 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
378 if (buf.hasMemoryAddress()) {
379 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
380 if (localFlushedAmount > 0) {
381 in.removeBytes(localFlushedAmount);
382 return 1;
383 }
384 } else {
385 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
386 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
387 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
388 if (localFlushedAmount > 0) {
389 nioBuf.position(nioBuf.position() + localFlushedAmount);
390 in.removeBytes(localFlushedAmount);
391 return 1;
392 }
393 }
394 return WRITE_STATUS_SNDBUF_FULL;
395 }
396
397
398
399
400
401 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
402 throws IOException {
403 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
404 if (data.hasMemoryAddress()) {
405 long memoryAddress = data.memoryAddress();
406 if (remoteAddress == null) {
407 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
408 }
409 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
410 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
411 }
412
413 if (data.nioBufferCount() > 1) {
414 IovArray array = registration().ioHandler().cleanIovArray();
415 array.add(data, data.readerIndex(), data.readableBytes());
416 int cnt = array.count();
417 assert cnt != 0;
418
419 if (remoteAddress == null) {
420 return socket.writevAddresses(array.memoryAddress(0), cnt);
421 }
422 return socket.sendToAddresses(array.memoryAddress(0), cnt,
423 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
424 }
425
426 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
427 if (remoteAddress == null) {
428 return socket.send(nioData, nioData.position(), nioData.limit());
429 }
430 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
431 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
432 }
433
/**
 * Unsafe implementation shared by epoll channels. It is also the {@link EpollIoHandle} that the
 * enclosing channel registers with its event loop, so readiness events arrive in
 * {@code handle(...)}.
 */
protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
    // Set to true by doBeginRead() and reset to false by clearEpollIn0().
    boolean readPending;
    // Created lazily on the first call to recvBufAllocHandle().
    private EpollRecvByteAllocatorHandle allocHandle;
437
438 Channel channel() {
439 return AbstractEpollChannel.this;
440 }
441
442 @Override
443 public FileDescriptor fd() {
444 return AbstractEpollChannel.this.fd();
445 }
446
447 @Override
448 public void close() {
449 close(voidPromise());
450 }
451
452 @Override
453 public void handle(IoRegistration registration, IoEvent event) {
454 EpollIoEvent epollEvent = (EpollIoEvent) event;
455 EpollIoOps epollOps = epollEvent.ops();
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLOUT)) {
471
472 epollOutReady();
473 }
474
475
476
477
478
479
480 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLIN)) {
481
482 epollInReady();
483 }
484
485
486
487
488 if (epollOps.contains(EpollIoOps.EPOLLRDHUP)) {
489 epollRdHupReady();
490 }
491 }
492
493
494
495
/**
 * Called from {@code handle(...)} for {@code EPOLLIN} or {@code EPOLLERR} events, and from
 * {@code epollRdHupReady()} while the channel is active.
 */
abstract void epollInReady();
497
498 final boolean shouldStopReading(ChannelConfig config) {
499
500
501
502
503
504
505 return !readPending && !config.isAutoRead();
506 }
507
508
509
510
511 final void epollRdHupReady() {
512
513 recvBufAllocHandle().receivedRdHup();
514
515 if (isActive()) {
516
517
518
519 epollInReady();
520 } else {
521
522 shutdownInput(false);
523 }
524
525
526 clearEpollRdHup();
527 }
528
529
530
531
532 private void clearEpollRdHup() {
533 try {
534 clearFlag(Native.EPOLLRDHUP);
535 } catch (IOException e) {
536 pipeline().fireExceptionCaught(e);
537 close(voidPromise());
538 }
539 }
540
541
542
543
544 void shutdownInput(boolean allDataRead) {
545 if (!socket.isInputShutdown()) {
546 if (isAllowHalfClosure(config())) {
547 try {
548 socket.shutdown(true, false);
549 } catch (IOException ignored) {
550
551
552 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
553 return;
554 } catch (NotYetConnectedException ignore) {
555
556
557 }
558 if (shouldStopReading(config())) {
559 clearEpollIn0();
560 }
561 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
562 } else {
563 close(voidPromise());
564 return;
565 }
566 }
567
568 if (allDataRead && !inputClosedSeenErrorOnRead) {
569 inputClosedSeenErrorOnRead = true;
570 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
571 }
572 }
573
574 private void fireEventAndClose(Object evt) {
575 pipeline().fireUserEventTriggered(evt);
576 close(voidPromise());
577 }
578
579 @Override
580 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
581 if (allocHandle == null) {
582 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
583 }
584 return allocHandle;
585 }
586
587
588
589
590
591 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
592 return new EpollRecvByteAllocatorHandle(handle);
593 }
594
595 @Override
596 protected final void flush0() {
597
598
599
600 if (!isFlagSet(Native.EPOLLOUT)) {
601 super.flush0();
602 }
603 }
604
605
606
607
608 final void epollOutReady() {
609 if (connectPromise != null) {
610
611 finishConnect();
612 } else if (!socket.isOutputShutdown()) {
613
614 super.flush0();
615 }
616 }
617
618 protected final void clearEpollIn0() {
619 assert eventLoop().inEventLoop();
620 try {
621 readPending = false;
622 ops = ops.without(EpollIoOps.EPOLLIN);
623 EpollIoRegistration registration = registration();
624 registration.submit(ops);
625 } catch (Exception e) {
626
627
628 pipeline().fireExceptionCaught(e);
629 unsafe().close(unsafe().voidPromise());
630 }
631 }
632
/**
 * Starts connecting this channel to {@code remoteAddress}.
 *
 * <p>If {@code doConnect} reports that the connection was established immediately, the promise
 * is fulfilled right away. Otherwise the promise and remote address are remembered, an optional
 * connect timeout is scheduled, and a listener is installed that closes the channel if the
 * promise is cancelled. Any throwable raised along the way fails the promise.
 *
 * @param remoteAddress the address to connect to
 * @param localAddress  the local address passed on to {@code doConnect}
 * @param promise       the promise completed with the outcome of the connect attempt
 */
@Override
public void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    // Nothing to do if the promise is already complete or ensureOpen(promise) reports failure.
    if (promise.isDone() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // A previous connect attempt is still pending.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            // Not connected yet: remember the attempt so finishConnect() can complete it later.
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // A timeout is scheduled only when the configured value is positive.
            final int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
                        // Close only if this task is the one that actually failed the promise.
                        if (connectPromise != null && !connectPromise.isDone()
                                && connectPromise.tryFailure(new ConnectTimeoutException(
                                        "connection timed out after " + connectTimeoutMillis + " ms: " +
                                                remoteAddress))) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) {
                    // On cancellation: cancel the timeout task, forget the pending connect and
                    // close the channel.
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        closeIfClosed();
        promise.tryFailure(annotateConnectException(t, remoteAddress));
    }
}
691
692 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
693 if (promise == null) {
694
695 return;
696 }
697 active = true;
698
699
700
701 boolean active = isActive();
702
703
704 boolean promiseSet = promise.trySuccess();
705
706
707
708 if (!wasActive && active) {
709 pipeline().fireChannelActive();
710 }
711
712
713 if (!promiseSet) {
714 close(voidPromise());
715 }
716 }
717
718 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
719 if (promise == null) {
720
721 return;
722 }
723
724
725 promise.tryFailure(cause);
726 closeIfClosed();
727 }
728
729 private void finishConnect() {
730
731
732
733 assert eventLoop().inEventLoop();
734
735 boolean connectStillInProgress = false;
736 try {
737 boolean wasActive = isActive();
738 if (!doFinishConnect()) {
739 connectStillInProgress = true;
740 return;
741 }
742 fulfillConnectPromise(connectPromise, wasActive);
743 } catch (Throwable t) {
744 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
745 } finally {
746 if (!connectStillInProgress) {
747
748
749 if (connectTimeoutFuture != null) {
750 connectTimeoutFuture.cancel(false);
751 }
752 connectPromise = null;
753 }
754 }
755 }
756
757
758
759
760 private boolean doFinishConnect() throws Exception {
761 if (socket.finishConnect()) {
762 clearFlag(Native.EPOLLOUT);
763 if (requestedRemoteAddress instanceof InetSocketAddress) {
764 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
765 }
766 requestedRemoteAddress = null;
767
768 return true;
769 }
770 setFlag(Native.EPOLLOUT);
771 return false;
772 }
773 }
774
775 @Override
776 protected void doBind(SocketAddress local) throws Exception {
777 if (local instanceof InetSocketAddress) {
778 checkResolvable((InetSocketAddress) local);
779 }
780 socket.bind(local);
781 this.local = socket.localAddress();
782 }
783
784
785
786
787 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
788 if (localAddress instanceof InetSocketAddress) {
789 checkResolvable((InetSocketAddress) localAddress);
790 }
791
792 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
793 ? (InetSocketAddress) remoteAddress : null;
794 if (remoteSocketAddr != null) {
795 checkResolvable(remoteSocketAddr);
796 }
797
798 if (remote != null) {
799
800
801
802 throw new AlreadyConnectedException();
803 }
804
805 if (localAddress != null) {
806 socket.bind(localAddress);
807 }
808
809 boolean connected = doConnect0(remoteAddress);
810 if (connected) {
811 remote = remoteSocketAddr == null ?
812 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
813 }
814
815
816
817 local = socket.localAddress();
818 return connected;
819 }
820
821 boolean doConnect0(SocketAddress remote) throws Exception {
822 boolean success = false;
823 try {
824 boolean connected = socket.connect(remote);
825 if (!connected) {
826 setFlag(Native.EPOLLOUT);
827 }
828 success = true;
829 return connected;
830 } finally {
831 if (!success) {
832 doClose();
833 }
834 }
835 }
836
837 @Override
838 protected SocketAddress localAddress0() {
839 return local;
840 }
841
842 @Override
843 protected SocketAddress remoteAddress0() {
844 return remote;
845 }
846 }