View Javadoc
1   /*
2    * Copyright 2014 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.util.concurrent;
18  
19  import io.netty.util.internal.InternalThreadLocalMap;
20  import io.netty.util.internal.StringUtil;
21  import io.netty.util.internal.chmv8.ForkJoinPool;
22  import io.netty.util.internal.chmv8.ForkJoinPool.ForkJoinWorkerThreadFactory;
23  import io.netty.util.internal.chmv8.ForkJoinWorkerThread;
24  import io.netty.util.internal.logging.InternalLogger;
25  import io.netty.util.internal.logging.InternalLoggerFactory;
26  
27  import java.lang.Thread.UncaughtExceptionHandler;
28  import java.util.Locale;
29  import java.util.concurrent.Executor;
30  import java.util.concurrent.ExecutorService;
31  import java.util.concurrent.atomic.AtomicInteger;
32  
33  /**
34   * An implementation of an {@link ExecutorServiceFactory} that creates a new {@link ForkJoinPool} on each
35   * call to {@link #newExecutorService(int)}.
36   * <p>
37   * This {@link ExecutorServiceFactory} powers Netty's nio and epoll eventloops by default. Netty moved from managing its
38   * own threads and pinning a thread to each eventloop to an {@link Executor}-based approach. That way advanced
39   * users of Netty can plug in their own threadpools and gain more control of scheduling the eventloops.
40   * <p>
41   * The main reason behind choosing a {@link ForkJoinPool} as the default {@link Executor} is that it uses
42   * thread-local task queues, providing a high level of thread affinity to Netty's eventloops.
43   * <p>
44   * The whole discussion can be found on GitHub
45   * <a href="https://github.com/netty/netty/issues/2250">https://github.com/netty/netty/issues/2250</a>.
46   */
47  public final class DefaultExecutorServiceFactory implements ExecutorServiceFactory {
48  
49      private static final InternalLogger logger =
50              InternalLoggerFactory.getInstance(DefaultExecutorServiceFactory.class);
51  
52      private static final AtomicInteger executorId = new AtomicInteger();
53      private final String namePrefix;
54  
55      /**
56       * @param clazzNamePrefix   the name of the class will be used to prefix the name of each
57       *                          {@link ForkJoinWorkerThread} with.
58       */
59      public DefaultExecutorServiceFactory(Class<?> clazzNamePrefix) {
60          this(toName(clazzNamePrefix));
61      }
62  
63      /**
64       * @param namePrefix    the string to prefix the name of each {@link ForkJoinWorkerThread} with.
65       */
66      public DefaultExecutorServiceFactory(String namePrefix) {
67          this.namePrefix = namePrefix;
68      }
69  
70      @Override
71      public ExecutorService newExecutorService(int parallelism) {
72          ForkJoinWorkerThreadFactory threadFactory =
73                  new DefaultForkJoinWorkerThreadFactory(namePrefix + '-' + executorId.getAndIncrement());
74  
75          return new ForkJoinPool(parallelism, threadFactory, DefaultUncaughtExceptionHandler.INSTANCE, true);
76      }
77  
78      private static String toName(Class<?> clazz) {
79          if (clazz == null) {
80              throw new NullPointerException("clazz");
81          }
82  
83          String clazzName = StringUtil.simpleClassName(clazz);
84          switch (clazzName.length()) {
85              case 0:
86                  return "unknown";
87              case 1:
88                  return clazzName.toLowerCase(Locale.US);
89              default:
90                  if (Character.isUpperCase(clazzName.charAt(0)) && Character.isLowerCase(clazzName.charAt(1))) {
91                      return Character.toLowerCase(clazzName.charAt(0)) + clazzName.substring(1);
92                  } else {
93                      return clazzName;
94                  }
95          }
96      }
97  
98      private static final class DefaultUncaughtExceptionHandler implements UncaughtExceptionHandler {
99  
100         private static final DefaultUncaughtExceptionHandler INSTANCE = new DefaultUncaughtExceptionHandler();
101 
102         @Override
103         public void uncaughtException(Thread t, Throwable e) {
104             if (logger.isErrorEnabled()) {
105                 logger.error("Uncaught exception in thread: {}", t.getName(), e);
106             }
107         }
108     }
109 
110     private static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
111 
112         private final AtomicInteger idx = new AtomicInteger();
113         private final String namePrefix;
114 
115         DefaultForkJoinWorkerThreadFactory(String namePrefix) {
116             this.namePrefix = namePrefix;
117         }
118 
119         @Override
120         public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
121             // Note: The ForkJoinPool will create these threads as daemon threads.
122             ForkJoinWorkerThread thread = new DefaultForkJoinWorkerThread(pool);
123             thread.setName(namePrefix + '-' + idx.getAndIncrement());
124             thread.setPriority(Thread.MAX_PRIORITY);
125             return thread;
126         }
127     }
128 
129     private static final class DefaultForkJoinWorkerThread
130             extends ForkJoinWorkerThread implements FastThreadLocalAccess {
131 
132         private InternalThreadLocalMap threadLocalMap;
133 
134         DefaultForkJoinWorkerThread(ForkJoinPool pool) {
135             super(pool);
136         }
137 
138         @Override
139         public InternalThreadLocalMap threadLocalMap() {
140             return threadLocalMap;
141         }
142 
143         @Override
144         public void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
145             this.threadLocalMap = threadLocalMap;
146         }
147     }
148 }