1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelMetadata;
29 import io.netty.channel.ChannelOutboundBuffer;
30 import io.netty.channel.ChannelPromise;
31 import io.netty.channel.ConnectTimeoutException;
32 import io.netty.channel.EventLoop;
33 import io.netty.channel.IoEvent;
34 import io.netty.channel.IoEventLoop;
35 import io.netty.channel.IoRegistration;
36 import io.netty.channel.RecvByteBufAllocator;
37 import io.netty.channel.socket.ChannelInputShutdownEvent;
38 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
39 import io.netty.channel.socket.SocketChannelConfig;
40 import io.netty.channel.unix.FileDescriptor;
41 import io.netty.channel.unix.IovArray;
42 import io.netty.channel.unix.Socket;
43 import io.netty.channel.unix.UnixChannel;
44 import io.netty.util.ReferenceCountUtil;
45 import io.netty.util.concurrent.Future;
46
47 import java.io.IOException;
48 import java.io.UncheckedIOException;
49 import java.net.InetSocketAddress;
50 import java.net.SocketAddress;
51 import java.nio.ByteBuffer;
52 import java.nio.channels.AlreadyConnectedException;
53 import java.nio.channels.ClosedChannelException;
54 import java.nio.channels.ConnectionPendingException;
55 import java.nio.channels.NotYetConnectedException;
56 import java.nio.channels.UnresolvedAddressException;
57 import java.util.concurrent.TimeUnit;
58
59 import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
60 import static io.netty.channel.unix.UnixChannelUtil.computeRemoteAddr;
61 import static io.netty.util.internal.ObjectUtil.checkNotNull;
62
63 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
64 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
65 protected final LinuxSocket socket;
66
67
68
69
70 private ChannelPromise connectPromise;
71 private Future<?> connectTimeoutFuture;
72 private SocketAddress requestedRemoteAddress;
73 private volatile SocketAddress local;
74 private volatile SocketAddress remote;
75
76 private IoRegistration registration;
77 boolean inputClosedSeenErrorOnRead;
78 private EpollIoOps ops;
79 private EpollIoOps inital;
80
81 protected volatile boolean active;
82
83 AbstractEpollChannel(Channel parent, LinuxSocket fd, boolean active, EpollIoOps initialOps) {
84 super(parent);
85 this.socket = checkNotNull(fd, "fd");
86 this.active = active;
87 if (active) {
88
89
90 this.local = fd.localAddress();
91 this.remote = fd.remoteAddress();
92 }
93 this.ops = initialOps;
94 }
95
96 AbstractEpollChannel(Channel parent, LinuxSocket fd, SocketAddress remote, EpollIoOps initialOps) {
97 super(parent);
98 this.socket = checkNotNull(fd, "fd");
99 this.active = true;
100
101
102 this.remote = remote;
103 this.local = fd.localAddress();
104 this.ops = initialOps;
105 }
106
107 static boolean isSoErrorZero(Socket fd) {
108 try {
109 return fd.getSoError() == 0;
110 } catch (IOException e) {
111 throw new ChannelException(e);
112 }
113 }
114
115 protected void setFlag(int flag) throws IOException {
116 ops = ops.with(EpollIoOps.valueOf(flag));
117 if (isRegistered()) {
118 IoRegistration registration = registration();
119 registration.submit(ops);
120 } else {
121 ops = ops.with(EpollIoOps.valueOf(flag));
122 }
123 }
124
125 void clearFlag(int flag) throws IOException {
126 IoRegistration registration = registration();
127 ops = ops.without(EpollIoOps.valueOf(flag));
128 registration.submit(ops);
129 }
130
131 protected final IoRegistration registration() {
132 assert registration != null;
133 return registration;
134 }
135
136 boolean isFlagSet(int flag) {
137 return (ops.value & flag) != 0;
138 }
139
140 @Override
141 public final FileDescriptor fd() {
142 return socket;
143 }
144
145 @Override
146 public abstract EpollChannelConfig config();
147
148 @Override
149 public boolean isActive() {
150 return active;
151 }
152
153 @Override
154 public ChannelMetadata metadata() {
155 return METADATA;
156 }
157
158 @Override
159 protected void doClose() throws Exception {
160 active = false;
161
162
163 inputClosedSeenErrorOnRead = true;
164 try {
165 ChannelPromise promise = connectPromise;
166 if (promise != null) {
167
168 promise.tryFailure(new ClosedChannelException());
169 connectPromise = null;
170 }
171
172 Future<?> future = connectTimeoutFuture;
173 if (future != null) {
174 future.cancel(false);
175 connectTimeoutFuture = null;
176 }
177
178 if (isRegistered()) {
179
180
181
182
183 EventLoop loop = eventLoop();
184 if (loop.inEventLoop()) {
185 doDeregister();
186 } else {
187 loop.execute(new Runnable() {
188 @Override
189 public void run() {
190 try {
191 doDeregister();
192 } catch (Throwable cause) {
193 pipeline().fireExceptionCaught(cause);
194 }
195 }
196 });
197 }
198 }
199 } finally {
200 socket.close();
201 }
202 }
203
204 void resetCachedAddresses() {
205 local = socket.localAddress();
206 remote = socket.remoteAddress();
207 }
208
209 @Override
210 protected void doDisconnect() throws Exception {
211 doClose();
212 }
213
214 @Override
215 public boolean isOpen() {
216 return socket.isOpen();
217 }
218
219 @Override
220 protected void doDeregister() throws Exception {
221 IoRegistration registration = this.registration;
222 if (registration != null) {
223 ops = inital;
224 registration.cancel();
225 }
226 }
227
228 @Override
229 protected boolean isCompatible(EventLoop loop) {
230 return loop instanceof IoEventLoop && ((IoEventLoop) loop).isCompatible(AbstractEpollUnsafe.class);
231 }
232
233 @Override
234 protected void doBeginRead() throws Exception {
235
236 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
237 unsafe.readPending = true;
238
239
240
241
242 setFlag(Native.EPOLLIN);
243 }
244
245 final boolean shouldBreakEpollInReady(ChannelConfig config) {
246 return socket.isInputShutdown() && (inputClosedSeenErrorOnRead || !isAllowHalfClosure(config));
247 }
248
249 private static boolean isAllowHalfClosure(ChannelConfig config) {
250 if (config instanceof EpollDomainSocketChannelConfig) {
251 return ((EpollDomainSocketChannelConfig) config).isAllowHalfClosure();
252 }
253 return config instanceof SocketChannelConfig &&
254 ((SocketChannelConfig) config).isAllowHalfClosure();
255 }
256
257 final void clearEpollIn() {
258
259 if (isRegistered()) {
260 final EventLoop loop = eventLoop();
261 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
262 if (loop.inEventLoop()) {
263 unsafe.clearEpollIn0();
264 } else {
265
266 loop.execute(new Runnable() {
267 @Override
268 public void run() {
269 if (!unsafe.readPending && !config().isAutoRead()) {
270
271 unsafe.clearEpollIn0();
272 }
273 }
274 });
275 }
276 } else {
277
278
279 ops = ops.without(EpollIoOps.EPOLLIN);
280 }
281 }
282
283 @Override
284 protected void doRegister(ChannelPromise promise) {
285 ((IoEventLoop) eventLoop()).register((AbstractEpollUnsafe) unsafe()).addListener(f -> {
286 if (f.isSuccess()) {
287 registration = (IoRegistration) f.getNow();
288 registration.submit(ops);
289 inital = ops;
290 promise.setSuccess();
291 } else {
292 promise.setFailure(f.cause());
293 }
294 });
295 }
296
297 @Override
298 protected abstract AbstractEpollUnsafe newUnsafe();
299
300
301
302
303 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
304 return newDirectBuffer(buf, buf);
305 }
306
307
308
309
310
311
312 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
313 final int readableBytes = buf.readableBytes();
314 if (readableBytes == 0) {
315 ReferenceCountUtil.release(holder);
316 return Unpooled.EMPTY_BUFFER;
317 }
318
319 final ByteBufAllocator alloc = alloc();
320 if (alloc.isDirectBufferPooled()) {
321 return newDirectBuffer0(holder, buf, alloc, readableBytes);
322 }
323
324 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
325 if (directBuf == null) {
326 return newDirectBuffer0(holder, buf, alloc, readableBytes);
327 }
328
329 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
330 ReferenceCountUtil.safeRelease(holder);
331 return directBuf;
332 }
333
334 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
335 final ByteBuf directBuf = alloc.directBuffer(capacity);
336 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
337 ReferenceCountUtil.safeRelease(holder);
338 return directBuf;
339 }
340
341 protected static void checkResolvable(InetSocketAddress addr) {
342 if (addr.isUnresolved()) {
343 throw new UnresolvedAddressException();
344 }
345 }
346
347
348
349
350 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
351 int writerIndex = byteBuf.writerIndex();
352 int localReadAmount;
353 unsafe().recvBufAllocHandle().attemptedBytesRead(byteBuf.writableBytes());
354 if (byteBuf.hasMemoryAddress()) {
355 localReadAmount = socket.recvAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
356 } else {
357 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
358 localReadAmount = socket.recv(buf, buf.position(), buf.limit());
359 }
360 if (localReadAmount > 0) {
361 byteBuf.writerIndex(writerIndex + localReadAmount);
362 }
363 return localReadAmount;
364 }
365
366 protected final int doWriteBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
367 if (buf.hasMemoryAddress()) {
368 int localFlushedAmount = socket.sendAddress(buf.memoryAddress(), buf.readerIndex(), buf.writerIndex());
369 if (localFlushedAmount > 0) {
370 in.removeBytes(localFlushedAmount);
371 return 1;
372 }
373 } else {
374 final ByteBuffer nioBuf = buf.nioBufferCount() == 1 ?
375 buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes()) : buf.nioBuffer();
376 int localFlushedAmount = socket.send(nioBuf, nioBuf.position(), nioBuf.limit());
377 if (localFlushedAmount > 0) {
378 nioBuf.position(nioBuf.position() + localFlushedAmount);
379 in.removeBytes(localFlushedAmount);
380 return 1;
381 }
382 }
383 return WRITE_STATUS_SNDBUF_FULL;
384 }
385
386
387
388
389
390 final long doWriteOrSendBytes(ByteBuf data, InetSocketAddress remoteAddress, boolean fastOpen)
391 throws IOException {
392 assert !(fastOpen && remoteAddress == null) : "fastOpen requires a remote address";
393 if (data.hasMemoryAddress()) {
394 long memoryAddress = data.memoryAddress();
395 if (remoteAddress == null) {
396 return socket.sendAddress(memoryAddress, data.readerIndex(), data.writerIndex());
397 }
398 return socket.sendToAddress(memoryAddress, data.readerIndex(), data.writerIndex(),
399 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
400 }
401
402 if (data.nioBufferCount() > 1) {
403 IovArray array = ((NativeArrays) registration.attachment()).cleanIovArray();
404 array.add(data, data.readerIndex(), data.readableBytes());
405 int cnt = array.count();
406 assert cnt != 0;
407
408 if (remoteAddress == null) {
409 return socket.writevAddresses(array.memoryAddress(0), cnt);
410 }
411 return socket.sendToAddresses(array.memoryAddress(0), cnt,
412 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
413 }
414
415 ByteBuffer nioData = data.internalNioBuffer(data.readerIndex(), data.readableBytes());
416 if (remoteAddress == null) {
417 return socket.send(nioData, nioData.position(), nioData.limit());
418 }
419 return socket.sendTo(nioData, nioData.position(), nioData.limit(),
420 remoteAddress.getAddress(), remoteAddress.getPort(), fastOpen);
421 }
422
423 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe implements EpollIoHandle {
424 boolean readPending;
425 private EpollRecvByteAllocatorHandle allocHandle;
426
427 Channel channel() {
428 return AbstractEpollChannel.this;
429 }
430
431 @Override
432 public FileDescriptor fd() {
433 return AbstractEpollChannel.this.fd();
434 }
435
436 @Override
437 public void close() {
438 close(voidPromise());
439 }
440
441 @Override
442 public void handle(IoRegistration registration, IoEvent event) {
443 EpollIoEvent epollEvent = (EpollIoEvent) event;
444 EpollIoOps epollOps = epollEvent.ops();
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLOUT)) {
460
461 epollOutReady();
462 }
463
464
465
466
467
468
469 if (epollOps.contains(EpollIoOps.EPOLLERR) || epollOps.contains(EpollIoOps.EPOLLIN)) {
470
471 epollInReady();
472 }
473
474
475
476
477 if (epollOps.contains(EpollIoOps.EPOLLRDHUP)) {
478 epollRdHupReady();
479 }
480 }
481
482
483
484
485 abstract void epollInReady();
486
487 final boolean shouldStopReading(ChannelConfig config) {
488
489
490
491
492
493
494 return !readPending && !config.isAutoRead();
495 }
496
497
498
499
500 final void epollRdHupReady() {
501
502 recvBufAllocHandle().receivedRdHup();
503
504 if (isActive()) {
505
506
507
508 epollInReady();
509 } else {
510
511 shutdownInput(false);
512 }
513
514
515 clearEpollRdHup();
516 }
517
518
519
520
521 private void clearEpollRdHup() {
522 try {
523 clearFlag(Native.EPOLLRDHUP);
524 } catch (IOException e) {
525 pipeline().fireExceptionCaught(e);
526 close(voidPromise());
527 }
528 }
529
530
531
532
533 void shutdownInput(boolean allDataRead) {
534 if (!socket.isInputShutdown()) {
535 if (isAllowHalfClosure(config())) {
536 try {
537 socket.shutdown(true, false);
538 } catch (IOException ignored) {
539
540
541 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
542 return;
543 } catch (NotYetConnectedException ignore) {
544
545
546 }
547 if (shouldStopReading(config())) {
548 clearEpollIn0();
549 }
550 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
551 } else {
552 close(voidPromise());
553 return;
554 }
555 }
556
557 if (allDataRead && !inputClosedSeenErrorOnRead) {
558 inputClosedSeenErrorOnRead = true;
559 pipeline().fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);
560 }
561 }
562
563 private void fireEventAndClose(Object evt) {
564 pipeline().fireUserEventTriggered(evt);
565 close(voidPromise());
566 }
567
568 @Override
569 public EpollRecvByteAllocatorHandle recvBufAllocHandle() {
570 if (allocHandle == null) {
571 allocHandle = newEpollHandle((RecvByteBufAllocator.ExtendedHandle) super.recvBufAllocHandle());
572 }
573 return allocHandle;
574 }
575
576
577
578
579
580 EpollRecvByteAllocatorHandle newEpollHandle(RecvByteBufAllocator.ExtendedHandle handle) {
581 return new EpollRecvByteAllocatorHandle(handle);
582 }
583
584 @Override
585 protected final void flush0() {
586
587
588
589 if (!isFlagSet(Native.EPOLLOUT)) {
590 super.flush0();
591 }
592 }
593
594
595
596
597 final void epollOutReady() {
598 if (connectPromise != null) {
599
600 finishConnect();
601 } else if (!socket.isOutputShutdown()) {
602
603 super.flush0();
604 }
605 }
606
607 protected final void clearEpollIn0() {
608 assert eventLoop().inEventLoop();
609 try {
610 readPending = false;
611 ops = ops.without(EpollIoOps.EPOLLIN);
612 IoRegistration registration = registration();
613 registration.submit(ops);
614 } catch (UncheckedIOException e) {
615
616
617 pipeline().fireExceptionCaught(e);
618 unsafe().close(unsafe().voidPromise());
619 }
620 }
621
622 @Override
623 public void connect(
624 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
625
626
627 if (promise.isDone() || !ensureOpen(promise)) {
628 return;
629 }
630
631 try {
632 if (connectPromise != null) {
633 throw new ConnectionPendingException();
634 }
635
636 boolean wasActive = isActive();
637 if (doConnect(remoteAddress, localAddress)) {
638 fulfillConnectPromise(promise, wasActive);
639 } else {
640 connectPromise = promise;
641 requestedRemoteAddress = remoteAddress;
642
643
644 final int connectTimeoutMillis = config().getConnectTimeoutMillis();
645 if (connectTimeoutMillis > 0) {
646 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
647 @Override
648 public void run() {
649 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
650 if (connectPromise != null && !connectPromise.isDone()
651 && connectPromise.tryFailure(new ConnectTimeoutException(
652 "connection timed out after " + connectTimeoutMillis + " ms: " +
653 remoteAddress))) {
654 close(voidPromise());
655 }
656 }
657 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
658 }
659
660 promise.addListener(new ChannelFutureListener() {
661 @Override
662 public void operationComplete(ChannelFuture future) {
663
664
665 if (future.isCancelled()) {
666 if (connectTimeoutFuture != null) {
667 connectTimeoutFuture.cancel(false);
668 }
669 connectPromise = null;
670 close(voidPromise());
671 }
672 }
673 });
674 }
675 } catch (Throwable t) {
676 closeIfClosed();
677 promise.tryFailure(annotateConnectException(t, remoteAddress));
678 }
679 }
680
681 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
682 if (promise == null) {
683
684 return;
685 }
686 active = true;
687
688
689
690 boolean active = isActive();
691
692
693 boolean promiseSet = promise.trySuccess();
694
695
696
697 if (!wasActive && active) {
698 pipeline().fireChannelActive();
699 }
700
701
702 if (!promiseSet) {
703 close(voidPromise());
704 }
705 }
706
707 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
708 if (promise == null) {
709
710 return;
711 }
712
713
714 promise.tryFailure(cause);
715 closeIfClosed();
716 }
717
718 private void finishConnect() {
719
720
721
722 assert eventLoop().inEventLoop();
723
724 boolean connectStillInProgress = false;
725 try {
726 boolean wasActive = isActive();
727 if (!doFinishConnect()) {
728 connectStillInProgress = true;
729 return;
730 }
731 fulfillConnectPromise(connectPromise, wasActive);
732 } catch (Throwable t) {
733 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
734 } finally {
735 if (!connectStillInProgress) {
736
737
738 if (connectTimeoutFuture != null) {
739 connectTimeoutFuture.cancel(false);
740 }
741 connectPromise = null;
742 }
743 }
744 }
745
746
747
748
749 private boolean doFinishConnect() throws Exception {
750 if (socket.finishConnect()) {
751 clearFlag(Native.EPOLLOUT);
752 if (requestedRemoteAddress instanceof InetSocketAddress) {
753 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress, socket.remoteAddress());
754 }
755 requestedRemoteAddress = null;
756
757 return true;
758 }
759 setFlag(Native.EPOLLOUT);
760 return false;
761 }
762 }
763
764 @Override
765 protected void doBind(SocketAddress local) throws Exception {
766 if (local instanceof InetSocketAddress) {
767 checkResolvable((InetSocketAddress) local);
768 }
769 socket.bind(local);
770 this.local = socket.localAddress();
771 }
772
773
774
775
776 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
777 if (localAddress instanceof InetSocketAddress) {
778 checkResolvable((InetSocketAddress) localAddress);
779 }
780
781 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
782 ? (InetSocketAddress) remoteAddress : null;
783 if (remoteSocketAddr != null) {
784 checkResolvable(remoteSocketAddr);
785 }
786
787 if (remote != null) {
788
789
790
791 throw new AlreadyConnectedException();
792 }
793
794 if (localAddress != null) {
795 socket.bind(localAddress);
796 }
797
798 boolean connected = doConnect0(remoteAddress);
799 if (connected) {
800 remote = remoteSocketAddr == null ?
801 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
802 }
803
804
805
806 local = socket.localAddress();
807 return connected;
808 }
809
810 boolean doConnect0(SocketAddress remote) throws Exception {
811 boolean success = false;
812 try {
813 boolean connected = socket.connect(remote);
814 if (!connected) {
815 setFlag(Native.EPOLLOUT);
816 }
817 success = true;
818 return connected;
819 } finally {
820 if (!success) {
821 doClose();
822 }
823 }
824 }
825
826 @Override
827 protected SocketAddress localAddress0() {
828 return local;
829 }
830
831 @Override
832 protected SocketAddress remoteAddress0() {
833 return remote;
834 }
835 }