1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.SimpleChannelInboundHandler;
24 import io.netty.util.concurrent.DefaultEventExecutorGroup;
25 import io.netty.util.concurrent.DefaultPromise;
26 import io.netty.util.concurrent.DefaultThreadFactory;
27 import io.netty.util.concurrent.EventExecutor;
28 import io.netty.util.concurrent.Promise;
29 import org.junit.jupiter.api.Test;
30 import org.junit.jupiter.api.TestInfo;
31
32 import java.util.Random;
33 import java.util.concurrent.CountDownLatch;
34
35 import static org.junit.jupiter.api.Assertions.assertEquals;
36
37 public class SocketBufReleaseTest extends AbstractSocketTest {
38
39 private static final EventExecutor executor =
40 new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
41
42 @Test
43 public void testBufRelease(TestInfo testInfo) throws Throwable {
44 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
45 @Override
46 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
47 testBufRelease(serverBootstrap, bootstrap);
48 }
49 });
50 }
51
52 public void testBufRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
53 BufWriterHandler serverHandler = new BufWriterHandler();
54 BufWriterHandler clientHandler = new BufWriterHandler();
55
56 sb.childHandler(serverHandler);
57 cb.handler(clientHandler);
58
59 Channel sc = sb.bind().sync().channel();
60 Channel cc = cb.connect(sc.localAddress()).sync().channel();
61
62
63 serverHandler.channelFuture.sync();
64
65
66 sc.close().sync();
67 cc.close().sync();
68
69 serverHandler.check();
70 clientHandler.check();
71
72 serverHandler.release();
73 clientHandler.release();
74 }
75
76 private static class BufWriterHandler extends SimpleChannelInboundHandler<Object> {
77
78 private final Random random = new Random();
79 private final CountDownLatch latch = new CountDownLatch(1);
80 private ByteBuf buf;
81 private final Promise<Channel> channelFuture = new DefaultPromise<Channel>(executor);
82
83 @Override
84 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
85 channelFuture.setSuccess(ctx.channel());
86 }
87
88 @Override
89 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
90 byte[] data = new byte[1024];
91 random.nextBytes(data);
92
93 buf = ctx.alloc().buffer();
94
95 buf.writeBytes(data).retain();
96
97 ctx.channel().writeAndFlush(buf).addListener(future -> latch.countDown());
98 }
99
100 @Override
101 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
102
103 }
104
105 public void check() throws InterruptedException {
106 latch.await();
107 assertEquals(1, buf.refCnt());
108 }
109
110 void release() {
111 buf.release();
112 }
113 }
114 }