1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport.socket;
17
18 import io.netty.bootstrap.Bootstrap;
19 import io.netty.bootstrap.ServerBootstrap;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.channel.Channel;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandlerAdapter;
24 import io.netty.channel.ChannelInitializer;
25 import io.netty.channel.SimpleChannelInboundHandler;
26 import org.junit.jupiter.api.Test;
27 import org.junit.jupiter.api.TestInfo;
28 import org.junit.jupiter.api.Timeout;
29
30 import java.util.concurrent.CountDownLatch;
31 import java.util.concurrent.TimeUnit;
32 import java.util.concurrent.atomic.AtomicReference;
33
34 import static io.netty.channel.ChannelOption.AUTO_READ;
35 import static org.junit.jupiter.api.Assertions.assertEquals;
36 import static org.junit.jupiter.api.Assertions.assertNotNull;
37
38 public class SocketDataReadInitialStateTest extends AbstractSocketTest {
39 @Test
40 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
41 public void testAutoReadOffNoDataReadUntilReadCalled(TestInfo testInfo) throws Throwable {
42 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
43 @Override
44 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
45 testAutoReadOffNoDataReadUntilReadCalled(serverBootstrap, bootstrap);
46 }
47 });
48 }
49
50 public void testAutoReadOffNoDataReadUntilReadCalled(ServerBootstrap sb, Bootstrap cb) throws Throwable {
51 Channel serverChannel = null;
52 Channel clientChannel = null;
53 final int sleepMs = 100;
54 try {
55 sb.option(AUTO_READ, false);
56 sb.childOption(AUTO_READ, false);
57 cb.option(AUTO_READ, false);
58 final CountDownLatch serverReadyLatch = new CountDownLatch(1);
59 final CountDownLatch acceptorReadLatch = new CountDownLatch(1);
60 final CountDownLatch serverReadLatch = new CountDownLatch(1);
61 final CountDownLatch clientReadLatch = new CountDownLatch(1);
62 final AtomicReference<Channel> serverConnectedChannelRef = new AtomicReference<Channel>();
63
64 sb.handler(new ChannelInitializer<Channel>() {
65 @Override
66 protected void initChannel(Channel ch) {
67 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
68 @Override
69 public void channelRead(ChannelHandlerContext ctx, Object msg) {
70 acceptorReadLatch.countDown();
71 ctx.fireChannelRead(msg);
72 }
73 });
74 }
75 });
76
77 sb.childHandler(new ChannelInitializer<Channel>() {
78 @Override
79 protected void initChannel(Channel ch) {
80 serverConnectedChannelRef.set(ch);
81 ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
82 @Override
83 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
84 ctx.writeAndFlush(msg.retainedDuplicate());
85 serverReadLatch.countDown();
86 }
87 });
88 serverReadyLatch.countDown();
89 }
90 });
91
92 cb.handler(new ChannelInitializer<Channel>() {
93 @Override
94 protected void initChannel(Channel ch) {
95 ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
96 @Override
97 protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
98 clientReadLatch.countDown();
99 }
100 });
101 }
102 });
103
104 serverChannel = sb.bind().sync().channel();
105 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
106 clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
107
108
109 Thread.sleep(sleepMs);
110 assertEquals(1, acceptorReadLatch.getCount());
111 serverChannel.read();
112 serverReadyLatch.await();
113
114 Channel serverConnectedChannel = serverConnectedChannelRef.get();
115 assertNotNull(serverConnectedChannel);
116
117
118
119 Thread.sleep(sleepMs);
120 assertEquals(1, serverReadLatch.getCount());
121 serverConnectedChannel.read();
122 serverReadLatch.await();
123
124
125 Thread.sleep(sleepMs);
126 assertEquals(1, clientReadLatch.getCount());
127 clientChannel.read();
128 clientReadLatch.await();
129 } finally {
130 if (serverChannel != null) {
131 serverChannel.close().sync();
132 }
133 if (clientChannel != null) {
134 clientChannel.close().sync();
135 }
136 }
137 }
138
139 @Test
140 @Timeout(value = 10000, unit = TimeUnit.MILLISECONDS)
141 public void testAutoReadOnDataReadImmediately(TestInfo testInfo) throws Throwable {
142 run(testInfo, new Runner<ServerBootstrap, Bootstrap>() {
143 @Override
144 public void run(ServerBootstrap serverBootstrap, Bootstrap bootstrap) throws Throwable {
145 testAutoReadOnDataReadImmediately(serverBootstrap, bootstrap);
146 }
147 });
148 }
149
150 public void testAutoReadOnDataReadImmediately(ServerBootstrap sb, Bootstrap cb) throws Throwable {
151 Channel serverChannel = null;
152 Channel clientChannel = null;
153 try {
154 sb.option(AUTO_READ, true);
155 sb.childOption(AUTO_READ, true);
156 cb.option(AUTO_READ, true);
157 final CountDownLatch serverReadLatch = new CountDownLatch(1);
158 final CountDownLatch clientReadLatch = new CountDownLatch(1);
159
160 sb.childHandler(new ChannelInitializer<Channel>() {
161 @Override
162 protected void initChannel(Channel ch) {
163 ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
164 @Override
165 protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
166 ctx.writeAndFlush(msg.retainedDuplicate());
167 serverReadLatch.countDown();
168 }
169 });
170 }
171 });
172
173 cb.handler(new ChannelInitializer<Channel>() {
174 @Override
175 protected void initChannel(Channel ch) {
176 ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
177 @Override
178 protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
179 clientReadLatch.countDown();
180 }
181 });
182 }
183 });
184
185 serverChannel = sb.bind().sync().channel();
186 clientChannel = cb.connect(serverChannel.localAddress()).sync().channel();
187 clientChannel.writeAndFlush(clientChannel.alloc().buffer().writeZero(1)).syncUninterruptibly();
188 serverReadLatch.await();
189 clientReadLatch.await();
190 } finally {
191 if (serverChannel != null) {
192 serverChannel.close().sync();
193 }
194 if (clientChannel != null) {
195 clientChannel.close().sync();
196 }
197 }
198 }
199 }