View Javadoc
1   /*
2    * Copyright 2016 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.buffer.api.BufferAllocator;
22  import io.netty5.util.Resource;
23  import io.netty5.channel.Channel;
24  import io.netty5.channel.ChannelHandler;
25  import io.netty5.channel.ChannelHandlerContext;
26  import io.netty5.channel.ChannelInitializer;
27  import io.netty5.channel.ChannelOption;
28  import io.netty5.channel.RecvBufferAllocator;
29  import org.junit.jupiter.api.Test;
30  import org.junit.jupiter.api.TestInfo;
31  import org.junit.jupiter.api.Timeout;
32  
33  import java.util.concurrent.CountDownLatch;
34  import java.util.concurrent.TimeUnit;
35  import java.util.concurrent.atomic.AtomicInteger;
36  import java.util.function.Predicate;
37  
38  import static io.netty5.buffer.api.DefaultBufferAllocators.preferredAllocator;
39  import static org.junit.jupiter.api.Assertions.assertEquals;
40  import static org.junit.jupiter.api.Assertions.assertFalse;
41  import static org.junit.jupiter.api.Assertions.assertTrue;
42  
43  public class SocketReadPendingTest extends AbstractSocketTest {
44      @Test
45      @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
46      public void testReadPendingIsResetAfterEachRead(TestInfo testInfo) throws Throwable {
47          run(testInfo, this::testReadPendingIsResetAfterEachRead);
48      }
49  
50      public void testReadPendingIsResetAfterEachRead(ServerBootstrap sb, Bootstrap cb)
51              throws Throwable {
52          Channel serverChannel = null;
53          Channel clientChannel = null;
54          try {
55              ReadPendingInitializer serverInitializer = new ReadPendingInitializer();
56              ReadPendingInitializer clientInitializer = new ReadPendingInitializer();
57              sb.option(ChannelOption.SO_BACKLOG, 1024)
58                .option(ChannelOption.AUTO_READ, true)
59                .childOption(ChannelOption.AUTO_READ, false)
60                // We intend to do 2 reads per read loop wakeup
61                .childOption(ChannelOption.RCVBUFFER_ALLOCATOR, new TestNumReadsRecvBufferAllocator(2))
62                .childHandler(serverInitializer);
63  
64              serverChannel = sb.bind().asStage().get();
65  
66              cb.option(ChannelOption.AUTO_READ, false)
67                // We intend to do 2 reads per read loop wakeup
68                .option(ChannelOption.RCVBUFFER_ALLOCATOR, new TestNumReadsRecvBufferAllocator(2))
69                .handler(clientInitializer);
70              clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
71  
72              // 4 bytes means 2 read loops for TestNumReadsRecvBufferAllocator
73              clientChannel.writeAndFlush(preferredAllocator().copyOf(new byte[4]));
74  
75              // 4 bytes means 2 read loops for TestNumReadsRecvBufferAllocator
76              assertTrue(serverInitializer.channelInitLatch.await(5, TimeUnit.SECONDS));
77              serverInitializer.channel.writeAndFlush(preferredAllocator().copyOf(new byte[4]));
78  
79              serverInitializer.channel.read();
80              serverInitializer.readPendingHandler.assertAllRead();
81  
82              clientChannel.read();
83              clientInitializer.readPendingHandler.assertAllRead();
84          } finally {
85              if (serverChannel != null) {
86                  serverChannel.close().asStage().sync();
87              }
88              if (clientChannel != null) {
89                  clientChannel.close().asStage().sync();
90              }
91          }
92      }
93  
94      private static class ReadPendingInitializer extends ChannelInitializer<Channel> {
95          final ReadPendingReadHandler readPendingHandler = new ReadPendingReadHandler();
96          final CountDownLatch channelInitLatch = new CountDownLatch(1);
97          volatile Channel channel;
98  
99          @Override
100         protected void initChannel(Channel ch) throws Exception {
101             channel = ch;
102             ch.pipeline().addLast(readPendingHandler);
103             channelInitLatch.countDown();
104         }
105     }
106 
107     private static final class ReadPendingReadHandler implements ChannelHandler {
108         private final AtomicInteger count = new AtomicInteger();
109         private final CountDownLatch latch = new CountDownLatch(1);
110         private final CountDownLatch latch2 = new CountDownLatch(2);
111 
112         @Override
113         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
114             Resource.dispose(msg);
115             if (count.incrementAndGet() == 1) {
116                 // Call read the first time, to ensure it is not reset the second time.
117                 ctx.read();
118             }
119         }
120 
121         @Override
122         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
123             latch.countDown();
124             latch2.countDown();
125         }
126 
127         void assertAllRead() throws InterruptedException {
128             assertTrue(latch.await(5, TimeUnit.SECONDS));
129             // We should only do 1 read loop, because we only called read() on the first channelRead.
130             assertFalse(latch2.await(1, TimeUnit.SECONDS));
131             assertEquals(2, count.get());
132         }
133     }
134 
135     /**
136      * Designed to read a single byte at a time to control the number of reads done at a fine granularity.
137      */
138     private static final class TestNumReadsRecvBufferAllocator implements RecvBufferAllocator {
139         private final int numReads;
140         TestNumReadsRecvBufferAllocator(int numReads) {
141             this.numReads = numReads;
142         }
143 
144         @Override
145         public Handle newHandle() {
146             return new Handle() {
147                 private int attemptedBytesRead;
148                 private int lastBytesRead;
149                 private int numMessagesRead;
150 
151                 @Override
152                 public Buffer allocate(BufferAllocator alloc) {
153                     return alloc.allocate(guess());
154                 }
155 
156                 @Override
157                 public int guess() {
158                     return 1; // only ever allocate buffers of size 1 to ensure the number of reads is controlled.
159                 }
160 
161                 @Override
162                 public void reset() {
163                     numMessagesRead = 0;
164                 }
165 
166                 @Override
167                 public void incMessagesRead(int numMessages) {
168                     numMessagesRead += numMessages;
169                 }
170 
171                 @Override
172                 public void lastBytesRead(int bytes) {
173                     lastBytesRead = bytes;
174                 }
175 
176                 @Override
177                 public int lastBytesRead() {
178                     return lastBytesRead;
179                 }
180 
181                 @Override
182                 public void attemptedBytesRead(int bytes) {
183                     attemptedBytesRead = bytes;
184                 }
185 
186                 @Override
187                 public int attemptedBytesRead() {
188                     return attemptedBytesRead;
189                 }
190 
191                 @Override
192                 public boolean continueReading(boolean autoRead) {
193                     return numMessagesRead < numReads;
194                 }
195 
196                 @Override
197                 public boolean continueReading(boolean autoRead, Predicate<Handle> maybeMoreDataSupplier) {
198                     return continueReading(autoRead);
199                 }
200 
201                 @Override
202                 public void readComplete() {
203                     // Nothing needs to be done or adjusted after each read cycle is completed.
204                 }
205             };
206         }
207     }
208 }