/*
 * Copyright 2025 The Netty Project
 *
 * The Netty Project licenses this file to you under the Apache License,
 * version 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package io.netty.util.concurrent;

import io.netty.util.internal.ObjectUtil;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * A factory that creates auto-scaling {@link EventExecutorChooser} instances.
 * The choosers it creates implement a dynamic, utilization-based auto-scaling strategy.
 * <p>
 * It enables the {@link io.netty.channel.EventLoopGroup} to automatically scale the number of active
 * {@link io.netty.channel.EventLoop} threads between a minimum and maximum threshold.
 * The scaling decision is based on the average utilization of the active threads, measured over a
 * configurable time window.
 * <p>
 * An {@code EventLoop} can be suspended if its utilization is consistently below the
 * {@code scaleDownThreshold}. Conversely, if the group's average utilization is consistently
 * above the {@code scaleUpThreshold}, a suspended thread will be automatically resumed to handle
 * the increased load.
 * <p>
 * To control the aggressiveness of scaling actions, the {@code maxRampUpStep} and {@code maxRampDownStep}
 * parameters limit the maximum number of threads that can be activated or suspended in a single scaling cycle.
 * Furthermore, to ensure decisions are based on sustained trends rather than transient spikes, the
 * {@code scalingPatienceCycles} parameter defines the number of consecutive monitoring windows a scaling
 * condition must hold before an action is triggered.
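 * <p>
 * A minimal construction sketch; the numeric values below are purely illustrative, not tuned recommendations:
 * <pre>{@code
 * EventExecutorChooserFactory factory = new AutoScalingEventExecutorChooserFactory(
 *         2, 8,                // keep between 2 and 8 threads active
 *         1, TimeUnit.SECONDS, // sample utilization once per second
 *         0.1, 0.9,            // suspend below 10% utilization, resume above 90%
 *         2, 1,                // wake at most 2 threads, suspend at most 1 per cycle
 *         3);                  // require 3 consecutive qualifying cycles before acting
 * }</pre>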
 */
public final class AutoScalingEventExecutorChooserFactory implements EventExecutorChooserFactory {

    /**
     * A container for the utilization metric of a single EventExecutor.
     * This object is created once per executor and has its utilization value
     * updated periodically by the monitoring task.
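     * <p>
     * A sketch of reading these metrics, assuming the caller has obtained a reference
     * ({@code chooser} below) to an {@code ObservableEventExecutorChooser}:
     * <pre>{@code
     * for (AutoScalingUtilizationMetric metric : chooser.executorUtilizations()) {
     *     System.out.printf("%s: %.2f%n", metric.executor(), metric.utilization());
     * }
     * }</pre>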
     */
    public static final class AutoScalingUtilizationMetric {
        private final EventExecutor executor;
        private final AtomicLong utilizationBits = new AtomicLong();

        AutoScalingUtilizationMetric(EventExecutor executor) {
            this.executor = executor;
        }

        /**
         * Returns the most recently calculated utilization for the associated executor.
         * @return a value between 0.0 (completely idle) and 1.0 (fully busy).
         */
        public double utilization() {
            return Double.longBitsToDouble(utilizationBits.get());
        }

        /**
         * Returns the {@link EventExecutor} this metric belongs to.
         * @return the executor.
         */
        public EventExecutor executor() {
            return executor;
        }

        void setUtilization(double utilization) {
            long bits = Double.doubleToRawLongBits(utilization);
            utilizationBits.lazySet(bits);
        }
    }

    // A no-op task submitted to a suspended executor purely to wake it up again.
    private static final Runnable NO_OP_TASK = () -> { };
    private final int minChildren;
    private final int maxChildren;
    private final long utilizationCheckPeriodNanos;
    private final double scaleDownThreshold;
    private final double scaleUpThreshold;
    private final int maxRampUpStep;
    private final int maxRampDownStep;
    private final int scalingPatienceCycles;

    /**
     * Creates a new factory for a scaling-enabled {@link EventExecutorChooser}.
     *
     * @param minThreads the minimum number of threads to keep active.
     * @param maxThreads the maximum number of threads to scale up to.
     * @param utilizationWindow the period between utilization checks, which also serves as the measurement window.
     * @param windowUnit the unit for {@code utilizationWindow}.
     * @param scaleDownThreshold the average utilization below which a thread may be suspended.
     * @param scaleUpThreshold the average utilization above which a thread may be resumed.
     * @param maxRampUpStep the maximum number of threads to add in one cycle.
     * @param maxRampDownStep the maximum number of threads to remove in one cycle.
     * @param scalingPatienceCycles the number of consecutive cycles a condition must be met before scaling.
     */
    public AutoScalingEventExecutorChooserFactory(int minThreads, int maxThreads, long utilizationWindow,
                                                  TimeUnit windowUnit, double scaleDownThreshold,
                                                  double scaleUpThreshold, int maxRampUpStep, int maxRampDownStep,
                                                  int scalingPatienceCycles) {
        minChildren = ObjectUtil.checkPositiveOrZero(minThreads, "minThreads");
        maxChildren = ObjectUtil.checkPositive(maxThreads, "maxThreads");
        if (minThreads > maxThreads) {
            throw new IllegalArgumentException(String.format(
                    "minThreads: %d must not be greater than maxThreads: %d", minThreads, maxThreads));
        }
        utilizationCheckPeriodNanos = ObjectUtil.checkNotNull(windowUnit, "windowUnit")
                .toNanos(ObjectUtil.checkPositive(utilizationWindow, "utilizationWindow"));
        this.scaleDownThreshold = ObjectUtil.checkInRange(scaleDownThreshold, 0.0, 1.0, "scaleDownThreshold");
        this.scaleUpThreshold = ObjectUtil.checkInRange(scaleUpThreshold, 0.0, 1.0, "scaleUpThreshold");
        if (scaleDownThreshold >= scaleUpThreshold) {
            throw new IllegalArgumentException(
                    "scaleDownThreshold must be less than scaleUpThreshold: " +
                            scaleDownThreshold + " >= " + scaleUpThreshold);
        }
        this.maxRampUpStep = ObjectUtil.checkPositive(maxRampUpStep, "maxRampUpStep");
        this.maxRampDownStep = ObjectUtil.checkPositive(maxRampDownStep, "maxRampDownStep");
        this.scalingPatienceCycles = ObjectUtil.checkPositiveOrZero(scalingPatienceCycles, "scalingPatienceCycles");
    }

    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        return new AutoScalingEventExecutorChooser(executors);
    }

    /**
     * An immutable snapshot of the chooser's state. All state transitions
     * are managed by atomically swapping this object.
     */
    private static final class AutoScalingState {
        final int activeChildrenCount;
        final long nextWakeUpIndex;
        final EventExecutor[] activeExecutors;
        final EventExecutorChooser activeExecutorsChooser;

        AutoScalingState(int activeChildrenCount, long nextWakeUpIndex, EventExecutor[] activeExecutors) {
            this.activeChildrenCount = activeChildrenCount;
            this.nextWakeUpIndex = nextWakeUpIndex;
            this.activeExecutors = activeExecutors;
            activeExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(activeExecutors);
        }
    }

    private final class AutoScalingEventExecutorChooser implements ObservableEventExecutorChooser {
        private final EventExecutor[] executors;
        private final EventExecutorChooser allExecutorsChooser;
        private final AtomicReference<AutoScalingState> state;
        private final List<AutoScalingUtilizationMetric> utilizationMetrics;

        AutoScalingEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
            List<AutoScalingUtilizationMetric> metrics = new ArrayList<>(executors.length);
            for (EventExecutor executor : executors) {
                metrics.add(new AutoScalingUtilizationMetric(executor));
            }
            utilizationMetrics = Collections.unmodifiableList(metrics);
            allExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(executors);

            AutoScalingState initialState = new AutoScalingState(maxChildren, 0L, executors);
            state = new AtomicReference<>(initialState);

            ScheduledFuture<?> utilizationMonitoringTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(
                    new UtilizationMonitor(), utilizationCheckPeriodNanos, utilizationCheckPeriodNanos,
                    TimeUnit.NANOSECONDS);

            if (executors.length > 0) {
                executors[0].terminationFuture().addListener(future -> utilizationMonitoringTask.cancel(false));
            }
        }

        /**
         * This method is only responsible for picking from the active executors list.
         * The monitor handles all scaling decisions.
         */
        @Override
        public EventExecutor next() {
            // Get a snapshot of the current state.
            AutoScalingState currentState = this.state.get();

            if (currentState.activeExecutors.length == 0) {
                // This is only reachable if minChildren is 0 and the monitor has just suspended the last
                // active thread. To prevent an error and ensure the group can recover, we wake one up and
                // use the chooser that contains all executors as a safe temporary choice.
                tryScaleUpBy(1);
                return allExecutorsChooser.next();
            }
            return currentState.activeExecutorsChooser.next();
        }

        /**
         * Tries to increase the active thread count by waking up suspended executors.
         * This method is thread-safe and updates the state atomically.
         *
         * @param amount the desired number of threads to add to the active count.
         */
        private void tryScaleUpBy(int amount) {
            if (amount <= 0) {
                return;
            }

            for (;;) {
                AutoScalingState oldState = state.get();
                if (oldState.activeChildrenCount >= maxChildren) {
                    return;
                }

                int canAdd = Math.min(amount, maxChildren - oldState.activeChildrenCount);
                List<EventExecutor> wokenUp = new ArrayList<>(canAdd);
                final long startIndex = oldState.nextWakeUpIndex;

                for (int i = 0; i < executors.length; i++) {
                    EventExecutor child = executors[(int) Math.abs((startIndex + i) % executors.length)];

                    if (wokenUp.size() >= canAdd) {
                        break; // We have woken up as many threads as we can add in this cycle.
                    }
                    if (child instanceof SingleThreadEventExecutor) {
                        SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
                        if (stee.isSuspended()) {
                            stee.execute(NO_OP_TASK);
                            wokenUp.add(stee);
                        }
                    }
                }

                if (wokenUp.isEmpty()) {
                    return;
                }

                // Create the new state.
                List<EventExecutor> newActiveList = new ArrayList<>(oldState.activeExecutors.length + wokenUp.size());
                Collections.addAll(newActiveList, oldState.activeExecutors);
                newActiveList.addAll(wokenUp);

                AutoScalingState newState = new AutoScalingState(
                        oldState.activeChildrenCount + wokenUp.size(),
                        startIndex + wokenUp.size(),
                        newActiveList.toArray(new EventExecutor[0]));

                if (state.compareAndSet(oldState, newState)) {
                    return;
                }
                // CAS failed, another thread changed the state. Loop again to retry.
            }
        }

        @Override
        public int activeExecutorCount() {
            return state.get().activeChildrenCount;
        }

        @Override
        public List<AutoScalingUtilizationMetric> executorUtilizations() {
            return utilizationMetrics;
        }

        private final class UtilizationMonitor implements Runnable {
            private final List<SingleThreadEventExecutor> consistentlyIdleChildren = new ArrayList<>(maxChildren);
            private long lastCheckTimeNanos;

            @Override
            public void run() {
                if (executors.length == 0 || executors[0].isShuttingDown()) {
                    // The group is shutting down, so no scaling decisions should be made.
                    // The lifecycle listener on the terminationFuture will handle the final cancellation.
                    return;
                }

                // Calculate the actual elapsed time since the last run.
                final long now = executors[0].ticker().nanoTime();
                long totalTime;

                if (lastCheckTimeNanos == 0) {
                    // On the first run, use the configured period as a baseline to avoid skipping the cycle.
                    totalTime = utilizationCheckPeriodNanos;
                } else {
                    // On subsequent runs, calculate the actual elapsed time.
                    totalTime = now - lastCheckTimeNanos;
                }

                // Always update the timestamp for the next cycle.
                lastCheckTimeNanos = now;

                if (totalTime <= 0) {
                    // Skip this cycle if the clock has issues or the interval is invalid.
                    return;
                }

                int consistentlyBusyChildren = 0;
                consistentlyIdleChildren.clear();

                final AutoScalingState currentState = state.get();

                for (int i = 0; i < executors.length; i++) {
                    EventExecutor child = executors[i];
                    if (!(child instanceof SingleThreadEventExecutor)) {
                        continue;
                    }

                    SingleThreadEventExecutor eventExecutor = (SingleThreadEventExecutor) child;

                    double utilization = 0.0;
                    if (!eventExecutor.isSuspended()) {
                        long activeTime = eventExecutor.getAndResetAccumulatedActiveTimeNanos();

                        if (activeTime == 0) {
                            long lastActivity = eventExecutor.getLastActivityTimeNanos();
                            long idleTime = now - lastActivity;

                            // If the event loop has been idle for less time than our utilization window,
                            // it means it was active for the remainder of that window.
                            if (idleTime < totalTime) {
                                activeTime = totalTime - idleTime;
                            }
                            // If idleTime >= totalTime, it was idle for the whole window, so activeTime remains 0.
                        }

                        utilization = Math.min(1.0, (double) activeTime / totalTime);

                        if (utilization < scaleDownThreshold) {
                            // Utilization is low, increment idle counter and reset busy counter.
                            int idleCycles = eventExecutor.getAndIncrementIdleCycles();
                            eventExecutor.resetBusyCycles();
                            if (idleCycles >= scalingPatienceCycles &&
                                    eventExecutor.getNumOfRegisteredChannels() <= 0) {
                                consistentlyIdleChildren.add(eventExecutor);
                            }
                        } else if (utilization > scaleUpThreshold) {
                            // Utilization is high, increment busy counter and reset idle counter.
                            int busyCycles = eventExecutor.getAndIncrementBusyCycles();
                            eventExecutor.resetIdleCycles();
                            if (busyCycles >= scalingPatienceCycles) {
                                consistentlyBusyChildren++;
                            }
                        } else {
                            // Utilization is in the normal range, reset counters.
                            eventExecutor.resetIdleCycles();
                            eventExecutor.resetBusyCycles();
                        }
                    }

                    utilizationMetrics.get(i).setUtilization(utilization);
                }

                int currentActive = currentState.activeChildrenCount;

                // Make scaling decisions based on stable states.
                if (consistentlyBusyChildren > 0 && currentActive < maxChildren) {
                    // Scale up: some children have been busy for multiple consecutive cycles.
                    int threadsToAdd = Math.min(consistentlyBusyChildren, maxRampUpStep);
                    threadsToAdd = Math.min(threadsToAdd, maxChildren - currentActive);
                    if (threadsToAdd > 0) {
                        tryScaleUpBy(threadsToAdd);
                        // State change is handled by tryScaleUpBy, no need for a rebuild here.
                        return; // Exit to avoid conflicting scale-down logic in the same cycle.
                    }
                }

                boolean changed = false; // Flag to track if we need to rebuild the active executors list.
                if (!consistentlyIdleChildren.isEmpty() && currentActive > minChildren) {
                    // Scale down: some children have been idle for multiple consecutive cycles.

                    int threadsToRemove = Math.min(consistentlyIdleChildren.size(), maxRampDownStep);
                    threadsToRemove = Math.min(threadsToRemove, currentActive - minChildren);

                    for (int i = 0; i < threadsToRemove; i++) {
                        SingleThreadEventExecutor childToSuspend = consistentlyIdleChildren.get(i);
                        if (childToSuspend.trySuspend()) {
                            // Reset cycles upon suspension so it doesn't get immediately re-suspended on wake-up.
                            childToSuspend.resetBusyCycles();
                            childToSuspend.resetIdleCycles();
                            changed = true;
                        }
                    }
                }

                // If a scale-down occurred, or if the snapshot's active count no longer matches its
                // active executor array, rebuild the state.
                if (changed || currentActive != currentState.activeExecutors.length) {
                    rebuildActiveExecutors();
                }
            }

            /**
             * Atomically updates the state by creating a new snapshot with the current set of active executors.
             */
            private void rebuildActiveExecutors() {
                for (;;) {
                    AutoScalingState oldState = state.get();
                    List<EventExecutor> active = new ArrayList<>(oldState.activeChildrenCount);
                    for (EventExecutor executor : executors) {
                        if (!executor.isSuspended()) {
                            active.add(executor);
                        }
                    }
                    EventExecutor[] newActiveExecutors = active.toArray(new EventExecutor[0]);

                    // If the number of active executors in our scan differs from the count in the state,
                    // another thread likely changed it. We use the count from our fresh scan.
                    // The nextWakeUpIndex is preserved from the old state as this rebuild is not a scale-up action.
                    AutoScalingState newState = new AutoScalingState(
                            newActiveExecutors.length, oldState.nextWakeUpIndex, newActiveExecutors);

                    if (state.compareAndSet(oldState, newState)) {
                        break;
                    }
                }
            }
        }
    }
}