1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.ByteBufAllocator;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelFuture;
26 import io.netty.channel.ChannelFutureListener;
27 import io.netty.channel.ChannelHandlerContext;
28 import io.netty.channel.ChannelInboundHandlerAdapter;
29 import io.netty.channel.ChannelInitializer;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.RecvByteBufAllocator;
32 import io.netty.channel.SimpleChannelInboundHandler;
33 import io.netty.channel.socket.ChannelInputShutdownEvent;
34 import io.netty.channel.socket.ChannelInputShutdownReadComplete;
35 import io.netty.channel.socket.ChannelOutputShutdownEvent;
36 import io.netty.channel.socket.DuplexChannel;
37 import io.netty.channel.socket.SocketChannel;
38 import io.netty.util.ReferenceCountUtil;
39 import io.netty.util.UncheckedBooleanSupplier;
40 import io.netty.util.internal.PlatformDependent;
41 import org.junit.jupiter.api.Test;
42 import org.junit.jupiter.api.TestInfo;
43 import org.junit.jupiter.api.Timeout;
44
45 import java.util.concurrent.CountDownLatch;
46 import java.util.concurrent.TimeUnit;
47 import java.util.concurrent.atomic.AtomicInteger;
48 import java.util.concurrent.atomic.AtomicReference;
49
50 import static java.util.concurrent.TimeUnit.MILLISECONDS;
51 import static org.junit.jupiter.api.Assertions.assertEquals;
52 import static org.junit.jupiter.api.Assertions.assertNull;
53 import static org.junit.jupiter.api.Assertions.assertTrue;
54 import static org.junit.jupiter.api.Assumptions.assumeFalse;
55
56 public class SocketHalfClosedTest extends AbstractSocketTest {
57
58 @Test
59 @Timeout(value = 5000, unit = MILLISECONDS)
60 public void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(TestInfo testInfo) throws Throwable {
61 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
62 @Override
63 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
64 testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(serverBootstrap, bootstrap);
65 }
66 });
67 }
68
69 private void testHalfClosureReceiveDataOnFinalWait2StateWhenSoLingerSet(ServerBootstrap sb, Bootstrap cb)
70 throws Throwable {
71 Channel serverChannel = null;
72 Channel clientChannel = null;
73
74 final CountDownLatch waitHalfClosureDone = new CountDownLatch(1);
75 try {
76 sb.childOption(ChannelOption.SO_LINGER, 1)
77 .childHandler(new ChannelInitializer<Channel>() {
78
79 @Override
80 protected void initChannel(Channel ch) throws Exception {
81 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
82
83 @Override
84 public void channelActive(final ChannelHandlerContext ctx) {
85 SocketChannel channel = (SocketChannel) ctx.channel();
86 channel.shutdownOutput();
87 }
88
89 @Override
90 public void channelRead(ChannelHandlerContext ctx, Object msg) {
91 ReferenceCountUtil.release(msg);
92 waitHalfClosureDone.countDown();
93 }
94 });
95 }
96 });
97
98 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
99 .handler(new ChannelInitializer<Channel>() {
100 @Override
101 protected void initChannel(Channel ch) throws Exception {
102 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
103
104 @Override
105 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
106 if (ChannelInputShutdownEvent.INSTANCE == evt) {
107 ctx.writeAndFlush(ctx.alloc().buffer().writeZero(16));
108 }
109
110 if (ChannelInputShutdownReadComplete.INSTANCE == evt) {
111 ctx.close();
112 }
113 }
114 });
115 }
116 });
117
118 serverChannel = sb.bind().sync().channel();
119 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
120 waitHalfClosureDone.await();
121 } finally {
122 if (clientChannel != null) {
123 clientChannel.close().sync();
124 }
125
126 if (serverChannel != null) {
127 serverChannel.close().sync();
128 }
129 }
130 }
131
132 @Test
133 @Timeout(value = 10000, unit = MILLISECONDS)
134 public void testHalfClosureOnlyOneEventWhenAutoRead(TestInfo testInfo) throws Throwable {
135 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
136 @Override
137 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
138 testHalfClosureOnlyOneEventWhenAutoRead(serverBootstrap, bootstrap);
139 }
140 });
141 }
142
143 public void testHalfClosureOnlyOneEventWhenAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
144 Channel serverChannel = null;
145 try {
146 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
147 .option(ChannelOption.AUTO_READ, true);
148 sb.childHandler(new ChannelInitializer<Channel>() {
149 @Override
150 protected void initChannel(Channel ch) {
151 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
152 @Override
153 public void channelActive(ChannelHandlerContext ctx) {
154 ((DuplexChannel) ctx).shutdownOutput();
155 }
156
157 @Override
158 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
159 ctx.close();
160 }
161 });
162 }
163 });
164
165 final AtomicInteger shutdownEventReceivedCounter = new AtomicInteger();
166 final AtomicInteger shutdownReadCompleteEventReceivedCounter = new AtomicInteger();
167
168 cb.handler(new ChannelInitializer<Channel>() {
169 @Override
170 protected void initChannel(Channel ch) {
171 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
172
173 @Override
174 public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) {
175 if (evt == ChannelInputShutdownEvent.INSTANCE) {
176 shutdownEventReceivedCounter.incrementAndGet();
177 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
178 shutdownReadCompleteEventReceivedCounter.incrementAndGet();
179 ctx.executor().schedule(new Runnable() {
180 @Override
181 public void run() {
182 ctx.close();
183 }
184 }, 100, MILLISECONDS);
185 }
186 }
187
188 @Override
189 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
190 ctx.close();
191 }
192 });
193 }
194 });
195
196 serverChannel = sb.bind().sync().channel();
197 Channel clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
198 clientChannel.closeFuture().await();
199 assertEquals(1, shutdownEventReceivedCounter.get());
200 assertEquals(1, shutdownReadCompleteEventReceivedCounter.get());
201 } finally {
202 if (serverChannel != null) {
203 serverChannel.close().sync();
204 }
205 }
206 }
207
208 @Test
209 public void testAllDataReadAfterHalfClosure(TestInfo testInfo) throws Throwable {
210 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
211 @Override
212 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
213 testAllDataReadAfterHalfClosure(serverBootstrap, bootstrap);
214 }
215 });
216 }
217
218 public void testAllDataReadAfterHalfClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
219 testAllDataReadAfterHalfClosure(true, sb, cb);
220 testAllDataReadAfterHalfClosure(false, sb, cb);
221 }
222
223 private static void testAllDataReadAfterHalfClosure(final boolean autoRead,
224 ServerBootstrap sb, Bootstrap cb) throws Throwable {
225 final int totalServerBytesWritten = 1024 * 16;
226 final int numReadsPerReadLoop = 2;
227 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
228 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
229 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
230 final AtomicInteger clientReadCompletes = new AtomicInteger();
231 Channel serverChannel = null;
232 Channel clientChannel = null;
233 try {
234 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, true)
235 .option(ChannelOption.AUTO_READ, autoRead)
236 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
237
238 sb.childHandler(new ChannelInitializer<Channel>() {
239 @Override
240 protected void initChannel(Channel ch) throws Exception {
241 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
242 @Override
243 public void channelActive(ChannelHandlerContext ctx) throws Exception {
244 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
245 buf.writerIndex(buf.capacity());
246 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
247 @Override
248 public void operationComplete(ChannelFuture future) throws Exception {
249 ((DuplexChannel) future.channel()).shutdownOutput();
250 }
251 });
252 serverInitializedLatch.countDown();
253 }
254
255 @Override
256 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
257 ctx.close();
258 }
259 });
260 }
261 });
262
263 cb.handler(new ChannelInitializer<Channel>() {
264 @Override
265 protected void initChannel(Channel ch) throws Exception {
266 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
267 private int bytesRead;
268
269 @Override
270 public void channelRead(ChannelHandlerContext ctx, Object msg) {
271 ByteBuf buf = (ByteBuf) msg;
272 bytesRead += buf.readableBytes();
273 buf.release();
274 }
275
276 @Override
277 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
278 if (evt == ChannelInputShutdownEvent.INSTANCE) {
279 clientHalfClosedLatch.countDown();
280 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
281 ctx.close();
282 }
283 }
284
285 @Override
286 public void channelReadComplete(ChannelHandlerContext ctx) {
287 clientReadCompletes.incrementAndGet();
288 if (bytesRead == totalServerBytesWritten) {
289 clientReadAllDataLatch.countDown();
290 }
291 if (!autoRead) {
292 ctx.read();
293 }
294 }
295
296 @Override
297 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
298 ctx.close();
299 }
300 });
301 }
302 });
303
304 serverChannel = sb.bind().sync().channel();
305 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
306 clientChannel.read();
307
308 serverInitializedLatch.await();
309 clientReadAllDataLatch.await();
310 clientHalfClosedLatch.await();
311 assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
312 "too many read complete events: " + clientReadCompletes.get());
313 } finally {
314 if (clientChannel != null) {
315 clientChannel.close().sync();
316 }
317 if (serverChannel != null) {
318 serverChannel.close().sync();
319 }
320 }
321 }
322
323 @Test
324 public void testAutoCloseFalseDoesShutdownOutput(TestInfo testInfo) throws Throwable {
325
326 assumeFalse(PlatformDependent.isWindows());
327 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
328 @Override
329 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
330 testAutoCloseFalseDoesShutdownOutput(serverBootstrap, bootstrap);
331 }
332 });
333 }
334
335 public void testAutoCloseFalseDoesShutdownOutput(ServerBootstrap sb, Bootstrap cb) throws Throwable {
336 testAutoCloseFalseDoesShutdownOutput(false, false, sb, cb);
337 testAutoCloseFalseDoesShutdownOutput(false, true, sb, cb);
338 testAutoCloseFalseDoesShutdownOutput(true, false, sb, cb);
339 testAutoCloseFalseDoesShutdownOutput(true, true, sb, cb);
340 }
341
342 private static void testAutoCloseFalseDoesShutdownOutput(boolean allowHalfClosed,
343 final boolean clientIsLeader,
344 ServerBootstrap sb,
345 Bootstrap cb) throws InterruptedException {
346 final int expectedBytes = 100;
347 final CountDownLatch serverReadExpectedLatch = new CountDownLatch(1);
348 final CountDownLatch doneLatch = new CountDownLatch(1);
349 final AtomicReference<Throwable> causeRef = new AtomicReference<Throwable>();
350 Channel serverChannel = null;
351 Channel clientChannel = null;
352 try {
353 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
354 .option(ChannelOption.AUTO_CLOSE, false)
355 .option(ChannelOption.SO_LINGER, 0);
356 sb.childOption(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
357 .childOption(ChannelOption.AUTO_CLOSE, false)
358 .childOption(ChannelOption.SO_LINGER, 0);
359
360 final SimpleChannelInboundHandler<ByteBuf> leaderHandler = new AutoCloseFalseLeader(expectedBytes,
361 serverReadExpectedLatch, doneLatch, causeRef);
362 final SimpleChannelInboundHandler<ByteBuf> followerHandler = new AutoCloseFalseFollower(expectedBytes,
363 serverReadExpectedLatch, doneLatch, causeRef);
364 sb.childHandler(new ChannelInitializer<Channel>() {
365 @Override
366 protected void initChannel(Channel ch) throws Exception {
367 ch.pipeline().addLast(clientIsLeader ? followerHandler :leaderHandler);
368 }
369 });
370
371 cb.handler(new ChannelInitializer<Channel>() {
372 @Override
373 protected void initChannel(Channel ch) throws Exception {
374 ch.pipeline().addLast(clientIsLeader ? leaderHandler : followerHandler);
375 }
376 });
377
378 serverChannel = sb.bind().sync().channel();
379 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
380
381 doneLatch.await();
382 assertNull(causeRef.get());
383 } finally {
384 if (clientChannel != null) {
385 clientChannel.close().sync();
386 }
387 if (serverChannel != null) {
388 serverChannel.close().sync();
389 }
390 }
391 }
392
393 private static final class AutoCloseFalseFollower extends SimpleChannelInboundHandler<ByteBuf> {
394 private final int expectedBytes;
395 private final CountDownLatch followerCloseLatch;
396 private final CountDownLatch doneLatch;
397 private final AtomicReference<Throwable> causeRef;
398 private int bytesRead;
399
400 AutoCloseFalseFollower(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
401 AtomicReference<Throwable> causeRef) {
402 this.expectedBytes = expectedBytes;
403 this.followerCloseLatch = followerCloseLatch;
404 this.doneLatch = doneLatch;
405 this.causeRef = causeRef;
406 }
407
408 @Override
409 public void channelInactive(ChannelHandlerContext ctx) {
410 checkPrematureClose();
411 }
412
413 @Override
414 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
415 ctx.close();
416 checkPrematureClose();
417 }
418
419 @Override
420 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
421 bytesRead += msg.readableBytes();
422 if (bytesRead >= expectedBytes) {
423
424 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
425 buf.writerIndex(buf.writerIndex() + expectedBytes);
426 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
427 @Override
428 public void operationComplete(ChannelFuture future) throws Exception {
429 future.channel().close().addListener(new ChannelFutureListener() {
430 @Override
431 public void operationComplete(final ChannelFuture future) throws Exception {
432
433
434
435
436
437 future.channel().eventLoop().schedule(new Runnable() {
438 @Override
439 public void run() {
440 followerCloseLatch.countDown();
441 }
442 }, 200, TimeUnit.MILLISECONDS);
443 }
444 });
445 }
446 });
447 }
448 }
449
450 private void checkPrematureClose() {
451 if (bytesRead < expectedBytes) {
452 causeRef.set(new IllegalStateException("follower premature close"));
453 doneLatch.countDown();
454 }
455 }
456 }
457
458 private static final class AutoCloseFalseLeader extends SimpleChannelInboundHandler<ByteBuf> {
459 private final int expectedBytes;
460 private final CountDownLatch followerCloseLatch;
461 private final CountDownLatch doneLatch;
462 private final AtomicReference<Throwable> causeRef;
463 private int bytesRead;
464 private boolean seenOutputShutdown;
465
466 AutoCloseFalseLeader(int expectedBytes, CountDownLatch followerCloseLatch, CountDownLatch doneLatch,
467 AtomicReference<Throwable> causeRef) {
468 this.expectedBytes = expectedBytes;
469 this.followerCloseLatch = followerCloseLatch;
470 this.doneLatch = doneLatch;
471 this.causeRef = causeRef;
472 }
473
474 @Override
475 public void channelActive(ChannelHandlerContext ctx) throws Exception {
476 ByteBuf buf = ctx.alloc().buffer(expectedBytes);
477 buf.writerIndex(buf.writerIndex() + expectedBytes);
478 ctx.writeAndFlush(buf.retainedDuplicate());
479
480
481
482 followerCloseLatch.await();
483
484
485 ctx.writeAndFlush(buf).addListener(new ChannelFutureListener() {
486 @Override
487 public void operationComplete(ChannelFuture future) throws Exception {
488 if (future.cause() == null) {
489 causeRef.set(new IllegalStateException("second write should have failed!"));
490 doneLatch.countDown();
491 }
492 }
493 });
494 }
495
496 @Override
497 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
498 bytesRead += msg.readableBytes();
499 if (bytesRead >= expectedBytes) {
500 if (!seenOutputShutdown) {
501 causeRef.set(new IllegalStateException(
502 ChannelOutputShutdownEvent.class.getSimpleName() + " event was not seen"));
503 }
504 doneLatch.countDown();
505 }
506 }
507
508 @Override
509 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
510 if (evt instanceof ChannelOutputShutdownEvent) {
511 seenOutputShutdown = true;
512 }
513 }
514
515 @Override
516 public void channelInactive(ChannelHandlerContext ctx) {
517 checkPrematureClose();
518 }
519
520 @Override
521 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
522 ctx.close();
523 checkPrematureClose();
524 }
525
526 private void checkPrematureClose() {
527 if (bytesRead < expectedBytes || !seenOutputShutdown) {
528 causeRef.set(new IllegalStateException("leader premature close"));
529 doneLatch.countDown();
530 }
531 }
532 }
533
534 @Test
535 public void testAllDataReadClosure(TestInfo testInfo) throws Throwable {
536 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
537 @Override
538 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
539 testAllDataReadClosure(serverBootstrap, bootstrap);
540 }
541 });
542 }
543
544 public void testAllDataReadClosure(ServerBootstrap sb, Bootstrap cb) throws Throwable {
545 testAllDataReadClosure(true, false, sb, cb);
546 testAllDataReadClosure(true, true, sb, cb);
547 testAllDataReadClosure(false, false, sb, cb);
548 testAllDataReadClosure(false, true, sb, cb);
549 }
550
551 private static void testAllDataReadClosure(final boolean autoRead, final boolean allowHalfClosed,
552 ServerBootstrap sb, Bootstrap cb) throws Throwable {
553 final int totalServerBytesWritten = 1024 * 16;
554 final int numReadsPerReadLoop = 2;
555 final CountDownLatch serverInitializedLatch = new CountDownLatch(1);
556 final CountDownLatch clientReadAllDataLatch = new CountDownLatch(1);
557 final CountDownLatch clientHalfClosedLatch = new CountDownLatch(1);
558 final AtomicInteger clientReadCompletes = new AtomicInteger();
559 Channel serverChannel = null;
560 Channel clientChannel = null;
561 try {
562 cb.option(ChannelOption.ALLOW_HALF_CLOSURE, allowHalfClosed)
563 .option(ChannelOption.AUTO_READ, autoRead)
564 .option(ChannelOption.RCVBUF_ALLOCATOR, new TestNumReadsRecvByteBufAllocator(numReadsPerReadLoop));
565
566 sb.childHandler(new ChannelInitializer<Channel>() {
567 @Override
568 protected void initChannel(Channel ch) throws Exception {
569 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
570 @Override
571 public void channelActive(ChannelHandlerContext ctx) throws Exception {
572 ByteBuf buf = ctx.alloc().buffer(totalServerBytesWritten);
573 buf.writerIndex(buf.capacity());
574 ctx.writeAndFlush(buf).addListener(ChannelFutureListener.CLOSE);
575 serverInitializedLatch.countDown();
576 }
577
578 @Override
579 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
580 ctx.close();
581 }
582 });
583 }
584 });
585
586 cb.handler(new ChannelInitializer<Channel>() {
587 @Override
588 protected void initChannel(Channel ch) throws Exception {
589 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
590 private int bytesRead;
591
592 @Override
593 public void channelRead(ChannelHandlerContext ctx, Object msg) {
594 ByteBuf buf = (ByteBuf) msg;
595 bytesRead += buf.readableBytes();
596 buf.release();
597 }
598
599 @Override
600 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
601 if (evt == ChannelInputShutdownEvent.INSTANCE && allowHalfClosed) {
602 clientHalfClosedLatch.countDown();
603 } else if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
604 ctx.close();
605 }
606 }
607
608 @Override
609 public void channelInactive(ChannelHandlerContext ctx) {
610 if (!allowHalfClosed) {
611 clientHalfClosedLatch.countDown();
612 }
613 }
614
615 @Override
616 public void channelReadComplete(ChannelHandlerContext ctx) {
617 clientReadCompletes.incrementAndGet();
618 if (bytesRead == totalServerBytesWritten) {
619 clientReadAllDataLatch.countDown();
620 }
621 if (!autoRead) {
622 ctx.read();
623 }
624 }
625
626 @Override
627 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
628 ctx.close();
629 }
630 });
631 }
632 });
633
634 serverChannel = sb.bind().sync().channel();
635 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
636 clientChannel.read();
637
638 serverInitializedLatch.await();
639 clientReadAllDataLatch.await();
640 clientHalfClosedLatch.await();
641 assertTrue(totalServerBytesWritten / numReadsPerReadLoop + 10 > clientReadCompletes.get(),
642 "too many read complete events: " + clientReadCompletes.get());
643 } finally {
644 if (clientChannel != null) {
645 clientChannel.close().sync();
646 }
647 if (serverChannel != null) {
648 serverChannel.close().sync();
649 }
650 }
651 }
652
653
654
655
656 private static final class TestNumReadsRecvByteBufAllocator implements RecvByteBufAllocator {
657 private final int numReads;
658 TestNumReadsRecvByteBufAllocator(int numReads) {
659 this.numReads = numReads;
660 }
661
662 @Override
663 public ExtendedHandle newHandle() {
664 return new ExtendedHandle() {
665 private int attemptedBytesRead;
666 private int lastBytesRead;
667 private int numMessagesRead;
668 @Override
669 public ByteBuf allocate(ByteBufAllocator alloc) {
670 return alloc.ioBuffer(guess(), guess());
671 }
672
673 @Override
674 public int guess() {
675 return 1;
676 }
677
678 @Override
679 public void reset(ChannelConfig config) {
680 numMessagesRead = 0;
681 }
682
683 @Override
684 public void incMessagesRead(int numMessages) {
685 numMessagesRead += numMessages;
686 }
687
688 @Override
689 public void lastBytesRead(int bytes) {
690 lastBytesRead = bytes;
691 }
692
693 @Override
694 public int lastBytesRead() {
695 return lastBytesRead;
696 }
697
698 @Override
699 public void attemptedBytesRead(int bytes) {
700 attemptedBytesRead = bytes;
701 }
702
703 @Override
704 public int attemptedBytesRead() {
705 return attemptedBytesRead;
706 }
707
708 @Override
709 public boolean continueReading() {
710 return numMessagesRead < numReads;
711 }
712
713 @Override
714 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
715 return continueReading() && maybeMoreDataSupplier.get();
716 }
717
718 @Override
719 public void readComplete() {
720
721 }
722 };
723 }
724 }
725 }