View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.Channel;
24  import io.netty.channel.ChannelConfig;
25  import io.netty.channel.ChannelFuture;
26  import io.netty.channel.ChannelFutureListener;
27  import io.netty.channel.ChannelHandlerContext;
28  import io.netty.channel.ChannelInboundHandlerAdapter;
29  import io.netty.channel.ChannelInitializer;
30  import io.netty.channel.ChannelOption;
31  import io.netty.channel.RecvByteBufAllocator;
32  import io.netty.channel.SimpleChannelInboundHandler;
33  import io.netty.channel.socket.ChannelInputShutdownEvent;
34  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
35  import io.netty.channel.socket.ChannelOutputShutdownEvent;
36  import io.netty.channel.socket.DuplexChannel;
37  import io.netty.channel.socket.SocketChannel;
38  import io.netty.util.ReferenceCountUtil;
39  import io.netty.util.UncheckedBooleanSupplier;
40  import io.netty.util.internal.PlatformDependent;
41  import org.junit.jupiter.api.Test;
42  import org.junit.jupiter.api.TestInfo;
43  import org.junit.jupiter.api.Timeout;
44  
45  import java.util.concurrent.CountDownLatch;
46  import java.util.concurrent.TimeUnit;
47  import java.util.concurrent.atomic.AtomicInteger;
48  import java.util.concurrent.atomic.AtomicReference;
49  
50  import static java.util.concurrent.TimeUnit.MILLISECONDS;
51  import static org.junit.jupiter.api.Assertions.assertEquals;
52  import static org.junit.jupiter.api.Assertions.assertNull;
53  import static org.junit.jupiter.api.Assertions.assertTrue;
54  import static org.junit.jupiter.api.Assumptions.assumeFalse;
55  
56  public class SocketHalfClosedTest extends AbstractSocketTest {
57  
58      @Test
59      @Timeout(value = 5000, unit = MILLISECONDS)
60      public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
61          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
62              @Override
63              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
64                  testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
65              }
66          });
67      }
68  
69      private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
70              throws Throwable {
71          Channel serverChannel = null;
72          Channel clientChannel = null;
73  
74          final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
75          try {
76              sb.childOption(ChannelOption.SO_LINGER, 1)
77                .childHandler(new ChannelInitializer<Channel>() {
78  
79                    @Override
80                    protected void initChannel(Channel ch) throws Exception {
81                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
82  
83                              @Override
84                              public void channelActive(final ChannelHandlerContext ctx) {
85                                  SocketChannel channel = (SocketChannel) ctx.channel();
86                                  channel.shutdownOutput();
87                              }
88  
89                              @Override
90                              public void channelRead(ChannelHandlerContext ctx, Object msg) {
91                                  ReferenceCountUtil.release(msg);
92                                  waitHalfClosureDone.countDown();
93                              }
94                          });
95                    }
96                });
97  
98              cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
99                .handler(new ChannelInitializer<Channel>() {
100                   @Override
101                   protected void initChannel(Channel ch) throws Exception {
102                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
103 
104                             @Override
105                             public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
106                                 if (ChannelInputShutdownEvent.INSTANCE == evt) {
107                                     ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
108                                 }
109 
110                                 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
111                                     ctx.close();
112                                 }
113                             }
114                         });
115                   }
116               });
117 
118             serverChannel = sb.bind().sync().channel();
119             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
120             waitHalfClosureDone.await();
121         } finally {
122             if (clientChannel != null) {
123                 clientChannel.close().sync();
124             }
125 
126             if (serverChannel != null) {
127                 serverChannel.close().sync();
128             }
129         }
130     }
131 
132     @Test
133     @Timeout(value = 10000, unit = MILLISECONDS)
134     public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
135         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
136             @Override
137             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
138                 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
139             }
140         });
141     }
142 
143     public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
144         Channel serverChannel = null;
145         try {
146             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
147                     .option(ChannelOption.AUTO_READ, true);
148             sb.childHandler(new ChannelInitializer<Channel>() {
149                 @Override
150                 protected void initChannel(Channel ch) {
151                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
152                         @Override
153                         public void channelActive(ChannelHandlerContext ctx) {
154                             ((DuplexChannel) ctx).shutdownOutput();
155                         }
156 
157                         @Override
158                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
159                             ctx.close();
160                         }
161                     });
162                 }
163             });
164 
165             final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
166             final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
167 
168             cb.handler(new ChannelInitializer<Channel>() {
169                 @Override
170                 protected void initChannel(Channel ch) {
171                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
172 
173                         @Override
174                         public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
175                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
176                                 shutdownEventReceivedCounter.incrementAndGet();
177                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
178                                 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
179                                 ctx.executor().schedule(new Runnable() {
180                                     @Override
181                                     public void run() {
182                                         ctx.close();
183                                     }
184                                 }, 100, MILLISECONDS);
185                             }
186                         }
187 
188                         @Override
189                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
190                             ctx.close();
191                         }
192                     });
193                 }
194             });
195 
196             serverChannel = sb.bind().sync().channel();
197             Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
198             clientChannel.closeFuture().await();
199             assertEquals(1, shutdownEventReceivedCounter.get());
200             assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
201         } finally {
202             if (serverChannel != null) {
203                 serverChannel.close().sync();
204             }
205         }
206     }
207 
208     @Test
209     public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
210         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
211             @Override
212             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
213                 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
214             }
215         });
216     }
217 
218     public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
219         testAllDataReadAfterHalfClosure(true, sb, cb);
220         testAllDataReadAfterHalfClosure(false, sb, cb);
221     }
222 
223     private static void testAllDataReadAfterHalfClosure(final boolean autoRead,
224                                                         ServerBootstrap sb, Bootstrap cb) throws Throwable {
225         final int totalServerBytesWritten = 1024 * 16;
226         final int numReadsPerReadLoop = 2;
227         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
228         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
229         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
230         final AtomicInteger clientReadCompletes = new AtomicInteger();
231         Channel serverChannel = null;
232         Channel clientChannel = null;
233         try {
234             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
235               .option(ChannelOption.AUTO_READ, autoRead)
236               .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
237 
238             sb.childHandler(new ChannelInitializer<Channel>() {
239                 @Override
240                 protected void initChannel(Channel ch) throws Exception {
241                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
242                         @Override
243                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
244                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
245                             buf.writerIndex(buf.capacity());
246                             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
247                                 @Override
248                                 public void operationComplete(ChannelFuture future) throws Exception {
249                                     ((DuplexChannel) future.channel()).shutdownOutput();
250                                 }
251                             });
252                             serverInitializedLatch.countDown();
253                         }
254 
255                         @Override
256                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
257                             ctx.close();
258                         }
259                     });
260                 }
261             });
262 
263             cb.handler(new ChannelInitializer<Channel>() {
264                 @Override
265                 protected void initChannel(Channel ch) throws Exception {
266                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
267                         private int bytesRead;
268 
269                         @Override
270                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
271                             ByteBuf buf = (ByteBuf) msg;
272                             bytesRead += buf.readableBytes();
273                             buf.release();
274                         }
275 
276                         @Override
277                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
278                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
279                                 clientHalfClosedLatch.countDown();
280                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
281                                 ctx.close();
282                             }
283                         }
284 
285                         @Override
286                         public void channelReadComplete(ChannelHandlerContext ctx) {
287                             clientReadCompletes.incrementAndGet();
288                             if (bytesRead == totalServerBytesWritten) {
289                                 clientReadAllDataLatch.countDown();
290                             }
291                             if (!autoRead) {
292                                 ctx.read();
293                             }
294                         }
295 
296                         @Override
297                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
298                             ctx.close();
299                         }
300                     });
301                 }
302             });
303 
304             serverChannel = sb.bind().sync().channel();
305             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
306             clientChannel.read();
307 
308             serverInitializedLatch.await();
309             clientReadAllDataLatch.await();
310             clientHalfClosedLatch.await();
311             assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
312                 "too many read complete events: " + clientReadCompletes.get());
313         } finally {
314             if (clientChannel != null) {
315                 clientChannel.close().sync();
316             }
317             if (serverChannel != null) {
318                 serverChannel.close().sync();
319             }
320         }
321     }
322 
323     @Test
324     public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
325         // This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
326         assumeFalse(PlatformDependent.isWindows());
327         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
328             @Override
329             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
330                 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
331             }
332         });
333     }
334 
335     public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
336         testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
337         testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
338         testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
339         testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
340     }
341 
342     private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
343                                                              final boolean clientIsLeader,
344                                                              ServerBootstrap sb,
345                                                              Bootstrap cb) throws InterruptedException {
346         final int expectedBytes = 100;
347         final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
348         final CountDownLatch doneLatch = new CountDownLatch(1);
349         final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
350         Channel serverChannel = null;
351         Channel clientChannel = null;
352         try {
353             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
354                     .option(ChannelOption.AUTO_CLOSE, false)
355                     .option(ChannelOption.SO_LINGER, 0);
356             sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
357                     .childOption(ChannelOption.AUTO_CLOSE, false)
358                     .childOption(ChannelOption.SO_LINGER, 0);
359 
360             final SimpleChannelInboundHandler<ByteBuf> leaderHandler = new AutoCloseFalseLeader(expectedBytes,
361                     serverReadExpectedLatch, doneLatch, causeRef);
362             final SimpleChannelInboundHandler<ByteBuf> followerHandler = new AutoCloseFalseFollower(expectedBytes,
363                     serverReadExpectedLatch, doneLatch, causeRef);
364             sb.childHandler(new ChannelInitializer<Channel>() {
365                 @Override
366                 protected void initChannel(Channel ch) throws Exception {
367                     ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
368                 }
369             });
370 
371             cb.handler(new ChannelInitializer<Channel>() {
372                 @Override
373                 protected void initChannel(Channel ch) throws Exception {
374                     ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
375                 }
376             });
377 
378             serverChannel = sb.bind().sync().channel();
379             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
380 
381             doneLatch.await();
382             assertNull(causeRef.get());
383         } finally {
384             if (clientChannel != null) {
385                 clientChannel.close().sync();
386             }
387             if (serverChannel != null) {
388                 serverChannel.close().sync();
389             }
390         }
391     }
392 
393     private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
394         private final int expectedBytes;
395         private final CountDownLatch followerCloseLatch;
396         private final CountDownLatch doneLatch;
397         private final AtomicReference<Throwable> causeRef;
398         private int bytesRead;
399 
400         AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
401                                AtomicReference<Throwable> causeRef) {
402             this.expectedBytes = expectedBytes;
403             this.followerCloseLatch = followerCloseLatch;
404             this.doneLatch = doneLatch;
405             this.causeRef = causeRef;
406         }
407 
408         @Override
409         public void channelInactive(ChannelHandlerContext ctx) {
410             checkPrematureClose();
411         }
412 
413         @Override
414         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
415             ctx.close();
416             checkPrematureClose();
417         }
418 
419         @Override
420         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
421             bytesRead += msg.readableBytes();
422             if (bytesRead >= expectedBytes) {
423                 // We write a reply and immediately close our end of the socket.
424                 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
425                 buf.writerIndex(buf.writerIndex() + expectedBytes);
426                 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
427                     @Override
428                     public void operationComplete(ChannelFuture future) throws Exception {
429                         future.channel().close().addListener(new ChannelFutureListener() {
430                             @Override
431                             public void operationComplete(final ChannelFuture future) throws Exception {
432                                 // This is a bit racy but there is no better way how to handle this in Java11.
433                                 // The problem is that on close() the underlying FD will not actually be closed directly
434                                 // but the close will be done after the Selector did process all events. Because of
435                                 // this we will need to give it a bit time to ensure the FD is actual closed before we
436                                 // count down the latch and try to write.
437                                 future.channel().eventLoop().schedule(new Runnable() {
438                                     @Override
439                                     public void run() {
440                                         followerCloseLatch.countDown();
441                                     }
442                                 }, 200, TimeUnit.MILLISECONDS);
443                             }
444                         });
445                     }
446                 });
447             }
448         }
449 
450         private void checkPrematureClose() {
451             if (bytesRead < expectedBytes) {
452                 causeRef.set(new IllegalStateException("follower premature close"));
453                 doneLatch.countDown();
454             }
455         }
456     }
457 
458     private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
459         private final int expectedBytes;
460         private final CountDownLatch followerCloseLatch;
461         private final CountDownLatch doneLatch;
462         private final AtomicReference<Throwable> causeRef;
463         private int bytesRead;
464         private boolean seenOutputShutdown;
465 
466         AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
467                              AtomicReference<Throwable> causeRef) {
468             this.expectedBytes = expectedBytes;
469             this.followerCloseLatch = followerCloseLatch;
470             this.doneLatch = doneLatch;
471             this.causeRef = causeRef;
472         }
473 
474         @Override
475         public void channelActive(ChannelHandlerContext ctx) throws Exception {
476             ByteBuf buf = ctx.alloc().buffer(expectedBytes);
477             buf.writerIndex(buf.writerIndex() + expectedBytes);
478             ctx.writeAndFlush(buf.retainedDuplicate());
479 
480             // We wait here to ensure that we write before we have a chance to process the outbound
481             // shutdown event.
482             followerCloseLatch.await();
483 
484             // This write should fail, but we should still be allowed to read the peer's data
485             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
486                 @Override
487                 public void operationComplete(ChannelFuture future) throws Exception {
488                     if (future.cause() == null) {
489                         causeRef.set(new IllegalStateException("second write should have failed!"));
490                         doneLatch.countDown();
491                     }
492                 }
493             });
494         }
495 
496         @Override
497         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
498             bytesRead += msg.readableBytes();
499             if (bytesRead >= expectedBytes) {
500                 if (!seenOutputShutdown) {
501                     causeRef.set(new IllegalStateException(
502                             ChannelOutputShutdownEvent.class.getSimpleName() + " event was not seen"));
503                 }
504                 doneLatch.countDown();
505             }
506         }
507 
508         @Override
509         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
510             if (evt instanceof ChannelOutputShutdownEvent) {
511                 seenOutputShutdown = true;
512             }
513         }
514 
515         @Override
516         public void channelInactive(ChannelHandlerContext ctx) {
517             checkPrematureClose();
518         }
519 
520         @Override
521         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
522             ctx.close();
523             checkPrematureClose();
524         }
525 
526         private void checkPrematureClose() {
527             if (bytesRead < expectedBytes || !seenOutputShutdown) {
528                 causeRef.set(new IllegalStateException("leader premature close"));
529                 doneLatch.countDown();
530             }
531         }
532     }
533 
534     @Test
535     public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
536         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
537             @Override
538             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
539                 testAllDataReadClosure(serverBootstrap, bootstrap);
540             }
541         });
542     }
543 
544     public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
545         testAllDataReadClosure(true, false, sb, cb);
546         testAllDataReadClosure(true, true, sb, cb);
547         testAllDataReadClosure(false, false, sb, cb);
548         testAllDataReadClosure(false, true, sb, cb);
549     }
550 
551     private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
552                                                ServerBootstrap sb, Bootstrap cb) throws Throwable {
553         final int totalServerBytesWritten = 1024 * 16;
554         final int numReadsPerReadLoop = 2;
555         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
556         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
557         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
558         final AtomicInteger clientReadCompletes = new AtomicInteger();
559         Channel serverChannel = null;
560         Channel clientChannel = null;
561         try {
562             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
563                     .option(ChannelOption.AUTO_READ, autoRead)
564                     .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
565 
566             sb.childHandler(new ChannelInitializer<Channel>() {
567                 @Override
568                 protected void initChannel(Channel ch) throws Exception {
569                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
570                         @Override
571                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
572                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
573                             buf.writerIndex(buf.capacity());
574                             ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
575                             serverInitializedLatch.countDown();
576                         }
577 
578                         @Override
579                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
580                             ctx.close();
581                         }
582                     });
583                 }
584             });
585 
586             cb.handler(new ChannelInitializer<Channel>() {
587                 @Override
588                 protected void initChannel(Channel ch) throws Exception {
589                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
590                         private int bytesRead;
591 
592                         @Override
593                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
594                             ByteBuf buf = (ByteBuf) msg;
595                             bytesRead += buf.readableBytes();
596                             buf.release();
597                         }
598 
599                         @Override
600                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
601                             if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
602                                 clientHalfClosedLatch.countDown();
603                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
604                                 ctx.close();
605                             }
606                         }
607 
608                         @Override
609                         public void channelInactive(ChannelHandlerContext ctx) {
610                             if (!allowHalfClosed) {
611                                 clientHalfClosedLatch.countDown();
612                             }
613                         }
614 
615                         @Override
616                         public void channelReadComplete(ChannelHandlerContext ctx) {
617                             clientReadCompletes.incrementAndGet();
618                             if (bytesRead == totalServerBytesWritten) {
619                                 clientReadAllDataLatch.countDown();
620                             }
621                             if (!autoRead) {
622                                 ctx.read();
623                             }
624                         }
625 
626                         @Override
627                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
628                             ctx.close();
629                         }
630                     });
631                 }
632             });
633 
634             serverChannel = sb.bind().sync().channel();
635             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
636             clientChannel.read();
637 
638             serverInitializedLatch.await();
639             clientReadAllDataLatch.await();
640             clientHalfClosedLatch.await();
641             assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
642                 "too many read complete events: " + clientReadCompletes.get());
643         } finally {
644             if (clientChannel != null) {
645                 clientChannel.close().sync();
646             }
647             if (serverChannel != null) {
648                 serverChannel.close().sync();
649             }
650         }
651     }
652 
653     /**
654      * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
655      */
656     private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
657         private final int numReads;
658         TestNumReadsRecvByteBufAllocator(int numReads) {
659             this.numReads = numReads;
660         }
661 
662         @Override
663         public ExtendedHandle newHandle() {
664             return new ExtendedHandle() {
665                 private int attemptedBytesRead;
666                 private int lastBytesRead;
667                 private int numMessagesRead;
668                 @Override
669                 public ByteBuf allocate(ByteBufAllocator alloc) {
670                     return alloc.ioBuffer(guess(), guess());
671                 }
672 
673                 @Override
674                 public int guess() {
675                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
676                 }
677 
678                 @Override
679                 public void reset(ChannelConfig config) {
680                     numMessagesRead = 0;
681                 }
682 
683                 @Override
684                 public void incMessagesRead(int numMessages) {
685                     numMessagesRead += numMessages;
686                 }
687 
688                 @Override
689                 public void lastBytesRead(int bytes) {
690                     lastBytesRead = bytes;
691                 }
692 
693                 @Override
694                 public int lastBytesRead() {
695                     return lastBytesRead;
696                 }
697 
698                 @Override
699                 public void attemptedBytesRead(int bytes) {
700                     attemptedBytesRead = bytes;
701                 }
702 
703                 @Override
704                 public int attemptedBytesRead() {
705                     return attemptedBytesRead;
706                 }
707 
708                 @Override
709                 public boolean continueReading() {
710                     return numMessagesRead < numReads;
711                 }
712 
713                 @Override
714                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
715                     return continueReading() && maybeMoreDataSupplier.get();
716                 }
717 
718                 @Override
719                 public void readComplete() {
720                     // Nothing needs to be done or adjusted after each read cycle is completed.
721                 }
722             };
723         }
724     }
725 }