1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty5.testsuite.transport.socket;
17
18 import io.netty5.bootstrap.Bootstrap;
19 import io.netty5.bootstrap.ServerBootstrap;
20 import io.netty5.buffer.api.Buffer;
21 import io.netty5.channel.Channel;
22 import io.netty5.channel.ChannelHandler;
23 import io.netty5.channel.ChannelHandlerContext;
24 import io.netty5.channel.ChannelInitializer;
25 import io.netty5.channel.SimpleChannelInboundHandler;
26 import org.junit.jupiter.api.Test;
27 import org.junit.jupiter.api.TestInfo;
28 import org.junit.jupiter.api.Timeout;
29
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import static io.netty5.channel.ChannelOption.AUTO_READ;
35 import static org.junit.jupiter.api.Assertions.assertEquals;
36 import static org.junit.jupiter.api.Assertions.assertNotNull;
37
38 public class SocketDataReadInitialStateTest extends AbstractSocketTest {
39 @Test
40 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
41 public void testAutoReadOffNoDataReadUntilReadCalled(TestInfo testInfo) throws Throwable {
42 run(testInfo, this::testAutoReadOffNoDataReadUntilReadCalled);
43 }
44
45 public void testAutoReadOffNoDataReadUntilReadCalled(ServerBootstrap sb, Bootstrap cb) throws Throwable {
46 Channel serverChannel = null;
47 Channel clientChannel = null;
48 final int sleepMs = 100;
49 try {
50 sb.option(AUTO_READ, false);
51 sb.childOption(AUTO_READ, false);
52 cb.option(AUTO_READ, false);
53 final CountDownLatch serverReadyLatch = new CountDownLatch(1);
54 final CountDownLatch acceptorReadLatch = new CountDownLatch(1);
55 final CountDownLatch serverReadLatch = new CountDownLatch(1);
56 final CountDownLatch clientReadLatch = new CountDownLatch(1);
57 final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<>();
58
59 sb.handler(new ChannelInitializer<>() {
60 @Override
61 protected void initChannel(Channel ch) {
62 ch.pipeline().addLast(new ChannelHandler() {
63 @Override
64 public void channelRead(ChannelHandlerContext ctx, Object msg) {
65 acceptorReadLatch.countDown();
66 ctx.fireChannelRead(msg);
67 }
68 });
69 }
70 });
71
72 sb.childHandler(new ChannelInitializer<>() {
73 @Override
74 protected void initChannel(Channel ch) {
75 serverConnectedChannelRef.set(ch);
76 ch.pipeline().addLast(new SimpleChannelInboundHandler<Buffer>() {
77 @Override
78 protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
79 ctx.writeAndFlush(msg.split());
80 serverReadLatch.countDown();
81 }
82 });
83 serverReadyLatch.countDown();
84 }
85 });
86
87 cb.handler(new ChannelInitializer<>() {
88 @Override
89 protected void initChannel(Channel ch) {
90 ch.pipeline().addLast(new SimpleChannelInboundHandler<>() {
91 @Override
92 protected void messageReceived(ChannelHandlerContext ctx, Object msg) {
93 clientReadLatch.countDown();
94 }
95 });
96 }
97 });
98
99 serverChannel = sb.bind().asStage().get();
100 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
101 clientChannel.writeAndFlush(clientChannel.bufferAllocator().copyOf(new byte[] {0})).asStage().sync();
102
103
104 Thread.sleep(sleepMs);
105 assertEquals(1, acceptorReadLatch.getCount());
106 serverChannel.read();
107 serverReadyLatch.await();
108
109 Channel serverConnectedChannel = serverConnectedChannelRef.get();
110 assertNotNull(serverConnectedChannel);
111
112
113
114 Thread.sleep(sleepMs);
115 assertEquals(1, serverReadLatch.getCount());
116 serverConnectedChannel.read();
117 serverReadLatch.await();
118
119
120 Thread.sleep(sleepMs);
121 assertEquals(1, clientReadLatch.getCount());
122 clientChannel.read();
123 clientReadLatch.await();
124 } finally {
125 if (serverChannel != null) {
126 serverChannel.close().asStage().sync();
127 }
128 if (clientChannel != null) {
129 clientChannel.close().asStage().sync();
130 }
131 }
132 }
133
134 @Test
135 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
136 public void testAutoReadOnDataReadImmediately(TestInfo testInfo) throws Throwable {
137 run(testInfo, this::testAutoReadOnDataReadImmediately);
138 }
139
140 public void testAutoReadOnDataReadImmediately(ServerBootstrap sb, Bootstrap cb) throws Throwable {
141 Channel serverChannel = null;
142 Channel clientChannel = null;
143 try {
144 sb.option(AUTO_READ, true);
145 sb.childOption(AUTO_READ, true);
146 cb.option(AUTO_READ, true);
147 final CountDownLatch serverReadLatch = new CountDownLatch(1);
148 final CountDownLatch clientReadLatch = new CountDownLatch(1);
149
150 sb.childHandler(new ChannelInitializer<>() {
151 @Override
152 protected void initChannel(Channel ch) {
153 ch.pipeline().addLast(new SimpleChannelInboundHandler<Buffer>() {
154 @Override
155 protected void messageReceived(ChannelHandlerContext ctx, Buffer msg) {
156 ctx.writeAndFlush(msg.split());
157 serverReadLatch.countDown();
158 }
159 });
160 }
161 });
162
163 cb.handler(new ChannelInitializer<>() {
164 @Override
165 protected void initChannel(Channel ch) {
166 ch.pipeline().addLast(new SimpleChannelInboundHandler<>() {
167 @Override
168 protected void messageReceived(ChannelHandlerContext ctx, Object msg) {
169 clientReadLatch.countDown();
170 }
171 });
172 }
173 });
174
175 serverChannel = sb.bind().asStage().get();
176 clientChannel = cb.connect(serverChannel.localAddress()).asStage().get();
177 clientChannel.writeAndFlush(clientChannel.bufferAllocator().copyOf(new byte[] {0})).asStage().sync();
178 serverReadLatch.await();
179 clientReadLatch.await();
180 } finally {
181 if (serverChannel != null) {
182 serverChannel.close().asStage().sync();
183 }
184 if (clientChannel != null) {
185 clientChannel.close().asStage().sync();
186 }
187 }
188 }
189 }