1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelConfig;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelInboundHandlerAdapter;
28 import io.netty.channel.ChannelInitializer;
29 import io.netty.channel.ChannelOption;
30 import io.netty.channel.RecvByteBufAllocator;
31 import io.netty.channel.SimpleChannelInboundHandler;
32 import io.netty.channel.nio.NioEventLoopGroup;
33 import io.netty.channel.oio.OioEventLoopGroup;
34 import io.netty.channel.socket.ChannelInputShutdownEvent;
35 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
36 import io.netty.channel.socket.ChannelOutputShutdownEvent;
37 import io.netty.channel.socket.DuplexChannel;
38 import io.netty.channel.socket.SocketChannel;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.UncheckedBooleanSupplier;
41 import io.netty.util.internal.PlatformDependent;
42 import org.junit.jupiter.api.Test;
43 import org.junit.jupiter.api.TestInfo;
44 import org.junit.jupiter.api.Timeout;
45
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.TimeUnit;
48 import java.util.concurrent.atomic.AtomicInteger;
49 import java.util.concurrent.atomic.AtomicReference;
50
51 import static java.util.concurrent.TimeUnit.MILLISECONDS;
52 import static org.junit.jupiter.api.Assertions.assertEquals;
53 import static org.junit.jupiter.api.Assertions.assertNull;
54 import static org.junit.jupiter.api.Assertions.assertTrue;
55 import static org.junit.jupiter.api.Assumptions.assumeFalse;
56
57 @Timeout(value = 20000, unit = MILLISECONDS)
58 public class SocketHalfClosedTest extends AbstractSocketTest {
59
60 protected int maxReadCompleteWithNoDataAfterInputShutdown() {
61 return 2;
62 }
63
64 @Test
65 public void testAllDataReadEventTriggeredAfterHalfClosure(TestInfo testInfo) throws Throwable {
66 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
67 @Override
68 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
69 if (bootstrap.config().group() instanceof OioEventLoopGroup) {
70 logger.debug("Ignoring test for incompatible OIO event system");
71 return;
72 } else if (bootstrap.config().group() instanceof NioEventLoopGroup) {
73 logger.debug("Ignoring test for incompatible NioHandler");
74 return;
75 }
76 allDataReadEventTriggeredAfterHalfClosure(serverBootstrap, bootstrap);
77 }
78 });
79 }
80
81 private void allDataReadEventTriggeredAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
82 final int totalServerBytesWritten = 1;
83 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
84 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
85 final CountDownLatch clientHalfClosedAllBytesRead = new CountDownLatch(1);
86 final AtomicInteger clientReadCompletes = new AtomicInteger();
87 final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
88 Channel serverChannel = null;
89 Channel clientChannel = null;
90 final AtomicReference<Channel> serverChildChannel = new AtomicReference<Channel>();
91 try {
92 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
93 .option(ChannelOption.AUTO_CLOSE, false)
94 .option(ChannelOption.AUTO_READ, false);
95
96 sb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
97 .option(ChannelOption.AUTO_CLOSE, false)
98 .childOption(ChannelOption.TCP_NODELAY, true);
99
100 sb.childHandler(new ChannelInitializer<Channel>() {
101 @Override
102 protected void initChannel(Channel ch) throws Exception {
103 serverChildChannel.set(ch);
104 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
105 @Override
106 public void channelActive(ChannelHandlerContext ctx) throws Exception {
107 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
108 buf.writerIndex(buf.capacity());
109 ctx.writeAndFlush(buf);
110 }
111
112 @Override
113 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
114 ctx.close();
115 }
116 });
117 }
118 });
119
120
121 cb.handler(new ChannelInitializer<Channel>() {
122 @Override
123 protected void initChannel(final Channel ch) {
124 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
125 private int bytesRead;
126 private int bytesSinceReadComplete;
127
128 @Override
129 public void channelRead(ChannelHandlerContext ctx, Object msg) {
130 ByteBuf buf = (ByteBuf) msg;
131 bytesRead += buf.readableBytes();
132 bytesSinceReadComplete += buf.readableBytes();
133 buf.release();
134 }
135
136 @Override
137 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
138 if (evt == ChannelInputShutdownEvent.INSTANCE) {
139 clientHalfClosedLatch.countDown();
140 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
141 clientHalfClosedAllBytesRead.countDown();
142 ctx.close();
143 }
144 }
145
146 @Override
147 public void channelReadComplete(ChannelHandlerContext ctx) {
148 if (bytesSinceReadComplete == 0) {
149 clientZeroDataReadCompletes.incrementAndGet();
150 } else {
151 bytesSinceReadComplete = 0;
152 }
153 clientReadCompletes.incrementAndGet();
154 if (bytesRead == totalServerBytesWritten) {
155
156
157 ch.eventLoop().execute(new Runnable() {
158 @Override
159 public void run() {
160 clientReadAllDataLatch.countDown();
161 }
162 });
163 } else {
164 ctx.read();
165 }
166 }
167
168 @Override
169 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
170 ctx.fireExceptionCaught(cause);
171 ctx.close();
172 }
173 });
174 ch.read();
175 }
176 });
177
178 serverChannel = sb.bind().sync().channel();
179 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
180 clientChannel.read();
181
182 clientReadAllDataLatch.await();
183
184
185 ((DuplexChannel) serverChildChannel.get()).shutdownOutput();
186
187 clientHalfClosedLatch.await();
188 clientHalfClosedAllBytesRead.await();
189 } finally {
190 if (clientChannel != null) {
191 clientChannel.close().sync();
192 }
193 if (serverChannel != null) {
194 serverChannel.close().sync();
195 }
196 }
197 }
198
199 @Test
200 public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
201 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
202 @Override
203 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
204 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
205 }
206 });
207 }
208
209 private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
210 throws Throwable {
211 Channel serverChannel = null;
212 Channel clientChannel = null;
213
214 final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
215 try {
216 sb.childOption(ChannelOption.SO_LINGER, 1)
217 .childHandler(new ChannelInitializer<Channel>() {
218
219 @Override
220 protected void initChannel(Channel ch) throws Exception {
221 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
222
223 @Override
224 public void channelActive(final ChannelHandlerContext ctx) {
225 SocketChannel channel = (SocketChannel) ctx.channel();
226 channel.shutdownOutput();
227 }
228
229 @Override
230 public void channelRead(ChannelHandlerContext ctx, Object msg) {
231 ReferenceCountUtil.release(msg);
232 waitHalfClosureDone.countDown();
233 }
234 });
235 }
236 });
237
238 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
239 .handler(new ChannelInitializer<Channel>() {
240 @Override
241 protected void initChannel(Channel ch) throws Exception {
242 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
243
244 @Override
245 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
246 if (ChannelInputShutdownEvent.INSTANCE == evt) {
247 ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
248 }
249
250 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
251 ctx.close();
252 }
253 }
254 });
255 }
256 });
257
258 serverChannel = sb.bind().sync().channel();
259 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
260 waitHalfClosureDone.await();
261 } finally {
262 if (clientChannel != null) {
263 clientChannel.close().sync();
264 }
265
266 if (serverChannel != null) {
267 serverChannel.close().sync();
268 }
269 }
270 }
271
272 @Test
273 @Timeout(value = 10000, unit = MILLISECONDS)
274 public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
275 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
276 @Override
277 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
278 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
279 }
280 });
281 }
282
283 public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
284 Channel serverChannel = null;
285 try {
286 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
287 .option(ChannelOption.AUTO_READ, true);
288 sb.childHandler(new ChannelInitializer<Channel>() {
289 @Override
290 protected void initChannel(Channel ch) {
291 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
292 @Override
293 public void channelActive(ChannelHandlerContext ctx) {
294 ((DuplexChannel) ctx).shutdownOutput();
295 }
296
297 @Override
298 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
299 ctx.close();
300 }
301 });
302 }
303 });
304
305 final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
306 final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
307
308 cb.handler(new ChannelInitializer<Channel>() {
309 @Override
310 protected void initChannel(Channel ch) {
311 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
312
313 @Override
314 public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
315 if (evt == ChannelInputShutdownEvent.INSTANCE) {
316 shutdownEventReceivedCounter.incrementAndGet();
317 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
318 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
319 ctx.executor().schedule(new Runnable() {
320 @Override
321 public void run() {
322 ctx.close();
323 }
324 }, 100, MILLISECONDS);
325 }
326 }
327
328 @Override
329 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
330 ctx.close();
331 }
332 });
333 }
334 });
335
336 serverChannel = sb.bind().sync().channel();
337 Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
338 clientChannel.closeFuture().await();
339 assertEquals(1, shutdownEventReceivedCounter.get());
340 assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
341 } finally {
342 if (serverChannel != null) {
343 serverChannel.close().sync();
344 }
345 }
346 }
347
348 @Test
349 public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
350 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
351 @Override
352 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
353 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
354 }
355 });
356 }
357
358 public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
359 testAllDataReadAfterHalfClosure(true, sb, cb);
360 testAllDataReadAfterHalfClosure(false, sb, cb);
361 }
362
363 private void testAllDataReadAfterHalfClosure(final boolean autoRead,
364 ServerBootstrap sb, Bootstrap cb) throws Throwable {
365 final int totalServerBytesWritten = 1024 * 16;
366 final int numReadsPerReadLoop = 2;
367 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
368 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
369 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
370 final AtomicInteger clientReadCompletes = new AtomicInteger();
371 final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
372 Channel serverChannel = null;
373 Channel clientChannel = null;
374 try {
375 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
376 .option(ChannelOption.AUTO_READ, autoRead)
377 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
378
379 sb.childHandler(new ChannelInitializer<Channel>() {
380 @Override
381 protected void initChannel(Channel ch) throws Exception {
382 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
383 @Override
384 public void channelActive(ChannelHandlerContext ctx) throws Exception {
385 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
386 buf.writerIndex(buf.capacity());
387 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
388 @Override
389 public void operationComplete(ChannelFuture future) throws Exception {
390 ((DuplexChannel) future.channel()).shutdownOutput();
391 }
392 });
393 serverInitializedLatch.countDown();
394 }
395
396 @Override
397 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
398 ctx.close();
399 }
400 });
401 }
402 });
403
404 cb.handler(new ChannelInitializer<Channel>() {
405 @Override
406 protected void initChannel(Channel ch) throws Exception {
407 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
408 private int bytesRead;
409 private int bytesSinceReadComplete;
410
411 @Override
412 public void channelRead(ChannelHandlerContext ctx, Object msg) {
413 ByteBuf buf = (ByteBuf) msg;
414 bytesRead += buf.readableBytes();
415 bytesSinceReadComplete += buf.readableBytes();
416 buf.release();
417 }
418
419 @Override
420 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
421 if (evt == ChannelInputShutdownEvent.INSTANCE) {
422 clientHalfClosedLatch.countDown();
423 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
424 ctx.close();
425 }
426 }
427
428 @Override
429 public void channelReadComplete(ChannelHandlerContext ctx) {
430 if (bytesSinceReadComplete == 0) {
431 clientZeroDataReadCompletes.incrementAndGet();
432 } else {
433 bytesSinceReadComplete = 0;
434 }
435 clientReadCompletes.incrementAndGet();
436 if (bytesRead == totalServerBytesWritten) {
437 clientReadAllDataLatch.countDown();
438 }
439 if (!autoRead) {
440 ctx.read();
441 }
442 }
443
444 @Override
445 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
446 ctx.close();
447 }
448 });
449 }
450 });
451
452 serverChannel = sb.bind().sync().channel();
453 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
454 clientChannel.read();
455
456 serverInitializedLatch.await();
457 clientReadAllDataLatch.await();
458 clientHalfClosedLatch.await();
459
460
461
462 assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
463 "too many read complete events: " + clientReadCompletes.get());
464 assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
465 "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
466 clientReadCompletes.get());
467 } finally {
468 if (clientChannel != null) {
469 clientChannel.close().sync();
470 }
471 if (serverChannel != null) {
472 serverChannel.close().sync();
473 }
474 }
475 }
476
477 @Test
478 public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
479
480 assumeFalse(PlatformDependent.isWindows());
481 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
482 @Override
483 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
484 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
485 }
486 });
487 }
488
489 public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
490 testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
491 testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
492 testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
493 testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
494 }
495
496 private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
497 final boolean clientIsLeader,
498 ServerBootstrap sb,
499 Bootstrap cb) throws InterruptedException {
500 final int expectedBytes = 100;
501 final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
502 final CountDownLatch doneLatch = new CountDownLatch(2);
503 final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
504 Channel serverChannel = null;
505 Channel clientChannel = null;
506 try {
507 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
508 .option(ChannelOption.AUTO_CLOSE, false)
509 .option(ChannelOption.SO_LINGER, 0);
510 sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
511 .childOption(ChannelOption.AUTO_CLOSE, false)
512 .childOption(ChannelOption.SO_LINGER, 0);
513
514 final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
515 serverReadExpectedLatch, doneLatch, causeRef);
516 final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
517 serverReadExpectedLatch, doneLatch, causeRef);
518 sb.childHandler(new ChannelInitializer<Channel>() {
519 @Override
520 protected void initChannel(Channel ch) throws Exception {
521 ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
522 }
523 });
524
525 cb.handler(new ChannelInitializer<Channel>() {
526 @Override
527 protected void initChannel(Channel ch) throws Exception {
528 ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
529 }
530 });
531
532 serverChannel = sb.bind().sync().channel();
533 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
534
535 doneLatch.await();
536 assertNull(causeRef.get());
537 assertTrue(leaderHandler.seenOutputShutdown);
538 } finally {
539 if (clientChannel != null) {
540 clientChannel.close().sync();
541 }
542 if (serverChannel != null) {
543 serverChannel.close().sync();
544 }
545 }
546 }
547
548 private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
549 private final int expectedBytes;
550 private final CountDownLatch followerCloseLatch;
551 private final CountDownLatch doneLatch;
552 private final AtomicReference<Throwable> causeRef;
553 private int bytesRead;
554
555 AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
556 AtomicReference<Throwable> causeRef) {
557 this.expectedBytes = expectedBytes;
558 this.followerCloseLatch = followerCloseLatch;
559 this.doneLatch = doneLatch;
560 this.causeRef = causeRef;
561 }
562
563 @Override
564 public void channelInactive(ChannelHandlerContext ctx) {
565 checkPrematureClose();
566 }
567
568 @Override
569 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
570 ctx.close();
571 checkPrematureClose();
572 }
573
574 @Override
575 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
576 bytesRead += msg.readableBytes();
577 if (bytesRead >= expectedBytes) {
578
579 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
580 buf.writerIndex(buf.writerIndex() + expectedBytes);
581 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
582 @Override
583 public void operationComplete(ChannelFuture future) throws Exception {
584 future.channel().close().addListener(new ChannelFutureListener() {
585 @Override
586 public void operationComplete(final ChannelFuture future) throws Exception {
587
588
589
590
591
592 future.channel().eventLoop().schedule(new Runnable() {
593 @Override
594 public void run() {
595 followerCloseLatch.countDown();
596 }
597 }, 200, TimeUnit.MILLISECONDS);
598 }
599 });
600 }
601 });
602 }
603 }
604
605 private void checkPrematureClose() {
606 if (bytesRead < expectedBytes) {
607 causeRef.set(new IllegalStateException("follower premature close"));
608 doneLatch.countDown();
609 }
610 }
611 }
612
613 private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
614 private final int expectedBytes;
615 private final CountDownLatch followerCloseLatch;
616 private final CountDownLatch doneLatch;
617 private final AtomicReference<Throwable> causeRef;
618 private int bytesRead;
619 boolean seenOutputShutdown;
620
621 AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
622 AtomicReference<Throwable> causeRef) {
623 this.expectedBytes = expectedBytes;
624 this.followerCloseLatch = followerCloseLatch;
625 this.doneLatch = doneLatch;
626 this.causeRef = causeRef;
627 }
628
629 @Override
630 public void channelActive(ChannelHandlerContext ctx) throws Exception {
631 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
632 buf.writerIndex(buf.writerIndex() + expectedBytes);
633 ctx.writeAndFlush(buf.retainedDuplicate());
634
635
636
637 followerCloseLatch.await();
638
639
640 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
641 @Override
642 public void operationComplete(ChannelFuture future) throws Exception {
643 if (future.cause() == null) {
644 causeRef.set(new IllegalStateException("second write should have failed!"));
645 doneLatch.countDown();
646 }
647 }
648 });
649 }
650
651 @Override
652 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
653 bytesRead += msg.readableBytes();
654 if (bytesRead >= expectedBytes) {
655 doneLatch.countDown();
656 }
657 }
658
659 @Override
660 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
661 if (evt instanceof ChannelOutputShutdownEvent) {
662 seenOutputShutdown = true;
663 doneLatch.countDown();
664 }
665 }
666
667 @Override
668 public void channelInactive(ChannelHandlerContext ctx) {
669 checkPrematureClose();
670 }
671
672 @Override
673 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
674 ctx.close();
675 checkPrematureClose();
676 }
677
678 private void checkPrematureClose() {
679 if (bytesRead < expectedBytes || !seenOutputShutdown) {
680 causeRef.set(new IllegalStateException("leader premature close"));
681 doneLatch.countDown();
682 }
683 }
684 }
685
686 @Test
687 public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
688 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
689 @Override
690 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
691 testAllDataReadClosure(serverBootstrap, bootstrap);
692 }
693 });
694 }
695
696 public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
697 testAllDataReadClosure(true, false, sb, cb);
698 testAllDataReadClosure(true, true, sb, cb);
699 testAllDataReadClosure(false, false, sb, cb);
700 testAllDataReadClosure(false, true, sb, cb);
701 }
702
703 private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
704 ServerBootstrap sb, Bootstrap cb) throws Throwable {
705 final int totalServerBytesWritten = 1024 * 16;
706 final int numReadsPerReadLoop = 2;
707 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
708 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
709 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
710 final AtomicInteger clientReadCompletes = new AtomicInteger();
711 Channel serverChannel = null;
712 Channel clientChannel = null;
713 try {
714 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
715 .option(ChannelOption.AUTO_READ, autoRead)
716 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
717
718 sb.childHandler(new ChannelInitializer<Channel>() {
719 @Override
720 protected void initChannel(Channel ch) throws Exception {
721 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
722 @Override
723 public void channelActive(ChannelHandlerContext ctx) throws Exception {
724 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
725 buf.writerIndex(buf.capacity());
726 ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
727 serverInitializedLatch.countDown();
728 }
729
730 @Override
731 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
732 ctx.close();
733 }
734 });
735 }
736 });
737
738 cb.handler(new ChannelInitializer<Channel>() {
739 @Override
740 protected void initChannel(Channel ch) throws Exception {
741 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
742 private int bytesRead;
743
744 @Override
745 public void channelRead(ChannelHandlerContext ctx, Object msg) {
746 ByteBuf buf = (ByteBuf) msg;
747 bytesRead += buf.readableBytes();
748 buf.release();
749 }
750
751 @Override
752 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
753 if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
754 clientHalfClosedLatch.countDown();
755 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
756 ctx.close();
757 }
758 }
759
760 @Override
761 public void channelInactive(ChannelHandlerContext ctx) {
762 if (!allowHalfClosed) {
763 clientHalfClosedLatch.countDown();
764 }
765 }
766
767 @Override
768 public void channelReadComplete(ChannelHandlerContext ctx) {
769 clientReadCompletes.incrementAndGet();
770 if (bytesRead == totalServerBytesWritten) {
771 clientReadAllDataLatch.countDown();
772 }
773 if (!autoRead) {
774 ctx.read();
775 }
776 }
777
778 @Override
779 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
780 ctx.close();
781 }
782 });
783 }
784 });
785
786 serverChannel = sb.bind().sync().channel();
787 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
788 clientChannel.read();
789
790 serverInitializedLatch.await();
791 clientReadAllDataLatch.await();
792 clientHalfClosedLatch.await();
793 assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
794 "too many read complete events: " + clientReadCompletes.get());
795 } finally {
796 if (clientChannel != null) {
797 clientChannel.close().sync();
798 }
799 if (serverChannel != null) {
800 serverChannel.close().sync();
801 }
802 }
803 }
804
805
806
807
808 private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
809 private final int numReads;
810 TestNumReadsRecvByteBufAllocator(int numReads) {
811 this.numReads = numReads;
812 }
813
814 @Override
815 public ExtendedHandle newHandle() {
816 return new ExtendedHandle() {
817 private int attemptedBytesRead;
818 private int lastBytesRead;
819 private int numMessagesRead;
820 @Override
821 public ByteBuf allocate(ByteBufAllocator alloc) {
822 return alloc.ioBuffer(guess(), guess());
823 }
824
825 @Override
826 public int guess() {
827 return 1;
828 }
829
830 @Override
831 public void reset(ChannelConfig config) {
832 numMessagesRead = 0;
833 }
834
835 @Override
836 public void incMessagesRead(int numMessages) {
837 numMessagesRead += numMessages;
838 }
839
840 @Override
841 public void lastBytesRead(int bytes) {
842 lastBytesRead = bytes;
843 }
844
845 @Override
846 public int lastBytesRead() {
847 return lastBytesRead;
848 }
849
850 @Override
851 public void attemptedBytesRead(int bytes) {
852 attemptedBytesRead = bytes;
853 }
854
855 @Override
856 public int attemptedBytesRead() {
857 return attemptedBytesRead;
858 }
859
860 @Override
861 public boolean continueReading() {
862 return numMessagesRead < numReads;
863 }
864
865 @Override
866 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
867 return continueReading() && maybeMoreDataSupplier.get();
868 }
869
870 @Override
871 public void readComplete() {
872
873 }
874 };
875 }
876 }
877 }