1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandler;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.DefaultFileRegion;
28 import io.netty.channel.FileRegion;
29 import io.netty.channel.SimpleChannelInboundHandler;
30 import io.netty.util.internal.PlatformDependent;
31 import org.hamcrest.CoreMatchers;
32 import org.junit.jupiter.api.Test;
33 import org.junit.jupiter.api.TestInfo;
34
35 import java.io.File;
36 import java.io.FileOutputStream;
37 import java.io.IOException;
38 import java.io.RandomAccessFile;
39 import java.nio.channels.WritableByteChannel;
40 import java.util.Random;
41 import java.util.concurrent.atomic.AtomicReference;
42
43 import static org.hamcrest.CoreMatchers.is;
44 import static org.hamcrest.MatcherAssert.assertThat;
45 import static org.junit.jupiter.api.Assertions.assertEquals;
46 import static org.junit.jupiter.api.Assertions.assertNotEquals;
47
48 public class SocketFileRegionTest extends AbstractSocketTest {
49
/**
 * Random payload (10 MiB) used by the tests in this class, both as the bytes that are sent
 * and as the reference the receiving side compares against.
 */
static final byte[] data = new byte[10 * 1024 * 1024];

static {
    // Populate the payload once, when the class is initialised.
    PlatformDependent.threadLocalRandom().nextBytes(data);
}
55
56 @Test
57 public void testFileRegion(TestInfo testInfo) throws Throwable {
58 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
59 @Override
60 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
61 testFileRegion(serverBootstrap, bootstrap);
62 }
63 });
64 }
65
66 @Test
67 public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
68 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
69 @Override
70 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
71 testCustomFileRegion(serverBootstrap, bootstrap);
72 }
73 });
74 }
75
76 @Test
77 public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
78 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
79 @Override
80 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
81 testFileRegionNotAutoRead(serverBootstrap, bootstrap);
82 }
83 });
84 }
85
86 @Test
87 public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
88 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
89 @Override
90 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
91 testFileRegionVoidPromise(serverBootstrap, bootstrap);
92 }
93 });
94 }
95
96 @Test
97 public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
98 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
99 @Override
100 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
101 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
102 }
103 });
104 }
105
106 @Test
107 public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
108 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
109 @Override
110 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
111 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
112 }
113 });
114 }
115
116 public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
117 testFileRegion0(sb, cb, false, true, true);
118 }
119
120 public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
121 testFileRegion0(sb, cb, false, true, false);
122 }
123
124 public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
125 testFileRegion0(sb, cb, true, true, true);
126 }
127
128 public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
129 testFileRegion0(sb, cb, false, false, true);
130 }
131
132 public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
133 testFileRegion0(sb, cb, true, false, true);
134 }
135
136 public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
137 File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
138 file.deleteOnExit();
139
140 final FileOutputStream out = new FileOutputStream(file);
141 out.write(data);
142 out.close();
143
144 sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
145 @Override
146 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
147
148 }
149 });
150 cb.handler(new ChannelInboundHandlerAdapter());
151
152 Channel sc = sb.bind().sync().channel();
153 Channel cc = cb.connect(sc.localAddress()).sync().channel();
154
155
156 FileRegion region = new DefaultFileRegion(
157 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
158
159 assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
160 cc.close().sync();
161 sc.close().sync();
162 }
163
164 private static void testFileRegion0(
165 ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
166 throws Throwable {
167 sb.childOption(ChannelOption.AUTO_READ, autoRead);
168 cb.option(ChannelOption.AUTO_READ, autoRead);
169
170 final int bufferSize = 1024;
171 final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
172 file.deleteOnExit();
173
174 final FileOutputStream out = new FileOutputStream(file);
175 final Random random = PlatformDependent.threadLocalRandom();
176
177
178 final int startOffset = random.nextInt(8192);
179 for (int i = 0; i < startOffset; i ++) {
180 out.write(random.nextInt());
181 }
182
183
184 out.write(data, bufferSize, data.length - bufferSize);
185
186
187 for (int i = random.nextInt(8192); i > 0; i --) {
188 out.write(random.nextInt());
189 }
190
191 out.close();
192
193 ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
194 @Override
195 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
196 }
197
198 @Override
199 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
200 if (!autoRead) {
201 ctx.read();
202 }
203 }
204
205 @Override
206 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
207 ctx.close();
208 }
209 };
210 TestHandler sh = new TestHandler(autoRead);
211
212 sb.childHandler(sh);
213 cb.handler(ch);
214
215 Channel sc = sb.bind().sync().channel();
216
217 Channel cc = cb.connect(sc.localAddress()).sync().channel();
218 FileRegion region = new DefaultFileRegion(
219 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
220 FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
221
222 if (!defaultFileRegion) {
223 region = new FileRegionWrapper(region);
224 emptyRegion = new FileRegionWrapper(emptyRegion);
225 }
226
227
228
229
230
231 if (voidPromise) {
232 assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
233 assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
234 assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
235 } else {
236 assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
237 assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
238 assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
239 }
240
241 while (sh.counter < data.length) {
242 if (sh.exception.get() != null) {
243 break;
244 }
245
246 try {
247 Thread.sleep(50);
248 } catch (InterruptedException e) {
249
250 }
251 }
252
253 sh.channel.close().sync();
254 cc.close().sync();
255 sc.close().sync();
256
257 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
258 throw sh.exception.get();
259 }
260
261 if (sh.exception.get() != null) {
262 throw sh.exception.get();
263 }
264
265
266 assertThat(sh.counter, is(data.length));
267 }
268
269 private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
270 private final boolean autoRead;
271 volatile Channel channel;
272 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
273 volatile int counter;
274
275 TestHandler(boolean autoRead) {
276 this.autoRead = autoRead;
277 }
278
279 @Override
280 public void channelActive(ChannelHandlerContext ctx)
281 throws Exception {
282 channel = ctx.channel();
283 if (!autoRead) {
284 ctx.read();
285 }
286 }
287
288 @Override
289 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
290 byte[] actual = new byte[in.readableBytes()];
291 in.readBytes(actual);
292
293 int lastIdx = counter;
294 for (int i = 0; i < actual.length; i ++) {
295 assertEquals(data[i + lastIdx], actual[i]);
296 }
297 counter += actual.length;
298 }
299
300 @Override
301 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
302 if (!autoRead) {
303 ctx.read();
304 }
305 }
306
307 @Override
308 public void exceptionCaught(ChannelHandlerContext ctx,
309 Throwable cause) throws Exception {
310 if (exception.compareAndSet(null, cause)) {
311 ctx.close();
312 }
313 }
314 }
315
316 private static final class FileRegionWrapper implements FileRegion {
317 private final FileRegion region;
318
319 FileRegionWrapper(FileRegion region) {
320 this.region = region;
321 }
322
323 @Override
324 public int refCnt() {
325 return region.refCnt();
326 }
327
328 @Override
329 public long position() {
330 return region.position();
331 }
332
333 @Override
334 @Deprecated
335 public long transfered() {
336 return region.transferred();
337 }
338
339 @Override
340 public boolean release() {
341 return region.release();
342 }
343
344 @Override
345 public long transferred() {
346 return region.transferred();
347 }
348
349 @Override
350 public long count() {
351 return region.count();
352 }
353
354 @Override
355 public boolean release(int decrement) {
356 return region.release(decrement);
357 }
358
359 @Override
360 public long transferTo(WritableByteChannel target, long position) throws IOException {
361 return region.transferTo(target, position);
362 }
363
364 @Override
365 public FileRegion retain() {
366 region.retain();
367 return this;
368 }
369
370 @Override
371 public FileRegion retain(int increment) {
372 region.retain(increment);
373 return this;
374 }
375
376 @Override
377 public FileRegion touch() {
378 region.touch();
379 return this;
380 }
381
382 @Override
383 public FileRegion touch(Object hint) {
384 region.touch(hint);
385 return this;
386 }
387 }
388 }