1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.SingleThreadEventLoop;
26 import io.netty.channel.local.LocalAddress;
27 import io.netty.channel.local.LocalServerChannel;
28 import io.netty.util.concurrent.EventExecutor;
29 import io.netty.util.concurrent.Future;
30 import io.netty.util.concurrent.Promise;
31 import io.netty.util.concurrent.ScheduledFuture;
32 import io.netty.util.internal.PlatformDependent;
33 import org.junit.jupiter.api.Test;
34 import org.junit.jupiter.api.Timeout;
35 import org.junit.jupiter.api.function.Executable;
36
37 import java.util.ArrayList;
38 import java.util.HashSet;
39 import java.util.Iterator;
40 import java.util.List;
41 import java.util.NoSuchElementException;
42 import java.util.Set;
43 import java.util.concurrent.Callable;
44 import java.util.concurrent.CountDownLatch;
45 import java.util.concurrent.RejectedExecutionException;
46 import java.util.concurrent.TimeUnit;
47
48 import static java.util.concurrent.TimeUnit.MILLISECONDS;
49 import static org.junit.jupiter.api.Assertions.assertEquals;
50 import static org.junit.jupiter.api.Assertions.assertFalse;
51 import static org.junit.jupiter.api.Assertions.assertNotNull;
52 import static org.junit.jupiter.api.Assertions.assertThrows;
53 import static org.junit.jupiter.api.Assertions.assertTrue;
54 import static org.junit.jupiter.api.Assertions.fail;
55 import static org.junit.jupiter.api.Assumptions.assumeTrue;
56
57 public abstract class AbstractSingleThreadEventLoopTest {
58 @Test
59 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
60 public void testChannelsRegistered() throws Exception {
61 EventLoopGroup group = newEventLoopGroup();
62 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
63
64 try {
65 final Channel ch1 = newChannel();
66 final Channel ch2 = newChannel();
67
68 int rc = registeredChannels(loop);
69 boolean channelCountSupported = rc != -1;
70
71 if (channelCountSupported) {
72 assertEquals(0, registeredChannels(loop));
73 }
74
75 assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
76 assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
77 if (channelCountSupported) {
78 checkNumRegisteredChannels(loop, 2);
79 }
80
81 assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
82 if (channelCountSupported) {
83 checkNumRegisteredChannels(loop, 1);
84 }
85 } finally {
86 group.shutdownGracefully();
87 }
88 }
89
90 private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
91
92 while (registeredChannels(loop) != numChannels) {
93 Thread.sleep(50);
94 }
95 }
96
97
98 private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
99 return loop.submit(new Callable<Integer>() {
100 @Override
101 public Integer call() {
102 return loop.registeredChannels();
103 }
104 }).get(1, TimeUnit.SECONDS);
105 }
106
107 @Test
108 @SuppressWarnings("deprecation")
109 public void shutdownBeforeStart() throws Exception {
110 EventLoopGroup group = newEventLoopGroup();
111 assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
112 group.shutdown();
113 assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
114 }
115
116 @Test
117 public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
118 EventLoopGroup group = newEventLoopGroup();
119 assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
120 }
121
122
123 @Test
124 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
125 public void testShutdownGracefullyNoQuietPeriod() throws Exception {
126 EventLoopGroup loop = newEventLoopGroup();
127 ServerBootstrap b = new ServerBootstrap();
128 b.group(loop)
129 .channel(serverChannelClass())
130 .childHandler(new ChannelInboundHandlerAdapter());
131
132
133 ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
134 ? b.bind(new LocalAddress("local")) : b.bind(0);
135 cf.sync().channel();
136
137 Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
138 assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
139 assertTrue(f.syncUninterruptibly().isSuccess());
140 assertTrue(loop.isShutdown());
141 assertTrue(loop.isTerminated());
142 }
143
144 @Test
145 public void shutdownGracefullyBeforeStart() throws Exception {
146 EventLoopGroup group = newEventLoopGroup();
147 assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
148 }
149
150 @Test
151 public void gracefulShutdownAfterStart() throws Exception {
152 EventLoop loop = newEventLoopGroup().next();
153 final CountDownLatch latch = new CountDownLatch(1);
154 loop.execute(new Runnable() {
155 @Override
156 public void run() {
157 latch.countDown();
158 }
159 });
160
161
162 latch.await();
163
164
165 loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
166
167
168 assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
169
170 assertRejection(loop);
171 }
172
173 @Test
174 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
175 public void testChannelsIteratorEmpty() throws Exception {
176 assumeTrue(supportsChannelIteration());
177 EventLoopGroup group = newEventLoopGroup();
178 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
179 try {
180 runBlockingOn(loop, new Runnable() {
181 @Override
182 public void run() {
183 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
184
185 assertFalse(iterator.hasNext());
186 assertThrows(NoSuchElementException.class, new Executable() {
187 @Override
188 public void execute() {
189 iterator.next();
190 }
191 });
192 }
193 });
194 } finally {
195 group.shutdownGracefully();
196 }
197 }
198
199 @Test
200 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
201 public void testChannelsIterator() throws Exception {
202 assumeTrue(supportsChannelIteration());
203 EventLoopGroup group = newEventLoopGroup();
204 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
205 try {
206 final Channel ch1 = newChannel();
207 final Channel ch2 = newChannel();
208 loop.register(ch1).syncUninterruptibly();
209 loop.register(ch2).syncUninterruptibly();
210 assertEquals(2, registeredChannels(loop));
211
212 runBlockingOn(loop, new Runnable() {
213 @Override
214 public void run() {
215 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
216
217 assertTrue(iterator.hasNext());
218 Channel actualCh1 = iterator.next();
219 assertNotNull(actualCh1);
220
221 assertTrue(iterator.hasNext());
222 Channel actualCh2 = iterator.next();
223 assertNotNull(actualCh2);
224
225 Set<Channel> expected = new HashSet<Channel>(4);
226 expected.add(ch1);
227 expected.add(ch2);
228 expected.remove(actualCh1);
229 expected.remove(actualCh2);
230 assertTrue(expected.isEmpty());
231
232 assertFalse(iterator.hasNext());
233 assertThrows(NoSuchElementException.class, new Executable() {
234 @Override
235 public void execute() {
236 iterator.next();
237 }
238 });
239 }
240 });
241 } finally {
242 group.shutdownGracefully();
243 }
244 }
245
246 @Test
247 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
248 public void testChannelsIteratorRemoveThrows() throws Exception {
249 assumeTrue(supportsChannelIteration());
250 EventLoopGroup group = newEventLoopGroup();
251 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
252
253 try {
254 final Channel ch = newChannel();
255 loop.register(ch).syncUninterruptibly();
256 assertEquals(1, registeredChannels(loop));
257
258 runBlockingOn(loop, new Runnable() {
259 @Override
260 public void run() {
261 assertThrows(UnsupportedOperationException.class, new Executable() {
262 @Override
263 public void execute() {
264 loop.registeredChannelsIterator().remove();
265 }
266 });
267 }
268 });
269 } finally {
270 group.shutdownGracefully();
271 }
272 }
273
274 @Test
275 void schedulingAndCancellingTasks() throws Exception {
276 Runnable runnable = new Runnable() {
277 @Override
278 public void run() {
279 }
280 };
281 List<ScheduledFuture<?>> tasks = new ArrayList<ScheduledFuture<?>>();
282 EventLoopGroup group = newEventLoopGroup();
283 try {
284 EventLoop eventLoop = group.next();
285 for (int i = 0; i < 5000; i++) {
286 tasks.add(eventLoop.scheduleAtFixedRate(runnable, 1, 1, MILLISECONDS));
287 if (tasks.size() > 500) {
288 tasks.get(PlatformDependent.threadLocalRandom().nextInt(tasks.size())).cancel(false);
289 }
290 }
291 for (ScheduledFuture<?> task : tasks) {
292 task.cancel(false);
293 }
294 for (ScheduledFuture<?> task : tasks) {
295 task.await();
296 }
297 for (ScheduledFuture<?> task : tasks) {
298 if (!task.isCancelled()) {
299 task.sync();
300 }
301 }
302 } finally {
303 group.shutdownGracefully();
304 }
305 }
306
307 private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
308 final Promise<Void> promise = eventLoop.newPromise();
309 eventLoop.execute(new Runnable() {
310 @Override
311 public void run() {
312 try {
313 action.run();
314 promise.setSuccess(null);
315 } catch (Throwable t) {
316 promise.tryFailure(t);
317 }
318 }
319 });
320 try {
321 promise.await();
322 } catch (InterruptedException e) {
323 throw new RuntimeException(e);
324 }
325 Throwable cause = promise.cause();
326 if (cause != null) {
327 if (cause instanceof RuntimeException) {
328 throw (RuntimeException) cause;
329 }
330 throw new RuntimeException(cause);
331 }
332 }
333
334 private static final Runnable NOOP = new Runnable() {
335 @Override
336 public void run() { }
337 };
338
339 private static void assertRejection(EventExecutor loop) {
340 try {
341 loop.execute(NOOP);
342 fail("A task must be rejected after shutdown() is called.");
343 } catch (RejectedExecutionException e) {
344
345 }
346 }
347
/**
 * Tells whether the event loop under test supports {@code registeredChannelsIterator()}.
 * The channel-iterator tests in this class are skipped (via {@code assumeTrue}) unless this
 * returns {@code true}.
 *
 * @return {@code false} in this base implementation; subclasses may override
 */
protected boolean supportsChannelIteration() {
    return false;
}
/**
 * Creates the event loop group under test. Each test calls this itself to obtain its group;
 * several tests cast {@code group.next()} to {@link SingleThreadEventLoop}.
 */
protected abstract EventLoopGroup newEventLoopGroup();

/** Creates a new channel that the tests register with an event loop obtained from the group. */
protected abstract Channel newChannel();

/**
 * Returns the server channel class handed to {@link ServerBootstrap#channel(Class)}. When this
 * is {@link LocalServerChannel}, the bind target is a {@link LocalAddress}; otherwise port 0.
 */
protected abstract Class<? extends ServerChannel> serverChannelClass();
354 }