/*
 * Copyright 2012 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.SimpleChannelInboundHandler;
24  import io.netty.util.concurrent.DefaultEventExecutorGroup;
25  import io.netty.util.concurrent.DefaultPromise;
26  import io.netty.util.concurrent.DefaultThreadFactory;
27  import io.netty.util.concurrent.EventExecutor;
28  import io.netty.util.concurrent.Promise;
29  import org.junit.jupiter.api.Test;
30  import org.junit.jupiter.api.TestInfo;
31  
32  import java.util.Random;
33  import java.util.concurrent.CountDownLatch;
34  
35  import static org.junit.jupiter.api.Assertions.assertEquals;
36  
37  public class SocketBufReleaseTest extends AbstractSocketTest {
38  
39      private static final EventExecutor executor =
40              new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
41  
42      @Test
43      public void testBufRelease(TestInfo testInfo) throws Throwable {
44          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
45              @Override
46              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
47                  testBufRelease(serverBootstrap, bootstrap);
48              }
49          });
50      }
51  
52      public void testBufRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
53          BufWriterHandler serverHandler = new BufWriterHandler();
54          BufWriterHandler clientHandler = new BufWriterHandler();
55  
56          sb.childHandler(serverHandler);
57          cb.handler(clientHandler);
58  
59          Channel sc = sb.bind().sync().channel();
60          Channel cc = cb.connect(sc.localAddress()).sync().channel();
61  
62          // Ensure the server socket accepted the client connection *and* initialized pipeline successfully.
63          serverHandler.channelFuture.sync();
64  
65          // and then close all sockets.
66          sc.close().sync();
67          cc.close().sync();
68  
69          serverHandler.check();
70          clientHandler.check();
71  
72          serverHandler.release();
73          clientHandler.release();
74      }
75  
76      private static class BufWriterHandler extends SimpleChannelInboundHandler<Object> {
77  
78          private final Random random = new Random();
79          private final CountDownLatch latch = new CountDownLatch(1);
80          private ByteBuf buf;
81          private final Promise<Channel> channelFuture = new DefaultPromise<Channel>(executor);
82  
83          @Override
84          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
85              channelFuture.setSuccess(ctx.channel());
86          }
87  
88          @Override
89          public void channelActive(final ChannelHandlerContext ctx) throws Exception {
90              byte[] data = new byte[1024];
91              random.nextBytes(data);
92  
93              buf = ctx.alloc().buffer();
94              // call retain on it so it can't be put back on the pool
95              buf.writeBytes(data).retain();
96  
97              ctx.channel().writeAndFlush(buf).addListener(future -> latch.countDown());
98          }
99  
100         @Override
101         public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
102             // discard
103         }
104 
105         public void check() throws InterruptedException {
106             latch.await();
107             assertEquals(1, buf.refCnt());
108         }
109 
110         void release() {
111             buf.release();
112         }
113     }
114 }