View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.epoll;
17  
18  import io.netty5.buffer.api.Buffer;
19  import io.netty5.buffer.api.BufferAllocator;
20  import io.netty5.channel.AdaptiveRecvBufferAllocator;
21  import io.netty5.channel.ChannelException;
22  import io.netty5.channel.ChannelMetadata;
23  import io.netty5.channel.ChannelOption;
24  import io.netty5.channel.ChannelOutboundBuffer;
25  import io.netty5.channel.ChannelPipeline;
26  import io.netty5.channel.ChannelShutdownDirection;
27  import io.netty5.channel.DefaultFileRegion;
28  import io.netty5.channel.EventLoop;
29  import io.netty5.channel.FileRegion;
30  import io.netty5.channel.RecvBufferAllocator;
31  import io.netty5.channel.internal.ChannelUtils;
32  import io.netty5.channel.socket.SocketChannel;
33  import io.netty5.channel.socket.SocketProtocolFamily;
34  import io.netty5.channel.unix.DomainSocketReadMode;
35  import io.netty5.channel.unix.FileDescriptor;
36  import io.netty5.channel.unix.IntegerUnixChannelOption;
37  import io.netty5.channel.unix.IovArray;
38  import io.netty5.channel.unix.PeerCredentials;
39  import io.netty5.channel.unix.RawUnixChannelOption;
40  import io.netty5.channel.unix.SocketWritableByteChannel;
41  import io.netty5.channel.unix.UnixChannel;
42  import io.netty5.channel.unix.UnixChannelOption;
43  import io.netty5.channel.unix.UnixChannelUtil;
44  import io.netty5.util.Resource;
45  import io.netty5.util.concurrent.Future;
46  import io.netty5.util.concurrent.GlobalEventExecutor;
47  import io.netty5.util.internal.StringUtil;
48  
49  import java.io.IOException;
50  import java.net.InetAddress;
51  import java.net.ProtocolFamily;
52  import java.net.SocketAddress;
53  import java.nio.ByteBuffer;
54  import java.nio.channels.NotYetConnectedException;
55  import java.nio.channels.WritableByteChannel;
56  import java.util.Collection;
57  import java.util.Collections;
58  import java.util.Map;
59  import java.util.Set;
60  import java.util.concurrent.Executor;
61  import java.util.function.Predicate;
62  
63  import static io.netty5.channel.ChannelOption.IP_TOS;
64  import static io.netty5.channel.ChannelOption.SO_KEEPALIVE;
65  import static io.netty5.channel.ChannelOption.SO_LINGER;
66  import static io.netty5.channel.ChannelOption.SO_RCVBUF;
67  import static io.netty5.channel.ChannelOption.SO_REUSEADDR;
68  import static io.netty5.channel.ChannelOption.SO_SNDBUF;
69  import static io.netty5.channel.ChannelOption.TCP_NODELAY;
70  import static io.netty5.channel.epoll.Native.IS_SUPPORTING_TCP_FASTOPEN_CLIENT;
71  import static io.netty5.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
72  import static io.netty5.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
73  import static io.netty5.channel.unix.Limits.SSIZE_MAX;
74  import static io.netty5.channel.unix.UnixChannelOption.DOMAIN_SOCKET_READ_MODE;
75  import static java.util.Objects.requireNonNull;
76  
77  /**
78   * {@link SocketChannel} implementation that uses linux EPOLL Edge-Triggered Mode for
79   * maximal performance.
80   *
81   * <h3>Available options</h3>
82   *
83   * In addition to the options provided by {@link SocketChannel} and {@link UnixChannel},
84   * {@link EpollSocketChannel} allows the following options in the option map:
85   * <table border="1" cellspacing="0" cellpadding="6">
86   * <tr>
87   * <th>{@link ChannelOption}</th>
88   * <th>{@code INET}</th>
89   * <th>{@code INET6}</th>
90   * <th>{@code UNIX}</th>
91   * </tr><tr>
92   * <td>{@link IntegerUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
93   * </tr><tr>
94   * <td>{@link RawUnixChannelOption}</td><td>X</td><td>X</td><td>X</td>
95   * </tr><tr>
96   * <td>{@link EpollChannelOption#TCP_CORK}</td><td>X</td><td>X</td><td>-</td>
97   * </tr><tr>
98   * <td>{@link EpollChannelOption#TCP_NOTSENT_LOWAT}</td><td>X</td><td>X</td><td>-</td>
99   * </tr><tr>
100  * <td>{@link EpollChannelOption#TCP_KEEPCNT}</td><td>X</td><td>X</td><td>-</td>
101  * </tr><tr>
102  * <td>{@link EpollChannelOption#TCP_KEEPIDLE}</td><td>X</td><td>X</td><td>-</td>
103  * </tr><tr>
104  * <td>{@link EpollChannelOption#TCP_KEEPINTVL}</td><td>X</td><td>X</td><td>-</td>
105  * </tr><tr>
106  * <td>{@link EpollChannelOption#TCP_MD5SIG}</td><td>X</td><td>X</td><td>-</td>
107  * </tr><tr>
108  * <td>{@link EpollChannelOption#TCP_QUICKACK}</td><td>X</td><td>X</td><td>-</td>
109  * </tr><tr>
110  * <td>{@link EpollChannelOption#TCP_INFO}</td><td>X</td><td>X</td><td>-</td>
111  * </tr><tr>
112  * <td>{@link ChannelOption#TCP_FASTOPEN_CONNECT}</td><td>X</td><td>X</td><td>-</td>
113  * </tr><tr>
114  * <td>{@link EpollChannelOption#IP_TRANSPARENT}</td><td>X</td><td>X</td><td>-</td>
115  * </tr><tr>
116  * <td>{@link EpollChannelOption#SO_BUSY_POLL}</td><td>X</td><td>X</td><td>-</td>
117  * </tr><tr>
118  * <td>{@link UnixChannelOption#SO_PEERCRED}</td><td></td><td></td><td>X</td>
119  * </tr><tr>
120  * <td>{@link UnixChannelOption#DOMAIN_SOCKET_READ_MODE}</td><td></td><td></td><td>X</td>
121  * </tr>
122  * </table>
123  */
124 public final class EpollSocketChannel
125         extends AbstractEpollChannel<EpollServerSocketChannel>
126         implements SocketChannel {
127 
    // Option sets computed once by helpers defined elsewhere in this class.
    // NOTE(review): presumably consulted by isOptionSupported(...) depending on the protocol family - confirm.
    private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS = supportedOptions();
    private static final Set<ChannelOption<?>> SUPPORTED_OPTIONS_DOMAIN_SOCKET = supportedOptionsDomainSocket();

    private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
    // Suffix appended to the "unsupported message type" error raised by filterOutboundMessage(...).
    private static final String EXPECTED_TYPES =
            " (expected: " + StringUtil.simpleClassName(Buffer.class) + ", " +
                    StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
    // Task scheduled by doWrite(...) once the write-spin quantum is used up. It invokes writeFlushed() directly;
    // NOTE(review): presumably so that messages added via write(...) in the meantime are not flushed - confirm
    // against writeFlushed().
    private final Runnable flushTask = this::writeFlushed;

    // Lazily created by writeFileRegion(...) the first time a generic FileRegion has to be written.
    private WritableByteChannel byteChannel;
    // Upper bound for a single gathering write; starts at SSIZE_MAX and is tuned by
    // adjustMaxBytesPerGatheringWrite(...).
    private volatile long maxBytesPerGatheringWrite = SSIZE_MAX;

    // Copied from the parent server channel for accepted non-UNIX sockets, otherwise empty.
    private volatile Collection<InetAddress> tcpMd5SigAddresses = Collections.emptyList();

    // NOTE(review): presumably the value returned by getReadMode(); defaults to reading bytes - confirm.
    private volatile DomainSocketReadMode mode = DomainSocketReadMode.BYTES;

    // NOTE(review): presumably backs the TCP_FASTOPEN_CONNECT option - confirm against isTcpFastOpenConnect().
    private volatile boolean tcpFastopen;

    // Reports that more data may be pending when the last read returned exactly as many bytes as were attempted.
    private static final Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA = h ->
         h.lastBytesRead() == h.attemptedBytesRead();

    // Used when EPOLLRDHUP was received: always reports that more data may be pending.
    private static final Predicate<RecvBufferAllocator.Handle> MAYBE_MORE_DATA_RDHUP = h -> true;
152 
    /**
     * Creates a new channel on the given {@link EventLoop} without specifying a protocol family.
     * Delegates to {@link #EpollSocketChannel(EventLoop, ProtocolFamily)} with a {@code null} family.
     *
     * @param eventLoop the {@link EventLoop} this channel is bound to.
     */
    public EpollSocketChannel(EventLoop eventLoop) {
        this(eventLoop, (ProtocolFamily) null);
    }
156 
    /**
     * Creates a new channel backed by a socket freshly obtained from
     * {@link LinuxSocket#newSocket(ProtocolFamily)}.
     *
     * @param eventLoop the {@link EventLoop} this channel is bound to.
     * @param protocolFamily the protocol family handed to {@code LinuxSocket.newSocket(...)}; may be {@code null}
     *                       (the single-argument constructor passes {@code null}).
     */
    public EpollSocketChannel(EventLoop eventLoop, ProtocolFamily protocolFamily) {
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        // NOTE(review): the trailing 'false' is presumably the initial "active" state - confirm in the super class.
        super(null, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(),
                LinuxSocket.newSocket(protocolFamily), false);
    }
162 
    /**
     * Creates a new channel that wraps an already existing file descriptor.
     *
     * @param eventLoop the {@link EventLoop} this channel is bound to.
     * @param fd the numeric file descriptor, wrapped into a {@link LinuxSocket}.
     * @param family the protocol family of the descriptor, converted via {@link SocketProtocolFamily#of}.
     */
    public EpollSocketChannel(EventLoop eventLoop, int fd, ProtocolFamily family) {
        this(eventLoop, new LinuxSocket(fd, SocketProtocolFamily.of(family)));
    }
166 
    /**
     * Creates a new channel around the given {@link LinuxSocket}.
     *
     * @param eventLoop the {@link EventLoop} this channel is bound to.
     * @param socket the socket to wrap.
     */
    private EpollSocketChannel(EventLoop eventLoop, LinuxSocket socket) {
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        // NOTE(review): isSoErrorZero(socket) presumably decides whether the wrapped socket starts out active -
        // confirm in the super class.
        super(null, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(),
                socket, isSoErrorZero(socket));
    }
172 
    /**
     * Creates a channel with the given parent server channel.
     *
     * @param parent the server channel passed to the super class as parent; may be {@code null}.
     * @param eventLoop the {@link EventLoop} this channel is bound to.
     * @param fd the socket to wrap.
     * @param remoteAddress the remote address passed to the super class.
     */
    EpollSocketChannel(EpollServerSocketChannel parent, EventLoop eventLoop,
                       LinuxSocket fd, SocketAddress remoteAddress) {
        // Add EPOLLRDHUP so we are notified once the remote peer closes the connection.
        super(parent, eventLoop, METADATA, Native.EPOLLRDHUP, new AdaptiveRecvBufferAllocator(), fd, remoteAddress);

        // Non-UNIX sockets take over the TCP-MD5 signature addresses of the parent, if there is one.
        if (fd.protocolFamily() != SocketProtocolFamily.UNIX && parent != null) {
            tcpMd5SigAddresses = parent.tcpMd5SigAddresses();
        }
    }
182 
183     /**
184      * Write bytes form the given {@link Buffer} to the underlying {@link java.nio.channels.Channel}.
185      * @param in the collection which contains objects to write.
186      * @param buf the {@link Buffer} from which the bytes should be written
187      * @return The value that should be decremented from the write-quantum which starts at
188      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
189      * <ul>
190      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
191      *     is encountered</li>
192      *     <li>1 - if a single call to write data was made to the OS</li>
193      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
194      *     no data was accepted</li>
195      * </ul>
196      */
197     private int writeBytes(ChannelOutboundBuffer in, Buffer buf) throws Exception {
198         int readableBytes = buf.readableBytes();
199         if (readableBytes == 0) {
200             in.remove();
201             return 0;
202         }
203 
204         int readableComponents = buf.countReadableComponents();
205         if (readableComponents == 1) {
206             return doWriteBytes(in, buf);
207         } else {
208             ByteBuffer[] nioBuffers = new ByteBuffer[readableComponents];
209             buf.forEachReadable(0, (index, component) -> {
210                 nioBuffers[index] = component.readableBuffer();
211                 return true;
212             });
213             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
214                     getMaxBytesPerGatheringWrite());
215         }
216     }
217 
    /**
     * Sets the maximum number of bytes that a single gathering write should attempt.
     *
     * @param maxBytesPerGatheringWrite the new limit, stored in a {@code volatile} field.
     */
    void setMaxBytesPerGatheringWrite(long maxBytesPerGatheringWrite) {
        this.maxBytesPerGatheringWrite = maxBytesPerGatheringWrite;
    }
