1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.buffer.api.MemoryManager;
22 import io.netty5.channel.Channel;
23 import io.netty5.channel.ChannelHandler;
24 import io.netty5.channel.ChannelHandlerContext;
25 import io.netty5.channel.ChannelOption;
26 import io.netty5.channel.DefaultFileRegion;
27 import io.netty5.channel.FileRegion;
28 import io.netty5.channel.SimpleChannelInboundHandler;
29 import io.netty5.util.internal.PlatformDependent;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.TestInfo;
32
33 import java.io.File;
34 import java.io.FileOutputStream;
35 import java.io.IOException;
36 import java.io.RandomAccessFile;
37 import java.nio.channels.WritableByteChannel;
38 import java.util.Random;
39 import java.util.concurrent.ThreadLocalRandom;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import static org.assertj.core.api.Assertions.assertThat;
43 import static org.junit.jupiter.api.Assertions.assertEquals;
44
45 public class SocketFileRegionTest extends AbstractSocketTest {
46
47 static final byte[] data = new byte[1048576 * 10];
48
49 static {
50 ThreadLocalRandom.current().nextBytes(data);
51 }
52
53 @Test
54 public void testFileRegion(TestInfo testInfo) throws Throwable {
55 run(testInfo, this::testFileRegion);
56 }
57
58 @Test
59 public void testCustomFileRegion(TestInfo testInfo) throws Throwable {
60 run(testInfo, this::testCustomFileRegion);
61 }
62
63 @Test
64 public void testFileRegionNotAutoRead(TestInfo testInfo) throws Throwable {
65 run(testInfo, this::testFileRegionNotAutoRead);
66 }
67
68 @Test
69 public void testFileRegionCountLargerThenFile(TestInfo testInfo) throws Throwable {
70 run(testInfo, this::testFileRegionCountLargerThenFile);
71 }
72
73 public void testFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
74 testFileRegion0(sb, cb, true, true);
75 }
76
77 public void testCustomFileRegion(ServerBootstrap sb, Bootstrap cb) throws Throwable {
78 testFileRegion0(sb, cb, true, false);
79 }
80
81 public void testFileRegionNotAutoRead(ServerBootstrap sb, Bootstrap cb) throws Throwable {
82 testFileRegion0(sb, cb, false, true);
83 }
84
85 public void testFileRegionCountLargerThenFile(ServerBootstrap sb, Bootstrap cb) throws Throwable {
86 File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
87 file.deleteOnExit();
88
89 final FileOutputStream out = new FileOutputStream(file);
90 out.write(data);
91 out.close();
92
93 sb.childHandler(new SimpleChannelInboundHandler<Buffer>() {
94 @Override
95 protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
96
97 }
98 });
99 cb.handler(new ChannelHandler() { });
100
101 Channel sc = sb.bind().asStage().get();
102 Channel cc = cb.connect(sc.localAddress()).asStage().get();
103
104
105 FileRegion region = new DefaultFileRegion(
106 new RandomAccessFile(file, "r").getChannel(), 0, data.length + 1024);
107
108 Throwable result = cc.writeAndFlush(region).asStage().getCause();
109 assertThat(result).isInstanceOf(IOException.class);
110 cc.close().asStage().sync();
111 sc.close().asStage().sync();
112 }
113
114 private static void testFileRegion0(
115 ServerBootstrap sb, Bootstrap cb, final boolean autoRead, boolean defaultFileRegion)
116 throws Throwable {
117 sb.childOption(ChannelOption.AUTO_READ, autoRead);
118 cb.option(ChannelOption.AUTO_READ, autoRead);
119
120 final int bufferSize = 1024;
121 final File file = PlatformDependent.createTempFile("netty-", ".tmp", null);
122 file.deleteOnExit();
123
124 final FileOutputStream out = new FileOutputStream(file);
125 final Random random = ThreadLocalRandom.current();
126
127
128 final int startOffset = random.nextInt(8192);
129 for (int i = 0; i < startOffset; i ++) {
130 out.write(random.nextInt());
131 }
132
133
134 out.write(data, bufferSize, data.length - bufferSize);
135
136
137 for (int i = random.nextInt(8192); i > 0; i --) {
138 out.write(random.nextInt());
139 }
140
141 out.close();
142
143 ChannelHandler ch = new SimpleChannelInboundHandler<>() {
144 @Override
145 public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
146 }
147
148 @Override
149 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
150 if (!autoRead) {
151 ctx.read();
152 }
153 }
154
155 @Override
156 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
157 ctx.close();
158 }
159 };
160 TestHandler sh = new TestHandler(autoRead);
161
162 sb.childHandler(sh);
163 cb.handler(ch);
164
165 Channel sc = sb.bind().asStage().get();
166
167 Channel cc = cb.connect(sc.localAddress()).asStage().get();
168 FileRegion region = new DefaultFileRegion(
169 new RandomAccessFile(file, "r").getChannel(), startOffset, data.length - bufferSize);
170 FileRegion emptyRegion = new DefaultFileRegion(new RandomAccessFile(file, "r").getChannel(), 0, 0);
171
172 if (!defaultFileRegion) {
173 region = new FileRegionWrapper(region);
174 emptyRegion = new FileRegionWrapper(emptyRegion);
175 }
176
177
178
179
180
181 try (Buffer buffer = MemoryManager.unsafeWrap(data)) {
182 cc.write(buffer.readSplit(bufferSize));
183 }
184 cc.write(emptyRegion);
185 cc.writeAndFlush(region);
186
187 while (sh.counter < data.length) {
188 if (sh.exception.get() != null) {
189 break;
190 }
191
192 try {
193 Thread.sleep(50);
194 } catch (InterruptedException e) {
195
196 }
197 }
198
199 sh.channel.close().asStage().sync();
200 cc.close().asStage().sync();
201 sc.close().asStage().sync();
202
203 if (sh.exception.get() != null && !(sh.exception.get() instanceof IOException)) {
204 throw sh.exception.get();
205 }
206
207 if (sh.exception.get() != null) {
208 throw sh.exception.get();
209 }
210
211
212 assertThat(sh.counter).isEqualTo(data.length);
213 }
214
215 private static class TestHandler extends SimpleChannelInboundHandler<Buffer> {
216 private final boolean autoRead;
217 volatile Channel channel;
218 final AtomicReference<Throwable> exception = new AtomicReference<>();
219 volatile int counter;
220
221 TestHandler(boolean autoRead) {
222 this.autoRead = autoRead;
223 }
224
225 @Override
226 public void channelActive(ChannelHandlerContext ctx)
227 throws Exception {
228 channel = ctx.channel();
229 if (!autoRead) {
230 ctx.read();
231 }
232 }
233
234 @Override
235 public void messageReceived(ChannelHandlerContext ctx, Buffer in) throws Exception {
236 byte[] actual = new byte[in.readableBytes()];
237 in.readBytes(actual, 0, actual.length);
238
239 int lastIdx = counter;
240 for (int i = 0; i < actual.length; i ++) {
241 assertEquals(data[i + lastIdx], actual[i]);
242 }
243 counter += actual.length;
244 }
245
246 @Override
247 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
248 if (!autoRead) {
249 ctx.read();
250 }
251 }
252
253 @Override
254 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
255 if (exception.compareAndSet(null, cause)) {
256 ctx.close();
257 }
258 }
259 }
260
261 private static final class FileRegionWrapper implements FileRegion {
262 private final FileRegion region;
263
264 FileRegionWrapper(FileRegion region) {
265 this.region = region;
266 }
267
268 @Override
269 public int refCnt() {
270 return region.refCnt();
271 }
272
273 @Override
274 public long position() {
275 return region.position();
276 }
277
278 @Override
279 @Deprecated
280 public long transfered() {
281 return region.transferred();
282 }
283
284 @Override
285 public boolean release() {
286 return region.release();
287 }
288
289 @Override
290 public long transferred() {
291 return region.transferred();
292 }
293
294 @Override
295 public long count() {
296 return region.count();
297 }
298
299 @Override
300 public boolean release(int decrement) {
301 return region.release(decrement);
302 }
303
304 @Override
305 public long transferTo(WritableByteChannel target, long position) throws IOException {
306 return region.transferTo(target, position);
307 }
308
309 @Override
310 public FileRegion retain() {
311 region.retain();
312 return this;
313 }
314
315 @Override
316 public FileRegion retain(int increment) {
317 region.retain(increment);
318 return this;
319 }
320
321 @Override
322 public FileRegion touch() {
323 region.touch();
324 return this;
325 }
326
327 @Override
328 public FileRegion touch(Object hint) {
329 region.touch(hint);
330 return this;
331 }
332 }
333 }