View Javadoc
1   /*
2    * Copyright 2018 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty5.testsuite.transport.socket;
17  
18  import io.netty5.bootstrap.Bootstrap;
19  import io.netty5.bootstrap.ServerBootstrap;
20  import io.netty5.buffer.api.Buffer;
21  import io.netty5.util.Resource;
22  import io.netty5.channel.Channel;
23  import io.netty5.channel.ChannelHandler;
24  import io.netty5.channel.ChannelHandlerContext;
25  import io.netty5.channel.ChannelInitializer;
26  import io.netty5.channel.ChannelOption;
27  import io.netty5.channel.WriteBufferWaterMark;
28  import org.junit.jupiter.api.Test;
29  import org.junit.jupiter.api.TestInfo;
30  import org.junit.jupiter.api.Timeout;
31  
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.TimeUnit;
34  
35  public class SocketConditionalWritabilityTest extends AbstractSocketTest {
36      @Test
37      @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
38      public void testConditionalWritability(TestInfo testInfo) throws Throwable {
39          run(testInfo, this::testConditionalWritability);
40      }
41  
42      public void testConditionalWritability(ServerBootstrap sb, Bootstrap cb) throws Throwable {
43          Channel serverChannel = null;
44          Channel clientChannel = null;
45          try {
46              final int expectedBytes = 100 * 1024 * 1024;
47              final int maxWriteChunkSize = 16 * 1024;
48              final CountDownLatch latch = new CountDownLatch(1);
49              sb.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 16 * 1024));
50              sb.childHandler(new ChannelInitializer<>() {
51                  @Override
52                  protected void initChannel(Channel ch) {
53                      ch.pipeline().addLast(new ChannelHandler() {
54                          private int bytesWritten;
55  
56                          @Override
57                          public void channelRead(ChannelHandlerContext ctx, Object msg) {
58                              Resource.dispose(msg);
59                              writeRemainingBytes(ctx);
60                          }
61  
62                          @Override
63                          public void flush(ChannelHandlerContext ctx) {
64                              if (ctx.channel().isWritable()) {
65                                  writeRemainingBytes(ctx);
66                              } else {
67                                  ctx.flush();
68                              }
69                          }
70  
71                          @Override
72                          public void channelWritabilityChanged(ChannelHandlerContext ctx) {
73                              if (ctx.channel().isWritable()) {
74                                  writeRemainingBytes(ctx);
75                              }
76                              ctx.fireChannelWritabilityChanged();
77                          }
78  
79                          private void writeRemainingBytes(ChannelHandlerContext ctx) {
80                              while (ctx.channel().isWritable() && bytesWritten < expectedBytes) {
81                                  int chunkSize = Math.min(expectedBytes - bytesWritten, maxWriteChunkSize);
82                                  bytesWritten += chunkSize;
83                                  Buffer buffer = ctx.bufferAllocator().allocate(chunkSize);
84                                  buffer.skipWritableBytes(chunkSize);
85                                  ctx.write(buffer);
86                              }
87                              ctx.flush();
88                          }
89                      });
90                  }
91              });
92  
93              serverChannel = sb.bind().asStage().get();
94  
95              cb.handler(new ChannelInitializer<>() {
96                  @Override
97                  protected void initChannel(Channel ch) {
98                      ch.pipeline().addLast(new ChannelHandler() {
99                          private int totalRead;
100 
101                         @Override
102                         public void channelActive(ChannelHandlerContext ctx) {
103                             ctx.writeAndFlush(ctx.bufferAllocator().allocate(1).writeByte((byte) 0));
104                         }
105 
106                         @Override
107                         public void channelRead(ChannelHandlerContext ctx, Object msg) {
108                             if (msg instanceof Buffer) {
109                                 try (Buffer buffer = (Buffer) msg) {
110                                     totalRead += buffer.readableBytes();
111                                     if (totalRead == expectedBytes) {
112                                         latch.countDown();
113                                     }
114                                 }
115                             } else {
116                                 Resource.dispose(msg);
117                             }
118                         }
119                     });
120                 }
121             });
122             clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
123             latch.await();
124         } finally {
125             if (serverChannel != null) {
126                 serverChannel.close();
127             }
128             if (clientChannel != null) {
129                 clientChannel.close();
130             }
131         }
132     }
133 }