/*
 * Copyright 2025 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  import java.util.ArrayList;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicReference;
25  
26  /**
27   * A factory that creates auto-scaling {@link EventExecutorChooser} instances.
28   * This chooser implements a dynamic, utilization-based auto-scaling strategy.
29   * <p>
30   * It enables the {@link io.netty.channel.EventLoopGroup} to automatically scale the number of active
31   * {@link io.netty.channel.EventLoop} threads between a minimum and maximum threshold.
32   * The scaling decision is based on the average utilization of the active threads, measured over a
33   * configurable time window.
34   * <p>
35   * An {@code EventLoop} can be suspended if its utilization is consistently below the
36   * {@code scaleDownThreshold}. Conversely, if the group's average utilization is consistently
37   * above the {@code scaleUpThreshold}, a suspended thread will be automatically resumed to handle
38   * the increased load.
39   * <p>
40   * To control the aggressiveness of scaling actions, the {@code maxRampUpStep} and {@code maxRampDownStep}
41   * parameters limit the maximum number of threads that can be activated or suspended in a single scaling cycle.
42   * Furthermore, to ensure decisions are based on sustained trends rather than transient spikes, the
43   * {@code scalingPatienceCycles} defines how many consecutive monitoring windows a condition must be met
44   * before a scaling action is triggered.
45   */
46  public final class AutoScalingEventExecutorChooserFactory implements EventExecutorChooserFactory {
47  
48      private static final Runnable NO_OOP_TASK = () -> { };
49      private final int minChildren;
50      private final int maxChildren;
51      private final long utilizationCheckPeriodNanos;
52      private final double scaleDownThreshold;
53      private final double scaleUpThreshold;
54      private final int maxRampUpStep;
55      private final int maxRampDownStep;
56      private final int scalingPatienceCycles;
57  
58      /**
59       * Creates a new factory for a scaling-enabled {@link EventExecutorChooser}.
60       *
61       * @param minThreads               the minimum number of threads to keep active.
62       * @param maxThreads               the maximum number of threads to scale up to.
63       * @param utilizationWindow        the period at which to check group utilization.
64       * @param windowUnit               the unit for {@code utilizationWindow}.
65       * @param scaleDownThreshold       the average utilization below which a thread may be suspended.
66       * @param scaleUpThreshold         the average utilization above which a thread may be resumed.
67       * @param maxRampUpStep            the maximum number of threads to add in one cycle.
68       * @param maxRampDownStep          the maximum number of threads to remove in one cycle.
69       * @param scalingPatienceCycles    the number of consecutive cycles a condition must be met before scaling.
70       */
71      public AutoScalingEventExecutorChooserFactory(int minThreads, int maxThreads, long utilizationWindow,
72                                                    TimeUnit windowUnit, double scaleDownThreshold,
73                                                    double scaleUpThreshold, int maxRampUpStep, int maxRampDownStep,
74                                                    int scalingPatienceCycles) {
75          this.minChildren = ObjectUtil.checkPositiveOrZero(minThreads, "minThreads");
76          this.maxChildren = ObjectUtil.checkPositive(maxThreads, "maxThreads");
77          if (minThreads > maxThreads) {
78              throw new IllegalArgumentException(String.format(
79                      "minThreads: %d must not be greater than maxThreads: %d", minThreads, maxThreads));
80          }
81          this.utilizationCheckPeriodNanos = ObjectUtil.checkNotNull(windowUnit, "windowUnit")
82                                                       .toNanos(ObjectUtil.checkPositive(utilizationWindow,
83                                                                                         "utilizationWindow"));
84          this.scaleDownThreshold = ObjectUtil.checkInRange(scaleDownThreshold, 0.0, 1.0, "scaleDownThreshold");
85          this.scaleUpThreshold = ObjectUtil.checkInRange(scaleUpThreshold, 0.0, 1.0, "scaleUpThreshold");
86          if (scaleDownThreshold >= scaleUpThreshold) {
87              throw new IllegalArgumentException(
88                      "scaleDownThreshold must be less than scaleUpThreshold: " +
89                      scaleDownThreshold + " >= " + scaleUpThreshold);
90          }
91          this.maxRampUpStep = ObjectUtil.checkPositive(maxRampUpStep, "maxRampUpStep");
92          this.maxRampDownStep = ObjectUtil.checkPositive(maxRampDownStep, "maxRampDownStep");
93          this.scalingPatienceCycles = ObjectUtil.checkPositiveOrZero(scalingPatienceCycles, "scalingPatienceCycles");
94      }
95  
96      @Override
97      public EventExecutorChooser newChooser(EventExecutor[] executors) {
98          return new AutoScalingEventExecutorChooser(executors);
99      }
100 
101     /**
102      * An immutable snapshot of the chooser's state. All state transitions
103      * are managed by atomically swapping this object.
104      */
105     private static final class AutoScalingState {
106         final int activeChildrenCount;
107         final long nextWakeUpIndex;
108         final EventExecutor[] activeExecutors;
109         final EventExecutorChooser activeExecutorsChooser;
110 
111         AutoScalingState(int activeChildrenCount, long nextWakeUpIndex, EventExecutor[] activeExecutors) {
112             this.activeChildrenCount = activeChildrenCount;
113             this.nextWakeUpIndex = nextWakeUpIndex;
114             this.activeExecutors = activeExecutors;
115             this.activeExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(activeExecutors);
116         }
117     }
118 
119     private final class AutoScalingEventExecutorChooser implements EventExecutorChooser {
120         private final EventExecutor[] executors;
121         private final EventExecutorChooser allExecutorsChooser;
122         private final AtomicReference<AutoScalingState> state;
123 
124         AutoScalingEventExecutorChooser(EventExecutor[] executors) {
125             this.executors = executors;
126             this.allExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(executors);
127 
128             AutoScalingState initialState = new AutoScalingState(maxChildren, 0L, executors);
129             this.state = new AtomicReference<>(initialState);
130 
131             ScheduledFuture<?> utilizationMonitoringTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(
132                     new UtilizationMonitor(), utilizationCheckPeriodNanos, utilizationCheckPeriodNanos,
133                     TimeUnit.NANOSECONDS);
134 
135             if (executors.length > 0) {
136                 executors[0].terminationFuture().addListener(future -> utilizationMonitoringTask.cancel(false));
137             }
138         }
139 
140         /**
141          * This method is only responsible for picking from the active executors list.
142          * The monitor handles all scaling decisions.
143          */
144         @Override
145         public EventExecutor next() {
146             // Get a snapshot of the current state.
147             AutoScalingState currentState = this.state.get();
148 
149             if (currentState.activeExecutors.length == 0) {
150                 // This is only reachable if minChildren is 0 and the monitor has just suspended the last active thread.
151                 // To prevent an error and ensure the group can recover, we wake one up and use the
152                 // chooser that contains all executors as a safe temporary choice.
153                 tryScaleUpBy(1);
154                 return allExecutorsChooser.next();
155             }
156             return currentState.activeExecutorsChooser.next();
157         }
158 
159         /**
160          * Tries to increase the active thread count by waking up suspended executors.
161          * This method is thread-safe and updates the state atomically.
162          *
163          * @param amount    The desired number of threads to add to the active count.
164          */
165         private void tryScaleUpBy(int amount) {
166             if (amount <= 0) {
167                 return;
168             }
169 
170             for (;;) {
171                 AutoScalingState oldState = state.get();
172                 if (oldState.activeChildrenCount >= maxChildren) {
173                     return;
174                 }
175 
176                 int canAdd = Math.min(amount, maxChildren - oldState.activeChildrenCount);
177                 List<EventExecutor> wokenUp = new ArrayList<>(canAdd);
178                 final long startIndex = oldState.nextWakeUpIndex;
179 
180                 for (int i = 0; i < executors.length; i++) {
181                     EventExecutor child = executors[(int) Math.abs((startIndex + i) % executors.length)];
182 
183                     if (wokenUp.size() >= canAdd) {
184                         break; // We have woken up all the threads we reserved.
185                     }
186                     if (child instanceof SingleThreadEventExecutor) {
187                         SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
188                         if (stee.isSuspended()) {
189                             stee.execute(NO_OOP_TASK);
190                             wokenUp.add(stee);
191                         }
192                     }
193                 }
194 
195                 if (wokenUp.isEmpty()) {
196                     return;
197                 }
198 
199                 // Create the new state.
200                 List<EventExecutor> newActiveList = new ArrayList<>(oldState.activeExecutors.length + wokenUp.size());
201                 Collections.addAll(newActiveList, oldState.activeExecutors);
202                 newActiveList.addAll(wokenUp);
203 
204                 AutoScalingState newState = new AutoScalingState(
205                         oldState.activeChildrenCount + wokenUp.size(),
206                         startIndex + wokenUp.size(),
207                         newActiveList.toArray(new EventExecutor[0]));
208 
209                 if (state.compareAndSet(oldState, newState)) {
210                     return;
211                 }
212                 // CAS failed, another thread changed the state. Loop again to retry.
213             }
214         }
215 
216         private final class UtilizationMonitor implements Runnable {
217             private final List<SingleThreadEventExecutor> consistentlyIdleChildren = new ArrayList<>(maxChildren);
218 
219             @Override
220             public void run() {
221                 if (executors.length == 0 || executors[0].isShuttingDown()) {
222                     // The group is shutting down, so no scaling decisions should be made.
223                     // The lifecycle listener on the terminationFuture will handle the final cancellation.
224                     return;
225                 }
226 
227                 int consistentlyBusyChildren = 0;
228                 consistentlyIdleChildren.clear();
229 
230                 final AutoScalingState currentState = state.get();
231 
232                 for (EventExecutor child : executors) {
233                     if (child.isSuspended() || !(child instanceof SingleThreadEventExecutor)) {
234                         continue;
235                     }
236 
237                     SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
238 
239                     long activeTime = stee.getAndResetAccumulatedActiveTimeNanos();
240                     final long totalTime = utilizationCheckPeriodNanos;
241 
242                     if (activeTime == 0) {
243                         long lastActivity = stee.getLastActivityTimeNanos();
244                         long idleTime = stee.ticker().nanoTime() - lastActivity;
245 
246                         // If the event loop has been idle for less time than our utilization window,
247                         // it means it was active for the remainder of that window.
248                         if (idleTime < totalTime) {
249                             activeTime = totalTime - idleTime;
250                         }
251                         // If idleTime >= totalTime, it was idle for the whole window, so activeTime remains 0.
252                     }
253 
254                     double utilization = Math.min(1.0, (double) activeTime / totalTime);
255 
256                     if (utilization < scaleDownThreshold) {
257                         // Utilization is low, increment idle counter and reset busy counter.
258                         int idleCycles = stee.getAndIncrementIdleCycles();
259                         stee.resetBusyCycles();
260                         if (idleCycles >= scalingPatienceCycles && stee.getNumOfRegisteredChannels() <= 0) {
261                             consistentlyIdleChildren.add(stee);
262                         }
263                     } else if (utilization > scaleUpThreshold) {
264                         // Utilization is high, increment busy counter and reset idle counter.
265                         int busyCycles = stee.getAndIncrementBusyCycles();
266                         stee.resetIdleCycles();
267                         if (busyCycles >= scalingPatienceCycles) {
268                             consistentlyBusyChildren++;
269                         }
270                     } else {
271                         // Utilization is in the normal range, reset counters.
272                         stee.resetIdleCycles();
273                         stee.resetBusyCycles();
274                     }
275                 }
276 
277                 boolean changed = false; // Flag to track if we need to rebuild the active executors list.
278                 int currentActive = currentState.activeChildrenCount;
279 
280                 // Make scaling decisions based on stable states.
281                 if (consistentlyBusyChildren > 0 && currentActive < maxChildren) {
282                     // Scale Up, we have children that have been busy for multiple cycles.
283                     int threadsToAdd = Math.min(consistentlyBusyChildren, maxRampUpStep);
284                     threadsToAdd = Math.min(threadsToAdd, maxChildren - currentActive);
285                     if (threadsToAdd > 0) {
286                         tryScaleUpBy(threadsToAdd);
287                         // State change is handled by tryScaleUpBy, no need for rebuild here.
288                         return; // Exit to avoid conflicting scale down logic in the same cycle.
289                     }
290                 }
291 
292                 if (!consistentlyIdleChildren.isEmpty() && currentActive > minChildren) {
293                     // Scale down, we have children that have been idle for multiple cycles.
294 
295                     int threadsToRemove = Math.min(consistentlyIdleChildren.size(), maxRampDownStep);
296                     threadsToRemove = Math.min(threadsToRemove, currentActive - minChildren);
297 
298                     for (int i = 0; i < threadsToRemove; i++) {
299                         SingleThreadEventExecutor childToSuspend = consistentlyIdleChildren.get(i);
300                         if (childToSuspend.trySuspend()) {
301                             // Reset cycles upon suspension so it doesn't get immediately re-suspended on wake-up.
302                             childToSuspend.resetBusyCycles();
303                             childToSuspend.resetIdleCycles();
304                             changed = true;
305                         }
306                     }
307                 }
308 
309                 // If a scale-down occurred, or if the actual state differs from our view, rebuild.
310                 if (changed || currentActive != currentState.activeExecutors.length) {
311                     rebuildActiveExecutors();
312                 }
313             }
314 
315             /**
316              * Atomically updates the state by creating a new snapshot with the current set of active executors.
317              */
318             private void rebuildActiveExecutors() {
319                 for (;;) {
320                     AutoScalingState oldState = state.get();
321                     List<EventExecutor> active = new ArrayList<>(oldState.activeChildrenCount);
322                     for (EventExecutor executor : executors) {
323                         if (!executor.isSuspended()) {
324                             active.add(executor);
325                         }
326                     }
327                     EventExecutor[] newActiveExecutors = active.toArray(new EventExecutor[0]);
328 
329                     // If the number of active executors in our scan differs from the count in the state,
330                     // another thread likely changed it. We use the count from our fresh scan.
331                     // The nextWakeUpIndex is preserved from the old state as this rebuild is not a scale-up action.
332                     AutoScalingState newState = new AutoScalingState(
333                             newActiveExecutors.length, oldState.nextWakeUpIndex, newActiveExecutors);
334 
335                     if (state.compareAndSet(oldState, newState)) {
336                         break;
337                     }
338                 }
339             }
340         }
341     }
342 }