View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.internal.ChannelUtils;
32  import io.netty.channel.socket.DuplexChannel;
33  import io.netty.channel.unix.IovArray;
34  import io.netty.channel.unix.SocketWritableByteChannel;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.internal.PlatformDependent;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.UnstableApi;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.SocketAddress;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.WritableByteChannel;
46  import java.util.concurrent.Executor;
47  
48  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
49  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
50  
51  @UnstableApi
52  public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
53      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
54      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
55      private static final String EXPECTED_TYPES =
56              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
57                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
58      private WritableByteChannel byteChannel;
59      private final Runnable flushTask = new Runnable() {
60          @Override
61          public void run() {
62              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
63              // meantime.
64              ((AbstractKQueueUnsafe) unsafe()).flush0();
65          }
66      };
67  
/**
 * Creates a stream channel; all arguments are handed unchanged to the superclass constructor.
 *
 * @param parent the parent {@link Channel}, may be {@code null} (see the single-argument constructor)
 * @param fd the socket backing this channel
 * @param active the active flag passed on to the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
    super(parent, fd, active);
}

/**
 * Creates a stream channel; all arguments are handed unchanged to the superclass constructor.
 *
 * @param parent the parent {@link Channel}
 * @param fd the socket backing this channel
 * @param remote the remote address passed on to the superclass
 */
AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
    super(parent, fd, remote);
}

/**
 * Creates a stream channel without a parent. The active flag is taken from {@code isSoErrorZero(fd)}.
 *
 * @param fd the socket backing this channel
 */
