View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.bootstrap;
18  
19  import io.netty.channel.Channel;
20  import io.netty.channel.ChannelFuture;
21  import io.netty.channel.ChannelFutureListener;
22  import io.netty.channel.ChannelHandler;
23  import io.netty.channel.ChannelOption;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.DefaultChannelPromise;
26  import io.netty.channel.EventLoop;
27  import io.netty.channel.EventLoopGroup;
28  import io.netty.channel.ReflectiveChannelFactory;
29  import io.netty.util.internal.SocketUtils;
30  import io.netty.util.AttributeKey;
31  import io.netty.util.concurrent.EventExecutor;
32  import io.netty.util.concurrent.GlobalEventExecutor;
33  import io.netty.util.internal.StringUtil;
34  import io.netty.util.internal.logging.InternalLogger;
35  
36  import java.net.InetAddress;
37  import java.net.InetSocketAddress;
38  import java.net.SocketAddress;
39  import java.util.Collections;
40  import java.util.LinkedHashMap;
41  import java.util.Map;
42  
43  /**
44   * {@link AbstractBootstrap} is a helper class that makes it easy to bootstrap a {@link Channel}. It support
45   * method-chaining to provide an easy way to configure the {@link AbstractBootstrap}.
46   *
47   * <p>When not used in a {@link ServerBootstrap} context, the {@link #bind()} methods are useful for connectionless
48   * transports such as datagram (UDP).</p>
49   */
50  public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {
51  
52      volatile EventLoopGroup group;
53      @SuppressWarnings("deprecation")
54      private volatile ChannelFactory<? extends C> channelFactory;
55      private volatile SocketAddress localAddress;
56      private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
57      private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
58      private volatile ChannelHandler handler;
59  
60      AbstractBootstrap() {
61          // Disallow extending from a different package.
62      }
63  
64      AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
65          group = bootstrap.group;
66          channelFactory = bootstrap.channelFactory;
67          handler = bootstrap.handler;
68          localAddress = bootstrap.localAddress;
69          synchronized (bootstrap.options) {
70              options.putAll(bootstrap.options);
71          }
72          synchronized (bootstrap.attrs) {
73              attrs.putAll(bootstrap.attrs);
74          }
75      }
76  
77      /**
78       * The {@link EventLoopGroup} which is used to handle all the events for the to-be-created
79       * {@link Channel}
80       */
81      public B group(EventLoopGroup group) {
82          if (group == null) {
83              throw new NullPointerException("group");
84          }
85          if (this.group != null) {
86              throw new IllegalStateException("group set already");
87          }
88          this.group = group;
89          return self();
90      }
91  
92      @SuppressWarnings("unchecked")
93      private B self() {
94          return (B) this;
95      }
96  
97      /**
98       * The {@link Class} which is used to create {@link Channel} instances from.
99       * You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
100      * {@link Channel} implementation has no no-args constructor.
101      */
102     public B channel(Class<? extends C> channelClass) {
103         if (channelClass == null) {
104             throw new NullPointerException("channelClass");
105         }
106         return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
107     }
108 
109     /**
110      * @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
111      */
112     @Deprecated
113     public B channelFactory(ChannelFactory<? extends C> channelFactory) {
114         if (channelFactory == null) {
115             throw new NullPointerException("channelFactory");
116         }
117         if (this.channelFactory != null) {
118             throw new IllegalStateException("channelFactory set already");
119         }
120 
121         this.channelFactory = channelFactory;
122         return self();
123     }
124 
125     /**
126      * {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
127      * when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
128      * is not working for you because of some more complex needs. If your {@link Channel} implementation
129      * has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
130      * simplify your code.
131      */
132     @SuppressWarnings({ "unchecked", "deprecation" })
133     public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
134         return channelFactory((ChannelFactory<C>) channelFactory);
135     }
136 
137     /**
138      * The {@link SocketAddress} which is used to bind the local "end" to.
139      */
140     public B localAddress(SocketAddress localAddress) {
141         this.localAddress = localAddress;
142         return self();
143     }
144 
145     /**
146      * @see #localAddress(SocketAddress)
147      */
148     public B localAddress(int inetPort) {
149         return localAddress(new InetSocketAddress(inetPort));
150     }
151 
152     /**
153      * @see #localAddress(SocketAddress)
154      */
155     public B localAddress(String inetHost, int inetPort) {
156         return localAddress(SocketUtils.socketAddress(inetHost, inetPort));
157     }
158 
159     /**
160      * @see #localAddress(SocketAddress)
161      */
162     public B localAddress(InetAddress inetHost, int inetPort) {
163         return localAddress(new InetSocketAddress(inetHost, inetPort));
164     }
165 
166     /**
167      * Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they got
168      * created. Use a value of {@code null} to remove a previous set {@link ChannelOption}.
169      */
170     public <T> B option(ChannelOption<T> option, T value) {
171         if (option == null) {
172             throw new NullPointerException("option");
173         }
174         if (value == null) {
175             synchronized (options) {
176                 options.remove(option);
177             }
178         } else {
179             synchronized (options) {
180                 options.put(option, value);
181             }
182         }
183         return self();
184     }
185 
186     /**
187      * Allow to specify an initial attribute of the newly created {@link Channel}.  If the {@code value} is
188      * {@code null}, the attribute of the specified {@code key} is removed.
189      */
190     public <T> B attr(AttributeKey<T> key, T value) {
191         if (key == null) {
192             throw new NullPointerException("key");
193         }
194         if (value == null) {
195             synchronized (attrs) {
196                 attrs.remove(key);
197             }
198         } else {
199             synchronized (attrs) {
200                 attrs.put(key, value);
201             }
202         }
203         return self();
204     }
205 
206     /**
207      * Validate all the parameters. Sub-classes may override this, but should
208      * call the super method in that case.
209      */
210     public B validate() {
211         if (group == null) {
212             throw new IllegalStateException("group not set");
213         }
214         if (channelFactory == null) {
215             throw new IllegalStateException("channel or channelFactory not set");
216         }
217         return self();
218     }
219 
220     /**
221      * Returns a deep clone of this bootstrap which has the identical configuration.  This method is useful when making
222      * multiple {@link Channel}s with similar settings.  Please note that this method does not clone the
223      * {@link EventLoopGroup} deeply but shallowly, making the group a shared resource.
224      */
225     @Override
226     @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
227     public abstract B clone();
228 
229     /**
230      * Create a new {@link Channel} and register it with an {@link EventLoop}.
231      */
232     public ChannelFuture register() {
233         validate();
234         return initAndRegister();
235     }
236 
237     /**
238      * Create a new {@link Channel} and bind it.
239      */
240     public ChannelFuture bind() {
241         validate();
242         SocketAddress localAddress = this.localAddress;
243         if (localAddress == null) {
244             throw new IllegalStateException("localAddress not set");
245         }
246         return doBind(localAddress);
247     }
248 
249     /**
250      * Create a new {@link Channel} and bind it.
251      */
252     public ChannelFuture bind(int inetPort) {
253         return bind(new InetSocketAddress(inetPort));
254     }
255 
256     /**
257      * Create a new {@link Channel} and bind it.
258      */
259     public ChannelFuture bind(String inetHost, int inetPort) {
260         return bind(SocketUtils.socketAddress(inetHost, inetPort));
261     }
262 
263     /**
264      * Create a new {@link Channel} and bind it.
265      */
266     public ChannelFuture bind(InetAddress inetHost, int inetPort) {
267         return bind(new InetSocketAddress(inetHost, inetPort));
268     }
269 
270     /**
271      * Create a new {@link Channel} and bind it.
272      */
273     public ChannelFuture bind(SocketAddress localAddress) {
274         validate();
275         if (localAddress == null) {
276             throw new NullPointerException("localAddress");
277         }
278         return doBind(localAddress);
279     }
280 
281     private ChannelFuture doBind(final SocketAddress localAddress) {
282         final ChannelFuture regFuture = initAndRegister();
283         final Channel channel = regFuture.channel();
284         if (regFuture.cause() != null) {
285             return regFuture;
286         }
287 
288         if (regFuture.isDone()) {
289             // At this point we know that the registration was complete and successful.
290             ChannelPromise promise = channel.newPromise();
291             doBind0(regFuture, channel, localAddress, promise);
292             return promise;
293         } else {
294             // Registration future is almost always fulfilled already, but just in case it's not.
295             final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
296             regFuture.addListener(new ChannelFutureListener() {
297                 @Override
298                 public void operationComplete(ChannelFuture future) throws Exception {
299                     Throwable cause = future.cause();
300                     if (cause != null) {
301                         // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
302                         // IllegalStateException once we try to access the EventLoop of the Channel.
303                         promise.setFailure(cause);
304                     } else {
305                         // Registration was successful, so set the correct executor to use.
306                         // See https://github.com/netty/netty/issues/2586
307                         promise.registered();
308 
309                         doBind0(regFuture, channel, localAddress, promise);
310                     }
311                 }
312             });
313             return promise;
314         }
315     }
316 
317     final ChannelFuture initAndRegister() {
318         Channel channel = null;
319         try {
320             channel = channelFactory.newChannel();
321             init(channel);
322         } catch (Throwable t) {
323             if (channel != null) {
324                 // channel can be null if newChannel crashed (eg SocketException("too many open files"))
325                 channel.unsafe().closeForcibly();
326                 // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
327                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
328             }
329             // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
330             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
331         }
332 
333         ChannelFuture regFuture = config().group().register(channel);
334         if (regFuture.cause() != null) {
335             if (channel.isRegistered()) {
336                 channel.close();
337             } else {
338                 channel.unsafe().closeForcibly();
339             }
340         }
341 
342         // If we are here and the promise is not failed, it's one of the following cases:
343         // 1) If we attempted registration from the event loop, the registration has been completed at this point.
344         //    i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
345         // 2) If we attempted registration from the other thread, the registration request has been successfully
346         //    added to the event loop's task queue for later execution.
347         //    i.e. It's safe to attempt bind() or connect() now:
348         //         because bind() or connect() will be executed *after* the scheduled registration task is executed
349         //         because register(), bind(), and connect() are all bound to the same thread.
350 
351         return regFuture;
352     }
353 
354     abstract void init(Channel channel) throws Exception;
355 
356     private static void doBind0(
357             final ChannelFuture regFuture, final Channel channel,
358             final SocketAddress localAddress, final ChannelPromise promise) {
359 
360         // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
361         // the pipeline in its channelRegistered() implementation.
362         channel.eventLoop().execute(new Runnable() {
363             @Override
364             public void run() {
365                 if (regFuture.isSuccess()) {
366                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
367                 } else {
368                     promise.setFailure(regFuture.cause());
369                 }
370             }
371         });
372     }
373 
374     /**
375      * the {@link ChannelHandler} to use for serving the requests.
376      */
377     public B handler(ChannelHandler handler) {
378         if (handler == null) {
379             throw new NullPointerException("handler");
380         }
381         this.handler = handler;
382         return self();
383     }
384 
385     /**
386      * Returns the configured {@link EventLoopGroup} or {@code null} if non is configured yet.
387      *
388      * @deprecated Use {@link #config()} instead.
389      */
390     @Deprecated
391     public final EventLoopGroup group() {
392         return group;
393     }
394 
395     /**
396      * Returns the {@link AbstractBootstrapConfig} object that can be used to obtain the current config
397      * of the bootstrap.
398      */
399     public abstract AbstractBootstrapConfig<B, C> config();
400 
401     static <K, V> Map<K, V> copiedMap(Map<K, V> map) {
402         final Map<K, V> copied;
403         synchronized (map) {
404             if (map.isEmpty()) {
405                 return Collections.emptyMap();
406             }
407             copied = new LinkedHashMap<K, V>(map);
408         }
409         return Collections.unmodifiableMap(copied);
410     }
411 
412     final Map<ChannelOption<?>, Object> options0() {
413         return options;
414     }
415 
416     final Map<AttributeKey<?>, Object> attrs0() {
417         return attrs;
418     }
419 
420     final SocketAddress localAddress() {
421         return localAddress;
422     }
423 
424     @SuppressWarnings("deprecation")
425     final ChannelFactory<? extends C> channelFactory() {
426         return channelFactory;
427     }
428 
429     final ChannelHandler handler() {
430         return handler;
431     }
432 
433     final Map<ChannelOption<?>, Object> options() {
434         return copiedMap(options);
435     }
436 
437     final Map<AttributeKey<?>, Object> attrs() {
438         return copiedMap(attrs);
439     }
440 
441     static void setChannelOptions(
442             Channel channel, Map<ChannelOption<?>, Object> options, InternalLogger logger) {
443         for (Map.Entry<ChannelOption<?>, Object> e: options.entrySet()) {
444             setChannelOption(channel, e.getKey(), e.getValue(), logger);
445         }
446     }
447 
448     static void setChannelOptions(
449             Channel channel, Map.Entry<ChannelOption<?>, Object>[] options, InternalLogger logger) {
450         for (Map.Entry<ChannelOption<?>, Object> e: options) {
451             setChannelOption(channel, e.getKey(), e.getValue(), logger);
452         }
453     }
454 
455     @SuppressWarnings("unchecked")
456     private static void setChannelOption(
457             Channel channel, ChannelOption<?> option, Object value, InternalLogger logger) {
458         try {
459             if (!channel.config().setOption((ChannelOption<Object>) option, value)) {
460                 logger.warn("Unknown channel option '{}' for channel '{}'", option, channel);
461             }
462         } catch (Throwable t) {
463             logger.warn(
464                     "Failed to set channel option '{}' with value '{}' for channel '{}'", option, value, channel, t);
465         }
466     }
467 
468     @Override
469     public String toString() {
470         StringBuilder buf = new StringBuilder()
471             .append(StringUtil.simpleClassName(this))
472             .append('(').append(config()).append(')');
473         return buf.toString();
474     }
475 
476     static final class PendingRegistrationPromise extends DefaultChannelPromise {
477 
478         // Is set to the correct EventExecutor once the registration was successful. Otherwise it will
479         // stay null and so the GlobalEventExecutor.INSTANCE will be used for notifications.
480         private volatile boolean registered;
481 
482         PendingRegistrationPromise(Channel channel) {
483             super(channel);
484         }
485 
486         void registered() {
487             registered = true;
488         }
489 
490         @Override
491         protected EventExecutor executor() {
492             if (registered) {
493                 // If the registration was a success executor is set.
494                 //
495                 // See https://github.com/netty/netty/issues/2586
496                 return super.executor();
497             }
498             // The registration failed so we can only use the GlobalEventExecutor as last resort to notify.
499             return GlobalEventExecutor.INSTANCE;
500         }
501     }
502 }