1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.buffer.api.BufferAllocator;
22 import io.netty5.buffer.api.CompositeBuffer;
23 import io.netty5.buffer.api.DefaultBufferAllocators;
24 import io.netty5.channel.Channel;
25 import io.netty5.channel.ChannelFutureListeners;
26 import io.netty5.channel.ChannelHandler;
27 import io.netty5.channel.ChannelHandlerContext;
28 import io.netty5.channel.ChannelInitializer;
29 import io.netty5.channel.ChannelOption;
30 import org.junit.jupiter.api.Test;
31 import org.junit.jupiter.api.TestInfo;
32 import org.junit.jupiter.api.Timeout;
33
34 import java.io.IOException;
35 import java.util.Random;
36 import java.util.concurrent.CountDownLatch;
37 import java.util.concurrent.TimeUnit;
38 import java.util.concurrent.atomic.AtomicReference;
39
40 import static java.util.Arrays.asList;
41 import static org.junit.jupiter.api.Assertions.assertEquals;
42
43 public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
44 private static final int EXPECTED_BYTES = 20;
45
46 @Test
47 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48 public void testSingleCompositeBufferWrite(TestInfo testInfo) throws Throwable {
49 run(testInfo, this::testSingleCompositeBufferWrite);
50 }
51
52 public void testSingleCompositeBufferWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
53 Channel serverChannel = null;
54 Channel clientChannel = null;
55 try {
56 final CountDownLatch latch = new CountDownLatch(1);
57 final AtomicReference<Object> clientReceived = new AtomicReference<>();
58 sb.childHandler(new ChannelInitializer<>() {
59 @Override
60 protected void initChannel(Channel ch) throws Exception {
61 ch.pipeline().addLast(new ChannelHandler() {
62 @Override
63 public void channelActive(ChannelHandlerContext ctx) throws Exception {
64 ctx.writeAndFlush(newCompositeBuffer(ctx.bufferAllocator()))
65 .addListener(ctx, ChannelFutureListeners.CLOSE);
66 }
67 });
68 }
69 });
70 cb.handler(new ChannelInitializer<>() {
71 @Override
72 protected void initChannel(Channel ch) throws Exception {
73 ch.pipeline().addLast(new ChannelHandler() {
74 private Buffer aggregator;
75
76 @Override
77 public void handlerAdded(ChannelHandlerContext ctx) {
78 aggregator = ctx.bufferAllocator().allocate(EXPECTED_BYTES);
79 }
80
81 @Override
82 public void channelRead(ChannelHandlerContext ctx, Object msg) {
83 if (msg instanceof Buffer) {
84 try (Buffer buf = (Buffer) msg) {
85 aggregator.writeBytes(buf);
86 }
87 }
88 }
89
90 @Override
91 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
92
93 if (!(cause instanceof IOException)) {
94 closeAggregator();
95 clientReceived.set(cause);
96 latch.countDown();
97 }
98 }
99
100 @Override
101 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
102 if (clientReceived.compareAndSet(null, aggregator)) {
103 try {
104 assertEquals(EXPECTED_BYTES, aggregator.readableBytes());
105 } catch (Throwable cause) {
106 closeAggregator();
107 clientReceived.set(cause);
108 } finally {
109 latch.countDown();
110 }
111 }
112 }
113
114 private void closeAggregator() {
115 if (aggregator != null) {
116 aggregator.close();
117 aggregator = null;
118 }
119 }
120 });
121 }
122 });
123
124 serverChannel = sb.bind().asStage().get();
125 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
126
127 try (Buffer expected = newCompositeBuffer(clientChannel.bufferAllocator())) {
128 latch.await();
129 Object received = clientReceived.get();
130 if (received instanceof Buffer) {
131 try (Buffer actual = (Buffer) received) {
132 assertEquals(expected, actual);
133 }
134 } else {
135 throw (Throwable) received;
136 }
137 }
138 } finally {
139 if (clientChannel != null) {
140 clientChannel.close().asStage().sync();
141 }
142 if (serverChannel != null) {
143 serverChannel.close().asStage().sync();
144 }
145 }
146 }
147
148 @Test
149 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
150 public void testCompositeBufferPartialWriteDoesNotCorruptData(TestInfo testInfo) throws Throwable {
151 run(testInfo, this::testCompositeBufferPartialWriteDoesNotCorruptData);
152 }
153
154 public void testCompositeBufferPartialWriteDoesNotCorruptData(ServerBootstrap sb, Bootstrap cb) throws Throwable {
155
156
157
158
159 Channel serverChannel = null;
160 Channel clientChannel = null;
161 BufferAllocator alloc = DefaultBufferAllocators.preferredAllocator();
162 final int soSndBuf = 1024;
163 try (Buffer expectedContent = alloc.allocate(soSndBuf * 2)) {
164 Random r = new Random();
165 expectedContent.writeBytes(newRandomBytes(expectedContent.writableBytes(), r));
166 final CountDownLatch latch = new CountDownLatch(1);
167 final AtomicReference<Object> clientReceived = new AtomicReference<>();
168 sb.childOption(ChannelOption.SO_SNDBUF, soSndBuf)
169 .childHandler(new ChannelInitializer<>() {
170 @Override
171 protected void initChannel(Channel ch) throws Exception {
172 ch.pipeline().addLast(new ChannelHandler() {
173 @Override
174 public void channelActive(ChannelHandlerContext ctx) throws Exception {
175 compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(
176 ctx.channel(), soSndBuf);
177 Buffer contents = expectedContent.copy();
178
179 ctx.write(contents.readSplit(soSndBuf - 100));
180
181
182 CompositeBuffer compositeBuffer = ctx.bufferAllocator().compose(asList(
183 contents.readSplit(50).send(),
184 contents.readSplit(200).send()));
185 ctx.write(compositeBuffer);
186
187
188
189
190 ctx.write(contents.readSplit(50));
191
192
193 ctx.writeAndFlush(contents).addListener(ctx, ChannelFutureListeners.CLOSE);
194 }
195
196 @Override
197 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
198
199
200 if (!(cause instanceof IOException)) {
201 clientReceived.set(cause);
202 latch.countDown();
203 }
204 }
205 });
206 }
207 });
208 cb.handler(new ChannelInitializer<>() {
209 @Override
210 protected void initChannel(Channel ch) throws Exception {
211 ch.pipeline().addLast(new ChannelHandler() {
212 private Buffer aggregator;
213
214 @Override
215 public void handlerAdded(ChannelHandlerContext ctx) {
216 aggregator = ctx.bufferAllocator().allocate(expectedContent.readableBytes());
217 }
218
219 @Override
220 public void channelRead(ChannelHandlerContext ctx, Object msg) {
221 if (msg instanceof Buffer) {
222 try (Buffer buf = (Buffer) msg) {
223 aggregator.writeBytes(buf);
224 }
225 }
226 }
227
228 @Override
229 public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
230
231
232 if (!(cause instanceof IOException)) {
233 closeAggregator();
234 clientReceived.set(cause);
235 latch.countDown();
236 }
237 }
238
239 @Override
240 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
241 if (clientReceived.compareAndSet(null, aggregator)) {
242 try {
243 assertEquals(expectedContent.readableBytes(), aggregator.readableBytes());
244 } catch (Throwable cause) {
245 closeAggregator();
246 clientReceived.set(cause);
247 } finally {
248 latch.countDown();
249 }
250 }
251 }
252
253 private void closeAggregator() {
254 if (aggregator != null) {
255 aggregator.close();
256 aggregator = null;
257 }
258 }
259 });
260 }
261 });
262
263 serverChannel = sb.bind().asStage().get();
264 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
265
266 latch.await();
267 Object received = clientReceived.get();
268 if (received instanceof Buffer) {
269 try (Buffer actual = (Buffer) received) {
270 assertEquals(expectedContent, actual);
271 }
272 } else {
273 throw (Throwable) received;
274 }
275 } finally {
276 if (clientChannel != null) {
277 clientChannel.close().asStage().sync();
278 }
279 if (serverChannel != null) {
280 serverChannel.close().asStage().sync();
281 }
282 }
283 }
284
285 protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(Channel channel,
286 int soSndBuf) {
287 }
288
289 private static Buffer newCompositeBuffer(BufferAllocator alloc) {
290 CompositeBuffer compositeBuffer = alloc.compose(asList(
291 alloc.allocate(4).writeInt(100).send(),
292 alloc.allocate(8).writeLong(123).send(),
293 alloc.allocate(8).writeLong(456).send()));
294 assertEquals(EXPECTED_BYTES, compositeBuffer.readableBytes());
295 return compositeBuffer;
296 }
297
298 private static byte[] newRandomBytes(int size, Random r) {
299 byte[] bytes = new byte[size];
300 r.nextBytes(bytes);
301 return bytes;
302 }
303 }