View Javadoc
1   /*
2    * Copyright 2017 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.buffer.api.BufferAllocator;
22  import io.netty5.buffer.api.CompositeBuffer;
23  import io.netty5.buffer.api.DefaultBufferAllocators;
24  import io.netty5.channel.Channel;
25  import io.netty5.channel.ChannelFutureListeners;
26  import io.netty5.channel.ChannelHandler;
27  import io.netty5.channel.ChannelHandlerContext;
28  import io.netty5.channel.ChannelInitializer;
29  import io.netty5.channel.ChannelOption;
30  import org.junit.jupiter.api.Test;
31  import org.junit.jupiter.api.TestInfo;
32  import org.junit.jupiter.api.Timeout;
33  
34  import java.io.IOException;
35  import java.util.Random;
36  import java.util.concurrent.CountDownLatch;
37  import java.util.concurrent.TimeUnit;
38  import java.util.concurrent.atomic.AtomicReference;
39  
40  import static java.util.Arrays.asList;
41  import static org.junit.jupiter.api.Assertions.assertEquals;
42  
43  public class CompositeBufferGatheringWriteTest extends AbstractSocketTest {
44      private static final int EXPECTED_BYTES = 20;
45  
46      @Test
47      @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
48      public void testSingleCompositeBufferWrite(TestInfo testInfo) throws Throwable {
49          run(testInfo, this::testSingleCompositeBufferWrite);
50      }
51  
52      public void testSingleCompositeBufferWrite(ServerBootstrap sb, Bootstrap cb) throws Throwable {
53          Channel serverChannel = null;
54          Channel clientChannel = null;
55          try {
56              final CountDownLatch latch = new CountDownLatch(1);
57              final AtomicReference<Object> clientReceived = new AtomicReference<>();
58              sb.childHandler(new ChannelInitializer<>() {
59                  @Override
60                  protected void initChannel(Channel ch) throws Exception {
61                      ch.pipeline().addLast(new ChannelHandler() {
62                          @Override
63                          public void channelActive(ChannelHandlerContext ctx) throws Exception {
64                              ctx.writeAndFlush(newCompositeBuffer(ctx.bufferAllocator()))
65                                      .addListener(ctx, ChannelFutureListeners.CLOSE);
66                          }
67                      });
68                  }
69              });
70              cb.handler(new ChannelInitializer<>() {
71                  @Override
72                  protected void initChannel(Channel ch) throws Exception {
73                      ch.pipeline().addLast(new ChannelHandler() {
74                          private Buffer aggregator;
75  
76                          @Override
77                          public void handlerAdded(ChannelHandlerContext ctx) {
78                              aggregator = ctx.bufferAllocator().allocate(EXPECTED_BYTES);
79                          }
80  
81                          @Override
82                          public void channelRead(ChannelHandlerContext ctx, Object msg) {
83                              if (msg instanceof Buffer) {
84                                  try (Buffer buf = (Buffer) msg) {
85                                      aggregator.writeBytes(buf);
86                                  }
87                              }
88                          }
89  
90                          @Override
91                          public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
92                              // IOException is fine as it will also close the channel and may just be a connection reset.
93                              if (!(cause instanceof IOException)) {
94                                  closeAggregator();
95                                  clientReceived.set(cause);
96                                  latch.countDown();
97                              }
98                          }
99  
100                         @Override
101                         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
102                             if (clientReceived.compareAndSet(null, aggregator)) {
103                                 try {
104                                     assertEquals(EXPECTED_BYTES, aggregator.readableBytes());
105                                 } catch (Throwable cause) {
106                                     closeAggregator();
107                                     clientReceived.set(cause);
108                                 } finally {
109                                     latch.countDown();
110                                 }
111                             }
112                         }
113 
114                         private void closeAggregator() {
115                             if (aggregator != null) {
116                                 aggregator.close();
117                                 aggregator = null;
118                             }
119                         }
120                     });
121                 }
122             });
123 
124             serverChannel = sb.bind().asStage().get();
125             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
126 
127             try (Buffer expected = newCompositeBuffer(clientChannel.bufferAllocator())) {
128                 latch.await();
129                 Object received = clientReceived.get();
130                 if (received instanceof Buffer) {
131                     try (Buffer actual = (Buffer) received) {
132                         assertEquals(expected, actual);
133                     }
134                 } else {
135                     throw (Throwable) received;
136                 }
137             }
138         } finally {
139             if (clientChannel != null) {
140                 clientChannel.close().asStage().sync();
141             }
142             if (serverChannel != null) {
143                 serverChannel.close().asStage().sync();
144             }
145         }
146     }
147 
148     @Test
149     @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
150     public void testCompositeBufferPartialWriteDoesNotCorruptData(TestInfo testInfo) throws Throwable {
151         run(testInfo, this::testCompositeBufferPartialWriteDoesNotCorruptData);
152     }
153 
154     public void testCompositeBufferPartialWriteDoesNotCorruptData(ServerBootstrap sb, Bootstrap cb) throws Throwable {
155         // The scenario is the following:
156         // Limit SO_SNDBUF so that a single buffer can be written, and part of a CompositeByteBuf at the same time.
157         // We then write the single buffer, the CompositeBuffer, and another single buffer and verify the data is not
158         // corrupted when we read it on the other side.
159         Channel serverChannel = null;
160         Channel clientChannel = null;
161         BufferAllocator alloc = DefaultBufferAllocators.preferredAllocator();
162         final int soSndBuf = 1024;
163         try (Buffer expectedContent = alloc.allocate(soSndBuf * 2)) {
164             Random r = new Random();
165             expectedContent.writeBytes(newRandomBytes(expectedContent.writableBytes(), r));
166             final CountDownLatch latch = new CountDownLatch(1);
167             final AtomicReference<Object> clientReceived = new AtomicReference<>();
168             sb.childOption(ChannelOption.SO_SNDBUF, soSndBuf)
169               .childHandler(new ChannelInitializer<>() {
170                   @Override
171                   protected void initChannel(Channel ch) throws Exception {
172                       ch.pipeline().addLast(new ChannelHandler() {
173                           @Override
174                           public void channelActive(ChannelHandlerContext ctx) throws Exception {
175                               compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(
176                                       ctx.channel(), soSndBuf);
177                               Buffer contents = expectedContent.copy();
178                               // First single write
179                               ctx.write(contents.readSplit(soSndBuf - 100));
180 
181                               // Build and write CompositeBuffer
182                               CompositeBuffer compositeBuffer = ctx.bufferAllocator().compose(asList(
183                                       contents.readSplit(50).send(),
184                                       contents.readSplit(200).send()));
185                               ctx.write(compositeBuffer);
186 
187                               // Write a single buffer that is smaller than the second component of the
188                               // CompositeBuffer above but small enough to fit in the remaining space allowed by the
189                               // soSndBuf amount.
190                               ctx.write(contents.readSplit(50));
191 
192                               // Write the remainder of the content
193                               ctx.writeAndFlush(contents).addListener(ctx, ChannelFutureListeners.CLOSE);
194                           }
195 
196                           @Override
197                           public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
198                               // IOException is fine as it will also close the channel
199                               // and may just be a connection reset.
200                               if (!(cause instanceof IOException)) {
201                                   clientReceived.set(cause);
202                                   latch.countDown();
203                               }
204                           }
205                       });
206                   }
207               });
208             cb.handler(new ChannelInitializer<>() {
209                 @Override
210                 protected void initChannel(Channel ch) throws Exception {
211                     ch.pipeline().addLast(new ChannelHandler() {
212                         private Buffer aggregator;
213 
214                         @Override
215                         public void handlerAdded(ChannelHandlerContext ctx) {
216                             aggregator = ctx.bufferAllocator().allocate(expectedContent.readableBytes());
217                         }
218 
219                         @Override
220                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
221                             if (msg instanceof Buffer) {
222                                 try (Buffer buf = (Buffer) msg) {
223                                     aggregator.writeBytes(buf);
224                                 }
225                             }
226                         }
227 
228                         @Override
229                         public void channelExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
230                             // IOException is fine as it will also close the channel
231                             // and may just be a connection reset.
232                             if (!(cause instanceof IOException)) {
233                                 closeAggregator();
234                                 clientReceived.set(cause);
235                                 latch.countDown();
236                             }
237                         }
238 
239                         @Override
240                         public void channelInactive(ChannelHandlerContext ctx) throws Exception {
241                             if (clientReceived.compareAndSet(null, aggregator)) {
242                                 try {
243                                     assertEquals(expectedContent.readableBytes(), aggregator.readableBytes());
244                                 } catch (Throwable cause) {
245                                     closeAggregator();
246                                     clientReceived.set(cause);
247                                 } finally {
248                                     latch.countDown();
249                                 }
250                             }
251                         }
252 
253                         private void closeAggregator() {
254                             if (aggregator != null) {
255                                 aggregator.close();
256                                 aggregator = null;
257                             }
258                         }
259                     });
260                 }
261             });
262 
263             serverChannel = sb.bind().asStage().get();
264             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
265 
266             latch.await();
267             Object received = clientReceived.get();
268             if (received instanceof Buffer) {
269                 try (Buffer actual = (Buffer) received) {
270                     assertEquals(expectedContent, actual);
271                 }
272             } else {
273                 throw (Throwable) received;
274             }
275         } finally {
276             if (clientChannel != null) {
277                 clientChannel.close().asStage().sync();
278             }
279             if (serverChannel != null) {
280                 serverChannel.close().asStage().sync();
281             }
282         }
283     }
284 
285     protected void compositeBufferPartialWriteDoesNotCorruptDataInitServerConfig(Channel channel,
286                                                                                  int soSndBuf) {
287     }
288 
289     private static Buffer newCompositeBuffer(BufferAllocator alloc) {
290         CompositeBuffer compositeBuffer = alloc.compose(asList(
291                 alloc.allocate(4).writeInt(100).send(),
292                 alloc.allocate(8).writeLong(123).send(),
293                 alloc.allocate(8).writeLong(456).send()));
294         assertEquals(EXPECTED_BYTES, compositeBuffer.readableBytes());
295         return compositeBuffer;
296     }
297 
298     private static byte[] newRandomBytes(int size, Random r) {
299         byte[] bytes = new byte[size];
300         r.nextBytes(bytes);
301         return bytes;
302     }
303 }