View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport.socket;
17  
18  import io.netty.bootstrap.Bootstrap;
19  import io.netty.bootstrap.ServerBootstrap;
20  import io.netty.buffer.ByteBuf;
21  import io.netty.channel.Channel;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandler;
24  import io.netty.channel.ChannelInboundHandlerAdapter;
25  import io.netty.channel.ChannelOption;
26  import io.netty.channel.DefaultFileRegion;
27  import io.netty.channel.FileRegion;
28  import io.netty.channel.SimpleChannelInboundHandler;
29  import io.netty.channel.socket.oio.OioSocketChannel;
30  import io.netty.util.AbstractReferenceCounted;
31  import io.netty.util.internal.ObjectUtil;
32  import io.netty.util.internal.PlatformDependent;
33  import org.junit.jupiter.api.Test;
34  import org.junit.jupiter.api.TestInfo;
35  import org.junit.jupiter.api.Timeout;
36  
37  import java.io.File;
38  import java.io.FileOutputStream;
39  import java.io.IOException;
40  import java.io.RandomAccessFile;
41  import java.nio.ByteBuffer;
42  import java.nio.channels.WritableByteChannel;
43  import java.util.Random;
44  import java.util.concurrent.ThreadLocalRandom;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicInteger;
47  import java.util.concurrent.atomic.AtomicReference;
48  
49  import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
50  import static org.junit.jupiter.api.Assertions.assertEquals;
51  import static org.junit.jupiter.api.Assertions.assertInstanceOf;
52  import static org.junit.jupiter.api.Assertions.assertNotEquals;
53  import static org.junit.jupiter.api.Assumptions.assumeFalse;
54  import static org.junit.jupiter.api.Assumptions.assumeTrue;
55  
56  public class SocketFileRegionTest extends AbstractSocketTest {
57  
/** Shared 10 MiB payload that the tests write to disk and compare received bytes against. */
static final byte[] data = new byte[10 * 1024 * 1024];

static {
    // Populate the payload with random content once, when the class is initialized.
    ThreadLocalRandom.current().nextBytes(data);
}
63  
64      @Test
65      public void testFileRegion(TestInfo testInfo) throws Throwable {
66          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
67              @Override
68              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
69                  testFileRegion(serverBootstrap, bootstrap);
70              }
71          });
72      }
73  
74      protected boolean supportsCustomFileRegion() {
75          return true;
76      }
77  
78      @Test
79      public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
80          assumeTrue(supportsCustomFileRegion());
81          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
82              @Override
83              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
84                  testCustomFileRegion(serverBootstrap, bootstrap);
85              }
86          });
87      }
88  
89      @Test
90      public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
91          run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
92              @Override
93              public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
94                  testFileRegionNotAutoRead(serverBootstrap, bootstrap);
95              }
96          });
97      }
98  
99      @Test
100     public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
101         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
102             @Override
103             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
104                 testFileRegionVoidPromise(serverBootstrap, bootstrap);
105             }
106         });
107     }
108 
109     @Test
110     public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
111         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
112             @Override
113             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
114                 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
115             }
116         });
117     }
118 
119     @Test
120     public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
121         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
122             @Override
123             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
124                 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
125             }
126         });
127     }
128 
129     @Test
130     @Timeout(value = 30, unit = TimeUnit.SECONDS)
131     public void testFileRegionDrainStopsAtCompletion(TestInfo testInfo) throws Throwable {
132         assumeTrue(supportsCustomFileRegion());
133         run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
134             @Override
135             public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
136                 testFileRegionDrainStopsAtCompletion(serverBootstrap, bootstrap);
137             }
138         });
139     }
140 
141     public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
142         testFileRegion0(sb, cb, false, true, true);
143     }
144 
145     public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
146         testFileRegion0(sb, cb, false, true, false);
147     }
148 
149     public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
150         testFileRegion0(sb, cb, true, true, true);
151     }
152 
153     public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
154         testFileRegion0(sb, cb, false, false, true);
155     }
156 
157     public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
158         testFileRegion0(sb, cb, true, false, true);
159     }
160 
161     public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
162         File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
163         file.deleteOnExit();
164 
165         final FileOutputStream out = new FileOutputStream(file);
166         out.write(data);
167         out.close();
168 
169         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
170             @Override
171             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
172                 // Just drop the message.
173             }
174         });
175         cb.handler(new ChannelInboundHandlerAdapter());
176 
177         Channel sc = sb.bind().sync().channel();
178         Channel cc = cb.connect(sc.localAddress()).sync().channel();
179 
180         // Request file region which is bigger then the underlying file.
181         FileRegion region = new DefaultFileRegion(
182                 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
183 
184         assertInstanceOf(IOException.class, cc.writeAndFlush(region).await().cause());
185         cc.close().sync();
186         sc.close().sync();
187     }
188 
189     /**
190      * Reproducer for a {@code FileRegion} drain-loop overshoot. Transports must short-circuit
191      * as soon as the region reports {@code transferred() >= count()}; any extra
192      * {@code transferTo} call violates the contract and would corrupt
193      * implementations that lazily emit per-chunk framing.
194      *
195      * <p>A custom {@link FileRegion} whose {@code transferTo} advances {@code transferred}
196      * past the bytes it writes to the target on a single call (legal: an encryption- or
197      * framing-on-write FileRegion that flushes a complete inner chunk in one call behaves
198      * this way) records every {@code transferTo} invocation made past
199      * {@code transferred == count}. The expected behaviour for every transport is that
200      * {@code transferToCallsPastCompletion} stays zero.
201      */
202     public void testFileRegionDrainStopsAtCompletion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
203         // Region size > 1 so any chunked transport (e.g. io_uring's generic FileRegion fallback)
204         // that sizes its chunk buffer from count() retains spare capacity after the first
205         // (and only) source byte is written -- without that spare capacity an inner drain loop
206         // would exit on writableBytes() == 0 and the overshoot path would not be exercised.
207         final int regionSize = 16;
208 
209         sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
210             @Override
211             protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
212                 // drain
213             }
214         });
215         cb.handler(new ChannelInboundHandlerAdapter());
216 
217         Channel sc = sb.bind().sync().channel();
218         Channel cc = cb.connect(sc.localAddress()).sync().channel();
219         try {
220             // OioByteStreamChannel#doWriteFileRegion drains using a locally-tracked
221             // bytes-written counter as the transferTo position (ignoring transferred()), so an
222             // ill-behaved FileRegion that advances transferred() past actual bytes written --
223             // the exact pattern this fixture uses to exercise the overshoot path -- cannot
224             // satisfy OIO's loop without violating the position invariant. The overshoot
225             // detection is meaningless on OIO for the same reason; skip the permutation.
226             assumeFalse(cc instanceof OioSocketChannel,
227                     "OIO transport does not honour transferred() for drain-loop termination");
228 
229             OvershootDetectingFileRegion region = new OvershootDetectingFileRegion(regionSize);
230             // sync() blocks until the write future completes, by which point every
231             // transferTo() call the transport is going to make has already been made --
232             // the fixture's call counter is observable here without any timing wait. Ref
233             // ownership is transferred to the pipeline on writeAndFlush(), which releases
234             // it as the write completes (success or failure).
235             cc.writeAndFlush(region).sync();
236             int overshoot = region.transferToCallsPastCompletion.get();
237             assertEquals(0, overshoot,
238                     "transferTo() invoked " + overshoot
239                             + " time(s) after region.transferred() reached region.count()="
240                             + regionSize);
241         } finally {
242             cc.close().sync();
243             sc.close().sync();
244         }
245     }
246 
247     private static void testFileRegion0(
248             ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
249             throws Throwable {
250         sb.childOption(ChannelOption.AUTO_READ, autoRead);
251         cb.option(ChannelOption.AUTO_READ, autoRead);
252 
253         final int bufferSize = 1024;
254         final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
255         file.deleteOnExit();
256 
257         final FileOutputStream out = new FileOutputStream(file);
258         final Random random = ThreadLocalRandom.current();
259 
260         // Prepend random data which will not be transferred, so that we can test non-zero start offset
261         final int startOffset = random.nextInt(8192);
262         for (int i = 0; i < startOffset; i ++) {
263             out.write(random.nextInt());
264         }
265 
266         // .. and here comes the real data to transfer.
267         out.write(data, bufferSize, data.length - bufferSize);
268 
269         // .. and then some extra data which is not supposed to be transferred.
270         for (int i = random.nextInt(8192); i > 0; i --) {
271             out.write(random.nextInt());
272         }
273 
274         out.close();
275 
276         ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
277             @Override
278             public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
279             }
280 
281             @Override
282             public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
283                 if (!autoRead) {
284                     ctx.read();
285                 }
286             }
287 
288             @Override
289             public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
290                 ctx.close();
291             }
292         };
293         TestHandler sh = new TestHandler(autoRead);
294 
295         sb.childHandler(sh);
296         cb.handler(ch);
297 
298         Channel sc = sb.bind().sync().channel();
299 
300         Channel cc = cb.connect(sc.localAddress()).sync().channel();
301         FileRegion region = new DefaultFileRegion(
302                 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
303         FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
304 
305         if (!defaultFileRegion) {
306             region = new FileRegionWrapper(region);
307             emptyRegion = new FileRegionWrapper(emptyRegion);
308         }
309         // Do write ByteBuf and then FileRegion to ensure that mixed writes work
310         // Also, write an empty FileRegion to test if writing an empty FileRegion does not cause any issues.
311         //
312         // See https://github.com/netty/netty/issues/2769
313         //     https://github.com/netty/netty/issues/2964
314         if (voidPromise) {
315             assertEquals(cc.voidPromise(), cc.write(
316                     randomBufferType(cc.alloc(), data, 0, bufferSize), cc.voidPromise()));
317             assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
318             assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
319         } else {
320             assertNotEquals(cc.voidPromise(), cc.write(
321                     randomBufferType(cc.alloc(), data, 0, bufferSize)));
322             assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
323             assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
324         }
325 
326         while (sh.counter < data.length) {
327             if (sh.exception.get() != null) {
328                 break;
329             }
330 
331             Thread.sleep(50);
332         }
333 
334         sh.channel.close().sync();
335         cc.close().sync();
336         sc.close().sync();
337 
338         if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
339             throw sh.exception.get();
340         }
341 
342         if (sh.exception.get() != null) {
343             throw sh.exception.get();
344         }
345 
346         // Make sure we did not receive more than we expected.
347         assertEquals(data.length, sh.counter);
348     }
349 
350     private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
351         private final boolean autoRead;
352         volatile Channel channel;
353         final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
354         volatile int counter;
355 
356         TestHandler(boolean autoRead) {
357             this.autoRead = autoRead;
358         }
359 
360         @Override
361         public void channelActive(ChannelHandlerContext ctx)
362                 throws Exception {
363             channel = ctx.channel();
364             if (!autoRead) {
365                 ctx.read();
366             }
367         }
368 
369         @Override
370         public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
371             byte[] actual = new byte[in.readableBytes()];
372             in.readBytes(actual);
373 
374             int lastIdx = counter;
375             for (int i = 0; i < actual.length; i ++) {
376                 assertEquals(data[i + lastIdx], actual[i]);
377             }
378             counter += actual.length;
379         }
380 
381         @Override
382         public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
383             if (!autoRead) {
384                 ctx.read();
385             }
386         }
387 
388         @Override
389         public void exceptionCaught(ChannelHandlerContext ctx,
390                 Throwable cause) throws Exception {
391             if (exception.compareAndSet(null, cause)) {
392                 ctx.close();
393             }
394         }
395     }
396 
397     /**
398      * Custom {@link FileRegion} that advances {@code transferred()} past the bytes it writes
399      * to the target on a single {@code transferTo} call -- the same pattern an
400      * encryption-on-write or framing FileRegion follows when it reports an inner chunk as
401      * "delivered" once that chunk fully drains. Records every {@code transferTo} invocation
402      * made after the region has been fully transferred so a drain-loop overshoot is visible
403      * synchronously on the writer side.
404      *
405      * <ul>
406      *   <li>{@link #count()} returns the constant configured size.</li>
407      *   <li>The first {@code transferTo} call writes one byte to the target; once that
408      *       byte is accepted it advances {@code transferred} to {@code count} and returns
409      *       the bytes written.</li>
410      *   <li>Every subsequent {@code transferTo} call increments
411      *       {@link #transferToCallsPastCompletion} and writes one phantom byte to mimic
412      *       the protocol-corrupting side effect a real overshoot would produce.</li>
413      * </ul>
414      * The {@code transferred} field is written and read only on the EventLoop (in
415      * {@code transferTo} and the surrounding drain loop), so no synchronization is needed.
416      * The cross-thread observation channel is the {@link AtomicInteger} counter, which the
417      * test reads after {@code writeAndFlush(...).sync()} establishes the happens-before.
418      */
419     private static final class OvershootDetectingFileRegion extends AbstractReferenceCounted
420             implements FileRegion {
421         private final long count;
422         private long transferred;
423         final AtomicInteger transferToCallsPastCompletion = new AtomicInteger();
424 
425         OvershootDetectingFileRegion(long count) {
426             // count must be > 1 so the chunk buffer retains writable capacity after the first
427             // source byte is written -- otherwise the drain-loop overshoot path is not exercised.
428             this.count = ObjectUtil.checkInRange(count, 2L, Long.MAX_VALUE, "count");
429         }
430 
431         @Override
432         public long position() {
433             return 0;
434         }
435 
436         @Override
437         public long count() {
438             return count;
439         }
440 
441         @Override
442         public long transferred() {
443             return transferred;
444         }
445 
446         @Override
447         @Deprecated
448         public long transfered() {
449             return transferred;
450         }
451 
452         @Override
453         public long transferTo(WritableByteChannel target, long position) throws IOException {
454             // Per FileRegion's contract, the caller passes transferred() as position. Surface
455             // a violation via IOException so the transport's catch (Exception) at the write
456             // site routes it to the write future's cause -- a JUnit AssertionError would
457             // bypass that catch and wedge the EventLoop.
458             if (position != transferred) {
459                 throw new IOException("transferTo position " + position + " != transferred() "
460                         + transferred);
461             }
462             if (transferred < count) {
463                 int n = target.write(ByteBuffer.wrap(new byte[] { 0x42 }));
464                 if (n > 0) {
465                     transferred = count;
466                 }
467                 return n;
468             }
469             transferToCallsPastCompletion.incrementAndGet();
470             return target.write(ByteBuffer.wrap(new byte[] { (byte) 0xFF }));
471         }
472 
473         @Override
474         protected void deallocate() {
475             // No native resources to release. The overshoot counter is observed on the
476             // writer thread immediately after sync() returns; nothing else needs cleanup.
477         }
478 
479         @Override
480         public FileRegion retain() {
481             super.retain();
482             return this;
483         }
484 
485         @Override
486         public FileRegion retain(int increment) {
487             super.retain(increment);
488             return this;
489         }
490 
491         @Override
492         public FileRegion touch() {
493             return this;
494         }
495 
496         @Override
497         public FileRegion touch(Object hint) {
498             return this;
499         }
500     }
501 
502     private static final class FileRegionWrapper implements FileRegion {
503         private final FileRegion region;
504 
505         FileRegionWrapper(FileRegion region) {
506             this.region = region;
507         }
508 
509         @Override
510         public int refCnt() {
511             return region.refCnt();
512         }
513 
514         @Override
515         public long position() {
516             return region.position();
517         }
518 
519         @Override
520         @Deprecated
521         public long transfered() {
522             return region.transferred();
523         }
524 
525         @Override
526         public boolean release() {
527             return region.release();
528         }
529 
530         @Override
531         public long transferred() {
532             return region.transferred();
533         }
534 
535         @Override
536         public long count() {
537             return region.count();
538         }
539 
540         @Override
541         public boolean release(int decrement) {
542             return region.release(decrement);
543         }
544 
545         @Override
546         public long transferTo(WritableByteChannel target, long position) throws IOException {
547             return region.transferTo(target, position);
548         }
549 
550         @Override
551         public FileRegion retain() {
552             region.retain();
553             return this;
554         }
555 
556         @Override
557         public FileRegion retain(int increment) {
558             region.retain(increment);
559             return this;
560         }
561 
562         @Override
563         public FileRegion touch() {
564             region.touch();
565             return this;
566         }
567 
568         @Override
569         public FileRegion touch(Object hint) {
570             region.touch(hint);
571             return this;
572         }
573     }
574 }