1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.epoll;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.AbstractChannel;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelException;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelMetadata;
28 import io.netty.channel.ChannelOption;
29 import io.netty.channel.ChannelPromise;
30 import io.netty.channel.ConnectTimeoutException;
31 import io.netty.channel.EventLoop;
32 import io.netty.channel.socket.ChannelInputShutdownEvent;
33 import io.netty.channel.unix.Socket;
34 import io.netty.channel.unix.UnixChannel;
35 import io.netty.util.ReferenceCountUtil;
36 import io.netty.util.internal.ThrowableUtil;
37
38 import java.io.IOException;
39 import java.net.InetSocketAddress;
40 import java.net.SocketAddress;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.AlreadyConnectedException;
43 import java.nio.channels.ClosedChannelException;
44 import java.nio.channels.ConnectionPendingException;
45 import java.nio.channels.NotYetConnectedException;
46 import java.nio.channels.UnresolvedAddressException;
47 import java.util.concurrent.ScheduledFuture;
48 import java.util.concurrent.TimeUnit;
49
50 import static io.netty.util.internal.ObjectUtil.checkNotNull;
51 import static io.netty.channel.epoll.UnixChannelUtil.computeRemoteAddr;
52
53 abstract class AbstractEpollChannel extends AbstractChannel implements UnixChannel {
54 private static final ClosedChannelException DO_CLOSE_CLOSED_CHANNEL_EXCEPTION = ThrowableUtil.unknownStackTrace(
55 new ClosedChannelException(), AbstractEpollChannel.class, "doClose()");
56 private static final ChannelMetadata METADATA = new ChannelMetadata(false);
57 private final int readFlag;
58 private final Socket socket;
59
60
61
62
63 private ChannelPromise connectPromise;
64 private ScheduledFuture<?> connectTimeoutFuture;
65 private SocketAddress requestedRemoteAddress;
66
67 private volatile SocketAddress local;
68 private volatile SocketAddress remote;
69
70 protected int flags = Native.EPOLLET;
71
72 protected volatile boolean active;
73
74 AbstractEpollChannel(Socket fd, int flag) {
75 this(null, fd, flag, false);
76 }
77
78 AbstractEpollChannel(Channel parent, Socket fd, int flag, boolean active) {
79 super(parent);
80 socket = checkNotNull(fd, "fd");
81 readFlag = flag;
82 flags |= flag;
83 this.active = active;
84 if (active) {
85
86
87 local = fd.localAddress();
88 remote = fd.remoteAddress();
89 }
90 }
91
92 AbstractEpollChannel(Channel parent, Socket fd, int flag, SocketAddress remote) {
93 super(parent);
94 socket = checkNotNull(fd, "fd");
95 readFlag = flag;
96 flags |= flag;
97 active = true;
98
99
100 this.remote = remote;
101 local = fd.localAddress();
102 }
103
104 static boolean isSoErrorZero(Socket fd) {
105 try {
106 return fd.getSoError() == 0;
107 } catch (IOException e) {
108 throw new ChannelException(e);
109 }
110 }
111
112 void setFlag(int flag) throws IOException {
113 if (!isFlagSet(flag)) {
114 flags |= flag;
115 modifyEvents();
116 }
117 }
118
119 void clearFlag(int flag) throws IOException {
120 if (isFlagSet(flag)) {
121 flags &= ~flag;
122 modifyEvents();
123 }
124 }
125
126 boolean isFlagSet(int flag) {
127 return (flags & flag) != 0;
128 }
129
130 @Override
131 public final Socket fd() {
132 return socket;
133 }
134
135 @Override
136 public abstract EpollChannelConfig config();
137
138 @Override
139 public boolean isActive() {
140 return active;
141 }
142
143 @Override
144 public ChannelMetadata metadata() {
145 return METADATA;
146 }
147
148 @Override
149 protected void doClose() throws Exception {
150 active = false;
151 try {
152 ChannelPromise promise = connectPromise;
153 if (promise != null) {
154
155 promise.tryFailure(DO_CLOSE_CLOSED_CHANNEL_EXCEPTION);
156 connectPromise = null;
157 }
158
159 ScheduledFuture<?> future = connectTimeoutFuture;
160 if (future != null) {
161 future.cancel(false);
162 connectTimeoutFuture = null;
163 }
164
165 if (isRegistered()) {
166
167
168
169
170 EventLoop loop = eventLoop();
171 if (loop.inEventLoop()) {
172 doDeregister();
173 } else {
174 loop.execute(new Runnable() {
175 @Override
176 public void run() {
177 try {
178 doDeregister();
179 } catch (Throwable cause) {
180 pipeline().fireExceptionCaught(cause);
181 }
182 }
183 });
184 }
185 }
186 } finally {
187 socket.close();
188 }
189 }
190
191 @Override
192 protected void doDisconnect() throws Exception {
193 doClose();
194 }
195
196 @Override
197 protected boolean isCompatible(EventLoop loop) {
198 return loop instanceof EpollEventLoop;
199 }
200
201 @Override
202 public boolean isOpen() {
203 return socket.isOpen();
204 }
205
206 @Override
207 protected void doDeregister() throws Exception {
208 ((EpollEventLoop) eventLoop()).remove(this);
209 }
210
211 @Override
212 protected void doBeginRead() throws Exception {
213
214 ((AbstractEpollUnsafe) unsafe()).readPending = true;
215
216 setFlag(readFlag);
217 }
218
219 final void clearEpollIn() {
220
221 if (isRegistered()) {
222 final EventLoop loop = eventLoop();
223 final AbstractEpollUnsafe unsafe = (AbstractEpollUnsafe) unsafe();
224 if (loop.inEventLoop()) {
225 unsafe.clearEpollIn0();
226 } else {
227
228 loop.execute(new Runnable() {
229 @Override
230 public void run() {
231 if (!config().isAutoRead() && !unsafe.readPending) {
232
233 unsafe.clearEpollIn0();
234 }
235 }
236 });
237 }
238 } else {
239
240
241 flags &= ~readFlag;
242 }
243 }
244
245 private void modifyEvents() throws IOException {
246 if (isOpen() && isRegistered()) {
247 ((EpollEventLoop) eventLoop()).modify(this);
248 }
249 }
250
251 @Override
252 protected void doRegister() throws Exception {
253 EpollEventLoop loop = (EpollEventLoop) eventLoop();
254 loop.add(this);
255 }
256
257 @Override
258 protected abstract AbstractEpollUnsafe newUnsafe();
259
260
261
262
263 protected final ByteBuf newDirectBuffer(ByteBuf buf) {
264 return newDirectBuffer(buf, buf);
265 }
266
267
268
269
270
271
272 protected final ByteBuf newDirectBuffer(Object holder, ByteBuf buf) {
273 final int readableBytes = buf.readableBytes();
274 if (readableBytes == 0) {
275 ReferenceCountUtil.release(holder);
276 return Unpooled.EMPTY_BUFFER;
277 }
278
279 final ByteBufAllocator alloc = alloc();
280 if (alloc.isDirectBufferPooled()) {
281 return newDirectBuffer0(holder, buf, alloc, readableBytes);
282 }
283
284 final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
285 if (directBuf == null) {
286 return newDirectBuffer0(holder, buf, alloc, readableBytes);
287 }
288
289 directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
290 ReferenceCountUtil.safeRelease(holder);
291 return directBuf;
292 }
293
294 private static ByteBuf newDirectBuffer0(Object holder, ByteBuf buf, ByteBufAllocator alloc, int capacity) {
295 final ByteBuf directBuf = alloc.directBuffer(capacity);
296 directBuf.writeBytes(buf, buf.readerIndex(), capacity);
297 ReferenceCountUtil.safeRelease(holder);
298 return directBuf;
299 }
300
301 protected static void checkResolvable(InetSocketAddress addr) {
302 if (addr.isUnresolved()) {
303 throw new UnresolvedAddressException();
304 }
305 }
306
307
308
309
310 protected final int doReadBytes(ByteBuf byteBuf) throws Exception {
311 int writerIndex = byteBuf.writerIndex();
312 int localReadAmount;
313 if (byteBuf.hasMemoryAddress()) {
314 localReadAmount = socket.readAddress(byteBuf.memoryAddress(), writerIndex, byteBuf.capacity());
315 } else {
316 ByteBuffer buf = byteBuf.internalNioBuffer(writerIndex, byteBuf.writableBytes());
317 localReadAmount = socket.read(buf, buf.position(), buf.limit());
318 }
319 if (localReadAmount > 0) {
320 byteBuf.writerIndex(writerIndex + localReadAmount);
321 }
322 return localReadAmount;
323 }
324
325 protected final int doWriteBytes(ByteBuf buf, int writeSpinCount) throws Exception {
326 int readableBytes = buf.readableBytes();
327 int writtenBytes = 0;
328 if (buf.hasMemoryAddress()) {
329 long memoryAddress = buf.memoryAddress();
330 int readerIndex = buf.readerIndex();
331 int writerIndex = buf.writerIndex();
332 for (int i = writeSpinCount - 1; i >= 0; i--) {
333 int localFlushedAmount = socket.writeAddress(memoryAddress, readerIndex, writerIndex);
334 if (localFlushedAmount > 0) {
335 writtenBytes += localFlushedAmount;
336 if (writtenBytes == readableBytes) {
337 return writtenBytes;
338 }
339 readerIndex += localFlushedAmount;
340 } else {
341 break;
342 }
343 }
344 } else {
345 ByteBuffer nioBuf;
346 if (buf.nioBufferCount() == 1) {
347 nioBuf = buf.internalNioBuffer(buf.readerIndex(), buf.readableBytes());
348 } else {
349 nioBuf = buf.nioBuffer();
350 }
351 for (int i = writeSpinCount - 1; i >= 0; i--) {
352 int pos = nioBuf.position();
353 int limit = nioBuf.limit();
354 int localFlushedAmount = socket.write(nioBuf, pos, limit);
355 if (localFlushedAmount > 0) {
356 nioBuf.position(pos + localFlushedAmount);
357 writtenBytes += localFlushedAmount;
358 if (writtenBytes == readableBytes) {
359 return writtenBytes;
360 }
361 } else {
362 break;
363 }
364 }
365 }
366 if (writtenBytes < readableBytes) {
367
368 setFlag(Native.EPOLLOUT);
369 }
370 return writtenBytes;
371 }
372
373 protected abstract class AbstractEpollUnsafe extends AbstractUnsafe {
374 protected boolean readPending;
375 private boolean rdHup;
376
377
378
379
380 abstract void epollInReady();
381
382 public final boolean isRdHup() {
383 return rdHup;
384 }
385
386
387
388
389 final void epollRdHupReady() {
390
391 rdHup = true;
392
393 if (isActive()) {
394
395
396
397 epollInReady();
398
399
400 clearEpollRdHup();
401 }
402
403
404 shutdownInput();
405 }
406
407
408
409
410 private void clearEpollRdHup() {
411 try {
412 clearFlag(Native.EPOLLRDHUP);
413 } catch (IOException e) {
414 pipeline().fireExceptionCaught(e);
415 close(voidPromise());
416 }
417 }
418
419
420
421
422 void shutdownInput() {
423 if (!fd().isInputShutdown()) {
424 if (Boolean.TRUE.equals(config().getOption(ChannelOption.ALLOW_HALF_CLOSURE))) {
425 try {
426 fd().shutdown(true, false);
427 clearEpollIn0();
428 } catch (IOException ignored) {
429
430
431 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
432 return;
433 } catch (NotYetConnectedException ignore) {
434
435
436 fireEventAndClose(ChannelInputShutdownEvent.INSTANCE);
437 return;
438 }
439 pipeline().fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
440 } else {
441 close(voidPromise());
442 }
443 }
444 }
445
446 private void fireEventAndClose(Object evt) {
447 pipeline().fireUserEventTriggered(evt);
448 close(voidPromise());
449 }
450
451 @Override
452 protected void flush0() {
453
454
455
456 if (isFlagSet(Native.EPOLLOUT)) {
457 return;
458 }
459 super.flush0();
460 }
461
462
463
464
465 final void epollOutReady() {
466 if (connectPromise != null) {
467
468 finishConnect();
469 } else if (!fd().isOutputShutdown()) {
470
471 super.flush0();
472 }
473 }
474
475 protected final void clearEpollIn0() {
476 assert eventLoop().inEventLoop();
477 try {
478 clearFlag(readFlag);
479 } catch (IOException e) {
480
481
482 pipeline().fireExceptionCaught(e);
483 unsafe().close(unsafe().voidPromise());
484 }
485 }
486
487 @Override
488 public void connect(
489 final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
490 if (!promise.setUncancellable() || !ensureOpen(promise)) {
491 return;
492 }
493
494 try {
495 if (connectPromise != null) {
496 throw new ConnectionPendingException();
497 }
498
499 boolean wasActive = isActive();
500 if (doConnect(remoteAddress, localAddress)) {
501 fulfillConnectPromise(promise, wasActive);
502 } else {
503 connectPromise = promise;
504 requestedRemoteAddress = remoteAddress;
505
506
507 int connectTimeoutMillis = config().getConnectTimeoutMillis();
508 if (connectTimeoutMillis > 0) {
509 connectTimeoutFuture = eventLoop().schedule(new Runnable() {
510 @Override
511 public void run() {
512 ChannelPromise connectPromise = AbstractEpollChannel.this.connectPromise;
513 ConnectTimeoutException cause =
514 new ConnectTimeoutException("connection timed out: " + remoteAddress);
515 if (connectPromise != null && connectPromise.tryFailure(cause)) {
516 close(voidPromise());
517 }
518 }
519 }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
520 }
521
522 promise.addListener(new ChannelFutureListener() {
523 @Override
524 public void operationComplete(ChannelFuture future) throws Exception {
525 if (future.isCancelled()) {
526 if (connectTimeoutFuture != null) {
527 connectTimeoutFuture.cancel(false);
528 }
529 connectPromise = null;
530 close(voidPromise());
531 }
532 }
533 });
534 }
535 } catch (Throwable t) {
536 closeIfClosed();
537 promise.tryFailure(annotateConnectException(t, remoteAddress));
538 }
539 }
540
541 private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
542 if (promise == null) {
543
544 return;
545 }
546 active = true;
547
548
549
550 boolean active = isActive();
551
552
553 boolean promiseSet = promise.trySuccess();
554
555
556
557 if (!wasActive && active) {
558 pipeline().fireChannelActive();
559 }
560
561
562 if (!promiseSet) {
563 close(voidPromise());
564 }
565 }
566
567 private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
568 if (promise == null) {
569
570 return;
571 }
572
573
574 promise.tryFailure(cause);
575 closeIfClosed();
576 }
577
578 private void finishConnect() {
579
580
581
582 assert eventLoop().inEventLoop();
583
584 boolean connectStillInProgress = false;
585 try {
586 boolean wasActive = isActive();
587 if (!doFinishConnect()) {
588 connectStillInProgress = true;
589 return;
590 }
591 fulfillConnectPromise(connectPromise, wasActive);
592 } catch (Throwable t) {
593 fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
594 } finally {
595 if (!connectStillInProgress) {
596
597
598 if (connectTimeoutFuture != null) {
599 connectTimeoutFuture.cancel(false);
600 }
601 connectPromise = null;
602 }
603 }
604 }
605
606
607
608
609 private boolean doFinishConnect() throws Exception {
610 if (socket.finishConnect()) {
611 clearFlag(Native.EPOLLOUT);
612 if (requestedRemoteAddress instanceof InetSocketAddress) {
613 remote = computeRemoteAddr((InetSocketAddress) requestedRemoteAddress,
614 socket.remoteAddress());
615 }
616 requestedRemoteAddress = null;
617
618 return true;
619 }
620 setFlag(Native.EPOLLOUT);
621 return false;
622 }
623 }
624
625 @Override
626 protected void doBind(SocketAddress local) throws Exception {
627 if (local instanceof InetSocketAddress) {
628 checkResolvable((InetSocketAddress) local);
629 }
630 socket.bind(local);
631 this.local = socket.localAddress();
632 }
633
634
635
636
637 protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
638 if (localAddress instanceof InetSocketAddress) {
639 checkResolvable((InetSocketAddress) localAddress);
640 }
641
642 InetSocketAddress remoteSocketAddr = remoteAddress instanceof InetSocketAddress
643 ? (InetSocketAddress) remoteAddress : null;
644 if (remoteSocketAddr != null) {
645 checkResolvable(remoteSocketAddr);
646 }
647
648 if (remote != null) {
649
650
651
652 throw new AlreadyConnectedException();
653 }
654
655 if (localAddress != null) {
656 socket.bind(localAddress);
657 }
658
659 boolean connected = doConnect0(remoteAddress);
660 if (connected) {
661 remote = remoteSocketAddr == null ?
662 remoteAddress : computeRemoteAddr(remoteSocketAddr, socket.remoteAddress());
663 }
664
665
666
667 local = socket.localAddress();
668 return connected;
669 }
670
671 private boolean doConnect0(SocketAddress remote) throws Exception {
672 boolean success = false;
673 try {
674 boolean connected = socket.connect(remote);
675 if (!connected) {
676 setFlag(Native.EPOLLOUT);
677 }
678 success = true;
679 return connected;
680 } finally {
681 if (!success) {
682 doClose();
683 }
684 }
685 }
686
687 @Override
688 protected SocketAddress localAddress0() {
689 return local;
690 }
691
692 @Override
693 protected SocketAddress remoteAddress0() {
694 return remote;
695 }
696 }