1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import io.netty.util.concurrent.DefaultEventExecutorGroup;
27 import io.netty.util.concurrent.DefaultPromise;
28 import io.netty.util.concurrent.DefaultThreadFactory;
29 import io.netty.util.concurrent.EventExecutor;
30 import io.netty.util.concurrent.Promise;
31 import org.junit.jupiter.api.Test;
32 import org.junit.jupiter.api.TestInfo;
33
34 import java.util.Random;
35 import java.util.concurrent.CountDownLatch;
36
37 import static org.junit.jupiter.api.Assertions.assertEquals;
38
39 public class SocketBufReleaseTest extends AbstractSocketTest {
40
41 private static final EventExecutor executor =
42 new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
43
44 @Test
45 public void testBufRelease(TestInfo testInfo) throws Throwable {
46 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
47 @Override
48 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
49 testBufRelease(serverBootstrap, bootstrap);
50 }
51 });
52 }
53
54 public void testBufRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
55 BufWriterHandler serverHandler = new BufWriterHandler();
56 BufWriterHandler clientHandler = new BufWriterHandler();
57
58 sb.childHandler(serverHandler);
59 cb.handler(clientHandler);
60
61 Channel sc = sb.bind().sync().channel();
62 Channel cc = cb.connect(sc.localAddress()).sync().channel();
63
64
65 serverHandler.channelFuture.sync();
66
67
68 sc.close().sync();
69 cc.close().sync();
70
71 serverHandler.check();
72 clientHandler.check();
73
74 serverHandler.release();
75 clientHandler.release();
76 }
77
78 private static class BufWriterHandler extends SimpleChannelInboundHandler<Object> {
79
80 private final Random random = new Random();
81 private final CountDownLatch latch = new CountDownLatch(1);
82 private ByteBuf buf;
83 private final Promise<Channel> channelFuture = new DefaultPromise<Channel>(executor);
84
85 @Override
86 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
87 channelFuture.setSuccess(ctx.channel());
88 }
89
90 @Override
91 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
92 byte[] data = new byte[1024];
93 random.nextBytes(data);
94
95 buf = ctx.alloc().buffer();
96
97 buf.writeBytes(data).retain();
98
99 ctx.channel().writeAndFlush(buf).addListener(new ChannelFutureListener() {
100 @Override
101 public void operationComplete(ChannelFuture future) throws Exception {
102 latch.countDown();
103 }
104 });
105 }
106
107 @Override
108 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
109
110 }
111
112 public void check() throws InterruptedException {
113 latch.await();
114 assertEquals(1, buf.refCnt());
115 }
116
117 void release() {
118 buf.release();
119 }
120 }
121 }