View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandler;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.FileRegion;
29  import io.netty.channel.SimpleChannelInboundHandler;
30  import io.netty.util.internal.PlatformDependent;
31  import org.hamcrest.CoreMatchers;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  
35  import java.io.File;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  import java.nio.channels.WritableByteChannel;
40  import java.util.Random;
41  import java.util.concurrent.atomic.AtomicReference;
42  
43  import static org.hamcrest.CoreMatchers.is;
44  import static org.hamcrest.MatcherAssert.assertThat;
45  import static org.junit.jupiter.api.Assertions.assertEquals;
46  import static org.junit.jupiter.api.Assertions.assertNotEquals;
47  
48  public class SocketFileRegionTest extends AbstractSocketTest {
49  
50      static final byte[] data = new byte[1048576 * 10];
51  
52      static {
53          PlatformDependent.threadLocalRandom().nextBytes(data);
54      }
55  
56      @Test
57      public void testFileRegion(TestInfo testInfo) throws Throwable {
58          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
59              @Override
60              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
61                  testFileRegion(serverBootstrap, bootstrap);
62              }
63          });
64      }
65  
66      @Test
67      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
68          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
69              @Override
70              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
71                  testCustomFileRegion(serverBootstrap, bootstrap);
72              }
73          });
74      }
75  
76      @Test
77      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
78          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
79              @Override
80              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
81                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
82              }
83          });
84      }
85  
86      @Test
87      public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
88          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
89              @Override
90              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
91                  testFileRegionVoidPromise(serverBootstrap, bootstrap);
92              }
93          });
94      }
95  
96      @Test
97      public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
98          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
99              @Override
100             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
101                 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
102             }
103         });
104     }
105 
106     @Test
107     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
108         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
109             @Override
110             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
111                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
112             }
113         });
114     }
115 
116     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
117         testFileRegion0(sb, cb, false, true, true);
118     }
119 
120     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
121         testFileRegion0(sb, cb, false, true, false);
122     }
123 
124     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
125         testFileRegion0(sb, cb, true, true, true);
126     }
127 
128     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
129         testFileRegion0(sb, cb, false, false, true);
130     }
131 
132     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
133         testFileRegion0(sb, cb, true, false, true);
134     }
135 
136     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
137         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
138         file.deleteOnExit();
139 
140         final FileOutputStream out = new FileOutputStream(file);
141         out.write(data);
142         out.close();
143 
144         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
145             @Override
146             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
147                 // Just drop the message.
148             }
149         });
150         cb.handler(new ChannelInboundHandlerAdapter());
151 
152         Channel sc = sb.bind().sync().channel();
153         Channel cc = cb.connect(sc.localAddress()).sync().channel();
154 
155         // Request file region which is bigger then the underlying file.
156         FileRegion region = new DefaultFileRegion(
157                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
158 
159         assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
160         cc.close().sync();
161         sc.close().sync();
162     }
163 
164     private static void testFileRegion0(
165             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
166             throws Throwable {
167         sb.childOption(ChannelOption.AUTO_READ, autoRead);
168         cb.option(ChannelOption.AUTO_READ, autoRead);
169 
170         final int bufferSize = 1024;
171         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
172         file.deleteOnExit();
173 
174         final FileOutputStream out = new FileOutputStream(file);
175         final Random random = PlatformDependent.threadLocalRandom();
176 
177         // Prepend random data which will not be transferred, so that we can test non-zero start offset
178         final int startOffset = random.nextInt(8192);
179         for (int i = 0; i < startOffset; i ++) {
180             out.write(random.nextInt());
181         }
182 
183         // .. and here comes the real data to transfer.
184         out.write(data, bufferSize, data.length - bufferSize);
185 
186         // .. and then some extra data which is not supposed to be transferred.
187         for (int i = random.nextInt(8192); i > 0; i --) {
188             out.write(random.nextInt());
189         }
190 
191         out.close();
192 
193         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
194             @Override
195             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
196             }
197 
198             @Override
199             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
200                 if (!autoRead) {
201                     ctx.read();
202                 }
203             }
204 
205             @Override
206             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
207                 ctx.close();
208             }
209         };
210         TestHandler sh = new TestHandler(autoRead);
211 
212         sb.childHandler(sh);
213         cb.handler(ch);
214 
215         Channel sc = sb.bind().sync().channel();
216 
217         Channel cc = cb.connect(sc.localAddress()).sync().channel();
218         FileRegion region = new DefaultFileRegion(
219                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
220         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
221 
222         if (!defaultFileRegion) {
223             region = new FileRegionWrapper(region);
224             emptyRegion = new FileRegionWrapper(emptyRegion);
225         }
226         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
227         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
228         //
229         // See https://github.com/netty/netty/issues/2769
230         //     https://github.com/netty/netty/issues/2964
231         if (voidPromise) {
232             assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
233             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
234             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
235         } else {
236             assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
237             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
238             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
239         }
240 
241         while (sh.counter < data.length) {
242             if (sh.exception.get() != null) {
243                 break;
244             }
245 
246             try {
247                 Thread.sleep(50);
248             } catch (InterruptedException e) {
249                 // Ignore.
250             }
251         }
252 
253         sh.channel.close().sync();
254         cc.close().sync();
255         sc.close().sync();
256 
257         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
258             throw sh.exception.get();
259         }
260 
261         if (sh.exception.get() != null) {
262             throw sh.exception.get();
263         }
264 
265         // Make sure we did not receive more than we expected.
266         assertThat(sh.counter, is(data.length));
267     }
268 
269     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
270         private final boolean autoRead;
271         volatile Channel channel;
272         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
273         volatile int counter;
274 
275         TestHandler(boolean autoRead) {
276             this.autoRead = autoRead;
277         }
278 
279         @Override
280         public void channelActive(ChannelHandlerContext ctx)
281                 throws Exception {
282             channel = ctx.channel();
283             if (!autoRead) {
284                 ctx.read();
285             }
286         }
287 
288         @Override
289         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
290             byte[] actual = new byte[in.readableBytes()];
291             in.readBytes(actual);
292 
293             int lastIdx = counter;
294             for (int i = 0; i < actual.length; i ++) {
295                 assertEquals(data[i + lastIdx], actual[i]);
296             }
297             counter += actual.length;
298         }
299 
300         @Override
301         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
302             if (!autoRead) {
303                 ctx.read();
304             }
305         }
306 
307         @Override
308         public void exceptionCaught(ChannelHandlerContext ctx,
309                 Throwable cause) throws Exception {
310             if (exception.compareAndSet(null, cause)) {
311                 ctx.close();
312             }
313         }
314     }
315 
316     private static final class FileRegionWrapper implements FileRegion {
317         private final FileRegion region;
318 
319         FileRegionWrapper(FileRegion region) {
320             this.region = region;
321         }
322 
323         @Override
324         public int refCnt() {
325             return region.refCnt();
326         }
327 
328         @Override
329         public long position() {
330             return region.position();
331         }
332 
333         @Override
334         @Deprecated
335         public long transfered() {
336             return region.transferred();
337         }
338 
339         @Override
340         public boolean release() {
341             return region.release();
342         }
343 
344         @Override
345         public long transferred() {
346             return region.transferred();
347         }
348 
349         @Override
350         public long count() {
351             return region.count();
352         }
353 
354         @Override
355         public boolean release(int decrement) {
356             return region.release(decrement);
357         }
358 
359         @Override
360         public long transferTo(WritableByteChannel target, long position) throws IOException {
361             return region.transferTo(target, position);
362         }
363 
364         @Override
365         public FileRegion retain() {
366             region.retain();
367             return this;
368         }
369 
370         @Override
371         public FileRegion retain(int increment) {
372             region.retain(increment);
373             return this;
374         }
375 
376         @Override
377         public FileRegion touch() {
378             region.touch();
379             return this;
380         }
381 
382         @Override
383         public FileRegion touch(Object hint) {
384             region.touch(hint);
385             return this;
386         }
387     }
388 }