View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.execution;
17  
18  import org.jboss.netty.channel.Channel;
19  import org.jboss.netty.channel.ChannelEvent;
20  import org.jboss.netty.channel.ChannelFuture;
21  import org.jboss.netty.channel.ChannelFutureListener;
22  import org.jboss.netty.util.ObjectSizeEstimator;
23  
24  import java.util.concurrent.Executor;
25  import java.util.concurrent.Executors;
26  import java.util.concurrent.RejectedExecutionException;
27  import java.util.concurrent.ThreadFactory;
28  import java.util.concurrent.TimeUnit;
29  
30  /**
31   * {@link Executor} which should be used for downstream {@link ChannelEvent}'s. This implementation
32   * will take care of preserve the order of the events in a {@link Channel}.  If you don't need to
33   * preserve the order just use one of the {@link Executor} implementations provided by the static
34   * methods of {@link Executors}.
35   * <br>
36   * <br>
37   * For more informations about how the order is preserved see {@link OrderedMemoryAwareThreadPoolExecutor}
38   */
39  public final class OrderedDownstreamThreadPoolExecutor extends OrderedMemoryAwareThreadPoolExecutor {
40  
41      /**
42       * Creates a new instance.
43       *
44       * @param corePoolSize          the maximum number of active threads
45       */
46      public OrderedDownstreamThreadPoolExecutor(int corePoolSize) {
47          super(corePoolSize, 0L, 0L);
48      }
49  
50      /**
51       * Creates a new instance.
52       *
53       * @param corePoolSize          the maximum number of active threads
54       * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
55       * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
56       */
57      public OrderedDownstreamThreadPoolExecutor(
58              int corePoolSize, long keepAliveTime, TimeUnit unit) {
59          super(corePoolSize, 0L, 0L, keepAliveTime, unit);
60      }
61  
62      /**
63       * Creates a new instance.
64       *
65       * @param corePoolSize          the maximum number of active threads
66       * @param keepAliveTime         the amount of time for an inactive thread to shut itself down
67       * @param unit                  the {@link TimeUnit} of {@code keepAliveTime}
68       * @param threadFactory         the {@link ThreadFactory} of this pool
69       */
70      public OrderedDownstreamThreadPoolExecutor(
71              int corePoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
72          super(corePoolSize, 0L, 0L,
73                  keepAliveTime, unit, threadFactory);
74      }
75  
76      /**
77       * Return {@code null}
78       */
79      @Override
80      public ObjectSizeEstimator getObjectSizeEstimator() {
81          return null;
82      }
83  
84      /**
85       * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
86       * size in this implementation
87       */
88      @Override
89      public void setObjectSizeEstimator(ObjectSizeEstimator objectSizeEstimator) {
90          throw new UnsupportedOperationException("Not supported by this implementation");
91      }
92  
93      /**
94       * Returns {@code 0L}
95       */
96      @Override
97      public long getMaxChannelMemorySize() {
98          return 0L;
99      }
100 
101     /**
102      * Throws {@link UnsupportedOperationException} as there is not support for limit the memory
103      * size in this implementation
104      */
105     @Override
106     public void setMaxChannelMemorySize(long maxChannelMemorySize) {
107         throw new UnsupportedOperationException("Not supported by this implementation");
108     }
109 
110     /**
111      * Returns {@code 0L}
112      */
113     @Override
114     public long getMaxTotalMemorySize() {
115         return 0L;
116     }
117 
118     /**
119      * Return {@code false} as we not need to cound the memory in this implementation
120      */
121     @Override
122     protected boolean shouldCount(Runnable task) {
123         return false;
124     }
125 
126     @Override
127     public void execute(Runnable command) {
128 
129         // check if the Runnable was of an unsupported type
130         if (command instanceof ChannelUpstreamEventRunnable) {
131             throw new RejectedExecutionException("command must be enclosed with an downstream event.");
132         }
133         doExecute(command);
134     }
135 
136     @Override
137     protected Executor getChildExecutor(ChannelEvent e) {
138         final Object key = getChildExecutorKey(e);
139         Executor executor = childExecutors.get(key);
140         if (executor == null) {
141             executor = new ChildExecutor();
142             Executor oldExecutor = childExecutors.putIfAbsent(key, executor);
143             if (oldExecutor != null) {
144                 executor = oldExecutor;
145             } else {
146                 // register a listener so that the ChildExecutor will get removed once the channel was closed
147                 e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
148 
149                     public void operationComplete(ChannelFuture future) throws Exception {
150                         removeChildExecutor(key);
151                     }
152                 });
153             }
154         }
155 
156         return executor;
157     }
158 }