1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.io.UncheckedIOException;
49 import java.net.InetSocketAddress;
50 import java.net.SocketAddress;
51 import java.nio.ByteBuffer;
52 import java.nio.channels.AlreadyConnectedException;
53 import java.nio.channels.ClosedChannelException;
54 import java.nio.channels.ConnectionPendingException;
55 import java.nio.channels.NotYetConnectedException;
56 import java.nio.channels.UnresolvedAddressException;
57 import java.util.concurrent.TimeUnit;
58
59 import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_IN_MASK;
60 import static io.netty.channel.epoll.EpollIoOps.EPOLL_ERR_OUT_MASK;
61 import static io.netty.channel.epoll.EpollIoOps.EPOLL_RDHUP_MASK;
62 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
63 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
64 import static io.netty.util.internal.ObjectUtil.checkNotNull;
65
66 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
67 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
68 protected final LinuxSocket socket;
69
70
71
72
73 private ChannelPromise connectPromise;
74 private Future<?> connectTimeoutFuture;
75 private SocketAddress requestedRemoteAddress;
76 private volatile SocketAddress local;
77 private volatile SocketAddress remote;
78
79 private IoRegistration registration;
80 boolean inputClosedSeenErrorOnRead;
81 private EpollIoOps ops;
82 private EpollIoOps inital;
83
84 protected volatile boolean active;
85
86 AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
87 super(parent);
88 this.socket = checkNotNull(fd, "fd");
89 this.active = active;
90 if (active) {
91
92
93 this.local = fd.localAddress();
94 this.remote = fd.remoteAddress();
95 }
96 this.ops = initialOps;
97 }
98
99 AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
100 super(parent);
101 this.socket = checkNotNull(fd, "fd");
102 this.active = true;
103
104
105 this.remote = remote;
106 this.local = fd.localAddress();
107 this.ops = initialOps;
108 }
109
110 static boolean isSoErrorZero(Socket fd) {
111 try {
112 return fd.getSoError() == 0;
113 } catch (IOException e) {
114 throw new ChannelException(e);
115 }
116 }
117
118 protected void setFlag(int flag) throws IOException {
119 if (ops.contains(flag)) {
120
121 return;
122 }
123 ops = ops.with(EpollIoOps.valueOf(flag));
124 if (isRegistered()) {
125 IoRegistration registration = registration();
126 registration.submit(ops);
127 } else {
128 ops = ops.with(EpollIoOps.valueOf(flag));
129 }
130 }
131
132 void clearFlag(int flag) throws IOException {
133 IoRegistration registration = registration();
134 if (!ops.contains(flag)) {
135
136 return;
137 }
138 ops = ops.without(EpollIoOps.valueOf(flag));
139 registration.submit(ops);
140 }
141
142 protected final IoRegistration registration() {
143 assert registration != null;
144 return registration;
145 }
146
147 boolean isFlagSet(int flag) {
148 return (ops.value & flag) != 0;
149 }
150
151 @Override
152 public final FileDescriptor fd() {
153 return socket;
154 }
155
156 @Override
157 public abstract EpollChannelConfig config();
158
159 @Override
160 public boolean isActive() {
161 return active;
162 }
163
164 @Override
165 public ChannelMetadata metadata() {
166 return METADATA;
167 }
168
169 @Override
170 protected void doClose() throws Exception {
171 active = false;
172
173
174 inputClosedSeenErrorOnRead = true;
175 try {
176 ChannelPromise promise = connectPromise;
177 if (promise != null) {
178
179 promise.tryFailure(new ClosedChannelException());
180 connectPromise = null;
181 }
182
183 Future<?> future = connectTimeoutFuture;
184 if (future != null) {
185 future.cancel(false);
186 connectTimeoutFuture = null;
187 }
188
189 if (isRegistered()) {
190
191
192
193
194 EventLoop loop = eventLoop();
195 if (loop.inEventLoop()) {
196 doDeregister();
197 } else {
198 loop.execute(new Runnable() {
199 @Override
200 public void run() {
201 try {
202 doDeregister();
203 } catch (Throwable cause) {
204 pipeline().fireExceptionCaught(cause);
205 }
206 }
207 });
208 }
209 }
210 } finally {
211 socket.close();
212 }
213 }
214
215 void resetCachedAddresses() {
216 local = socket.localAddress();
217 remote = socket.remoteAddress();
218 }
219
220 @Override
221 protected void doDisconnect() throws Exception {
222 doClose();
223 }
224
225 @Override
226 public boolean isOpen() {
227 return socket.isOpen();
228 }
229
230 @Override
231 protected void doDeregister() throws Exception {
232 IoRegistration registration = this.registration;
233 if (registration != null) {
234 ops = inital;
235 registration.cancel();
236 }
237 }
238
239 @Override
240 protected boolean isCompatible(EventLoop loop) {
241 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
242 }
243
244 @Override
245 protected void doBeginRead() throws Exception {
246
247 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
248 unsafe.readPending = true;
249
250
251
252
253 setFlag(Native.EPOLLIN);
254 }
255
256 final boolean shouldBreakEpollInReady(ChannelConfig config) {
257 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
258 }
259
260 private static boolean isAllowHalfClosure(ChannelConfig config) {
261 if (config instanceof EpollDomainSocketChannelConfig) {
262 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
263 }
264 return config instanceof SocketChannelConfig &&
265 ((SocketChannelConfig) config).isAllowHalfClosure();
266 }
267
268 final void clearEpollIn() {
269
270 if (isRegistered()) {
271 final EventLoop loop = eventLoop();
272 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
273 if (loop.inEventLoop()) {
274 unsafe.clearEpollIn0();
275 } else {
276
277 loop.execute(new Runnable() {
278 @Override
279 public void run() {
280 if (!unsafe.readPending && !config().isAutoRead()) {
281
282 unsafe.clearEpollIn0();
283 }
284 }
285 });
286 }
287 } else {
288
289
290 ops = ops.without(EpollIoOps.EPOLLIN);
291 }
292 }
293
294 @Override
295 protected void doRegister(ChannelPromise promise) {
296 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
297 if (f.isSuccess()) {
298 registration = (IoRegistration) f.getNow();
299 registration.submit(ops);
300 inital = ops;
301 promise.setSuccess();
302 } else {
303 promise.setFailure(f.cause());
304 }
305 });
306 }
307
308 @Override
309 protected abstract AbstractEpollUnsafe newUnsafe();
310
311
312
313
314 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
315 return newDirectBuffer(buf, buf);
316 }
317
318
319
320
321
322
323 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
324 final int readableBytes = buf.readableBytes();
325 if (readableBytes == 0) {
326 ReferenceCountUtil.release(holder);
327 return Unpooled.EMPTY_BUFFER;
328 }
329
330 final ByteBufAllocator alloc = alloc();
331 if (alloc.isDirectBufferPooled()) {
332 return newDirectBuffer0(holder, buf, alloc, readableBytes);
333 }
334
335 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
336 if (directBuf == null) {
337 return newDirectBuffer0(holder, buf, alloc, readableBytes);
338 }
339
340 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
341 ReferenceCountUtil.safeRelease(holder);
342 return directBuf;
343 }
344
345 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
346 final ByteBuf directBuf = alloc.directBuffer(capacity);
347 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
348 ReferenceCountUtil.safeRelease(holder);
349 return directBuf;
350 }
351
352 protected static void checkResolvable(InetSocketAddress addr) {
353 if (addr.isUnresolved()) {
354 throw new UnresolvedAddressException();
355 }
356 }
357
358
359
360
361 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
362 int writerIndex = byteBuf.writerIndex();
363 int localReadAmount;
364 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
365 if (byteBuf.hasMemoryAddress()) {
366 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
367 } else {
368 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
369 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
370 }
371 if (localReadAmount > 0) {
372 byteBuf.writerIndex(writerIndex + localReadAmount);
373 }
374 return localReadAmount;
375 }
376
377 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
378 if (buf.hasMemoryAddress()) {
379 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
380 if (localFlushedAmount > 0) {
381 in.removeBytes(localFlushedAmount);
382 return 1;
383 }
384 } else {
385 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
386 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
387 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
388 if (localFlushedAmount > 0) {
389 nioBuf.position(nioBuf.position() + localFlushedAmount);
390 in.removeBytes(localFlushedAmount);
391 return 1;
392 }
393 }
394 return WRITE_STATUS_SNDBUF_FULL;
395 }
396
397
398
399
400
401 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
402 throws IOException {
403 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
404 if (data.hasMemoryAddress()) {
405 long memoryAddress = data.memoryAddress();
406 if (remoteAddress == null) {
407 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
408 }
409 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
410 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
411 }
412
413 if (data.nioBufferCount() > 1) {
414 IovArray array = ((NativeArrays) registration.attachment()).cleanIovArray();
415 array.add(data, data.readerIndex(), data.readableBytes());
416 int cnt = array.count();
417 assert cnt != 0;
418
419 if (remoteAddress == null) {
420 return socket.writevAddresses(array.memoryAddress(0), cnt);
421 }
422 return socket.sendToAddresses(array.memoryAddress(0), cnt,
423 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
424 }
425
426 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
427 if (remoteAddress == null) {
428 return socket.send(nioData, nioData.position(), nioData.limit());
429 }
430 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
431 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
432 }
433
434 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
435 boolean readPending;
436 private EpollRecvByteAllocatorHandle allocHandle;
437
438 Channel channel() {
439 return AbstractEpollChannel.this;
440 }
441
442 @Override
443 public FileDescriptor fd() {
444 return AbstractEpollChannel.this.fd();
445 }
446
447 @Override
448 public void close() {
449 close(voidPromise());
450 }
451
452 @Override
453 public void handle(IoRegistration registration, IoEvent event) {
454 EpollIoEvent epollEvent = (EpollIoEvent) event;
455 int ops = epollEvent.ops().value;
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470 if ((ops & EPOLL_ERR_OUT_MASK) != 0) {
471
472 epollOutReady();
473 }
474
475
476
477
478
479
480 if ((ops & EPOLL_ERR_IN_MASK) != 0) {
481
482 epollInReady();
483 }
484
485
486
487
488 if ((ops & EPOLL_RDHUP_MASK) != 0) {
489 epollRdHupReady();
490 }
491 }
492
493
494
495
496 abstract void epollInReady();
497
498 final boolean shouldStopReading(ChannelConfig config) {
499
500
501
502
503
504
505 return !readPending && !config.isAutoRead();
506 }
507
508
509
510
511 final void epollRdHupReady() {
512
513 recvBufAllocHandle().receivedRdHup();
514
515 if (isActive()) {
516
517
518
519 epollInReady();
520 } else {
521
522 shutdownInput(false);
523 }
524
525
526 clearEpollRdHup();
527 }
528
529
530
531
532 private void clearEpollRdHup() {
533 try {
534 clearFlag(Native.EPOLLRDHUP);
535 } catch (IOException e) {
536 pipeline().fireExceptionCaught(e);
537 close(voidPromise());
538 }
539 }
540
541
542
543
544 void shutdownInput(boolean allDataRead) {
545 if (!socket.isInputShutdown()) {
546 if (isAllowHalfClosure(config())) {
547 try {
548 socket.shutdown(true, false);
549 } catch (IOException ignored) {
550
551
552 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
553 return;
554 } catch (NotYetConnectedException ignore) {
555
556
557 }
558 if (shouldStopReading(config())) {
559 clearEpollIn0();
560 }
561 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
562 } else {
563 close(voidPromise());
564 return;
565 }
566 }
567
568 if (allDataRead && !inputClosedSeenErrorOnRead) {
569 inputClosedSeenErrorOnRead = true;
570 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
571 }
572 }
573
574 private void fireEventAndClose(Object evt) {
575 pipeline().fireUserEventTriggered(evt);
576 close(voidPromise());
577 }
578
579 @Override
580 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
581 if (allocHandle == null) {
582 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
583 }
584 return allocHandle;
585 }
586
587
588
589
590
591 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
592 return new EpollRecvByteAllocatorHandle(handle);
593 }
594
595 @Override
596 protected final void flush0() {
597
598
599
600 if (!isFlagSet(Native.EPOLLOUT)) {
601 super.flush0();
602 }
603 }
604
605
606
607
608 final void epollOutReady() {
609 if (connectPromise != null) {
610
611 finishConnect();
612 } else if (!socket.isOutputShutdown()) {
613
614 super.flush0();
615 }
616 }
617
618 protected final void clearEpollIn0() {
619 assert eventLoop().inEventLoop();
620 try {
621 readPending = false;
622 if (!ops.contains(EpollIoOps.EPOLLIN)) {
623 return;
624 }
625 ops = ops.without(EpollIoOps.EPOLLIN);
626 IoRegistration registration = registration();
627 registration.submit(ops);
628 } catch (UncheckedIOException e) {
629
630
631 pipeline().fireExceptionCaught(e);
632 unsafe().close(unsafe().voidPromise());
633 }
634 }
635
636 @Override
637 public void connect(
638 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
639
640
641 if (promise.isDone() || !ensureOpen(promise)) {
642 return;
643 }
644
645 try {
646 if (connectPromise != null) {
647 throw new ConnectionPendingException();
648 }
649
650 boolean wasActive = isActive();
651 if (doConnect(remoteAddress, localAddress)) {
652 fulfillConnectPromise(promise, wasActive);
653 } else {
654 connectPromise = promise;
655 requestedRemoteAddress = remoteAddress;
656
657
658 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
659 if (connectTimeoutMillis > 0) {
660 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
661 @Override
662 public void run() {
663 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
664 if (connectPromise != null && !connectPromise.isDone()
665 && connectPromise.tryFailure(new ConnectTimeoutException(
666 "connection timed out after " + connectTimeoutMillis + " ms: " +
667 remoteAddress))) {
668 close(voidPromise());
669 }
670 }
671 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
672 }
673
674 promise.addListener(new ChannelFutureListener() {
675 @Override
676 public void operationComplete(ChannelFuture future) {
677
678
679 if (future.isCancelled()) {
680 if (connectTimeoutFuture != null) {
681 connectTimeoutFuture.cancel(false);
682 }
683 connectPromise = null;
684 close(voidPromise());
685 }
686 }
687 });
688 }
689 } catch (Throwable t) {
690 closeIfClosed();
691 promise.tryFailure(annotateConnectException(t, remoteAddress));
692 }
693 }
694
695 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
696 if (promise == null) {
697
698 return;
699 }
700 active = true;
701
702
703
704 boolean active = isActive();
705
706
707 boolean promiseSet = promise.trySuccess();
708
709
710
711 if (!wasActive && active) {
712 pipeline().fireChannelActive();
713 }
714
715
716 if (!promiseSet) {
717 close(voidPromise());
718 }
719 }
720
721 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
722 if (promise == null) {
723
724 return;
725 }
726
727
728 promise.tryFailure(cause);
729 closeIfClosed();
730 }
731
732 private void finishConnect() {
733
734
735
736 assert eventLoop().inEventLoop();
737
738 boolean connectStillInProgress = false;
739 try {
740 boolean wasActive = isActive();
741 if (!doFinishConnect()) {
742 connectStillInProgress = true;
743 return;
744 }
745 fulfillConnectPromise(connectPromise, wasActive);
746 } catch (Throwable t) {
747 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
748 } finally {
749 if (!connectStillInProgress) {
750
751
752 if (connectTimeoutFuture != null) {
753 connectTimeoutFuture.cancel(false);
754 }
755 connectPromise = null;
756 }
757 }
758 }
759
760
761
762
763 private boolean doFinishConnect() throws Exception {
764 if (socket.finishConnect()) {
765 clearFlag(Native.EPOLLOUT);
766 if (requestedRemoteAddress instanceof InetSocketAddress) {
767 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
768 }
769 requestedRemoteAddress = null;
770
771 return true;
772 }
773 setFlag(Native.EPOLLOUT);
774 return false;
775 }
776 }
777
778 @Override
779 protected void doBind(SocketAddress local) throws Exception {
780 if (local instanceof InetSocketAddress) {
781 checkResolvable((InetSocketAddress) local);
782 }
783 socket.bind(local);
784 this.local = socket.localAddress();
785 }
786
787
788
789
790 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
791 if (localAddress instanceof InetSocketAddress) {
792 checkResolvable((InetSocketAddress) localAddress);
793 }
794
795 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
796 ? (InetSocketAddress) remoteAddress : null;
797 if (remoteSocketAddr != null) {
798 checkResolvable(remoteSocketAddr);
799 }
800
801 if (remote != null) {
802
803
804
805 throw new AlreadyConnectedException();
806 }
807
808 if (localAddress != null) {
809 socket.bind(localAddress);
810 }
811
812 boolean connected = doConnect0(remoteAddress);
813 if (connected) {
814 remote = remoteSocketAddr == null ?
815 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
816 }
817
818
819
820 local = socket.localAddress();
821 return connected;
822 }
823
824 boolean doConnect0(SocketAddress remote) throws Exception {
825 boolean success = false;
826 try {
827 boolean connected = socket.connect(remote);
828 if (!connected) {
829 setFlag(Native.EPOLLOUT);
830 }
831 success = true;
832 return connected;
833 } finally {
834 if (!success) {
835 doClose();
836 }
837 }
838 }
839
840 @Override
841 protected SocketAddress localAddress0() {
842 return local;
843 }
844
845 @Override
846 protected SocketAddress remoteAddress0() {
847 return remote;
848 }
849 }