View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelFuture;
24  import io.netty.channel.ChannelFutureListener;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.channel.ChannelInboundHandlerAdapter;
27  import io.netty.channel.ChannelInitializer;
28  import io.netty.channel.ChannelOption;
29  import io.netty.testsuite.transport.TestsuitePermutation;
30  import io.netty.util.CharsetUtil;
31  import io.netty.util.concurrent.ImmediateEventExecutor;
32  import io.netty.util.concurrent.Promise;
33  import org.junit.jupiter.api.Test;
34  import org.junit.jupiter.api.TestInfo;
35  import org.junit.jupiter.api.Timeout;
36  
37  import java.io.IOException;
38  import java.net.SocketAddress;
39  import java.util.List;
40  import java.util.concurrent.TimeUnit;
41  import java.util.concurrent.atomic.AtomicInteger;
42  import java.util.concurrent.atomic.AtomicReference;
43  
44  public abstract class AbstractSocketReuseFdTest extends AbstractSocketTest {
45      @Override
46      protected abstract SocketAddress newSocketAddress();
47  
48      @Override
49      protected abstract List<TestsuitePermutation.BootstrapComboFactory<ServerBootstrap, Bootstrap>> newFactories();
50  
51      @Test
52      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
53      public void testReuseFd(TestInfo testInfo) throws Throwable {
54          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
55              @Override
56              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
57                  testReuseFd(serverBootstrap, bootstrap);
58              }
59          });
60      }
61  
62      public void testReuseFd(ServerBootstrap sb, Bootstrap cb) throws Throwable {
63          sb.childOption(ChannelOption.AUTO_READ, true);
64          cb.option(ChannelOption.AUTO_READ, true);
65  
66          // Use a number which will typically not exceed /proc/sys/net/core/somaxconn (which is 128 on linux by default
67          // often).
68          int numChannels = 100;
69          final AtomicReference<Throwable> globalException = new AtomicReference<Throwable>();
70          final AtomicInteger serverRemaining = new AtomicInteger(numChannels);
71          final AtomicInteger clientRemaining = new AtomicInteger(numChannels);
72          final Promise<Void> serverDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
73          final Promise<Void> clientDonePromise = ImmediateEventExecutor.INSTANCE.newPromise();
74  
75          sb.childHandler(new ChannelInitializer<Channel>() {
76              @Override
77              public void initChannel(Channel sch) {
78                  ReuseFdHandler sh = new ReuseFdHandler(
79                      false,
80                      globalException,
81                      serverRemaining,
82                      serverDonePromise);
83                  sch.pipeline().addLast("handler", sh);
84              }
85          });
86  
87          cb.handler(new ChannelInitializer<Channel>() {
88              @Override
89              public void initChannel(Channel sch) {
90                  ReuseFdHandler ch = new ReuseFdHandler(
91                      true,
92                      globalException,
93                      clientRemaining,
94                      clientDonePromise);
95                  sch.pipeline().addLast("handler", ch);
96              }
97          });
98  
99          ChannelFutureListener listener = future -> {
100             if (!future.isSuccess()) {
101                 clientDonePromise.tryFailure(future.cause());
102             }
103         };
104 
105         Channel sc = sb.bind().sync().channel();
106         for (int i = 0; i < numChannels; i++) {
107             cb.connect(sc.localAddress()).addListener(listener);
108         }
109 
110         clientDonePromise.sync();
111         serverDonePromise.sync();
112         sc.close().sync();
113 
114         if (globalException.get() != null && !(globalException.get() instanceof IOException)) {
115             throw globalException.get();
116         }
117     }
118 
119     static class ReuseFdHandler extends ChannelInboundHandlerAdapter {
120         private static final String EXPECTED_PAYLOAD = "payload";
121 
122         private final Promise<Void> donePromise;
123         private final AtomicInteger remaining;
124         private final boolean client;
125         volatile Channel channel;
126         final AtomicReference<Throwable> globalException;
127         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
128         final StringBuilder received = new StringBuilder();
129 
130         ReuseFdHandler(
131             boolean client,
132             AtomicReference<Throwable> globalException,
133             AtomicInteger remaining,
134             Promise<Void> donePromise) {
135             this.client = client;
136             this.globalException = globalException;
137             this.remaining = remaining;
138             this.donePromise = donePromise;
139         }
140 
141         @Override
142         public void channelActive(ChannelHandlerContext ctx) {
143             channel = ctx.channel();
144             if (client) {
145                 ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII))
146                         .addListener(f -> {
147                             if (!f.isSuccess()) {
148                                 donePromise.tryFailure(f.cause());
149                             }
150                         });
151             }
152         }
153 
154         @Override
155         public void channelRead(ChannelHandlerContext ctx, Object msg) {
156             if (msg instanceof ByteBuf) {
157                 ByteBuf buf = (ByteBuf) msg;
158                 received.append(buf.toString(CharsetUtil.US_ASCII));
159                 buf.release();
160 
161                 if (received.toString().equals(EXPECTED_PAYLOAD)) {
162                     if (client) {
163                         ctx.close();
164                     } else {
165                         ctx.writeAndFlush(Unpooled.copiedBuffer(EXPECTED_PAYLOAD, CharsetUtil.US_ASCII))
166                                 .addListener(f -> {
167                                     if (!f.isSuccess()) {
168                                         donePromise.tryFailure(f.cause());
169                                     }
170                                 });
171                     }
172                 }
173             }
174         }
175 
176         @Override
177         public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
178             if (exception.compareAndSet(null, cause)) {
179                 donePromise.tryFailure(new IllegalStateException("exceptionCaught: " + ctx.channel(), cause));
180                 ctx.close();
181             }
182             globalException.compareAndSet(null, cause);
183         }
184 
185         @Override
186         public void channelInactive(ChannelHandlerContext ctx) {
187             if (remaining.decrementAndGet() == 0) {
188                 if (received.toString().equals(EXPECTED_PAYLOAD)) {
189                     donePromise.setSuccess(null);
190                 } else {
191                     donePromise.tryFailure(new Exception("Unexpected payload:" + received));
192                 }
193             }
194         }
195     }
196 }