View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.kqueue;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelConfig;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelMetadata;
25  import io.netty.channel.ChannelOutboundBuffer;
26  import io.netty.channel.ChannelPipeline;
27  import io.netty.channel.ChannelPromise;
28  import io.netty.channel.DefaultFileRegion;
29  import io.netty.channel.EventLoop;
30  import io.netty.channel.FileRegion;
31  import io.netty.channel.internal.ChannelUtils;
32  import io.netty.channel.socket.DuplexChannel;
33  import io.netty.channel.unix.IovArray;
34  import io.netty.channel.unix.SocketWritableByteChannel;
35  import io.netty.channel.unix.UnixChannelUtil;
36  import io.netty.util.LeakPresenceDetector;
37  import io.netty.util.internal.StringUtil;
38  import io.netty.util.internal.UnstableApi;
39  import io.netty.util.internal.logging.InternalLogger;
40  import io.netty.util.internal.logging.InternalLoggerFactory;
41  
42  import java.io.IOException;
43  import java.net.SocketAddress;
44  import java.nio.ByteBuffer;
45  import java.nio.channels.WritableByteChannel;
46  import java.util.concurrent.Executor;
47  
48  import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
49  import static io.netty.channel.internal.ChannelUtils.WRITE_STATUS_SNDBUF_FULL;
50  import static io.netty.util.internal.StringUtil.className;
51  
52  public abstract class AbstractKQueueStreamChannel extends AbstractKQueueChannel implements DuplexChannel {
53      private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractKQueueStreamChannel.class);
54      private static final ChannelMetadata METADATA = new ChannelMetadata(false, 16);
55      private static final String EXPECTED_TYPES =
56              " (expected: " + StringUtil.simpleClassName(ByteBuf.class) + ", " +
57                      StringUtil.simpleClassName(DefaultFileRegion.class) + ')';
58      private WritableByteChannel byteChannel;
59      private final Runnable flushTask = new Runnable() {
60          @Override
61          public void run() {
62              // Calling flush0 directly to ensure we not try to flush messages that were added via write(...) in the
63              // meantime.
64              ((AbstractKQueueUnsafe) unsafe()).flush0();
65          }
66      };
67  
68      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, boolean active) {
69          super(parent, fd, active);
70      }
71  
72      AbstractKQueueStreamChannel(Channel parent, BsdSocket fd, SocketAddress remote) {
73          super(parent, fd, remote);
74      }
75  
76      AbstractKQueueStreamChannel(BsdSocket fd) {
77          this(null, fd, isSoErrorZero(fd));
78      }
79  
80      @Override
81      protected AbstractKQueueUnsafe newUnsafe() {
82          return new KQueueStreamUnsafe();
83      }
84  
85      @Override
86      public ChannelMetadata metadata() {
87          return METADATA;
88      }
89  
90      /**
91       * Write bytes form the given {@link ByteBuf} to the underlying {@link java.nio.channels.Channel}.
92       * @param in the collection which contains objects to write.
93       * @param buf the {@link ByteBuf} from which the bytes should be written
94       * @return The value that should be decremented from the write quantum which starts at
95       * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
96       * <ul>
97       *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
98       *     is encountered</li>
99       *     <li>1 - if a single call to write data was made to the OS</li>
100      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
101      *     data was accepted</li>
102      * </ul>
103      */
104     private int writeBytes(ChannelOutboundBuffer in, ByteBuf buf) throws Exception {
105         int readableBytes = buf.readableBytes();
106         if (readableBytes == 0) {
107             in.remove();
108             return 0;
109         }
110 
111         if (buf.hasMemoryAddress() || buf.nioBufferCount() == 1) {
112             return doWriteBytes(in, buf);
113         } else {
114             ByteBuffer[] nioBuffers = buf.nioBuffers();
115             return writeBytesMultiple(in, nioBuffers, nioBuffers.length, readableBytes,
116                     config().getMaxBytesPerGatheringWrite());
117         }
118     }
119 
120     private void adjustMaxBytesPerGatheringWrite(long attempted, long written, long oldMaxBytesPerGatheringWrite) {
121         // By default we track the SO_SNDBUF when ever it is explicitly set. However some OSes may dynamically change
122         // SO_SNDBUF (and other characteristics that determine how much data can be written at once) so we should try
123         // make a best effort to adjust as OS behavior changes.
124         if (attempted == written) {
125             if (attempted << 1 > oldMaxBytesPerGatheringWrite) {
126                 config().setMaxBytesPerGatheringWrite(attempted << 1);
127             }
128         } else if (attempted > MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD && written < attempted >>> 1) {
129             config().setMaxBytesPerGatheringWrite(attempted >>> 1);
130         }
131     }
132 
133     /**
134      * Write multiple bytes via {@link IovArray}.
135      * @param in the collection which contains objects to write.
136      * @param array The array which contains the content to write.
137      * @return The value that should be decremented from the write quantum which starts at
138      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
139      * <ul>
140      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
141      *     is encountered</li>
142      *     <li>1 - if a single call to write data was made to the OS</li>
143      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
144      *     no data was accepted</li>
145      * </ul>
146      * @throws IOException If an I/O exception occurs during write.
147      */
148     private int writeBytesMultiple(ChannelOutboundBuffer in, IovArray array) throws IOException {
149         final long expectedWrittenBytes = array.size();
150         assert expectedWrittenBytes != 0;
151         final int cnt = array.count();
152         assert cnt != 0;
153 
154         final long localWrittenBytes = socket.writevAddresses(array.memoryAddress(0), cnt);
155         if (localWrittenBytes > 0) {
156             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, array.maxBytes());
157             in.removeBytes(localWrittenBytes);
158             return 1;
159         }
160         return WRITE_STATUS_SNDBUF_FULL;
161     }
162 
163     /**
164      * Write multiple bytes via {@link ByteBuffer} array.
165      * @param in the collection which contains objects to write.
166      * @param nioBuffers The buffers to write.
167      * @param nioBufferCnt The number of buffers to write.
168      * @param expectedWrittenBytes The number of bytes we expect to write.
169      * @param maxBytesPerGatheringWrite The maximum number of bytes we should attempt to write.
170      * @return The value that should be decremented from the write quantum which starts at
171      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
172      * <ul>
173      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
174      *     is encountered</li>
175      *     <li>1 - if a single call to write data was made to the OS</li>
176      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
177      *     no data was accepted</li>
178      * </ul>
179      * @throws IOException If an I/O exception occurs during write.
180      */
181     private int writeBytesMultiple(
182             ChannelOutboundBuffer in, ByteBuffer[] nioBuffers, int nioBufferCnt, long expectedWrittenBytes,
183             long maxBytesPerGatheringWrite) throws IOException {
184         assert expectedWrittenBytes != 0;
185         if (expectedWrittenBytes > maxBytesPerGatheringWrite) {
186             expectedWrittenBytes = maxBytesPerGatheringWrite;
187         }
188 
189         final long localWrittenBytes = socket.writev(nioBuffers, 0, nioBufferCnt, expectedWrittenBytes);
190         if (localWrittenBytes > 0) {
191             adjustMaxBytesPerGatheringWrite(expectedWrittenBytes, localWrittenBytes, maxBytesPerGatheringWrite);
192             in.removeBytes(localWrittenBytes);
193             return 1;
194         }
195         return WRITE_STATUS_SNDBUF_FULL;
196     }
197 
198     /**
199      * Write a {@link DefaultFileRegion}
200      * @param in the collection which contains objects to write.
201      * @param region the {@link DefaultFileRegion} from which the bytes should be written
202      * @return The value that should be decremented from the write quantum which starts at
203      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
204      * <ul>
205      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
206      *     is encountered</li>
207      *     <li>1 - if a single call to write data was made to the OS</li>
208      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but
209      *     no data was accepted</li>
210      * </ul>
211      */
212     private int writeDefaultFileRegion(ChannelOutboundBuffer in, DefaultFileRegion region) throws Exception {
213         final long regionCount = region.count();
214         final long offset = region.transferred();
215 
216         if (offset >= regionCount) {
217             in.remove();
218             return 0;
219         }
220 
221         final long flushedAmount = socket.sendFile(region, region.position(), offset, regionCount - offset);
222         if (flushedAmount > 0) {
223             in.progress(flushedAmount);
224             if (region.transferred() >= regionCount) {
225                 in.remove();
226             }
227             return 1;
228         } else if (flushedAmount == 0) {
229             validateFileRegion(region, offset);
230         }
231         return WRITE_STATUS_SNDBUF_FULL;
232     }
233 
234     /**
235      * Write a {@link FileRegion}
236      * @param in the collection which contains objects to write.
237      * @param region the {@link FileRegion} from which the bytes should be written
238      * @return The value that should be decremented from the write quantum which starts at
239      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
240      * <ul>
241      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
242      *     is encountered</li>
243      *     <li>1 - if a single call to write data was made to the OS</li>
244      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
245      *     data was accepted</li>
246      * </ul>
247      */
248     private int writeFileRegion(ChannelOutboundBuffer in, FileRegion region) throws Exception {
249         if (region.transferred() >= region.count()) {
250             in.remove();
251             return 0;
252         }
253 
254         if (byteChannel == null) {
255             byteChannel = new KQueueSocketWritableByteChannel();
256         }
257         final long flushedAmount = region.transferTo(byteChannel, region.transferred());
258         if (flushedAmount > 0) {
259             in.progress(flushedAmount);
260             if (region.transferred() >= region.count()) {
261                 in.remove();
262             }
263             return 1;
264         }
265         return WRITE_STATUS_SNDBUF_FULL;
266     }
267 
268     @Override
269     protected void doWrite(ChannelOutboundBuffer in) throws Exception {
270         int writeSpinCount = config().getWriteSpinCount();
271         do {
272             final int msgCount = in.size();
273             // Do gathering write if the outbound buffer entries start with more than one ByteBuf.
274             if (msgCount > 1 && in.current() instanceof ByteBuf) {
275                 writeSpinCount -= doWriteMultiple(in);
276             } else if (msgCount == 0) {
277                 // Wrote all messages.
278                 writeFilter(false);
279                 // Return here so we don't set the WRITE flag.
280                 return;
281             } else { // msgCount == 1
282                 writeSpinCount -= doWriteSingle(in);
283             }
284 
285             // We do not break the loop here even if the outbound buffer was flushed completely,
286             // because a user might have triggered another write and flush when we notify his or her
287             // listeners.
288         } while (writeSpinCount > 0);
289 
290         if (writeSpinCount == 0) {
291             // It is possible that we have set the write filter, woken up by KQUEUE because the socket is writable, and
292             // then use our write quantum. In this case we no longer want to set the write filter because the socket is
293             // still writable (as far as we know). We will find out next time we attempt to write if the socket is
294             // writable and set the write filter if necessary.
295             writeFilter(false);
296 
297             // We used our writeSpin quantum, and should try to write again later.
298             eventLoop().execute(flushTask);
299         } else {
300             // Underlying descriptor can not accept all data currently, so set the WRITE flag to be woken up
301             // when it can accept more data.
302             writeFilter(true);
303         }
304     }
305 
306     /**
307      * Attempt to write a single object.
308      * @param in the collection which contains objects to write.
309      * @return The value that should be decremented from the write quantum which starts at
310      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
311      * <ul>
312      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
313      *     is encountered</li>
314      *     <li>1 - if a single call to write data was made to the OS</li>
315      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
316      *     data was accepted</li>
317      * </ul>
318      * @throws Exception If an I/O error occurs.
319      */
320     protected int doWriteSingle(ChannelOutboundBuffer in) throws Exception {
321         // The outbound buffer contains only one message or it contains a file region.
322         Object msg = in.current();
323         if (msg instanceof ByteBuf) {
324             return writeBytes(in, (ByteBuf) msg);
325         } else if (msg instanceof DefaultFileRegion) {
326             return writeDefaultFileRegion(in, (DefaultFileRegion) msg);
327         } else if (msg instanceof FileRegion) {
328             return writeFileRegion(in, (FileRegion) msg);
329         } else {
330             // Should never reach here.
331             throw new Error("Unexpected message type: " + className(msg));
332         }
333     }
334 
335     /**
336      * Attempt to write multiple {@link ByteBuf} objects.
337      * @param in the collection which contains objects to write.
338      * @return The value that should be decremented from the write quantum which starts at
339      * {@link ChannelConfig#getWriteSpinCount()}. The typical use cases are as follows:
340      * <ul>
341      *     <li>0 - if no write was attempted. This is appropriate if an empty {@link ByteBuf} (or other empty content)
342      *     is encountered</li>
343      *     <li>1 - if a single call to write data was made to the OS</li>
344      *     <li>{@link ChannelUtils#WRITE_STATUS_SNDBUF_FULL} - if an attempt to write data was made to the OS, but no
345      *     data was accepted</li>
346      * </ul>
347      * @throws Exception If an I/O error occurs.
348      */
349     private int doWriteMultiple(ChannelOutboundBuffer in) throws Exception {
350         final long maxBytesPerGatheringWrite = config().getMaxBytesPerGatheringWrite();
351         IovArray array = ((NativeArrays) registration().attachment()).cleanIovArray();
352         array.maxBytes(maxBytesPerGatheringWrite);
353         in.forEachFlushedMessage(array);
354 
355         if (array.count() >= 1) {
356             // TODO: Handle the case where cnt == 1 specially.
357             return writeBytesMultiple(in, array);
358         }
359         // cnt == 0, which means the outbound buffer contained empty buffers only.
360         in.removeBytes(0);
361         return 0;
362     }
363 
364     @Override
365     protected Object filterOutboundMessage(Object msg) {
366         if (msg instanceof ByteBuf) {
367             ByteBuf buf = (ByteBuf) msg;
368             return UnixChannelUtil.isBufferCopyNeededForWrite(buf)? newDirectBuffer(buf) : buf;
369         }
370 
371         if (msg instanceof FileRegion) {
372             return msg;
373         }
374 
375         throw new UnsupportedOperationException(
376                 "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
377     }
378 
379     @UnstableApi
380     @Override
381     protected final void doShutdownOutput() throws Exception {
382         socket.shutdown(false, true);
383     }
384 
385     @Override
386     public boolean isOutputShutdown() {
387         return socket.isOutputShutdown();
388     }
389 
390     @Override
391     public boolean isInputShutdown() {
392         return socket.isInputShutdown();
393     }
394 
395     @Override
396     public boolean isShutdown() {
397         return socket.isShutdown();
398     }
399 
400     @Override
401     public ChannelFuture shutdownOutput() {
402         return shutdownOutput(newPromise());
403     }
404 
405     @Override
406     public ChannelFuture shutdownOutput(final ChannelPromise promise) {
407         EventLoop loop = eventLoop();
408         if (loop.inEventLoop()) {
409             ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
410         } else {
411             loop.execute(new Runnable() {
412                 @Override
413                 public void run() {
414                     ((AbstractUnsafe) unsafe()).shutdownOutput(promise);
415                 }
416             });
417         }
418         return promise;
419     }
420 
421     @Override
422     public ChannelFuture shutdownInput() {
423         return shutdownInput(newPromise());
424     }
425 
426     @Override
427     public ChannelFuture shutdownInput(final ChannelPromise promise) {
428         EventLoop loop = eventLoop();
429         if (loop.inEventLoop()) {
430             shutdownInput0(promise);
431         } else {
432             loop.execute(new Runnable() {
433                 @Override
434                 public void run() {
435                     shutdownInput0(promise);
436                 }
437             });
438         }
439         return promise;
440     }
441 
442     private void shutdownInput0(ChannelPromise promise) {
443         try {
444             socket.shutdown(true, false);
445         } catch (Throwable cause) {
446             promise.setFailure(cause);
447             return;
448         }
449         promise.setSuccess();
450     }
451 
452     @Override
453     public ChannelFuture shutdown() {
454         return shutdown(newPromise());
455     }
456 
457     @Override
458     public ChannelFuture shutdown(final ChannelPromise promise) {
459         ChannelFuture shutdownOutputFuture = shutdownOutput();
460         if (shutdownOutputFuture.isDone()) {
461             shutdownOutputDone(shutdownOutputFuture, promise);
462         } else {
463             shutdownOutputFuture.addListener(new ChannelFutureListener() {
464                 @Override
465                 public void operationComplete(final ChannelFuture shutdownOutputFuture) throws Exception {
466                     shutdownOutputDone(shutdownOutputFuture, promise);
467                 }
468             });
469         }
470         return promise;
471     }
472 
473     private void shutdownOutputDone(final ChannelFuture shutdownOutputFuture, final ChannelPromise promise) {
474         ChannelFuture shutdownInputFuture = shutdownInput();
475         if (shutdownInputFuture.isDone()) {
476             shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
477         } else {
478             shutdownInputFuture.addListener(new ChannelFutureListener() {
479                 @Override
480                 public void operationComplete(ChannelFuture shutdownInputFuture) throws Exception {
481                     shutdownDone(shutdownOutputFuture, shutdownInputFuture, promise);
482                 }
483             });
484         }
485     }
486 
487     private static void shutdownDone(ChannelFuture shutdownOutputFuture,
488                                      ChannelFuture shutdownInputFuture,
489                                      ChannelPromise promise) {
490         Throwable shutdownOutputCause = shutdownOutputFuture.cause();
491         Throwable shutdownInputCause = shutdownInputFuture.cause();
492         if (shutdownOutputCause != null) {
493             if (shutdownInputCause != null) {
494                 logger.debug("Exception suppressed because a previous exception occurred.",
495                         shutdownInputCause);
496             }
497             promise.setFailure(shutdownOutputCause);
498         } else if (shutdownInputCause != null) {
499             promise.setFailure(shutdownInputCause);
500         } else {
501             promise.setSuccess();
502         }
503     }
504 
505     class KQueueStreamUnsafe extends AbstractKQueueUnsafe {
506         // Overridden here just to be able to access this method from AbstractKQueueStreamChannel
507         @Override
508         protected Executor prepareToClose() {
509             return super.prepareToClose();
510         }
511 
512         @Override
513         void readReady(final KQueueRecvByteAllocatorHandle allocHandle) {
514             final ChannelConfig config = config();
515             if (shouldBreakReadReady(config)) {
516                 clearReadFilter0();
517                 return;
518             }
519             final ChannelPipeline pipeline = pipeline();
520             final ByteBufAllocator allocator = config.getAllocator();
521             allocHandle.reset(config);
522 
523             ByteBuf byteBuf = null;
524             boolean close = false;
525             try {
526                 do {
527                     // we use a direct buffer here as the native implementations only be able
528                     // to handle direct buffers.
529                     byteBuf = allocHandle.allocate(allocator);
530                     allocHandle.lastBytesRead(doReadBytes(byteBuf));
531                     if (allocHandle.lastBytesRead() <= 0) {
532                         // nothing was read, release the buffer.
533                         byteBuf.release();
534                         byteBuf = null;
535                         close = allocHandle.lastBytesRead() < 0;
536                         if (close) {
537                             // There is nothing left to read as we received an EOF.
538                             readPending = false;
539                         }
540                         break;
541                     }
542                     allocHandle.incMessagesRead(1);
543                     readPending = false;
544                     pipeline.fireChannelRead(byteBuf);
545                     byteBuf = null;
546 
547                     if (shouldBreakReadReady(config)) {
548                         // We need to do this for two reasons:
549                         //
550                         // - If the input was shutdown in between (which may be the case when the user did it in the
551                         //   fireChannelRead(...) method we should not try to read again to not produce any
552                         //   miss-leading exceptions.
553                         //
554                         // - If the user closes the channel we need to ensure we not try to read from it again as
555                         //   the filedescriptor may be re-used already by the OS if the system is handling a lot of
556                         //   concurrent connections and so needs a lot of filedescriptors. If not do this we risk
557                         //   reading data from a filedescriptor that belongs to another socket then the socket that
558                         //   was "wrapped" by this Channel implementation.
559                         break;
560                     }
561                 } while (allocHandle.continueReading());
562 
563                 allocHandle.readComplete();
564                 pipeline.fireChannelReadComplete();
565 
566                 if (close) {
567                     shutdownInput(false);
568                 }
569             } catch (Throwable t) {
570                 handleReadException(pipeline, byteBuf, t, close, allocHandle);
571             } finally {
572                 if (shouldStopReading(config)) {
573                     clearReadFilter0();
574                 }
575             }
576         }
577 
578         private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
579                                          KQueueRecvByteAllocatorHandle allocHandle) {
580             if (byteBuf != null) {
581                 if (byteBuf.isReadable()) {
582                     readPending = false;
583                     pipeline.fireChannelRead(byteBuf);
584                 } else {
585                     byteBuf.release();
586                 }
587             }
588             if (!failConnectPromise(cause)) {
589                 allocHandle.readComplete();
590                 pipeline.fireChannelReadComplete();
591                 pipeline.fireExceptionCaught(cause);
592 
593                 // If oom will close the read event, release connection.
594                 // See https://github.com/netty/netty/issues/10434
595                 if (close ||
596                         cause instanceof OutOfMemoryError ||
597                         cause instanceof LeakPresenceDetector.AllocationProhibitedException ||
598                         cause instanceof IOException) {
599                     shutdownInput(false);
600                 }
601             }
602         }
603     }
604 
605     private final class KQueueSocketWritableByteChannel extends SocketWritableByteChannel {
606         KQueueSocketWritableByteChannel() {
607             super(socket);
608         }
609 
610         @Override
611         protected ByteBufAllocator alloc() {
612             return AbstractKQueueStreamChannel.this.alloc();
613         }
614     }
615 }