View Javadoc
1   /*
2    * Copyright 2018 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.channel.Channel;
22  import io.netty5.channel.ChannelHandler;
23  import io.netty5.channel.ChannelHandlerContext;
24  import io.netty5.channel.ChannelInitializer;
25  import io.netty5.channel.SimpleChannelInboundHandler;
26  import org.junit.jupiter.api.Test;
27  import org.junit.jupiter.api.TestInfo;
28  import org.junit.jupiter.api.Timeout;
29  
30  import java.util.concurrent.CountDownLatch;
31  import java.util.concurrent.TimeUnit;
32  import java.util.concurrent.atomic.AtomicReference;
33  
34  import static io.netty5.channel.ChannelOption.AUTO_READ;
35  import static org.junit.jupiter.api.Assertions.assertEquals;
36  import static org.junit.jupiter.api.Assertions.assertNotNull;
37  
38  public class SocketDataReadInitialStateTest extends AbstractSocketTest {
39      @Test
40      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
41      public void testAutoReadOffNoDataReadUntilReadCalled(TestInfo testInfo) throws Throwable {
42          run(testInfo, this::testAutoReadOffNoDataReadUntilReadCalled);
43      }
44  
45      public void testAutoReadOffNoDataReadUntilReadCalled(ServerBootstrap sb, Bootstrap cb) throws Throwable {
46          Channel serverChannel = null;
47          Channel clientChannel = null;
48          final int sleepMs = 100;
49          try {
50              sb.option(AUTO_READ, false);
51              sb.childOption(AUTO_READ, false);
52              cb.option(AUTO_READ, false);
53              final CountDownLatch serverReadyLatch = new CountDownLatch(1);
54              final CountDownLatch acceptorReadLatch = new CountDownLatch(1);
55              final CountDownLatch serverReadLatch = new CountDownLatch(1);
56              final CountDownLatch clientReadLatch = new CountDownLatch(1);
57              final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<>();
58  
59              sb.handler(new ChannelInitializer<>() {
60                  @Override
61                  protected void initChannel(Channel ch) {
62                      ch.pipeline().addLast(new ChannelHandler() {
63                          @Override
64                          public void channelRead(ChannelHandlerContext ctx, Object msg) {
65                              acceptorReadLatch.countDown();
66                              ctx.fireChannelRead(msg);
67                          }
68                      });
69                  }
70              });
71  
72              sb.childHandler(new ChannelInitializer<>() {
73                  @Override
74                  protected void initChannel(Channel ch) {
75                      serverConnectedChannelRef.set(ch);
76                      ch.pipeline().addLast(new SimpleChannelInboundHandler<Buffer>() {
77                          @Override
78                          protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
79                              ctx.writeAndFlush(msg.split());
80                              serverReadLatch.countDown();
81                          }
82                      });
83                      serverReadyLatch.countDown();
84                  }
85              });
86  
87              cb.handler(new ChannelInitializer<>() {
88                  @Override
89                  protected void initChannel(Channel ch) {
90                      ch.pipeline().addLast(new SimpleChannelInboundHandler<>() {
91                          @Override
92                          protected void messageReceived(ChannelHandlerContext ctx, Object msg) {
93                              clientReadLatch.countDown();
94                          }
95                      });
96                  }
97              });
98  
99              serverChannel = sb.bind().asStage().get();
100             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
101             clientChannel.writeAndFlush(clientChannel.bufferAllocator().copyOf(new byte[] {0})).asStage().sync();
102 
103             // The acceptor shouldn't read any data until we call read() below, but give it some time to see if it will.
104             Thread.sleep(sleepMs);
105             assertEquals(1, acceptorReadLatch.getCount());
106             serverChannel.read();
107             serverReadyLatch.await();
108 
109             Channel serverConnectedChannel = serverConnectedChannelRef.get();
110             assertNotNull(serverConnectedChannel);
111 
112             // Allow some amount of time for the server peer to receive the message (which isn't expected to happen
113             // until we call read() below).
114             Thread.sleep(sleepMs);
115             assertEquals(1, serverReadLatch.getCount());
116             serverConnectedChannel.read();
117             serverReadLatch.await();
118 
119             // Allow some amount of time for the client to read the echo.
120             Thread.sleep(sleepMs);
121             assertEquals(1, clientReadLatch.getCount());
122             clientChannel.read();
123             clientReadLatch.await();
124         } finally {
125             if (serverChannel != null) {
126                 serverChannel.close().asStage().sync();
127             }
128             if (clientChannel != null) {
129                 clientChannel.close().asStage().sync();
130             }
131         }
132     }
133 
134     @Test
135     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
136     public void testAutoReadOnDataReadImmediately(TestInfo testInfo) throws Throwable {
137         run(testInfo, this::testAutoReadOnDataReadImmediately);
138     }
139 
140     public void testAutoReadOnDataReadImmediately(ServerBootstrap sb, Bootstrap cb) throws Throwable {
141         Channel serverChannel = null;
142         Channel clientChannel = null;
143         try {
144             sb.option(AUTO_READ, true);
145             sb.childOption(AUTO_READ, true);
146             cb.option(AUTO_READ, true);
147             final CountDownLatch serverReadLatch = new CountDownLatch(1);
148             final CountDownLatch clientReadLatch = new CountDownLatch(1);
149 
150             sb.childHandler(new ChannelInitializer<>() {
151                 @Override
152                 protected void initChannel(Channel ch) {
153                     ch.pipeline().addLast(new SimpleChannelInboundHandler<Buffer>() {
154                         @Override
155                         protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
156                             ctx.writeAndFlush(msg.split());
157                             serverReadLatch.countDown();
158                         }
159                     });
160                 }
161             });
162 
163             cb.handler(new ChannelInitializer<>() {
164                 @Override
165                 protected void initChannel(Channel ch) {
166                     ch.pipeline().addLast(new SimpleChannelInboundHandler<>() {
167                         @Override
168                         protected void messageReceived(ChannelHandlerContext ctx, Object msg) {
169                             clientReadLatch.countDown();
170                         }
171                     });
172                 }
173             });
174 
175             serverChannel = sb.bind().asStage().get();
176             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
177             clientChannel.writeAndFlush(clientChannel.bufferAllocator().copyOf(new byte[] {0})).asStage().sync();
178             serverReadLatch.await();
179             clientReadLatch.await();
180         } finally {
181             if (serverChannel != null) {
182                 serverChannel.close().asStage().sync();
183             }
184             if (clientChannel != null) {
185                 clientChannel.close().asStage().sync();
186             }
187         }
188     }
189 }