View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.buffer.api.BufferAllocator;
22  import io.netty5.buffer.api.DefaultBufferAllocators;
23  import io.netty5.util.Resource;
24  import io.netty5.channel.Channel;
25  import io.netty5.channel.ChannelHandler;
26  import io.netty5.channel.ChannelHandlerContext;
27  import io.netty5.channel.ChannelInitializer;
28  import io.netty5.channel.ChannelOption;
29  import io.netty5.channel.RecvBufferAllocator;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.function.Predicate;
37  
38  import static org.junit.jupiter.api.Assertions.assertEquals;
39  import static org.junit.jupiter.api.Assertions.assertTrue;
40  
41  public class SocketAutoReadTest extends AbstractSocketTest {
42      @Test
43      public void testAutoReadOffDuringReadOnlyReadsOne(TestInfo testInfo) throws Throwable {
44          run(testInfo, this::testAutoReadOffDuringReadOnlyReadsOne);
45      }
46  
47      public void testAutoReadOffDuringReadOnlyReadsOne(ServerBootstrap sb, Bootstrap cb) throws Throwable {
48          testAutoReadOffDuringReadOnlyReadsOne(true, sb, cb);
49          testAutoReadOffDuringReadOnlyReadsOne(false, sb, cb);
50      }
51  
52      private static void testAutoReadOffDuringReadOnlyReadsOne(boolean readOutsideEventLoopThread,
53                                                             ServerBootstrap sb, Bootstrap cb) throws Throwable {
54          Channel serverChannel = null;
55          Channel clientChannel = null;
56          try {
57              AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
58              AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
59              sb.option(ChannelOption.SO_BACKLOG, 1024)
60                      .option(ChannelOption.AUTO_READ, true)
61                      .childOption(ChannelOption.AUTO_READ, true)
62                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
63                      // test the auto read feature being turned off when data is first read.
64                      .childOption(ChannelOption.RCVBUFFER_ALLOCATOR, new TestRecvBufferAllocator())
65                      .childHandler(serverInitializer);
66  
67              serverChannel = sb.bind().asStage().get();
68  
69              cb.option(ChannelOption.AUTO_READ, true)
70                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
71                      // test the auto read feature being turned off when data is first read.
72                      .option(ChannelOption.RCVBUFFER_ALLOCATOR, new TestRecvBufferAllocator())
73                      .handler(clientInitializer);
74  
75              clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
76              BufferAllocator alloc = DefaultBufferAllocators.onHeapAllocator();
77  
78              // 3 bytes means 3 independent reads for TestRecvBufferAllocator
79              clientChannel.writeAndFlush(alloc.copyOf(new byte[3]));
80              serverInitializer.autoReadHandler.assertSingleRead();
81  
82              // 3 bytes means 3 independent reads for TestRecvBufferAllocator
83              serverInitializer.channel.writeAndFlush(alloc.copyOf(new byte[3]));
84              clientInitializer.autoReadHandler.assertSingleRead();
85  
86              if (readOutsideEventLoopThread) {
87                  serverInitializer.channel.read();
88              }
89              serverInitializer.autoReadHandler.assertSingleReadSecondTry();
90  
91              if (readOutsideEventLoopThread) {
92                  clientChannel.read();
93              }
94              clientInitializer.autoReadHandler.assertSingleReadSecondTry();
95          } finally {
96              if (clientChannel != null) {
97                  clientChannel.close().asStage().sync();
98              }
99              if (serverChannel != null) {
100                 serverChannel.close().asStage().sync();
101             }
102         }
103     }
104 
105     private static class AutoReadInitializer extends ChannelInitializer<Channel> {
106         final AutoReadHandler autoReadHandler;
107         volatile Channel channel;
108 
109         AutoReadInitializer(boolean readInEventLoop) {
110             autoReadHandler = new AutoReadHandler(readInEventLoop);
111         }
112 
113         @Override
114         protected void initChannel(Channel ch) throws Exception {
115             channel = ch;
116             ch.pipeline().addLast(autoReadHandler);
117         }
118     }
119 
120     private static final class AutoReadHandler implements ChannelHandler {
121         private final AtomicInteger count = new AtomicInteger();
122         private final CountDownLatch latch = new CountDownLatch(1);
123         private final CountDownLatch latch2;
124         private final boolean callRead;
125 
126         AutoReadHandler(boolean callRead) {
127             this.callRead = callRead;
128             latch2 = new CountDownLatch(callRead ? 3 : 2);
129         }
130 
131         @Override
132         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
133             if (msg instanceof Resource<?>) {
134                 ((Resource<?>) msg).close();
135             } else {
136                 Resource.dispose(msg);
137             }
138             if (count.incrementAndGet() == 1) {
139                 ctx.channel().setOption(ChannelOption.AUTO_READ, false);
140             }
141             if (callRead) {
142                 // Test calling read in the EventLoop thread to ensure a read is eventually done.
143                 ctx.read();
144             }
145         }
146 
147         @Override
148         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
149             latch.countDown();
150             latch2.countDown();
151         }
152 
153         void assertSingleRead() throws InterruptedException {
154             assertTrue(latch.await(5, TimeUnit.SECONDS));
155             assertTrue(count.get() > 0);
156         }
157 
158         void assertSingleReadSecondTry() throws InterruptedException {
159             assertTrue(latch2.await(5, TimeUnit.SECONDS));
160             assertEquals(callRead ? 3 : 2, count.get());
161         }
162     }
163 
164     /**
165      * Designed to keep reading as long as autoread is enabled.
166      */
167     private static final class TestRecvBufferAllocator implements RecvBufferAllocator {
168         @Override
169         public Handle newHandle() {
170             return new Handle() {
171                 private int attemptedBytesRead;
172                 private int lastBytesRead;
173 
174                 @Override
175                 public Buffer allocate(BufferAllocator alloc) {
176                     return alloc.allocate(guess());
177                 }
178 
179                 @Override
180                 public int guess() {
181                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
182                 }
183 
184                 @Override
185                 public void reset() {
186                 }
187 
188                 @Override
189                 public void incMessagesRead(int numMessages) {
190                     // No need to track the number of messages read because it is not used.
191                 }
192 
193                 @Override
194                 public void lastBytesRead(int bytes) {
195                     lastBytesRead = bytes;
196                 }
197 
198                 @Override
199                 public int lastBytesRead() {
200                     return lastBytesRead;
201                 }
202 
203                 @Override
204                 public void attemptedBytesRead(int bytes) {
205                     attemptedBytesRead = bytes;
206                 }
207 
208                 @Override
209                 public int attemptedBytesRead() {
210                     return attemptedBytesRead;
211                 }
212 
213                 @Override
214                 public boolean continueReading(boolean autoRead) {
215                     return autoRead;
216                 }
217 
218                 @Override
219                 public boolean continueReading(boolean autoRead, Predicate<Handle> maybeMoreDataSupplier) {
220                     return autoRead;
221                 }
222 
223                 @Override
224                 public void readComplete() {
225                     // Nothing needs to be done or adjusted after each read cycle is completed.
226                 }
227             };
228         }
229     }
230 }