View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.EventLoopGroup;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.SingleThreadEventLoop;
26  import io.netty.channel.local.LocalAddress;
27  import io.netty.channel.local.LocalServerChannel;
28  import io.netty.util.concurrent.AutoScalingEventExecutorChooserFactory;
29  import io.netty.util.concurrent.EventExecutor;
30  import io.netty.util.concurrent.EventExecutorChooserFactory;
31  import io.netty.util.concurrent.Future;
32  import io.netty.util.concurrent.Promise;
33  import io.netty.util.concurrent.ScheduledFuture;
34  import io.netty.util.internal.PlatformDependent;
35  import org.junit.jupiter.api.Test;
36  import org.junit.jupiter.api.Timeout;
37  import org.junit.jupiter.api.function.Executable;
38  
39  import java.util.ArrayList;
40  import java.util.HashSet;
41  import java.util.Iterator;
42  import java.util.List;
43  import java.util.NoSuchElementException;
44  import java.util.Set;
45  import java.util.concurrent.Callable;
46  import java.util.concurrent.CountDownLatch;
47  import java.util.concurrent.RejectedExecutionException;
48  import java.util.concurrent.TimeUnit;
49  
50  import static java.util.concurrent.TimeUnit.MILLISECONDS;
51  import static org.junit.jupiter.api.Assertions.assertEquals;
52  import static org.junit.jupiter.api.Assertions.assertFalse;
53  import static org.junit.jupiter.api.Assertions.assertNotNull;
54  import static org.junit.jupiter.api.Assertions.assertThrows;
55  import static org.junit.jupiter.api.Assertions.assertTrue;
56  import static org.junit.jupiter.api.Assertions.fail;
57  import static org.junit.jupiter.api.Assumptions.assumeTrue;
58  
59  public abstract class AbstractSingleThreadEventLoopTest {
60      protected static final int SCALING_MIN_THREADS = 1;
61      protected static final int SCALING_MAX_THREADS = 2;
62      protected static final long SCALING_WINDOW_SECONDS = 100;
63      protected static final TimeUnit SCALING_WINDOW_UNIT = MILLISECONDS;
64      protected static final double SCALEDOWN_THRESHOLD = 0.2;
65      protected static final double SCALEUP_THRESHOLD = 0.9;
66      protected static final int SCALING_PATIENCE_CYCLES = 1;
67  
68      protected static final EventExecutorChooserFactory AUTO_SCALING_CHOOSER_FACTORY =
69              new AutoScalingEventExecutorChooserFactory(SCALING_MIN_THREADS, SCALING_MAX_THREADS, SCALING_WINDOW_SECONDS,
70                                                         SCALING_WINDOW_UNIT, SCALEDOWN_THRESHOLD, SCALEUP_THRESHOLD,
71                                                         SCALING_MAX_THREADS, SCALING_MAX_THREADS, SCALING_PATIENCE_CYCLES
72      );
73  
74      @Test
75      @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
76      public void testChannelsRegistered() throws Exception {
77          EventLoopGroup group = newEventLoopGroup();
78          final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
79  
80          try {
81              final Channel ch1 = newChannel();
82              final Channel ch2 = newChannel();
83  
84              int rc = registeredChannels(loop);
85              boolean channelCountSupported = rc != -1;
86  
87              if (channelCountSupported) {
88                  assertEquals(0, registeredChannels(loop));
89              }
90  
91              assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
92              assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
93              if (channelCountSupported) {
94                  checkNumRegisteredChannels(loop, 2);
95              }
96  
97              assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
98              if (channelCountSupported) {
99                  checkNumRegisteredChannels(loop, 1);
100             }
101         } finally {
102             group.shutdownGracefully();
103         }
104     }
105 
106     private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
107         // We need to loop as some EventLoop implementations may need some time to update the counter correctly.
108         while (registeredChannels(loop) != numChannels) {
109             Thread.sleep(50);
110         }
111     }
112 
113     // Only reliable if run from event loop
114     private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
115         return loop.submit(new Callable<Integer>() {
116             @Override
117             public Integer call() {
118                 return loop.registeredChannels();
119             }
120         }).get(1, TimeUnit.SECONDS);
121     }
122 
123     @Test
124     @SuppressWarnings("deprecation")
125     public void shutdownBeforeStart() throws Exception {
126         EventLoopGroup group = newEventLoopGroup();
127         assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
128         group.shutdown();
129         assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
130     }
131 
132     @Test
133     public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
134         EventLoopGroup group = newEventLoopGroup();
135         assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
136     }
137 
138     // Copied from AbstractEventLoopTest
139     @Test
140     @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
141     public void testShutdownGracefullyNoQuietPeriod() throws Exception {
142         EventLoopGroup loop = newEventLoopGroup();
143         ServerBootstrap b = new ServerBootstrap();
144         b.group(loop)
145         .channel(serverChannelClass())
146         .childHandler(new ChannelInboundHandlerAdapter());
147 
148         // Not close the Channel to ensure the EventLoop is still shutdown in time.
149         ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
150                 ? b.bind(new LocalAddress("local")) : b.bind(0);
151         cf.sync().channel();
152 
153         Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
154         assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
155         assertTrue(f.syncUninterruptibly().isSuccess());
156         assertTrue(loop.isShutdown());
157         assertTrue(loop.isTerminated());
158     }
159 
160     @Test
161     public void shutdownGracefullyBeforeStart() throws Exception {
162         EventLoopGroup group = newEventLoopGroup();
163         assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
164     }
165 
166     @Test
167     public void gracefulShutdownAfterStart() throws Exception {
168         EventLoop loop = newEventLoopGroup().next();
169         final CountDownLatch latch = new CountDownLatch(1);
170         loop.execute(new Runnable() {
171             @Override
172             public void run() {
173                 latch.countDown();
174             }
175         });
176 
177         // Wait for the event loop thread to start.
178         latch.await();
179 
180         // Request the event loop thread to stop.
181         loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
182 
183         // Wait until the event loop is terminated.
184         assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
185 
186         assertRejection(loop);
187     }
188 
189     @Test
190     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
191     public void testChannelsIteratorEmpty() throws Exception {
192         assumeTrue(supportsChannelIteration());
193         EventLoopGroup group = newEventLoopGroup();
194         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
195         try {
196             runBlockingOn(loop, new Runnable() {
197                 @Override
198                 public void run() {
199                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
200 
201                     assertFalse(iterator.hasNext());
202                     assertThrows(NoSuchElementException.class, new Executable() {
203                         @Override
204                         public void execute() {
205                             iterator.next();
206                         }
207                     });
208                 }
209             });
210         } finally {
211             group.shutdownGracefully();
212         }
213     }
214 
215     @Test
216     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
217     public void testChannelsIterator() throws Exception {
218         assumeTrue(supportsChannelIteration());
219         EventLoopGroup group = newEventLoopGroup();
220         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
221         try {
222             final Channel ch1 = newChannel();
223             final Channel ch2 = newChannel();
224             loop.register(ch1).syncUninterruptibly();
225             loop.register(ch2).syncUninterruptibly();
226             assertEquals(2, registeredChannels(loop));
227 
228             runBlockingOn(loop, new Runnable() {
229                 @Override
230                 public void run() {
231                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
232 
233                     assertTrue(iterator.hasNext());
234                     Channel actualCh1 = iterator.next();
235                     assertNotNull(actualCh1);
236 
237                     assertTrue(iterator.hasNext());
238                     Channel actualCh2 = iterator.next();
239                     assertNotNull(actualCh2);
240 
241                     Set<Channel> expected = new HashSet<Channel>(4);
242                     expected.add(ch1);
243                     expected.add(ch2);
244                     expected.remove(actualCh1);
245                     expected.remove(actualCh2);
246                     assertTrue(expected.isEmpty());
247 
248                     assertFalse(iterator.hasNext());
249                     assertThrows(NoSuchElementException.class, new Executable() {
250                         @Override
251                         public void execute() {
252                             iterator.next();
253                         }
254                     });
255                 }
256             });
257         } finally {
258             group.shutdownGracefully();
259         }
260     }
261 
262     @Test
263     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
264     public void testChannelsIteratorRemoveThrows() throws Exception {
265         assumeTrue(supportsChannelIteration());
266         EventLoopGroup group = newEventLoopGroup();
267         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
268 
269         try {
270             final Channel ch = newChannel();
271             loop.register(ch).syncUninterruptibly();
272             assertEquals(1, registeredChannels(loop));
273 
274             runBlockingOn(loop, new Runnable() {
275                 @Override
276                 public void run() {
277                     assertThrows(UnsupportedOperationException.class, new Executable() {
278                         @Override
279                         public void execute() {
280                             loop.registeredChannelsIterator().remove();
281                         }
282                     });
283                 }
284             });
285         } finally {
286             group.shutdownGracefully();
287         }
288     }
289 
290     @Test
291     void schedulingAndCancellingTasks() throws Exception {
292         Runnable runnable = new Runnable() {
293             @Override
294             public void run() {
295             }
296         };
297         List<ScheduledFuture<?>> tasks = new ArrayList<ScheduledFuture<?>>();
298         EventLoopGroup group = newEventLoopGroup();
299         try {
300             EventLoop eventLoop = group.next();
301             for (int i = 0; i < 5000; i++) {
302                 tasks.add(eventLoop.scheduleAtFixedRate(runnable, 1, 1, MILLISECONDS));
303                 if (tasks.size() > 500) {
304                     tasks.get(PlatformDependent.threadLocalRandom().nextInt(tasks.size())).cancel(false);
305                 }
306             }
307             for (ScheduledFuture<?> task : tasks) {
308                 task.cancel(false);
309             }
310             for (ScheduledFuture<?> task : tasks) {
311                 task.await();
312             }
313             for (ScheduledFuture<?> task : tasks) {
314                 if (!task.isCancelled()) {
315                     task.sync();
316                 }
317             }
318         } finally {
319             group.shutdownGracefully();
320         }
321     }
322 
323     @Test
324     @Timeout(30)
325     public void testAutoScalingEventLoopGroupCanScaleDownAndBeUsed() throws Exception {
326         EventLoopGroup group = newAutoScalingEventLoopGroup();
327         if (group == null) {
328             return;
329         }
330         try {
331             startAllExecutors(group);
332             assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
333                          "Group should start with max threads active.");
334 
335             long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
336             while (countActiveExecutors(group) > SCALING_MIN_THREADS && System.nanoTime() < deadline) {
337                 Thread.sleep(100);
338             }
339 
340             assertEquals(SCALING_MIN_THREADS, countActiveExecutors(group),
341                          "Group did not scale down to min threads in time.");
342         } finally {
343             group.shutdownGracefully().syncUninterruptibly();
344         }
345     }
346 
347     @Test
348     @Timeout(30)
349     public void testSubmittingTaskWakesUpSuspendedExecutor() throws Exception {
350         EventLoopGroup group = newAutoScalingEventLoopGroup();
351         if (group == null) {
352             return;
353         }
354         try {
355             startAllExecutors(group);
356             assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
357                          "Group should start with max threads.");
358 
359             long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
360             while (countActiveExecutors(group) > SCALING_MIN_THREADS && System.nanoTime() < deadline) {
361                 Thread.sleep(100);
362             }
363             assertEquals(SCALING_MIN_THREADS, countActiveExecutors(group),
364                          "Group did not scale down to min threads in time.");
365 
366             EventLoop suspendedLoop = null;
367             for (EventExecutor exec : group) {
368                 if (exec.isSuspended()) {
369                     suspendedLoop = (EventLoop) exec;
370                     break;
371                 }
372             }
373             assertNotNull(suspendedLoop, "Could not find a suspended executor to test.");
374 
375             // Submit a task directly to the suspended loop, this should trigger the wake-up mechanism.
376             Future<?> future = suspendedLoop.submit(() -> { });
377             future.syncUninterruptibly();
378 
379             assertFalse(suspendedLoop.isSuspended(), "Executor should wake up after task submission.");
380             assertEquals(SCALING_MAX_THREADS, countActiveExecutors(group),
381                          "Active executor count should increase after wake-up.");
382         } finally {
383             group.shutdownGracefully().syncUninterruptibly();
384         }
385     }
386 
387     private static int countActiveExecutors(EventLoopGroup group) {
388         int activeCount = 0;
389         for (EventExecutor executor : group) {
390             if (!executor.isSuspended()) {
391                 activeCount++;
392             }
393         }
394         return activeCount;
395     }
396 
397     private static void startAllExecutors(EventLoopGroup group) throws InterruptedException {
398         CountDownLatch startLatch = new CountDownLatch(SCALING_MAX_THREADS);
399         for (EventExecutor executor : group) {
400             executor.execute(startLatch::countDown);
401         }
402         startLatch.await();
403     }
404 
405     private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
406         final Promise<Void> promise = eventLoop.newPromise();
407             eventLoop.execute(new Runnable() {
408                 @Override
409                 public void run() {
410                     try {
411                         action.run();
412                         promise.setSuccess(null);
413                     } catch (Throwable t) {
414                         promise.tryFailure(t);
415                     }
416                 }
417             });
418         try {
419             promise.await();
420         } catch (InterruptedException e) {
421             throw new RuntimeException(e);
422         }
423         Throwable cause = promise.cause();
424         if (cause != null) {
425             if (cause instanceof RuntimeException) {
426                 throw (RuntimeException) cause;
427             }
428             throw new RuntimeException(cause);
429         }
430     }
431 
432     private static final Runnable NOOP = new Runnable() {
433         @Override
434         public void run() { }
435     };
436 
437     private static void assertRejection(EventExecutor loop) {
438         try {
439             loop.execute(NOOP);
440             fail("A task must be rejected after shutdown() is called.");
441         } catch (RejectedExecutionException e) {
442             // Expected
443         }
444     }
445 
446     protected boolean supportsChannelIteration() {
447         return false;
448     }
449     protected abstract EventLoopGroup newEventLoopGroup();
450     protected abstract EventLoopGroup newAutoScalingEventLoopGroup();
451     protected abstract Channel newChannel();
452     protected abstract Class<? extends ServerChannel> serverChannelClass();
453 }