1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelConfig;
24 import io.netty.channel.ChannelFuture;
25 import io.netty.channel.ChannelFutureListener;
26 import io.netty.channel.ChannelHandlerContext;
27 import io.netty.channel.ChannelInboundHandlerAdapter;
28 import io.netty.channel.ChannelInitializer;
29 import io.netty.channel.ChannelOption;
30 import io.netty.channel.IoEventLoopGroup;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.SimpleChannelInboundHandler;
33 import io.netty.channel.nio.NioIoHandler;
34 import io.netty.channel.oio.OioEventLoopGroup;
35 import io.netty.channel.socket.ChannelInputShutdownEvent;
36 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
37 import io.netty.channel.socket.ChannelOutputShutdownEvent;
38 import io.netty.channel.socket.DuplexChannel;
39 import io.netty.channel.socket.SocketChannel;
40 import io.netty.util.ReferenceCountUtil;
41 import io.netty.util.UncheckedBooleanSupplier;
42 import io.netty.util.internal.PlatformDependent;
43 import org.junit.jupiter.api.Test;
44 import org.junit.jupiter.api.TestInfo;
45 import org.junit.jupiter.api.Timeout;
46
47 import java.util.concurrent.CountDownLatch;
48 import java.util.concurrent.TimeUnit;
49 import java.util.concurrent.atomic.AtomicInteger;
50 import java.util.concurrent.atomic.AtomicReference;
51
52 import static java.util.concurrent.TimeUnit.MILLISECONDS;
53 import static org.junit.jupiter.api.Assertions.assertEquals;
54 import static org.junit.jupiter.api.Assertions.assertNull;
55 import static org.junit.jupiter.api.Assertions.assertTrue;
56 import static org.junit.jupiter.api.Assumptions.assumeFalse;
57
58 @Timeout(value = 20000, unit = MILLISECONDS)
59 public class SocketHalfClosedTest extends AbstractSocketTest {
60
61 protected int maxReadCompleteWithNoDataAfterInputShutdown() {
62 return 2;
63 }
64
65 @Test
66 public void testAllDataReadEventTriggeredAfterHalfClosure(TestInfo testInfo) throws Throwable {
67 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
68 @Override
69 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
70 if (bootstrap.config().group() instanceof OioEventLoopGroup) {
71 logger.debug("Ignoring test for incompatible OIO event system");
72 return;
73 } else if (bootstrap.config().group() instanceof IoEventLoopGroup) {
74 IoEventLoopGroup group = (IoEventLoopGroup) bootstrap.config().group();
75 if (group.isIoType(NioIoHandler.class)) {
76 logger.debug("Ignoring test for incompatible NioHandler");
77 return;
78 }
79 }
80 allDataReadEventTriggeredAfterHalfClosure(serverBootstrap, bootstrap);
81 }
82 });
83 }
84
85 private void allDataReadEventTriggeredAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
86 final int totalServerBytesWritten = 1;
87 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
88 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
89 final CountDownLatch clientHalfClosedAllBytesRead = new CountDownLatch(1);
90 final AtomicInteger clientReadCompletes = new AtomicInteger();
91 final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
92 Channel serverChannel = null;
93 Channel clientChannel = null;
94 AtomicReference<Channel> serverChildChannel = new AtomicReference<>();
95 try {
96 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
97 .option(ChannelOption.AUTO_CLOSE, false)
98 .option(ChannelOption.AUTO_READ, false);
99
100 sb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
101 .option(ChannelOption.AUTO_CLOSE, false)
102 .childOption(ChannelOption.TCP_NODELAY, true);
103
104 sb.childHandler(new ChannelInitializer<Channel>() {
105 @Override
106 protected void initChannel(Channel ch) throws Exception {
107 serverChildChannel.set(ch);
108 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
109 @Override
110 public void channelActive(ChannelHandlerContext ctx) throws Exception {
111 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
112 buf.writerIndex(buf.capacity());
113 ctx.writeAndFlush(buf);
114 }
115
116 @Override
117 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
118 ctx.close();
119 }
120 });
121 }
122 });
123
124
125 cb.handler(new ChannelInitializer<Channel>() {
126 @Override
127 protected void initChannel(Channel ch) {
128 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
129 private int bytesRead;
130 private int bytesSinceReadComplete;
131
132 @Override
133 public void channelRead(ChannelHandlerContext ctx, Object msg) {
134 ByteBuf buf = (ByteBuf) msg;
135 bytesRead += buf.readableBytes();
136 bytesSinceReadComplete += buf.readableBytes();
137 buf.release();
138 }
139
140 @Override
141 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
142 if (evt == ChannelInputShutdownEvent.INSTANCE) {
143 clientHalfClosedLatch.countDown();
144 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
145 clientHalfClosedAllBytesRead.countDown();
146 ctx.close();
147 }
148 }
149
150 @Override
151 public void channelReadComplete(ChannelHandlerContext ctx) {
152 if (bytesSinceReadComplete == 0) {
153 clientZeroDataReadCompletes.incrementAndGet();
154 } else {
155 bytesSinceReadComplete = 0;
156 }
157 clientReadCompletes.incrementAndGet();
158 if (bytesRead == totalServerBytesWritten) {
159
160
161 ch.eventLoop().execute(new Runnable() {
162 @Override
163 public void run() {
164 clientReadAllDataLatch.countDown();
165 }
166 });
167 } else {
168 ctx.read();
169 }
170 }
171
172 @Override
173 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
174 ctx.fireExceptionCaught(cause);
175 ctx.close();
176 }
177 });
178 ch.read();
179 }
180 });
181
182 serverChannel = sb.bind().sync().channel();
183 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
184 clientChannel.read();
185
186 clientReadAllDataLatch.await();
187
188
189 ((DuplexChannel) serverChildChannel.get()).shutdownOutput();
190
191 clientHalfClosedLatch.await();
192 clientHalfClosedAllBytesRead.await();
193 } finally {
194 if (clientChannel != null) {
195 clientChannel.close().sync();
196 }
197 if (serverChannel != null) {
198 serverChannel.close().sync();
199 }
200 }
201 }
202
203 @Test
204 public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
205 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
206 @Override
207 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
208 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
209 }
210 });
211 }
212
213 private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
214 throws Throwable {
215 Channel serverChannel = null;
216 Channel clientChannel = null;
217
218 final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
219 try {
220 sb.childOption(ChannelOption.SO_LINGER, 1)
221 .childHandler(new ChannelInitializer<Channel>() {
222
223 @Override
224 protected void initChannel(Channel ch) throws Exception {
225 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
226
227 @Override
228 public void channelActive(final ChannelHandlerContext ctx) {
229 SocketChannel channel = (SocketChannel) ctx.channel();
230 channel.shutdownOutput();
231 }
232
233 @Override
234 public void channelRead(ChannelHandlerContext ctx, Object msg) {
235 ReferenceCountUtil.release(msg);
236 waitHalfClosureDone.countDown();
237 }
238 });
239 }
240 });
241
242 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
243 .handler(new ChannelInitializer<Channel>() {
244 @Override
245 protected void initChannel(Channel ch) throws Exception {
246 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
247
248 @Override
249 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
250 if (ChannelInputShutdownEvent.INSTANCE == evt) {
251 ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
252 }
253
254 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
255 ctx.close();
256 }
257 }
258 });
259 }
260 });
261
262 serverChannel = sb.bind().sync().channel();
263 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
264 waitHalfClosureDone.await();
265 } finally {
266 if (clientChannel != null) {
267 clientChannel.close().sync();
268 }
269
270 if (serverChannel != null) {
271 serverChannel.close().sync();
272 }
273 }
274 }
275
276 @Test
277 public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
278 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
279 @Override
280 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
281 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
282 }
283 });
284 }
285
286 public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
287 Channel serverChannel = null;
288 try {
289 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
290 .option(ChannelOption.AUTO_READ, true);
291 sb.childHandler(new ChannelInitializer<Channel>() {
292 @Override
293 protected void initChannel(Channel ch) {
294 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
295 @Override
296 public void channelActive(ChannelHandlerContext ctx) {
297 ((DuplexChannel) ctx).shutdownOutput();
298 }
299
300 @Override
301 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
302 ctx.close();
303 }
304 });
305 }
306 });
307
308 final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
309 final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
310
311 cb.handler(new ChannelInitializer<Channel>() {
312 @Override
313 protected void initChannel(Channel ch) {
314 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
315
316 @Override
317 public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
318 if (evt == ChannelInputShutdownEvent.INSTANCE) {
319 shutdownEventReceivedCounter.incrementAndGet();
320 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
321 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
322 ctx.executor().schedule(new Runnable() {
323 @Override
324 public void run() {
325 ctx.close();
326 }
327 }, 100, MILLISECONDS);
328 }
329 }
330
331 @Override
332 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
333 ctx.close();
334 }
335 });
336 }
337 });
338
339 serverChannel = sb.bind().sync().channel();
340 Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
341 clientChannel.closeFuture().await();
342 assertEquals(1, shutdownEventReceivedCounter.get());
343 assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
344 } finally {
345 if (serverChannel != null) {
346 serverChannel.close().sync();
347 }
348 }
349 }
350
351 @Test
352 public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
353 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
354 @Override
355 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
356 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
357 }
358 });
359 }
360
361 public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
362 testAllDataReadAfterHalfClosure(true, sb, cb);
363 testAllDataReadAfterHalfClosure(false, sb, cb);
364 }
365
366 private void testAllDataReadAfterHalfClosure(final boolean autoRead,
367 ServerBootstrap sb, Bootstrap cb) throws Throwable {
368 final int totalServerBytesWritten = 1024 * 16;
369 final int numReadsPerReadLoop = 2;
370 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
371 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
372 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
373 final AtomicInteger clientReadCompletes = new AtomicInteger();
374 final AtomicInteger clientZeroDataReadCompletes = new AtomicInteger();
375 Channel serverChannel = null;
376 Channel clientChannel = null;
377 try {
378 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
379 .option(ChannelOption.AUTO_READ, autoRead)
380 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
381
382 sb.childHandler(new ChannelInitializer<Channel>() {
383 @Override
384 protected void initChannel(Channel ch) throws Exception {
385 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
386 @Override
387 public void channelActive(ChannelHandlerContext ctx) throws Exception {
388 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
389 buf.writerIndex(buf.capacity());
390 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
391 @Override
392 public void operationComplete(ChannelFuture future) throws Exception {
393 ((DuplexChannel) future.channel()).shutdownOutput();
394 }
395 });
396 serverInitializedLatch.countDown();
397 }
398
399 @Override
400 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
401 ctx.close();
402 }
403 });
404 }
405 });
406
407 cb.handler(new ChannelInitializer<Channel>() {
408 @Override
409 protected void initChannel(Channel ch) throws Exception {
410 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
411 private int bytesRead;
412 private int bytesSinceReadComplete;
413
414 @Override
415 public void channelRead(ChannelHandlerContext ctx, Object msg) {
416 ByteBuf buf = (ByteBuf) msg;
417 bytesRead += buf.readableBytes();
418 bytesSinceReadComplete += buf.readableBytes();
419 buf.release();
420 }
421
422 @Override
423 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
424 if (evt == ChannelInputShutdownEvent.INSTANCE) {
425 clientHalfClosedLatch.countDown();
426 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
427 ctx.close();
428 }
429 }
430
431 @Override
432 public void channelReadComplete(ChannelHandlerContext ctx) {
433 if (bytesSinceReadComplete == 0) {
434 clientZeroDataReadCompletes.incrementAndGet();
435 } else {
436 bytesSinceReadComplete = 0;
437 }
438 clientReadCompletes.incrementAndGet();
439 if (bytesRead == totalServerBytesWritten) {
440 clientReadAllDataLatch.countDown();
441 }
442 if (!autoRead) {
443 ctx.read();
444 }
445 }
446
447 @Override
448 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
449 ctx.close();
450 }
451 });
452 }
453 });
454
455 serverChannel = sb.bind().sync().channel();
456 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
457 clientChannel.read();
458
459 serverInitializedLatch.await();
460 clientReadAllDataLatch.await();
461 clientHalfClosedLatch.await();
462
463
464
465 assertTrue(totalServerBytesWritten > clientReadCompletes.get(),
466 "too many read complete events: " + clientReadCompletes.get());
467 assertTrue(clientZeroDataReadCompletes.get() <= maxReadCompleteWithNoDataAfterInputShutdown(),
468 "too many readComplete with no data: " + clientZeroDataReadCompletes.get() + " readComplete: " +
469 clientReadCompletes.get());
470 } finally {
471 if (clientChannel != null) {
472 clientChannel.close().sync();
473 }
474 if (serverChannel != null) {
475 serverChannel.close().sync();
476 }
477 }
478 }
479
480 @Test
481 public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
482
483 assumeFalse(PlatformDependent.isWindows());
484 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
485 @Override
486 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
487 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
488 }
489 });
490 }
491
492 public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
493 testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
494 testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
495 testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
496 testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
497 }
498
499 private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
500 final boolean clientIsLeader,
501 ServerBootstrap sb,
502 Bootstrap cb) throws InterruptedException {
503 final int expectedBytes = 100;
504 final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
505 final CountDownLatch doneLatch = new CountDownLatch(2);
506 final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
507 Channel serverChannel = null;
508 Channel clientChannel = null;
509 try {
510 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
511 .option(ChannelOption.AUTO_CLOSE, false)
512 .option(ChannelOption.SO_LINGER, 0);
513 sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
514 .childOption(ChannelOption.AUTO_CLOSE, false)
515 .childOption(ChannelOption.SO_LINGER, 0);
516
517 final AutoCloseFalseLeader leaderHandler = new AutoCloseFalseLeader(expectedBytes,
518 serverReadExpectedLatch, doneLatch, causeRef);
519 final AutoCloseFalseFollower followerHandler = new AutoCloseFalseFollower(expectedBytes,
520 serverReadExpectedLatch, doneLatch, causeRef);
521 sb.childHandler(new ChannelInitializer<Channel>() {
522 @Override
523 protected void initChannel(Channel ch) throws Exception {
524 ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
525 }
526 });
527
528 cb.handler(new ChannelInitializer<Channel>() {
529 @Override
530 protected void initChannel(Channel ch) throws Exception {
531 ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
532 }
533 });
534
535 serverChannel = sb.bind().sync().channel();
536 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
537
538 doneLatch.await();
539 assertNull(causeRef.get());
540 assertTrue(leaderHandler.seenOutputShutdown);
541 } finally {
542 if (clientChannel != null) {
543 clientChannel.close().sync();
544 }
545 if (serverChannel != null) {
546 serverChannel.close().sync();
547 }
548 }
549 }
550
551 private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
552 private final int expectedBytes;
553 private final CountDownLatch followerCloseLatch;
554 private final CountDownLatch doneLatch;
555 private final AtomicReference<Throwable> causeRef;
556 private int bytesRead;
557
558 AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
559 AtomicReference<Throwable> causeRef) {
560 this.expectedBytes = expectedBytes;
561 this.followerCloseLatch = followerCloseLatch;
562 this.doneLatch = doneLatch;
563 this.causeRef = causeRef;
564 }
565
566 @Override
567 public void channelInactive(ChannelHandlerContext ctx) {
568 checkPrematureClose();
569 }
570
571 @Override
572 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
573 ctx.close();
574 checkPrematureClose();
575 }
576
577 @Override
578 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
579 bytesRead += msg.readableBytes();
580 if (bytesRead >= expectedBytes) {
581
582 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
583 buf.writerIndex(buf.writerIndex() + expectedBytes);
584 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
585 @Override
586 public void operationComplete(ChannelFuture future) throws Exception {
587 future.channel().close().addListener(new ChannelFutureListener() {
588 @Override
589 public void operationComplete(final ChannelFuture future) throws Exception {
590
591
592
593
594
595 future.channel().eventLoop().schedule(new Runnable() {
596 @Override
597 public void run() {
598 followerCloseLatch.countDown();
599 }
600 }, 200, TimeUnit.MILLISECONDS);
601 }
602 });
603 }
604 });
605 }
606 }
607
608 private void checkPrematureClose() {
609 if (bytesRead < expectedBytes) {
610 causeRef.set(new IllegalStateException("follower premature close"));
611 doneLatch.countDown();
612 }
613 }
614 }
615
616 private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
617 private final int expectedBytes;
618 private final CountDownLatch followerCloseLatch;
619 private final CountDownLatch doneLatch;
620 private final AtomicReference<Throwable> causeRef;
621 private int bytesRead;
622 boolean seenOutputShutdown;
623
624 AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
625 AtomicReference<Throwable> causeRef) {
626 this.expectedBytes = expectedBytes;
627 this.followerCloseLatch = followerCloseLatch;
628 this.doneLatch = doneLatch;
629 this.causeRef = causeRef;
630 }
631
632 @Override
633 public void channelActive(ChannelHandlerContext ctx) throws Exception {
634 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
635 buf.writerIndex(buf.writerIndex() + expectedBytes);
636 ctx.writeAndFlush(buf.retainedDuplicate());
637
638
639
640 followerCloseLatch.await();
641
642
643 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
644 @Override
645 public void operationComplete(ChannelFuture future) throws Exception {
646 if (future.cause() == null) {
647 causeRef.set(new IllegalStateException("second write should have failed!"));
648 doneLatch.countDown();
649 }
650 }
651 });
652 }
653
654 @Override
655 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
656 bytesRead += msg.readableBytes();
657 if (bytesRead >= expectedBytes) {
658 doneLatch.countDown();
659 }
660 }
661
662 @Override
663 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
664 if (evt instanceof ChannelOutputShutdownEvent) {
665 seenOutputShutdown = true;
666 doneLatch.countDown();
667 }
668 }
669
670 @Override
671 public void channelInactive(ChannelHandlerContext ctx) {
672 checkPrematureClose();
673 }
674
675 @Override
676 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
677 ctx.close();
678 checkPrematureClose();
679 }
680
681 private void checkPrematureClose() {
682 if (bytesRead < expectedBytes || !seenOutputShutdown) {
683 causeRef.set(new IllegalStateException("leader premature close"));
684 doneLatch.countDown();
685 }
686 }
687 }
688
689 @Test
690 public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
691 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
692 @Override
693 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
694 testAllDataReadClosure(serverBootstrap, bootstrap);
695 }
696 });
697 }
698
699 public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
700 testAllDataReadClosure(true, false, sb, cb);
701 testAllDataReadClosure(true, true, sb, cb);
702 testAllDataReadClosure(false, false, sb, cb);
703 testAllDataReadClosure(false, true, sb, cb);
704 }
705
706 private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
707 ServerBootstrap sb, Bootstrap cb) throws Throwable {
708 final int totalServerBytesWritten = 1024 * 16;
709 final int numReadsPerReadLoop = 2;
710 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
711 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
712 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
713 final AtomicInteger clientReadCompletes = new AtomicInteger();
714 Channel serverChannel = null;
715 Channel clientChannel = null;
716 try {
717 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
718 .option(ChannelOption.AUTO_READ, autoRead)
719 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
720
721 sb.childHandler(new ChannelInitializer<Channel>() {
722 @Override
723 protected void initChannel(Channel ch) throws Exception {
724 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
725 @Override
726 public void channelActive(ChannelHandlerContext ctx) throws Exception {
727 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
728 buf.writerIndex(buf.capacity());
729 ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
730 serverInitializedLatch.countDown();
731 }
732
733 @Override
734 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
735 ctx.close();
736 }
737 });
738 }
739 });
740
741 cb.handler(new ChannelInitializer<Channel>() {
742 @Override
743 protected void initChannel(Channel ch) throws Exception {
744 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
745 private int bytesRead;
746
747 @Override
748 public void channelRead(ChannelHandlerContext ctx, Object msg) {
749 ByteBuf buf = (ByteBuf) msg;
750 bytesRead += buf.readableBytes();
751 buf.release();
752 }
753
754 @Override
755 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
756 if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
757 clientHalfClosedLatch.countDown();
758 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
759 ctx.close();
760 }
761 }
762
763 @Override
764 public void channelInactive(ChannelHandlerContext ctx) {
765 if (!allowHalfClosed) {
766 clientHalfClosedLatch.countDown();
767 }
768 }
769
770 @Override
771 public void channelReadComplete(ChannelHandlerContext ctx) {
772 clientReadCompletes.incrementAndGet();
773 if (bytesRead == totalServerBytesWritten) {
774 clientReadAllDataLatch.countDown();
775 }
776 if (!autoRead) {
777 ctx.read();
778 }
779 }
780
781 @Override
782 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
783 ctx.close();
784 }
785 });
786 }
787 });
788
789 serverChannel = sb.bind().sync().channel();
790 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
791 clientChannel.read();
792
793 serverInitializedLatch.await();
794 clientReadAllDataLatch.await();
795 clientHalfClosedLatch.await();
796 assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
797 "too many read complete events: " + clientReadCompletes.get());
798 } finally {
799 if (clientChannel != null) {
800 clientChannel.close().sync();
801 }
802 if (serverChannel != null) {
803 serverChannel.close().sync();
804 }
805 }
806 }
807
808
809
810
811 private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
812 private final int numReads;
813 TestNumReadsRecvByteBufAllocator(int numReads) {
814 this.numReads = numReads;
815 }
816
817 @Override
818 public ExtendedHandle newHandle() {
819 return new ExtendedHandle() {
820 private int attemptedBytesRead;
821 private int lastBytesRead;
822 private int numMessagesRead;
823 @Override
824 public ByteBuf allocate(ByteBufAllocator alloc) {
825 return alloc.ioBuffer(guess(), guess());
826 }
827
828 @Override
829 public int guess() {
830 return 1;
831 }
832
833 @Override
834 public void reset(ChannelConfig config) {
835 numMessagesRead = 0;
836 }
837
838 @Override
839 public void incMessagesRead(int numMessages) {
840 numMessagesRead += numMessages;
841 }
842
843 @Override
844 public void lastBytesRead(int bytes) {
845 lastBytesRead = bytes;
846 }
847
848 @Override
849 public int lastBytesRead() {
850 return lastBytesRead;
851 }
852
853 @Override
854 public void attemptedBytesRead(int bytes) {
855 attemptedBytesRead = bytes;
856 }
857
858 @Override
859 public int attemptedBytesRead() {
860 return attemptedBytesRead;
861 }
862
863 @Override
864 public boolean continueReading() {
865 return numMessagesRead < numReads;
866 }
867
868 @Override
869 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
870 return continueReading() && maybeMoreDataSupplier.get();
871 }
872
873 @Override
874 public void readComplete() {
875
876 }
877 };
878 }
879 }
880 }