View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.bootstrap;
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelPromise;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.ReflectiveChannelFactory;
29  import io.netty.util.AttributeKey;
30  import io.netty.util.concurrent.EventExecutor;
31  import io.netty.util.concurrent.GlobalEventExecutor;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.SocketAddress;
41  import java.util.Collection;
42  import java.util.Collections;
43  import java.util.HashMap;
44  import java.util.LinkedHashMap;
45  import java.util.Map;
46  import java.util.concurrent.ConcurrentHashMap;
47  
48  /**
49   * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
50   * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
51   *
52   * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
53   * transports such as datagram (UDP).</p>
54   */
55  public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
56  
57      private static final boolean CLOSE_ON_SET_OPTION_FAILURE = SystemPropertyUtil.getBoolean(
58              "io.netty.bootstrap.closeOnSetOptionFailure", true);
59      @SuppressWarnings("unchecked")
60      private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
61      @SuppressWarnings("unchecked")
62      private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
63  
64      volatile EventLoopGroup group;
65      @SuppressWarnings("deprecation")
66      private volatile ChannelFactory<? extends C> channelFactory;
67      private volatile SocketAddress localAddress;
68  
69      // The order in which ChannelOptions are applied is important they may depend on each other for validation
70      // purposes.
71      private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
72      private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
73      private volatile ChannelHandler handler;
74      private volatile ClassLoader extensionsClassLoader;
75  
76      AbstractBootstrap() {
77          // Disallow extending from a different package.
78      }
79  
80      AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
81          group = bootstrap.group;
82          channelFactory = bootstrap.channelFactory;
83          handler = bootstrap.handler;
84          localAddress = bootstrap.localAddress;
85          synchronized (bootstrap.options) {
86              options.putAll(bootstrap.options);
87          }
88          attrs.putAll(bootstrap.attrs);
89          extensionsClassLoader = bootstrap.extensionsClassLoader;
90      }
91  
92      /**
93       * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
94       * {@link Channel}
95       */
96      public B group(EventLoopGroup group) {
97          ObjectUtil.checkNotNull(group, "group");
98          if (this.group != null) {
99              throw new IllegalStateException("group set already");
100         }
101         this.group = group;
102         return self();
103     }
104 
105     @SuppressWarnings("unchecked")
106     private B self() {
107         return (B) this;
108     }
109 
110     /**
111      * The {@link Class} which is used to create {@link Channel} instances from.
112      * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
113      * {@link Channel} implementation has no no-args constructor.
114      */
115     public B channel(Class<? extends C> channelClass) {
116         return channelFactory(new ReflectiveChannelFactory<C>(
117                 ObjectUtil.checkNotNull(channelClass, "channelClass")
118         ));
119     }
120 
121     /**
122      * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
123      */
124     @Deprecated
125     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
126         ObjectUtil.checkNotNull(channelFactory, "channelFactory");
127         if (this.channelFactory != null) {
128             throw new IllegalStateException("channelFactory set already");
129         }
130 
131         this.channelFactory = channelFactory;
132         return self();
133     }
134 
135     /**
136      * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
137      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
138      * is not working for you because of some more complex needs. If your {@link Channel} implementation
139      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} to
140      * simplify your code.
141      */
142     @SuppressWarnings({ "unchecked", "deprecation" })
143     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
144         return channelFactory((ChannelFactory<C>) channelFactory);
145     }
146 
147     /**
148      * The {@link SocketAddress} which is used to bind the local "end" to.
149      */
150     public B localAddress(SocketAddress localAddress) {
151         this.localAddress = localAddress;
152         return self();
153     }
154 
155     /**
156      * @see #localAddress(SocketAddress)
157      */
158     public B localAddress(int inetPort) {
159         return localAddress(new InetSocketAddress(inetPort));
160     }
161 
162     /**
163      * @see #localAddress(SocketAddress)
164      */
165     public B localAddress(String inetHost, int inetPort) {
166         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
167     }
168 
169     /**
170      * @see #localAddress(SocketAddress)
171      */
172     public B localAddress(InetAddress inetHost, int inetPort) {
173         return localAddress(new InetSocketAddress(inetHost, inetPort));
174     }
175 
176     /**
177      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
178      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
179      */
180     public <T> B option(ChannelOption<T> option, T value) {
181         ObjectUtil.checkNotNull(option, "option");
182         synchronized (options) {
183             if (value == null) {
184                 options.remove(option);
185             } else {
186                 options.put(option, value);
187             }
188         }
189         return self();
190     }
191 
192     /**
193      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
194      * {@code null}, the attribute of the specified {@code key} is removed.
195      */
196     public <T> B attr(AttributeKey<T> key, T value) {
197         ObjectUtil.checkNotNull(key, "key");
198         if (value == null) {
199             attrs.remove(key);
200         } else {
201             attrs.put(key, value);
202         }
203         return self();
204     }
205 
206     /**
207      * Load {@link ChannelInitializerExtension}s using the given class loader.
208      * <p>
209      * By default, the extensions will be loaded by the same class loader that loaded this bootstrap class.
210      *
211      * @param classLoader The class loader to use for loading {@link ChannelInitializerExtension}s.
212      * @return This bootstrap.
213      */
214     public B extensionsClassLoader(ClassLoader classLoader) {
215         extensionsClassLoader = classLoader;
216         return self();
217     }
218 
219     /**
220      * Validate all the parameters. Sub-classes may override this, but should
221      * call the super method in that case.
222      */
223     public B validate() {
224         if (group == null) {
225             throw new IllegalStateException("group not set");
226         }
227         if (channelFactory == null) {
228             throw new IllegalStateException("channel or channelFactory not set");
229         }
230         return self();
231     }
232 
233     /**
234      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
235      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
236      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
237      */
238     @Override
239     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
240     public abstract B clone();
241 
242     /**
243      * Create a new {@link Channel} and register it with an {@link EventLoop}.
244      */
245     public ChannelFuture register() {
246         validate();
247         return initAndRegister();
248     }
249 
250     /**
251      * Create a new {@link Channel} and bind it.
252      */
253     public ChannelFuture bind() {
254         validate();
255         SocketAddress localAddress = this.localAddress;
256         if (localAddress == null) {
257             throw new IllegalStateException("localAddress not set");
258         }
259         return doBind(localAddress);
260     }
261 
262     /**
263      * Create a new {@link Channel} and bind it.
264      */
265     public ChannelFuture bind(int inetPort) {
266         return bind(new InetSocketAddress(inetPort));
267     }
268 
269     /**
270      * Create a new {@link Channel} and bind it.
271      */
272     public ChannelFuture bind(String inetHost, int inetPort) {
273         return bind(SocketUtils.socketAddress(inetHost, inetPort));
274     }
275 
276     /**
277      * Create a new {@link Channel} and bind it.
278      */
279     public ChannelFuture bind(InetAddress inetHost, int inetPort) {
280         return bind(new InetSocketAddress(inetHost, inetPort));
281     }
282 
283     /**
284      * Create a new {@link Channel} and bind it.
285      */
286     public ChannelFuture bind(SocketAddress localAddress) {
287         validate();
288         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
289     }
290 
291     private ChannelFuture doBind(final SocketAddress localAddress) {
292         final ChannelFuture regFuture = initAndRegister();
293         final Channel channel = regFuture.channel();
294         if (regFuture.cause() != null) {
295             return regFuture;
296         }
297 
298         if (regFuture.isDone()) {
299             // At this point we know that the registration was complete and successful.
300             ChannelPromise promise = channel.newPromise();
301             doBind0(regFuture, channel, localAddress, promise);
302             return promise;
303         } else {
304             // Registration future is almost always fulfilled already, but just in case it's not.
305             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
306             regFuture.addListener(future -> {
307                 Throwable cause = future.cause();
308                 if (cause != null) {
309                     // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
310                     // IllegalStateException once we try to access the EventLoop of the Channel.
311                     promise.setFailure(cause);
312                 } else {
313                     // Registration was successful, so set the correct executor to use.
314                     // See https://github.com/netty/netty/issues/2586
315                     promise.registered();
316 
317                     doBind0(regFuture, channel, localAddress, promise);
318                 }
319             });
320             return promise;
321         }
322     }
323 
324     final ChannelFuture initAndRegister() {
325         Channel channel = null;
326         try {
327             channel = channelFactory.newChannel();
328             init(channel);
329         } catch (Throwable t) {
330             if (channel != null) {
331                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
332                 channel.unsafe().closeForcibly();
333                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
334                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
335             }
336             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
337             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
338         }
339 
340         final ChannelFuture regFuture = config().group().register(channel);
341         if (regFuture.cause() != null) {
342             if (channel.isRegistered()) {
343                 channel.close();
344             } else {
345                 channel.unsafe().closeForcibly();
346             }
347         }
348 
349         // If we are here and the promise is not failed, it's one of the following cases:
350         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
351         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
352         // 2) If we attempted registration from the other thread, the registration request has been successfully
353         //    added to the event loop's task queue for later execution.
354         //    i.e. It's safe to attempt bind() or connect() now:
355         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
356         //         because register(), bind(), and connect() are all bound to the same thread.
357 
358         return regFuture;
359     }
360 
361     abstract void init(Channel channel) throws Throwable;
362 
363     Collection<ChannelInitializerExtension> getInitializerExtensions() {
364         ClassLoader loader = extensionsClassLoader;
365         if (loader == null) {
366             loader = getClass().getClassLoader();
367         }
368         return ChannelInitializerExtensions.getExtensions().extensions(loader);
369     }
370 
371     private static void doBind0(
372             final ChannelFuture regFuture, final Channel channel,
373             final SocketAddress localAddress, final ChannelPromise promise) {
374 
375         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
376         // the pipeline in its channelRegistered() implementation.
377         channel.eventLoop().execute(new Runnable() {
378             @Override
379             public void run() {
380                 if (regFuture.isSuccess()) {
381                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
382                 } else {
383                     promise.setFailure(regFuture.cause());
384                 }
385             }
386         });
387     }
388 
389     /**
390      * the {@link ChannelHandler} to use for serving the requests.
391      */
392     public B handler(ChannelHandler handler) {
393         this.handler = ObjectUtil.checkNotNull(handler, "handler");
394         return self();
395     }
396 
397     /**
398      * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
399      *
400      * @deprecated Use {@link #config()} instead.
401      */
402     @Deprecated
403     public final EventLoopGroup group() {
404         return group;
405     }
406 
407     /**
408      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
409      * of the bootstrap.
410      */
411     public abstract AbstractBootstrapConfig<B, C> config();
412 
413     final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
414         return newOptionsArray(options);
415     }
416 
417     static Map.Entry<ChannelOption<?>, Object>[] newOptionsArray(Map<ChannelOption<?>, Object> options) {
418         synchronized (options) {
419             return new LinkedHashMap<ChannelOption<?>, Object>(options).entrySet().toArray(EMPTY_OPTION_ARRAY);
420         }
421     }
422 
423     final Map.Entry<AttributeKey<?>, Object>[] newAttributesArray() {
424         return newAttributesArray(attrs0());
425     }
426 
427     static Map.Entry<AttributeKey<?>, Object>[] newAttributesArray(Map<AttributeKey<?>, Object> attributes) {
428         return attributes.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
429     }
430 
431     final Map<ChannelOption<?>, Object> options0() {
432         return options;
433     }
434 
435     final Map<AttributeKey<?>, Object> attrs0() {
436         return attrs;
437     }
438 
439     final SocketAddress localAddress() {
440         return localAddress;
441     }
442 
443     @SuppressWarnings("deprecation")
444     final ChannelFactory<? extends C> channelFactory() {
445         return channelFactory;
446     }
447 
448     final ChannelHandler handler() {
449         return handler;
450     }
451 
452     final Map<ChannelOption<?>, Object> options() {
453         synchronized (options) {
454             return copiedMap(options);
455         }
456     }
457 
458     final Map<AttributeKey<?>, Object> attrs() {
459         return copiedMap(attrs);
460     }
461 
462     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
463         if (map.isEmpty()) {
464             return Collections.emptyMap();
465         }
466         return Collections.unmodifiableMap(new HashMap<K, V>(map));
467     }
468 
469     static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
470         for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
471             @SuppressWarnings("unchecked")
472             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
473             channel.attr(key).set(e.getValue());
474         }
475     }
476 
477     static void setChannelOptions(
478             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) throws Throwable {
479         for (Map.Entry<ChannelOption<?>, Object> e: options) {
480             setChannelOption(channel, e.getKey(), e.getValue(), logger);
481         }
482     }
483 
484     @SuppressWarnings("unchecked")
485     private static void setChannelOption(
486             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) throws Throwable {
487         try {
488             if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
489                 logger.warn("Unknown channel option '{}' for channel '{}' of type '{}'",
490                         option, channel, channel.getClass());
491             }
492         } catch (Throwable t) {
493             logger.warn(
494                     "Failed to set channel option '{}' with value '{}' for channel '{}' of type '{}'",
495                     option, value, channel, channel.getClass(), t);
496             if (CLOSE_ON_SET_OPTION_FAILURE) {
497                 // Only rethrow if we want to close the channel in case of a failure.
498                 throw t;
499             }
500         }
501     }
502 
503     @Override
504     public String toString() {
505         StringBuilder buf = new StringBuilder()
506             .append(StringUtil.simpleClassName(this))
507             .append('(').append(config()).append(')');
508         return buf.toString();
509     }
510 
511     static final class PendingRegistrationPromise extends DefaultChannelPromise {
512 
513         // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
514         // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
515         private volatile boolean registered;
516 
517         PendingRegistrationPromise(Channel channel) {
518             super(channel);
519         }
520 
521         void registered() {
522             registered = true;
523         }
524 
525         @Override
526         protected EventExecutor executor() {
527             if (registered) {
528                 // If the registration was a success executor is set.
529                 //
530                 // See https://github.com/netty/netty/issues/2586
531                 return super.executor();
532             }
533             // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
534             return GlobalEventExecutor.INSTANCE;
535         }
536     }
537 }