221 
    /**
     * Returns the maximum number of bytes that a single gathering write should attempt.
     *
     * @return the current limit; initially {@code SSIZE_MAX}.
     */
    long getMaxBytesPerGatheringWrite() {
        return maxBytesPerGatheringWrite;
    }
225 
226     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
227         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
228         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
229         // make a best effort to adjust as OS behavior changes.
230         if (attempted == written) {
231             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
232                 setMaxBytesPerGatheringWrite(attempted << 1);
233             }
234         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
235             setMaxBytesPerGatheringWrite(attempted >>> 1);
236         }
237     }
238 
239     /**
240      * Write multiple bytes via {@link IovArray}.
241      * @param in the collection which contains objects to write.
242      * @param array The array which contains the content to write.
243      * @return The value that should be decremented from the write quantum which starts at
244      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
245      * <ul>
246      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
247      *     is encountered</li>
248      *     <li>1 - if a single call to write data was made to the OS</li>
249      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
250      *     no data was accepted</li>
251      * </ul>
252      * @throws IOException If an I/O exception occurs during write.
253      */
254     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
255         final long expectedWrittenBytes = array.size();
256         assert expectedWrittenBytes != 0;
257         final int cnt = array.count();
258         assert cnt != 0;
259 
260         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
261         if (localWrittenBytes > 0) {
262             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
263             in.removeBytes(localWrittenBytes);
264             return 1;
265         }
266         return WRITE_STATUS_SNDBUF_FULL;
267     }
268 
269     /**
270      * Write multiple bytes via {@link ByteBuffer} array.
271      * @param in the collection which contains objects to write.
272      * @param nioBuffers The buffers to write.
273      * @param nioBufferCnt The number of buffers to write.
274      * @param expectedWrittenBytes The number of bytes we expect to write.
275      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
276      * @return The value that should be decremented from the write quantum which starts at
277      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
278      * <ul>
279      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
280      *     is encountered</li>
281      *     <li>1 - if a single call to write data was made to the OS</li>
282      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
283      *     no data was accepted</li>
284      * </ul>
285      * @throws IOException If an I/O exception occurs during write.
286      */
287     private int writeBytesMultiple(
288             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
289             long maxBytesPerGatheringWrite) throws IOException {
290         assert expectedWrittenBytes != 0;
291         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
292             expectedWrittenBytes = maxBytesPerGatheringWrite;
293         }
294 
295         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
296         if (localWrittenBytes > 0) {
297             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
298             in.removeBytes(localWrittenBytes);
299             return 1;
300         }
301         return WRITE_STATUS_SNDBUF_FULL;
302     }
303 
304     /**
305      * Write a {@link DefaultFileRegion}
306      * @param in the collection which contains objects to write.
307      * @param region the {@link DefaultFileRegion} from which the bytes should be written
308      * @return The value that should be decremented from the write quantum which starts at
309      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
310      * <ul>
311      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
312      *     is encountered</li>
313      *     <li>1 - if a single call to write data was made to the OS</li>
314      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
315      *     no data was accepted</li>
316      * </ul>
317      */
318     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
319         final long offset = region.transferred();
320         final long regionCount = region.count();
321         if (offset >= regionCount) {
322             in.remove();
323             return 0;
324         }
325 
326         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
327         if (flushedAmount > 0) {
328             in.progress(flushedAmount);
329             if (region.transferred() >= regionCount) {
330                 in.remove();
331             }
332             return 1;
333         }
334         if (flushedAmount == 0) {
335             validateFileRegion(region, offset);
336         }
337         return WRITE_STATUS_SNDBUF_FULL;
338     }
339 
340     /**
341      * Write a {@link FileRegion}
342      * @param in the collection which contains objects to write.
343      * @param region the {@link FileRegion} from which the bytes should be written
344      * @return The value that should be decremented from the write quantum which starts at
345      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
346      * <ul>
347      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
348      *     is encountered</li>
349      *     <li>1 - if a single call to write data was made to the OS</li>
350      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
351      *     no data was accepted</li>
352      * </ul>
353      */
354     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
355         if (region.transferred() >= region.count()) {
356             in.remove();
357             return 0;
358         }
359 
360         if (byteChannel == null) {
361             byteChannel = new EpollSocketWritableByteChannel();
362         }
363         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
364         if (flushedAmount > 0) {
365             in.progress(flushedAmount);
366             if (region.transferred() >= region.count()) {
367                 in.remove();
368             }
369             return 1;
370         }
371         return WRITE_STATUS_SNDBUF_FULL;
372     }
373 
374     @Override
375     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
376         int writeSpinCount = getWriteSpinCount();
377         do {
378             final int msgCount = in.size();
379             // Do gathering write if the outbound buffer entries start with more than one Buffer.
380             if (msgCount > 1 && in.current() instanceof Buffer) {
381                 writeSpinCount -= doWriteMultiple(in);
382             } else if (msgCount == 0) {
383                 // Wrote all messages.
384                 clearFlag(Native.EPOLLOUT);
385                 // Return here so we not set the EPOLLOUT flag.
386                 return;
387             } else { // msgCount == 1
388                 writeSpinCount -= doWriteSingle(in);
389             }
390 
391             // We do not break the loop here even if the outbound buffer was flushed completely,
392             // because a user might have triggered another write and flush when we notify his or her
393             // listeners.
394         } while (writeSpinCount > 0);
395 
396         if (writeSpinCount == 0) {
397             // It is possible that we have set EPOLLOUT, woken up by EPOLL because the socket is writable, and then use
398             // our write quantum. In this case we no longer want to set the EPOLLOUT flag because the socket is still
399             // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
400             // and set the EPOLLOUT if necessary.
401             clearFlag(Native.EPOLLOUT);
402 
403             // We used our writeSpin quantum, and should try to write again later.
404             executor().execute(flushTask);
405         } else {
406             // Underlying descriptor can not accept all data currently, so set the EPOLLOUT flag to be woken up
407             // when it can accept more data.
408             setFlag(Native.EPOLLOUT);
409         }
410     }
411 
412     /**
413      * Attempt to write a single object.
414      * @param in the collection which contains objects to write.
415      * @return The value that should be decremented from the write quantum which starts at
416      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
417      * <ul>
418      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
419      *     is encountered</li>
420      *     <li>1 - if a single call to write data was made to the OS</li>
421      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
422      *     no data was accepted</li>
423      * </ul>
424      * @throws Exception If an I/O error occurs.
425      */
426     private int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
427         // The outbound buffer contains only one message or it contains a file region.
428         Object msg = in.current();
429         if (msg instanceof FileDescriptor && socket.sendFd(((FileDescriptor) msg).intValue()) > 0) {
430             // File descriptor was written, so remove it.
431             in.remove();
432             return 1;
433         }
434         if (msg instanceof Buffer) {
435             return writeBytes(in, (Buffer) msg);
436         } else if (msg instanceof DefaultFileRegion) {
437             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
438         } else if (msg instanceof FileRegion) {
439             return writeFileRegion(in, (FileRegion) msg);
440         } else {
441             // Should never reach here.
442             throw new Error();
443         }
444     }
445 
446     /**
447      * Attempt to write multiple {@link Buffer} objects.
448      * @param in the collection which contains objects to write.
449      * @return The value that should be decremented from the write quantum which starts at
450      * {@link #getWriteSpinCount()}. The typical use cases are as follows:
451      * <ul>
452      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link Buffer} (or other empty content)
453      *     is encountered</li>
454      *     <li>1 - if a single call to write data was made to the OS</li>
455      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
456      *     no data was accepted</li>
457      * </ul>
458      * @throws Exception If an I/O error occurs.
459      */
460     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
461         final long maxBytesPerGatheringWrite = getMaxBytesPerGatheringWrite();
462         IovArray array = registration().cleanIovArray();
463         array.maxBytes(maxBytesPerGatheringWrite);
464         in.forEachFlushedMessage(array);
465 
466         if (array.count() >= 1) {
467             return writeBytesMultiple(in, array);
468         }
469         // cnt == 0, which means the outbound buffer contained empty buffers only.
470         in.removeBytes(0);
471         return 0;
472     }
473 
474     @Override
475     protected Object filterOutboundMessage(Object msg) {
476         if (socket.protocolFamily() == SocketProtocolFamily.UNIX && msg instanceof FileDescriptor) {
477             return msg;
478         }
479         if (msg instanceof Buffer) {
480             Buffer buf = (Buffer) msg;
481             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
482         }
483 
484         if (msg instanceof FileRegion) {
485             return msg;
486         }
487 
488         throw new UnsupportedOperationException(
489                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
490     }
491 
492     @Override
493     protected void doShutdown(ChannelShutdownDirection direction) throws Exception {
494         switch (direction) {
495             case Outbound:
496                 socket.shutdown(false, true);
497                 break;
498             case Inbound:
499                 try {
500                     socket.shutdown(true, false);
501                 } catch (NotYetConnectedException ignore) {
502                     // We attempted to shutdown and failed, which means the input has already effectively been
503                     // shutdown.
504                 }
505                 break;
506             default:
507                 throw new AssertionError();
508         }
509     }
510 
511     @Override
512     public boolean isShutdown(ChannelShutdownDirection direction) {
513         if (!isActive()) {
514             return true;
515         }
516         switch (direction) {
517             case Outbound:
518                 return socket.isOutputShutdown();
519             case Inbound:
520                 return socket.isInputShutdown();
521             default:
522                 throw new AssertionError();
523         }
524     }
525 
526     private void handleReadException(ChannelPipeline pipeline, Buffer buffer, Throwable cause, boolean close,
527                                      RecvBufferAllocator.Handle allocHandle) {
528         if (buffer.readableBytes() > 0) {
529             readPending = false;
530             pipeline.fireChannelRead(buffer);
531         } else {
532             buffer.close();
533         }
534         allocHandle.readComplete();
535         pipeline.fireChannelReadComplete();
536         pipeline.fireChannelExceptionCaught(cause);
537 
538         // If oom will close the read event, release connection.
539         // See https://github.com/netty/netty/issues/10434
540         if (close || cause instanceof OutOfMemoryError || cause instanceof IOException) {
541             shutdownInput(false);
542         } else {
543             readIfIsAutoRead();
544         }
545     }
546 
547     @Override
548     protected void epollInReady(RecvBufferAllocator.Handle handle, BufferAllocator recvBufferAllocator,
549                                 boolean receivedRdHup) {
550         if (socket.protocolFamily() == SocketProtocolFamily.UNIX
551                 && getReadMode() == DomainSocketReadMode.FILE_DESCRIPTORS) {
552             epollInReadFd(handle, receivedRdHup);
553         } else {
554             epollInReadyBytes(handle, recvBufferAllocator, receivedRdHup);
555         }
556     }
557 
558     private static Predicate<RecvBufferAllocator.Handle> maybeMoreData(boolean receivedRdHup) {
559         return receivedRdHup ? MAYBE_MORE_DATA_RDHUP : MAYBE_MORE_DATA;
560     }
561 
    /**
     * Reads bytes from the socket in a loop and fires every non-empty buffer through the pipeline.
     * <p>
     * The loop ends when a read returns no bytes, when a negative read result is reported (the input is then
     * shut down after read-complete was fired), when {@code shouldBreakEpollInReady()} asks to stop, when the
     * allocator handle declines to continue reading, or when the inbound side is shut down. Any {@link Throwable}
     * is routed to {@code handleReadException(...)}.
     *
     * @param recvAlloc the receive-buffer allocator handle that allocates buffers and tracks read statistics.
     * @param bufferAllocator the allocator handed to {@code recvAlloc.allocate(...)}.
     * @param receivedRdHup whether EPOLLRDHUP was received; selects the "maybe more data" predicate.
     */
    private void epollInReadyBytes(RecvBufferAllocator.Handle recvAlloc, BufferAllocator bufferAllocator,
                                   boolean receivedRdHup) {
        final ChannelPipeline pipeline = pipeline();
        Predicate<RecvBufferAllocator.Handle> maybeMoreData = maybeMoreData(receivedRdHup);

        // 'buffer' is only non-null while this method still owns it, i.e. between allocation and either
        // dispose or fireChannelRead. The catch block below passes whatever is current to the error handler.
        Buffer buffer = null;
        boolean close = false;
        try {
            do {
                // We use a direct buffer here as the native implementations are only able
                // to handle direct buffers.
                buffer = recvAlloc.allocate(bufferAllocator);
                doReadBytes(buffer);
                if (recvAlloc.lastBytesRead() <= 0) {
                    // Nothing was read, release the buffer.
                    Resource.dispose(buffer);
                    buffer = null;
                    close = recvAlloc.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }
                recvAlloc.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(buffer);
                buffer = null;

                if (shouldBreakEpollInReady()) {
                    // We need to do this for two reasons:
                    //
                    // - If the input was shut down in between (which may be the case when the user did it in the
                    //   fireChannelRead(...) method) we should not try to read again, to not produce any
                    //   misleading exceptions.
                    //
                    // - If the user closes the channel we need to ensure we do not try to read from it again, as
                    //   the file descriptor may already be re-used by the OS if the system is handling a lot of
                    //   concurrent connections and so needs a lot of file descriptors. If we do not do this we
                    //   risk reading data from a file descriptor that belongs to another socket than the socket
                    //   that was "wrapped" by this Channel implementation.
                    break;
                }
            } while (recvAlloc.continueReading(isAutoRead(), maybeMoreData)
                    && !isShutdown(ChannelShutdownDirection.Inbound));

            recvAlloc.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                shutdownInput(false);
            } else {
                readIfIsAutoRead();
            }
        } catch (Throwable t) {
            handleReadException(pipeline, buffer, t, close, recvAlloc);
        }
    }
