View Javadoc
1   /*
2    * Copyright 2013 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  
17  package io.netty.channel;
18  
19  import io.netty.util.Recycler;
20  import io.netty.util.ReferenceCountUtil;
21  import io.netty.util.concurrent.EventExecutor;
22  import io.netty.util.internal.OneTimeTask;
23  import io.netty.util.internal.RecyclableMpscLinkedQueueNode;
24  
25  import java.net.SocketAddress;
26  
27  import static io.netty.channel.ChannelHandlerInvokerUtil.*;
28  import static io.netty.channel.DefaultChannelPipeline.*;
29  
30  public class DefaultChannelHandlerInvoker implements ChannelHandlerInvoker {
31  
32      private final EventExecutor executor;
33  
34      public DefaultChannelHandlerInvoker(EventExecutor executor) {
35          if (executor == null) {
36              throw new NullPointerException("executor");
37          }
38  
39          this.executor = executor;
40      }
41  
42      @Override
43      public EventExecutor executor() {
44          return executor;
45      }
46  
47      @Override
48      public void invokeChannelRegistered(final ChannelHandlerContext ctx) {
49          if (executor.inEventLoop()) {
50              invokeChannelRegisteredNow(ctx);
51          } else {
52              executor.execute(new OneTimeTask() {
53                  @Override
54                  public void run() {
55                      invokeChannelRegisteredNow(ctx);
56                  }
57              });
58          }
59      }
60  
61      @Override
62      public void invokeChannelUnregistered(final ChannelHandlerContext ctx) {
63          if (executor.inEventLoop()) {
64              invokeChannelUnregisteredNow(ctx);
65          } else {
66              executor.execute(new OneTimeTask() {
67                  @Override
68                  public void run() {
69                      invokeChannelUnregisteredNow(ctx);
70                  }
71              });
72          }
73      }
74  
75      @Override
76      public void invokeChannelActive(final ChannelHandlerContext ctx) {
77          if (executor.inEventLoop()) {
78              invokeChannelActiveNow(ctx);
79          } else {
80              executor.execute(new OneTimeTask() {
81                  @Override
82                  public void run() {
83                      invokeChannelActiveNow(ctx);
84                  }
85              });
86          }
87      }
88  
89      @Override
90      public void invokeChannelInactive(final ChannelHandlerContext ctx) {
91          if (executor.inEventLoop()) {
92              invokeChannelInactiveNow(ctx);
93          } else {
94              executor.execute(new OneTimeTask() {
95                  @Override
96                  public void run() {
97                      invokeChannelInactiveNow(ctx);
98                  }
99              });
100         }
101     }
102 
103     @Override
104     public void invokeExceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
105         if (cause == null) {
106             throw new NullPointerException("cause");
107         }
108 
109         if (executor.inEventLoop()) {
110             invokeExceptionCaughtNow(ctx, cause);
111         } else {
112             try {
113                 executor.execute(new OneTimeTask() {
114                     @Override
115                     public void run() {
116                         invokeExceptionCaughtNow(ctx, cause);
117                     }
118                 });
119             } catch (Throwable t) {
120                 if (logger.isWarnEnabled()) {
121                     logger.warn("Failed to submit an exceptionCaught() event.", t);
122                     logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
123                 }
124             }
125         }
126     }
127 
128     @Override
129     public void invokeUserEventTriggered(final ChannelHandlerContext ctx, final Object event) {
130         if (event == null) {
131             throw new NullPointerException("event");
132         }
133 
134         if (executor.inEventLoop()) {
135             invokeUserEventTriggeredNow(ctx, event);
136         } else {
137             safeExecuteInbound(new OneTimeTask() {
138                 @Override
139                 public void run() {
140                     invokeUserEventTriggeredNow(ctx, event);
141                 }
142             }, event);
143         }
144     }
145 
146     @Override
147     public void invokeChannelRead(final ChannelHandlerContext ctx, final Object msg) {
148         if (msg == null) {
149             throw new NullPointerException("msg");
150         }
151 
152         if (executor.inEventLoop()) {
153             invokeChannelReadNow(ctx, msg);
154         } else {
155             safeExecuteInbound(new OneTimeTask() {
156                 @Override
157                 public void run() {
158                     invokeChannelReadNow(ctx, msg);
159                 }
160             }, msg);
161         }
162     }
163 
164     @Override
165     public void invokeChannelReadComplete(final ChannelHandlerContext ctx) {
166         if (executor.inEventLoop()) {
167             invokeChannelReadCompleteNow(ctx);
168         } else {
169             AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
170             Runnable task = dctx.invokeChannelReadCompleteTask;
171             if (task == null) {
172                 dctx.invokeChannelReadCompleteTask = task = new Runnable() {
173                     @Override
174                     public void run() {
175                         invokeChannelReadCompleteNow(ctx);
176                     }
177                 };
178             }
179             executor.execute(task);
180         }
181     }
182 
183     @Override
184     public void invokeChannelWritabilityChanged(final ChannelHandlerContext ctx) {
185         if (executor.inEventLoop()) {
186             invokeChannelWritabilityChangedNow(ctx);
187         } else {
188             AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
189             Runnable task = dctx.invokeChannelWritableStateChangedTask;
190             if (task == null) {
191                 dctx.invokeChannelWritableStateChangedTask = task = new Runnable() {
192                     @Override
193                     public void run() {
194                         invokeChannelWritabilityChangedNow(ctx);
195                     }
196                 };
197             }
198             executor.execute(task);
199         }
200     }
201 
202     @Override
203     public void invokeBind(
204             final ChannelHandlerContext ctx, final SocketAddress localAddress, final ChannelPromise promise) {
205         if (localAddress == null) {
206             throw new NullPointerException("localAddress");
207         }
208         if (!validatePromise(ctx, promise, false)) {
209             // promise cancelled
210             return;
211         }
212 
213         if (executor.inEventLoop()) {
214             invokeBindNow(ctx, localAddress, promise);
215         } else {
216             safeExecuteOutbound(new OneTimeTask() {
217                 @Override
218                 public void run() {
219                     invokeBindNow(ctx, localAddress, promise);
220                 }
221             }, promise);
222         }
223     }
224 
225     @Override
226     public void invokeConnect(
227             final ChannelHandlerContext ctx,
228             final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
229         if (remoteAddress == null) {
230             throw new NullPointerException("remoteAddress");
231         }
232         if (!validatePromise(ctx, promise, false)) {
233             // promise cancelled
234             return;
235         }
236 
237         if (executor.inEventLoop()) {
238             invokeConnectNow(ctx, remoteAddress, localAddress, promise);
239         } else {
240             safeExecuteOutbound(new OneTimeTask() {
241                 @Override
242                 public void run() {
243                     invokeConnectNow(ctx, remoteAddress, localAddress, promise);
244                 }
245             }, promise);
246         }
247     }
248 
249     @Override
250     public void invokeDisconnect(final ChannelHandlerContext ctx, final ChannelPromise promise) {
251         if (!validatePromise(ctx, promise, false)) {
252             // promise cancelled
253             return;
254         }
255 
256         if (executor.inEventLoop()) {
257             invokeDisconnectNow(ctx, promise);
258         } else {
259             safeExecuteOutbound(new OneTimeTask() {
260                 @Override
261                 public void run() {
262                     invokeDisconnectNow(ctx, promise);
263                 }
264             }, promise);
265         }
266     }
267 
268     @Override
269     public void invokeClose(final ChannelHandlerContext ctx, final ChannelPromise promise) {
270         if (!validatePromise(ctx, promise, false)) {
271             // promise cancelled
272             return;
273         }
274 
275         if (executor.inEventLoop()) {
276             invokeCloseNow(ctx, promise);
277         } else {
278             safeExecuteOutbound(new OneTimeTask() {
279                 @Override
280                 public void run() {
281                     invokeCloseNow(ctx, promise);
282                 }
283             }, promise);
284         }
285     }
286 
287     @Override
288     public void invokeDeregister(final ChannelHandlerContext ctx, final ChannelPromise promise) {
289         if (!validatePromise(ctx, promise, false)) {
290             // promise cancelled
291             return;
292         }
293 
294         if (executor.inEventLoop()) {
295             invokeDeregisterNow(ctx, promise);
296         } else {
297             safeExecuteOutbound(new OneTimeTask() {
298                 @Override
299                 public void run() {
300                     invokeDeregisterNow(ctx, promise);
301                 }
302             }, promise);
303         }
304     }
305 
306     @Override
307     public void invokeRead(final ChannelHandlerContext ctx) {
308         if (executor.inEventLoop()) {
309             invokeReadNow(ctx);
310         } else {
311             AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
312             Runnable task = dctx.invokeReadTask;
313             if (task == null) {
314                 dctx.invokeReadTask = task = new Runnable() {
315                     @Override
316                     public void run() {
317                         invokeReadNow(ctx);
318                     }
319                 };
320             }
321             executor.execute(task);
322         }
323     }
324 
325     @Override
326     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
327         if (msg == null) {
328             throw new NullPointerException("msg");
329         }
330         if (!validatePromise(ctx, promise, true)) {
331             // promise cancelled
332             ReferenceCountUtil.release(msg);
333             return;
334         }
335 
336         if (executor.inEventLoop()) {
337             invokeWriteNow(ctx, msg, promise);
338         } else {
339             AbstractChannel channel = (AbstractChannel) ctx.channel();
340             int size = channel.estimatorHandle().size(msg);
341             if (size > 0) {
342                 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
343                 // Check for null as it may be set to null if the channel is closed already
344                 if (buffer != null) {
345                     buffer.incrementPendingOutboundBytes(size);
346                 }
347             }
348             safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
349         }
350     }
351 
352     @Override
353     public void invokeFlush(final ChannelHandlerContext ctx) {
354         if (executor.inEventLoop()) {
355             invokeFlushNow(ctx);
356         } else {
357             AbstractChannelHandlerContext dctx = (AbstractChannelHandlerContext) ctx;
358             Runnable task = dctx.invokeFlushTask;
359             if (task == null) {
360                 dctx.invokeFlushTask = task = new Runnable() {
361                     @Override
362                     public void run() {
363                         invokeFlushNow(ctx);
364                     }
365                 };
366             }
367             executor.execute(task);
368         }
369     }
370 
371     private void safeExecuteInbound(Runnable task, Object msg) {
372         boolean success = false;
373         try {
374             executor.execute(task);
375             success = true;
376         } finally {
377             if (!success) {
378                 ReferenceCountUtil.release(msg);
379             }
380         }
381     }
382 
383     private void safeExecuteOutbound(Runnable task, ChannelPromise promise) {
384         try {
385             executor.execute(task);
386         } catch (Throwable cause) {
387             promise.setFailure(cause);
388         }
389     }
390     private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
391         try {
392             executor.execute(task);
393         } catch (Throwable cause) {
394             try {
395                 promise.setFailure(cause);
396             } finally {
397                 ReferenceCountUtil.release(msg);
398             }
399         }
400     }
401 
402     static final class WriteTask extends RecyclableMpscLinkedQueueNode<SingleThreadEventLoop.NonWakeupRunnable>
403             implements SingleThreadEventLoop.NonWakeupRunnable {
404         private ChannelHandlerContext ctx;
405         private Object msg;
406         private ChannelPromise promise;
407         private int size;
408 
409         private static final Recycler<WriteTask> RECYCLER = new Recycler<WriteTask>() {
410             @Override
411             protected WriteTask newObject(Handle<WriteTask> handle) {
412                 return new WriteTask(handle);
413             }
414         };
415 
416         private static WriteTask newInstance(
417                 ChannelHandlerContext ctx, Object msg, int size, ChannelPromise promise) {
418             WriteTask task = RECYCLER.get();
419             task.ctx = ctx;
420             task.msg = msg;
421             task.promise = promise;
422             task.size = size;
423             return task;
424         }
425 
426         private WriteTask(Recycler.Handle<WriteTask> handle) {
427             super(handle);
428         }
429 
430         @Override
431         public void run() {
432             try {
433                 if (size > 0) {
434                     ChannelOutboundBuffer buffer = ctx.channel().unsafe().outboundBuffer();
435                     // Check for null as it may be set to null if the channel is closed already
436                     if (buffer != null) {
437                         buffer.decrementPendingOutboundBytes(size);
438                     }
439                 }
440                 invokeWriteNow(ctx, msg, promise);
441             } finally {
442                 // Set to null so the GC can collect them directly
443                 ctx = null;
444                 msg = null;
445                 promise = null;
446             }
447         }
448 
449         @Override
450         public SingleThreadEventLoop.NonWakeupRunnable value() {
451             return this;
452         }
453     }
454 }