View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelConfig;
24  import io.netty.channel.ChannelFuture;
25  import io.netty.channel.ChannelFutureListener;
26  import io.netty.channel.ChannelHandlerContext;
27  import io.netty.channel.ChannelInboundHandlerAdapter;
28  import io.netty.channel.ChannelInitializer;
29  import io.netty.channel.ChannelOption;
30  import io.netty.channel.RecvByteBufAllocator;
31  import io.netty.channel.SimpleChannelInboundHandler;
32  import io.netty.channel.socket.ChannelInputShutdownEvent;
33  import io.netty.channel.socket.ChannelInputShutdownReadComplete;
34  import io.netty.channel.socket.ChannelOutputShutdownEvent;
35  import io.netty.channel.socket.DuplexChannel;
36  import io.netty.channel.socket.SocketChannel;
37  import io.netty.util.ReferenceCountUtil;
38  import io.netty.util.UncheckedBooleanSupplier;
39  import io.netty.util.internal.PlatformDependent;
40  import org.junit.jupiter.api.Test;
41  import org.junit.jupiter.api.TestInfo;
42  import org.junit.jupiter.api.Timeout;
43  
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicInteger;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import static java.util.concurrent.TimeUnit.MILLISECONDS;
50  import static org.junit.jupiter.api.Assertions.assertEquals;
51  import static org.junit.jupiter.api.Assertions.assertNull;
52  import static org.junit.jupiter.api.Assertions.assertTrue;
53  import static org.junit.jupiter.api.Assumptions.assumeFalse;
54  
55  public class SocketHalfClosedTest extends AbstractSocketTest {
56  
57      protected int maxReadCompleteWithNoDataAfterInputShutdown() {
58          return 2; // nio needs read flag to detect full closure.
59      }
60  
61      @Test
62      @Timeout(value = 5000, unit = MILLISECONDS)
63      public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
64          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
65              @Override
66              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
67                  testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
68              }
69          });
70      }
71  
72      private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
73              throws Throwable {
74          Channel serverChannel = null;
75          Channel clientChannel = null;
76  
77          final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
78          try {
79              sb.childOption(ChannelOption.SO_LINGER, 1)
80                .childHandler(new ChannelInitializer<Channel>() {
81  
82                    @Override
83                    protected void initChannel(Channel ch) throws Exception {
84                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
85  
86                              @Override
87                              public void channelActive(final ChannelHandlerContext ctx) {
88                                  SocketChannel channel = (SocketChannel) ctx.channel();
89                                  channel.shutdownOutput();
90                              }
91  
92                              @Override
93                              public void channelRead(ChannelHandlerContext ctx, Object msg) {
94                                  ReferenceCountUtil.release(msg);
95                                  waitHalfClosureDone.countDown();
96                              }
97                          });
98                    }
99                });
100 
101             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
102               .handler(new ChannelInitializer<Channel>() {
103                   @Override
104                   protected void initChannel(Channel ch) throws Exception {
105                       ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
106 
107                             @Override
108                             public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
109                                 if (ChannelInputShutdownEvent.INSTANCE == evt) {
110                                     ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
111                                 }
112 
113                                 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
114                                     ctx.close();
115                                 }
116                             }
117                         });
118                   }
119               });
120 
121             serverChannel = sb.bind().sync().channel();
122             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
123             waitHalfClosureDone.await();
124         } finally {
125             if (clientChannel != null) {
126                 clientChannel.close().sync();
127             }
128 
129             if (serverChannel != null) {
130                 serverChannel.close().sync();
131             }
132         }
133     }
134 
135     @Test
136     @Timeout(value = 10000, unit = MILLISECONDS)
137     public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
138         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
139             @Override
140             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
141                 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
142             }
143         });
144     }
145 
146     public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
147         Channel serverChannel = null;
148         try {
149             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
150                     .option(ChannelOption.AUTO_READ, true);
151             sb.childHandler(new ChannelInitializer<Channel>() {
152                 @Override
153                 protected void initChannel(Channel ch) {
154                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
155                         @Override
156                         public void channelActive(ChannelHandlerContext ctx) {
157                             ((DuplexChannel) ctx).shutdownOutput();
158                         }
159 
160                         @Override
161                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
162                             ctx.close();
163                         }
164                     });
165                 }
166             });
167 
168             final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
169             final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
170 
171             cb.handler(new ChannelInitializer<Channel>() {
172                 @Override
173                 protected void initChannel(Channel ch) {
174                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
175 
176                         @Override
177                         public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
178                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
179                                 shutdownEventReceivedCounter.incrementAndGet();
180                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
181                                 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
182                                 ctx.executor().schedule(new Runnable() {
183                                     @Override
184                                     public void run() {
185                                         ctx.close();
186                                     }
187                                 }, 100, MILLISECONDS);
188                             }
189                         }
190 
191                         @Override
192                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
193                             ctx.close();
194                         }
195                     });
196                 }
197             });
198 
199             serverChannel = sb.bind().sync().channel();
200             Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
201             clientChannel.closeFuture().await();
202             assertEquals(1, shutdownEventReceivedCounter.get());
203             assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
204         } finally {
205             if (serverChannel != null) {
206                 serverChannel.close().sync();
207             }
208         }
209     }
210 
211     @Test
212     public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
213         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
214             @Override
215             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
216                 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
217             }
218         });
219     }
220 
221     public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
222         testAllDataReadAfterHalfClosure(true, sb, cb);
223         testAllDataReadAfterHalfClosure(false, sb, cb);
224     }
225 
226     private void testAllDataReadAfterHalfClosure(final boolean autoRead,
227                                                  ServerBootstrap sb, Bootstrap cb) throws Throwable {
228         final int totalServerBytesWritten = 1024 * 16;
229         final int numReadsPerReadLoop = 2;
230         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
231         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
232         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
233         final AtomicInteger clientReadCompletes = new AtomicInteger();
234         final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
235         Channel serverChannel = null;
236         Channel clientChannel = null;
237         try {
238             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
239               .option(ChannelOption.AUTO_READ, autoRead)
240               .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
241 
242             sb.childHandler(new ChannelInitializer<Channel>() {
243                 @Override
244                 protected void initChannel(Channel ch) throws Exception {
245                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
246                         @Override
247                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
248                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
249                             buf.writerIndex(buf.capacity());
250                             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
251                                 @Override
252                                 public void operationComplete(ChannelFuture future) throws Exception {
253                                     ((DuplexChannel) future.channel()).shutdownOutput();
254                                 }
255                             });
256                             serverInitializedLatch.countDown();
257                         }
258 
259                         @Override
260                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
261                             ctx.close();
262                         }
263                     });
264                 }
265             });
266 
267             cb.handler(new ChannelInitializer<Channel>() {
268                 @Override
269                 protected void initChannel(Channel ch) throws Exception {
270                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
271                         private int bytesRead;
272                         private int bytesSinceReadComplete;
273 
274                         @Override
275                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
276                             ByteBuf buf = (ByteBuf) msg;
277                             bytesRead += buf.readableBytes();
278                             bytesSinceReadComplete += buf.readableBytes();
279                             buf.release();
280                         }
281 
282                         @Override
283                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
284                             if (evt == ChannelInputShutdownEvent.INSTANCE) {
285                                 clientHalfClosedLatch.countDown();
286                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
287                                 ctx.close();
288                             }
289                         }
290 
291                         @Override
292                         public void channelReadComplete(ChannelHandlerContext ctx) {
293                             if (bytesSinceReadComplete == 0) {
294                                 clientZeroDataReadCompletes.incrementAndGet();
295                             } else {
296                                 bytesSinceReadComplete = 0;
297                             }
298                             clientReadCompletes.incrementAndGet();
299                             if (bytesRead == totalServerBytesWritten) {
300                                 clientReadAllDataLatch.countDown();
301                             }
302                             if (!autoRead) {
303                                 ctx.read();
304                             }
305                         }
306 
307                         @Override
308                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
309                             ctx.close();
310                         }
311                     });
312                 }
313             });
314 
315             serverChannel = sb.bind().sync().channel();
316             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
317             clientChannel.read();
318 
319             serverInitializedLatch.await();
320             clientReadAllDataLatch.await();
321             clientHalfClosedLatch.await();
322             // In practice this should be much less, as we allow numReadsPerReadLoop per wakeup, but we limit the
323             // number of bytes to 1 per read so in theory we may need more. We check below that readComplete is called
324             // when data is actually read.
325             assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
326                     "too many read complete events: " + clientReadCompletes.get());
327             assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
328                     "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
329                             clientReadCompletes.get());
330         } finally {
331             if (clientChannel != null) {
332                 clientChannel.close().sync();
333             }
334             if (serverChannel != null) {
335                 serverChannel.close().sync();
336             }
337         }
338     }
339 
340     @Test
341     public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
342         // This test only works on Linux / BSD / MacOS as we assume some semantics that are not true for Windows.
343         assumeFalse(PlatformDependent.isWindows());
344         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
345             @Override
346             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
347                 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
348             }
349         });
350     }
351 
352     public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
353         testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
354         testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
355         testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
356         testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
357     }
358 
359     private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
360                                                              final boolean clientIsLeader,
361                                                              ServerBootstrap sb,
362                                                              Bootstrap cb) throws InterruptedException {
363         final int expectedBytes = 100;
364         final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
365         final CountDownLatch doneLatch = new CountDownLatch(2);
366         final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
367         Channel serverChannel = null;
368         Channel clientChannel = null;
369         try {
370             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
371                     .option(ChannelOption.AUTO_CLOSE, false)
372                     .option(ChannelOption.SO_LINGER, 0);
373             sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
374                     .childOption(ChannelOption.AUTO_CLOSE, false)
375                     .childOption(ChannelOption.SO_LINGER, 0);
376 
377             final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
378                     serverReadExpectedLatch, doneLatch, causeRef);
379             final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
380                     serverReadExpectedLatch, doneLatch, causeRef);
381             sb.childHandler(new ChannelInitializer<Channel>() {
382                 @Override
383                 protected void initChannel(Channel ch) throws Exception {
384                     ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
385                 }
386             });
387 
388             cb.handler(new ChannelInitializer<Channel>() {
389                 @Override
390                 protected void initChannel(Channel ch) throws Exception {
391                     ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
392                 }
393             });
394 
395             serverChannel = sb.bind().sync().channel();
396             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
397 
398             doneLatch.await();
399             assertNull(causeRef.get());
400             assertTrue(leaderHandler.seenOutputShutdown);
401         } finally {
402             if (clientChannel != null) {
403                 clientChannel.close().sync();
404             }
405             if (serverChannel != null) {
406                 serverChannel.close().sync();
407             }
408         }
409     }
410 
411     private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
412         private final int expectedBytes;
413         private final CountDownLatch followerCloseLatch;
414         private final CountDownLatch doneLatch;
415         private final AtomicReference<Throwable> causeRef;
416         private int bytesRead;
417 
418         AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
419                                AtomicReference<Throwable> causeRef) {
420             this.expectedBytes = expectedBytes;
421             this.followerCloseLatch = followerCloseLatch;
422             this.doneLatch = doneLatch;
423             this.causeRef = causeRef;
424         }
425 
426         @Override
427         public void channelInactive(ChannelHandlerContext ctx) {
428             checkPrematureClose();
429         }
430 
431         @Override
432         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
433             ctx.close();
434             checkPrematureClose();
435         }
436 
437         @Override
438         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
439             bytesRead += msg.readableBytes();
440             if (bytesRead >= expectedBytes) {
441                 // We write a reply and immediately close our end of the socket.
442                 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
443                 buf.writerIndex(buf.writerIndex() + expectedBytes);
444                 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
445                     @Override
446                     public void operationComplete(ChannelFuture future) throws Exception {
447                         future.channel().close().addListener(new ChannelFutureListener() {
448                             @Override
449                             public void operationComplete(final ChannelFuture future) throws Exception {
450                                 // This is a bit racy but there is no better way how to handle this in Java11.
451                                 // The problem is that on close() the underlying FD will not actually be closed directly
452                                 // but the close will be done after the Selector did process all events. Because of
453                                 // this we will need to give it a bit time to ensure the FD is actual closed before we
454                                 // count down the latch and try to write.
455                                 future.channel().eventLoop().schedule(new Runnable() {
456                                     @Override
457                                     public void run() {
458                                         followerCloseLatch.countDown();
459                                     }
460                                 }, 200, TimeUnit.MILLISECONDS);
461                             }
462                         });
463                     }
464                 });
465             }
466         }
467 
468         private void checkPrematureClose() {
469             if (bytesRead < expectedBytes) {
470                 causeRef.set(new IllegalStateException("follower premature close"));
471                 doneLatch.countDown();
472             }
473         }
474     }
475 
476     private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
477         private final int expectedBytes;
478         private final CountDownLatch followerCloseLatch;
479         private final CountDownLatch doneLatch;
480         private final AtomicReference<Throwable> causeRef;
481         private int bytesRead;
482         boolean seenOutputShutdown;
483 
484         AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
485                              AtomicReference<Throwable> causeRef) {
486             this.expectedBytes = expectedBytes;
487             this.followerCloseLatch = followerCloseLatch;
488             this.doneLatch = doneLatch;
489             this.causeRef = causeRef;
490         }
491 
492         @Override
493         public void channelActive(ChannelHandlerContext ctx) throws Exception {
494             ByteBuf buf = ctx.alloc().buffer(expectedBytes);
495             buf.writerIndex(buf.writerIndex() + expectedBytes);
496             ctx.writeAndFlush(buf.retainedDuplicate());
497 
498             // We wait here to ensure that we write before we have a chance to process the outbound
499             // shutdown event.
500             followerCloseLatch.await();
501 
502             // This write should fail, but we should still be allowed to read the peer's data
503             ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
504                 @Override
505                 public void operationComplete(ChannelFuture future) throws Exception {
506                     if (future.cause() == null) {
507                         causeRef.set(new IllegalStateException("second write should have failed!"));
508                         doneLatch.countDown();
509                     }
510                 }
511             });
512         }
513 
514         @Override
515         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
516             bytesRead += msg.readableBytes();
517             if (bytesRead >= expectedBytes) {
518                 doneLatch.countDown();
519             }
520         }
521 
522         @Override
523         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
524             if (evt instanceof ChannelOutputShutdownEvent) {
525                 seenOutputShutdown = true;
526                 doneLatch.countDown();
527             }
528         }
529 
530         @Override
531         public void channelInactive(ChannelHandlerContext ctx) {
532             checkPrematureClose();
533         }
534 
535         @Override
536         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
537             ctx.close();
538             checkPrematureClose();
539         }
540 
541         private void checkPrematureClose() {
542             if (bytesRead < expectedBytes || !seenOutputShutdown) {
543                 causeRef.set(new IllegalStateException("leader premature close"));
544                 doneLatch.countDown();
545             }
546         }
547     }
548 
549     @Test
550     public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
551         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
552             @Override
553             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
554                 testAllDataReadClosure(serverBootstrap, bootstrap);
555             }
556         });
557     }
558 
559     public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
560         testAllDataReadClosure(true, false, sb, cb);
561         testAllDataReadClosure(true, true, sb, cb);
562         testAllDataReadClosure(false, false, sb, cb);
563         testAllDataReadClosure(false, true, sb, cb);
564     }
565 
566     private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
567                                                ServerBootstrap sb, Bootstrap cb) throws Throwable {
568         final int totalServerBytesWritten = 1024 * 16;
569         final int numReadsPerReadLoop = 2;
570         final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
571         final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
572         final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
573         final AtomicInteger clientReadCompletes = new AtomicInteger();
574         Channel serverChannel = null;
575         Channel clientChannel = null;
576         try {
577             cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
578                     .option(ChannelOption.AUTO_READ, autoRead)
579                     .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
580 
581             sb.childHandler(new ChannelInitializer<Channel>() {
582                 @Override
583                 protected void initChannel(Channel ch) throws Exception {
584                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
585                         @Override
586                         public void channelActive(ChannelHandlerContext ctx) throws Exception {
587                             ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
588                             buf.writerIndex(buf.capacity());
589                             ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
590                             serverInitializedLatch.countDown();
591                         }
592 
593                         @Override
594                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
595                             ctx.close();
596                         }
597                     });
598                 }
599             });
600 
601             cb.handler(new ChannelInitializer<Channel>() {
602                 @Override
603                 protected void initChannel(Channel ch) throws Exception {
604                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
605                         private int bytesRead;
606 
607                         @Override
608                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
609                             ByteBuf buf = (ByteBuf) msg;
610                             bytesRead += buf.readableBytes();
611                             buf.release();
612                         }
613 
614                         @Override
615                         public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
616                             if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
617                                 clientHalfClosedLatch.countDown();
618                             } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
619                                 ctx.close();
620                             }
621                         }
622 
623                         @Override
624                         public void channelInactive(ChannelHandlerContext ctx) {
625                             if (!allowHalfClosed) {
626                                 clientHalfClosedLatch.countDown();
627                             }
628                         }
629 
630                         @Override
631                         public void channelReadComplete(ChannelHandlerContext ctx) {
632                             clientReadCompletes.incrementAndGet();
633                             if (bytesRead == totalServerBytesWritten) {
634                                 clientReadAllDataLatch.countDown();
635                             }
636                             if (!autoRead) {
637                                 ctx.read();
638                             }
639                         }
640 
641                         @Override
642                         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
643                             ctx.close();
644                         }
645                     });
646                 }
647             });
648 
649             serverChannel = sb.bind().sync().channel();
650             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
651             clientChannel.read();
652 
653             serverInitializedLatch.await();
654             clientReadAllDataLatch.await();
655             clientHalfClosedLatch.await();
656             assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
657                 "too many read complete events: " + clientReadCompletes.get());
658         } finally {
659             if (clientChannel != null) {
660                 clientChannel.close().sync();
661             }
662             if (serverChannel != null) {
663                 serverChannel.close().sync();
664             }
665         }
666     }
667 
668     /**
669      * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
670      */
671     private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
672         private final int numReads;
673         TestNumReadsRecvByteBufAllocator(int numReads) {
674             this.numReads = numReads;
675         }
676 
677         @Override
678         public ExtendedHandle newHandle() {
679             return new ExtendedHandle() {
680                 private int attemptedBytesRead;
681                 private int lastBytesRead;
682                 private int numMessagesRead;
683                 @Override
684                 public ByteBuf allocate(ByteBufAllocator alloc) {
685                     return alloc.ioBuffer(guess(), guess());
686                 }
687 
688                 @Override
689                 public int guess() {
690                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
691                 }
692 
693                 @Override
694                 public void reset(ChannelConfig config) {
695                     numMessagesRead = 0;
696                 }
697 
698                 @Override
699                 public void incMessagesRead(int numMessages) {
700                     numMessagesRead += numMessages;
701                 }
702 
703                 @Override
704                 public void lastBytesRead(int bytes) {
705                     lastBytesRead = bytes;
706                 }
707 
708                 @Override
709                 public int lastBytesRead() {
710                     return lastBytesRead;
711                 }
712 
713                 @Override
714                 public void attemptedBytesRead(int bytes) {
715                     attemptedBytesRead = bytes;
716                 }
717 
718                 @Override
719                 public int attemptedBytesRead() {
720                     return attemptedBytesRead;
721                 }
722 
723                 @Override
724                 public boolean continueReading() {
725                     return numMessagesRead < numReads;
726                 }
727 
728                 @Override
729                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
730                     return continueReading() && maybeMoreDataSupplier.get();
731                 }
732 
733                 @Override
734                 public void readComplete() {
735                     // Nothing needs to be done or adjusted after each read cycle is completed.
736                 }
737             };
738         }
739     }
740 }