View Javadoc
1   /*
2    * Copyright 2019 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.testsuite.transport;
17  
18  import io.netty.bootstrap.ServerBootstrap;
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelInboundHandlerAdapter;
22  import io.netty.channel.EventLoop;
23  import io.netty.channel.EventLoopGroup;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.SingleThreadEventLoop;
26  import io.netty.channel.local.LocalAddress;
27  import io.netty.channel.local.LocalServerChannel;
28  import io.netty.util.concurrent.EventExecutor;
29  import io.netty.util.concurrent.Future;
30  import io.netty.util.concurrent.Promise;
31  import io.netty.util.concurrent.ScheduledFuture;
32  import io.netty.util.internal.PlatformDependent;
33  import org.junit.jupiter.api.Test;
34  import org.junit.jupiter.api.Timeout;
35  import org.junit.jupiter.api.function.Executable;
36  
37  import java.util.ArrayList;
38  import java.util.HashSet;
39  import java.util.Iterator;
40  import java.util.List;
41  import java.util.NoSuchElementException;
42  import java.util.Set;
43  import java.util.concurrent.Callable;
44  import java.util.concurrent.CountDownLatch;
45  import java.util.concurrent.RejectedExecutionException;
46  import java.util.concurrent.TimeUnit;
47  
48  import static java.util.concurrent.TimeUnit.MILLISECONDS;
49  import static org.junit.jupiter.api.Assertions.assertEquals;
50  import static org.junit.jupiter.api.Assertions.assertFalse;
51  import static org.junit.jupiter.api.Assertions.assertNotNull;
52  import static org.junit.jupiter.api.Assertions.assertThrows;
53  import static org.junit.jupiter.api.Assertions.assertTrue;
54  import static org.junit.jupiter.api.Assertions.fail;
55  import static org.junit.jupiter.api.Assumptions.assumeTrue;
56  
57  public abstract class AbstractSingleThreadEventLoopTest {
58      @Test
59      @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
60      public void testChannelsRegistered() throws Exception {
61          EventLoopGroup group = newEventLoopGroup();
62          final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
63  
64          try {
65              final Channel ch1 = newChannel();
66              final Channel ch2 = newChannel();
67  
68              int rc = registeredChannels(loop);
69              boolean channelCountSupported = rc != -1;
70  
71              if (channelCountSupported) {
72                  assertEquals(0, registeredChannels(loop));
73              }
74  
75              assertTrue(loop.register(ch1).syncUninterruptibly().isSuccess());
76              assertTrue(loop.register(ch2).syncUninterruptibly().isSuccess());
77              if (channelCountSupported) {
78                  checkNumRegisteredChannels(loop, 2);
79              }
80  
81              assertTrue(ch1.deregister().syncUninterruptibly().isSuccess());
82              if (channelCountSupported) {
83                  checkNumRegisteredChannels(loop, 1);
84              }
85          } finally {
86              group.shutdownGracefully();
87          }
88      }
89  
90      private static void checkNumRegisteredChannels(SingleThreadEventLoop loop, int numChannels) throws Exception {
91          // We need to loop as some EventLoop implementations may need some time to update the counter correctly.
92          while (registeredChannels(loop) != numChannels) {
93              Thread.sleep(50);
94          }
95      }
96  
97      // Only reliable if run from event loop
98      private static int registeredChannels(final SingleThreadEventLoop loop) throws Exception {
99          return loop.submit(new Callable<Integer>() {
100             @Override
101             public Integer call() {
102                 return loop.registeredChannels();
103             }
104         }).get(1, TimeUnit.SECONDS);
105     }
106 
107     @Test
108     @SuppressWarnings("deprecation")
109     public void shutdownBeforeStart() throws Exception {
110         EventLoopGroup group = newEventLoopGroup();
111         assertFalse(group.awaitTermination(2, TimeUnit.MILLISECONDS));
112         group.shutdown();
113         assertTrue(group.awaitTermination(200, TimeUnit.MILLISECONDS));
114     }
115 
116     @Test
117     public void shutdownGracefullyZeroQuietBeforeStart() throws Exception {
118         EventLoopGroup group = newEventLoopGroup();
119         assertTrue(group.shutdownGracefully(0L, 2L, TimeUnit.SECONDS).await(200L));
120     }
121 
122     // Copied from AbstractEventLoopTest
123     @Test
124     @Timeout(value = 5000, unit = TimeUnit.MILLISECONDS)
125     public void testShutdownGracefullyNoQuietPeriod() throws Exception {
126         EventLoopGroup loop = newEventLoopGroup();
127         ServerBootstrap b = new ServerBootstrap();
128         b.group(loop)
129         .channel(serverChannelClass())
130         .childHandler(new ChannelInboundHandlerAdapter());
131 
132         // Not close the Channel to ensure the EventLoop is still shutdown in time.
133         ChannelFuture cf = serverChannelClass() == LocalServerChannel.class
134                 ? b.bind(new LocalAddress("local")) : b.bind(0);
135         cf.sync().channel();
136 
137         Future<?> f = loop.shutdownGracefully(0, 1, TimeUnit.MINUTES);
138         assertTrue(loop.awaitTermination(600, TimeUnit.MILLISECONDS));
139         assertTrue(f.syncUninterruptibly().isSuccess());
140         assertTrue(loop.isShutdown());
141         assertTrue(loop.isTerminated());
142     }
143 
144     @Test
145     public void shutdownGracefullyBeforeStart() throws Exception {
146         EventLoopGroup group = newEventLoopGroup();
147         assertTrue(group.shutdownGracefully(200L, 1000L, TimeUnit.MILLISECONDS).await(500L));
148     }
149 
150     @Test
151     public void gracefulShutdownAfterStart() throws Exception {
152         EventLoop loop = newEventLoopGroup().next();
153         final CountDownLatch latch = new CountDownLatch(1);
154         loop.execute(new Runnable() {
155             @Override
156             public void run() {
157                 latch.countDown();
158             }
159         });
160 
161         // Wait for the event loop thread to start.
162         latch.await();
163 
164         // Request the event loop thread to stop.
165         loop.shutdownGracefully(200L, 3000L, TimeUnit.MILLISECONDS);
166 
167         // Wait until the event loop is terminated.
168         assertTrue(loop.awaitTermination(500L, TimeUnit.MILLISECONDS));
169 
170         assertRejection(loop);
171     }
172 
173     @Test
174     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
175     public void testChannelsIteratorEmpty() throws Exception {
176         assumeTrue(supportsChannelIteration());
177         EventLoopGroup group = newEventLoopGroup();
178         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
179         try {
180             runBlockingOn(loop, new Runnable() {
181                 @Override
182                 public void run() {
183                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
184 
185                     assertFalse(iterator.hasNext());
186                     assertThrows(NoSuchElementException.class, new Executable() {
187                         @Override
188                         public void execute() {
189                             iterator.next();
190                         }
191                     });
192                 }
193             });
194         } finally {
195             group.shutdownGracefully();
196         }
197     }
198 
199     @Test
200     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
201     public void testChannelsIterator() throws Exception {
202         assumeTrue(supportsChannelIteration());
203         EventLoopGroup group = newEventLoopGroup();
204         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
205         try {
206             final Channel ch1 = newChannel();
207             final Channel ch2 = newChannel();
208             loop.register(ch1).syncUninterruptibly();
209             loop.register(ch2).syncUninterruptibly();
210             assertEquals(2, registeredChannels(loop));
211 
212             runBlockingOn(loop, new Runnable() {
213                 @Override
214                 public void run() {
215                     final Iterator<Channel> iterator = loop.registeredChannelsIterator();
216 
217                     assertTrue(iterator.hasNext());
218                     Channel actualCh1 = iterator.next();
219                     assertNotNull(actualCh1);
220 
221                     assertTrue(iterator.hasNext());
222                     Channel actualCh2 = iterator.next();
223                     assertNotNull(actualCh2);
224 
225                     Set<Channel> expected = new HashSet<Channel>(4);
226                     expected.add(ch1);
227                     expected.add(ch2);
228                     expected.remove(actualCh1);
229                     expected.remove(actualCh2);
230                     assertTrue(expected.isEmpty());
231 
232                     assertFalse(iterator.hasNext());
233                     assertThrows(NoSuchElementException.class, new Executable() {
234                         @Override
235                         public void execute() {
236                             iterator.next();
237                         }
238                     });
239                 }
240             });
241         } finally {
242             group.shutdownGracefully();
243         }
244     }
245 
246     @Test
247     @Timeout(value = 3000, unit = TimeUnit.MILLISECONDS)
248     public void testChannelsIteratorRemoveThrows() throws Exception {
249         assumeTrue(supportsChannelIteration());
250         EventLoopGroup group = newEventLoopGroup();
251         final SingleThreadEventLoop loop = (SingleThreadEventLoop) group.next();
252 
253         try {
254             final Channel ch = newChannel();
255             loop.register(ch).syncUninterruptibly();
256             assertEquals(1, registeredChannels(loop));
257 
258             runBlockingOn(loop, new Runnable() {
259                 @Override
260                 public void run() {
261                     assertThrows(UnsupportedOperationException.class, new Executable() {
262                         @Override
263                         public void execute() {
264                             loop.registeredChannelsIterator().remove();
265                         }
266                     });
267                 }
268             });
269         } finally {
270             group.shutdownGracefully();
271         }
272     }
273 
274     @Test
275     void schedulingAndCancellingTasks() throws Exception {
276         Runnable runnable = new Runnable() {
277             @Override
278             public void run() {
279             }
280         };
281         List<ScheduledFuture<?>> tasks = new ArrayList<ScheduledFuture<?>>();
282         EventLoopGroup group = newEventLoopGroup();
283         try {
284             EventLoop eventLoop = group.next();
285             for (int i = 0; i < 5000; i++) {
286                 tasks.add(eventLoop.scheduleAtFixedRate(runnable, 1, 1, MILLISECONDS));
287                 if (tasks.size() > 500) {
288                     tasks.get(PlatformDependent.threadLocalRandom().nextInt(tasks.size())).cancel(false);
289                 }
290             }
291             for (ScheduledFuture<?> task : tasks) {
292                 task.cancel(false);
293             }
294             for (ScheduledFuture<?> task : tasks) {
295                 task.await();
296             }
297             for (ScheduledFuture<?> task : tasks) {
298                 if (!task.isCancelled()) {
299                     task.sync();
300                 }
301             }
302         } finally {
303             group.shutdownGracefully();
304         }
305     }
306 
307     private static void runBlockingOn(EventLoop eventLoop, final Runnable action) {
308         final Promise<Void> promise = eventLoop.newPromise();
309             eventLoop.execute(new Runnable() {
310                 @Override
311                 public void run() {
312                     try {
313                         action.run();
314                         promise.setSuccess(null);
315                     } catch (Throwable t) {
316                         promise.tryFailure(t);
317                     }
318                 }
319             });
320         try {
321             promise.await();
322         } catch (InterruptedException e) {
323             throw new RuntimeException(e);
324         }
325         Throwable cause = promise.cause();
326         if (cause != null) {
327             if (cause instanceof RuntimeException) {
328                 throw (RuntimeException) cause;
329             }
330             throw new RuntimeException(cause);
331         }
332     }
333 
334     private static final Runnable NOOP = new Runnable() {
335         @Override
336         public void run() { }
337     };
338 
339     private static void assertRejection(EventExecutor loop) {
340         try {
341             loop.execute(NOOP);
342             fail("A task must be rejected after shutdown() is called.");
343         } catch (RejectedExecutionException e) {
344             // Expected
345         }
346     }
347 
348     protected boolean supportsChannelIteration() {
349         return false;
350     }
351     protected abstract EventLoopGroup newEventLoopGroup();
352     protected abstract Channel newChannel();
353     protected abstract Class<? extends ServerChannel> serverChannelClass();
354 }