View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.ChannelOption;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.util.internal.PlatformDependent;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  
33  import java.io.File;
34  import java.io.FileOutputStream;
35  import java.io.IOException;
36  import java.io.RandomAccessFile;
37  import java.nio.channels.WritableByteChannel;
38  import java.util.Random;
39  import java.util.concurrent.ThreadLocalRandom;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
43  import static org.junit.jupiter.api.Assertions.assertEquals;
44  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
45  import static org.junit.jupiter.api.Assertions.assertNotEquals;
46  import static org.junit.jupiter.api.Assumptions.assumeTrue;
47  
48  public class SocketFileRegionTest extends AbstractSocketTest {
49  
50      static final byte[] data = new byte[1048576 * 10];
51  
52      static {
53          ThreadLocalRandom.current().nextBytes(data);
54      }
55  
56      @Test
57      public void testFileRegion(TestInfo testInfo) throws Throwable {
58          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
59              @Override
60              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
61                  testFileRegion(serverBootstrap, bootstrap);
62              }
63          });
64      }
65  
66      protected boolean supportsCustomFileRegion() {
67          return true;
68      }
69  
70      @Test
71      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
72          assumeTrue(supportsCustomFileRegion());
73          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
74              @Override
75              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
76                  testCustomFileRegion(serverBootstrap, bootstrap);
77              }
78          });
79      }
80  
81      @Test
82      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
83          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
84              @Override
85              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
86                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
87              }
88          });
89      }
90  
91      @Test
92      public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
93          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
94              @Override
95              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
96                  testFileRegionVoidPromise(serverBootstrap, bootstrap);
97              }
98          });
99      }
100 
101     @Test
102     public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
103         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
104             @Override
105             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
106                 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
107             }
108         });
109     }
110 
111     @Test
112     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
113         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
114             @Override
115             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
116                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
117             }
118         });
119     }
120 
121     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
122         testFileRegion0(sb, cb, false, true, true);
123     }
124 
125     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
126         testFileRegion0(sb, cb, false, true, false);
127     }
128 
129     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
130         testFileRegion0(sb, cb, true, true, true);
131     }
132 
133     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
134         testFileRegion0(sb, cb, false, false, true);
135     }
136 
137     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
138         testFileRegion0(sb, cb, true, false, true);
139     }
140 
141     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
142         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
143         file.deleteOnExit();
144 
145         final FileOutputStream out = new FileOutputStream(file);
146         out.write(data);
147         out.close();
148 
149         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
150             @Override
151             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
152                 // Just drop the message.
153             }
154         });
155         cb.handler(new ChannelInboundHandlerAdapter());
156 
157         Channel sc = sb.bind().sync().channel();
158         Channel cc = cb.connect(sc.localAddress()).sync().channel();
159 
160         // Request file region which is bigger then the underlying file.
161         FileRegion region = new DefaultFileRegion(
162                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
163 
164         assertInstanceOf(IOException.class, cc.writeAndFlush(region).await().cause());
165         cc.close().sync();
166         sc.close().sync();
167     }
168 
169     private static void testFileRegion0(
170             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
171             throws Throwable {
172         sb.childOption(ChannelOption.AUTO_READ, autoRead);
173         cb.option(ChannelOption.AUTO_READ, autoRead);
174 
175         final int bufferSize = 1024;
176         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
177         file.deleteOnExit();
178 
179         final FileOutputStream out = new FileOutputStream(file);
180         final Random random = ThreadLocalRandom.current();
181 
182         // Prepend random data which will not be transferred, so that we can test non-zero start offset
183         final int startOffset = random.nextInt(8192);
184         for (int i = 0; i < startOffset; i ++) {
185             out.write(random.nextInt());
186         }
187 
188         // .. and here comes the real data to transfer.
189         out.write(data, bufferSize, data.length - bufferSize);
190 
191         // .. and then some extra data which is not supposed to be transferred.
192         for (int i = random.nextInt(8192); i > 0; i --) {
193             out.write(random.nextInt());
194         }
195 
196         out.close();
197 
198         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
199             @Override
200             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
201             }
202 
203             @Override
204             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
205                 if (!autoRead) {
206                     ctx.read();
207                 }
208             }
209 
210             @Override
211             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
212                 ctx.close();
213             }
214         };
215         TestHandler sh = new TestHandler(autoRead);
216 
217         sb.childHandler(sh);
218         cb.handler(ch);
219 
220         Channel sc = sb.bind().sync().channel();
221 
222         Channel cc = cb.connect(sc.localAddress()).sync().channel();
223         FileRegion region = new DefaultFileRegion(
224                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
225         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
226 
227         if (!defaultFileRegion) {
228             region = new FileRegionWrapper(region);
229             emptyRegion = new FileRegionWrapper(emptyRegion);
230         }
231         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
232         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
233         //
234         // See https://github.com/netty/netty/issues/2769
235         //     https://github.com/netty/netty/issues/2964
236         if (voidPromise) {
237             assertEquals(cc.voidPromise(), cc.write(
238                     randomBufferType(cc.alloc(), data, 0, bufferSize), cc.voidPromise()));
239             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
240             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
241         } else {
242             assertNotEquals(cc.voidPromise(), cc.write(
243                     randomBufferType(cc.alloc(), data, 0, bufferSize)));
244             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
245             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
246         }
247 
248         while (sh.counter < data.length) {
249             if (sh.exception.get() != null) {
250                 break;
251             }
252 
253             Thread.sleep(50);
254         }
255 
256         sh.channel.close().sync();
257         cc.close().sync();
258         sc.close().sync();
259 
260         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
261             throw sh.exception.get();
262         }
263 
264         if (sh.exception.get() != null) {
265             throw sh.exception.get();
266         }
267 
268         // Make sure we did not receive more than we expected.
269         assertEquals(data.length, sh.counter);
270     }
271 
272     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
273         private final boolean autoRead;
274         volatile Channel channel;
275         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
276         volatile int counter;
277 
278         TestHandler(boolean autoRead) {
279             this.autoRead = autoRead;
280         }
281 
282         @Override
283         public void channelActive(ChannelHandlerContext ctx)
284                 throws Exception {
285             channel = ctx.channel();
286             if (!autoRead) {
287                 ctx.read();
288             }
289         }
290 
291         @Override
292         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
293             byte[] actual = new byte[in.readableBytes()];
294             in.readBytes(actual);
295 
296             int lastIdx = counter;
297             for (int i = 0; i < actual.length; i ++) {
298                 assertEquals(data[i + lastIdx], actual[i]);
299             }
300             counter += actual.length;
301         }
302 
303         @Override
304         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
305             if (!autoRead) {
306                 ctx.read();
307             }
308         }
309 
310         @Override
311         public void exceptionCaught(ChannelHandlerContext ctx,
312                 Throwable cause) throws Exception {
313             if (exception.compareAndSet(null, cause)) {
314                 ctx.close();
315             }
316         }
317     }
318 
319     private static final class FileRegionWrapper implements FileRegion {
320         private final FileRegion region;
321 
322         FileRegionWrapper(FileRegion region) {
323             this.region = region;
324         }
325 
326         @Override
327         public int refCnt() {
328             return region.refCnt();
329         }
330 
331         @Override
332         public long position() {
333             return region.position();
334         }
335 
336         @Override
337         @Deprecated
338         public long transfered() {
339             return region.transferred();
340         }
341 
342         @Override
343         public boolean release() {
344             return region.release();
345         }
346 
347         @Override
348         public long transferred() {
349             return region.transferred();
350         }
351 
352         @Override
353         public long count() {
354             return region.count();
355         }
356 
357         @Override
358         public boolean release(int decrement) {
359             return region.release(decrement);
360         }
361 
362         @Override
363         public long transferTo(WritableByteChannel target, long position) throws IOException {
364             return region.transferTo(target, position);
365         }
366 
367         @Override
368         public FileRegion retain() {
369             region.retain();
370             return this;
371         }
372 
373         @Override
374         public FileRegion retain(int increment) {
375             region.retain(increment);
376             return this;
377         }
378 
379         @Override
380         public FileRegion touch() {
381             region.touch();
382             return this;
383         }
384 
385         @Override
386         public FileRegion touch(Object hint) {
387             region.touch(hint);
388             return this;
389         }
390     }
391 }