View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelConfig;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelInboundHandlerAdapter;
28  import io.netty.channel.ChannelInitializer;
29  import io.netty.channel.ChannelOption;
30  import io.netty.channel.IoEventLoopGroup;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.SimpleChannelInboundHandler;
33  import io.netty.channel.nio.NioIoHandler;
34  import io.netty.channel.oio.OioEventLoopGroup;
35  import io.netty.channel.socket.ChannelInputShutdownEvent;
36  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
37  import io.netty.channel.socket.ChannelOutputShutdownEvent;
38  import io.netty.channel.socket.DuplexChannel;
39  import io.netty.channel.socket.SocketChannel;
40  import io.netty.util.ReferenceCountUtil;
41  import io.netty.util.UncheckedBooleanSupplier;
42  import io.netty.util.internal.PlatformDependent;
43  import org.junit.jupiter.api.Test;
44  import org.junit.jupiter.api.TestInfo;
45  import org.junit.jupiter.api.Timeout;
46  
47  import java.util.concurrent.CountDownLatch;
48  import java.util.concurrent.TimeUnit;
49  import java.util.concurrent.atomic.AtomicInteger;
50  import java.util.concurrent.atomic.AtomicReference;
51  
52  import static java.util.concurrent.TimeUnit.MILLISECONDS;
53  import static org.junit.jupiter.api.Assertions.assertEquals;
54  import static org.junit.jupiter.api.Assertions.assertNull;
55  import static org.junit.jupiter.api.Assertions.assertTrue;
56  import static org.junit.jupiter.api.Assumptions.assumeFalse;
57  
58  @Timeout(value = 20000, unit = MILLISECONDS)
59  public class SocketHalfClosedTest extends AbstractSocketTest {
60  
61      protected int maxReadCompleteWithNoDataAfterInputShutdown() {
62          return 2; // nio needs read flag to detect full closure.
63      }
64  
65      @Test
66      public void testAllDataReadEventTriggeredAfterHalfClosure(TestInfo testInfo) throws Throwable {
67          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
68              @Override
69              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
70                  if (bootstrap.config().group() instanceof OioEventLoopGroup) {
71                      logger.debug("Ignoring test for incompatible OIO event system");
72                      return;
73                  } else if (bootstrap.config().group() instanceof IoEventLoopGroup) {
74                      IoEventLoopGroup group = (IoEventLoopGroup) bootstrap.config().group();
75                      if (group.isIoType(NioIoHandler.class)) {
76                          logger.debug("Ignoring test for incompatible NioHandler");
77                          return;
78                      }
79                  }
80                  allDataReadEventTriggeredAfterHalfClosure(serverBootstrap, bootstrap);
81              }
82          });
83      }
84  
85      private void allDataReadEventTriggeredAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
86          final int totalServerBytesWritten = 1;
87          final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
88          final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
89          final CountDownLatch clientHalfClosedAllBytesRead = new CountDownLatch(1);
90          final AtomicInteger clientReadCompletes = new AtomicInteger();
91          final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
92          Channel serverChannel = null;
93          Channel clientChannel = null;
94          AtomicReference<Channel> serverChildChannel = new AtomicReference<>();
95          try {
96              cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
97                      .option(ChannelOption.AUTO_CLOSE, false)
98                      .option(ChannelOption.AUTO_READ, false);
99  
100             sb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
101                     .option(ChannelOption.AUTO_CLOSE, false)
102                     .childOption(ChannelOption.TCP_NODELAY, true);
103 
104             sb.childHandler(new ChannelInitializer<Channel>() {
105                 @Override
106                 protected void initChannel(Channel ch) throws Exception {
107                     serverChildChannel.set(ch);
108                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
109                         @Override
110                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
111                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
112                             buf.writerIndex(buf.capacity());
113                             ctx.writeAndFlush(buf);
114                         }
115 
116                         @Override
117                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
118                             ctx.close();
119                         }
120                     });
121                 }
122             });
123 
124             // client.
125             cb.handler(new ChannelInitializer<Channel>() {
126                 @Override
127                 protected void initChannel(Channel ch) {
128                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
129                         private int bytesRead;
130                         private int bytesSinceReadComplete;
131 
132                         @Override
133                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
134                             ByteBuf buf = (ByteBuf) msg;
135                             bytesRead += buf.readableBytes();
136                             bytesSinceReadComplete += buf.readableBytes();
137                             buf.release();
138                         }
139 
140                         @Override
141                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
142                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
143                                 clientHalfClosedLatch.countDown();
144                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
145                                 clientHalfClosedAllBytesRead.countDown();
146                                 ctx.close();
147                             }
148                         }
149 
150                         @Override
151                         public void channelReadComplete(ChannelHandlerContext ctx) {
152                             if (bytesSinceReadComplete == 0) {
153                                 clientZeroDataReadCompletes.incrementAndGet();
154                             } else {
155                                 bytesSinceReadComplete = 0;
156                             }
157                             clientReadCompletes.incrementAndGet();
158                             if (bytesRead == totalServerBytesWritten) {
159                                 // Bounce this through the event loop to make sure it happens after we're done
160                                 // with the read operation.
161                                 ch.eventLoop().execute(new Runnable() {
162                                     @Override
163                                     public void run() {
164                                         clientReadAllDataLatch.countDown();
165                                     }
166                                 });
167                             } else {
168                                 ctx.read();
169                             }
170                         }
171 
172                         @Override
173                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
174                             ctx.fireExceptionCaught(cause);
175                             ctx.close();
176                         }
177                     });
178                     ch.read();
179                 }
180             });
181 
182             serverChannel = sb.bind().sync().channel();
183             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
184             clientChannel.read();
185 
186             clientReadAllDataLatch.await();
187 
188             // Now we need to trigger server half-close
189             ((DuplexChannel) serverChildChannel.get()).shutdownOutput();
190 
191             clientHalfClosedLatch.await();
192             clientHalfClosedAllBytesRead.await();
193         } finally {
194             if (clientChannel != null) {
195                 clientChannel.close().sync();
196             }
197             if (serverChannel != null) {
198                 serverChannel.close().sync();
199             }
200         }
201     }
202 
203     @Test
204     public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
205         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
206             @Override
207             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
208                 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
209             }
210         });
211     }
212 
213     private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
214             throws Throwable {
215         Channel serverChannel = null;
216         Channel clientChannel = null;
217 
218         final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
219         try {
220             sb.childOption(ChannelOption.SO_LINGER, 1)
221               .childHandler(new ChannelInitializer<Channel>() {
222 
223                   @Override
224                   protected void initChannel(Channel ch) throws Exception {
225                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
226 
227                             @Override
228                             public void channelActive(final ChannelHandlerContext ctx) {
229                                 SocketChannel channel = (SocketChannel) ctx.channel();
230                                 channel.shutdownOutput();
231                             }
232 
233                             @Override
234                             public void channelRead(ChannelHandlerContext ctx, Object msg) {
235                                 ReferenceCountUtil.release(msg);
236                                 waitHalfClosureDone.countDown();
237                             }
238                         });
239                   }
240               });
241 
242             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
243               .handler(new ChannelInitializer<Channel>() {
244                   @Override
245                   protected void initChannel(Channel ch) throws Exception {
246                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
247 
248                             @Override
249                             public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
250                                 if (ChannelInputShutdownEvent.INSTANCE == evt) {
251                                     ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
252                                 }
253 
254                                 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
255                                     ctx.close();
256                                 }
257                             }
258                         });
259                   }
260               });
261 
262             serverChannel = sb.bind().sync().channel();
263             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
264             waitHalfClosureDone.await();
265         } finally {
266             if (clientChannel != null) {
267                 clientChannel.close().sync();
268             }
269 
270             if (serverChannel != null) {
271                 serverChannel.close().sync();
272             }
273         }
274     }
275 
276     @Test
277     public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
278         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
279             @Override
280             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
281                 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
282             }
283         });
284     }
285 
286     public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
287         Channel serverChannel = null;
288         try {
289             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
290                     .option(ChannelOption.AUTO_READ, true);
291             sb.childHandler(new ChannelInitializer<Channel>() {
292                 @Override
293                 protected void initChannel(Channel ch) {
294                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
295                         @Override
296                         public void channelActive(ChannelHandlerContext ctx) {
297                             ((DuplexChannel) ctx).shutdownOutput();
298                         }
299 
300                         @Override
301                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
302                             ctx.close();
303                         }
304                     });
305                 }
306             });
307 
308             final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
309             final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
310 
311             cb.handler(new ChannelInitializer<Channel>() {
312                 @Override
313                 protected void initChannel(Channel ch) {
314                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
315 
316                         @Override
317                         public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
318                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
319                                 shutdownEventReceivedCounter.incrementAndGet();
320                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
321                                 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
322                                 ctx.executor().schedule(new Runnable() {
323                                     @Override
324                                     public void run() {
325                                         ctx.close();
326                                     }
327                                 }, 100, MILLISECONDS);
328                             }
329                         }
330 
331                         @Override
332                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
333                             ctx.close();
334                         }
335                     });
336                 }
337             });
338 
339             serverChannel = sb.bind().sync().channel();
340             Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
341             clientChannel.closeFuture().await();
342             assertEquals(1, shutdownEventReceivedCounter.get());
343             assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
344         } finally {
345             if (serverChannel != null) {
346                 serverChannel.close().sync();
347             }
348         }
349     }
350 
351     @Test
352     public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
353         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
354             @Override
355             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
356                 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
357             }
358         });
359     }
360 
361     public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
362         testAllDataReadAfterHalfClosure(true, sb, cb);
363         testAllDataReadAfterHalfClosure(false, sb, cb);
364     }
365 
366     private void testAllDataReadAfterHalfClosure(final boolean autoRead,
367                                                  ServerBootstrap sb, Bootstrap cb) throws Throwable {
368         final int totalServerBytesWritten = 1024 * 16;
369         final int numReadsPerReadLoop = 2;
370         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
371         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
372         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
373         final AtomicInteger clientReadCompletes = new AtomicInteger();
374         final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
375         Channel serverChannel = null;
376         Channel clientChannel = null;
377         try {
378             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
379               .option(ChannelOption.AUTO_READ, autoRead)
380               .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
381 
382             sb.childHandler(new ChannelInitializer<Channel>() {
383                 @Override
384                 protected void initChannel(Channel ch) throws Exception {
385                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
386                         @Override
387                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
388                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
389                             buf.writerIndex(buf.capacity());
390                             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
391                                 @Override
392                                 public void operationComplete(ChannelFuture future) throws Exception {
393                                     ((DuplexChannel) future.channel()).shutdownOutput();
394                                 }
395                             });
396                             serverInitializedLatch.countDown();
397                         }
398 
399                         @Override
400                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
401                             ctx.close();
402                         }
403                     });
404                 }
405             });
406 
407             cb.handler(new ChannelInitializer<Channel>() {
408                 @Override
409                 protected void initChannel(Channel ch) throws Exception {
410                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
411                         private int bytesRead;
412                         private int bytesSinceReadComplete;
413 
414                         @Override
415                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
416                             ByteBuf buf = (ByteBuf) msg;
417                             bytesRead += buf.readableBytes();
418                             bytesSinceReadComplete += buf.readableBytes();
419                             buf.release();
420                         }
421 
422                         @Override
423                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
424                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
425                                 clientHalfClosedLatch.countDown();
426                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
427                                 ctx.close();
428                             }
429                         }
430 
431                         @Override
432                         public void channelReadComplete(ChannelHandlerContext ctx) {
433                             if (bytesSinceReadComplete == 0) {
434                                 clientZeroDataReadCompletes.incrementAndGet();
435                             } else {
436                                 bytesSinceReadComplete = 0;
437                             }
438                             clientReadCompletes.incrementAndGet();
439                             if (bytesRead == totalServerBytesWritten) {
440                                 clientReadAllDataLatch.countDown();
441                             }
442                             if (!autoRead) {
443                                 ctx.read();
444                             }
445                         }
446 
447                         @Override
448                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
449                             ctx.close();
450                         }
451                     });
452                 }
453             });
454 
455             serverChannel = sb.bind().sync().channel();
456             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
457             clientChannel.read();
458 
459             serverInitializedLatch.await();
460             clientReadAllDataLatch.await();
461             clientHalfClosedLatch.await();
462             // In practice this should be much less, as we allow numReadsPerReadLoop per wakeup, but we limit the
463             // number of bytes to 1 per read so in theory we may need more. We check below that readComplete is called
464             // when data is actually read.
465             assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
466                     "too many read complete events: " + clientReadCompletes.get());
467             assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
468                     "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
469                             clientReadCompletes.get());
470         } finally {
471             if (clientChannel != null) {
472                 clientChannel.close().sync();
473             }
474             if (serverChannel != null) {
475                 serverChannel.close().sync();
476             }
477         }
478     }
479 
480     @Test
481     public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
482         // This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
483         assumeFalse(PlatformDependent.isWindows());
484         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
485             @Override
486             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
487                 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
488             }
489         });
490     }
491 
492     public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
493         testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
494         testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
495         testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
496         testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
497     }
498 
499     private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
500                                                              final boolean clientIsLeader,
501                                                              ServerBootstrap sb,
502                                                              Bootstrap cb) throws InterruptedException {
503         final int expectedBytes = 100;
504         final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
505         final CountDownLatch doneLatch = new CountDownLatch(2);
506         final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
507         Channel serverChannel = null;
508         Channel clientChannel = null;
509         try {
510             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
511                     .option(ChannelOption.AUTO_CLOSE, false)
512                     .option(ChannelOption.SO_LINGER, 0);
513             sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
514                     .childOption(ChannelOption.AUTO_CLOSE, false)
515                     .childOption(ChannelOption.SO_LINGER, 0);
516 
517             final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
518                     serverReadExpectedLatch, doneLatch, causeRef);
519             final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
520                     serverReadExpectedLatch, doneLatch, causeRef);
521             sb.childHandler(new ChannelInitializer<Channel>() {
522                 @Override
523                 protected void initChannel(Channel ch) throws Exception {
524                     ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
525                 }
526             });
527 
528             cb.handler(new ChannelInitializer<Channel>() {
529                 @Override
530                 protected void initChannel(Channel ch) throws Exception {
531                     ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
532                 }
533             });
534 
535             serverChannel = sb.bind().sync().channel();
536             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
537 
538             doneLatch.await();
539             assertNull(causeRef.get());
540             assertTrue(leaderHandler.seenOutputShutdown);
541         } finally {
542             if (clientChannel != null) {
543                 clientChannel.close().sync();
544             }
545             if (serverChannel != null) {
546                 serverChannel.close().sync();
547             }
548         }
549     }
550 
551     private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
552         private final int expectedBytes;
553         private final CountDownLatch followerCloseLatch;
554         private final CountDownLatch doneLatch;
555         private final AtomicReference<Throwable> causeRef;
556         private int bytesRead;
557 
558         AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
559                                AtomicReference<Throwable> causeRef) {
560             this.expectedBytes = expectedBytes;
561             this.followerCloseLatch = followerCloseLatch;
562             this.doneLatch = doneLatch;
563             this.causeRef = causeRef;
564         }
565 
566         @Override
567         public void channelInactive(ChannelHandlerContext ctx) {
568             checkPrematureClose();
569         }
570 
571         @Override
572         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
573             ctx.close();
574             checkPrematureClose();
575         }
576 
577         @Override
578         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
579             bytesRead += msg.readableBytes();
580             if (bytesRead >= expectedBytes) {
581                 // We write a reply and immediately close our end of the socket.
582                 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
583                 buf.writerIndex(buf.writerIndex() + expectedBytes);
584                 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
585                     @Override
586                     public void operationComplete(ChannelFuture future) throws Exception {
587                         future.channel().close().addListener(new ChannelFutureListener() {
588                             @Override
589                             public void operationComplete(final ChannelFuture future) throws Exception {
590                                 // This is a bit racy but there is no better way how to handle this in Java11.
591                                 // The problem is that on close() the underlying FD will not actually be closed directly
592                                 // but the close will be done after the Selector did process all events. Because of
593                                 // this we will need to give it a bit time to ensure the FD is actual closed before we
594                                 // count down the latch and try to write.
595                                 future.channel().eventLoop().schedule(new Runnable() {
596                                     @Override
597                                     public void run() {
598                                         followerCloseLatch.countDown();
599                                     }
600                                 }, 200, TimeUnit.MILLISECONDS);
601                             }
602                         });
603                     }
604                 });
605             }
606         }
607 
608         private void checkPrematureClose() {
609             if (bytesRead < expectedBytes) {
610                 causeRef.set(new IllegalStateException("follower premature close"));
611                 doneLatch.countDown();
612             }
613         }
614     }
615 
616     private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
617         private final int expectedBytes;
618         private final CountDownLatch followerCloseLatch;
619         private final CountDownLatch doneLatch;
620         private final AtomicReference<Throwable> causeRef;
621         private int bytesRead;
622         boolean seenOutputShutdown;
623 
624         AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
625                              AtomicReference<Throwable> causeRef) {
626             this.expectedBytes = expectedBytes;
627             this.followerCloseLatch = followerCloseLatch;
628             this.doneLatch = doneLatch;
629             this.causeRef = causeRef;
630         }
631 
632         @Override
633         public void channelActive(ChannelHandlerContext ctx) throws Exception {
634             ByteBuf buf = ctx.alloc().buffer(expectedBytes);
635             buf.writerIndex(buf.writerIndex() + expectedBytes);
636             ctx.writeAndFlush(buf.retainedDuplicate());
637 
638             // We wait here to ensure that we write before we have a chance to process the outbound
639             // shutdown event.
640             followerCloseLatch.await();
641 
642             // This write should fail, but we should still be allowed to read the peer's data
643             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
644                 @Override
645                 public void operationComplete(ChannelFuture future) throws Exception {
646                     if (future.cause() == null) {
647                         causeRef.set(new IllegalStateException("second write should have failed!"));
648                         doneLatch.countDown();
649                     }
650                 }
651             });
652         }
653 
654         @Override
655         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
656             bytesRead += msg.readableBytes();
657             if (bytesRead >= expectedBytes) {
658                 doneLatch.countDown();
659             }
660         }
661 
662         @Override
663         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
664             if (evt instanceof ChannelOutputShutdownEvent) {
665                 seenOutputShutdown = true;
666                 doneLatch.countDown();
667             }
668         }
669 
670         @Override
671         public void channelInactive(ChannelHandlerContext ctx) {
672             checkPrematureClose();
673         }
674 
675         @Override
676         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
677             ctx.close();
678             checkPrematureClose();
679         }
680 
681         private void checkPrematureClose() {
682             if (bytesRead < expectedBytes || !seenOutputShutdown) {
683                 causeRef.set(new IllegalStateException("leader premature close"));
684                 doneLatch.countDown();
685             }
686         }
687     }
688 
689     @Test
690     public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
691         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
692             @Override
693             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
694                 testAllDataReadClosure(serverBootstrap, bootstrap);
695             }
696         });
697     }
698 
699     public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
700         testAllDataReadClosure(true, false, sb, cb);
701         testAllDataReadClosure(true, true, sb, cb);
702         testAllDataReadClosure(false, false, sb, cb);
703         testAllDataReadClosure(false, true, sb, cb);
704     }
705 
706     private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
707                                                ServerBootstrap sb, Bootstrap cb) throws Throwable {
708         final int totalServerBytesWritten = 1024 * 16;
709         final int numReadsPerReadLoop = 2;
710         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
711         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
712         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
713         final AtomicInteger clientReadCompletes = new AtomicInteger();
714         Channel serverChannel = null;
715         Channel clientChannel = null;
716         try {
717             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
718                     .option(ChannelOption.AUTO_READ, autoRead)
719                     .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
720 
721             sb.childHandler(new ChannelInitializer<Channel>() {
722                 @Override
723                 protected void initChannel(Channel ch) throws Exception {
724                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
725                         @Override
726                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
727                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
728                             buf.writerIndex(buf.capacity());
729                             ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
730                             serverInitializedLatch.countDown();
731                         }
732 
733                         @Override
734                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
735                             ctx.close();
736                         }
737                     });
738                 }
739             });
740 
741             cb.handler(new ChannelInitializer<Channel>() {
742                 @Override
743                 protected void initChannel(Channel ch) throws Exception {
744                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
745                         private int bytesRead;
746 
747                         @Override
748                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
749                             ByteBuf buf = (ByteBuf) msg;
750                             bytesRead += buf.readableBytes();
751                             buf.release();
752                         }
753 
754                         @Override
755                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
756                             if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
757                                 clientHalfClosedLatch.countDown();
758                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
759                                 ctx.close();
760                             }
761                         }
762 
763                         @Override
764                         public void channelInactive(ChannelHandlerContext ctx) {
765                             if (!allowHalfClosed) {
766                                 clientHalfClosedLatch.countDown();
767                             }
768                         }
769 
770                         @Override
771                         public void channelReadComplete(ChannelHandlerContext ctx) {
772                             clientReadCompletes.incrementAndGet();
773                             if (bytesRead == totalServerBytesWritten) {
774                                 clientReadAllDataLatch.countDown();
775                             }
776                             if (!autoRead) {
777                                 ctx.read();
778                             }
779                         }
780 
781                         @Override
782                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
783                             ctx.close();
784                         }
785                     });
786                 }
787             });
788 
789             serverChannel = sb.bind().sync().channel();
790             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
791             clientChannel.read();
792 
793             serverInitializedLatch.await();
794             clientReadAllDataLatch.await();
795             clientHalfClosedLatch.await();
796             assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
797                 "too many read complete events: " + clientReadCompletes.get());
798         } finally {
799             if (clientChannel != null) {
800                 clientChannel.close().sync();
801             }
802             if (serverChannel != null) {
803                 serverChannel.close().sync();
804             }
805         }
806     }
807 
808     /**
809      * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
810      */
811     private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
812         private final int numReads;
813         TestNumReadsRecvByteBufAllocator(int numReads) {
814             this.numReads = numReads;
815         }
816 
817         @Override
818         public ExtendedHandle newHandle() {
819             return new ExtendedHandle() {
820                 private int attemptedBytesRead;
821                 private int lastBytesRead;
822                 private int numMessagesRead;
823                 @Override
824                 public ByteBuf allocate(ByteBufAllocator alloc) {
825                     return alloc.ioBuffer(guess(), guess());
826                 }
827 
828                 @Override
829                 public int guess() {
830                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
831                 }
832 
833                 @Override
834                 public void reset(ChannelConfig config) {
835                     numMessagesRead = 0;
836                 }
837 
838                 @Override
839                 public void incMessagesRead(int numMessages) {
840                     numMessagesRead += numMessages;
841                 }
842 
843                 @Override
844                 public void lastBytesRead(int bytes) {
845                     lastBytesRead = bytes;
846                 }
847 
848                 @Override
849                 public int lastBytesRead() {
850                     return lastBytesRead;
851                 }
852 
853                 @Override
854                 public void attemptedBytesRead(int bytes) {
855                     attemptedBytesRead = bytes;
856                 }
857 
858                 @Override
859                 public int attemptedBytesRead() {
860                     return attemptedBytesRead;
861                 }
862 
863                 @Override
864                 public boolean continueReading() {
865                     return numMessagesRead < numReads;
866                 }
867 
868                 @Override
869                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
870                     return continueReading() && maybeMoreDataSupplier.get();
871                 }
872 
873                 @Override
874                 public void readComplete() {
875                     // Nothing needs to be done or adjusted after each read cycle is completed.
876                 }
877             };
878         }
879     }
880 }