View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.bootstrap;
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelPromise;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.ReflectiveChannelFactory;
29  import io.netty.util.AttributeKey;
30  import io.netty.util.concurrent.EventExecutor;
31  import io.netty.util.concurrent.GlobalEventExecutor;
32  import io.netty.util.internal.ObjectUtil;
33  import io.netty.util.internal.SocketUtils;
34  import io.netty.util.internal.StringUtil;
35  import io.netty.util.internal.SystemPropertyUtil;
36  import io.netty.util.internal.logging.InternalLogger;
37  
38  import java.net.InetAddress;
39  import java.net.InetSocketAddress;
40  import java.net.SocketAddress;
41  import java.util.Collection;
42  import java.util.Collections;
43  import java.util.HashMap;
44  import java.util.LinkedHashMap;
45  import java.util.Map;
46  import java.util.concurrent.ConcurrentHashMap;
47  
48  /**
49   * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
50   * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
51   *
52   * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
53   * transports such as datagram (UDP).</p>
54   */
55  public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
56  
57      private static final boolean CLOSE_ON_SET_OPTION_FAILURE = SystemPropertyUtil.getBoolean(
58              "io.netty.bootstrap.closeOnSetOptionFailure", true);
59      @SuppressWarnings("unchecked")
60      private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
61      @SuppressWarnings("unchecked")
62      private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
63  
64      volatile EventLoopGroup group;
65      @SuppressWarnings("deprecation")
66      private volatile ChannelFactory<? extends C> channelFactory;
67      private volatile SocketAddress localAddress;
68  
69      // The order in which ChannelOptions are applied is important they may depend on each other for validation
70      // purposes.
71      private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
72      private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
73      private volatile ChannelHandler handler;
74      private volatile ClassLoader extensionsClassLoader;
75  
76      AbstractBootstrap() {
77          // Disallow extending from a different package.
78      }
79  
80      AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
81          group = bootstrap.group;
82          channelFactory = bootstrap.channelFactory;
83          handler = bootstrap.handler;
84          localAddress = bootstrap.localAddress;
85          synchronized (bootstrap.options) {
86              options.putAll(bootstrap.options);
87          }
88          attrs.putAll(bootstrap.attrs);
89          extensionsClassLoader = bootstrap.extensionsClassLoader;
90      }
91  
92      /**
93       * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
94       * {@link Channel}
95       */
96      public B group(EventLoopGroup group) {
97          ObjectUtil.checkNotNull(group, "group");
98          if (this.group != null) {
99              throw new IllegalStateException("group set already");
100         }
101         this.group = group;
102         return self();
103     }
104 
105     @SuppressWarnings("unchecked")
106     private B self() {
107         return (B) this;
108     }
109 
110     /**
111      * The {@link Class} which is used to create {@link Channel} instances from.
112      * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
113      * {@link Channel} implementation has no no-args constructor.
114      */
115     public B channel(Class<? extends C> channelClass) {
116         return channelFactory(new ReflectiveChannelFactory<C>(
117                 ObjectUtil.checkNotNull(channelClass, "channelClass")
118         ));
119     }
120 
121     /**
122      * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
123      */
124     @Deprecated
125     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
126         ObjectUtil.checkNotNull(channelFactory, "channelFactory");
127         if (this.channelFactory != null) {
128             throw new IllegalStateException("channelFactory set already");
129         }
130 
131         this.channelFactory = channelFactory;
132         return self();
133     }
134 
135     /**
136      * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
137      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
138      * is not working for you because of some more complex needs. If your {@link Channel} implementation
139      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} to
140      * simplify your code.
141      */
142     @SuppressWarnings({ "unchecked", "deprecation" })
143     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
144         return channelFactory((ChannelFactory<C>) channelFactory);
145     }
146 
147     /**
148      * The {@link SocketAddress} which is used to bind the local "end" to.
149      */
150     public B localAddress(SocketAddress localAddress) {
151         this.localAddress = localAddress;
152         return self();
153     }
154 
155     /**
156      * @see #localAddress(SocketAddress)
157      */
158     public B localAddress(int inetPort) {
159         return localAddress(new InetSocketAddress(inetPort));
160     }
161 
162     /**
163      * @see #localAddress(SocketAddress)
164      */
165     public B localAddress(String inetHost, int inetPort) {
166         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
167     }
168 
169     /**
170      * @see #localAddress(SocketAddress)
171      */
172     public B localAddress(InetAddress inetHost, int inetPort) {
173         return localAddress(new InetSocketAddress(inetHost, inetPort));
174     }
175 
176     /**
177      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
178      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
179      */
180     public <T> B option(ChannelOption<T> option, T value) {
181         ObjectUtil.checkNotNull(option, "option");
182         synchronized (options) {
183             if (value == null) {
184                 options.remove(option);
185             } else {
186                 options.put(option, value);
187             }
188         }
189         return self();
190     }
191 
192     /**
193      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
194      * {@code null}, the attribute of the specified {@code key} is removed.
195      */
196     public <T> B attr(AttributeKey<T> key, T value) {
197         ObjectUtil.checkNotNull(key, "key");
198         if (value == null) {
199             attrs.remove(key);
200         } else {
201             attrs.put(key, value);
202         }
203         return self();
204     }
205 
206     /**
207      * Load {@link ChannelInitializerExtension}s using the given class loader.
208      * <p>
209      * By default, the extensions will be loaded by the same class loader that loaded this bootstrap class.
210      *
211      * @param classLoader The class loader to use for loading {@link ChannelInitializerExtension}s.
212      * @return This bootstrap.
213      */
214     public B extensionsClassLoader(ClassLoader classLoader) {
215         extensionsClassLoader = classLoader;
216         return self();
217     }
218 
219     /**
220      * Validate all the parameters. Sub-classes may override this, but should
221      * call the super method in that case.
222      */
223     public B validate() {
224         if (group == null) {
225             throw new IllegalStateException("group not set");
226         }
227         if (channelFactory == null) {
228             throw new IllegalStateException("channel or channelFactory not set");
229         }
230         return self();
231     }
232 
233     /**
234      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
235      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
236      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
237      */
238     @Override
239     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
240     public abstract B clone();
241 
242     /**
243      * Create a new {@link Channel} and register it with an {@link EventLoop}.
244      */
245     public ChannelFuture register() {
246         validate();
247         return initAndRegister();
248     }
249 
250     /**
251      * Create a new {@link Channel} and bind it.
252      */
253     public ChannelFuture bind() {
254         validate();
255         SocketAddress localAddress = this.localAddress;
256         if (localAddress == null) {
257             throw new IllegalStateException("localAddress not set");
258         }
259         return doBind(localAddress);
260     }
261 
262     /**
263      * Create a new {@link Channel} and bind it.
264      */
265     public ChannelFuture bind(int inetPort) {
266         return bind(new InetSocketAddress(inetPort));
267     }
268 
269     /**
270      * Create a new {@link Channel} and bind it.
271      */
272     public ChannelFuture bind(String inetHost, int inetPort) {
273         return bind(SocketUtils.socketAddress(inetHost, inetPort));
274     }
275 
276     /**
277      * Create a new {@link Channel} and bind it.
278      */
279     public ChannelFuture bind(InetAddress inetHost, int inetPort) {
280         return bind(new InetSocketAddress(inetHost, inetPort));
281     }
282 
283     /**
284      * Create a new {@link Channel} and bind it.
285      */
286     public ChannelFuture bind(SocketAddress localAddress) {
287         validate();
288         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
289     }
290 
291     private ChannelFuture doBind(final SocketAddress localAddress) {
292         final ChannelFuture regFuture = initAndRegister();
293         final Channel channel = regFuture.channel();
294         if (regFuture.cause() != null) {
295             return regFuture;
296         }
297 
298         if (regFuture.isDone()) {
299             // At this point we know that the registration was complete and successful.
300             ChannelPromise promise = channel.newPromise();
301             doBind0(regFuture, channel, localAddress, promise);
302             return promise;
303         } else {
304             // Registration future is almost always fulfilled already, but just in case it's not.
305             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
306             regFuture.addListener(new ChannelFutureListener() {
307                 @Override
308                 public void operationComplete(ChannelFuture future) throws Exception {
309                     Throwable cause = future.cause();
310                     if (cause != null) {
311                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
312                         // IllegalStateException once we try to access the EventLoop of the Channel.
313                         promise.setFailure(cause);
314                     } else {
315                         // Registration was successful, so set the correct executor to use.
316                         // See https://github.com/netty/netty/issues/2586
317                         promise.registered();
318 
319                         doBind0(regFuture, channel, localAddress, promise);
320                     }
321                 }
322             });
323             return promise;
324         }
325     }
326 
327     final ChannelFuture initAndRegister() {
328         Channel channel = null;
329         try {
330             channel = channelFactory.newChannel();
331             init(channel);
332         } catch (Throwable t) {
333             if (channel != null) {
334                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
335                 channel.unsafe().closeForcibly();
336                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
337                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
338             }
339             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
340             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
341         }
342 
343         final ChannelFuture regFuture = config().group().register(channel);
344         if (regFuture.cause() != null) {
345             if (channel.isRegistered()) {
346                 channel.close();
347             } else {
348                 channel.unsafe().closeForcibly();
349             }
350         }
351 
352         // If we are here and the promise is not failed, it's one of the following cases:
353         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
354         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
355         // 2) If we attempted registration from the other thread, the registration request has been successfully
356         //    added to the event loop's task queue for later execution.
357         //    i.e. It's safe to attempt bind() or connect() now:
358         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
359         //         because register(), bind(), and connect() are all bound to the same thread.
360 
361         return regFuture;
362     }
363 
364     abstract void init(Channel channel) throws Throwable;
365 
366     Collection<ChannelInitializerExtension> getInitializerExtensions() {
367         ClassLoader loader = extensionsClassLoader;
368         if (loader == null) {
369             loader = getClass().getClassLoader();
370         }
371         return ChannelInitializerExtensions.getExtensions().extensions(loader);
372     }
373 
374     private static void doBind0(
375             final ChannelFuture regFuture, final Channel channel,
376             final SocketAddress localAddress, final ChannelPromise promise) {
377 
378         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
379         // the pipeline in its channelRegistered() implementation.
380         channel.eventLoop().execute(new Runnable() {
381             @Override
382             public void run() {
383                 if (regFuture.isSuccess()) {
384                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
385                 } else {
386                     promise.setFailure(regFuture.cause());
387                 }
388             }
389         });
390     }
391 
392     /**
393      * the {@link ChannelHandler} to use for serving the requests.
394      */
395     public B handler(ChannelHandler handler) {
396         this.handler = ObjectUtil.checkNotNull(handler, "handler");
397         return self();
398     }
399 
400     /**
401      * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
402      *
403      * @deprecated Use {@link #config()} instead.
404      */
405     @Deprecated
406     public final EventLoopGroup group() {
407         return group;
408     }
409 
410     /**
411      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
412      * of the bootstrap.
413      */
414     public abstract AbstractBootstrapConfig<B, C> config();
415 
416     final Map.Entry<ChannelOption<?>, Object>[] newOptionsArray() {
417         return newOptionsArray(options);
418     }
419 
420     static Map.Entry<ChannelOption<?>, Object>[] newOptionsArray(Map<ChannelOption<?>, Object> options) {
421         synchronized (options) {
422             return new LinkedHashMap<ChannelOption<?>, Object>(options).entrySet().toArray(EMPTY_OPTION_ARRAY);
423         }
424     }
425 
426     final Map.Entry<AttributeKey<?>, Object>[] newAttributesArray() {
427         return newAttributesArray(attrs0());
428     }
429 
430     static Map.Entry<AttributeKey<?>, Object>[] newAttributesArray(Map<AttributeKey<?>, Object> attributes) {
431         return attributes.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
432     }
433 
434     final Map<ChannelOption<?>, Object> options0() {
435         return options;
436     }
437 
438     final Map<AttributeKey<?>, Object> attrs0() {
439         return attrs;
440     }
441 
442     final SocketAddress localAddress() {
443         return localAddress;
444     }
445 
446     @SuppressWarnings("deprecation")
447     final ChannelFactory<? extends C> channelFactory() {
448         return channelFactory;
449     }
450 
451     final ChannelHandler handler() {
452         return handler;
453     }
454 
455     final Map<ChannelOption<?>, Object> options() {
456         synchronized (options) {
457             return copiedMap(options);
458         }
459     }
460 
461     final Map<AttributeKey<?>, Object> attrs() {
462         return copiedMap(attrs);
463     }
464 
465     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
466         if (map.isEmpty()) {
467             return Collections.emptyMap();
468         }
469         return Collections.unmodifiableMap(new HashMap<K, V>(map));
470     }
471 
472     static void setAttributes(Channel channel, Map.Entry<AttributeKey<?>, Object>[] attrs) {
473         for (Map.Entry<AttributeKey<?>, Object> e: attrs) {
474             @SuppressWarnings("unchecked")
475             AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
476             channel.attr(key).set(e.getValue());
477         }
478     }
479 
480     static void setChannelOptions(
481             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) throws Throwable {
482         for (Map.Entry<ChannelOption<?>, Object> e: options) {
483             setChannelOption(channel, e.getKey(), e.getValue(), logger);
484         }
485     }
486 
487     @SuppressWarnings("unchecked")
488     private static void setChannelOption(
489             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) throws Throwable {
490         try {
491             if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
492                 logger.warn("Unknown channel option '{}' for channel '{}' of type '{}'",
493                         option, channel, channel.getClass());
494             }
495         } catch (Throwable t) {
496             logger.warn(
497                     "Failed to set channel option '{}' with value '{}' for channel '{}' of type '{}'",
498                     option, value, channel, channel.getClass(), t);
499             if (CLOSE_ON_SET_OPTION_FAILURE) {
500                 // Only rethrow if we want to close the channel in case of a failure.
501                 throw t;
502             }
503         }
504     }
505 
506     @Override
507     public String toString() {
508         StringBuilder buf = new StringBuilder()
509             .append(StringUtil.simpleClassName(this))
510             .append('(').append(config()).append(')');
511         return buf.toString();
512     }
513 
514     static final class PendingRegistrationPromise extends DefaultChannelPromise {
515 
516         // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
517         // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
518         private volatile boolean registered;
519 
520         PendingRegistrationPromise(Channel channel) {
521             super(channel);
522         }
523 
524         void registered() {
525             registered = true;
526         }
527 
528         @Override
529         protected EventExecutor executor() {
530             if (registered) {
531                 // If the registration was a success executor is set.
532                 //
533                 // See https://github.com/netty/netty/issues/2586
534                 return super.executor();
535             }
536             // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
537             return GlobalEventExecutor.INSTANCE;
538         }
539     }
540 }