View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty5.bootstrap;
18  
19  import io.netty5.channel.Channel;
20  import io.netty5.channel.ChannelFutureListeners;
21  import io.netty5.channel.ChannelHandler;
22  import io.netty5.channel.ChannelOption;
23  import io.netty5.channel.EventLoop;
24  import io.netty5.channel.EventLoopGroup;
25  import io.netty5.util.AttributeKey;
26  import io.netty5.util.concurrent.Future;
27  import io.netty5.util.concurrent.Promise;
28  import io.netty5.util.internal.SocketUtils;
29  import io.netty5.util.internal.StringUtil;
30  import io.netty5.util.internal.logging.InternalLogger;
31  
32  import java.net.InetAddress;
33  import java.net.InetSocketAddress;
34  import java.net.SocketAddress;
35  import java.util.Collections;
36  import java.util.LinkedHashMap;
37  import java.util.Map;
38  import java.util.concurrent.ConcurrentHashMap;
39  
40  import static java.util.Objects.requireNonNull;
41  
42  /**
43   * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It supports
44   * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
45   *
46   * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
47   * transports such as datagram (UDP).</p>
48   */
49  public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C, F>, C extends Channel, F>
50          implements Cloneable {
51      @SuppressWarnings("unchecked")
52      private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
53      @SuppressWarnings("unchecked")
54      private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
55  
56      volatile EventLoopGroup group;
57      private volatile SocketAddress localAddress;
58  
59      // The order in which ChannelOptions are applied is important they may depend on each other for validation
60      // purposes.
61      private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
62      private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<>();
63  
64      private volatile ChannelHandler handler;
65  
66      AbstractBootstrap() {
67          // Disallow extending from a different package.
68      }
69  
70      AbstractBootstrap(AbstractBootstrap<B, C, F> bootstrap) {
71          group = bootstrap.group;
72          handler = bootstrap.handler;
73          localAddress = bootstrap.localAddress;
74          synchronized (bootstrap.options) {
75              options.putAll(bootstrap.options);
76          }
77          attrs.putAll(bootstrap.attrs);
78      }
79  
80      /**
81       * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
82       * {@link Channel}
83       */
84      public B group(EventLoopGroup group) {
85          requireNonNull(group, "group");
86          if (this.group != null) {
87              throw new IllegalStateException("group set already");
88          }
89          this.group = group;
90          return self();
91      }
92  
93      @SuppressWarnings("unchecked")
94      private B self() {
95          return (B) this;
96      }
97  
98      /**
99       * The {@link SocketAddress} which is used to bind the local "end" to.
100      */
101     public B localAddress(SocketAddress localAddress) {
102         this.localAddress = localAddress;
103         return self();
104     }
105 
106     /**
107      * @see #localAddress(SocketAddress)
108      */
109     public B localAddress(int inetPort) {
110         return localAddress(new InetSocketAddress(inetPort));
111     }
112 
113     /**
114      * @see #localAddress(SocketAddress)
115      */
116     public B localAddress(String inetHost, int inetPort) {
117         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
118     }
119 
120     /**
121      * @see #localAddress(SocketAddress)
122      */
123     public B localAddress(InetAddress inetHost, int inetPort) {
124         return localAddress(new InetSocketAddress(inetHost, inetPort));
125     }
126 
127     /**
128      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
129      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
130      */
131     public <T> B option(ChannelOption<T> option, T value) {
132         requireNonNull(option, "option");
133         synchronized (options) {
134             if (value == null) {
135                 options.remove(option);
136             } else {
137                 options.put(option, value);
138             }
139         }
140         return self();
141     }
142 
143     /**
144      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
145      * {@code null}, the attribute of the specified {@code key} is removed.
146      */
147     public <T> B attr(AttributeKey<T> key, T value) {
148         requireNonNull(key, "key");
149         if (value == null) {
150             attrs.remove(key);
151         } else {
152             attrs.put(key, value);
153         }
154         return self();
155     }
156 
157     /**
158      * Validate all the parameters. Sub-classes may override this, but should
159      * call the super method in that case.
160      */
161     public B validate() {
162         if (group == null) {
163             throw new IllegalStateException("group not set");
164         }
165         return self();
166     }
167 
168     /**
169      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
170      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
171      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
172      */
173     @Override
174     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
175     public abstract B clone();
176 
177     /**
178      * Create a new {@link Channel} and register it with an {@link EventLoop}.
179      */
180     public Future<Channel> register() {
181         validate();
182         return initAndRegister(group.next());
183     }
184 
185     /**
186      * Create a new unregistered channel.
187      * <p>
188      * The channel must then be {@linkplain Channel#register() registered} separately.
189      *
190      * @return A new unregistered channel.
191      * @throws Exception If the channel cannot be created.
192      */
193     public Channel createUnregistered() throws Exception {
194         validate();
195         return initWithoutRegister();
196     }
197 
198     /**
199      * Create a new {@link Channel} and bind it.
200      */
201     public Future<Channel> bind() {
202         validate();
203         SocketAddress localAddress = this.localAddress;
204         requireNonNull(localAddress, "localAddress");
205         return doBind(localAddress);
206     }
207 
208     /**
209      * Create a new {@link Channel} and bind it.
210      */
211     public Future<Channel> bind(int inetPort) {
212         return bind(new InetSocketAddress(inetPort));
213     }
214 
215     /**
216      * Create a new {@link Channel} and bind it.
217      */
218     public Future<Channel> bind(String inetHost, int inetPort) {
219         return bind(SocketUtils.socketAddress(inetHost, inetPort));
220     }
221 
222     /**
223      * Create a new {@link Channel} and bind it.
224      */
225     public Future<Channel> bind(InetAddress inetHost, int inetPort) {
226         return bind(new InetSocketAddress(inetHost, inetPort));
227     }
228 
229     /**
230      * Create a new {@link Channel} and bind it.
231      */
232     public Future<Channel> bind(SocketAddress localAddress) {
233         validate();
234         requireNonNull(localAddress, "localAddress");
235         return doBind(localAddress);
236     }
237 
238     private Future<Channel> doBind(final SocketAddress localAddress) {
239         EventLoop loop = group.next();
240         final Future<Channel> regFuture = initAndRegister(loop);
241         if (regFuture.isFailed()) {
242             return regFuture;
243         }
244 
245         Promise<Channel> bindPromise = loop.newPromise();
246         if (regFuture.isDone()) {
247             // At this point we know that the registration was complete and successful.
248             Channel channel = regFuture.getNow();
249             Promise<Void> promise = channel.newPromise();
250             promise.asFuture().map(v -> channel).cascadeTo(bindPromise);
251             doBind0(regFuture, channel, localAddress, promise);
252         } else {
253             // Registration future is almost always fulfilled already, but just in case it's not.
254             regFuture.addListener(future -> {
255                 Throwable cause = future.cause();
256                 if (cause != null) {
257                     // Registration on the EventLoop failed so fail the Promise directly to not cause an
258                     // IllegalStateException once we try to access the EventLoop of the Channel.
259                     bindPromise.setFailure(cause);
260                 } else {
261                     Channel channel = future.getNow();
262                     Promise<Void> promise = channel.newPromise();
263                     promise.asFuture().map(v -> channel).cascadeTo(bindPromise);
264                     doBind0(regFuture, channel, localAddress, promise);
265                 }
266             });
267         }
268         return bindPromise.asFuture();
269     }
270 
271     final Future<Channel> initAndRegister(EventLoop loop) {
272         final Channel channel;
273         try {
274             channel = newChannel(loop);
275         } catch (Throwable t) {
276             return loop.newFailedFuture(t);
277         }
278 
279         Promise<Channel> promise = loop.newPromise();
280         loop.execute(() -> init(channel).addListener(future -> {
281             if (future.isSuccess()) {
282                 // TODO eventually I think we'd like to be able to either pass the generic promise down,
283                 //  or return the future from register().
284                 channel.register().addListener(f -> promise.setSuccess(channel));
285             } else {
286                 channel.close();
287                 promise.setFailure(future.cause());
288             }
289         }));
290 
291         return promise.asFuture();
292     }
293 
294     final Channel initWithoutRegister() throws Exception {
295         EventLoop loop = group.next();
296         Channel channel = newChannel(loop);
297 
298         init(channel).addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
299         return channel;
300     }
301 
302     abstract C newChannel(EventLoop loop) throws Exception;
303 
304     abstract Future<Channel> init(Channel channel);
305 
306     private static void doBind0(
307             final Future<Channel> regFuture, final Channel channel,
308             final SocketAddress localAddress, final Promise<Void> promise) {
309         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
310         // the pipeline in its channelRegistered() implementation.
311         channel.executor().execute(() -> {
312             if (regFuture.isSuccess()) {
313                channel.bind(localAddress).cascadeTo(promise)
314                         .addListener(channel, ChannelFutureListeners.CLOSE_ON_FAILURE);
315             } else {
316                 promise.setFailure(regFuture.cause());
317             }
318         });
319     }
320 
321     /**
322      * the {@link ChannelHandler} to use for serving the requests.
323      */
324     public B handler(ChannelHandler handler) {
325         requireNonNull(handler, "handler");
326         this.handler = handler;
327         return self();
328     }
329 
330     /**
331      * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
332      *
333      * @deprecated Use {@link #config()} instead.
334      */
335     @Deprecated
336     public final EventLoopGroup group() {
337         return group;
338     }
339 
340     /**
341      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
342      * of the bootstrap.
343      */
344     public abstract AbstractBootstrapConfig<B, C, F> config();
345 
346     final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
347         return newOptionsArray(options);
348     }
349 
350     static Map.Entry<ChannelOption<?>, Object>[] newOptionsArray(Map<ChannelOption<?>, Object> options) {
351         synchronized (options) {
352             return new LinkedHashMap<>(options).entrySet().toArray(EMPTY_OPTION_ARRAY);
353         }
354     }
355 
356     final Map.Entry<AttributeKey<?>, Object>[] newAttributesArray() {
357         return newAttributesArray(attrs0());
358     }
359 
360     static Map.Entry<AttributeKey<?>, Object>[] newAttributesArray(Map<AttributeKey<?>, Object> attributes) {
361         return attributes.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
362     }
363 
364     final Map<ChannelOption<?>, Object> options0() {
365         return options;
366     }
367 
368     final Map<AttributeKey<?>, Object> attrs0() {
369         return attrs;
370     }
371 
372     final SocketAddress localAddress() {
373         return localAddress;
374     }
375 
376     final ChannelHandler handler() {
377         return handler;
378     }
379 
380     final Map<ChannelOption<?>, Object> options() {
381         synchronized (options) {
382             return copiedMap(options);
383         }
384     }
385 
386     final Map<AttributeKey<?>, Object> attrs() {
387         return copiedMap(attrs);
388     }
389 
390     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
391         if (map.isEmpty()) {
392             return Collections.emptyMap();
393         }
394         return Map.copyOf(map);
395     }
396 
397     static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
398         for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
399             @SuppressWarnings("unchecked")
400             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
401             channel.attr(key).set(e.getValue());
402         }
403     }
404 
405     static void setChannelOptions(
406             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
407         for (Map.Entry<ChannelOption<?>, Object> e: options) {
408             setChannelOption(channel, e.getKey(), e.getValue(), logger);
409         }
410     }
411 
412     @SuppressWarnings("unchecked")
413     private static void setChannelOption(
414             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
415         try {
416             if (!channel.isOptionSupported(option)) {
417                 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
418             } else {
419                 channel.setOption((ChannelOption<Object>) option, value);
420             }
421         } catch (Throwable t) {
422             logger.warn(
423                     "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
424         }
425     }
426 
427     @Override
428     public String toString() {
429         StringBuilder buf = new StringBuilder()
430             .append(StringUtil.simpleClassName(this))
431             .append('(').append(config()).append(')');
432         return buf.toString();
433     }
434 }