View Javadoc
1   /*
2    * Copyright 2025 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.util.concurrent;
17  
18  import io.netty.util.internal.ObjectUtil;
19  
20  import java.util.ArrayList;
21  import java.util.Collections;
22  import java.util.List;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicLong;
25  import java.util.concurrent.atomic.AtomicReference;
26  
27  /**
28   * A factory that creates auto-scaling {@link EventExecutorChooser} instances.
29   * This chooser implements a dynamic, utilization-based auto-scaling strategy.
30   * <p>
31   * It enables the {@link io.netty.channel.EventLoopGroup} to automatically scale the number of active
32   * {@link io.netty.channel.EventLoop} threads between a minimum and maximum threshold.
33   * The scaling decision is based on the average utilization of the active threads, measured over a
34   * configurable time window.
35   * <p>
36   * An {@code EventLoop} can be suspended if its utilization is consistently below the
37   * {@code scaleDownThreshold}. Conversely, if the group's average utilization is consistently
38   * above the {@code scaleUpThreshold}, a suspended thread will be automatically resumed to handle
39   * the increased load.
40   * <p>
41   * To control the aggressiveness of scaling actions, the {@code maxRampUpStep} and {@code maxRampDownStep}
42   * parameters limit the maximum number of threads that can be activated or suspended in a single scaling cycle.
43   * Furthermore, to ensure decisions are based on sustained trends rather than transient spikes, the
44   * {@code scalingPatienceCycles} defines how many consecutive monitoring windows a condition must be met
45   * before a scaling action is triggered.
46   */
47  public final class AutoScalingEventExecutorChooserFactory implements EventExecutorChooserFactory {
48  
49      /**
50       * A container for the utilization metric of a single EventExecutor.
51       * This object is intended to be created once and have its {@code utilization}
52       * field updated periodically.
53       */
54      public static final class AutoScalingUtilizationMetric {
55          private final EventExecutor executor;
56          private final AtomicLong utilizationBits = new AtomicLong();
57  
58          AutoScalingUtilizationMetric(EventExecutor executor) {
59              this.executor = executor;
60          }
61  
62          /**
63           * Returns the most recently calculated utilization for the associated executor.
64           * @return a value from 0.0 to 1.0.
65           */
66          public double utilization() {
67              return Double.longBitsToDouble(utilizationBits.get());
68          }
69  
70          /**
71           * Returns the {@link EventExecutor} this metric belongs too.
72           * @return the executor.
73           */
74          public EventExecutor executor() {
75              return executor;
76          }
77  
78          void setUtilization(double utilization) {
79              long bits = Double.doubleToRawLongBits(utilization);
80              utilizationBits.lazySet(bits);
81          }
82      }
83  
    // Empty task; tryScaleUpBy passes it to execute(...) on an executor that reports isSuspended().
    private static final Runnable NO_OOP_TASK = () -> { };
    // Constructor argument minThreads: the monitor only suspends executors while the active count is above it.
    private final int minChildren;
    // Constructor argument maxThreads: scale-up stops once the active count reaches it.
    private final int maxChildren;
    // Constructor argument utilizationWindow converted to nanoseconds; initial delay and period of the monitor.
    private final long utilizationCheckPeriodNanos;
    // An executor whose utilization is strictly below this value is counted as idle for that cycle.
    private final double scaleDownThreshold;
    // An executor whose utilization is strictly above this value is counted as busy for that cycle.
    private final double scaleUpThreshold;
    // Upper bound on the number of executors one monitor run asks tryScaleUpBy to resume.
    private final int maxRampUpStep;
    // Upper bound on the number of executors one monitor run attempts to suspend.
    private final int maxRampDownStep;
    // Value an executor's idle/busy cycle counter must reach before it is treated as consistently idle/busy.
    private final int scalingPatienceCycles;
93  
94      /**
95       * Creates a new factory for a scaling-enabled {@link EventExecutorChooser}.
96       *
97       * @param minThreads               the minimum number of threads to keep active.
98       * @param maxThreads               the maximum number of threads to scale up to.
99       * @param utilizationWindow        the period at which to check group utilization.
100      * @param windowUnit               the unit for {@code utilizationWindow}.
101      * @param scaleDownThreshold       the average utilization below which a thread may be suspended.
102      * @param scaleUpThreshold         the average utilization above which a thread may be resumed.
103      * @param maxRampUpStep            the maximum number of threads to add in one cycle.
104      * @param maxRampDownStep          the maximum number of threads to remove in one cycle.
105      * @param scalingPatienceCycles    the number of consecutive cycles a condition must be met before scaling.
106      */
107     public AutoScalingEventExecutorChooserFactory(int minThreads, int maxThreads, long utilizationWindow,
108                                                   TimeUnit windowUnit, double scaleDownThreshold,
109                                                   double scaleUpThreshold, int maxRampUpStep, int maxRampDownStep,
110                                                   int scalingPatienceCycles) {
111         minChildren = ObjectUtil.checkPositiveOrZero(minThreads, "minThreads");
112         maxChildren = ObjectUtil.checkPositive(maxThreads, "maxThreads");
113         if (minThreads > maxThreads) {
114             throw new IllegalArgumentException(String.format(
115                     "minThreads: %d must not be greater than maxThreads: %d", minThreads, maxThreads));
116         }
117         utilizationCheckPeriodNanos = ObjectUtil.checkNotNull(windowUnit, "windowUnit")
118                                                      .toNanos(ObjectUtil.checkPositive(utilizationWindow,
119                                                                                        "utilizationWindow"));
120         this.scaleDownThreshold = ObjectUtil.checkInRange(scaleDownThreshold, 0.0, 1.0, "scaleDownThreshold");
121         this.scaleUpThreshold = ObjectUtil.checkInRange(scaleUpThreshold, 0.0, 1.0, "scaleUpThreshold");
122         if (scaleDownThreshold >= scaleUpThreshold) {
123             throw new IllegalArgumentException(
124                     "scaleDownThreshold must be less than scaleUpThreshold: " +
125                     scaleDownThreshold + " >= " + scaleUpThreshold);
126         }
127         this.maxRampUpStep = ObjectUtil.checkPositive(maxRampUpStep, "maxRampUpStep");
128         this.maxRampDownStep = ObjectUtil.checkPositive(maxRampDownStep, "maxRampDownStep");
129         this.scalingPatienceCycles = ObjectUtil.checkPositiveOrZero(scalingPatienceCycles, "scalingPatienceCycles");
130     }
131 
132     @Override
133     public EventExecutorChooser newChooser(EventExecutor[] executors) {
134         return new AutoScalingEventExecutorChooser(executors);
135     }
136 
137     /**
138      * An immutable snapshot of the chooser's state. All state transitions
139      * are managed by atomically swapping this object.
140      */
141     private static final class AutoScalingState {
142         final int activeChildrenCount;
143         final long nextWakeUpIndex;
144         final EventExecutor[] activeExecutors;
145         final EventExecutorChooser activeExecutorsChooser;
146 
147         AutoScalingState(int activeChildrenCount, long nextWakeUpIndex, EventExecutor[] activeExecutors) {
148             this.activeChildrenCount = activeChildrenCount;
149             this.nextWakeUpIndex = nextWakeUpIndex;
150             this.activeExecutors = activeExecutors;
151             activeExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(activeExecutors);
152         }
153     }
154 
155     private final class AutoScalingEventExecutorChooser implements ObservableEventExecutorChooser {
        // Every executor handed to this chooser, whether currently active or suspended.
        private final EventExecutor[] executors;
        // Default chooser over all executors; next() falls back to it when the active set is empty.
        private final EventExecutorChooser allExecutorsChooser;
        // Current scaling snapshot; replaced as a whole via compareAndSet.
        private final AtomicReference<AutoScalingState> state;
        // One metric per executor, in the same order as the executors array; wrapped as an unmodifiable list.
        private final List<AutoScalingUtilizationMetric> utilizationMetrics;
160 
161         AutoScalingEventExecutorChooser(EventExecutor[] executors) {
162             this.executors = executors;
163             List<AutoScalingUtilizationMetric> metrics = new ArrayList<>(executors.length);
164             for (EventExecutor executor : executors) {
165                 metrics.add(new AutoScalingUtilizationMetric(executor));
166             }
167             utilizationMetrics = Collections.unmodifiableList(metrics);
168             allExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(executors);
169 
170             AutoScalingState initialState = new AutoScalingState(maxChildren, 0L, executors);
171             state = new AtomicReference<>(initialState);
172 
173             ScheduledFuture<?> utilizationMonitoringTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(
174                     new UtilizationMonitor(), utilizationCheckPeriodNanos, utilizationCheckPeriodNanos,
175                     TimeUnit.NANOSECONDS);
176 
177             if (executors.length > 0) {
178                 executors[0].terminationFuture().addListener(future -> utilizationMonitoringTask.cancel(false));
179             }
180         }
181 
182         /**
183          * This method is only responsible for picking from the active executors list.
184          * The monitor handles all scaling decisions.
185          */
186         @Override
187         public EventExecutor next() {
188             // Get a snapshot of the current state.
189             AutoScalingState currentState = this.state.get();
190 
191             if (currentState.activeExecutors.length == 0) {
192                 // This is only reachable if minChildren is 0 and the monitor has just suspended the last active thread.
193                 // To prevent an error and ensure the group can recover, we wake one up and use the
194                 // chooser that contains all executors as a safe temporary choice.
195                 tryScaleUpBy(1);
196                 return allExecutorsChooser.next();
197             }
198             return currentState.activeExecutorsChooser.next();
199         }
200 
201         /**
202          * Tries to increase the active thread count by waking up suspended executors.
203          * This method is thread-safe and updates the state atomically.
204          *
205          * @param amount    The desired number of threads to add to the active count.
206          */
207         private void tryScaleUpBy(int amount) {
208             if (amount <= 0) {
209                 return;
210             }
211 
212             for (;;) {
213                 AutoScalingState oldState = state.get();
214                 if (oldState.activeChildrenCount >= maxChildren) {
215                     return;
216                 }
217 
218                 int canAdd = Math.min(amount, maxChildren - oldState.activeChildrenCount);
219                 List<EventExecutor> wokenUp = new ArrayList<>(canAdd);
220                 final long startIndex = oldState.nextWakeUpIndex;
221 
222                 for (int i = 0; i < executors.length; i++) {
223                     EventExecutor child = executors[(int) Math.abs((startIndex + i) % executors.length)];
224 
225                     if (wokenUp.size() >= canAdd) {
226                         break; // We have woken up all the threads we reserved.
227                     }
228                     if (child instanceof SingleThreadEventExecutor) {
229                         SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
230                         if (stee.isSuspended()) {
231                             stee.execute(NO_OOP_TASK);
232                             wokenUp.add(stee);
233                         }
234                     }
235                 }
236 
237                 if (wokenUp.isEmpty()) {
238                     return;
239                 }
240 
241                 // Create the new state.
242                 List<EventExecutor> newActiveList = new ArrayList<>(oldState.activeExecutors.length + wokenUp.size());
243                 Collections.addAll(newActiveList, oldState.activeExecutors);
244                 newActiveList.addAll(wokenUp);
245 
246                 AutoScalingState newState = new AutoScalingState(
247                         oldState.activeChildrenCount + wokenUp.size(),
248                         startIndex + wokenUp.size(),
249                         newActiveList.toArray(new EventExecutor[0]));
250 
251                 if (state.compareAndSet(oldState, newState)) {
252                     return;
253                 }
254                 // CAS failed, another thread changed the state. Loop again to retry.
255             }
256         }
257 
258         @Override
259         public int activeExecutorCount() {
260             return state.get().activeChildrenCount;
261         }
262 
263         @Override
264         public List<AutoScalingUtilizationMetric> executorUtilizations() {
265             return utilizationMetrics;
266         }
267 
268         private final class UtilizationMonitor implements Runnable {
269             private final List<SingleThreadEventExecutor> consistentlyIdleChildren = new ArrayList<>(maxChildren);
270             private long lastCheckTimeNanos;
271 
272             @Override
273             public void run() {
274                 if (executors.length == 0 || executors[0].isShuttingDown()) {
275                     // The group is shutting down, so no scaling decisions should be made.
276                     // The lifecycle listener on the terminationFuture will handle the final cancellation.
277                     return;
278                 }
279 
280                 // Calculate the actual elapsed time since the last run.
281                 final long now = executors[0].ticker().nanoTime();
282                 long totalTime;
283 
284                 if (lastCheckTimeNanos == 0) {
285                     // On the first run, use the configured period as a baseline to avoid skipping the cycle.
286                     totalTime = utilizationCheckPeriodNanos;
287                 } else {
288                     // On subsequent runs, calculate the actual elapsed time.
289                     totalTime = now - lastCheckTimeNanos;
290                 }
291 
292                 // Always update the timestamp for the next cycle.
293                 lastCheckTimeNanos = now;
294 
295                 if (totalTime <= 0) {
296                     // Skip this cycle if the clock has issues or the interval is invalid.
297                     return;
298                 }
299 
300                 int consistentlyBusyChildren = 0;
301                 consistentlyIdleChildren.clear();
302 
303                 final AutoScalingState currentState = state.get();
304 
305                 for (int i = 0; i < executors.length; i++) {
306                     EventExecutor child = executors[i];
307                     if (!(child instanceof SingleThreadEventExecutor)) {
308                         continue;
309                     }
310 
311                     SingleThreadEventExecutor eventExecutor = (SingleThreadEventExecutor) child;
312 
313                     double utilization = 0.0;
314                     if (!eventExecutor.isSuspended()) {
315                         long activeTime = eventExecutor.getAndResetAccumulatedActiveTimeNanos();
316 
317                         if (activeTime == 0) {
318                             long lastActivity = eventExecutor.getLastActivityTimeNanos();
319                             long idleTime = now - lastActivity;
320 
321                             // If the event loop has been idle for less time than our utilization window,
322                             // it means it was active for the remainder of that window.
323                             if (idleTime < totalTime) {
324                                 activeTime = totalTime - idleTime;
325                             }
326                             // If idleTime >= totalTime, it was idle for the whole window, so activeTime remains 0.
327                         }
328 
329                         utilization = Math.min(1.0, (double) activeTime / totalTime);
330 
331                         if (utilization < scaleDownThreshold) {
332                             // Utilization is low, increment idle counter and reset busy counter.
333                             int idleCycles = eventExecutor.getAndIncrementIdleCycles();
334                             eventExecutor.resetBusyCycles();
335                             if (idleCycles >= scalingPatienceCycles &&
336                                 eventExecutor.getNumOfRegisteredChannels() <= 0) {
337                                 consistentlyIdleChildren.add(eventExecutor);
338                             }
339                         } else if (utilization > scaleUpThreshold) {
340                             // Utilization is high, increment busy counter and reset idle counter.
341                             int busyCycles = eventExecutor.getAndIncrementBusyCycles();
342                             eventExecutor.resetIdleCycles();
343                             if (busyCycles >= scalingPatienceCycles) {
344                                 consistentlyBusyChildren++;
345                             }
346                         } else {
347                             // Utilization is in the normal range, reset counters.
348                             eventExecutor.resetIdleCycles();
349                             eventExecutor.resetBusyCycles();
350                         }
351                     }
352 
353                     utilizationMetrics.get(i).setUtilization(utilization);
354                 }
355 
356                 int currentActive = currentState.activeChildrenCount;
357 
358                 // Make scaling decisions based on stable states.
359                 if (consistentlyBusyChildren > 0 && currentActive < maxChildren) {
360                     // Scale Up, we have children that have been busy for multiple cycles.
361                     int threadsToAdd = Math.min(consistentlyBusyChildren, maxRampUpStep);
362                     threadsToAdd = Math.min(threadsToAdd, maxChildren - currentActive);
363                     if (threadsToAdd > 0) {
364                         tryScaleUpBy(threadsToAdd);
365                         // State change is handled by tryScaleUpBy, no need for rebuild here.
366                         return; // Exit to avoid conflicting scale down logic in the same cycle.
367                     }
368                 }
369 
370                 boolean changed = false; // Flag to track if we need to rebuild the active executors list.
371                 if (!consistentlyIdleChildren.isEmpty() && currentActive > minChildren) {
372                     // Scale down, we have children that have been idle for multiple cycles.
373 
374                     int threadsToRemove = Math.min(consistentlyIdleChildren.size(), maxRampDownStep);
375                     threadsToRemove = Math.min(threadsToRemove, currentActive - minChildren);
376 
377                     for (int i = 0; i < threadsToRemove; i++) {
378                         SingleThreadEventExecutor childToSuspend = consistentlyIdleChildren.get(i);
379                         if (childToSuspend.trySuspend()) {
380                             // Reset cycles upon suspension so it doesn't get immediately re-suspended on wake-up.
381                             childToSuspend.resetBusyCycles();
382                             childToSuspend.resetIdleCycles();
383                             changed = true;
384                         }
385                     }
386                 }
387 
388                 // If a scale-down occurred, or if the actual state differs from our view, rebuild.
389                 if (changed || currentActive != currentState.activeExecutors.length) {
390                     rebuildActiveExecutors();
391                 }
392             }
393 
394             /**
395              * Atomically updates the state by creating a new snapshot with the current set of active executors.
396              */
397             private void rebuildActiveExecutors() {
398                 for (;;) {
399                     AutoScalingState oldState = state.get();
400                     List<EventExecutor> active = new ArrayList<>(oldState.activeChildrenCount);
401                     for (EventExecutor executor : executors) {
402                         if (!executor.isSuspended()) {
403                             active.add(executor);
404                         }
405                     }
406                     EventExecutor[] newActiveExecutors = active.toArray(new EventExecutor[0]);
407 
408                     // If the number of active executors in our scan differs from the count in the state,
409                     // another thread likely changed it. We use the count from our fresh scan.
410                     // The nextWakeUpIndex is preserved from the old state as this rebuild is not a scale-up action.
411                     AutoScalingState newState = new AutoScalingState(
412                             newActiveExecutors.length, oldState.nextWakeUpIndex, newActiveExecutors);
413 
414                     if (state.compareAndSet(oldState, newState)) {
415                         break;
416                     }
417                 }
418             }
419         }
420     }
421 }