1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.channel.Channel;
22 import io.netty5.channel.ChannelHandlerContext;
23 import io.netty5.channel.SimpleChannelInboundHandler;
24 import io.netty5.util.concurrent.DefaultEventExecutorGroup;
25 import io.netty5.util.concurrent.DefaultThreadFactory;
26 import io.netty5.util.concurrent.EventExecutor;
27 import io.netty5.util.concurrent.Promise;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30
31 import java.util.Random;
32 import java.util.concurrent.CountDownLatch;
33
34 import static org.junit.jupiter.api.Assertions.assertFalse;
35
36 public class SocketBufReleaseTest extends AbstractSocketTest {
37
38 private static final EventExecutor executor =
39 new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
40
41 @Test
42 public void testBufferRelease(TestInfo testInfo) throws Throwable {
43 run(testInfo, this::testBufferRelease);
44 }
45
46 public void testBufferRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
47 testRelease(sb, cb);
48 }
49
50 public void testRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
51 final WriteHandler serverHandler = new BufferWriterHandler();
52 final WriteHandler clientHandler = new BufferWriterHandler();
53
54 sb.childHandler(serverHandler);
55 cb.handler(clientHandler);
56
57 Channel sc = sb.bind().asStage().get();
58 Channel cc = cb.connect(sc.localAddress()).asStage().get();
59
60
61 serverHandler.awaitPipelineInit();
62
63
64 sc.close().asStage().sync();
65 cc.close().asStage().sync();
66
67 serverHandler.check();
68 clientHandler.check();
69
70 serverHandler.release();
71 clientHandler.release();
72 }
73
74 private abstract static class WriteHandler extends SimpleChannelInboundHandler<Object> {
75 abstract void awaitPipelineInit() throws InterruptedException;
76 abstract void check() throws InterruptedException;
77 abstract void release();
78 }
79
80 private static final class BufferWriterHandler extends WriteHandler {
81
82 private final Random random = new Random();
83 private final CountDownLatch latch = new CountDownLatch(1);
84 private Buffer buf;
85 private final Promise<Channel> channelFuture = executor.newPromise();
86
87 @Override
88 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
89 channelFuture.setSuccess(ctx.channel());
90 }
91
92 @Override
93 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
94 byte[] data = new byte[1024];
95 random.nextBytes(data);
96 buf = ctx.bufferAllocator().copyOf(data);
97 ctx.writeAndFlush(buf).addListener(future -> latch.countDown());
98 }
99
100 @Override
101 public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
102
103 }
104
105 @Override
106 void awaitPipelineInit() throws InterruptedException {
107 channelFuture.asFuture().asStage().sync();
108 }
109
110 @Override
111 void check() throws InterruptedException {
112 latch.await();
113 assertFalse(buf.isAccessible());
114 }
115
116 @Override
117 void release() {
118 }
119 }
120 }