1 /* 2 * Copyright 2012 The Netty Project 3 * 4 * The Netty Project licenses this file to you under the Apache License, 5 * version 2.0 (the "License"); you may not use this file except in compliance 6 * with the License. You may obtain a copy of the License at: 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 13 * License for the specific language governing permissions and limitations 14 * under the License. 15 */ 16 package org.jboss.netty.handler.execution; 17 18 import java.util.concurrent.Executor; 19 import java.util.concurrent.ExecutorService; 20 21 import org.jboss.netty.bootstrap.ServerBootstrap; 22 import org.jboss.netty.channel.Channel; 23 import org.jboss.netty.channel.ChannelDownstreamHandler; 24 import org.jboss.netty.channel.ChannelEvent; 25 import org.jboss.netty.channel.ChannelHandler; 26 import org.jboss.netty.channel.ChannelHandler.Sharable; 27 import org.jboss.netty.channel.ChannelHandlerContext; 28 import org.jboss.netty.channel.ChannelPipeline; 29 import org.jboss.netty.channel.ChannelPipelineFactory; 30 import org.jboss.netty.channel.ChannelState; 31 import org.jboss.netty.channel.ChannelStateEvent; 32 import org.jboss.netty.channel.ChannelUpstreamHandler; 33 import org.jboss.netty.channel.Channels; 34 import org.jboss.netty.util.ExternalResourceReleasable; 35 36 /** 37 * Forwards an upstream {@link ChannelEvent} to an {@link Executor}. 38 * <p> 39 * {@link ExecutionHandler} is often used when your {@link ChannelHandler} 40 * performs a blocking operation that takes long time or accesses a resource 41 * which is not CPU-bound business logic such as DB access. Running such 42 * operations in a pipeline without an {@link ExecutionHandler} will result in 43 * unwanted hiccup during I/O because an I/O thread cannot perform I/O until 44 * your handler returns the control to the I/O thread. 45 * <p> 46 * In most cases, an {@link ExecutionHandler} is coupled with an 47 * {@link OrderedMemoryAwareThreadPoolExecutor} because it guarantees the 48 * correct event execution order and prevents an {@link OutOfMemoryError} 49 * under load: 50 * <pre> 51 * public class DatabaseGatewayPipelineFactory implements {@link ChannelPipelineFactory} { 52 * 53 * <b>private final {@link ExecutionHandler} executionHandler;</b> 54 * 55 * public DatabaseGatewayPipelineFactory({@link ExecutionHandler} executionHandler) { 56 * this.executionHandler = executionHandler; 57 * } 58 * 59 * public {@link ChannelPipeline} getPipeline() { 60 * return {@link Channels}.pipeline( 61 * new DatabaseGatewayProtocolEncoder(), 62 * new DatabaseGatewayProtocolDecoder(), 63 * <b>executionHandler, // Must be shared</b> 64 * new DatabaseQueryingHandler()); 65 * } 66 * } 67 * ... 68 * 69 * public static void main(String[] args) { 70 * {@link ServerBootstrap} bootstrap = ...; 71 * ... 72 * <b>{@link ExecutionHandler} executionHandler = new {@link ExecutionHandler}( 73 * new {@link OrderedMemoryAwareThreadPoolExecutor}(16, 1048576, 1048576)) 74 * bootstrap.setPipelineFactory( 75 * new DatabaseGatewayPipelineFactory(executionHandler));</b> 76 * ... 77 * bootstrap.bind(...); 78 * ... 79 * 80 * while (!isServerReadyToShutDown()) { 81 * // ... wait ... 82 * } 83 * 84 * bootstrap.releaseExternalResources(); 85 * <b>executionHandler.releaseExternalResources();</b> 86 * } 87 * </pre> 88 * 89 * Please refer to {@link OrderedMemoryAwareThreadPoolExecutor} for the 90 * detailed information about how the event order is guaranteed. 91 * 92 * <h3>SEDA (Staged Event-Driven Architecture)</h3> 93 * You can implement an alternative thread model such as 94 * <a href="http://en.wikipedia.org/wiki/Staged_event-driven_architecture">SEDA</a> 95 * by adding more than one {@link ExecutionHandler} to the pipeline. 96 * 97 * <h3>Using other {@link Executor} implementation</h3> 98 * 99 * Although it's recommended to use {@link OrderedMemoryAwareThreadPoolExecutor}, 100 * you can use other {@link Executor} implementations. However, you must note 101 * that other {@link Executor} implementation might break your application 102 * because they often do not maintain event execution order nor interact with 103 * I/O threads to control the incoming traffic and avoid {@link OutOfMemoryError}. 104 * 105 * @apiviz.landmark 106 * @apiviz.has java.util.concurrent.ThreadPoolExecutor 107 */ 108 @Sharable 109 public class ExecutionHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler, ExternalResourceReleasable { 110 111 private final Executor executor; 112 private final boolean handleDownstream; 113 private final boolean handleUpstream; 114 115 /** 116 * Creates a new instance with the specified {@link Executor}. 117 * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure. 118 */ 119 public ExecutionHandler(Executor executor) { 120 this(executor, false, true); 121 } 122 123 124 /** 125 * Use {@link #ExecutionHandler(Executor, boolean, boolean)} 126 * 127 * {@link Deprecated} 128 */ 129 @Deprecated 130 public ExecutionHandler(Executor executor, boolean handleDownstream) { 131 this(executor, handleDownstream, true); 132 } 133 134 /** 135 * Creates a new instance with the specified {@link Executor}. 136 * Specify an {@link OrderedMemoryAwareThreadPoolExecutor} if unsure. 137 */ 138 public ExecutionHandler(Executor executor, boolean handleDownstream, boolean handleUpstream) { 139 if (executor == null) { 140 throw new NullPointerException("executor"); 141 } 142 if (!handleDownstream && !handleUpstream) { 143 throw new IllegalArgumentException("You must handle at least handle one event type"); 144 } 145 this.executor = executor; 146 this.handleDownstream = handleDownstream; 147 this.handleUpstream = handleUpstream; 148 } 149 150 /** 151 * Returns the {@link Executor} which was specified with the constructor. 152 */ 153 public Executor getExecutor() { 154 return executor; 155 } 156 157 /** 158 * Shuts down the {@link Executor} which was specified with the constructor 159 * and wait for its termination. 160 */ 161 public void releaseExternalResources() { 162 Executor executor = getExecutor(); 163 if (executor instanceof ExecutorService) { 164 ((ExecutorService) executor).shutdown(); 165 } 166 if (executor instanceof ExternalResourceReleasable) { 167 ((ExternalResourceReleasable) executor).releaseExternalResources(); 168 } 169 } 170 171 public void handleUpstream( 172 ChannelHandlerContext context, ChannelEvent e) throws Exception { 173 if (handleUpstream) { 174 executor.execute(new ChannelUpstreamEventRunnable(context, e, executor)); 175 } else { 176 context.sendUpstream(e); 177 } 178 } 179 180 public void handleDownstream( 181 ChannelHandlerContext ctx, ChannelEvent e) throws Exception { 182 // check if the read was suspend 183 if (!handleReadSuspend(ctx, e)) { 184 if (handleDownstream) { 185 executor.execute(new ChannelDownstreamEventRunnable(ctx, e, executor)); 186 } else { 187 ctx.sendDownstream(e); 188 } 189 } 190 } 191 192 /** 193 * Handle suspended reads 194 */ 195 protected boolean handleReadSuspend(ChannelHandlerContext ctx, ChannelEvent e) { 196 if (e instanceof ChannelStateEvent) { 197 ChannelStateEvent cse = (ChannelStateEvent) e; 198 if (cse.getState() == ChannelState.INTEREST_OPS && 199 (((Integer) cse.getValue()).intValue() & Channel.OP_READ) != 0) { 200 201 // setReadable(true) requested 202 boolean readSuspended = ctx.getAttachment() != null; 203 if (readSuspended) { 204 // Drop the request silently if MemoryAwareThreadPool has 205 // set the flag. 206 e.getFuture().setSuccess(); 207 return true; 208 } 209 } 210 } 211 212 return false; 213 } 214 }