View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.buffer.api.MemoryManager;
22  import io.netty5.channel.Channel;
23  import io.netty5.channel.ChannelHandler;
24  import io.netty5.channel.ChannelHandlerContext;
25  import io.netty5.channel.ChannelOption;
26  import io.netty5.channel.DefaultFileRegion;
27  import io.netty5.channel.FileRegion;
28  import io.netty5.channel.SimpleChannelInboundHandler;
29  import io.netty5.util.internal.PlatformDependent;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  
33  import java.io.File;
34  import java.io.FileOutputStream;
35  import java.io.IOException;
36  import java.io.RandomAccessFile;
37  import java.nio.channels.WritableByteChannel;
38  import java.util.Random;
39  import java.util.concurrent.ThreadLocalRandom;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import static org.assertj.core.api.Assertions.assertThat;
43  import static org.junit.jupiter.api.Assertions.assertEquals;
44  
45  public class SocketFileRegionTest extends AbstractSocketTest {
46  
47      static final byte[] data = new byte[1048576 * 10];
48  
49      static {
50          ThreadLocalRandom.current().nextBytes(data);
51      }
52  
53      @Test
54      public void testFileRegion(TestInfo testInfo) throws Throwable {
55          run(testInfo, this::testFileRegion);
56      }
57  
58      @Test
59      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
60          run(testInfo, this::testCustomFileRegion);
61      }
62  
63      @Test
64      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
65          run(testInfo, this::testFileRegionNotAutoRead);
66      }
67  
68      @Test
69      public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
70          run(testInfo, this::testFileRegionCountLargerThenFile);
71      }
72  
73      public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
74          testFileRegion0(sb, cb, true, true);
75      }
76  
77      public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
78          testFileRegion0(sb, cb, true, false);
79      }
80  
81      public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
82          testFileRegion0(sb, cb, false, true);
83      }
84  
85      public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
86          File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
87          file.deleteOnExit();
88  
89          final FileOutputStream out = new FileOutputStream(file);
90          out.write(data);
91          out.close();
92  
93          sb.childHandler(new SimpleChannelInboundHandler<Buffer>() {
94              @Override
95              protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
96                  // Just drop the message.
97              }
98          });
99          cb.handler(new ChannelHandler() { });
100 
101         Channel sc = sb.bind().asStage().get();
102         Channel cc = cb.connect(sc.localAddress()).asStage().get();
103 
104         // Request file region which is bigger then the underlying file.
105         FileRegion region = new DefaultFileRegion(
106                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
107 
108         Throwable result = cc.writeAndFlush(region).asStage().getCause();
109         assertThat(result).isInstanceOf(IOException.class);
110         cc.close().asStage().sync();
111         sc.close().asStage().sync();
112     }
113 
114     private static void testFileRegion0(
115             ServerBootstrap sb, Bootstrap cb, final boolean autoRead, boolean defaultFileRegion)
116             throws Throwable {
117         sb.childOption(ChannelOption.AUTO_READ, autoRead);
118         cb.option(ChannelOption.AUTO_READ, autoRead);
119 
120         final int bufferSize = 1024;
121         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
122         file.deleteOnExit();
123 
124         final FileOutputStream out = new FileOutputStream(file);
125         final Random random = ThreadLocalRandom.current();
126 
127         // Prepend random data which will not be transferred, so that we can test non-zero start offset
128         final int startOffset = random.nextInt(8192);
129         for (int i = 0; i < startOffset; i ++) {
130             out.write(random.nextInt());
131         }
132 
133         // .. and here comes the real data to transfer.
134         out.write(data, bufferSize, data.length - bufferSize);
135 
136         // .. and then some extra data which is not supposed to be transferred.
137         for (int i = random.nextInt(8192); i > 0; i --) {
138             out.write(random.nextInt());
139         }
140 
141         out.close();
142 
143         ChannelHandler ch = new SimpleChannelInboundHandler<>() {
144             @Override
145             public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
146             }
147 
148             @Override
149             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
150                 if (!autoRead) {
151                     ctx.read();
152                 }
153             }
154 
155             @Override
156             public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
157                 ctx.close();
158             }
159         };
160         TestHandler sh = new TestHandler(autoRead);
161 
162         sb.childHandler(sh);
163         cb.handler(ch);
164 
165         Channel sc = sb.bind().asStage().get();
166 
167         Channel cc = cb.connect(sc.localAddress()).asStage().get();
168         FileRegion region = new DefaultFileRegion(
169                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
170         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
171 
172         if (!defaultFileRegion) {
173             region = new FileRegionWrapper(region);
174             emptyRegion = new FileRegionWrapper(emptyRegion);
175         }
176         // Do write Buffer and then FileRegion to ensure that mixed writes work
177         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
178         //
179         // See https://github.com/netty/netty/issues/2769
180         //     https://github.com/netty/netty/issues/2964
181         try (Buffer buffer = MemoryManager.unsafeWrap(data)) {
182             cc.write(buffer.readSplit(bufferSize));
183         }
184         cc.write(emptyRegion);
185         cc.writeAndFlush(region);
186 
187         while (sh.counter < data.length) {
188             if (sh.exception.get() != null) {
189                 break;
190             }
191 
192             try {
193                 Thread.sleep(50);
194             } catch (InterruptedException e) {
195                 // Ignore.
196             }
197         }
198 
199         sh.channel.close().asStage().sync();
200         cc.close().asStage().sync();
201         sc.close().asStage().sync();
202 
203         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
204             throw sh.exception.get();
205         }
206 
207         if (sh.exception.get() != null) {
208             throw sh.exception.get();
209         }
210 
211         // Make sure we did not receive more than we expected.
212         assertThat(sh.counter).isEqualTo(data.length);
213     }
214 
215     private static class TestHandler extends SimpleChannelInboundHandler<Buffer> {
216         private final boolean autoRead;
217         volatile Channel channel;
218         final AtomicReference<Throwable> exception = new AtomicReference<>();
219         volatile int counter;
220 
221         TestHandler(boolean autoRead) {
222             this.autoRead = autoRead;
223         }
224 
225         @Override
226         public void channelActive(ChannelHandlerContext ctx)
227                 throws Exception {
228             channel = ctx.channel();
229             if (!autoRead) {
230                 ctx.read();
231             }
232         }
233 
234         @Override
235         public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
236             byte[] actual = new byte[in.readableBytes()];
237             in.readBytes(actual, 0, actual.length);
238 
239             int lastIdx = counter;
240             for (int i = 0; i < actual.length; i ++) {
241                 assertEquals(data[i + lastIdx], actual[i]);
242             }
243             counter += actual.length;
244         }
245 
246         @Override
247         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
248             if (!autoRead) {
249                 ctx.read();
250             }
251         }
252 
253         @Override
254         public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
255             if (exception.compareAndSet(null, cause)) {
256                 ctx.close();
257             }
258         }
259     }
260 
261     private static final class FileRegionWrapper implements FileRegion {
262         private final FileRegion region;
263 
264         FileRegionWrapper(FileRegion region) {
265             this.region = region;
266         }
267 
268         @Override
269         public int refCnt() {
270             return region.refCnt();
271         }
272 
273         @Override
274         public long position() {
275             return region.position();
276         }
277 
278         @Override
279         @Deprecated
280         public long transfered() {
281             return region.transferred();
282         }
283 
284         @Override
285         public boolean release() {
286             return region.release();
287         }
288 
289         @Override
290         public long transferred() {
291             return region.transferred();
292         }
293 
294         @Override
295         public long count() {
296             return region.count();
297         }
298 
299         @Override
300         public boolean release(int decrement) {
301             return region.release(decrement);
302         }
303 
304         @Override
305         public long transferTo(WritableByteChannel target, long position) throws IOException {
306             return region.transferTo(target, position);
307         }
308 
309         @Override
310         public FileRegion retain() {
311             region.retain();
312             return this;
313         }
314 
315         @Override
316         public FileRegion retain(int increment) {
317             region.retain(increment);
318             return this;
319         }
320 
321         @Override
322         public FileRegion touch() {
323             region.touch();
324             return this;
325         }
326 
327         @Override
328         public FileRegion touch(Object hint) {
329             region.touch(hint);
330             return this;
331         }
332     }
333 }