1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.io.UncheckedIOException;
49 import java.net.InetSocketAddress;
50 import java.net.SocketAddress;
51 import java.nio.ByteBuffer;
52 import java.nio.channels.AlreadyConnectedException;
53 import java.nio.channels.ClosedChannelException;
54 import java.nio.channels.ConnectionPendingException;
55 import java.nio.channels.NotYetConnectedException;
56 import java.nio.channels.UnresolvedAddressException;
57 import java.util.concurrent.TimeUnit;
58
59 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
60 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
61 import static io.netty.util.internal.ObjectUtil.checkNotNull;
62
63 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
64 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
65 protected final LinuxSocket socket;
66
67
68
69
70 private ChannelPromise connectPromise;
71 private Future<?> connectTimeoutFuture;
72 private SocketAddress requestedRemoteAddress;
73 private volatile SocketAddress local;
74 private volatile SocketAddress remote;
75
76 private IoRegistration registration;
77 boolean inputClosedSeenErrorOnRead;
78 private EpollIoOps ops;
79 private EpollIoOps inital;
80
81 protected volatile boolean active;
82
/**
 * Creates a channel that wraps the given socket.
 *
 * @param parent     the parent channel, passed straight to the superclass constructor
 * @param fd         the socket backing this channel; must not be {@code null}
 * @param active     the initial value of the {@code active} flag; when {@code true} the local and remote
 *                   addresses are read from the socket right away and cached
 * @param initialOps the epoll interest set this channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    this.ops = initialOps;
    this.active = active;
    if (!active) {
        // Nothing to cache yet; addresses are filled in later by bind / connect.
        return;
    }
    // The channel is already active, so cache both endpoints of the socket up front.
    this.local = socket.localAddress();
    this.remote = socket.remoteAddress();
}
95
/**
 * Creates an already-active channel whose remote address is supplied by the caller.
 *
 * @param parent     the parent channel, passed straight to the superclass constructor
 * @param fd         the socket backing this channel; must not be {@code null}
 * @param remote     the remote address to cache as-is (it is not queried from the socket)
 * @param initialOps the epoll interest set this channel starts with
 */
AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
    super(parent);
    this.socket = checkNotNull(fd, "fd");
    // Channels built through this constructor always start out active.
    this.active = true;
    // Use the caller-provided remote address; only the local address is read from the socket.
    this.remote = remote;
    this.local = socket.localAddress();
    this.ops = initialOps;
}
106
107 static boolean isSoErrorZero(Socket fd) {
108 try {
109 return fd.getSoError() == 0;
110 } catch (IOException e) {
111 throw new ChannelException(e);
112 }
113 }
114
115 protected void setFlag(int flag) throws IOException {
116 if (ops.contains(flag)) {
117
118 return;
119 }
120 ops = ops.with(EpollIoOps.valueOf(flag));
121 if (isRegistered()) {
122 IoRegistration registration = registration();
123 registration.submit(ops);
124 } else {
125 ops = ops.with(EpollIoOps.valueOf(flag));
126 }
127 }
128
129 void clearFlag(int flag) throws IOException {
130 IoRegistration registration = registration();
131 if (!ops.contains(flag)) {
132
133 return;
134 }
135 ops = ops.without(EpollIoOps.valueOf(flag));
136 registration.submit(ops);
137 }
138
139 protected final IoRegistration registration() {
140 assert registration != null;
141 return registration;
142 }
143
144 boolean isFlagSet(int flag) {
145 return (ops.value & flag) != 0;
146 }
147
148 @Override
149 public final FileDescriptor fd() {
150 return socket;
151 }
152
153 @Override
154 public abstract EpollChannelConfig config();
155
156 @Override
157 public boolean isActive() {
158 return active;
159 }
160
161 @Override
162 public ChannelMetadata metadata() {
163 return METADATA;
164 }
165
166 @Override
167 protected void doClose() throws Exception {
168 active = false;
169
170
171 inputClosedSeenErrorOnRead = true;
172 try {
173 ChannelPromise promise = connectPromise;
174 if (promise != null) {
175
176 promise.tryFailure(new ClosedChannelException());
177 connectPromise = null;
178 }
179
180 Future<?> future = connectTimeoutFuture;
181 if (future != null) {
182 future.cancel(false);
183 connectTimeoutFuture = null;
184 }
185
186 if (isRegistered()) {
187
188
189
190
191 EventLoop loop = eventLoop();
192 if (loop.inEventLoop()) {
193 doDeregister();
194 } else {
195 loop.execute(new Runnable() {
196 @Override
197 public void run() {
198 try {
199 doDeregister();
200 } catch (Throwable cause) {
201 pipeline().fireExceptionCaught(cause);
202 }
203 }
204 });
205 }
206 }
207 } finally {
208 socket.close();
209 }
210 }
211
212 void resetCachedAddresses() {
213 local = socket.localAddress();
214 remote = socket.remoteAddress();
215 }
216
217 @Override
218 protected void doDisconnect() throws Exception {
219 doClose();
220 }
221
222 @Override
223 public boolean isOpen() {
224 return socket.isOpen();
225 }
226
227 @Override
228 protected void doDeregister() throws Exception {
229 IoRegistration registration = this.registration;
230 if (registration != null) {
231 ops = inital;
232 registration.cancel();
233 }
234 }
235
236 @Override
237 protected boolean isCompatible(EventLoop loop) {
238 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
239 }
240
241 @Override
242 protected void doBeginRead() throws Exception {
243
244 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
245 unsafe.readPending = true;
246
247
248
249
250 setFlag(Native.EPOLLIN);
251 }
252
253 final boolean shouldBreakEpollInReady(ChannelConfig config) {
254 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
255 }
256
257 private static boolean isAllowHalfClosure(ChannelConfig config) {
258 if (config instanceof EpollDomainSocketChannelConfig) {
259 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
260 }
261 return config instanceof SocketChannelConfig &&
262 ((SocketChannelConfig) config).isAllowHalfClosure();
263 }
264
265 final void clearEpollIn() {
266
267 if (isRegistered()) {
268 final EventLoop loop = eventLoop();
269 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
270 if (loop.inEventLoop()) {
271 unsafe.clearEpollIn0();
272 } else {
273
274 loop.execute(new Runnable() {
275 @Override
276 public void run() {
277 if (!unsafe.readPending && !config().isAutoRead()) {
278
279 unsafe.clearEpollIn0();
280 }
281 }
282 });
283 }
284 } else {
285
286
287 ops = ops.without(EpollIoOps.EPOLLIN);
288 }
289 }
290
291 @Override
292 protected void doRegister(ChannelPromise promise) {
293 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
294 if (f.isSuccess()) {
295 registration = (IoRegistration) f.getNow();
296 registration.submit(ops);
297 inital = ops;
298 promise.setSuccess();
299 } else {
300 promise.setFailure(f.cause());
301 }
302 });
303 }
304
305 @Override
306 protected abstract AbstractEpollUnsafe newUnsafe();
307
308
309
310
311 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
312 return newDirectBuffer(buf, buf);
313 }
314
315
316
317
318
319
320 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
321 final int readableBytes = buf.readableBytes();
322 if (readableBytes == 0) {
323 ReferenceCountUtil.release(holder);
324 return Unpooled.EMPTY_BUFFER;
325 }
326
327 final ByteBufAllocator alloc = alloc();
328 if (alloc.isDirectBufferPooled()) {
329 return newDirectBuffer0(holder, buf, alloc, readableBytes);
330 }
331
332 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
333 if (directBuf == null) {
334 return newDirectBuffer0(holder, buf, alloc, readableBytes);
335 }
336
337 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
338 ReferenceCountUtil.safeRelease(holder);
339 return directBuf;
340 }
341
342 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
343 final ByteBuf directBuf = alloc.directBuffer(capacity);
344 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
345 ReferenceCountUtil.safeRelease(holder);
346 return directBuf;
347 }
348
349 protected static void checkResolvable(InetSocketAddress addr) {
350 if (addr.isUnresolved()) {
351 throw new UnresolvedAddressException();
352 }
353 }
354
355
356
357
358 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
359 int writerIndex = byteBuf.writerIndex();
360 int localReadAmount;
361 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
362 if (byteBuf.hasMemoryAddress()) {
363 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
364 } else {
365 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
366 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
367 }
368 if (localReadAmount > 0) {
369 byteBuf.writerIndex(writerIndex + localReadAmount);
370 }
371 return localReadAmount;
372 }
373
374 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
375 if (buf.hasMemoryAddress()) {
376 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
377 if (localFlushedAmount > 0) {
378 in.removeBytes(localFlushedAmount);
379 return 1;
380 }
381 } else {
382 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
383 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
384 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
385 if (localFlushedAmount > 0) {
386 nioBuf.position(nioBuf.position() + localFlushedAmount);
387 in.removeBytes(localFlushedAmount);
388 return 1;
389 }
390 }
391 return WRITE_STATUS_SNDBUF_FULL;
392 }
393
394
395
396
397
398 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
399 throws IOException {
400 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
401 if (data.hasMemoryAddress()) {
402 long memoryAddress = data.memoryAddress();
403 if (remoteAddress == null) {
404 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
405 }
406 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
407 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
408 }
409
410 if (data.nioBufferCount() > 1) {
411 IovArray array = ((NativeArrays) registration.attachment()).cleanIovArray();
412 array.add(data, data.readerIndex(), data.readableBytes());
413 int cnt = array.count();
414 assert cnt != 0;
415
416 if (remoteAddress == null) {
417 return socket.writevAddresses(array.memoryAddress(0), cnt);
418 }
419 return socket.sendToAddresses(array.memoryAddress(0), cnt,
420 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
421 }
422
423 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
424 if (remoteAddress == null) {
425 return socket.send(nioData, nioData.position(), nioData.limit());
426 }
427 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
428 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
429 }
430
431 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
432 boolean readPending;
433 private EpollRecvByteAllocatorHandle allocHandle;
434
435 Channel channel() {
436 return AbstractEpollChannel.this;
437 }
438
439 @Override
440 public FileDescriptor fd() {
441 return AbstractEpollChannel.this.fd();
442 }
443
444 @Override
445 public void close() {
446 close(voidPromise());
447 }
448
449 @Override
450 public void handle(IoRegistration registration, IoEvent event) {
451 EpollIoEvent epollEvent = (EpollIoEvent) event;
452 EpollIoOps epollOps = epollEvent.ops();
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLOUT)) {
468
469 epollOutReady();
470 }
471
472
473
474
475
476
477 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLIN)) {
478
479 epollInReady();
480 }
481
482
483
484
485 if (epollOps.contains(EpollIoOps.EPOLLRDHUP)) {
486 epollRdHupReady();
487 }
488 }
489
490
491
492
493 abstract void epollInReady();
494
495 final boolean shouldStopReading(ChannelConfig config) {
496
497
498
499
500
501
502 return !readPending && !config.isAutoRead();
503 }
504
505
506
507
508 final void epollRdHupReady() {
509
510 recvBufAllocHandle().receivedRdHup();
511
512 if (isActive()) {
513
514
515
516 epollInReady();
517 } else {
518
519 shutdownInput(false);
520 }
521
522
523 clearEpollRdHup();
524 }
525
526
527
528
529 private void clearEpollRdHup() {
530 try {
531 clearFlag(Native.EPOLLRDHUP);
532 } catch (IOException e) {
533 pipeline().fireExceptionCaught(e);
534 close(voidPromise());
535 }
536 }
537
538
539
540
541 void shutdownInput(boolean allDataRead) {
542 if (!socket.isInputShutdown()) {
543 if (isAllowHalfClosure(config())) {
544 try {
545 socket.shutdown(true, false);
546 } catch (IOException ignored) {
547
548
549 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
550 return;
551 } catch (NotYetConnectedException ignore) {
552
553
554 }
555 if (shouldStopReading(config())) {
556 clearEpollIn0();
557 }
558 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
559 } else {
560 close(voidPromise());
561 return;
562 }
563 }
564
565 if (allDataRead && !inputClosedSeenErrorOnRead) {
566 inputClosedSeenErrorOnRead = true;
567 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
568 }
569 }
570
571 private void fireEventAndClose(Object evt) {
572 pipeline().fireUserEventTriggered(evt);
573 close(voidPromise());
574 }
575
576 @Override
577 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
578 if (allocHandle == null) {
579 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
580 }
581 return allocHandle;
582 }
583
584
585
586
587
588 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
589 return new EpollRecvByteAllocatorHandle(handle);
590 }
591
592 @Override
593 protected final void flush0() {
594
595
596
597 if (!isFlagSet(Native.EPOLLOUT)) {
598 super.flush0();
599 }
600 }
601
602
603
604
605 final void epollOutReady() {
606 if (connectPromise != null) {
607
608 finishConnect();
609 } else if (!socket.isOutputShutdown()) {
610
611 super.flush0();
612 }
613 }
614
615 protected final void clearEpollIn0() {
616 assert eventLoop().inEventLoop();
617 try {
618 readPending = false;
619 if (!ops.contains(EpollIoOps.EPOLLIN)) {
620 return;
621 }
622 ops = ops.without(EpollIoOps.EPOLLIN);
623 IoRegistration registration = registration();
624 registration.submit(ops);
625 } catch (UncheckedIOException e) {
626
627
628 pipeline().fireExceptionCaught(e);
629 unsafe().close(unsafe().voidPromise());
630 }
631 }
632
633 @Override
634 public void connect(
635 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
636
637
638 if (promise.isDone() || !ensureOpen(promise)) {
639 return;
640 }
641
642 try {
643 if (connectPromise != null) {
644 throw new ConnectionPendingException();
645 }
646
647 boolean wasActive = isActive();
648 if (doConnect(remoteAddress, localAddress)) {
649 fulfillConnectPromise(promise, wasActive);
650 } else {
651 connectPromise = promise;
652 requestedRemoteAddress = remoteAddress;
653
654
655 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
656 if (connectTimeoutMillis > 0) {
657 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
658 @Override
659 public void run() {
660 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
661 if (connectPromise != null && !connectPromise.isDone()
662 && connectPromise.tryFailure(new ConnectTimeoutException(
663 "connection timed out after " + connectTimeoutMillis + " ms: " +
664 remoteAddress))) {
665 close(voidPromise());
666 }
667 }
668 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
669 }
670
671 promise.addListener(new ChannelFutureListener() {
672 @Override
673 public void operationComplete(ChannelFuture future) {
674
675
676 if (future.isCancelled()) {
677 if (connectTimeoutFuture != null) {
678 connectTimeoutFuture.cancel(false);
679 }
680 connectPromise = null;
681 close(voidPromise());
682 }
683 }
684 });
685 }
686 } catch (Throwable t) {
687 closeIfClosed();
688 promise.tryFailure(annotateConnectException(t, remoteAddress));
689 }
690 }
691
692 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
693 if (promise == null) {
694
695 return;
696 }
697 active = true;
698
699
700
701 boolean active = isActive();
702
703
704 boolean promiseSet = promise.trySuccess();
705
706
707
708 if (!wasActive && active) {
709 pipeline().fireChannelActive();
710 }
711
712
713 if (!promiseSet) {
714 close(voidPromise());
715 }
716 }
717
718 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
719 if (promise == null) {
720
721 return;
722 }
723
724
725 promise.tryFailure(cause);
726 closeIfClosed();
727 }
728
729 private void finishConnect() {
730
731
732
733 assert eventLoop().inEventLoop();
734
735 boolean connectStillInProgress = false;
736 try {
737 boolean wasActive = isActive();
738 if (!doFinishConnect()) {
739 connectStillInProgress = true;
740 return;
741 }
742 fulfillConnectPromise(connectPromise, wasActive);
743 } catch (Throwable t) {
744 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
745 } finally {
746 if (!connectStillInProgress) {
747
748
749 if (connectTimeoutFuture != null) {
750 connectTimeoutFuture.cancel(false);
751 }
752 connectPromise = null;
753 }
754 }
755 }
756
757
758
759
760 private boolean doFinishConnect() throws Exception {
761 if (socket.finishConnect()) {
762 clearFlag(Native.EPOLLOUT);
763 if (requestedRemoteAddress instanceof InetSocketAddress) {
764 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
765 }
766 requestedRemoteAddress = null;
767
768 return true;
769 }
770 setFlag(Native.EPOLLOUT);
771 return false;
772 }
773 }
774
775 @Override
776 protected void doBind(SocketAddress local) throws Exception {
777 if (local instanceof InetSocketAddress) {
778 checkResolvable((InetSocketAddress) local);
779 }
780 socket.bind(local);
781 this.local = socket.localAddress();
782 }
783
784
785
786
787 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
788 if (localAddress instanceof InetSocketAddress) {
789 checkResolvable((InetSocketAddress) localAddress);
790 }
791
792 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
793 ? (InetSocketAddress) remoteAddress : null;
794 if (remoteSocketAddr != null) {
795 checkResolvable(remoteSocketAddr);
796 }
797
798 if (remote != null) {
799
800
801
802 throw new AlreadyConnectedException();
803 }
804
805 if (localAddress != null) {
806 socket.bind(localAddress);
807 }
808
809 boolean connected = doConnect0(remoteAddress);
810 if (connected) {
811 remote = remoteSocketAddr == null ?
812 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
813 }
814
815
816
817 local = socket.localAddress();
818 return connected;
819 }
820
821 boolean doConnect0(SocketAddress remote) throws Exception {
822 boolean success = false;
823 try {
824 boolean connected = socket.connect(remote);
825 if (!connected) {
826 setFlag(Native.EPOLLOUT);
827 }
828 success = true;
829 return connected;
830 } finally {
831 if (!success) {
832 doClose();
833 }
834 }
835 }
836
837 @Override
838 protected SocketAddress localAddress0() {
839 return local;
840 }
841
842 @Override
843 protected SocketAddress remoteAddress0() {
844 return remote;
845 }
846 }