620 
    /**
     * {@link SocketWritableByteChannel} over this channel's socket. It is the transfer target that
     * {@code writeFileRegion(...)} hands to {@link FileRegion#transferTo}.
     */
    private final class EpollSocketWritableByteChannel extends SocketWritableByteChannel {
        EpollSocketWritableByteChannel() {
            super(socket);
        }

        /**
         * @return the {@link BufferAllocator} of the enclosing channel.
         */
        @Override
        protected BufferAllocator alloc() {
            return bufferAllocator();
        }
    }
631 
632     @SuppressWarnings("unchecked")
633     @Override
634     protected <T> T getExtendedOption(ChannelOption<T> option) {
635         if (isOptionSupported(socket.protocolFamily(), option)) {
636             if (option == SO_RCVBUF) {
637                 return (T) Integer.valueOf(getReceiveBufferSize());
638             }
639             if (option == SO_SNDBUF) {
640                 return (T) Integer.valueOf(getSendBufferSize());
641             }
642             if (option == TCP_NODELAY) {
643                 return (T) Boolean.valueOf(isTcpNoDelay());
644             }
645             if (option == SO_KEEPALIVE) {
646                 return (T) Boolean.valueOf(isKeepAlive());
647             }
648             if (option == SO_REUSEADDR) {
649                 return (T) Boolean.valueOf(isReuseAddress());
650             }
651             if (option == SO_LINGER) {
652                 return (T) Integer.valueOf(getSoLinger());
653             }
654             if (option == IP_TOS) {
655                 return (T) Integer.valueOf(getTrafficClass());
656             }
657             if (option == EpollChannelOption.TCP_CORK) {
658                 return (T) Boolean.valueOf(isTcpCork());
659             }
660             if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
661                 return (T) Long.valueOf(getTcpNotSentLowAt());
662             }
663             if (option == EpollChannelOption.TCP_KEEPIDLE) {
664                 return (T) Integer.valueOf(getTcpKeepIdle());
665             }
666             if (option == EpollChannelOption.TCP_KEEPINTVL) {
667                 return (T) Integer.valueOf(getTcpKeepIntvl());
668             }
669             if (option == EpollChannelOption.TCP_KEEPCNT) {
670                 return (T) Integer.valueOf(getTcpKeepCnt());
671             }
672             if (option == EpollChannelOption.TCP_USER_TIMEOUT) {
673                 return (T) Integer.valueOf(getTcpUserTimeout());
674             }
675             if (option == EpollChannelOption.TCP_QUICKACK) {
676                 return (T) Boolean.valueOf(isTcpQuickAck());
677             }
678             if (option == EpollChannelOption.IP_TRANSPARENT) {
679                 return (T) Boolean.valueOf(isIpTransparent());
680             }
681             if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
682                 return (T) Boolean.valueOf(isTcpFastOpenConnect());
683             }
684             if (option == EpollChannelOption.SO_BUSY_POLL) {
685                 return (T) Integer.valueOf(getSoBusyPoll());
686             }
687             if (option == DOMAIN_SOCKET_READ_MODE) {
688                 return (T) getReadMode();
689             }
690             if (option == EpollChannelOption.TCP_INFO) {
691                 return (T) getTcpInfo();
692             }
693             if (option == UnixChannelOption.SO_PEERCRED) {
694                 return (T) getPeerCredentials();
695             }
696         }
697         return super.getExtendedOption(option);
698     }
699 
700     @SuppressWarnings("unchecked")
701     @Override
702     protected <T> void setExtendedOption(ChannelOption<T> option, T value) {
703         if (isOptionSupported(socket.protocolFamily(), option)) {
704             if (option == SO_RCVBUF) {
705                 setReceiveBufferSize((Integer) value);
706             } else if (option == SO_SNDBUF) {
707                 setSendBufferSize((Integer) value);
708             } else if (option == TCP_NODELAY) {
709                 setTcpNoDelay((Boolean) value);
710             } else if (option == SO_KEEPALIVE) {
711                 setKeepAlive((Boolean) value);
712             } else if (option == SO_REUSEADDR) {
713                 setReuseAddress((Boolean) value);
714             } else if (option == SO_LINGER) {
715                 setSoLinger((Integer) value);
716             } else if (option == IP_TOS) {
717                 setTrafficClass((Integer) value);
718             } else if (option == EpollChannelOption.TCP_CORK) {
719                 setTcpCork((Boolean) value);
720             } else if (option == EpollChannelOption.TCP_NOTSENT_LOWAT) {
721                 setTcpNotSentLowAt((Long) value);
722             } else if (option == EpollChannelOption.TCP_KEEPIDLE) {
723                 setTcpKeepIdle((Integer) value);
724             } else if (option == EpollChannelOption.TCP_KEEPCNT) {
725                 setTcpKeepCnt((Integer) value);
726             } else if (option == EpollChannelOption.TCP_KEEPINTVL) {
727                 setTcpKeepIntvl((Integer) value);
728             } else if (option == EpollChannelOption.TCP_USER_TIMEOUT) {
729                 setTcpUserTimeout((Integer) value);
730             } else if (option == EpollChannelOption.IP_TRANSPARENT) {
731                 setIpTransparent((Boolean) value);
732             } else if (option == EpollChannelOption.TCP_MD5SIG) {
733                 @SuppressWarnings("unchecked")
734                 final Map<InetAddress, byte[]> m = (Map<InetAddress, byte[]>) value;
735                 setTcpMd5Sig(m);
736             } else if (option == EpollChannelOption.TCP_QUICKACK) {
737                 setTcpQuickAck((Boolean) value);
738             } else if (option == ChannelOption.TCP_FASTOPEN_CONNECT) {
739                 setTcpFastOpenConnect((Boolean) value);
740             } else if (option == EpollChannelOption.SO_BUSY_POLL) {
741                 setSoBusyPoll((Integer) value);
742             } else if (option == DOMAIN_SOCKET_READ_MODE) {
743                 setReadMode((DomainSocketReadMode) value);
744             } else if (option == EpollChannelOption.TCP_INFO) {
745                 throw new UnsupportedOperationException("read-only option: " + option);
746             } else if (option == UnixChannelOption.SO_PEERCRED) {
747                 throw new UnsupportedOperationException("read-only option: " + option);
748             }
749         } else {
750             super.setExtendedOption(option, value);
751         }
752     }
753 
754     private static boolean isOptionSupported(SocketProtocolFamily family, ChannelOption<?> option) {
755         if (family == SocketProtocolFamily.UNIX) {
756             return SUPPORTED_OPTIONS_DOMAIN_SOCKET.contains(option);
757         }
758         return SUPPORTED_OPTIONS.contains(option);
759     }
760 
761     @Override
762     protected boolean isExtendedOptionSupported(ChannelOption<?> option) {
763         return isOptionSupported(socket.protocolFamily(), option) || super.isExtendedOptionSupported(option);
764     }
765 
766     private static Set<ChannelOption<?>> supportedOptions() {
767         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, TCP_NODELAY, SO_KEEPALIVE, SO_REUSEADDR, SO_LINGER,
768                 IP_TOS, EpollChannelOption.TCP_CORK, EpollChannelOption.TCP_KEEPIDLE, EpollChannelOption.TCP_KEEPCNT,
769                 EpollChannelOption.TCP_KEEPINTVL, EpollChannelOption.TCP_USER_TIMEOUT,
770                 EpollChannelOption.IP_TRANSPARENT, EpollChannelOption.TCP_MD5SIG, EpollChannelOption.TCP_QUICKACK,
771                 ChannelOption.TCP_FASTOPEN_CONNECT, EpollChannelOption.SO_BUSY_POLL,
772                 EpollChannelOption.TCP_NOTSENT_LOWAT, EpollChannelOption.TCP_INFO);
773     }
774 
775     private static Set<ChannelOption<?>> supportedOptionsDomainSocket() {
776         return newSupportedIdentityOptionsSet(SO_RCVBUF, SO_SNDBUF, DOMAIN_SOCKET_READ_MODE,
777                 UnixChannelOption.SO_PEERCRED);
778     }
779 
780     private int getReceiveBufferSize() {
781         try {
782             return socket.getReceiveBufferSize();
783         } catch (IOException e) {
784             throw new ChannelException(e);
785         }
786     }
787 
788     private int getSendBufferSize() {
789         try {
790             return socket.getSendBufferSize();
791         } catch (IOException e) {
792             throw new ChannelException(e);
793         }
794     }
795 
796     private int getSoLinger() {
797         try {
798             return socket.getSoLinger();
799         } catch (IOException e) {
800             throw new ChannelException(e);
801         }
802     }
803 
804     private int getTrafficClass() {
805         try {
806             return socket.getTrafficClass();
807         } catch (IOException e) {
808             throw new ChannelException(e);
809         }
810     }
811 
812     private boolean isKeepAlive() {
813         try {
814             return socket.isKeepAlive();
815         } catch (IOException e) {
816             throw new ChannelException(e);
817         }
818     }
819 
820     private boolean isReuseAddress() {
821         try {
822             return socket.isReuseAddress();
823         } catch (IOException e) {
824             throw new ChannelException(e);
825         }
826     }
827 
828     private boolean isTcpNoDelay() {
829         try {
830             return socket.isTcpNoDelay();
831         } catch (IOException e) {
832             throw new ChannelException(e);
833         }
834     }
835 
836     /**
837      * Get the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details.
838      */
839     private boolean isTcpCork() {
840         try {
841             return socket.isTcpCork();
842         } catch (IOException e) {
843             throw new ChannelException(e);
844         }
845     }
846 
847     /**
848      * Get the {@code SO_BUSY_POLL} option on the socket. See {@code man 7 tcp} for more details.
849      */
850     private int getSoBusyPoll() {
851         try {
852             return socket.getSoBusyPoll();
853         } catch (IOException e) {
854             throw new ChannelException(e);
855         }
856     }
857 
858     /**
859      * Get the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
860      * @return value is a uint32_t
861      */
862     private long getTcpNotSentLowAt() {
863         try {
864             return socket.getTcpNotSentLowAt();
865         } catch (IOException e) {
866             throw new ChannelException(e);
867         }
868     }
869 
870     /**
871      * Get the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
872      */
873     private int getTcpKeepIdle() {
874         try {
875             return socket.getTcpKeepIdle();
876         } catch (IOException e) {
877             throw new ChannelException(e);
878         }
879     }
880 
881     /**
882      * Get the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details.
883      */
884     private int getTcpKeepIntvl() {
885         try {
886             return socket.getTcpKeepIntvl();
887         } catch (IOException e) {
888             throw new ChannelException(e);
889         }
890     }
891 
892     /**
893      * Get the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details.
894      */
895     private int getTcpKeepCnt() {
896         try {
897             return socket.getTcpKeepCnt();
898         } catch (IOException e) {
899             throw new ChannelException(e);
900         }
901     }
902 
903     /**
904      * Get the {@code TCP_USER_TIMEOUT} option on the socket. See {@code man 7 tcp} for more details.
905      */
906     private int getTcpUserTimeout() {
907         try {
908             return socket.getTcpUserTimeout();
909         } catch (IOException e) {
910             throw new ChannelException(e);
911         }
912     }
913 
914     private void setKeepAlive(boolean keepAlive) {
915         try {
916             socket.setKeepAlive(keepAlive);
917         } catch (IOException e) {
918             throw new ChannelException(e);
919         }
920     }
921 
922     private void setReceiveBufferSize(int receiveBufferSize) {
923         try {
924             socket.setReceiveBufferSize(receiveBufferSize);
925         } catch (IOException e) {
926             throw new ChannelException(e);
927         }
928     }
929 
930     private void setReuseAddress(boolean reuseAddress) {
931         try {
932             socket.setReuseAddress(reuseAddress);
933         } catch (IOException e) {
934             throw new ChannelException(e);
935         }
936     }
937 
938     private void setSendBufferSize(int sendBufferSize) {
939         try {
940             socket.setSendBufferSize(sendBufferSize);
941             calculateMaxBytesPerGatheringWrite();
942         } catch (IOException e) {
943             throw new ChannelException(e);
944         }
945     }
946 
947     private void setSoLinger(int soLinger) {
948         try {
949             socket.setSoLinger(soLinger);
950         } catch (IOException e) {
951             throw new ChannelException(e);
952         }
953     }
954 
955     private void setTcpNoDelay(boolean tcpNoDelay) {
956         try {
957             socket.setTcpNoDelay(tcpNoDelay);
958         } catch (IOException e) {
959             throw new ChannelException(e);
960         }
961     }
962 
963     /**
964      * Set the {@code TCP_CORK} option on the socket. See {@code man 7 tcp} for more details.
965      */
966     private void setTcpCork(boolean tcpCork) {
967         try {
968             socket.setTcpCork(tcpCork);
969         } catch (IOException e) {
970             throw new ChannelException(e);
971         }
972     }
973 
974     /**
975      * Set the {@code SO_BUSY_POLL} option on the socket. See {@code man 7 tcp} for more details.
976      */
977     private void setSoBusyPoll(int loopMicros) {
978         try {
979             socket.setSoBusyPoll(loopMicros);
980         } catch (IOException e) {
981             throw new ChannelException(e);
982         }
983     }
984 
985     /**
986      * Set the {@code TCP_NOTSENT_LOWAT} option on the socket. See {@code man 7 tcp} for more details.
987      * @param tcpNotSentLowAt is a uint32_t
988      */
989     private void setTcpNotSentLowAt(long tcpNotSentLowAt) {
990         try {
991             socket.setTcpNotSentLowAt(tcpNotSentLowAt);
992         } catch (IOException e) {
993             throw new ChannelException(e);
994         }
995     }
996 
997     private void setTrafficClass(int trafficClass) {
998         try {
999             socket.setTrafficClass(trafficClass);
1000         } catch (IOException e) {
1001             throw new ChannelException(e);
1002         }
1003     }
1004 
1005     /**
1006      * Set the {@code TCP_KEEPIDLE} option on the socket. See {@code man 7 tcp} for more details.
1007      */
1008     private void setTcpKeepIdle(int seconds) {
1009         try {
1010             socket.setTcpKeepIdle(seconds);
1011         } catch (IOException e) {
1012             throw new ChannelException(e);
1013         }
1014     }
1015 
1016     /**
1017      * Set the {@code TCP_KEEPINTVL} option on the socket. See {@code man 7 tcp} for more details.
1018      */
1019     private void setTcpKeepIntvl(int seconds) {
1020         try {
1021             socket.setTcpKeepIntvl(seconds);
1022         } catch (IOException e) {
1023             throw new ChannelException(e);
1024         }
1025     }
1026 
1027     /**
1028      * Set the {@code TCP_KEEPCNT} option on the socket. See {@code man 7 tcp} for more details.
1029      */
1030     private void setTcpKeepCnt(int probes) {
1031         try {
1032             socket.setTcpKeepCnt(probes);
1033         } catch (IOException e) {
1034             throw new ChannelException(e);
1035         }
1036     }
1037 
1038     /**
1039      * Set the {@code TCP_USER_TIMEOUT} option on the socket. See {@code man 7 tcp} for more details.
1040      */
1041     private void setTcpUserTimeout(int milliseconds) {
1042         try {
1043             socket.setTcpUserTimeout(milliseconds);
1044         } catch (IOException e) {
1045             throw new ChannelException(e);
1046         }
1047     }
1048 
1049     /**
1050      * Returns {@code true} if <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
1051      * {@code false} otherwise.
1052      */
1053     public boolean isIpTransparent() {
1054         try {
1055             return socket.isIpTransparent();
1056         } catch (IOException e) {
1057             throw new ChannelException(e);
1058         }
1059     }
1060 
1061     /**
1062      * If {@code true} is used <a href="https://man7.org/linux/man-pages/man7/ip.7.html">IP_TRANSPARENT</a> is enabled,
1063      * {@code false} for disable it. Default is disabled.
1064      */
1065     private void setIpTransparent(boolean transparent) {
1066         try {
1067             socket.setIpTransparent(transparent);
1068         } catch (IOException e) {
1069             throw new ChannelException(e);
1070         }
1071     }
1072 
1073     /**
1074      * Set the {@code TCP_QUICKACK} option on the socket.
1075      * See <a href="https://linux.die.net//man/7/tcp">TCP_QUICKACK</a>
1076      * for more details.
1077      */
1078     private void setTcpQuickAck(boolean quickAck) {
1079         try {
1080             socket.setTcpQuickAck(quickAck);
1081         } catch (IOException e) {
1082             throw new ChannelException(e);
1083         }
1084     }
1085 
1086     /**
1087      * Returns {@code true} if <a href="https://linux.die.net//man/7/tcp">TCP_QUICKACK</a> is enabled,
1088      * {@code false} otherwise.
1089      */
1090     private boolean isTcpQuickAck() {
1091         try {
1092             return socket.isTcpQuickAck();
1093         } catch (IOException e) {
1094             throw new ChannelException(e);
1095         }
1096     }
1097 
1098     private void setReadMode(DomainSocketReadMode mode) {
1099         requireNonNull(mode, "mode");
1100         this.mode = mode;
1101     }
1102 
1103     private DomainSocketReadMode getReadMode() {
1104         return mode;
1105     }
1106 
1107     /**
1108      * Enables client TCP fast open. {@code TCP_FASTOPEN_CONNECT} normally
1109      * requires Linux kernel 4.11 or later, so instead we use the traditional fast open
1110      * client socket mechanics that work with kernel 3.6 and later. See this
1111      * <a href="https://lwn.net/Articles/508865/">LWN article</a> for more info.
1112      */
1113     private void setTcpFastOpenConnect(boolean fastOpenConnect) {
1114         this.tcpFastopen = fastOpenConnect;
1115     }
1116 
1117     /**
1118      * Returns {@code true} if TCP fast open is enabled, {@code false} otherwise.
1119      */
1120     private boolean isTcpFastOpenConnect() {
1121         return tcpFastopen;
1122     }
1123 
1124     private void calculateMaxBytesPerGatheringWrite() {
1125         // Multiply by 2 to give some extra space in case the OS can process write data faster than we can provide.
1126         int newSendBufferSize = getSendBufferSize() << 1;
1127         if (newSendBufferSize > 0) {
1128             setMaxBytesPerGatheringWrite(newSendBufferSize);
1129         }
1130     }
1131 
1132     /**
1133      * Updates and returns the {@code TCP_INFO} for the current socket.
1134      * See <a href="https://linux.die.net//man/7/tcp">man 7 tcp</a>.
1135      */
1136     private EpollTcpInfo getTcpInfo() {
1137         try {
1138             EpollTcpInfo info = new EpollTcpInfo();
1139             socket.getTcpInfo(info);
1140             return info;
1141         } catch (IOException e) {
1142             throw new ChannelException(e);
1143         }
1144     }
1145 
1146     @Override
1147     protected boolean doConnect0(SocketAddress remote) throws Exception {
1148         if (IS_SUPPORTING_TCP_FASTOPEN_CLIENT && socket.protocolFamily() != SocketProtocolFamily.UNIX &&
1149                 isTcpFastOpenConnect()) {
1150             ChannelOutboundBuffer outbound = outboundBuffer();
1151             outbound.addFlush();
1152             Object curr = outbound.current();
1153             if (curr instanceof Buffer) {
1154                 // If no cookie is present, the write fails with EINPROGRESS and this call basically
1155                 // becomes a normal async connect. All writes will be sent normally afterwards.
1156                 final long localFlushedAmount;
1157                 Buffer initialData = (Buffer) curr;
1158                 localFlushedAmount = doWriteOrSendBytes(initialData, remote, true);
1159                 if (localFlushedAmount > 0) {
1160                     // We had a cookie and our fast-open proceeded. Remove written data
1161                     // then continue with normal TCP operation.
1162                     outbound.removeBytes(localFlushedAmount);
1163                     return true;
1164                 }
1165             }
1166         }
1167         return super.doConnect0(remote);
1168     }
1169 
1170     @Override
1171     protected Future<Executor> prepareToClose() {
1172         if (socket.protocolFamily() != SocketProtocolFamily.UNIX) {
1173             try {
1174                 // Check isOpen() first as otherwise it will throw a RuntimeException
1175                 // when call getSoLinger() as the fd is not valid anymore.
1176                 if (isOpen() && getSoLinger() > 0) {
1177                     // We need to cancel this key of the channel so we may not end up in a eventloop spin
1178                     // because we try to read or write until the actual close happens which may be later due
1179                     // SO_LINGER handling.
1180                     // See https://github.com/netty/netty/issues/4449
1181                     executor().deregisterForIo(this).map(v -> GlobalEventExecutor.INSTANCE);
1182                 }
1183             } catch (Throwable ignore) {
1184                 // Ignore the error as the underlying channel may be closed in the meantime and so
1185                 // getSoLinger() may produce an exception. In this case we just return null.
1186                 // See https://github.com/netty/netty/issues/4449
1187             }
1188         }
1189         return null;
1190     }
1191 
1192     /**
1193      * Set the {@code TCP_MD5SIG} option on the socket. See {@code linux/tcp.h} for more details.
1194      * Keys can only be set on, not read to prevent a potential leak, as they are confidential.
1195      * Allowing them being read would mean anyone with access to the channel could get them.
1196      */
1197     private void setTcpMd5Sig(Map<InetAddress, byte[]> keys) {
1198         // Add synchronized as newTcpMp5Sigs might do multiple operations on the socket itself.
1199         synchronized (this) {
1200             try {
1201                 tcpMd5SigAddresses = TcpMd5Util.newTcpMd5Sigs(this, tcpMd5SigAddresses, keys);
1202             } catch (IOException e) {
1203                 throw new ChannelException(e);
1204             }
1205         }
1206     }
1207 
1208     private PeerCredentials getPeerCredentials() {
1209         try {
1210             return socket.getPeerCredentials();
1211         } catch (IOException e) {
1212             throw new ChannelException(e);
1213         }
1214     }
1215 
1216     private void epollInReadFd(RecvBufferAllocator.Handle allocHandle, boolean receivedRdHup) {
1217         final ChannelPipeline pipeline = pipeline();
1218         Predicate<RecvBufferAllocator.Handle> maybeMoreData = maybeMoreData(receivedRdHup);
1219         try {
1220             readLoop: do {
1221                 // lastBytesRead represents the fd. We use lastBytesRead because it must be set so that the
1222                 // EpollRecvBufferAllocatorHandle knows if it should try to read again or not when autoRead is
1223                 // enabled.
1224                 allocHandle.lastBytesRead(socket.recvFd());
1225                 switch(allocHandle.lastBytesRead()) {
1226                     case 0:
1227                         break readLoop;
1228                     case -1:
1229                         closeTransport(newPromise());
1230                         return;
1231                     default:
1232                         allocHandle.incMessagesRead(1);
1233                         readPending = false;
1234                         pipeline.fireChannelRead(new FileDescriptor(allocHandle.lastBytesRead()));
1235                         break;
1236                 }
1237             } while (allocHandle.continueReading(isAutoRead(), maybeMoreData)
1238                     && !isShutdown(ChannelShutdownDirection.Inbound));
1239 
1240             allocHandle.readComplete();
1241             pipeline.fireChannelReadComplete();
1242         } catch (Throwable t) {
1243             allocHandle.readComplete();
1244             pipeline.fireChannelReadComplete();
1245             pipeline.fireChannelExceptionCaught(t);
1246         }
1247     }
1248 
1249     @Override
1250     protected boolean maybeMoreDataToRead(RecvBufferAllocator.Handle handle) {
1251         return handle.lastBytesRead() == handle.attemptedBytesRead();
1252     }
1253 }