View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.buffer.Unpooled;
22  import io.netty.channel.Channel;
23  import io.netty.channel.ChannelHandlerContext;
24  import io.netty.channel.ChannelInboundHandler;
25  import io.netty.channel.ChannelInboundHandlerAdapter;
26  import io.netty.channel.ChannelOption;
27  import io.netty.channel.DefaultFileRegion;
28  import io.netty.channel.FileRegion;
29  import io.netty.channel.SimpleChannelInboundHandler;
30  import io.netty.util.internal.PlatformDependent;
31  import org.junit.jupiter.api.Test;
32  import org.junit.jupiter.api.TestInfo;
33  
34  import java.io.File;
35  import java.io.FileOutputStream;
36  import java.io.IOException;
37  import java.io.RandomAccessFile;
38  import java.nio.channels.WritableByteChannel;
39  import java.util.Random;
40  import java.util.concurrent.atomic.AtomicReference;
41  
42  import static org.junit.jupiter.api.Assertions.assertEquals;
43  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
44  import static org.junit.jupiter.api.Assertions.assertNotEquals;
45  
46  public class SocketFileRegionTest extends AbstractSocketTest {
47  
48      static final byte[] data = new byte[1048576 * 10];
49  
50      static {
51          PlatformDependent.threadLocalRandom().nextBytes(data);
52      }
53  
54      @Test
55      public void testFileRegion(TestInfo testInfo) throws Throwable {
56          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
57              @Override
58              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
59                  testFileRegion(serverBootstrap, bootstrap);
60              }
61          });
62      }
63  
64      @Test
65      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
66          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
67              @Override
68              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
69                  testCustomFileRegion(serverBootstrap, bootstrap);
70              }
71          });
72      }
73  
74      @Test
75      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
76          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
77              @Override
78              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
79                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
80              }
81          });
82      }
83  
84      @Test
85      public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
86          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
87              @Override
88              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
89                  testFileRegionVoidPromise(serverBootstrap, bootstrap);
90              }
91          });
92      }
93  
94      @Test
95      public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
96          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
97              @Override
98              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
99                  testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
100             }
101         });
102     }
103 
104     @Test
105     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
106         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
107             @Override
108             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
109                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
110             }
111         });
112     }
113 
114     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
115         testFileRegion0(sb, cb, false, true, true);
116     }
117 
118     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
119         testFileRegion0(sb, cb, false, true, false);
120     }
121 
122     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
123         testFileRegion0(sb, cb, true, true, true);
124     }
125 
126     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
127         testFileRegion0(sb, cb, false, false, true);
128     }
129 
130     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
131         testFileRegion0(sb, cb, true, false, true);
132     }
133 
134     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
135         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
136         file.deleteOnExit();
137 
138         final FileOutputStream out = new FileOutputStream(file);
139         out.write(data);
140         out.close();
141 
142         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
143             @Override
144             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
145                 // Just drop the message.
146             }
147         });
148         cb.handler(new ChannelInboundHandlerAdapter());
149 
150         Channel sc = sb.bind().sync().channel();
151         Channel cc = cb.connect(sc.localAddress()).sync().channel();
152 
153         // Request file region which is bigger then the underlying file.
154         FileRegion region = new DefaultFileRegion(
155                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
156 
157         assertInstanceOf(IOException.class, cc.writeAndFlush(region).await().cause());
158         cc.close().sync();
159         sc.close().sync();
160     }
161 
162     private static void testFileRegion0(
163             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
164             throws Throwable {
165         sb.childOption(ChannelOption.AUTO_READ, autoRead);
166         cb.option(ChannelOption.AUTO_READ, autoRead);
167 
168         final int bufferSize = 1024;
169         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
170         file.deleteOnExit();
171 
172         final FileOutputStream out = new FileOutputStream(file);
173         final Random random = PlatformDependent.threadLocalRandom();
174 
175         // Prepend random data which will not be transferred, so that we can test non-zero start offset
176         final int startOffset = random.nextInt(8192);
177         for (int i = 0; i < startOffset; i ++) {
178             out.write(random.nextInt());
179         }
180 
181         // .. and here comes the real data to transfer.
182         out.write(data, bufferSize, data.length - bufferSize);
183 
184         // .. and then some extra data which is not supposed to be transferred.
185         for (int i = random.nextInt(8192); i > 0; i --) {
186             out.write(random.nextInt());
187         }
188 
189         out.close();
190 
191         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
192             @Override
193             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
194             }
195 
196             @Override
197             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
198                 if (!autoRead) {
199                     ctx.read();
200                 }
201             }
202 
203             @Override
204             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
205                 ctx.close();
206             }
207         };
208         TestHandler sh = new TestHandler(autoRead);
209 
210         sb.childHandler(sh);
211         cb.handler(ch);
212 
213         Channel sc = sb.bind().sync().channel();
214 
215         Channel cc = cb.connect(sc.localAddress()).sync().channel();
216         FileRegion region = new DefaultFileRegion(
217                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
218         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
219 
220         if (!defaultFileRegion) {
221             region = new FileRegionWrapper(region);
222             emptyRegion = new FileRegionWrapper(emptyRegion);
223         }
224         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
225         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
226         //
227         // See https://github.com/netty/netty/issues/2769
228         //     https://github.com/netty/netty/issues/2964
229         if (voidPromise) {
230             assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
231             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
232             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
233         } else {
234             assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
235             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
236             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
237         }
238 
239         while (sh.counter < data.length) {
240             if (sh.exception.get() != null) {
241                 break;
242             }
243 
244             Thread.sleep(50);
245         }
246 
247         sh.channel.close().sync();
248         cc.close().sync();
249         sc.close().sync();
250 
251         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
252             throw sh.exception.get();
253         }
254 
255         if (sh.exception.get() != null) {
256             throw sh.exception.get();
257         }
258 
259         // Make sure we did not receive more than we expected.
260         assertEquals(data.length, sh.counter);
261     }
262 
263     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
264         private final boolean autoRead;
265         volatile Channel channel;
266         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
267         volatile int counter;
268 
269         TestHandler(boolean autoRead) {
270             this.autoRead = autoRead;
271         }
272 
273         @Override
274         public void channelActive(ChannelHandlerContext ctx)
275                 throws Exception {
276             channel = ctx.channel();
277             if (!autoRead) {
278                 ctx.read();
279             }
280         }
281 
282         @Override
283         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
284             byte[] actual = new byte[in.readableBytes()];
285             in.readBytes(actual);
286 
287             int lastIdx = counter;
288             for (int i = 0; i < actual.length; i ++) {
289                 assertEquals(data[i + lastIdx], actual[i]);
290             }
291             counter += actual.length;
292         }
293 
294         @Override
295         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
296             if (!autoRead) {
297                 ctx.read();
298             }
299         }
300 
301         @Override
302         public void exceptionCaught(ChannelHandlerContext ctx,
303                 Throwable cause) throws Exception {
304             if (exception.compareAndSet(null, cause)) {
305                 ctx.close();
306             }
307         }
308     }
309 
310     private static final class FileRegionWrapper implements FileRegion {
311         private final FileRegion region;
312 
313         FileRegionWrapper(FileRegion region) {
314             this.region = region;
315         }
316 
317         @Override
318         public int refCnt() {
319             return region.refCnt();
320         }
321 
322         @Override
323         public long position() {
324             return region.position();
325         }
326 
327         @Override
328         @Deprecated
329         public long transfered() {
330             return region.transferred();
331         }
332 
333         @Override
334         public boolean release() {
335             return region.release();
336         }
337 
338         @Override
339         public long transferred() {
340             return region.transferred();
341         }
342 
343         @Override
344         public long count() {
345             return region.count();
346         }
347 
348         @Override
349         public boolean release(int decrement) {
350             return region.release(decrement);
351         }
352 
353         @Override
354         public long transferTo(WritableByteChannel target, long position) throws IOException {
355             return region.transferTo(target, position);
356         }
357 
358         @Override
359         public FileRegion retain() {
360             region.retain();
361             return this;
362         }
363 
364         @Override
365         public FileRegion retain(int increment) {
366             region.retain(increment);
367             return this;
368         }
369 
370         @Override
371         public FileRegion touch() {
372             region.touch();
373             return this;
374         }
375 
376         @Override
377         public FileRegion touch(Object hint) {
378             region.touch(hint);
379             return this;
380         }
381     }
382 }