View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.channel.unix.tests;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.channel.Channel;
22  import io.netty5.channel.ChannelFutureListeners;
23  import io.netty5.channel.ChannelHandler;
24  import io.netty5.channel.ChannelHandlerContext;
25  import io.netty5.channel.ChannelInitializer;
26  import io.netty5.channel.ChannelOption;
27  import io.netty5.channel.EventLoopGroup;
28  import io.netty5.channel.FixedRecvBufferAllocator;
29  import io.netty5.channel.ServerChannel;
30  import io.netty5.channel.SimpleChannelInboundHandler;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.Timeout;
33  
34  import java.net.InetSocketAddress;
35  import java.util.concurrent.CountDownLatch;
36  import java.util.concurrent.TimeUnit;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  import static org.junit.jupiter.api.Assertions.assertEquals;
40  
41  public abstract class DetectPeerCloseWithoutReadTest {
42      protected abstract EventLoopGroup newGroup();
43      protected abstract Class<? extends ServerChannel> serverChannel();
44      protected abstract Class<? extends Channel> clientChannel();
45  
46      @Test
47      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48      public void clientCloseWithoutServerReadIsDetectedNoExtraReadRequested() throws Exception {
49          clientCloseWithoutServerReadIsDetected0(false);
50      }
51  
52      @Test
53      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
54      public void clientCloseWithoutServerReadIsDetectedExtraReadRequested() throws Exception {
55          clientCloseWithoutServerReadIsDetected0(true);
56      }
57  
58      private void clientCloseWithoutServerReadIsDetected0(final boolean extraReadRequested) throws Exception {
59          EventLoopGroup serverGroup = null;
60          EventLoopGroup clientGroup = null;
61          Channel serverChannel = null;
62          try {
63              final CountDownLatch latch = new CountDownLatch(1);
64              final AtomicInteger bytesRead = new AtomicInteger();
65              final int expectedBytes = 100;
66              serverGroup = newGroup();
67              clientGroup = newGroup();
68              ServerBootstrap sb = new ServerBootstrap();
69              sb.group(serverGroup);
70              sb.channel(serverChannel());
71              // Ensure we read only one message per read() call and that we need multiple read()
72              // calls to consume everything.
73              sb.childOption(ChannelOption.AUTO_READ, false);
74              sb.childOption(ChannelOption.MAX_MESSAGES_PER_READ, 1);
75              sb.childOption(ChannelOption.RCVBUFFER_ALLOCATOR, new FixedRecvBufferAllocator(expectedBytes / 10));
76              sb.childHandler(new ChannelInitializer<>() {
77                  @Override
78                  protected void initChannel(Channel ch) {
79                      ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
80                  }
81              });
82  
83              serverChannel = sb.bind(new InetSocketAddress(0)).asStage().get();
84  
85              Bootstrap cb = new Bootstrap();
86              cb.group(serverGroup);
87              cb.channel(clientChannel());
88              cb.handler(new ChannelHandler() { });
89              Channel clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
90              Buffer buf = clientChannel.bufferAllocator().allocate(expectedBytes);
91              buf.skipWritableBytes(expectedBytes);
92              clientChannel.writeAndFlush(buf).addListener(clientChannel, ChannelFutureListeners.CLOSE);
93  
94              latch.await();
95              assertEquals(expectedBytes, bytesRead.get());
96          } finally {
97              if (serverChannel != null) {
98                  serverChannel.close().asStage().sync();
99              }
100             if (serverGroup != null) {
101                 serverGroup.shutdownGracefully();
102             }
103             if (clientGroup != null) {
104                 clientGroup.shutdownGracefully();
105             }
106         }
107     }
108 
109     @Test
110     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
111     public void serverCloseWithoutClientReadIsDetectedNoExtraReadRequested() throws Exception {
112         serverCloseWithoutClientReadIsDetected0(false);
113     }
114 
115     @Test
116     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
117     public void serverCloseWithoutClientReadIsDetectedExtraReadRequested() throws Exception {
118         serverCloseWithoutClientReadIsDetected0(true);
119     }
120 
121     private void serverCloseWithoutClientReadIsDetected0(final boolean extraReadRequested) throws Exception {
122         EventLoopGroup serverGroup = null;
123         EventLoopGroup clientGroup = null;
124         Channel serverChannel = null;
125         Channel clientChannel = null;
126         try {
127             final CountDownLatch latch = new CountDownLatch(1);
128             final AtomicInteger bytesRead = new AtomicInteger();
129             final int expectedBytes = 100;
130             serverGroup = newGroup();
131             clientGroup = newGroup();
132             ServerBootstrap sb = new ServerBootstrap();
133             sb.group(serverGroup);
134             sb.channel(serverChannel());
135             sb.childHandler(new ChannelInitializer<>() {
136                 @Override
137                 protected void initChannel(Channel ch) {
138                     ch.pipeline().addLast(new ChannelHandler() {
139                         @Override
140                         public void channelActive(ChannelHandlerContext ctx) {
141                             Buffer buf = ctx.bufferAllocator().allocate(expectedBytes);
142                             buf.skipWritableBytes(expectedBytes);
143                             ctx.writeAndFlush(buf).addListener(ctx.channel(), ChannelFutureListeners.CLOSE);
144                             ctx.fireChannelActive();
145                         }
146                     });
147                 }
148             });
149 
150             serverChannel = sb.bind(new InetSocketAddress(0)).asStage().get();
151 
152             Bootstrap cb = new Bootstrap();
153             cb.group(serverGroup);
154             cb.channel(clientChannel());
155             // Ensure we read only one message per read() call and that we need multiple read()
156             // calls to consume everything.
157             cb.option(ChannelOption.AUTO_READ, false);
158             cb.option(ChannelOption.MAX_MESSAGES_PER_READ, 1);
159             cb.option(ChannelOption.RCVBUFFER_ALLOCATOR, new FixedRecvBufferAllocator(expectedBytes / 10));
160             cb.handler(new ChannelInitializer<>() {
161                 @Override
162                 protected void initChannel(Channel ch) throws Exception {
163                     ch.pipeline().addLast(new TestHandler(bytesRead, extraReadRequested, latch));
164                 }
165             });
166             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
167 
168             latch.await();
169             assertEquals(expectedBytes, bytesRead.get());
170         } finally {
171             if (serverChannel != null) {
172                 serverChannel.close().asStage().sync();
173             }
174             if (clientChannel != null) {
175                 clientChannel.close().asStage().sync();
176             }
177             if (serverGroup != null) {
178                 serverGroup.shutdownGracefully();
179             }
180             if (clientGroup != null) {
181                 clientGroup.shutdownGracefully();
182             }
183         }
184     }
185 
186     private static final class TestHandler extends SimpleChannelInboundHandler<Buffer> {
187         private final AtomicInteger bytesRead;
188         private final boolean extraReadRequested;
189         private final CountDownLatch latch;
190 
191         TestHandler(AtomicInteger bytesRead, boolean extraReadRequested, CountDownLatch latch) {
192             this.bytesRead = bytesRead;
193             this.extraReadRequested = extraReadRequested;
194             this.latch = latch;
195         }
196 
197         @Override
198         protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
199             bytesRead.addAndGet(msg.readableBytes());
200 
201             if (extraReadRequested) {
202                 // Because autoread is off, we call read to consume all data until we detect the close.
203                 ctx.read();
204             }
205         }
206 
207         @Override
208         public void channelInactive(ChannelHandlerContext ctx) {
209             latch.countDown();
210             ctx.fireChannelInactive();
211         }
212     }
213 }