View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelConfig;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelInboundHandlerAdapter;
28  import io.netty.channel.ChannelInitializer;
29  import io.netty.channel.ChannelOption;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.SimpleChannelInboundHandler;
32  import io.netty.channel.nio.NioEventLoopGroup;
33  import io.netty.channel.oio.OioEventLoopGroup;
34  import io.netty.channel.socket.ChannelInputShutdownEvent;
35  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36  import io.netty.channel.socket.ChannelOutputShutdownEvent;
37  import io.netty.channel.socket.DuplexChannel;
38  import io.netty.channel.socket.SocketChannel;
39  import io.netty.util.ReferenceCountUtil;
40  import io.netty.util.UncheckedBooleanSupplier;
41  import io.netty.util.internal.PlatformDependent;
42  import org.junit.jupiter.api.Test;
43  import org.junit.jupiter.api.TestInfo;
44  import org.junit.jupiter.api.Timeout;
45  
46  import java.util.concurrent.CountDownLatch;
47  import java.util.concurrent.TimeUnit;
48  import java.util.concurrent.atomic.AtomicInteger;
49  import java.util.concurrent.atomic.AtomicReference;
50  
51  import static java.util.concurrent.TimeUnit.MILLISECONDS;
52  import static org.junit.jupiter.api.Assertions.assertEquals;
53  import static org.junit.jupiter.api.Assertions.assertNull;
54  import static org.junit.jupiter.api.Assertions.assertTrue;
55  import static org.junit.jupiter.api.Assumptions.assumeFalse;
56  
57  @Timeout(value = 20000, unit = MILLISECONDS)
58  public class SocketHalfClosedTest extends AbstractSocketTest {
59  
60      protected int maxReadCompleteWithNoDataAfterInputShutdown() {
61          return 2; // nio needs read flag to detect full closure.
62      }
63  
64      @Test
65      public void testAllDataReadEventTriggeredAfterHalfClosure(TestInfo testInfo) throws Throwable {
66          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
67              @Override
68              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
69                  if (bootstrap.config().group() instanceof OioEventLoopGroup) {
70                      logger.debug("Ignoring test for incompatible OIO event system");
71                      return;
72                  } else if (bootstrap.config().group() instanceof NioEventLoopGroup) {
73                      logger.debug("Ignoring test for incompatible NioHandler");
74                      return;
75                  }
76                  allDataReadEventTriggeredAfterHalfClosure(serverBootstrap, bootstrap);
77              }
78          });
79      }
80  
81      private void allDataReadEventTriggeredAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
82          final int totalServerBytesWritten = 1;
83          final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
84          final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
85          final CountDownLatch clientHalfClosedAllBytesRead = new CountDownLatch(1);
86          final AtomicInteger clientReadCompletes = new AtomicInteger();
87          final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
88          Channel serverChannel = null;
89          Channel clientChannel = null;
90          final AtomicReference<Channel> serverChildChannel = new AtomicReference<Channel>();
91          try {
92              cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
93                      .option(ChannelOption.AUTO_CLOSE, false)
94                      .option(ChannelOption.AUTO_READ, false);
95  
96              sb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
97                      .option(ChannelOption.AUTO_CLOSE, false)
98                      .childOption(ChannelOption.TCP_NODELAY, true);
99  
100             sb.childHandler(new ChannelInitializer<Channel>() {
101                 @Override
102                 protected void initChannel(Channel ch) throws Exception {
103                     serverChildChannel.set(ch);
104                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
105                         @Override
106                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
107                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
108                             buf.writerIndex(buf.capacity());
109                             ctx.writeAndFlush(buf);
110                         }
111 
112                         @Override
113                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
114                             ctx.close();
115                         }
116                     });
117                 }
118             });
119 
120             // client.
121             cb.handler(new ChannelInitializer<Channel>() {
122                 @Override
123                 protected void initChannel(final Channel ch) {
124                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
125                         private int bytesRead;
126                         private int bytesSinceReadComplete;
127 
128                         @Override
129                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
130                             ByteBuf buf = (ByteBuf) msg;
131                             bytesRead += buf.readableBytes();
132                             bytesSinceReadComplete += buf.readableBytes();
133                             buf.release();
134                         }
135 
136                         @Override
137                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
138                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
139                                 clientHalfClosedLatch.countDown();
140                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
141                                 clientHalfClosedAllBytesRead.countDown();
142                                 ctx.close();
143                             }
144                         }
145 
146                         @Override
147                         public void channelReadComplete(ChannelHandlerContext ctx) {
148                             if (bytesSinceReadComplete == 0) {
149                                 clientZeroDataReadCompletes.incrementAndGet();
150                             } else {
151                                 bytesSinceReadComplete = 0;
152                             }
153                             clientReadCompletes.incrementAndGet();
154                             if (bytesRead == totalServerBytesWritten) {
155                                 // Bounce this through the event loop to make sure it happens after we're done
156                                 // with the read operation.
157                                 ch.eventLoop().execute(new Runnable() {
158                                     @Override
159                                     public void run() {
160                                         clientReadAllDataLatch.countDown();
161                                     }
162                                 });
163                             } else {
164                                 ctx.read();
165                             }
166                         }
167 
168                         @Override
169                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
170                             ctx.fireExceptionCaught(cause);
171                             ctx.close();
172                         }
173                     });
174                     ch.read();
175                 }
176             });
177 
178             serverChannel = sb.bind().sync().channel();
179             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
180             clientChannel.read();
181 
182             clientReadAllDataLatch.await();
183 
184             // Now we need to trigger server half-close
185             ((DuplexChannel) serverChildChannel.get()).shutdownOutput();
186 
187             clientHalfClosedLatch.await();
188             clientHalfClosedAllBytesRead.await();
189         } finally {
190             if (clientChannel != null) {
191                 clientChannel.close().sync();
192             }
193             if (serverChannel != null) {
194                 serverChannel.close().sync();
195             }
196         }
197     }
198 
199     @Test
200     public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
201         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
202             @Override
203             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
204                 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
205             }
206         });
207     }
208 
209     private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
210             throws Throwable {
211         Channel serverChannel = null;
212         Channel clientChannel = null;
213 
214         final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
215         try {
216             sb.childOption(ChannelOption.SO_LINGER, 1)
217               .childHandler(new ChannelInitializer<Channel>() {
218 
219                   @Override
220                   protected void initChannel(Channel ch) throws Exception {
221                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
222 
223                             @Override
224                             public void channelActive(final ChannelHandlerContext ctx) {
225                                 SocketChannel channel = (SocketChannel) ctx.channel();
226                                 channel.shutdownOutput();
227                             }
228 
229                             @Override
230                             public void channelRead(ChannelHandlerContext ctx, Object msg) {
231                                 ReferenceCountUtil.release(msg);
232                                 waitHalfClosureDone.countDown();
233                             }
234                         });
235                   }
236               });
237 
238             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
239               .handler(new ChannelInitializer<Channel>() {
240                   @Override
241                   protected void initChannel(Channel ch) throws Exception {
242                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
243 
244                             @Override
245                             public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
246                                 if (ChannelInputShutdownEvent.INSTANCE == evt) {
247                                     ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
248                                 }
249 
250                                 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
251                                     ctx.close();
252                                 }
253                             }
254                         });
255                   }
256               });
257 
258             serverChannel = sb.bind().sync().channel();
259             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
260             waitHalfClosureDone.await();
261         } finally {
262             if (clientChannel != null) {
263                 clientChannel.close().sync();
264             }
265 
266             if (serverChannel != null) {
267                 serverChannel.close().sync();
268             }
269         }
270     }
271 
272     @Test
273     @Timeout(value = 10000, unit = MILLISECONDS)
274     public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
275         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
276             @Override
277             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
278                 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
279             }
280         });
281     }
282 
283     public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
284         Channel serverChannel = null;
285         try {
286             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
287                     .option(ChannelOption.AUTO_READ, true);
288             sb.childHandler(new ChannelInitializer<Channel>() {
289                 @Override
290                 protected void initChannel(Channel ch) {
291                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
292                         @Override
293                         public void channelActive(ChannelHandlerContext ctx) {
294                             ((DuplexChannel) ctx).shutdownOutput();
295                         }
296 
297                         @Override
298                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
299                             ctx.close();
300                         }
301                     });
302                 }
303             });
304 
305             final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
306             final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
307 
308             cb.handler(new ChannelInitializer<Channel>() {
309                 @Override
310                 protected void initChannel(Channel ch) {
311                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
312 
313                         @Override
314                         public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
315                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
316                                 shutdownEventReceivedCounter.incrementAndGet();
317                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
318                                 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
319                                 ctx.executor().schedule(new Runnable() {
320                                     @Override
321                                     public void run() {
322                                         ctx.close();
323                                     }
324                                 }, 100, MILLISECONDS);
325                             }
326                         }
327 
328                         @Override
329                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
330                             ctx.close();
331                         }
332                     });
333                 }
334             });
335 
336             serverChannel = sb.bind().sync().channel();
337             Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
338             clientChannel.closeFuture().await();
339             assertEquals(1, shutdownEventReceivedCounter.get());
340             assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
341         } finally {
342             if (serverChannel != null) {
343                 serverChannel.close().sync();
344             }
345         }
346     }
347 
348     @Test
349     public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
350         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
351             @Override
352             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
353                 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
354             }
355         });
356     }
357 
358     public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
359         testAllDataReadAfterHalfClosure(true, sb, cb);
360         testAllDataReadAfterHalfClosure(false, sb, cb);
361     }
362 
363     private void testAllDataReadAfterHalfClosure(final boolean autoRead,
364                                                  ServerBootstrap sb, Bootstrap cb) throws Throwable {
365         final int totalServerBytesWritten = 1024 * 16;
366         final int numReadsPerReadLoop = 2;
367         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
368         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
369         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
370         final AtomicInteger clientReadCompletes = new AtomicInteger();
371         final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
372         Channel serverChannel = null;
373         Channel clientChannel = null;
374         try {
375             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
376               .option(ChannelOption.AUTO_READ, autoRead)
377               .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
378 
379             sb.childHandler(new ChannelInitializer<Channel>() {
380                 @Override
381                 protected void initChannel(Channel ch) throws Exception {
382                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
383                         @Override
384                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
385                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
386                             buf.writerIndex(buf.capacity());
387                             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
388                                 @Override
389                                 public void operationComplete(ChannelFuture future) throws Exception {
390                                     ((DuplexChannel) future.channel()).shutdownOutput();
391                                 }
392                             });
393                             serverInitializedLatch.countDown();
394                         }
395 
396                         @Override
397                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
398                             ctx.close();
399                         }
400                     });
401                 }
402             });
403 
404             cb.handler(new ChannelInitializer<Channel>() {
405                 @Override
406                 protected void initChannel(Channel ch) throws Exception {
407                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
408                         private int bytesRead;
409                         private int bytesSinceReadComplete;
410 
411                         @Override
412                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
413                             ByteBuf buf = (ByteBuf) msg;
414                             bytesRead += buf.readableBytes();
415                             bytesSinceReadComplete += buf.readableBytes();
416                             buf.release();
417                         }
418 
419                         @Override
420                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
421                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
422                                 clientHalfClosedLatch.countDown();
423                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
424                                 ctx.close();
425                             }
426                         }
427 
428                         @Override
429                         public void channelReadComplete(ChannelHandlerContext ctx) {
430                             if (bytesSinceReadComplete == 0) {
431                                 clientZeroDataReadCompletes.incrementAndGet();
432                             } else {
433                                 bytesSinceReadComplete = 0;
434                             }
435                             clientReadCompletes.incrementAndGet();
436                             if (bytesRead == totalServerBytesWritten) {
437                                 clientReadAllDataLatch.countDown();
438                             }
439                             if (!autoRead) {
440                                 ctx.read();
441                             }
442                         }
443 
444                         @Override
445                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
446                             ctx.close();
447                         }
448                     });
449                 }
450             });
451 
452             serverChannel = sb.bind().sync().channel();
453             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
454             clientChannel.read();
455 
456             serverInitializedLatch.await();
457             clientReadAllDataLatch.await();
458             clientHalfClosedLatch.await();
459             // In practice this should be much less, as we allow numReadsPerReadLoop per wakeup, but we limit the
460             // number of bytes to 1 per read so in theory we may need more. We check below that readComplete is called
461             // when data is actually read.
462             assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
463                     "too many read complete events: " + clientReadCompletes.get());
464             assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
465                     "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
466                             clientReadCompletes.get());
467         } finally {
468             if (clientChannel != null) {
469                 clientChannel.close().sync();
470             }
471             if (serverChannel != null) {
472                 serverChannel.close().sync();
473             }
474         }
475     }
476 
477     @Test
478     public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
479         // This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
480         assumeFalse(PlatformDependent.isWindows());
481         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
482             @Override
483             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
484                 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
485             }
486         });
487     }
488 
489     public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
490         testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
491         testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
492         testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
493         testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
494     }
495 
496     private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
497                                                              final boolean clientIsLeader,
498                                                              ServerBootstrap sb,
499                                                              Bootstrap cb) throws InterruptedException {
500         final int expectedBytes = 100;
501         final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
502         final CountDownLatch doneLatch = new CountDownLatch(2);
503         final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
504         Channel serverChannel = null;
505         Channel clientChannel = null;
506         try {
507             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
508                     .option(ChannelOption.AUTO_CLOSE, false)
509                     .option(ChannelOption.SO_LINGER, 0);
510             sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
511                     .childOption(ChannelOption.AUTO_CLOSE, false)
512                     .childOption(ChannelOption.SO_LINGER, 0);
513 
514             final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
515                     serverReadExpectedLatch, doneLatch, causeRef);
516             final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
517                     serverReadExpectedLatch, doneLatch, causeRef);
518             sb.childHandler(new ChannelInitializer<Channel>() {
519                 @Override
520                 protected void initChannel(Channel ch) throws Exception {
521                     ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
522                 }
523             });
524 
525             cb.handler(new ChannelInitializer<Channel>() {
526                 @Override
527                 protected void initChannel(Channel ch) throws Exception {
528                     ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
529                 }
530             });
531 
532             serverChannel = sb.bind().sync().channel();
533             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
534 
535             doneLatch.await();
536             assertNull(causeRef.get());
537             assertTrue(leaderHandler.seenOutputShutdown);
538         } finally {
539             if (clientChannel != null) {
540                 clientChannel.close().sync();
541             }
542             if (serverChannel != null) {
543                 serverChannel.close().sync();
544             }
545         }
546     }
547 
548     private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
549         private final int expectedBytes;
550         private final CountDownLatch followerCloseLatch;
551         private final CountDownLatch doneLatch;
552         private final AtomicReference<Throwable> causeRef;
553         private int bytesRead;
554 
555         AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
556                                AtomicReference<Throwable> causeRef) {
557             this.expectedBytes = expectedBytes;
558             this.followerCloseLatch = followerCloseLatch;
559             this.doneLatch = doneLatch;
560             this.causeRef = causeRef;
561         }
562 
563         @Override
564         public void channelInactive(ChannelHandlerContext ctx) {
565             checkPrematureClose();
566         }
567 
568         @Override
569         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
570             ctx.close();
571             checkPrematureClose();
572         }
573 
574         @Override
575         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
576             bytesRead += msg.readableBytes();
577             if (bytesRead >= expectedBytes) {
578                 // We write a reply and immediately close our end of the socket.
579                 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
580                 buf.writerIndex(buf.writerIndex() + expectedBytes);
581                 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
582                     @Override
583                     public void operationComplete(ChannelFuture future) throws Exception {
584                         future.channel().close().addListener(new ChannelFutureListener() {
585                             @Override
586                             public void operationComplete(final ChannelFuture future) throws Exception {
587                                 // This is a bit racy but there is no better way how to handle this in Java11.
588                                 // The problem is that on close() the underlying FD will not actually be closed directly
589                                 // but the close will be done after the Selector did process all events. Because of
590                                 // this we will need to give it a bit time to ensure the FD is actual closed before we
591                                 // count down the latch and try to write.
592                                 future.channel().eventLoop().schedule(new Runnable() {
593                                     @Override
594                                     public void run() {
595                                         followerCloseLatch.countDown();
596                                     }
597                                 }, 200, TimeUnit.MILLISECONDS);
598                             }
599                         });
600                     }
601                 });
602             }
603         }
604 
605         private void checkPrematureClose() {
606             if (bytesRead < expectedBytes) {
607                 causeRef.set(new IllegalStateException("follower premature close"));
608                 doneLatch.countDown();
609             }
610         }
611     }
612 
613     private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
614         private final int expectedBytes;
615         private final CountDownLatch followerCloseLatch;
616         private final CountDownLatch doneLatch;
617         private final AtomicReference<Throwable> causeRef;
618         private int bytesRead;
619         boolean seenOutputShutdown;
620 
621         AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
622                              AtomicReference<Throwable> causeRef) {
623             this.expectedBytes = expectedBytes;
624             this.followerCloseLatch = followerCloseLatch;
625             this.doneLatch = doneLatch;
626             this.causeRef = causeRef;
627         }
628 
629         @Override
630         public void channelActive(ChannelHandlerContext ctx) throws Exception {
631             ByteBuf buf = ctx.alloc().buffer(expectedBytes);
632             buf.writerIndex(buf.writerIndex() + expectedBytes);
633             ctx.writeAndFlush(buf.retainedDuplicate());
634 
635             // We wait here to ensure that we write before we have a chance to process the outbound
636             // shutdown event.
637             followerCloseLatch.await();
638 
639             // This write should fail, but we should still be allowed to read the peer's data
640             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
641                 @Override
642                 public void operationComplete(ChannelFuture future) throws Exception {
643                     if (future.cause() == null) {
644                         causeRef.set(new IllegalStateException("second write should have failed!"));
645                         doneLatch.countDown();
646                     }
647                 }
648             });
649         }
650 
651         @Override
652         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
653             bytesRead += msg.readableBytes();
654             if (bytesRead >= expectedBytes) {
655                 doneLatch.countDown();
656             }
657         }
658 
659         @Override
660         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
661             if (evt instanceof ChannelOutputShutdownEvent) {
662                 seenOutputShutdown = true;
663                 doneLatch.countDown();
664             }
665         }
666 
667         @Override
668         public void channelInactive(ChannelHandlerContext ctx) {
669             checkPrematureClose();
670         }
671 
672         @Override
673         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
674             ctx.close();
675             checkPrematureClose();
676         }
677 
678         private void checkPrematureClose() {
679             if (bytesRead < expectedBytes || !seenOutputShutdown) {
680                 causeRef.set(new IllegalStateException("leader premature close"));
681                 doneLatch.countDown();
682             }
683         }
684     }
685 
686     @Test
687     public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
688         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
689             @Override
690             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
691                 testAllDataReadClosure(serverBootstrap, bootstrap);
692             }
693         });
694     }
695 
696     public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
697         testAllDataReadClosure(true, false, sb, cb);
698         testAllDataReadClosure(true, true, sb, cb);
699         testAllDataReadClosure(false, false, sb, cb);
700         testAllDataReadClosure(false, true, sb, cb);
701     }
702 
703     private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
704                                                ServerBootstrap sb, Bootstrap cb) throws Throwable {
705         final int totalServerBytesWritten = 1024 * 16;
706         final int numReadsPerReadLoop = 2;
707         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
708         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
709         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
710         final AtomicInteger clientReadCompletes = new AtomicInteger();
711         Channel serverChannel = null;
712         Channel clientChannel = null;
713         try {
714             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
715                     .option(ChannelOption.AUTO_READ, autoRead)
716                     .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
717 
718             sb.childHandler(new ChannelInitializer<Channel>() {
719                 @Override
720                 protected void initChannel(Channel ch) throws Exception {
721                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
722                         @Override
723                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
724                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
725                             buf.writerIndex(buf.capacity());
726                             ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
727                             serverInitializedLatch.countDown();
728                         }
729 
730                         @Override
731                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
732                             ctx.close();
733                         }
734                     });
735                 }
736             });
737 
738             cb.handler(new ChannelInitializer<Channel>() {
739                 @Override
740                 protected void initChannel(Channel ch) throws Exception {
741                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
742                         private int bytesRead;
743 
744                         @Override
745                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
746                             ByteBuf buf = (ByteBuf) msg;
747                             bytesRead += buf.readableBytes();
748                             buf.release();
749                         }
750 
751                         @Override
752                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
753                             if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
754                                 clientHalfClosedLatch.countDown();
755                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
756                                 ctx.close();
757                             }
758                         }
759 
760                         @Override
761                         public void channelInactive(ChannelHandlerContext ctx) {
762                             if (!allowHalfClosed) {
763                                 clientHalfClosedLatch.countDown();
764                             }
765                         }
766 
767                         @Override
768                         public void channelReadComplete(ChannelHandlerContext ctx) {
769                             clientReadCompletes.incrementAndGet();
770                             if (bytesRead == totalServerBytesWritten) {
771                                 clientReadAllDataLatch.countDown();
772                             }
773                             if (!autoRead) {
774                                 ctx.read();
775                             }
776                         }
777 
778                         @Override
779                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
780                             ctx.close();
781                         }
782                     });
783                 }
784             });
785 
786             serverChannel = sb.bind().sync().channel();
787             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
788             clientChannel.read();
789 
790             serverInitializedLatch.await();
791             clientReadAllDataLatch.await();
792             clientHalfClosedLatch.await();
793             assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
794                 "too many read complete events: " + clientReadCompletes.get());
795         } finally {
796             if (clientChannel != null) {
797                 clientChannel.close().sync();
798             }
799             if (serverChannel != null) {
800                 serverChannel.close().sync();
801             }
802         }
803     }
804 
805     /**
806      * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
807      */
808     private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
809         private final int numReads;
810         TestNumReadsRecvByteBufAllocator(int numReads) {
811             this.numReads = numReads;
812         }
813 
814         @Override
815         public ExtendedHandle newHandle() {
816             return new ExtendedHandle() {
817                 private int attemptedBytesRead;
818                 private int lastBytesRead;
819                 private int numMessagesRead;
820                 @Override
821                 public ByteBuf allocate(ByteBufAllocator alloc) {
822                     return alloc.ioBuffer(guess(), guess());
823                 }
824 
825                 @Override
826                 public int guess() {
827                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
828                 }
829 
830                 @Override
831                 public void reset(ChannelConfig config) {
832                     numMessagesRead = 0;
833                 }
834 
835                 @Override
836                 public void incMessagesRead(int numMessages) {
837                     numMessagesRead += numMessages;
838                 }
839 
840                 @Override
841                 public void lastBytesRead(int bytes) {
842                     lastBytesRead = bytes;
843                 }
844 
845                 @Override
846                 public int lastBytesRead() {
847                     return lastBytesRead;
848                 }
849 
850                 @Override
851                 public void attemptedBytesRead(int bytes) {
852                     attemptedBytesRead = bytes;
853                 }
854 
855                 @Override
856                 public int attemptedBytesRead() {
857                     return attemptedBytesRead;
858                 }
859 
860                 @Override
861                 public boolean continueReading() {
862                     return numMessagesRead < numReads;
863                 }
864 
865                 @Override
866                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
867                     return continueReading() && maybeMoreDataSupplier.get();
868                 }
869 
870                 @Override
871                 public void readComplete() {
872                     // Nothing needs to be done or adjusted after each read cycle is completed.
873                 }
874             };
875         }
876     }
877 }