View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.ByteBufAllocator;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelConfig;
24  import io.netty.channel.ChannelHandlerContext;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelInitializer;
27  import io.netty.channel.ChannelOption;
28  import io.netty.channel.RecvByteBufAllocator;
29  import io.netty.util.ReferenceCountUtil;
30  import io.netty.util.UncheckedBooleanSupplier;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.TestInfo;
33  
34  import java.util.concurrent.CountDownLatch;
35  import java.util.concurrent.TimeUnit;
36  import java.util.concurrent.atomic.AtomicInteger;
37  
38  import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
39  import static org.junit.jupiter.api.Assertions.assertEquals;
40  import static org.junit.jupiter.api.Assertions.assertTrue;
41  
42  public class SocketAutoReadTest extends AbstractSocketTest {
43      @Test
44      public void testAutoReadOffDuringReadOnlyReadsOneTime(TestInfo testInfo) throws Throwable {
45          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
46              @Override
47              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
48                  testAutoReadOffDuringReadOnlyReadsOneTime(serverBootstrap, bootstrap);
49              }
50          });
51      }
52  
53      public void testAutoReadOffDuringReadOnlyReadsOneTime(ServerBootstrap sb, Bootstrap cb) throws Throwable {
54          testAutoReadOffDuringReadOnlyReadsOneTime(true, sb, cb);
55          testAutoReadOffDuringReadOnlyReadsOneTime(false, sb, cb);
56      }
57  
58      private static void testAutoReadOffDuringReadOnlyReadsOneTime(boolean readOutsideEventLoopThread,
59                                                             ServerBootstrap sb, Bootstrap cb) throws Throwable {
60          Channel serverChannel = null;
61          Channel clientChannel = null;
62          try {
63              AutoReadInitializer serverInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
64              AutoReadInitializer clientInitializer = new AutoReadInitializer(!readOutsideEventLoopThread);
65              sb.option(ChannelOption.SO_BACKLOG, 1024)
66                      .option(ChannelOption.AUTO_READ, true)
67                      .childOption(ChannelOption.AUTO_READ, true)
68                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
69                      // test the auto read feature being turned off when data is first read.
70                      .childOption(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
71                      .childHandler(serverInitializer);
72  
73              serverChannel = sb.bind().syncUninterruptibly().channel();
74  
75              cb.option(ChannelOption.AUTO_READ, true)
76                      // We want to ensure that we attempt multiple individual read operations per read loop so we can
77                      // test the auto read feature being turned off when data is first read.
78                      .option(ChannelOption.RCVBUF_ALLOCATOR, new TestRecvByteBufAllocator())
79                      .handler(clientInitializer);
80  
81              clientChannel = cb.connect(serverChannel.localAddress()).syncUninterruptibly().channel();
82  
83              // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
84              clientChannel.writeAndFlush(randomBufferType(clientChannel.alloc(), new byte[3], 0, 3));
85              serverInitializer.autoReadHandler.assertSingleRead();
86  
87              // 3 bytes means 3 independent reads for TestRecvByteBufAllocator
88              serverInitializer.channel.writeAndFlush(
89                      randomBufferType(serverInitializer.channel.alloc(), new byte[3], 0, 3));
90              clientInitializer.autoReadHandler.assertSingleRead();
91  
92              if (readOutsideEventLoopThread) {
93                  serverInitializer.channel.read();
94              }
95              serverInitializer.autoReadHandler.assertSingleReadSecondTry();
96  
97              if (readOutsideEventLoopThread) {
98                  clientChannel.read();
99              }
100             clientInitializer.autoReadHandler.assertSingleReadSecondTry();
101         } finally {
102             if (clientChannel != null) {
103                 clientChannel.close().sync();
104             }
105             if (serverChannel != null) {
106                 serverChannel.close().sync();
107             }
108         }
109     }
110 
111     private static class AutoReadInitializer extends ChannelInitializer<Channel> {
112         final AutoReadHandler autoReadHandler;
113         volatile Channel channel;
114 
115         AutoReadInitializer(boolean readInEventLoop) {
116             autoReadHandler = new AutoReadHandler(readInEventLoop);
117         }
118 
119         @Override
120         protected void initChannel(Channel ch) throws Exception {
121             channel = ch;
122             ch.pipeline().addLast(autoReadHandler);
123         }
124     }
125 
126     private static final class AutoReadHandler extends ChannelInboundHandlerAdapter {
127         private final AtomicInteger count = new AtomicInteger();
128         private final CountDownLatch latch = new CountDownLatch(1);
129         private final CountDownLatch latch2;
130         private final boolean callRead;
131 
132         AutoReadHandler(boolean callRead) {
133             this.callRead = callRead;
134             latch2 = new CountDownLatch(callRead ? 3 : 2);
135         }
136 
137         @Override
138         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
139             ReferenceCountUtil.release(msg);
140             if (count.incrementAndGet() == 1) {
141                 ctx.channel().config().setAutoRead(false);
142             }
143             if (callRead) {
144                 // Test calling read in the EventLoop thread to ensure a read is eventually done.
145                 ctx.read();
146             }
147         }
148 
149         @Override
150         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
151             latch.countDown();
152             latch2.countDown();
153         }
154 
155         void assertSingleRead() throws InterruptedException {
156             assertTrue(latch.await(5, TimeUnit.SECONDS));
157             assertTrue(count.get() > 0);
158         }
159 
160         void assertSingleReadSecondTry() throws InterruptedException {
161             assertTrue(latch2.await(5, TimeUnit.SECONDS));
162             assertEquals(callRead ? 3 : 2, count.get());
163         }
164     }
165 
166     /**
167      * Designed to keep reading as long as autoread is enabled.
168      */
169     private static final class TestRecvByteBufAllocator implements RecvByteBufAllocator {
170         @Override
171         public ExtendedHandle newHandle() {
172             return new ExtendedHandle() {
173                 private ChannelConfig config;
174                 private int attemptedBytesRead;
175                 private int lastBytesRead;
176                 @Override
177                 public ByteBuf allocate(ByteBufAllocator alloc) {
178                     return alloc.ioBuffer(guess(), guess());
179                 }
180 
181                 @Override
182                 public int guess() {
183                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
184                 }
185 
186                 @Override
187                 public void reset(ChannelConfig config) {
188                     this.config = config;
189                 }
190 
191                 @Override
192                 public void incMessagesRead(int numMessages) {
193                     // No need to track the number of messages read because it is not used.
194                 }
195 
196                 @Override
197                 public void lastBytesRead(int bytes) {
198                     lastBytesRead = bytes;
199                 }
200 
201                 @Override
202                 public int lastBytesRead() {
203                     return lastBytesRead;
204                 }
205 
206                 @Override
207                 public void attemptedBytesRead(int bytes) {
208                     attemptedBytesRead = bytes;
209                 }
210 
211                 @Override
212                 public int attemptedBytesRead() {
213                     return attemptedBytesRead;
214                 }
215 
216                 @Override
217                 public boolean continueReading() {
218                     return config.isAutoRead();
219                 }
220 
221                 @Override
222                 public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
223                     return config.isAutoRead();
224                 }
225 
226                 @Override
227                 public void readComplete() {
228                     // Nothing needs to be done or adjusted after each read cycle is completed.
229                 }
230             };
231         }
232     }
233 }