1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.util.Resource;
22 import io.netty5.channel.Channel;
23 import io.netty5.channel.ChannelHandler;
24 import io.netty5.channel.ChannelHandlerContext;
25 import io.netty5.channel.ChannelInitializer;
26 import io.netty5.channel.ChannelOption;
27 import io.netty5.channel.WriteBufferWaterMark;
28 import org.junit.jupiter.api.Test;
29 import org.junit.jupiter.api.TestInfo;
30 import org.junit.jupiter.api.Timeout;
31
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34
35 public class SocketConditionalWritabilityTest extends AbstractSocketTest {
36 @Test
37 @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
38 public void testConditionalWritability(TestInfo testInfo) throws Throwable {
39 run(testInfo, this::testConditionalWritability);
40 }
41
42 public void testConditionalWritability(ServerBootstrap sb, Bootstrap cb) throws Throwable {
43 Channel serverChannel = null;
44 Channel clientChannel = null;
45 try {
46 final int expectedBytes = 100 * 1024 * 1024;
47 final int maxWriteChunkSize = 16 * 1024;
48 final CountDownLatch latch = new CountDownLatch(1);
49 sb.childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 16 * 1024));
50 sb.childHandler(new ChannelInitializer<>() {
51 @Override
52 protected void initChannel(Channel ch) {
53 ch.pipeline().addLast(new ChannelHandler() {
54 private int bytesWritten;
55
56 @Override
57 public void channelRead(ChannelHandlerContext ctx, Object msg) {
58 Resource.dispose(msg);
59 writeRemainingBytes(ctx);
60 }
61
62 @Override
63 public void flush(ChannelHandlerContext ctx) {
64 if (ctx.channel().isWritable()) {
65 writeRemainingBytes(ctx);
66 } else {
67 ctx.flush();
68 }
69 }
70
71 @Override
72 public void channelWritabilityChanged(ChannelHandlerContext ctx) {
73 if (ctx.channel().isWritable()) {
74 writeRemainingBytes(ctx);
75 }
76 ctx.fireChannelWritabilityChanged();
77 }
78
79 private void writeRemainingBytes(ChannelHandlerContext ctx) {
80 while (ctx.channel().isWritable() && bytesWritten < expectedBytes) {
81 int chunkSize = Math.min(expectedBytes - bytesWritten, maxWriteChunkSize);
82 bytesWritten += chunkSize;
83 Buffer buffer = ctx.bufferAllocator().allocate(chunkSize);
84 buffer.skipWritableBytes(chunkSize);
85 ctx.write(buffer);
86 }
87 ctx.flush();
88 }
89 });
90 }
91 });
92
93 serverChannel = sb.bind().asStage().get();
94
95 cb.handler(new ChannelInitializer<>() {
96 @Override
97 protected void initChannel(Channel ch) {
98 ch.pipeline().addLast(new ChannelHandler() {
99 private int totalRead;
100
101 @Override
102 public void channelActive(ChannelHandlerContext ctx) {
103 ctx.writeAndFlush(ctx.bufferAllocator().allocate(1).writeByte((byte) 0));
104 }
105
106 @Override
107 public void channelRead(ChannelHandlerContext ctx, Object msg) {
108 if (msg instanceof Buffer) {
109 try (Buffer buffer = (Buffer) msg) {
110 totalRead += buffer.readableBytes();
111 if (totalRead == expectedBytes) {
112 latch.countDown();
113 }
114 }
115 } else {
116 Resource.dispose(msg);
117 }
118 }
119 });
120 }
121 });
122 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
123 latch.await();
124 } finally {
125 if (serverChannel != null) {
126 serverChannel.close();
127 }
128 if (clientChannel != null) {
129 clientChannel.close();
130 }
131 }
132 }
133 }