1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.util.concurrent;
17
18 import io.netty.util.internal.ObjectUtil;
19
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.List;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicReference;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46 public final class AutoScalingEventExecutorChooserFactory implements EventExecutorChooserFactory {
47
// Empty task handed to execute(...) on suspended executors in tryScaleUpBy.
// NOTE(review): the name looks like a typo for NO_OP_TASK — confirm before renaming.
private static final Runnable NO_OOP_TASK = () -> { };
// Lower bound on the active executor count; the monitor never plans suspensions below it.
private final int minChildren;
// Upper bound on the active executor count; also the active count recorded in the initial state.
private final int maxChildren;
// Period (and initial delay) of the monitoring task, and the denominator used to compute utilization.
private final long utilizationCheckPeriodNanos;
// An executor whose utilization is below this value has its idle-cycle counter incremented.
private final double scaleDownThreshold;
// An executor whose utilization is above this value has its busy-cycle counter incremented.
private final double scaleUpThreshold;
// Maximum number of executors the monitor asks to wake up in a single run.
private final int maxRampUpStep;
// Maximum number of executors the monitor tries to suspend in a single run.
private final int maxRampDownStep;
// Value the per-executor idle/busy cycle counters are compared against before the monitor acts.
private final int scalingPatienceCycles;
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * Creates a factory whose choosers scale the number of active executors between
 * {@code minThreads} and {@code maxThreads} based on periodically sampled utilization.
 *
 * @param minThreads            lower bound on active executors; must be {@code >= 0} and
 *                              not greater than {@code maxThreads}
 * @param maxThreads            upper bound on active executors; must be {@code > 0}
 * @param utilizationWindow     length of one utilization sampling period; must be {@code > 0}
 * @param windowUnit            unit of {@code utilizationWindow}; must not be {@code null}
 * @param scaleDownThreshold    utilization in {@code [0.0, 1.0]} below which an executor counts as idle;
 *                              must be strictly less than {@code scaleUpThreshold}
 * @param scaleUpThreshold      utilization in {@code [0.0, 1.0]} above which an executor counts as busy
 * @param maxRampUpStep         maximum number of executors woken per monitoring run; must be {@code > 0}
 * @param maxRampDownStep       maximum number of executors suspended per monitoring run; must be {@code > 0}
 * @param scalingPatienceCycles value the idle/busy cycle counters are compared against before scaling;
 *                              must be {@code >= 0}
 * @throws IllegalArgumentException if {@code minThreads > maxThreads}, or if {@code scaleDownThreshold}
 *                                  is not strictly less than {@code scaleUpThreshold} (this includes
 *                                  either threshold being NaN). The {@code ObjectUtil} checks presumably
 *                                  throw the same type for out-of-range values — confirm.
 */
public AutoScalingEventExecutorChooserFactory(int minThreads, int maxThreads, long utilizationWindow,
                                              TimeUnit windowUnit, double scaleDownThreshold,
                                              double scaleUpThreshold, int maxRampUpStep, int maxRampDownStep,
                                              int scalingPatienceCycles) {
    this.minChildren = ObjectUtil.checkPositiveOrZero(minThreads, "minThreads");
    this.maxChildren = ObjectUtil.checkPositive(maxThreads, "maxThreads");
    if (minThreads > maxThreads) {
        throw new IllegalArgumentException(String.format(
                "minThreads: %d must not be greater than maxThreads: %d", minThreads, maxThreads));
    }
    this.utilizationCheckPeriodNanos = ObjectUtil.checkNotNull(windowUnit, "windowUnit")
            .toNanos(ObjectUtil.checkPositive(utilizationWindow, "utilizationWindow"));
    this.scaleDownThreshold = ObjectUtil.checkInRange(scaleDownThreshold, 0.0, 1.0, "scaleDownThreshold");
    this.scaleUpThreshold = ObjectUtil.checkInRange(scaleUpThreshold, 0.0, 1.0, "scaleUpThreshold");
    // Deliberately written as !(down < up) instead of (down >= up): every ordered comparison involving
    // NaN evaluates to false, so the ">=" form would let a NaN threshold through, and the monitor's
    // "<" / ">" comparisons against it would then never trigger any scaling.
    if (!(scaleDownThreshold < scaleUpThreshold)) {
        throw new IllegalArgumentException(
                "scaleDownThreshold must be less than scaleUpThreshold: " +
                        scaleDownThreshold + " >= " + scaleUpThreshold);
    }
    this.maxRampUpStep = ObjectUtil.checkPositive(maxRampUpStep, "maxRampUpStep");
    this.maxRampDownStep = ObjectUtil.checkPositive(maxRampDownStep, "maxRampDownStep");
    this.scalingPatienceCycles = ObjectUtil.checkPositiveOrZero(scalingPatienceCycles, "scalingPatienceCycles");
}
95
96 @Override
97 public EventExecutorChooser newChooser(EventExecutor[] executors) {
98 return new AutoScalingEventExecutorChooser(executors);
99 }
100
101
102
103
104
105 private static final class AutoScalingState {
106 final int activeChildrenCount;
107 final long nextWakeUpIndex;
108 final EventExecutor[] activeExecutors;
109 final EventExecutorChooser activeExecutorsChooser;
110
111 AutoScalingState(int activeChildrenCount, long nextWakeUpIndex, EventExecutor[] activeExecutors) {
112 this.activeChildrenCount = activeChildrenCount;
113 this.nextWakeUpIndex = nextWakeUpIndex;
114 this.activeExecutors = activeExecutors;
115 this.activeExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(activeExecutors);
116 }
117 }
118
119 private final class AutoScalingEventExecutorChooser implements EventExecutorChooser {
// Every executor handed to this chooser, whether currently suspended or not.
private final EventExecutor[] executors;
// Chooser over the full executor array; next() falls back to it when no executor is listed as active.
private final EventExecutorChooser allExecutorsChooser;
// Current scaling snapshot; replaced via compareAndSet by tryScaleUpBy and rebuildActiveExecutors.
private final AtomicReference<AutoScalingState> state;
123
/**
 * Creates the chooser with every executor initially listed as active and, when there is at least
 * one executor, schedules the periodic utilization monitor on {@link GlobalEventExecutor}.
 *
 * @param executors all executors managed by this chooser
 */
AutoScalingEventExecutorChooser(EventExecutor[] executors) {
    this.executors = executors;
    this.allExecutorsChooser = DefaultEventExecutorChooserFactory.INSTANCE.newChooser(executors);

    // The initial snapshot records maxChildren as the active count and lists every executor as active.
    this.state = new AtomicReference<>(new AutoScalingState(maxChildren, 0L, executors));

    if (executors.length == 0) {
        // Nothing to monitor. Scheduling anyway would leave a periodic task on the global executor
        // that could never be cancelled, because cancellation is tied to executors[0] terminating.
        return;
    }

    ScheduledFuture<?> utilizationMonitoringTask = GlobalEventExecutor.INSTANCE.scheduleAtFixedRate(
            new UtilizationMonitor(), utilizationCheckPeriodNanos, utilizationCheckPeriodNanos,
            TimeUnit.NANOSECONDS);

    // Stop monitoring once the first executor has terminated.
    executors[0].terminationFuture().addListener(future -> utilizationMonitoringTask.cancel(false));
}
139
140
141
142
143
144 @Override
145 public EventExecutor next() {
146
147 AutoScalingState currentState = this.state.get();
148
149 if (currentState.activeExecutors.length == 0) {
150
151
152
153 tryScaleUpBy(1);
154 return allExecutorsChooser.next();
155 }
156 return currentState.activeExecutorsChooser.next();
157 }
158
159
160
161
162
163
164
165 private void tryScaleUpBy(int amount) {
166 if (amount <= 0) {
167 return;
168 }
169
170 for (;;) {
171 AutoScalingState oldState = state.get();
172 if (oldState.activeChildrenCount >= maxChildren) {
173 return;
174 }
175
176 int canAdd = Math.min(amount, maxChildren - oldState.activeChildrenCount);
177 List<EventExecutor> wokenUp = new ArrayList<>(canAdd);
178 final long startIndex = oldState.nextWakeUpIndex;
179
180 for (int i = 0; i < executors.length; i++) {
181 EventExecutor child = executors[(int) Math.abs((startIndex + i) % executors.length)];
182
183 if (wokenUp.size() >= canAdd) {
184 break;
185 }
186 if (child instanceof SingleThreadEventExecutor) {
187 SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
188 if (stee.isSuspended()) {
189 stee.execute(NO_OOP_TASK);
190 wokenUp.add(stee);
191 }
192 }
193 }
194
195 if (wokenUp.isEmpty()) {
196 return;
197 }
198
199
200 List<EventExecutor> newActiveList = new ArrayList<>(oldState.activeExecutors.length + wokenUp.size());
201 Collections.addAll(newActiveList, oldState.activeExecutors);
202 newActiveList.addAll(wokenUp);
203
204 AutoScalingState newState = new AutoScalingState(
205 oldState.activeChildrenCount + wokenUp.size(),
206 startIndex + wokenUp.size(),
207 newActiveList.toArray(new EventExecutor[0]));
208
209 if (state.compareAndSet(oldState, newState)) {
210 return;
211 }
212
213 }
214 }
215
216 private final class UtilizationMonitor implements Runnable {
// Scratch list reused across runs: cleared at the start of run() and filled with the executors
// that run() selects as suspension candidates.
private final List<SingleThreadEventExecutor> consistentlyIdleChildren = new ArrayList<>(maxChildren);
218
219 @Override
220 public void run() {
221 if (executors.length == 0 || executors[0].isShuttingDown()) {
222
223
224 return;
225 }
226
227 int consistentlyBusyChildren = 0;
228 consistentlyIdleChildren.clear();
229
230 final AutoScalingState currentState = state.get();
231
232 for (EventExecutor child : executors) {
233 if (child.isSuspended() || !(child instanceof SingleThreadEventExecutor)) {
234 continue;
235 }
236
237 SingleThreadEventExecutor stee = (SingleThreadEventExecutor) child;
238
239 long activeTime = stee.getAndResetAccumulatedActiveTimeNanos();
240 final long totalTime = utilizationCheckPeriodNanos;
241
242 if (activeTime == 0) {
243 long lastActivity = stee.getLastActivityTimeNanos();
244 long idleTime = stee.ticker().nanoTime() - lastActivity;
245
246
247
248 if (idleTime < totalTime) {
249 activeTime = totalTime - idleTime;
250 }
251
252 }
253
254 double utilization = Math.min(1.0, (double) activeTime / totalTime);
255
256 if (utilization < scaleDownThreshold) {
257
258 int idleCycles = stee.getAndIncrementIdleCycles();
259 stee.resetBusyCycles();
260 if (idleCycles >= scalingPatienceCycles && stee.getNumOfRegisteredChannels() <= 0) {
261 consistentlyIdleChildren.add(stee);
262 }
263 } else if (utilization > scaleUpThreshold) {
264
265 int busyCycles = stee.getAndIncrementBusyCycles();
266 stee.resetIdleCycles();
267 if (busyCycles >= scalingPatienceCycles) {
268 consistentlyBusyChildren++;
269 }
270 } else {
271
272 stee.resetIdleCycles();
273 stee.resetBusyCycles();
274 }
275 }
276
277 boolean changed = false;
278 int currentActive = currentState.activeChildrenCount;
279
280
281 if (consistentlyBusyChildren > 0 && currentActive < maxChildren) {
282
283 int threadsToAdd = Math.min(consistentlyBusyChildren, maxRampUpStep);
284 threadsToAdd = Math.min(threadsToAdd, maxChildren - currentActive);
285 if (threadsToAdd > 0) {
286 tryScaleUpBy(threadsToAdd);
287
288 return;
289 }
290 }
291
292 if (!consistentlyIdleChildren.isEmpty() && currentActive > minChildren) {
293
294
295 int threadsToRemove = Math.min(consistentlyIdleChildren.size(), maxRampDownStep);
296 threadsToRemove = Math.min(threadsToRemove, currentActive - minChildren);
297
298 for (int i = 0; i < threadsToRemove; i++) {
299 SingleThreadEventExecutor childToSuspend = consistentlyIdleChildren.get(i);
300 if (childToSuspend.trySuspend()) {
301
302 childToSuspend.resetBusyCycles();
303 childToSuspend.resetIdleCycles();
304 changed = true;
305 }
306 }
307 }
308
309
310 if (changed || currentActive != currentState.activeExecutors.length) {
311 rebuildActiveExecutors();
312 }
313 }
314
315
316
317
318 private void rebuildActiveExecutors() {
319 for (;;) {
320 AutoScalingState oldState = state.get();
321 List<EventExecutor> active = new ArrayList<>(oldState.activeChildrenCount);
322 for (EventExecutor executor : executors) {
323 if (!executor.isSuspended()) {
324 active.add(executor);
325 }
326 }
327 EventExecutor[] newActiveExecutors = active.toArray(new EventExecutor[0]);
328
329
330
331
332 AutoScalingState newState = new AutoScalingState(
333 newActiveExecutors.length, oldState.nextWakeUpIndex, newActiveExecutors);
334
335 if (state.compareAndSet(oldState, newState)) {
336 break;
337 }
338 }
339 }
340 }
341 }
342 }