View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.channel.Channel;
22  import io.netty5.channel.ChannelHandlerContext;
23  import io.netty5.channel.SimpleChannelInboundHandler;
24  import io.netty5.util.concurrent.DefaultEventExecutorGroup;
25  import io.netty5.util.concurrent.DefaultThreadFactory;
26  import io.netty5.util.concurrent.EventExecutor;
27  import io.netty5.util.concurrent.Promise;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  
31  import java.util.Random;
32  import java.util.concurrent.CountDownLatch;
33  
34  import static org.junit.jupiter.api.Assertions.assertFalse;
35  
36  public class SocketBufReleaseTest extends AbstractSocketTest {
37  
38      private static final EventExecutor executor =
39              new DefaultEventExecutorGroup(1, new DefaultThreadFactory(SocketBufReleaseTest.class, true)).next();
40  
41      @Test
42      public void testBufferRelease(TestInfo testInfo) throws Throwable {
43          run(testInfo, this::testBufferRelease);
44      }
45  
46      public void testBufferRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
47          testRelease(sb, cb);
48      }
49  
50      public void testRelease(ServerBootstrap sb, Bootstrap cb) throws Throwable {
51          final WriteHandler serverHandler = new BufferWriterHandler();
52          final WriteHandler clientHandler = new BufferWriterHandler();
53  
54          sb.childHandler(serverHandler);
55          cb.handler(clientHandler);
56  
57          Channel sc = sb.bind().asStage().get();
58          Channel cc = cb.connect(sc.localAddress()).asStage().get();
59  
60          // Ensure the server socket accepted the client connection *and* initialized pipeline successfully.
61          serverHandler.awaitPipelineInit();
62  
63          // and then close all sockets.
64          sc.close().asStage().sync();
65          cc.close().asStage().sync();
66  
67          serverHandler.check();
68          clientHandler.check();
69  
70          serverHandler.release();
71          clientHandler.release();
72      }
73  
74      private abstract static class WriteHandler extends SimpleChannelInboundHandler<Object> {
75          abstract void awaitPipelineInit() throws InterruptedException;
76          abstract void check() throws InterruptedException;
77          abstract void release();
78      }
79  
80      private static final class BufferWriterHandler extends WriteHandler {
81  
82          private final Random random = new Random();
83          private final CountDownLatch latch = new CountDownLatch(1);
84          private Buffer buf;
85          private final Promise<Channel> channelFuture = executor.newPromise();
86  
87          @Override
88          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
89              channelFuture.setSuccess(ctx.channel());
90          }
91  
92          @Override
93          public void channelActive(final ChannelHandlerContext ctx) throws Exception {
94              byte[] data = new byte[1024];
95              random.nextBytes(data);
96              buf = ctx.bufferAllocator().copyOf(data);
97              ctx.writeAndFlush(buf).addListener(future -> latch.countDown());
98          }
99  
100         @Override
101         public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
102             // discard
103         }
104 
105         @Override
106         void awaitPipelineInit() throws InterruptedException {
107             channelFuture.asFuture().asStage().sync();
108         }
109 
110         @Override
111         void check() throws InterruptedException {
112             latch.await();
113             assertFalse(buf.isAccessible());
114         }
115 
116         @Override
117         void release() {
118         }
119     }
120 }