1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.Channel;
23 import io.netty.channel.ChannelHandlerContext;
24 import io.netty.channel.ChannelInboundHandler;
25 import io.netty.channel.ChannelInboundHandlerAdapter;
26 import io.netty.channel.ChannelOption;
27 import io.netty.channel.DefaultFileRegion;
28 import io.netty.channel.FileRegion;
29 import io.netty.channel.SimpleChannelInboundHandler;
30 import io.netty.util.internal.PlatformDependent;
31 import org.hamcrest.CoreMatchers;
32 import org.junit.jupiter.api.Test;
33 import org.junit.jupiter.api.TestInfo;
34
35 import java.io.File;
36 import java.io.FileOutputStream;
37 import java.io.IOException;
38 import java.io.RandomAccessFile;
39 import java.nio.channels.WritableByteChannel;
40 import java.util.Random;
41 import java.util.concurrent.atomic.AtomicReference;
42
43 import static org.hamcrest.CoreMatchers.is;
44 import static org.hamcrest.MatcherAssert.assertThat;
45 import static org.junit.jupiter.api.Assertions.assertEquals;
46 import static org.junit.jupiter.api.Assertions.assertNotEquals;
47 import static org.junit.jupiter.api.Assumptions.assumeTrue;
48
49 public class SocketFileRegionTest extends AbstractSocketTest {
50
51 static final byte[] data = new byte[1048576 * 10];
52
53 static {
54 PlatformDependent.threadLocalRandom().nextBytes(data);
55 }
56
57 @Test
58 public void testFileRegion(TestInfo testInfo) throws Throwable {
59 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
60 @Override
61 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
62 testFileRegion(serverBootstrap, bootstrap);
63 }
64 });
65 }
66
67 protected boolean supportsCustomFileRegion() {
68 return true;
69 }
70
71 @Test
72 public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
73 assumeTrue(supportsCustomFileRegion());
74 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
75 @Override
76 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
77 testCustomFileRegion(serverBootstrap, bootstrap);
78 }
79 });
80 }
81
82 @Test
83 public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
84 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
85 @Override
86 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
87 testFileRegionNotAutoRead(serverBootstrap, bootstrap);
88 }
89 });
90 }
91
92 @Test
93 public void testFileRegionVoidPromise(TestInfo testInfo) throws Throwable {
94 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
95 @Override
96 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
97 testFileRegionVoidPromise(serverBootstrap, bootstrap);
98 }
99 });
100 }
101
102 @Test
103 public void testFileRegionVoidPromiseNotAutoRead(TestInfo testInfo) throws Throwable {
104 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
105 @Override
106 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
107 testFileRegionVoidPromiseNotAutoRead(serverBootstrap, bootstrap);
108 }
109 });
110 }
111
112 @Test
113 public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
114 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
115 @Override
116 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
117 testFileRegionCountLargerThenFile(serverBootstrap, bootstrap);
118 }
119 });
120 }
121
122 public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
123 testFileRegion0(sb, cb, false, true, true);
124 }
125
126 public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
127 testFileRegion0(sb, cb, false, true, false);
128 }
129
130 public void testFileRegionVoidPromise(ServerBootstrap sb, Bootstrap cb) throws Throwable {
131 testFileRegion0(sb, cb, true, true, true);
132 }
133
134 public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
135 testFileRegion0(sb, cb, false, false, true);
136 }
137
138 public void testFileRegionVoidPromiseNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
139 testFileRegion0(sb, cb, true, false, true);
140 }
141
142 public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
143 File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
144 file.deleteOnExit();
145
146 final FileOutputStream out = new FileOutputStream(file);
147 out.write(data);
148 out.close();
149
150 sb.childHandler(new SimpleChannelInboundHandler<ByteBuf>() {
151 @Override
152 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
153
154 }
155 });
156 cb.handler(new ChannelInboundHandlerAdapter());
157
158 Channel sc = sb.bind().sync().channel();
159 Channel cc = cb.connect(sc.localAddress()).sync().channel();
160
161
162 FileRegion region = new DefaultFileRegion(
163 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
164
165 assertThat(cc.writeAndFlush(region).await().cause(), CoreMatchers.<Throwable>instanceOf(IOException.class));
166 cc.close().sync();
167 sc.close().sync();
168 }
169
170 private static void testFileRegion0(
171 ServerBootstrap sb, Bootstrap cb, boolean voidPromise, final boolean autoRead, boolean defaultFileRegion)
172 throws Throwable {
173 sb.childOption(ChannelOption.AUTO_READ, autoRead);
174 cb.option(ChannelOption.AUTO_READ, autoRead);
175
176 final int bufferSize = 1024;
177 final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
178 file.deleteOnExit();
179
180 final FileOutputStream out = new FileOutputStream(file);
181 final Random random = PlatformDependent.threadLocalRandom();
182
183
184 final int startOffset = random.nextInt(8192);
185 for (int i = 0; i < startOffset; i ++) {
186 out.write(random.nextInt());
187 }
188
189
190 out.write(data, bufferSize, data.length - bufferSize);
191
192
193 for (int i = random.nextInt(8192); i > 0; i --) {
194 out.write(random.nextInt());
195 }
196
197 out.close();
198
199 ChannelInboundHandler ch = new SimpleChannelInboundHandler<Object>() {
200 @Override
201 public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
202 }
203
204 @Override
205 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
206 if (!autoRead) {
207 ctx.read();
208 }
209 }
210
211 @Override
212 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
213 ctx.close();
214 }
215 };
216 TestHandler sh = new TestHandler(autoRead);
217
218 sb.childHandler(sh);
219 cb.handler(ch);
220
221 Channel sc = sb.bind().sync().channel();
222
223 Channel cc = cb.connect(sc.localAddress()).sync().channel();
224 FileRegion region = new DefaultFileRegion(
225 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
226 FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
227
228 if (!defaultFileRegion) {
229 region = new FileRegionWrapper(region);
230 emptyRegion = new FileRegionWrapper(emptyRegion);
231 }
232
233
234
235
236
237 if (voidPromise) {
238 assertEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize), cc.voidPromise()));
239 assertEquals(cc.voidPromise(), cc.write(emptyRegion, cc.voidPromise()));
240 assertEquals(cc.voidPromise(), cc.writeAndFlush(region, cc.voidPromise()));
241 } else {
242 assertNotEquals(cc.voidPromise(), cc.write(Unpooled.wrappedBuffer(data, 0, bufferSize)));
243 assertNotEquals(cc.voidPromise(), cc.write(emptyRegion));
244 assertNotEquals(cc.voidPromise(), cc.writeAndFlush(region));
245 }
246
247 while (sh.counter < data.length) {
248 if (sh.exception.get() != null) {
249 break;
250 }
251
252 Thread.sleep(50);
253 }
254
255 sh.channel.close().sync();
256 cc.close().sync();
257 sc.close().sync();
258
259 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
260 throw sh.exception.get();
261 }
262
263 if (sh.exception.get() != null) {
264 throw sh.exception.get();
265 }
266
267
268 assertThat(sh.counter, is(data.length));
269 }
270
271 private static class TestHandler extends SimpleChannelInboundHandler<ByteBuf> {
272 private final boolean autoRead;
273 volatile Channel channel;
274 final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
275 volatile int counter;
276
277 TestHandler(boolean autoRead) {
278 this.autoRead = autoRead;
279 }
280
281 @Override
282 public void channelActive(ChannelHandlerContext ctx)
283 throws Exception {
284 channel = ctx.channel();
285 if (!autoRead) {
286 ctx.read();
287 }
288 }
289
290 @Override
291 public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
292 byte[] actual = new byte[in.readableBytes()];
293 in.readBytes(actual);
294
295 int lastIdx = counter;
296 for (int i = 0; i < actual.length; i ++) {
297 assertEquals(data[i + lastIdx], actual[i]);
298 }
299 counter += actual.length;
300 }
301
302 @Override
303 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
304 if (!autoRead) {
305 ctx.read();
306 }
307 }
308
309 @Override
310 public void exceptionCaught(ChannelHandlerContext ctx,
311 Throwable cause) throws Exception {
312 if (exception.compareAndSet(null, cause)) {
313 ctx.close();
314 }
315 }
316 }
317
318 private static final class FileRegionWrapper implements FileRegion {
319 private final FileRegion region;
320
321 FileRegionWrapper(FileRegion region) {
322 this.region = region;
323 }
324
325 @Override
326 public int refCnt() {
327 return region.refCnt();
328 }
329
330 @Override
331 public long position() {
332 return region.position();
333 }
334
335 @Override
336 @Deprecated
337 public long transfered() {
338 return region.transferred();
339 }
340
341 @Override
342 public boolean release() {
343 return region.release();
344 }
345
346 @Override
347 public long transferred() {
348 return region.transferred();
349 }
350
351 @Override
352 public long count() {
353 return region.count();
354 }
355
356 @Override
357 public boolean release(int decrement) {
358 return region.release(decrement);
359 }
360
361 @Override
362 public long transferTo(WritableByteChannel target, long position) throws IOException {
363 return region.transferTo(target, position);
364 }
365
366 @Override
367 public FileRegion retain() {
368 region.retain();
369 return this;
370 }
371
372 @Override
373 public FileRegion retain(int increment) {
374 region.retain(increment);
375 return this;
376 }
377
378 @Override
379 public FileRegion touch() {
380 region.touch();
381 return this;
382 }
383
384 @Override
385 public FileRegion touch(Object hint) {
386 region.touch(hint);
387 return this;
388 }
389 }
390 }