1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.testsuite.transport;
17
18 import io.netty.bootstrap.ServerBootstrap;
19 import io.netty.channel.Channel;
20 import io.netty.channel.ChannelFuture;
21 import io.netty.channel.ChannelInboundHandlerAdapter;
22 import io.netty.channel.EventLoop;
23 import io.netty.channel.EventLoopGroup;
24 import io.netty.channel.ServerChannel;
25 import io.netty.channel.SingleThreadEventLoop;
26 import io.netty.channel.local.LocalAddress;
27 import io.netty.channel.local.LocalServerChannel;
28 import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory;
29 import io.netty.util.concurrent.EventExecutor;
30 import io.netty.util.concurrent.EventExecutorChooserFactory;
31 import io.netty.util.concurrent.Future;
32 import io.netty.util.concurrent.Promise;
33 import io.netty.util.concurrent.ScheduledFuture;
34 import io.netty.util.internal.PlatformDependent;
35 import org.junit.jupiter.api.Test;
36 import org.junit.jupiter.api.Timeout;
37 import org.junit.jupiter.api.function.Executable;
38
39 import java.util.ArrayList;
40 import java.util.HashSet;
41 import java.util.Iterator;
42 import java.util.List;
43 import java.util.NoSuchElementException;
44 import java.util.Set;
45 import java.util.concurrent.Callable;
46 import java.util.concurrent.CountDownLatch;
47 import java.util.concurrent.RejectedExecutionException;
48 import java.util.concurrent.TimeUnit;
49
50 import static java.util.concurrent.TimeUnit.MILLISECONDS;
51 import static org.junit.jupiter.api.Assertions.assertEquals;
52 import static org.junit.jupiter.api.Assertions.assertFalse;
53 import static org.junit.jupiter.api.Assertions.assertNotNull;
54 import static org.junit.jupiter.api.Assertions.assertThrows;
55 import static org.junit.jupiter.api.Assertions.assertTrue;
56 import static org.junit.jupiter.api.Assertions.fail;
57 import static org.junit.jupiter.api.Assumptions.assumeTrue;
58
59 public abstract class AbstractSingleThreadEventLoopTest {
60 protected static final int SCALING_MIN_THREADS = 1;
61 protected static final int SCALING_MAX_THREADS = 2;
62 protected static final long SCALING_WINDOW_SECONDS = 100;
63 protected static final TimeUnit SCALING_WINDOW_UNIT = MILLISECONDS;
64 protected static final double SCALEDOWN_THRESHOLD = 0.2;
65 protected static final double SCALEUP_THRESHOLD = 0.9;
66 protected static final int SCALING_PATIENCE_CYCLES = 1;
67
68 protected static final EventExecutorChooserFactory AUTO_SCALING_CHOOSER_FACTORY =
69 new AutoScalingEventExecutorChooserFactory(SCALING_MIN_THREADS, SCALING_MAX_THREADS, SCALING_WINDOW_SECONDS,
70 SCALING_WINDOW_UNIT, SCALEDOWN_THRESHOLD, SCALEUP_THRESHOLD,
71 SCALING_MAX_THREADS, SCALING_MAX_THREADS, SCALING_PATIENCE_CYCLES
72 );
73
74 @Test
75 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
76 public void testChannelsRegistered() throws Exception {
77 EventLoopGroup group = newEventLoopGroup();
78 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
79
80 try {
81 final Channel ch1 = newChannel();
82 final Channel ch2 = newChannel();
83
84 int rc = registeredChannels(loop);
85 boolean channelCountSupported = rc != -1;
86
87 if (channelCountSupported) {
88 assertEquals(0, registeredChannels(loop));
89 }
90
91 assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
92 assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
93 if (channelCountSupported) {
94 checkNumRegisteredChannels(loop, 2);
95 }
96
97 assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
98 if (channelCountSupported) {
99 checkNumRegisteredChannels(loop, 1);
100 }
101 } finally {
102 group.shutdownGracefully();
103 }
104 }
105
106 private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
107
108 while (registeredChannels(loop) != numChannels) {
109 Thread.sleep(50);
110 }
111 }
112
113
114 private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
115 return loop.submit(new Callable<Integer>() {
116 @Override
117 public Integer call() {
118 return loop.registeredChannels();
119 }
120 }).get(1, TimeUnit.SECONDS);
121 }
122
123 @Test
124 @SuppressWarnings("deprecation")
125 public void shutdownBeforeStart() throws Exception {
126 EventLoopGroup group = newEventLoopGroup();
127 assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
128 group.shutdown();
129 assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
130 }
131
132 @Test
133 public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
134 EventLoopGroup group = newEventLoopGroup();
135 assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
136 }
137
138
139 @Test
140 @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
141 public void testShutdownGracefullyNoQuietPeriod() throws Exception {
142 EventLoopGroup loop = newEventLoopGroup();
143 ServerBootstrap b = new ServerBootstrap();
144 b.group(loop)
145 .channel(serverChannelClass())
146 .childHandler(new ChannelInboundHandlerAdapter());
147
148
149 ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
150 ? b.bind(new LocalAddress("local")) : b.bind(0);
151 cf.sync().channel();
152
153 Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
154 assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
155 assertTrue(f.syncUninterruptibly().isSuccess());
156 assertTrue(loop.isShutdown());
157 assertTrue(loop.isTerminated());
158 }
159
160 @Test
161 public void shutdownGracefullyBeforeStart() throws Exception {
162 EventLoopGroup group = newEventLoopGroup();
163 assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
164 }
165
166 @Test
167 public void gracefulShutdownAfterStart() throws Exception {
168 EventLoop loop = newEventLoopGroup().next();
169 final CountDownLatch latch = new CountDownLatch(1);
170 loop.execute(new Runnable() {
171 @Override
172 public void run() {
173 latch.countDown();
174 }
175 });
176
177
178 latch.await();
179
180
181 loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
182
183
184 assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
185
186 assertRejection(loop);
187 }
188
189 @Test
190 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
191 public void testChannelsIteratorEmpty() throws Exception {
192 assumeTrue(supportsChannelIteration());
193 EventLoopGroup group = newEventLoopGroup();
194 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
195 try {
196 runBlockingOn(loop, new Runnable() {
197 @Override
198 public void run() {
199 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
200
201 assertFalse(iterator.hasNext());
202 assertThrows(NoSuchElementException.class, new Executable() {
203 @Override
204 public void execute() {
205 iterator.next();
206 }
207 });
208 }
209 });
210 } finally {
211 group.shutdownGracefully();
212 }
213 }
214
215 @Test
216 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
217 public void testChannelsIterator() throws Exception {
218 assumeTrue(supportsChannelIteration());
219 EventLoopGroup group = newEventLoopGroup();
220 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
221 try {
222 final Channel ch1 = newChannel();
223 final Channel ch2 = newChannel();
224 loop.register(ch1).syncUninterruptibly();
225 loop.register(ch2).syncUninterruptibly();
226 assertEquals(2, registeredChannels(loop));
227
228 runBlockingOn(loop, new Runnable() {
229 @Override
230 public void run() {
231 final Iterator<Channel> iterator = loop.registeredChannelsIterator();
232
233 assertTrue(iterator.hasNext());
234 Channel actualCh1 = iterator.next();
235 assertNotNull(actualCh1);
236
237 assertTrue(iterator.hasNext());
238 Channel actualCh2 = iterator.next();
239 assertNotNull(actualCh2);
240
241 Set<Channel> expected = new HashSet<Channel>(4);
242 expected.add(ch1);
243 expected.add(ch2);
244 expected.remove(actualCh1);
245 expected.remove(actualCh2);
246 assertTrue(expected.isEmpty());
247
248 assertFalse(iterator.hasNext());
249 assertThrows(NoSuchElementException.class, new Executable() {
250 @Override
251 public void execute() {
252 iterator.next();
253 }
254 });
255 }
256 });
257 } finally {
258 group.shutdownGracefully();
259 }
260 }
261
262 @Test
263 @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
264 public void testChannelsIteratorRemoveThrows() throws Exception {
265 assumeTrue(supportsChannelIteration());
266 EventLoopGroup group = newEventLoopGroup();
267 final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
268
269 try {
270 final Channel ch = newChannel();
271 loop.register(ch).syncUninterruptibly();
272 assertEquals(1, registeredChannels(loop));
273
274 runBlockingOn(loop, new Runnable() {
275 @Override
276 public void run() {
277 assertThrows(UnsupportedOperationException.class, new Executable() {
278 @Override
279 public void execute() {
280 loop.registeredChannelsIterator().remove();
281 }
282 });
283 }
284 });
285 } finally {
286 group.shutdownGracefully();
287 }
288 }
289
290 @Test
291 void schedulingAndCancellingTasks() throws Exception {
292 Runnable runnable = new Runnable() {
293 @Override
294 public void run() {
295 }
296 };
297 List<ScheduledFuture<?>> tasks = new ArrayList<ScheduledFuture<?>>();
298 EventLoopGroup group = newEventLoopGroup();
299 try {
300 EventLoop eventLoop = group.next();
301 for (int i = 0; i < 5000; i++) {
302 tasks.add(eventLoop.scheduleAtFixedRate(runnable, 1, 1, MILLISECONDS));
303 if (tasks.size() > 500) {
304 tasks.get(PlatformDependent.threadLocalRandom().nextInt(tasks.size())).cancel(false);
305 }
306 }
307 for (ScheduledFuture<?> task : tasks) {
308 task.cancel(false);
309 }
310 for (ScheduledFuture<?> task : tasks) {
311 task.await();
312 }
313 for (ScheduledFuture<?> task : tasks) {
314 if (!task.isCancelled()) {
315 task.sync();
316 }
317 }
318 } finally {
319 group.shutdownGracefully();
320 }
321 }
322
323 @Test
324 @Timeout(30)
325 public void testAutoScalingEventLoopGroupCanScaleDownAndBeUsed() throws Exception {
326 EventLoopGroup group = newAutoScalingEventLoopGroup();
327 if (group == null) {
328 return;
329 }
330 try {
331 startAllExecutors(group);
332 assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
333 "Group should start with max threads active.");
334
335 long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
336 while (countActiveExecutors(group) > SCALING_MIN_THREADS && System.nanoTime() < deadline) {
337 Thread.sleep(100);
338 }
339
340 assertEquals(SCALING_MIN_THREADS, countActiveExecutors(group),
341 "Group did not scale down to min threads in time.");
342 } finally {
343 group.shutdownGracefully().syncUninterruptibly();
344 }
345 }
346
347 @Test
348 @Timeout(30)
349 public void testSubmittingTaskWakesUpSuspendedExecutor() throws Exception {
350 EventLoopGroup group = newAutoScalingEventLoopGroup();
351 if (group == null) {
352 return;
353 }
354 try {
355 startAllExecutors(group);
356 assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
357 "Group should start with max threads.");
358
359 long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
360 while (countActiveExecutors(group) > SCALING_MIN_THREADS && System.nanoTime() < deadline) {
361 Thread.sleep(100);
362 }
363 assertEquals(SCALING_MIN_THREADS, countActiveExecutors(group),
364 "Group did not scale down to min threads in time.");
365
366 EventLoop suspendedLoop = null;
367 for (EventExecutor exec : group) {
368 if (exec.isSuspended()) {
369 suspendedLoop = (EventLoop) exec;
370 break;
371 }
372 }
373 assertNotNull(suspendedLoop, "Could not find a suspended executor to test.");
374
375
376 Future<?> future = suspendedLoop.submit(() -> { });
377 future.syncUninterruptibly();
378
379 assertFalse(suspendedLoop.isSuspended(), "Executor should wake up after task submission.");
380 assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
381 "Active executor count should increase after wake-up.");
382 } finally {
383 group.shutdownGracefully().syncUninterruptibly();
384 }
385 }
386
387 private static int countActiveExecutors(EventLoopGroup group) {
388 int activeCount = 0;
389 for (EventExecutor executor : group) {
390 if (!executor.isSuspended()) {
391 activeCount++;
392 }
393 }
394 return activeCount;
395 }
396
397 private static void startAllExecutors(EventLoopGroup group) throws InterruptedException {
398 CountDownLatch startLatch = new CountDownLatch(SCALING_MAX_THREADS);
399 for (EventExecutor executor : group) {
400 executor.execute(startLatch::countDown);
401 }
402 startLatch.await();
403 }
404
405 private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
406 final Promise<Void> promise = eventLoop.newPromise();
407 eventLoop.execute(new Runnable() {
408 @Override
409 public void run() {
410 try {
411 action.run();
412 promise.setSuccess(null);
413 } catch (Throwable t) {
414 promise.tryFailure(t);
415 }
416 }
417 });
418 try {
419 promise.await();
420 } catch (InterruptedException e) {
421 throw new RuntimeException(e);
422 }
423 Throwable cause = promise.cause();
424 if (cause != null) {
425 if (cause instanceof RuntimeException) {
426 throw (RuntimeException) cause;
427 }
428 throw new RuntimeException(cause);
429 }
430 }
431
432 private static final Runnable NOOP = new Runnable() {
433 @Override
434 public void run() { }
435 };
436
437 private static void assertRejection(EventExecutor loop) {
438 try {
439 loop.execute(NOOP);
440 fail("A task must be rejected after shutdown() is called.");
441 } catch (RejectedExecutionException e) {
442
443 }
444 }
445
446 protected boolean supportsChannelIteration() {
447 return false;
448 }
449 protected abstract EventLoopGroup newEventLoopGroup();
450 protected abstract EventLoopGroup newAutoScalingEventLoopGroup();
451 protected abstract Channel newChannel();
452 protected abstract Class<? extends ServerChannel> serverChannelClass();
453 }