1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package io.netty5.bootstrap;
18
19 import io.netty5.channel.Channel;
20 import io.netty5.channel.ChannelFutureListeners;
21 import io.netty5.channel.ChannelHandler;
22 import io.netty5.channel.ChannelOption;
23 import io.netty5.channel.EventLoop;
24 import io.netty5.channel.EventLoopGroup;
25 import io.netty5.util.AttributeKey;
26 import io.netty5.util.concurrent.Future;
27 import io.netty5.util.concurrent.Promise;
28 import io.netty5.util.internal.SocketUtils;
29 import io.netty5.util.internal.StringUtil;
30 import io.netty5.util.internal.logging.InternalLogger;
31
32 import java.net.InetAddress;
33 import java.net.InetSocketAddress;
34 import java.net.SocketAddress;
35 import java.util.Collections;
36 import java.util.LinkedHashMap;
37 import java.util.Map;
38 import java.util.concurrent.ConcurrentHashMap;
39
40 import static java.util.Objects.requireNonNull;
41
42
43
44
45
46
47
48
49 public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C extends Channel, F>
50 implements Cloneable {
51 @SuppressWarnings("unchecked")
52 private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
53 @SuppressWarnings("unchecked")
54 private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
55
56 volatile EventLoopGroup group;
57 private volatile SocketAddress localAddress;
58
59
60
61 private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
62 private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();
63
64 private volatile ChannelHandler handler;
65
66 AbstractBootstrap() {
67
68 }
69
70 AbstractBootstrap(AbstractBootstrap<B, C, F> bootstrap) {
71 group = bootstrap.group;
72 handler = bootstrap.handler;
73 localAddress = bootstrap.localAddress;
74 synchronized (bootstrap.options) {
75 options.putAll(bootstrap.options);
76 }
77 attrs.putAll(bootstrap.attrs);
78 }
79
80
81
82
83
84 public B group(EventLoopGroup group) {
85 requireNonNull(group, "group");
86 if (this.group != null) {
87 throw new IllegalStateException("group set already");
88 }
89 this.group = group;
90 return self();
91 }
92
93 @SuppressWarnings("unchecked")
94 private B self() {
95 return (B) this;
96 }
97
98
99
100
101 public B localAddress(SocketAddress localAddress) {
102 this.localAddress = localAddress;
103 return self();
104 }
105
106
107
108
109 public B localAddress(int inetPort) {
110 return localAddress(new InetSocketAddress(inetPort));
111 }
112
113
114
115
116 public B localAddress(String inetHost, int inetPort) {
117 return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
118 }
119
120
121
122
123 public B localAddress(InetAddress inetHost, int inetPort) {
124 return localAddress(new InetSocketAddress(inetHost, inetPort));
125 }
126
127
128
129
130
131 public <T> B option(ChannelOption<T> option, T value) {
132 requireNonNull(option, "option");
133 synchronized (options) {
134 if (value == null) {
135 options.remove(option);
136 } else {
137 options.put(option, value);
138 }
139 }
140 return self();
141 }
142
143
144
145
146
147 public <T> B attr(AttributeKey<T> key, T value) {
148 requireNonNull(key, "key");
149 if (value == null) {
150 attrs.remove(key);
151 } else {
152 attrs.put(key, value);
153 }
154 return self();
155 }
156
157
158
159
160
161 public B validate() {
162 if (group == null) {
163 throw new IllegalStateException("group not set");
164 }
165 return self();
166 }
167
168
169
170
171
172
173 @Override
174 @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
175 public abstract B clone();
176
177
178
179
180 public Future<Channel> register() {
181 validate();
182 return initAndRegister(group.next());
183 }
184
185
186
187
188
189
190
191
192
193 public Channel createUnregistered() throws Exception {
194 validate();
195 return initWithoutRegister();
196 }
197
198
199
200
201 public Future<Channel> bind() {
202 validate();
203 SocketAddress localAddress = this.localAddress;
204 requireNonNull(localAddress, "localAddress");
205 return doBind(localAddress);
206 }
207
208
209
210
211 public Future<Channel> bind(int inetPort) {
212 return bind(new InetSocketAddress(inetPort));
213 }
214
215
216
217
218 public Future<Channel> bind(String inetHost, int inetPort) {
219 return bind(SocketUtils.socketAddress(inetHost, inetPort));
220 }
221
222
223
224
225 public Future<Channel> bind(InetAddress inetHost, int inetPort) {
226 return bind(new InetSocketAddress(inetHost, inetPort));
227 }
228
229
230
231
232 public Future<Channel> bind(SocketAddress localAddress) {
233 validate();
234 requireNonNull(localAddress, "localAddress");
235 return doBind(localAddress);
236 }
237
238 private Future<Channel> doBind(final SocketAddress localAddress) {
239 EventLoop loop = group.next();
240 final Future<Channel> regFuture = initAndRegister(loop);
241 if (regFuture.isFailed()) {
242 return regFuture;
243 }
244
245 Promise<Channel> bindPromise = loop.newPromise();
246 if (regFuture.isDone()) {
247
248 Channel channel = regFuture.getNow();
249 Promise<Void> promise = channel.newPromise();
250 promise.asFuture().map(v -> channel).cascadeTo(bindPromise);
251 doBind0(regFuture, channel, localAddress, promise);
252 } else {
253
254 regFuture.addListener(future -> {
255 Throwable cause = future.cause();
256 if (cause != null) {
257
258
259 bindPromise.setFailure(cause);
260 } else {
261 Channel channel = future.getNow();
262 Promise<Void> promise = channel.newPromise();
263 promise.asFuture().map(v -> channel).cascadeTo(bindPromise);
264 doBind0(regFuture, channel, localAddress, promise);
265 }
266 });
267 }
268 return bindPromise.asFuture();
269 }
270
271 final Future<Channel> initAndRegister(EventLoop loop) {
272 final Channel channel;
273 try {
274 channel = newChannel(loop);
275 } catch (Throwable t) {
276 return loop.newFailedFuture(t);
277 }
278
279 Promise<Channel> promise = loop.newPromise();
280 loop.execute(() -> init(channel).addListener(future -> {
281 if (future.isSuccess()) {
282
283
284 channel.register().addListener(f -> promise.setSuccess(channel));
285 } else {
286 channel.close();
287 promise.setFailure(future.cause());
288 }
289 }));
290
291 return promise.asFuture();
292 }
293
294 final Channel initWithoutRegister() throws Exception {
295 EventLoop loop = group.next();
296 Channel channel = newChannel(loop);
297
298 init(channel).addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
299 return channel;
300 }
301
302 abstract C newChannel(EventLoop loop) throws Exception;
303
304 abstract Future<Channel> init(Channel channel);
305
306 private static void doBind0(
307 final Future<Channel> regFuture, final Channel channel,
308 final SocketAddress localAddress, final Promise<Void> promise) {
309
310
311 channel.executor().execute(() -> {
312 if (regFuture.isSuccess()) {
313 channel.bind(localAddress).cascadeTo(promise)
314 .addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
315 } else {
316 promise.setFailure(regFuture.cause());
317 }
318 });
319 }
320
321
322
323
324 public B handler(ChannelHandler handler) {
325 requireNonNull(handler, "handler");
326 this.handler = handler;
327 return self();
328 }
329
330
331
332
333
334
335 @Deprecated
336 public final EventLoopGroup group() {
337 return group;
338 }
339
340
341
342
343
344 public abstract AbstractBootstrapConfig<B, C, F> config();
345
346 final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
347 return newOptionsArray(options);
348 }
349
350 static Map.Entry<ChannelOption<?>, Object>[] newOptionsArray(Map<ChannelOption<?>, Object> options) {
351 synchronized (options) {
352 return new LinkedHashMap<>(options).entrySet().toArray(EMPTY_OPTION_ARRAY);
353 }
354 }
355
356 final Map.Entry<AttributeKey<?>, Object>[] newAttributesArray() {
357 return newAttributesArray(attrs0());
358 }
359
360 static Map.Entry<AttributeKey<?>, Object>[] newAttributesArray(Map<AttributeKey<?>, Object> attributes) {
361 return attributes.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
362 }
363
364 final Map<ChannelOption<?>, Object> options0() {
365 return options;
366 }
367
368 final Map<AttributeKey<?>, Object> attrs0() {
369 return attrs;
370 }
371
372 final SocketAddress localAddress() {
373 return localAddress;
374 }
375
376 final ChannelHandler handler() {
377 return handler;
378 }
379
380 final Map<ChannelOption<?>, Object> options() {
381 synchronized (options) {
382 return copiedMap(options);
383 }
384 }
385
386 final Map<AttributeKey<?>, Object> attrs() {
387 return copiedMap(attrs);
388 }
389
390 static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
391 if (map.isEmpty()) {
392 return Collections.emptyMap();
393 }
394 return Map.copyOf(map);
395 }
396
397 static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
398 for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
399 @SuppressWarnings("unchecked")
400 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
401 channel.attr(key).set(e.getValue());
402 }
403 }
404
405 static void setChannelOptions(
406 Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
407 for (Map.Entry<ChannelOption<?>, Object> e: options) {
408 setChannelOption(channel, e.getKey(), e.getValue(), logger);
409 }
410 }
411
412 @SuppressWarnings("unchecked")
413 private static void setChannelOption(
414 Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
415 try {
416 if (!channel.isOptionSupported(option)) {
417 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
418 } else {
419 channel.setOption((ChannelOption<Object>) option, value);
420 }
421 } catch (Throwable t) {
422 logger.warn(
423 "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
424 }
425 }
426
427 @Override
428 public String toString() {
429 StringBuilder buf = new StringBuilder()
430 .append(StringUtil.simpleClassName(this))
431 .append('(').append(config()).append(')');
432 return buf.toString();
433 }
434 }