1 /*
2 * Copyright 2012 The Netty Project
3 *
4 * The Netty Project licenses this file to you under the Apache License,
5 * version 2.0 (the "License"); you may not use this file except in compliance
6 * with the License. You may obtain a copy of the License at:
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations
14 * under the License.
15 */
16 package org.jboss.netty.handler.execution;
17
18 import java.util.concurrent.Executor;
19 import java.util.concurrent.ExecutorService;
20
21 import org.jboss.netty.bootstrap.ServerBootstrap;
22 import org.jboss.netty.channel.Channel;
23 import org.jboss.netty.channel.ChannelDownstreamHandler;
24 import org.jboss.netty.channel.ChannelEvent;
25 import org.jboss.netty.channel.ChannelHandler;
26 import org.jboss.netty.channel.ChannelHandler.Sharable;
27 import org.jboss.netty.channel.ChannelHandlerContext;
28 import org.jboss.netty.channel.ChannelPipeline;
29 import org.jboss.netty.channel.ChannelPipelineFactory;
30 import org.jboss.netty.channel.ChannelState;
31 import org.jboss.netty.channel.ChannelStateEvent;
32 import org.jboss.netty.channel.ChannelUpstreamHandler;
33 import org.jboss.netty.channel.Channels;
34 import org.jboss.netty.util.ExternalResourceReleasable;
35
36 /**
37 * Forwards an upstream {@link ChannelEvent} to an {@link Executor}.
38 * <p>
39 * {@link ExecutionHandler} is often used when your {@link ChannelHandler}
40 * performs a blocking operation that takes long time or accesses a resource
41 * which is not CPU-bound business logic such as DB access. Running such
42 * operations in a pipeline without an {@link ExecutionHandler} will result in
43 * unwanted hiccup during I/O because an I/O thread cannot perform I/O until
44 * your handler returns the control to the I/O thread.
45 * <p>
46 * In most cases, an {@link ExecutionHandler} is coupled with an
47 * {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the
48 * correct event execution order and prevents an {@link OutOfMemoryError}
49 * under load:
50 * <pre>
51 * public class DatabaseGatewayPipelineFactory implements {@link ChannelPipelineFactory} {
52 *
53 * <b>private final {@link ExecutionHandler} executionHandler;</b>
54 *
55 * public DatabaseGatewayPipelineFactory({@link ExecutionHandler} executionHandler) {
56 * this.executionHandler = executionHandler;
57 * }
58 *
59 * public {@link ChannelPipeline} getPipeline() {
60 * return {@link Channels}.pipeline(
61 * new DatabaseGatewayProtocolEncoder(),
62 * new DatabaseGatewayProtocolDecoder(),
63 * <b>executionHandler, // Must be shared</b>
64 * new DatabaseQueryingHandler());
65 * }
66 * }
67 * ...
68 *
69 * public static void main(String[] args) {
70 * {@link ServerBootstrap} bootstrap = ...;
71 * ...
72 * <b>{@link ExecutionHandler} executionHandler = new {@link ExecutionHandler}(
73 * new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576))
74 * bootstrap.setPipelineFactory(
75 * new DatabaseGatewayPipelineFactory(executionHandler));</b>
76 * ...
77 * bootstrap.bind(...);
78 * ...
79 *
80 * while (!isServerReadyToShutDown()) {
81 * // ... wait ...
82 * }
83 *
84 * bootstrap.releaseExternalResources();
85 * <b>executionHandler.releaseExternalResources();</b>
86 * }
87 * </pre>
88 *
89 * Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the
90 * detailed information about how the event order is guaranteed.
91 *
92 * <h3>SEDA (Staged Event-Driven Architecture)</h3>
93 * You can implement an alternative thread model such as
94 * <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a>
95 * by adding more than one {@link ExecutionHandler} to the pipeline.
96 *
97 * <h3>Using other {@link Executor} implementation</h3>
98 *
99 * Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor},
100 * you can use other {@link Executor} implementations. However, you must note
101 * that other {@link Executor} implementation might break your application
102 * because they often do not maintain event execution order nor interact with
103 * I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}.
104 *
105 * @apiviz.landmark
106 * @apiviz.has java.util.concurrent.ThreadPoolExecutor
107 */
108 @Sharable
109 public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable {
110
111 private final Executor executor;
112 private final boolean handleDownstream;
113 private final boolean handleUpstream;
114
115 /**
116 * Creates a new instance with the specified {@link Executor}.
117 * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
118 */
119 public ExecutionHandler(Executor executor) {
120 this(executor, false, true);
121 }
122
123
124 /**
125 * Use {@link #ExecutionHandler(Executor, boolean, boolean)}
126 *
127 * {@link Deprecated}
128 */
129 @Deprecated
130 public ExecutionHandler(Executor executor, boolean handleDownstream) {
131 this(executor, handleDownstream, true);
132 }
133
134 /**
135 * Creates a new instance with the specified {@link Executor}.
136 * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure.
137 */
138 public ExecutionHandler(Executor executor, boolean handleDownstream, boolean handleUpstream) {
139 if (executor == null) {
140 throw new NullPointerException("executor");
141 }
142 if (!handleDownstream && !handleUpstream) {
143 throw new IllegalArgumentException("You must handle at least handle one event type");
144 }
145 this.executor = executor;
146 this.handleDownstream = handleDownstream;
147 this.handleUpstream = handleUpstream;
148 }
149
150 /**
151 * Returns the {@link Executor} which was specified with the constructor.
152 */
153 public Executor getExecutor() {
154 return executor;
155 }
156
157 /**
158 * Shuts down the {@link Executor} which was specified with the constructor
159 * and wait for its termination.
160 */
161 public void releaseExternalResources() {
162 Executor executor = getExecutor();
163 if (executor instanceof ExecutorService) {
164 ((ExecutorService) executor).shutdown();
165 }
166 if (executor instanceof ExternalResourceReleasable) {
167 ((ExternalResourceReleasable) executor).releaseExternalResources();
168 }
169 }
170
171 public void handleUpstream(
172 ChannelHandlerContext context, ChannelEvent e) throws Exception {
173 if (handleUpstream) {
174 executor.execute(new ChannelUpstreamEventRunnable(context, e, executor));
175 } else {
176 context.sendUpstream(e);
177 }
178 }
179
180 public void handleDownstream(
181 ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
182 // check if the read was suspend
183 if (!handleReadSuspend(ctx, e)) {
184 if (handleDownstream) {
185 executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor));
186 } else {
187 ctx.sendDownstream(e);
188 }
189 }
190 }
191
192 /**
193 * Handle suspended reads
194 */
195 protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) {
196 if (e instanceof ChannelStateEvent) {
197 ChannelStateEvent cse = (ChannelStateEvent) e;
198 if (cse.getState() == ChannelState.INTEREST_OPS &&
199 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) {
200
201 // setReadable(true) requested
202 boolean readSuspended = ctx.getAttachment() != null;
203 if (readSuspended) {
204 // Drop the request silently if MemoryAwareThreadPool has
205 // set the flag.
206 e.getFuture().setSuccess();
207 return true;
208 }
209 }
210 }
211
212 return false;
213 }
214 }