View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandler;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.FileRegion;
29  import io.netty.channel.SimpleChannelInboundHandler;
30  import io.netty.util.internal.PlatformDependent;
31  import org.hamcrest.CoreMatchers;
32  import org.junit.jupiter.api.Test;
33  import org.junit.jupiter.api.TestInfo;
34  
35  import java.io.File;
36  import java.io.FileOutputStream;
37  import java.io.IOException;
38  import java.io.RandomAccessFile;
39  import java.nio.channels.WritableByteChannel;
40  import java.util.Random;
41  import java.util.concurrent.atomic.AtomicReference;
42  
43  import static org.hamcrest.CoreMatchers.is;
44  import static org.hamcrest.MatcherAssert.assertThat;
45  import static org.junit.jupiter.api.Assertions.assertEquals;
46  import static org.junit.jupiter.api.Assertions.assertNotEquals;
47  import static org.junit.jupiter.api.Assumptions.assumeTrue;
48  
49  public class SocketFileRegionTest extends AbstractSocketTest {
50  
51      static final byte[] data = new byte[1048576 * 10];
52  
53      static {
54          PlatformDependent.threadLocalRandom().nextBytes(data);
55      }
56  
57      @Test
58      public void testFileRegion(TestInfo testInfo) throws Throwable {
59          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
60              @Override
61              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
62                  testFileRegion(serverBootstrap, bootstrap);
63              }
64          });
65      }
66  
67      protected boolean supportsCustomFileRegion() {
68          return true;
69      }
70  
71      @Test
72      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
73          assumeTrue(supportsCustomFileRegion());
74          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
75              @Override
76              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
77                  testCustomFileRegion(serverBootstrap, bootstrap);
78              }
79          });
80      }
81  
82      @Test
83      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
84          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
85              @Override
86              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
87                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
88              }
89          });
90      }
91  
92      @Test
93      public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
94          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
95              @Override
96              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
97                  testFileRegionVoidPromise(serverBootstrap, bootstrap);
98              }
99          });
100     }
101 
102     @Test
103     public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
104         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
105             @Override
106             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
107                 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
108             }
109         });
110     }
111 
112     @Test
113     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
114         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
115             @Override
116             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
117                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
118             }
119         });
120     }
121 
122     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
123         testFileRegion0(sb, cb, false, true, true);
124     }
125 
126     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
127         testFileRegion0(sb, cb, false, true, false);
128     }
129 
130     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
131         testFileRegion0(sb, cb, true, true, true);
132     }
133 
134     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
135         testFileRegion0(sb, cb, false, false, true);
136     }
137 
138     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
139         testFileRegion0(sb, cb, true, false, true);
140     }
141 
142     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
143         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
144         file.deleteOnExit();
145 
146         final FileOutputStream out = new FileOutputStream(file);
147         out.write(data);
148         out.close();
149 
150         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
151             @Override
152             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
153                 // Just drop the message.
154             }
155         });
156         cb.handler(new ChannelInboundHandlerAdapter());
157 
158         Channel sc = sb.bind().sync().channel();
159         Channel cc = cb.connect(sc.localAddress()).sync().channel();
160 
161         // Request file region which is bigger then the underlying file.
162         FileRegion region = new DefaultFileRegion(
163                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
164 
165         assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
166         cc.close().sync();
167         sc.close().sync();
168     }
169 
170     private static void testFileRegion0(
171             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
172             throws Throwable {
173         sb.childOption(ChannelOption.AUTO_READ, autoRead);
174         cb.option(ChannelOption.AUTO_READ, autoRead);
175 
176         final int bufferSize = 1024;
177         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
178         file.deleteOnExit();
179 
180         final FileOutputStream out = new FileOutputStream(file);
181         final Random random = PlatformDependent.threadLocalRandom();
182 
183         // Prepend random data which will not be transferred, so that we can test non-zero start offset
184         final int startOffset = random.nextInt(8192);
185         for (int i = 0; i < startOffset; i ++) {
186             out.write(random.nextInt());
187         }
188 
189         // .. and here comes the real data to transfer.
190         out.write(data, bufferSize, data.length - bufferSize);
191 
192         // .. and then some extra data which is not supposed to be transferred.
193         for (int i = random.nextInt(8192); i > 0; i --) {
194             out.write(random.nextInt());
195         }
196 
197         out.close();
198 
199         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
200             @Override
201             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
202             }
203 
204             @Override
205             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
206                 if (!autoRead) {
207                     ctx.read();
208                 }
209             }
210 
211             @Override
212             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
213                 ctx.close();
214             }
215         };
216         TestHandler sh = new TestHandler(autoRead);
217 
218         sb.childHandler(sh);
219         cb.handler(ch);
220 
221         Channel sc = sb.bind().sync().channel();
222 
223         Channel cc = cb.connect(sc.localAddress()).sync().channel();
224         FileRegion region = new DefaultFileRegion(
225                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
226         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
227 
228         if (!defaultFileRegion) {
229             region = new FileRegionWrapper(region);
230             emptyRegion = new FileRegionWrapper(emptyRegion);
231         }
232         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
233         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
234         //
235         // See https://github.com/netty/netty/issues/2769
236         //     https://github.com/netty/netty/issues/2964
237         if (voidPromise) {
238             assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
239             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
240             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
241         } else {
242             assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
243             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
244             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
245         }
246 
247         while (sh.counter < data.length) {
248             if (sh.exception.get() != null) {
249                 break;
250             }
251 
252             Thread.sleep(50);
253         }
254 
255         sh.channel.close().sync();
256         cc.close().sync();
257         sc.close().sync();
258 
259         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
260             throw sh.exception.get();
261         }
262 
263         if (sh.exception.get() != null) {
264             throw sh.exception.get();
265         }
266 
267         // Make sure we did not receive more than we expected.
268         assertThat(sh.counter, is(data.length));
269     }
270 
271     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
272         private final boolean autoRead;
273         volatile Channel channel;
274         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
275         volatile int counter;
276 
277         TestHandler(boolean autoRead) {
278             this.autoRead = autoRead;
279         }
280 
281         @Override
282         public void channelActive(ChannelHandlerContext ctx)
283                 throws Exception {
284             channel = ctx.channel();
285             if (!autoRead) {
286                 ctx.read();
287             }
288         }
289 
290         @Override
291         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
292             byte[] actual = new byte[in.readableBytes()];
293             in.readBytes(actual);
294 
295             int lastIdx = counter;
296             for (int i = 0; i < actual.length; i ++) {
297                 assertEquals(data[i + lastIdx], actual[i]);
298             }
299             counter += actual.length;
300         }
301 
302         @Override
303         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
304             if (!autoRead) {
305                 ctx.read();
306             }
307         }
308 
309         @Override
310         public void exceptionCaught(ChannelHandlerContext ctx,
311                 Throwable cause) throws Exception {
312             if (exception.compareAndSet(null, cause)) {
313                 ctx.close();
314             }
315         }
316     }
317 
318     private static final class FileRegionWrapper implements FileRegion {
319         private final FileRegion region;
320 
321         FileRegionWrapper(FileRegion region) {
322             this.region = region;
323         }
324 
325         @Override
326         public int refCnt() {
327             return region.refCnt();
328         }
329 
330         @Override
331         public long position() {
332             return region.position();
333         }
334 
335         @Override
336         @Deprecated
337         public long transfered() {
338             return region.transferred();
339         }
340 
341         @Override
342         public boolean release() {
343             return region.release();
344         }
345 
346         @Override
347         public long transferred() {
348             return region.transferred();
349         }
350 
351         @Override
352         public long count() {
353             return region.count();
354         }
355 
356         @Override
357         public boolean release(int decrement) {
358             return region.release(decrement);
359         }
360 
361         @Override
362         public long transferTo(WritableByteChannel target, long position) throws IOException {
363             return region.transferTo(target, position);
364         }
365 
366         @Override
367         public FileRegion retain() {
368             region.retain();
369             return this;
370         }
371 
372         @Override
373         public FileRegion retain(int increment) {
374             region.retain(increment);
375             return this;
376         }
377 
378         @Override
379         public FileRegion touch() {
380             region.touch();
381             return this;
382         }
383 
384         @Override
385         public FileRegion touch(Object hint) {
386             region.touch(hint);
387             return this;
388         }
389     }
390 }