AbstractKQueueStreamChannel(BsdSocket fd) {
    this(null, fd, isSoErrorZero(fd));
}
79  
80      @Override
81      protected AbstractKQueueUnsafe newUnsafe() {
82          return new KQueueStreamUnsafe();
83      }
84  
85      @Override
86      public ChannelMetadata metadata() {
87          return METADATA;
88      }
89  
90      /**
91       * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
92       * @param in the collection which contains objects to write.
93       * @param buf the {@link ByteBuf} from which the bytes should be written
94       * @return The value that should be decremented from the write quantum which starts at
95       * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
96       * <ul>
97       *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
98       *     is encountered</li>
99       *     <li>1 - if a single call to write data was made to the OS</li>
100      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
101      *     data was accepted</li>
102      * </ul>
103      */
104     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
105         int readableBytes = buf.readableBytes();
106         if (readableBytes == 0) {
107             in.remove();
108             return 0;
109         }
110 
111         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
112             return doWriteBytes(in, buf);
113         } else {
114             ByteBuffer[] nioBuffers = buf.nioBuffers();
115             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
116                     config().getMaxBytesPerGatheringWrite());
117         }
118     }
119 
120     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
121         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
122         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
123         // make a best effort to adjust as OS behavior changes.
124         if (attempted == written) {
125             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
126                 config().setMaxBytesPerGatheringWrite(attempted << 1);
127             }
128         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
129             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
130         }
131     }
132 
133     /**
134      * Write multiple bytes via {@link IovArray}.
135      * @param in the collection which contains objects to write.
136      * @param array The array which contains the content to write.
137      * @return The value that should be decremented from the write quantum which starts at
138      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
139      * <ul>
140      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
141      *     is encountered</li>
142      *     <li>1 - if a single call to write data was made to the OS</li>
143      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
144      *     no data was accepted</li>
145      * </ul>
146      * @throws IOException If an I/O exception occurs during write.
147      */
148     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
149         final long expectedWrittenBytes = array.size();
150         assert expectedWrittenBytes != 0;
151         final int cnt = array.count();
152         assert cnt != 0;
153 
154         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
155         if (localWrittenBytes > 0) {
156             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
157             in.removeBytes(localWrittenBytes);
158             return 1;
159         }
160         return WRITE_STATUS_SNDBUF_FULL;
161     }
162 
163     /**
164      * Write multiple bytes via {@link ByteBuffer} array.
165      * @param in the collection which contains objects to write.
166      * @param nioBuffers The buffers to write.
167      * @param nioBufferCnt The number of buffers to write.
168      * @param expectedWrittenBytes The number of bytes we expect to write.
169      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
170      * @return The value that should be decremented from the write quantum which starts at
171      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
172      * <ul>
173      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
174      *     is encountered</li>
175      *     <li>1 - if a single call to write data was made to the OS</li>
176      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
177      *     no data was accepted</li>
178      * </ul>
179      * @throws IOException If an I/O exception occurs during write.
180      */
181     private int writeBytesMultiple(
182             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
183             long maxBytesPerGatheringWrite) throws IOException {
184         assert expectedWrittenBytes != 0;
185         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
186             expectedWrittenBytes = maxBytesPerGatheringWrite;
187         }
188 
189         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
190         if (localWrittenBytes > 0) {
191             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
192             in.removeBytes(localWrittenBytes);
193             return 1;
194         }
195         return WRITE_STATUS_SNDBUF_FULL;
196     }
197 
198     /**
199      * Write a {@link DefaultFileRegion}
200      * @param in the collection which contains objects to write.
201      * @param region the {@link DefaultFileRegion} from which the bytes should be written
202      * @return The value that should be decremented from the write quantum which starts at
203      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
204      * <ul>
205      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
206      *     is encountered</li>
207      *     <li>1 - if a single call to write data was made to the OS</li>
208      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
209      *     no data was accepted</li>
210      * </ul>
211      */
212     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
213         final long regionCount = region.count();
214         if (region.transferred() >= regionCount) {
215             in.remove();
216             return 0;
217         }
218 
219         final long offset = region.transferred();
220         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
221         if (flushedAmount > 0) {
222             in.progress(flushedAmount);
223             if (region.transferred() >= regionCount) {
224                 in.remove();
225             }
226             return 1;
227         }
228         return WRITE_STATUS_SNDBUF_FULL;
229     }
230 
231     /**
232      * Write a {@link FileRegion}
233      * @param in the collection which contains objects to write.
234      * @param region the {@link FileRegion} from which the bytes should be written
235      * @return The value that should be decremented from the write quantum which starts at
236      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
237      * <ul>
238      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
239      *     is encountered</li>
240      *     <li>1 - if a single call to write data was made to the OS</li>
241      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
242      *     data was accepted</li>
243      * </ul>
244      */
245     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
246         if (region.transferred() >= region.count()) {
247             in.remove();
248             return 0;
249         }
250 
251         if (byteChannel == null) {
252             byteChannel = new KQueueSocketWritableByteChannel();
253         }
254         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
255         if (flushedAmount > 0) {
256             in.progress(flushedAmount);
257             if (region.transferred() >= region.count()) {
258                 in.remove();
259             }
260             return 1;
261         }
262         return WRITE_STATUS_SNDBUF_FULL;
263     }
264 
265     @Override
266     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
267         int writeSpinCount = config().getWriteSpinCount();
268         do {
269             final int msgCount = in.size();
270             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
271             if (msgCount > 1 && in.current() instanceof ByteBuf) {
272                 writeSpinCount -= doWriteMultiple(in);
273             } else if (msgCount == 0) {
274                 // Wrote all messages.
275                 writeFilter(false);
276                 // Return here so we don't set the WRITE flag.
277                 return;
278             } else { // msgCount == 1
279                 writeSpinCount -= doWriteSingle(in);
280             }
281 
282             // We do not break the loop here even if the outbound buffer was flushed completely,
283             // because a user might have triggered another write and flush when we notify his or her
284             // listeners.
285         } while (writeSpinCount > 0);
286 
287         if (writeSpinCount == 0) {
288             // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
289             // then use our write quantum. In this case we no longer want to set the write filter because the socket is
290             // still writable (as far as we know). We will find out next time we attempt to write if the socket is
291             // writable and set the write filter if necessary.
292             writeFilter(false);
293 
294             // We used our writeSpin quantum, and should try to write again later.
295             eventLoop().execute(flushTask);
296         } else {
297             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
298             // when it can accept more data.
299             writeFilter(true);
300         }
301     }
302 
303     /**
304      * Attempt to write a single object.
305      * @param in the collection which contains objects to write.
306      * @return The value that should be decremented from the write quantum which starts at
307      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
308      * <ul>
309      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
310      *     is encountered</li>
311      *     <li>1 - if a single call to write data was made to the OS</li>
312      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
313      *     data was accepted</li>
314      * </ul>
315      * @throws Exception If an I/O error occurs.
316      */
317     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
318         // The outbound buffer contains only one message or it contains a file region.
319         Object msg = in.current();
320         if (msg instanceof ByteBuf) {
321             return writeBytes(in, (ByteBuf) msg);
322         } else if (msg instanceof DefaultFileRegion) {
323             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
324         } else if (msg instanceof FileRegion) {
325             return writeFileRegion(in, (FileRegion) msg);
326         } else {
327             // Should never reach here.
328             throw new Error();
329         }
330     }
331 
332     /**
333      * Attempt to write multiple {@link ByteBuf} objects.
334      * @param in the collection which contains objects to write.
335      * @return The value that should be decremented from the write quantum which starts at
336      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
337      * <ul>
338      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
339      *     is encountered</li>
340      *     <li>1 - if a single call to write data was made to the OS</li>
341      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
342      *     data was accepted</li>
343      * </ul>
344      * @throws Exception If an I/O error occurs.
345      */
346     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
347         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
348         if (PlatformDependent.hasUnsafe()) {
349             IovArray array = ((KQueueEventLoop) eventLoop()).cleanArray();
350             array.maxBytes(maxBytesPerGatheringWrite);
351             in.forEachFlushedMessage(array);
352 
353             if (array.count() >= 1) {
354                 // TODO: Handle the case where cnt == 1 specially.
355                 return writeBytesMultiple(in, array);
356             }
357         } else {
358             ByteBuffer[] buffers = in.nioBuffers();
359             int cnt = in.nioBufferCount();
360             if (cnt >= 1) {
361                 // TODO: Handle the case where cnt == 1 specially.
362                 return writeBytesMultiple(in, buffers, cnt, in.nioBufferSize(), maxBytesPerGatheringWrite);
363             }
364         }
365         // cnt == 0, which means the outbound buffer contained empty buffers only.
366         in.removeBytes(0);
367         return 0;
368     }
369 
370     @Override
371     protected Object filterOutboundMessage(Object msg) {
372         if (msg instanceof ByteBuf) {
373             ByteBuf buf = (ByteBuf) msg;
374             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
375         }
376 
377         if (msg instanceof FileRegion) {
378             return msg;
379         }
380 
381         throw new UnsupportedOperationException(
382                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
383     }
384 
385     @UnstableApi
386     @Override
387     protected final void doShutdownOutput() throws Exception {
388         socket.shutdown(false, true);
389     }
390 
391     @Override
392     public boolean isOutputShutdown() {
393         return socket.isOutputShutdown();
394     }
395 
396     @Override
397     public boolean isInputShutdown() {
398         return socket.isInputShutdown();
399     }
400 
401     @Override
402     public boolean isShutdown() {
403         return socket.isShutdown();
404     }
405 
406     @Override
407     public ChannelFuture shutdownOutput() {
408         return shutdownOutput(newPromise());
409     }
410 
411     @Override
412     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
413         EventLoop loop = eventLoop();
414         if (loop.inEventLoop()) {
415             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
416         } else {
417             loop.execute(new Runnable() {
418                 @Override
419                 public void run() {
420                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
421                 }
422             });
423         }
424         return promise;
425     }
426 
427     @Override
428     public ChannelFuture shutdownInput() {
429         return shutdownInput(newPromise());
430     }
431 
432     @Override
433     public ChannelFuture shutdownInput(final ChannelPromise promise) {
434         EventLoop loop = eventLoop();
435         if (loop.inEventLoop()) {
436             shutdownInput0(promise);
437         } else {
438             loop.execute(new Runnable() {
439                 @Override
440                 public void run() {
441                     shutdownInput0(promise);
442                 }
443             });
444         }
445         return promise;
446     }
447 
448     private void shutdownInput0(ChannelPromise promise) {
449         try {
450             socket.shutdown(true, false);
451         } catch (Throwable cause) {
452             promise.setFailure(cause);
453             return;
454         }
455         promise.setSuccess();
456     }
457 
458     @Override
459     public ChannelFuture shutdown() {
460         return shutdown(newPromise());
461     }
462 
463     @Override
464     public ChannelFuture shutdown(final ChannelPromise promise) {
465         ChannelFuture shutdownOutputFuture = shutdownOutput();
466         if (shutdownOutputFuture.isDone()) {
467             shutdownOutputDone(shutdownOutputFuture, promise);
468         } else {
469             shutdownOutputFuture.addListener(new ChannelFutureListener() {
470                 @Override
471                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
472                     shutdownOutputDone(shutdownOutputFuture, promise);
473                 }
474             });
475         }
476         return promise;
477     }
478 
479     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
480         ChannelFuture shutdownInputFuture = shutdownInput();
481         if (shutdownInputFuture.isDone()) {
482             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
483         } else {
484             shutdownInputFuture.addListener(new ChannelFutureListener() {
485                 @Override
486                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
487                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
488                 }
489             });
490         }
491     }
492 
493     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
494                                      ChannelFuture shutdownInputFuture,
495                                      ChannelPromise promise) {
496         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
497         Throwable shutdownInputCause = shutdownInputFuture.cause();
498         if (shutdownOutputCause != null) {
499             if (shutdownInputCause != null) {
500                 logger.debug("Exception suppressed because a previous exception occurred.",
501                         shutdownInputCause);
502             }
503             promise.setFailure(shutdownOutputCause);
504         } else if (shutdownInputCause != null) {
505             promise.setFailure(shutdownInputCause);
506         } else {
507             promise.setSuccess();
508         }
509     }
510 
511     class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
512         // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
513         @Override
514         protected Executor prepareToClose() {
515             return super.prepareToClose();
516         }
517 
518         @Override
519         void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
520             final ChannelConfig config = config();
521             if (shouldBreakReadReady(config)) {
522                 clearReadFilter0();
523                 return;
524             }
525             final ChannelPipeline pipeline = pipeline();
526             final ByteBufAllocator allocator = config.getAllocator();
527             allocHandle.reset(config);
528             readReadyBefore();
529 
530             ByteBuf byteBuf = null;
531             boolean close = false;
532             try {
533                 do {
534                     // we use a direct buffer here as the native implementations only be able
535                     // to handle direct buffers.
536                     byteBuf = allocHandle.allocate(allocator);
537                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
538                     if (allocHandle.lastBytesRead() <= 0) {
539                         // nothing was read, release the buffer.
540                         byteBuf.release();
541                         byteBuf = null;
542                         close = allocHandle.lastBytesRead() < 0;
543                         if (close) {
544                             // There is nothing left to read as we received an EOF.
545                             readPending = false;
546                         }
547                         break;
548                     }
549                     allocHandle.incMessagesRead(1);
550                     readPending = false;
551                     pipeline.fireChannelRead(byteBuf);
552                     byteBuf = null;
553 
554                     if (shouldBreakReadReady(config)) {
555                         // We need to do this for two reasons:
556                         //
557                         // - If the input was shutdown in between (which may be the case when the user did it in the
558                         //   fireChannelRead(...) method we should not try to read again to not produce any
559                         //   miss-leading exceptions.
560                         //
561                         // - If the user closes the channel we need to ensure we not try to read from it again as
562                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
563                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
564                         //   reading data from a filedescriptor that belongs to another socket then the socket that
565                         //   was "wrapped" by this Channel implementation.
566                         break;
567                     }
568                 } while (allocHandle.continueReading());
569 
570                 allocHandle.readComplete();
571                 pipeline.fireChannelReadComplete();
572 
573                 if (close) {
574                     shutdownInput(false);
575                 }
576             } catch (Throwable t) {
577                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
578             } finally {
579                 readReadyFinally(config);
580             }
581         }
582 
583         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
584                                          KQueueRecvByteAllocatorHandle allocHandle) {
585             if (byteBuf != null) {
586                 if (byteBuf.isReadable()) {
587                     readPending = false;
588                     pipeline.fireChannelRead(byteBuf);
589                 } else {
590                     byteBuf.release();
591                 }
592             }
593             if (!failConnectPromise(cause)) {
594                 allocHandle.readComplete();
595                 pipeline.fireChannelReadComplete();
596                 pipeline.fireExceptionCaught(cause);
597                 if (close || cause instanceof IOException) {
598                     shutdownInput(false);
599                 }
600             }
601         }
602     }
603 
604     private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
605         KQueueSocketWritableByteChannel() {
606             super(socket);
607         }
608 
609         @Override
610         protected ByteBufAllocator alloc() {
611             return AbstractKQueueStreamChannel.this.alloc();
612         }
613     }
614 }