1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandler;
24 import io.netty.channel.ChannelInboundHandlerAdapter;
25 import io.netty.channel.ChannelOption;
26 import io.netty.channel.DefaultFileRegion;
27 import io.netty.channel.FileRegion;
28 import io.netty.channel.SimpleChannelInboundHandler;
29 import io.netty.channel.socket.oio.OioSocketChannel;
30 import io.netty.util.AbstractReferenceCounted;
31 import io.netty.util.internal.ObjectUtil;
32 import io.netty.util.internal.PlatformDependent;
33 import org.junit.jupiter.api.Test;
34 import org.junit.jupiter.api.TestInfo;
35 import org.junit.jupiter.api.Timeout;
36
37 import java.io.File;
38 import java.io.FileOutputStream;
39 import java.io.IOException;
40 import java.io.RandomAccessFile;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.WritableByteChannel;
43 import java.util.Random;
44 import java.util.concurrent.ThreadLocalRandom;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicInteger;
47 import java.util.concurrent.atomic.AtomicReference;
48
49 import static io.netty.testsuite.transport.TestsuitePermutation.randomBufferType;
50 import static org.junit.jupiter.api.Assertions.assertEquals;
51 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
52 import static org.junit.jupiter.api.Assertions.assertNotEquals;
53 import static org.junit.jupiter.api.Assumptions.assumeFalse;
54 import static org.junit.jupiter.api.Assumptions.assumeTrue;
55
56 public class SocketFileRegionTest extends AbstractSocketTest {
57
58 static final byte[] data = new byte[1048576 * 10];
59
60 static {
61 ThreadLocalRandom.current().nextBytes(data);
62 }
63
64 @Test
65 public void testFileRegion(TestInfo testInfo) throws Throwable {
66 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
67 @Override
68 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
69 testFileRegion(serverBootstrap, bootstrap);
70 }
71 });
72 }
73
74 protected boolean supportsCustomFileRegion() {
75 return true;
76 }
77
78 @Test
79 public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
80 assumeTrue(supportsCustomFileRegion());
81 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
82 @Override
83 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
84 testCustomFileRegion(serverBootstrap, bootstrap);
85 }
86 });
87 }
88
89 @Test
90 public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
91 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
92 @Override
93 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
94 testFileRegionNotAutoRead(serverBootstrap, bootstrap);
95 }
96 });
97 }
98
99 @Test
100 public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
101 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
102 @Override
103 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
104 testFileRegionVoidPromise(serverBootstrap, bootstrap);
105 }
106 });
107 }
108
109 @Test
110 public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
111 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
112 @Override
113 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
114 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
115 }
116 });
117 }
118
119 @Test
120 public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
121 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
122 @Override
123 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
124 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
125 }
126 });
127 }
128
129 @Test
130 @Timeout(value = 30, unit = TimeUnit.SECONDS)
131 public void testFileRegionDrainStopsAtCompletion(TestInfo testInfo) throws Throwable {
132 assumeTrue(supportsCustomFileRegion());
133 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
134 @Override
135 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
136 testFileRegionDrainStopsAtCompletion(serverBootstrap, bootstrap);
137 }
138 });
139 }
140
141 public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
142 testFileRegion0(sb, cb, false, true, true);
143 }
144
145 public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
146 testFileRegion0(sb, cb, false, true, false);
147 }
148
149 public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
150 testFileRegion0(sb, cb, true, true, true);
151 }
152
153 public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
154 testFileRegion0(sb, cb, false, false, true);
155 }
156
157 public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
158 testFileRegion0(sb, cb, true, false, true);
159 }
160
161 public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
162 File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
163 file.deleteOnExit();
164
165 final FileOutputStream out = new FileOutputStream(file);
166 out.write(data);
167 out.close();
168
169 sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
170 @Override
171 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
172
173 }
174 });
175 cb.handler(new ChannelInboundHandlerAdapter());
176
177 Channel sc = sb.bind().sync().channel();
178 Channel cc = cb.connect(sc.localAddress()).sync().channel();
179
180
181 FileRegion region = new DefaultFileRegion(
182 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
183
184 assertInstanceOf(IOException.class, cc.writeAndFlush(region).await().cause());
185 cc.close().sync();
186 sc.close().sync();
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202 public void testFileRegionDrainStopsAtCompletion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
203
204
205
206
207 final int regionSize = 16;
208
209 sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
210 @Override
211 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
212
213 }
214 });
215 cb.handler(new ChannelInboundHandlerAdapter());
216
217 Channel sc = sb.bind().sync().channel();
218 Channel cc = cb.connect(sc.localAddress()).sync().channel();
219 try {
220
221
222
223
224
225
226 assumeFalse(cc instanceof OioSocketChannel,
227 "OIO transport does not honour transferred() for drain-loop termination");
228
229 OvershootDetectingFileRegion region = new OvershootDetectingFileRegion(regionSize);
230
231
232
233
234
235 cc.writeAndFlush(region).sync();
236 int overshoot = region.transferToCallsPastCompletion.get();
237 assertEquals(0, overshoot,
238 "transferTo() invoked " + overshoot
239 + " time(s) after region.transferred() reached region.count()="
240 + regionSize);
241 } finally {
242 cc.close().sync();
243 sc.close().sync();
244 }
245 }
246
247 private static void testFileRegion0(
248 ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
249 throws Throwable {
250 sb.childOption(ChannelOption.AUTO_READ, autoRead);
251 cb.option(ChannelOption.AUTO_READ, autoRead);
252
253 final int bufferSize = 1024;
254 final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
255 file.deleteOnExit();
256
257 final FileOutputStream out = new FileOutputStream(file);
258 final Random random = ThreadLocalRandom.current();
259
260
261 final int startOffset = random.nextInt(8192);
262 for (int i = 0; i < startOffset; i ++) {
263 out.write(random.nextInt());
264 }
265
266
267 out.write(data, bufferSize, data.length - bufferSize);
268
269
270 for (int i = random.nextInt(8192); i > 0; i --) {
271 out.write(random.nextInt());
272 }
273
274 out.close();
275
276 ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
277 @Override
278 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
279 }
280
281 @Override
282 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
283 if (!autoRead) {
284 ctx.read();
285 }
286 }
287
288 @Override
289 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
290 ctx.close();
291 }
292 };
293 TestHandler sh = new TestHandler(autoRead);
294
295 sb.childHandler(sh);
296 cb.handler(ch);
297
298 Channel sc = sb.bind().sync().channel();
299
300 Channel cc = cb.connect(sc.localAddress()).sync().channel();
301 FileRegion region = new DefaultFileRegion(
302 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
303 FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
304
305 if (!defaultFileRegion) {
306 region = new FileRegionWrapper(region);
307 emptyRegion = new FileRegionWrapper(emptyRegion);
308 }
309
310
311
312
313
314 if (voidPromise) {
315 assertEquals(cc.voidPromise(), cc.write(
316 randomBufferType(cc.alloc(), data, 0, bufferSize), cc.voidPromise()));
317 assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
318 assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
319 } else {
320 assertNotEquals(cc.voidPromise(), cc.write(
321 randomBufferType(cc.alloc(), data, 0, bufferSize)));
322 assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
323 assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
324 }
325
326 while (sh.counter < data.length) {
327 if (sh.exception.get() != null) {
328 break;
329 }
330
331 Thread.sleep(50);
332 }
333
334 sh.channel.close().sync();
335 cc.close().sync();
336 sc.close().sync();
337
338 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
339 throw sh.exception.get();
340 }
341
342 if (sh.exception.get() != null) {
343 throw sh.exception.get();
344 }
345
346
347 assertEquals(data.length, sh.counter);
348 }
349
350 private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
351 private final boolean autoRead;
352 volatile Channel channel;
353 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
354 volatile int counter;
355
356 TestHandler(boolean autoRead) {
357 this.autoRead = autoRead;
358 }
359
360 @Override
361 public void channelActive(ChannelHandlerContext ctx)
362 throws Exception {
363 channel = ctx.channel();
364 if (!autoRead) {
365 ctx.read();
366 }
367 }
368
369 @Override
370 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
371 byte[] actual = new byte[in.readableBytes()];
372 in.readBytes(actual);
373
374 int lastIdx = counter;
375 for (int i = 0; i < actual.length; i ++) {
376 assertEquals(data[i + lastIdx], actual[i]);
377 }
378 counter += actual.length;
379 }
380
381 @Override
382 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
383 if (!autoRead) {
384 ctx.read();
385 }
386 }
387
388 @Override
389 public void exceptionCaught(ChannelHandlerContext ctx,
390 Throwable cause) throws Exception {
391 if (exception.compareAndSet(null, cause)) {
392 ctx.close();
393 }
394 }
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419 private static final class OvershootDetectingFileRegion extends AbstractReferenceCounted
420 implements FileRegion {
421 private final long count;
422 private long transferred;
423 final AtomicInteger transferToCallsPastCompletion = new AtomicInteger();
424
425 OvershootDetectingFileRegion(long count) {
426
427
428 this.count = ObjectUtil.checkInRange(count, 2L, Long.MAX_VALUE, "count");
429 }
430
431 @Override
432 public long position() {
433 return 0;
434 }
435
436 @Override
437 public long count() {
438 return count;
439 }
440
441 @Override
442 public long transferred() {
443 return transferred;
444 }
445
446 @Override
447 @Deprecated
448 public long transfered() {
449 return transferred;
450 }
451
452 @Override
453 public long transferTo(WritableByteChannel target, long position) throws IOException {
454
455
456
457
458 if (position != transferred) {
459 throw new IOException("transferTo position " + position + " != transferred() "
460 + transferred);
461 }
462 if (transferred < count) {
463 int n = target.write(ByteBuffer.wrap(new byte[] { 0x42 }));
464 if (n > 0) {
465 transferred = count;
466 }
467 return n;
468 }
469 transferToCallsPastCompletion.incrementAndGet();
470 return target.write(ByteBuffer.wrap(new byte[] { (byte) 0xFF }));
471 }
472
473 @Override
474 protected void deallocate() {
475
476
477 }
478
479 @Override
480 public FileRegion retain() {
481 super.retain();
482 return this;
483 }
484
485 @Override
486 public FileRegion retain(int increment) {
487 super.retain(increment);
488 return this;
489 }
490
491 @Override
492 public FileRegion touch() {
493 return this;
494 }
495
496 @Override
497 public FileRegion touch(Object hint) {
498 return this;
499 }
500 }
501
502 private static final class FileRegionWrapper implements FileRegion {
503 private final FileRegion region;
504
505 FileRegionWrapper(FileRegion region) {
506 this.region = region;
507 }
508
509 @Override
510 public int refCnt() {
511 return region.refCnt();
512 }
513
514 @Override
515 public long position() {
516 return region.position();
517 }
518
519 @Override
520 @Deprecated
521 public long transfered() {
522 return region.transferred();
523 }
524
525 @Override
526 public boolean release() {
527 return region.release();
528 }
529
530 @Override
531 public long transferred() {
532 return region.transferred();
533 }
534
535 @Override
536 public long count() {
537 return region.count();
538 }
539
540 @Override
541 public boolean release(int decrement) {
542 return region.release(decrement);
543 }
544
545 @Override
546 public long transferTo(WritableByteChannel target, long position) throws IOException {
547 return region.transferTo(target, position);
548 }
549
550 @Override
551 public FileRegion retain() {
552 region.retain();
553 return this;
554 }
555
556 @Override
557 public FileRegion retain(int increment) {
558 region.retain(increment);
559 return this;
560 }
561
562 @Override
563 public FileRegion touch() {
564 region.touch();
565 return this;
566 }
567
568 @Override
569 public FileRegion touch(Object hint) {
570 region.touch(hint);
571 return this;
572 }
573 }
574 }