View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelFuture;
23  import io.netty.channel.ChannelFutureListener;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.SimpleChannelInboundHandler;
26  import io.netty.util.concurrent.DefaultEventExecutorGroup;
27  import io.netty.util.concurrent.DefaultPromise;
28  import io.netty.util.concurrent.DefaultThreadFactory;
29  import io.netty.util.concurrent.EventExecutor;
30  import io.netty.util.concurrent.Promise;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.TestInfo;
33  
34  import java.util.Random;
35  import java.util.concurrent.CountDownLatch;
36  
37  import static org.junit.jupiter.api.Assertions.assertEquals;
38  
39  public class SocketBufReleaseTest extends AbstractSocketTest {
40  
41      private static final EventExecutor executor =
42              new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
43  
44      @Test
45      public void testBufRelease(TestInfo testInfo) throws Throwable {
46          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
47              @Override
48              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
49                  testBufRelease(serverBootstrap, bootstrap);
50              }
51          });
52      }
53  
54      public void testBufRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
55          BufWriterHandler serverHandler = new BufWriterHandler();
56          BufWriterHandler clientHandler = new BufWriterHandler();
57  
58          sb.childHandler(serverHandler);
59          cb.handler(clientHandler);
60  
61          Channel sc = sb.bind().sync().channel();
62          Channel cc = cb.connect(sc.localAddress()).sync().channel();
63  
64          // Ensure the server socket accepted the client connection *and* initialized pipeline successfully.
65          serverHandler.channelFuture.sync();
66  
67          // and then close all sockets.
68          sc.close().sync();
69          cc.close().sync();
70  
71          serverHandler.check();
72          clientHandler.check();
73  
74          serverHandler.release();
75          clientHandler.release();
76      }
77  
78      private static class BufWriterHandler extends SimpleChannelInboundHandler<Object> {
79  
80          private final Random random = new Random();
81          private final CountDownLatch latch = new CountDownLatch(1);
82          private ByteBuf buf;
83          private final Promise<Channel> channelFuture = new DefaultPromise<Channel>(executor);
84  
85          @Override
86          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
87              channelFuture.setSuccess(ctx.channel());
88          }
89  
90          @Override
91          public void channelActive(final ChannelHandlerContext ctx) throws Exception {
92              byte[] data = new byte[1024];
93              random.nextBytes(data);
94  
95              buf = ctx.alloc().buffer();
96              // call retain on it so it can't be put back on the pool
97              buf.writeBytes(data).retain();
98  
99              ctx.channel().writeAndFlush(buf).addListener(new ChannelFutureListener() {
100                 @Override
101                 public void operationComplete(ChannelFuture future) throws Exception {
102                     latch.countDown();
103                 }
104             });
105         }
106 
107         @Override
108         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
109             // discard
110         }
111 
112         public void check() throws InterruptedException {
113             latch.await();
114             assertEquals(1, buf.refCnt());
115         }
116 
117         void release() {
118             buf.release();
119         }
120     }
121 }