View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandler;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.FileRegion;
29  import io.netty.channel.SimpleChannelInboundHandler;
30  import io.netty.util.internal.PlatformDependent;
31  import org.hamcrest.CoreMatchers;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  
35  import java.io.File;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  import java.nio.channels.WritableByteChannel;
40  import java.util.Random;
41  import java.util.concurrent.ThreadLocalRandom;
42  import java.util.concurrent.atomic.AtomicReference;
43  
44  import static org.hamcrest.CoreMatchers.is;
45  import static org.hamcrest.MatcherAssert.assertThat;
46  import static org.junit.jupiter.api.Assertions.assertEquals;
47  import static org.junit.jupiter.api.Assertions.assertNotEquals;
48  import static org.junit.jupiter.api.Assumptions.assumeTrue;
49  
50  public class SocketFileRegionTest extends AbstractSocketTest {
51  
52      static final byte[] data = new byte[1048576 * 10];
53  
54      static {
55          ThreadLocalRandom.current().nextBytes(data);
56      }
57  
58      @Test
59      public void testFileRegion(TestInfo testInfo) throws Throwable {
60          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
61              @Override
62              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
63                  testFileRegion(serverBootstrap, bootstrap);
64              }
65          });
66      }
67  
68      protected boolean supportsCustomFileRegion() {
69          return true;
70      }
71  
72      @Test
73      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
74          assumeTrue(supportsCustomFileRegion());
75          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
76              @Override
77              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
78                  testCustomFileRegion(serverBootstrap, bootstrap);
79              }
80          });
81      }
82  
83      @Test
84      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
85          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
86              @Override
87              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
88                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
89              }
90          });
91      }
92  
93      @Test
94      public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
95          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
96              @Override
97              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
98                  testFileRegionVoidPromise(serverBootstrap, bootstrap);
99              }
100         });
101     }
102 
103     @Test
104     public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
105         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
106             @Override
107             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
108                 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
109             }
110         });
111     }
112 
113     @Test
114     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
115         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
116             @Override
117             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
118                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
119             }
120         });
121     }
122 
123     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
124         testFileRegion0(sb, cb, false, true, true);
125     }
126 
127     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
128         testFileRegion0(sb, cb, false, true, false);
129     }
130 
131     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
132         testFileRegion0(sb, cb, true, true, true);
133     }
134 
135     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
136         testFileRegion0(sb, cb, false, false, true);
137     }
138 
139     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
140         testFileRegion0(sb, cb, true, false, true);
141     }
142 
143     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
144         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
145         file.deleteOnExit();
146 
147         final FileOutputStream out = new FileOutputStream(file);
148         out.write(data);
149         out.close();
150 
151         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
152             @Override
153             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
154                 // Just drop the message.
155             }
156         });
157         cb.handler(new ChannelInboundHandlerAdapter());
158 
159         Channel sc = sb.bind().sync().channel();
160         Channel cc = cb.connect(sc.localAddress()).sync().channel();
161 
162         // Request file region which is bigger then the underlying file.
163         FileRegion region = new DefaultFileRegion(
164                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
165 
166         assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
167         cc.close().sync();
168         sc.close().sync();
169     }
170 
171     private static void testFileRegion0(
172             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
173             throws Throwable {
174         sb.childOption(ChannelOption.AUTO_READ, autoRead);
175         cb.option(ChannelOption.AUTO_READ, autoRead);
176 
177         final int bufferSize = 1024;
178         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
179         file.deleteOnExit();
180 
181         final FileOutputStream out = new FileOutputStream(file);
182         final Random random = ThreadLocalRandom.current();
183 
184         // Prepend random data which will not be transferred, so that we can test non-zero start offset
185         final int startOffset = random.nextInt(8192);
186         for (int i = 0; i < startOffset; i ++) {
187             out.write(random.nextInt());
188         }
189 
190         // .. and here comes the real data to transfer.
191         out.write(data, bufferSize, data.length - bufferSize);
192 
193         // .. and then some extra data which is not supposed to be transferred.
194         for (int i = random.nextInt(8192); i > 0; i --) {
195             out.write(random.nextInt());
196         }
197 
198         out.close();
199 
200         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
201             @Override
202             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
203             }
204 
205             @Override
206             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
207                 if (!autoRead) {
208                     ctx.read();
209                 }
210             }
211 
212             @Override
213             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
214                 ctx.close();
215             }
216         };
217         TestHandler sh = new TestHandler(autoRead);
218 
219         sb.childHandler(sh);
220         cb.handler(ch);
221 
222         Channel sc = sb.bind().sync().channel();
223 
224         Channel cc = cb.connect(sc.localAddress()).sync().channel();
225         FileRegion region = new DefaultFileRegion(
226                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
227         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
228 
229         if (!defaultFileRegion) {
230             region = new FileRegionWrapper(region);
231             emptyRegion = new FileRegionWrapper(emptyRegion);
232         }
233         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
234         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
235         //
236         // See https://github.com/netty/netty/issues/2769
237         //     https://github.com/netty/netty/issues/2964
238         if (voidPromise) {
239             assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
240             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
241             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
242         } else {
243             assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
244             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
245             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
246         }
247 
248         while (sh.counter < data.length) {
249             if (sh.exception.get() != null) {
250                 break;
251             }
252 
253             Thread.sleep(50);
254         }
255 
256         sh.channel.close().sync();
257         cc.close().sync();
258         sc.close().sync();
259 
260         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
261             throw sh.exception.get();
262         }
263 
264         if (sh.exception.get() != null) {
265             throw sh.exception.get();
266         }
267 
268         // Make sure we did not receive more than we expected.
269         assertThat(sh.counter, is(data.length));
270     }
271 
272     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
273         private final boolean autoRead;
274         volatile Channel channel;
275         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
276         volatile int counter;
277 
278         TestHandler(boolean autoRead) {
279             this.autoRead = autoRead;
280         }
281 
282         @Override
283         public void channelActive(ChannelHandlerContext ctx)
284                 throws Exception {
285             channel = ctx.channel();
286             if (!autoRead) {
287                 ctx.read();
288             }
289         }
290 
291         @Override
292         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
293             byte[] actual = new byte[in.readableBytes()];
294             in.readBytes(actual);
295 
296             int lastIdx = counter;
297             for (int i = 0; i < actual.length; i ++) {
298                 assertEquals(data[i + lastIdx], actual[i]);
299             }
300             counter += actual.length;
301         }
302 
303         @Override
304         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
305             if (!autoRead) {
306                 ctx.read();
307             }
308         }
309 
310         @Override
311         public void exceptionCaught(ChannelHandlerContext ctx,
312                 Throwable cause) throws Exception {
313             if (exception.compareAndSet(null, cause)) {
314                 ctx.close();
315             }
316         }
317     }
318 
319     private static final class FileRegionWrapper implements FileRegion {
320         private final FileRegion region;
321 
322         FileRegionWrapper(FileRegion region) {
323             this.region = region;
324         }
325 
326         @Override
327         public int refCnt() {
328             return region.refCnt();
329         }
330 
331         @Override
332         public long position() {
333             return region.position();
334         }
335 
336         @Override
337         @Deprecated
338         public long transfered() {
339             return region.transferred();
340         }
341 
342         @Override
343         public boolean release() {
344             return region.release();
345         }
346 
347         @Override
348         public long transferred() {
349             return region.transferred();
350         }
351 
352         @Override
353         public long count() {
354             return region.count();
355         }
356 
357         @Override
358         public boolean release(int decrement) {
359             return region.release(decrement);
360         }
361 
362         @Override
363         public long transferTo(WritableByteChannel target, long position) throws IOException {
364             return region.transferTo(target, position);
365         }
366 
367         @Override
368         public FileRegion retain() {
369             region.retain();
370             return this;
371         }
372 
373         @Override
374         public FileRegion retain(int increment) {
375             region.retain(increment);
376             return this;
377         }
378 
379         @Override
380         public FileRegion touch() {
381             region.touch();
382             return this;
383         }
384 
385         @Override
386         public FileRegion touch(Object hint) {
387             region.touch(hint);
388             return this;
389         }
390     }
391 }