View Javadoc
1   /*
2    * Copyright 2018 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.ChannelInitializer;
25  import io.netty.channel.SimpleChannelInboundHandler;
26  import org.junit.jupiter.api.Test;
27  import org.junit.jupiter.api.TestInfo;
28  import org.junit.jupiter.api.Timeout;
29  
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import static io.netty.channel.ChannelOption.AUTO_READ;
35  import static org.junit.jupiter.api.Assertions.assertEquals;
36  import static org.junit.jupiter.api.Assertions.assertNotNull;
37  
38  public class SocketDataReadInitialStateTest extends AbstractSocketTest {
39      @Test
40      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
41      public void testAutoReadOffNoDataReadUntilReadCalled(TestInfo testInfo) throws Throwable {
42          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
43              @Override
44              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
45                  testAutoReadOffNoDataReadUntilReadCalled(serverBootstrap, bootstrap);
46              }
47          });
48      }
49  
50      public void testAutoReadOffNoDataReadUntilReadCalled(ServerBootstrap sb, Bootstrap cb) throws Throwable {
51          Channel serverChannel = null;
52          Channel clientChannel = null;
53          final int sleepMs = 100;
54          try {
55              sb.option(AUTO_READ, false);
56              sb.childOption(AUTO_READ, false);
57              cb.option(AUTO_READ, false);
58              final CountDownLatch serverReadyLatch = new CountDownLatch(1);
59              final CountDownLatch acceptorReadLatch = new CountDownLatch(1);
60              final CountDownLatch serverReadLatch = new CountDownLatch(1);
61              final CountDownLatch clientReadLatch = new CountDownLatch(1);
62              final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<Channel>();
63  
64              sb.handler(new ChannelInitializer<Channel>() {
65                  @Override
66                  protected void initChannel(Channel ch) {
67                      ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
68                          @Override
69                          public void channelRead(ChannelHandlerContext ctx, Object msg) {
70                              acceptorReadLatch.countDown();
71                              ctx.fireChannelRead(msg);
72                          }
73                      });
74                  }
75              });
76  
77              sb.childHandler(new ChannelInitializer<Channel>() {
78                  @Override
79                  protected void initChannel(Channel ch) {
80                      serverConnectedChannelRef.set(ch);
81                      ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
82                          @Override
83                          protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
84                              ctx.writeAndFlush(msg.retainedDuplicate());
85                              serverReadLatch.countDown();
86                          }
87                      });
88                      serverReadyLatch.countDown();
89                  }
90              });
91  
92              cb.handler(new ChannelInitializer<Channel>() {
93                  @Override
94                  protected void initChannel(Channel ch) {
95                      ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
96                          @Override
97                          protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
98                              clientReadLatch.countDown();
99                          }
100                     });
101                 }
102             });
103 
104             serverChannel = sb.bind().sync().channel();
105             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
106             clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
107 
108             // The acceptor shouldn't read any data until we call read() below, but give it some time to see if it will.
109             Thread.sleep(sleepMs);
110             assertEquals(1, acceptorReadLatch.getCount());
111             serverChannel.read();
112             serverReadyLatch.await();
113 
114             Channel serverConnectedChannel = serverConnectedChannelRef.get();
115             assertNotNull(serverConnectedChannel);
116 
117             // Allow some amount of time for the server peer to receive the message (which isn't expected to happen
118             // until we call read() below).
119             Thread.sleep(sleepMs);
120             assertEquals(1, serverReadLatch.getCount());
121             serverConnectedChannel.read();
122             serverReadLatch.await();
123 
124             // Allow some amount of time for the client to read the echo.
125             Thread.sleep(sleepMs);
126             assertEquals(1, clientReadLatch.getCount());
127             clientChannel.read();
128             clientReadLatch.await();
129         } finally {
130             if (serverChannel != null) {
131                 serverChannel.close().sync();
132             }
133             if (clientChannel != null) {
134                 clientChannel.close().sync();
135             }
136         }
137     }
138 
139     @Test
140     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
141     public void testAutoReadOnDataReadImmediately(TestInfo testInfo) throws Throwable {
142         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
143             @Override
144             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
145                 testAutoReadOnDataReadImmediately(serverBootstrap, bootstrap);
146             }
147         });
148     }
149 
150     public void testAutoReadOnDataReadImmediately(ServerBootstrap sb, Bootstrap cb) throws Throwable {
151         Channel serverChannel = null;
152         Channel clientChannel = null;
153         try {
154             sb.option(AUTO_READ, true);
155             sb.childOption(AUTO_READ, true);
156             cb.option(AUTO_READ, true);
157             final CountDownLatch serverReadLatch = new CountDownLatch(1);
158             final CountDownLatch clientReadLatch = new CountDownLatch(1);
159 
160             sb.childHandler(new ChannelInitializer<Channel>() {
161                 @Override
162                 protected void initChannel(Channel ch) {
163                     ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
164                         @Override
165                         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
166                             ctx.writeAndFlush(msg.retainedDuplicate());
167                             serverReadLatch.countDown();
168                         }
169                     });
170                 }
171             });
172 
173             cb.handler(new ChannelInitializer<Channel>() {
174                 @Override
175                 protected void initChannel(Channel ch) {
176                     ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
177                         @Override
178                         protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
179                             clientReadLatch.countDown();
180                         }
181                     });
182                 }
183             });
184 
185             serverChannel = sb.bind().sync().channel();
186             clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
187             clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
188             serverReadLatch.await();
189             clientReadLatch.await();
190         } finally {
191             if (serverChannel != null) {
192                 serverChannel.close().sync();
193             }
194             if (clientChannel != null) {
195                 clientChannel.close().sync();
196             }
197         }
198     }
199 }