View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.ssl;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBuffers;
20  import org.jboss.netty.channel.Channel;
21  import org.jboss.netty.channel.ChannelDownstreamHandler;
22  import org.jboss.netty.channel.ChannelEvent;
23  import org.jboss.netty.channel.ChannelFuture;
24  import org.jboss.netty.channel.ChannelFutureListener;
25  import org.jboss.netty.channel.ChannelHandlerContext;
26  import org.jboss.netty.channel.ChannelPipeline;
27  import org.jboss.netty.channel.ChannelStateEvent;
28  import org.jboss.netty.channel.Channels;
29  import org.jboss.netty.channel.DefaultChannelFuture;
30  import org.jboss.netty.channel.DownstreamMessageEvent;
31  import org.jboss.netty.channel.ExceptionEvent;
32  import org.jboss.netty.channel.MessageEvent;
33  import org.jboss.netty.handler.codec.frame.FrameDecoder;
34  import org.jboss.netty.logging.InternalLogger;
35  import org.jboss.netty.logging.InternalLoggerFactory;
36  import org.jboss.netty.util.internal.DetectionUtil;
37  import org.jboss.netty.util.internal.NonReentrantLock;
38  
39  import javax.net.ssl.SSLEngine;
40  import javax.net.ssl.SSLEngineResult;
41  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
42  import javax.net.ssl.SSLEngineResult.Status;
43  import javax.net.ssl.SSLException;
44  import java.io.IOException;
45  import java.nio.ByteBuffer;
46  import java.nio.channels.ClosedChannelException;
47  import java.nio.channels.DatagramChannel;
48  import java.nio.channels.SocketChannel;
49  import java.util.LinkedList;
50  import java.util.Queue;
51  import java.util.concurrent.ConcurrentLinkedQueue;
52  import java.util.concurrent.Executor;
53  import java.util.concurrent.atomic.AtomicBoolean;
54  import java.util.regex.Pattern;
55  
56  import static org.jboss.netty.channel.Channels.*;
57  
58  /**
59   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
60   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
61   * to the <strong>"SecureChat"</strong> example in the distribution or the web
62   * site for the detailed usage.
63   *
64   * <h3>Beginning the handshake</h3>
65   * <p>
66   * You must make sure not to write a message while the
67   * {@linkplain #handshake() handshake} is in progress unless you are
68   * renegotiating.  You will be notified by the {@link ChannelFuture} which is
69   * returned by the {@link #handshake()} method when the handshake
70   * process succeeds or fails.
71   *
72   * <h3>Handshake</h3>
73   * <p>
74   * If {@link #isIssueHandshake()} is {@code false}
75   * (default) you will need to take care of calling {@link #handshake()} by your own. In most
76   * situations were {@link SslHandler} is used in 'client mode' you want to issue a handshake once
77   * the connection was established. if {@link #setIssueHandshake(boolean)} is set to <code>true</code>
78   * you don't need to worry about this as the {@link SslHandler} will take care of it.
79   * <p>
80   *
81   * <h3>Renegotiation</h3>
82   * <p>
83   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code true}
84   * (default) and the initial handshake has been done successfully, you can call
85   * {@link #handshake()} to trigger the renegotiation.
86   * <p>
87   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code false},
88   * an attempt to trigger renegotiation will result in the connection closure.
89   * <p>
90   * Please note that TLS renegotiation had a security issue before.  If your
91   * runtime environment did not fix it, please make sure to disable TLS
92   * renegotiation by calling {@link #setEnableRenegotiation(boolean)} with
93   * {@code false}.  For more information, please refer to the following documents:
94   * <ul>
95   *   <li><a href="http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2009-3555">CVE-2009-3555</a></li>
96   *   <li><a href="http://www.ietf.org/rfc/rfc5746.txt">RFC5746</a></li>
97   *   <li><a href="http://www.oracle.com/technetwork/java/javase/documentation/tlsreadme2-176330.html">Phased
98   *       Approach to Fixing the TLS Renegotiation Issue</a></li>
99   * </ul>
100  *
101  * <h3>Closing the session</h3>
102  * <p>
103  * To close the SSL session, the {@link #close()} method should be
104  * called to send the {@code close_notify} message to the remote peer.  One
105  * exception is when you close the {@link Channel} - {@link SslHandler}
106  * intercepts the close request and send the {@code close_notify} message
107  * before the channel closure automatically.  Once the SSL session is closed,
108  * it is not reusable, and consequently you should create a new
109  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
110  * following section.
111  *
112  * <h3>Restarting the session</h3>
113  * <p>
114  * To restart the SSL session, you must remove the existing closed
115  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
116  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
117  * and start the handshake process as described in the first section.
118  *
119  * <h3>Implementing StartTLS</h3>
120  * <p>
121  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
122  * communication pattern that secures the wire in the middle of the plaintext
123  * connection.  Please note that it is different from SSL &middot; TLS, that
124  * secures the wire from the beginning of the connection.  Typically, StartTLS
125  * is composed of three steps:
126  * <ol>
127  * <li>Client sends a StartTLS request to server.</li>
128  * <li>Server sends a StartTLS response to client.</li>
129  * <li>Client begins SSL handshake.</li>
130  * </ol>
131  * If you implement a server, you need to:
132  * <ol>
133  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
134  *     to {@code true},</li>
135  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
136  * <li>write a StartTLS response.</li>
137  * </ol>
138  * Please note that you must insert {@link SslHandler} <em>before</em> sending
139  * the StartTLS response.  Otherwise the client can send begin SSL handshake
140  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
141  * data corruption.
142  * <p>
143  * The client-side implementation is much simpler.
144  * <ol>
145  * <li>Write a StartTLS request,</li>
146  * <li>wait for the StartTLS response,</li>
147  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
148  *     to {@code false},</li>
149  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
150  * <li>Initiate SSL handshake by calling {@link SslHandler#handshake()}.</li>
151  * </ol>
152  *
153  * @apiviz.landmark
154  * @apiviz.uses org.jboss.netty.handler.ssl.SslBufferPool
155  */
156 public class SslHandler extends FrameDecoder
157                         implements ChannelDownstreamHandler {
158 
159     private static final InternalLogger logger =
160         InternalLoggerFactory.getInstance(SslHandler.class);
161 
162     private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
163 
164     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
165             "^.*(Socket|DatagramChannel|SctpChannel).*$");
166     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
167             "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
168             Pattern.CASE_INSENSITIVE);
169 
170     private static SslBufferPool defaultBufferPool;
171 
172     /**
173      * Returns the default {@link SslBufferPool} used when no pool is
174      * specified in the constructor.
175      */
176     public static synchronized SslBufferPool getDefaultBufferPool() {
177         if (defaultBufferPool == null) {
178             defaultBufferPool = new SslBufferPool();
179         }
180         return defaultBufferPool;
181     }
182 
183     private volatile ChannelHandlerContext ctx;
184     private final SSLEngine engine;
185     private final SslBufferPool bufferPool;
186     private final Executor delegatedTaskExecutor;
187     private final boolean startTls;
188 
189     private volatile boolean enableRenegotiation = true;
190 
191     final Object handshakeLock = new Object();
192     private boolean handshaking;
193     private volatile boolean handshaken;
194     private volatile ChannelFuture handshakeFuture;
195 
196     private final AtomicBoolean sentFirstMessage = new AtomicBoolean();
197     private final AtomicBoolean sentCloseNotify = new AtomicBoolean();
198     int ignoreClosedChannelException;
199     final Object ignoreClosedChannelExceptionLock = new Object();
200     private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
201     private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
202     private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();
203 
204     private volatile boolean issueHandshake;
205 
206     private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();
207 
208     private boolean closeOnSSLException;
209 
210     private int packetLength = Integer.MIN_VALUE;
211 
212     /**
213      * Creates a new instance.
214      *
215      * @param engine  the {@link SSLEngine} this handler will use
216      */
217     public SslHandler(SSLEngine engine) {
218         this(engine, getDefaultBufferPool(), ImmediateExecutor.INSTANCE);
219     }
220 
221     /**
222      * Creates a new instance.
223      *
224      * @param engine      the {@link SSLEngine} this handler will use
225      * @param bufferPool  the {@link SslBufferPool} where this handler will
226      *                    acquire the buffers required by the {@link SSLEngine}
227      */
228     public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
229         this(engine, bufferPool, ImmediateExecutor.INSTANCE);
230     }
231 
232     /**
233      * Creates a new instance.
234      *
235      * @param engine    the {@link SSLEngine} this handler will use
236      * @param startTls  {@code true} if the first write request shouldn't be
237      *                  encrypted by the {@link SSLEngine}
238      */
239     public SslHandler(SSLEngine engine, boolean startTls) {
240         this(engine, getDefaultBufferPool(), startTls);
241     }
242 
243     /**
244      * Creates a new instance.
245      *
246      * @param engine      the {@link SSLEngine} this handler will use
247      * @param bufferPool  the {@link SslBufferPool} where this handler will
248      *                    acquire the buffers required by the {@link SSLEngine}
249      * @param startTls    {@code true} if the first write request shouldn't be
250      *                    encrypted by the {@link SSLEngine}
251      */
252     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
253         this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE);
254     }
255 
256     /**
257      * Creates a new instance.
258      *
259      * @param engine
260      *        the {@link SSLEngine} this handler will use
261      * @param delegatedTaskExecutor
262      *        the {@link Executor} which will execute the delegated task
263      *        that {@link SSLEngine#getDelegatedTask()} will return
264      */
265     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
266         this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
267     }
268 
269     /**
270      * Creates a new instance.
271      *
272      * @param engine
273      *        the {@link SSLEngine} this handler will use
274      * @param bufferPool
275      *        the {@link SslBufferPool} where this handler will acquire
276      *        the buffers required by the {@link SSLEngine}
277      * @param delegatedTaskExecutor
278      *        the {@link Executor} which will execute the delegated task
279      *        that {@link SSLEngine#getDelegatedTask()} will return
280      */
281     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
282         this(engine, bufferPool, false, delegatedTaskExecutor);
283     }
284 
285     /**
286      * Creates a new instance.
287      *
288      * @param engine
289      *        the {@link SSLEngine} this handler will use
290      * @param startTls
291      *        {@code true} if the first write request shouldn't be encrypted
292      *        by the {@link SSLEngine}
293      * @param delegatedTaskExecutor
294      *        the {@link Executor} which will execute the delegated task
295      *        that {@link SSLEngine#getDelegatedTask()} will return
296      */
297     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
298         this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
299     }
300 
301     /**
302      * Creates a new instance.
303      *
304      * @param engine
305      *        the {@link SSLEngine} this handler will use
306      * @param bufferPool
307      *        the {@link SslBufferPool} where this handler will acquire
308      *        the buffers required by the {@link SSLEngine}
309      * @param startTls
310      *        {@code true} if the first write request shouldn't be encrypted
311      *        by the {@link SSLEngine}
312      * @param delegatedTaskExecutor
313      *        the {@link Executor} which will execute the delegated task
314      *        that {@link SSLEngine#getDelegatedTask()} will return
315      */
316     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
317         if (engine == null) {
318             throw new NullPointerException("engine");
319         }
320         if (bufferPool == null) {
321             throw new NullPointerException("bufferPool");
322         }
323         if (delegatedTaskExecutor == null) {
324             throw new NullPointerException("delegatedTaskExecutor");
325         }
326         this.engine = engine;
327         this.bufferPool = bufferPool;
328         this.delegatedTaskExecutor = delegatedTaskExecutor;
329         this.startTls = startTls;
330     }
331 
332     /**
333      * Returns the {@link SSLEngine} which is used by this handler.
334      */
335     public SSLEngine getEngine() {
336         return engine;
337     }
338 
339     /**
340      * Starts an SSL / TLS handshake for the specified channel.
341      *
342      * @return a {@link ChannelFuture} which is notified when the handshake
343      *         succeeds or fails.
344      */
345     public ChannelFuture handshake() {
346         if (handshaken && !isEnableRenegotiation()) {
347             throw new IllegalStateException("renegotiation disabled");
348         }
349 
350         ChannelHandlerContext ctx = this.ctx;
351         Channel channel = ctx.getChannel();
352         ChannelFuture handshakeFuture;
353         Exception exception = null;
354 
355         synchronized (handshakeLock) {
356             if (handshaking) {
357                 return this.handshakeFuture;
358             } else {
359                 handshaking = true;
360                 try {
361                     engine.beginHandshake();
362                     runDelegatedTasks();
363                     handshakeFuture = this.handshakeFuture = future(channel);
364                 } catch (Exception e) {
365                     handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
366                     exception = e;
367                 }
368             }
369         }
370 
371         if (exception == null) { // Began handshake successfully.
372             try {
373                 wrapNonAppData(ctx, channel);
374             } catch (SSLException e) {
375                 handshakeFuture.setFailure(e);
376 
377                 fireExceptionCaught(ctx, e);
378                 if (closeOnSSLException) {
379                     Channels.close(ctx, future(channel));
380                 }
381             }
382         } else { // Failed to initiate handshake.
383             fireExceptionCaught(ctx, exception);
384             if (closeOnSSLException) {
385                 Channels.close(ctx, future(channel));
386             }
387         }
388 
389         return handshakeFuture;
390     }
391 
392     /**
393      * @deprecated Use {@link #handshake()} instead.
394      */
395     @Deprecated
396     public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
397         return handshake();
398     }
399 
400     /**
401      * Sends an SSL {@code close_notify} message to the specified channel and
402      * destroys the underlying {@link SSLEngine}.
403      */
404     public ChannelFuture close() {
405         ChannelHandlerContext ctx = this.ctx;
406         Channel channel = ctx.getChannel();
407         try {
408             engine.closeOutbound();
409             return wrapNonAppData(ctx, channel);
410         } catch (SSLException e) {
411             fireExceptionCaught(ctx, e);
412             if (closeOnSSLException) {
413                 Channels.close(ctx, future(channel));
414             }
415             return failedFuture(channel, e);
416         }
417     }
418 
419     /**
420      * @deprecated Use {@link #close()} instead.
421      */
422     @Deprecated
423     public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
424         return close();
425     }
426 
427     /**
428      * Returns {@code true} if and only if TLS renegotiation is enabled.
429      */
430     public boolean isEnableRenegotiation() {
431         return enableRenegotiation;
432     }
433 
434     /**
435      * Enables or disables TLS renegotiation.
436      */
437     public void setEnableRenegotiation(boolean enableRenegotiation) {
438         this.enableRenegotiation = enableRenegotiation;
439     }
440 
441     /**
442      * Enables or disables the automatic handshake once the {@link Channel} is
443      * connected. The value will only have affect if its set before the
444      * {@link Channel} is connected.
445      */
446     public void setIssueHandshake(boolean issueHandshake) {
447         this.issueHandshake = issueHandshake;
448     }
449 
450     /**
451      * Returns <code>true</code> if the automatic handshake is enabled
452      */
453     public boolean isIssueHandshake() {
454         return issueHandshake;
455     }
456 
457     /**
458      * Return the {@link ChannelFuture} that will get notified if the inbound of the {@link SSLEngine} will get closed.
459      *
460      * This method will return the same {@link ChannelFuture} all the time.
461      *
462      * For more informations see the apidocs of {@link SSLEngine}
463      *
464      */
465     public ChannelFuture getSSLEngineInboundCloseFuture() {
466         return sslEngineCloseFuture;
467 
468     }
469 
470     /**
471      * If set to <code>true</code>, the {@link Channel} will automatically get closed
472      * one a {@link SSLException} was caught. This is most times what you want, as after this
473      * its almost impossible to recover.
474      *
475      * Anyway the default is <code>false</code> to not break compatibility with older releases. This
476      * will be changed to <code>true</code> in the next major release.
477      *
478      */
479     public void setCloseOnSSLException(boolean closeOnSslException) {
480         if (ctx != null) {
481             throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
482         }
483         closeOnSSLException = closeOnSslException;
484     }
485 
486     public boolean getCloseOnSSLException() {
487         return closeOnSSLException;
488     }
489 
490     public void handleDownstream(
491             final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
492         if (evt instanceof ChannelStateEvent) {
493             ChannelStateEvent e = (ChannelStateEvent) evt;
494             switch (e.getState()) {
495             case OPEN:
496             case CONNECTED:
497             case BOUND:
498                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
499                     closeOutboundAndChannel(context, e);
500                     return;
501                 }
502             }
503         }
504         if (!(evt instanceof MessageEvent)) {
505             context.sendDownstream(evt);
506             return;
507         }
508 
509         MessageEvent e = (MessageEvent) evt;
510         if (!(e.getMessage() instanceof ChannelBuffer)) {
511             context.sendDownstream(evt);
512             return;
513         }
514 
515         // Do not encrypt the first write request if this handler is
516         // created with startTLS flag turned on.
517         if (startTls && sentFirstMessage.compareAndSet(false, true)) {
518             context.sendDownstream(evt);
519             return;
520         }
521 
522         // Otherwise, all messages are encrypted.
523         ChannelBuffer msg = (ChannelBuffer) e.getMessage();
524         PendingWrite pendingWrite;
525 
526         if (msg.readable()) {
527             pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
528         } else {
529             pendingWrite = new PendingWrite(evt.getFuture(), null);
530         }
531         synchronized (pendingUnencryptedWrites) {
532             boolean offered = pendingUnencryptedWrites.offer(pendingWrite);
533             assert offered;
534         }
535 
536         wrap(context, evt.getChannel());
537     }
538 
539     @Override
540     public void channelDisconnected(ChannelHandlerContext ctx,
541             ChannelStateEvent e) throws Exception {
542 
543         // Make sure the handshake future is notified when a connection has
544         // been closed during handshake.
545         synchronized (handshakeLock) {
546             if (handshaking) {
547                 handshakeFuture.setFailure(new ClosedChannelException());
548             }
549         }
550 
551         try {
552             super.channelDisconnected(ctx, e);
553         } finally {
554             unwrap(ctx, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
555             engine.closeOutbound();
556             if (!sentCloseNotify.get() && handshaken) {
557                 try {
558                     engine.closeInbound();
559                 } catch (SSLException ex) {
560                     if (logger.isDebugEnabled()) {
561                         logger.debug("Failed to clean up SSLEngine.", ex);
562                     }
563                 }
564             }
565         }
566     }
567 
568     @Override
569     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
570             throws Exception {
571 
572         Throwable cause = e.getCause();
573         if (cause instanceof IOException) {
574             if (cause instanceof ClosedChannelException) {
575                 synchronized (ignoreClosedChannelExceptionLock) {
576                     if (ignoreClosedChannelException > 0) {
577                         ignoreClosedChannelException --;
578                         if (logger.isDebugEnabled()) {
579                             logger.debug(
580                                     "Swallowing an exception raised while " +
581                                     "writing non-app data", cause);
582                         }
583 
584                         return;
585                     }
586                 }
587             } else {
588                 if (ignoreException(cause)) {
589                     return;
590                 }
591             }
592         }
593 
594         ctx.sendUpstream(e);
595     }
596 
597     /**
598      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
599      *
600      * When an ssl connection is closed a close_notify message is sent.
601      * After that the peer also sends close_notify however, it's not mandatory to receive
602      * the close_notify. The party who sent the initial close_notify can close the connection immediately
603      * then the peer will get connection reset error.
604      *
605      */
606     private boolean ignoreException(Throwable t) {
607         if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
608             String message = String.valueOf(t.getMessage()).toLowerCase();
609 
610             // first try to match connection reset / broke peer based on the regex. This is the fastest way
611             // but may fail on different jdk impls or OS's
612             if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
613                 return true;
614             }
615 
616             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
617             StackTraceElement[] elements = t.getStackTrace();
618             for (StackTraceElement element: elements) {
619                 String classname = element.getClassName();
620                 String methodname = element.getMethodName();
621 
622                 // skip all classes that belong to the io.netty package
623                 if (classname.startsWith("org.jboss.netty.")) {
624                     continue;
625                 }
626 
627                 // check if the method name is read if not skip it
628                 if (!methodname.equals("read")) {
629                     continue;
630                 }
631 
632                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
633                 // also others
634                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
635                     return true;
636                 }
637 
638                 try {
639                     // No match by now.. Try to load the class via classloader and inspect it.
640                     // This is mainly done as other JDK implementations may differ in name of
641                     // the impl.
642                     Class<?> clazz = getClass().getClassLoader().loadClass(classname);
643 
644                     if (SocketChannel.class.isAssignableFrom(clazz)
645                             || DatagramChannel.class.isAssignableFrom(clazz)) {
646                         return true;
647                     }
648 
649                     // also match against SctpChannel via String matching as it may not present.
650                     if (DetectionUtil.javaVersion() >= 7
651                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
652                         return true;
653                     }
654                 } catch (ClassNotFoundException e) {
655                     // This should not happen just ignore
656                 }
657 
658             }
659         }
660 
661         return false;
662     }
663 
664     /**
665      * Returns <code>true</code> if the given {@link ChannelBuffer} is encrypted. Be aware that this method
666      * will not increase the readerIndex of the given {@link ChannelBuffer}.
667      *
668      * @param   buffer
669      *                  The {@link ChannelBuffer} to read from. Be aware that it must have at least 5 bytes to read,
670      *                  otherwise it will throw an {@link IllegalArgumentException}.
671      * @return  encrypted
672      *                  <code>true</code> if the {@link ChannelBuffer} is encrypted, <code>false</code> otherwise.
673      * @throws IllegalArgumentException
674      *                  Is thrown if the given {@link ChannelBuffer} has not at least 5 bytes to read.
675      */
676     public static boolean isEncrypted(ChannelBuffer buffer) {
677         return getEncryptedPacketLength(buffer) != -1;
678     }
679 
680     /**
681      * Return how much bytes can be read out of the encrypted data. Be aware that this method will not increase
682      * the readerIndex of the given {@link ChannelBuffer}.
683      *
684      * @param   buffer
685      *                  The {@link ChannelBuffer} to read from. Be aware that it must have at least 5 bytes to read,
686      *                  otherwise it will throw an {@link IllegalArgumentException}.
687      * @return  length
688      *                  The length of the encrypted packet that is included in the buffer. This will
689      *                  return <code>-1</code> if the given {@link ChannelBuffer} is not encrypted at all.
690      * @throws IllegalArgumentException
691      *                  Is thrown if the given {@link ChannelBuffer} has not at least 5 bytes to read.
692      */
693     private static int getEncryptedPacketLength(ChannelBuffer buffer) {
694         if (buffer.readableBytes() < 5) {
695             throw new IllegalArgumentException("buffer must have at least 5 readable bytes");
696         }
697 
698         int packetLength = 0;
699 
700         // SSLv3 or TLS - Check ContentType
701         boolean tls;
702         switch (buffer.getUnsignedByte(buffer.readerIndex())) {
703         case 20:  // change_cipher_spec
704         case 21:  // alert
705         case 22:  // handshake
706         case 23:  // application_data
707             tls = true;
708             break;
709         default:
710             // SSLv2 or bad data
711             tls = false;
712         }
713 
714         if (tls) {
715             // SSLv3 or TLS - Check ProtocolVersion
716             int majorVersion = buffer.getUnsignedByte(buffer.readerIndex() + 1);
717             if (majorVersion == 3) {
718                 // SSLv3 or TLS
719                 packetLength = (getShort(buffer, buffer.readerIndex() + 3) & 0xFFFF) + 5;
720                 if (packetLength <= 5) {
721                     // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
722                     tls = false;
723                 }
724             } else {
725                 // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
726                 tls = false;
727             }
728         }
729 
730         if (!tls) {
731             // SSLv2 or bad data - Check the version
732             boolean sslv2 = true;
733             int headerLength = (buffer.getUnsignedByte(
734                     buffer.readerIndex()) & 0x80) != 0 ? 2 : 3;
735             int majorVersion = buffer.getUnsignedByte(
736                     buffer.readerIndex() + headerLength + 1);
737             if (majorVersion == 2 || majorVersion == 3) {
738                 // SSLv2
739                 if (headerLength == 2) {
740                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x7FFF) + 2;
741                 } else {
742                     packetLength = (getShort(buffer, buffer.readerIndex()) & 0x3FFF) + 3;
743                 }
744                 if (packetLength <= headerLength) {
745                     sslv2 = false;
746                 }
747             } else {
748                 sslv2 = false;
749             }
750 
751             if (!sslv2) {
752                 return -1;
753             }
754         }
755         return packetLength;
756     }
757 
758     @Override
759     protected Object decode(
760             final ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
761 
762         // Check if the packet length was parsed yet, if so we can skip the parsing
763         if (packetLength == Integer.MIN_VALUE) {
764             if (buffer.readableBytes() < 5) {
765                 return null;
766             }
767             int packetLength = getEncryptedPacketLength(buffer);
768 
769             if (packetLength == -1) {
770                 // Bad data - discard the buffer and raise an exception.
771                 NotSslRecordException e = new NotSslRecordException(
772                         "not an SSL/TLS record: " + ChannelBuffers.hexDump(buffer));
773                 buffer.skipBytes(buffer.readableBytes());
774 
775                 if (closeOnSSLException) {
776                     // first trigger the exception and then close the channel
777                     fireExceptionCaught(ctx, e);
778                     Channels.close(ctx, future(channel));
779 
780                     // just return null as we closed the channel before, that
781                     // will take care of cleanup etc
782                     return null;
783                 } else {
784                     throw e;
785                 }
786             }
787 
788             assert packetLength > 0;
789             this.packetLength = packetLength;
790         }
791 
792         if (buffer.readableBytes() < packetLength) {
793             return null;
794         }
795 
796         // We advance the buffer's readerIndex before calling unwrap() because
797         // unwrap() can trigger FrameDecoder call decode(), this method, recursively.
798         // The recursive call results in decoding the same packet twice if
799         // the readerIndex is advanced *after* decode().
800         //
801         // Here's an example:
802         // 1) An SSL packet is received from the wire.
803         // 2) SslHandler.decode() deciphers the packet and calls the user code.
804         // 3) The user closes the channel in the same thread.
805         // 4) The same thread triggers a channelDisconnected() event.
806         // 5) FrameDecoder.cleanup() is called, and it calls SslHandler.decode().
807         // 6) SslHandler.decode() will feed the same packet with what was
808         //    deciphered at the step 2 again if the readerIndex was not advanced
809         //    before calling the user code.
810         final int packetOffset = buffer.readerIndex();
811         buffer.skipBytes(packetLength);
812         try {
813             return unwrap(ctx, channel, buffer, packetOffset, packetLength);
814         } finally {
815             // reset the packet length so it will be parsed again on the next call
816             packetLength = Integer.MIN_VALUE;
817         }
818 
819     }
820 
821     /**
822      * Reads a big-endian short integer from the buffer.  Please note that we do not use
823      * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer.
824      */
825     private static short getShort(ChannelBuffer buf, int offset) {
826         return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
827     }
828 
829     private void wrap(ChannelHandlerContext context, Channel channel)
830             throws SSLException {
831 
832         ChannelBuffer msg;
833         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
834         boolean success = true;
835         boolean offered = false;
836         boolean needsUnwrap = false;
837         PendingWrite pendingWrite = null;
838 
839         try {
840             loop:
841             for (;;) {
842                 // Acquire a lock to make sure unencrypted data is polled
843                 // in order and their encrypted counterpart is offered in
844                 // order.
845                 synchronized (pendingUnencryptedWrites) {
846                     pendingWrite = pendingUnencryptedWrites.peek();
847                     if (pendingWrite == null) {
848                         break;
849                     }
850 
851                     ByteBuffer outAppBuf = pendingWrite.outAppBuf;
852                     if (outAppBuf == null) {
853                         // A write request with an empty buffer
854                         pendingUnencryptedWrites.remove();
855                         offerEncryptedWriteRequest(
856                                 new DownstreamMessageEvent(
857                                         channel, pendingWrite.future,
858                                         ChannelBuffers.EMPTY_BUFFER,
859                                         channel.getRemoteAddress()));
860                         offered = true;
861                     } else {
862                         SSLEngineResult result = null;
863                         try {
864                             synchronized (handshakeLock) {
865                                 result = engine.wrap(outAppBuf, outNetBuf);
866                             }
867                         } finally {
868                             if (!outAppBuf.hasRemaining()) {
869                                 pendingUnencryptedWrites.remove();
870                             }
871                         }
872 
873                         if (result.bytesProduced() > 0) {
874                             outNetBuf.flip();
875                             int remaining = outNetBuf.remaining();
876                             msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
877 
878                             // Transfer the bytes to the new ChannelBuffer using some safe method that will also
879                             // work with "non" heap buffers
880                             //
881                             // See https://github.com/netty/netty/issues/329
882                             msg.writeBytes(outNetBuf);
883                             outNetBuf.clear();
884 
885                             ChannelFuture future;
886                             if (pendingWrite.outAppBuf.hasRemaining()) {
887                                 // pendingWrite's future shouldn't be notified if
888                                 // only partial data is written.
889                                 future = succeededFuture(channel);
890                             } else {
891                                 future = pendingWrite.future;
892                             }
893 
894                             MessageEvent encryptedWrite = new DownstreamMessageEvent(
895                                     channel, future, msg, channel.getRemoteAddress());
896                             offerEncryptedWriteRequest(encryptedWrite);
897                             offered = true;
898                         } else if (result.getStatus() == Status.CLOSED) {
899                             // SSLEngine has been closed already.
900                             // Any further write attempts should be denied.
901                             success = false;
902                             break;
903                         } else {
904                             final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
905                             handleRenegotiation(handshakeStatus);
906                             switch (handshakeStatus) {
907                             case NEED_WRAP:
908                                 if (outAppBuf.hasRemaining()) {
909                                     break;
910                                 } else {
911                                     break loop;
912                                 }
913                             case NEED_UNWRAP:
914                                 needsUnwrap = true;
915                                 break loop;
916                             case NEED_TASK:
917                                 runDelegatedTasks();
918                                 break;
919                             case FINISHED:
920                             case NOT_HANDSHAKING:
921                                 if (handshakeStatus == HandshakeStatus.FINISHED) {
922                                     setHandshakeSuccess(channel);
923                                 }
924                                 if (result.getStatus() == Status.CLOSED) {
925                                     success = false;
926                                 }
927                                 break loop;
928                             default:
929                                 throw new IllegalStateException(
930                                         "Unknown handshake status: " +
931                                         handshakeStatus);
932                             }
933                         }
934                     }
935                 }
936             }
937         } catch (SSLException e) {
938             success = false;
939             setHandshakeFailure(channel, e);
940             throw e;
941         } finally {
942             bufferPool.releaseBuffer(outNetBuf);
943 
944             if (offered) {
945                 flushPendingEncryptedWrites(context);
946             }
947 
948             if (!success) {
949                 IllegalStateException cause =
950                     new IllegalStateException("SSLEngine already closed");
951 
952                 // Check if we had a pendingWrite in process, if so we need to also notify as otherwise
953                 // the ChannelFuture will never get notified
954                 if (pendingWrite != null) {
955                     pendingWrite.future.setFailure(cause);
956                 }
957 
958                 // Mark all remaining pending writes as failure if anything
959                 // wrong happened before the write requests are wrapped.
960                 // Please note that we do not call setFailure while a lock is
961                 // acquired, to avoid a potential dead lock.
962                 for (;;) {
963                     synchronized (pendingUnencryptedWrites) {
964                         pendingWrite = pendingUnencryptedWrites.poll();
965                         if (pendingWrite == null) {
966                             break;
967                         }
968                     }
969 
970                     pendingWrite.future.setFailure(cause);
971                 }
972             }
973         }
974 
975         if (needsUnwrap) {
976             unwrap(context, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
977         }
978     }
979 
980     private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
981         final boolean locked = pendingEncryptedWritesLock.tryLock();
982         try {
983             pendingEncryptedWrites.offer(encryptedWrite);
984         } finally {
985             if (locked) {
986                 pendingEncryptedWritesLock.unlock();
987             }
988         }
989     }
990 
991     private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
992         // Avoid possible dead lock and data integrity issue
993         // which is caused by cross communication between more than one channel
994         // in the same VM.
995         if (!pendingEncryptedWritesLock.tryLock()) {
996             return;
997         }
998 
999         try {
1000             MessageEvent e;
1001             while ((e = pendingEncryptedWrites.poll()) != null) {
1002                 ctx.sendDownstream(e);
1003             }
1004         } finally {
1005             pendingEncryptedWritesLock.unlock();
1006         }
1007     }
1008 
1009     private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
1010         ChannelFuture future = null;
1011         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
1012 
1013         SSLEngineResult result;
1014         try {
1015             for (;;) {
1016                 synchronized (handshakeLock) {
1017                     result = engine.wrap(EMPTY_BUFFER, outNetBuf);
1018                 }
1019 
1020                 if (result.bytesProduced() > 0) {
1021                     outNetBuf.flip();
1022                     ChannelBuffer msg =
1023                             ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
1024 
1025 
1026                     // Transfer the bytes to the new ChannelBuffer using some safe method that will also
1027                     // work with "non" heap buffers
1028                     //
1029                     // See https://github.com/netty/netty/issues/329
1030                     msg.writeBytes(outNetBuf);
1031                     outNetBuf.clear();
1032 
1033                     future = future(channel);
1034                     future.addListener(new ChannelFutureListener() {
1035                         public void operationComplete(ChannelFuture future)
1036                                 throws Exception {
1037                             if (future.getCause() instanceof ClosedChannelException) {
1038                                 synchronized (ignoreClosedChannelExceptionLock) {
1039                                     ignoreClosedChannelException ++;
1040                                 }
1041                             }
1042                         }
1043                     });
1044 
1045                     write(ctx, future, msg);
1046                 }
1047 
1048                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1049                 handleRenegotiation(handshakeStatus);
1050                 switch (handshakeStatus) {
1051                 case FINISHED:
1052                     setHandshakeSuccess(channel);
1053                     runDelegatedTasks();
1054                     break;
1055                 case NEED_TASK:
1056                     runDelegatedTasks();
1057                     break;
1058                 case NEED_UNWRAP:
1059                     if (!Thread.holdsLock(handshakeLock)) {
1060                         // unwrap shouldn't be called when this method was
1061                         // called by unwrap - unwrap will keep running after
1062                         // this method returns.
1063                         unwrap(ctx, channel, ChannelBuffers.EMPTY_BUFFER, 0, 0);
1064                     }
1065                     break;
1066                 case NOT_HANDSHAKING:
1067                 case NEED_WRAP:
1068                     break;
1069                 default:
1070                     throw new IllegalStateException(
1071                             "Unexpected handshake status: " + handshakeStatus);
1072                 }
1073 
1074                 if (result.bytesProduced() == 0) {
1075                     break;
1076                 }
1077             }
1078         } catch (SSLException e) {
1079             setHandshakeFailure(channel, e);
1080             throw e;
1081         } finally {
1082             bufferPool.releaseBuffer(outNetBuf);
1083         }
1084 
1085         if (future == null) {
1086             future = succeededFuture(channel);
1087         }
1088 
1089         return future;
1090     }
1091 
1092     private ChannelBuffer unwrap(
1093             ChannelHandlerContext ctx, Channel channel,
1094             ChannelBuffer buffer, int offset, int length) throws SSLException {
1095         ByteBuffer inNetBuf = buffer.toByteBuffer(offset, length);
1096         ByteBuffer outAppBuf = bufferPool.acquireBuffer();
1097 
1098         try {
1099             boolean needsWrap = false;
1100             loop:
1101             for (;;) {
1102                 SSLEngineResult result;
1103                 boolean needsHandshake = false;
1104                 synchronized (handshakeLock) {
1105                     if (!handshaken && !handshaking &&
1106                         !engine.getUseClientMode() &&
1107                         !engine.isInboundDone() && !engine.isOutboundDone()) {
1108                         needsHandshake = true;
1109 
1110                     }
1111                 }
1112                 if (needsHandshake) {
1113                     handshake();
1114                 }
1115 
1116                 synchronized (handshakeLock) {
1117                     result = engine.unwrap(inNetBuf, outAppBuf);
1118                 }
1119 
1120                 // notify about the CLOSED state of the SSLEngine. See #137
1121                 if (result.getStatus() == Status.CLOSED) {
1122                     sslEngineCloseFuture.setClosed();
1123                 }
1124 
1125 
1126                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1127                 handleRenegotiation(handshakeStatus);
1128                 switch (handshakeStatus) {
1129                 case NEED_UNWRAP:
1130                     if (inNetBuf.hasRemaining() && !engine.isInboundDone()) {
1131                         break;
1132                     } else {
1133                         break loop;
1134                     }
1135                 case NEED_WRAP:
1136                     wrapNonAppData(ctx, channel);
1137                     break;
1138                 case NEED_TASK:
1139                     runDelegatedTasks();
1140                     break;
1141                 case FINISHED:
1142                     setHandshakeSuccess(channel);
1143                     needsWrap = true;
1144                     break loop;
1145                 case NOT_HANDSHAKING:
1146                     needsWrap = true;
1147                     break loop;
1148                 default:
1149                     throw new IllegalStateException(
1150                             "Unknown handshake status: " + handshakeStatus);
1151                 }
1152 
1153             }
1154 
1155             if (needsWrap) {
1156                 // wrap() acquires pendingUnencryptedWrites first and then
1157                 // handshakeLock.  If handshakeLock is already hold by the
1158                 // current thread, calling wrap() will lead to a dead lock
1159                 // i.e. pendingUnencryptedWrites -> handshakeLock vs.
1160                 //      handshakeLock -> pendingUnencryptedLock -> handshakeLock
1161                 //
1162                 // There is also a same issue between pendingEncryptedWrites
1163                 // and pendingUnencryptedWrites.
1164                 if (!Thread.holdsLock(handshakeLock) &&
1165                     !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1166                     wrap(ctx, channel);
1167                 }
1168             }
1169 
1170             outAppBuf.flip();
1171 
1172             if (outAppBuf.hasRemaining()) {
1173                 ChannelBuffer frame = ctx.getChannel().getConfig().getBufferFactory().getBuffer(outAppBuf.remaining());
1174                 // Transfer the bytes to the new ChannelBuffer using some safe method that will also
1175                 // work with "non" heap buffers
1176                 //
1177                 // See https://github.com/netty/netty/issues/329
1178                 frame.writeBytes(outAppBuf);
1179 
1180                 return frame;
1181             } else {
1182                 return null;
1183             }
1184         } catch (SSLException e) {
1185             setHandshakeFailure(channel, e);
1186             throw e;
1187         } finally {
1188             bufferPool.releaseBuffer(outAppBuf);
1189         }
1190     }
1191 
1192     private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1193         if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1194             handshakeStatus == HandshakeStatus.FINISHED) {
1195             // Not handshaking
1196             return;
1197         }
1198 
1199         if (!handshaken) {
1200             // Not renegotiation
1201             return;
1202         }
1203 
1204         final boolean renegotiate;
1205         synchronized (handshakeLock) {
1206             if (handshaking) {
1207                 // Renegotiation in progress or failed already.
1208                 // i.e. Renegotiation check has been done already below.
1209                 return;
1210             }
1211 
1212             if (engine.isInboundDone() || engine.isOutboundDone()) {
1213                 // Not handshaking but closing.
1214                 return;
1215             }
1216 
1217             if (isEnableRenegotiation()) {
1218                 // Continue renegotiation.
1219                 renegotiate = true;
1220             } else {
1221                 // Do not renegotiate.
1222                 renegotiate = false;
1223                 // Prevent reentrance of this method.
1224                 handshaking = true;
1225             }
1226         }
1227 
1228         if (renegotiate) {
1229             // Renegotiate.
1230             handshake();
1231         } else {
1232             // Raise an exception.
1233             fireExceptionCaught(
1234                     ctx, new SSLException(
1235                             "renegotiation attempted by peer; " +
1236                             "closing the connection"));
1237 
1238             // Close the connection to stop renegotiation.
1239             Channels.close(ctx, succeededFuture(ctx.getChannel()));
1240         }
1241     }
1242 
1243     private void runDelegatedTasks() {
1244         for (;;) {
1245             final Runnable task;
1246             synchronized (handshakeLock) {
1247                 task = engine.getDelegatedTask();
1248             }
1249 
1250             if (task == null) {
1251                 break;
1252             }
1253 
1254             delegatedTaskExecutor.execute(new Runnable() {
1255                 public void run() {
1256                     synchronized (handshakeLock) {
1257                         task.run();
1258                     }
1259                 }
1260             });
1261         }
1262     }
1263 
1264     private void setHandshakeSuccess(Channel channel) {
1265         synchronized (handshakeLock) {
1266             handshaking = false;
1267             handshaken = true;
1268 
1269             if (handshakeFuture == null) {
1270                 handshakeFuture = future(channel);
1271             }
1272         }
1273 
1274         handshakeFuture.setSuccess();
1275     }
1276 
1277     private void setHandshakeFailure(Channel channel, SSLException cause) {
1278         synchronized (handshakeLock) {
1279             if (!handshaking) {
1280                 return;
1281             }
1282             handshaking = false;
1283             handshaken = false;
1284 
1285             if (handshakeFuture == null) {
1286                 handshakeFuture = future(channel);
1287             }
1288 
1289             // Release all resources such as internal buffers that SSLEngine
1290             // is managing.
1291 
1292             engine.closeOutbound();
1293 
1294             try {
1295                 engine.closeInbound();
1296             } catch (SSLException e) {
1297                 if (logger.isDebugEnabled()) {
1298                     logger.debug(
1299                             "SSLEngine.closeInbound() raised an exception after " +
1300                             "a handshake failure.", e);
1301                 }
1302 
1303             }
1304         }
1305 
1306         handshakeFuture.setFailure(cause);
1307         if (closeOnSSLException) {
1308             Channels.close(ctx, future(channel));
1309         }
1310     }
1311 
1312     private void closeOutboundAndChannel(
1313             final ChannelHandlerContext context, final ChannelStateEvent e) {
1314         if (!e.getChannel().isConnected()) {
1315             context.sendDownstream(e);
1316             return;
1317         }
1318 
1319         boolean passthrough = true;
1320         try {
1321             try {
1322                 unwrap(context, e.getChannel(), ChannelBuffers.EMPTY_BUFFER, 0, 0);
1323             } catch (SSLException ex) {
1324                 if (logger.isDebugEnabled()) {
1325                     logger.debug("Failed to unwrap before sending a close_notify message", ex);
1326                 }
1327             }
1328 
1329             if (!engine.isOutboundDone()) {
1330                 if (sentCloseNotify.compareAndSet(false, true)) {
1331                     engine.closeOutbound();
1332                     try {
1333                         ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1334                         closeNotifyFuture.addListener(
1335                                 new ClosingChannelFutureListener(context, e));
1336                         passthrough = false;
1337                     } catch (SSLException ex) {
1338                         if (logger.isDebugEnabled()) {
1339                             logger.debug("Failed to encode a close_notify message", ex);
1340                         }
1341                     }
1342                 }
1343             }
1344         } finally {
1345             if (passthrough) {
1346                 context.sendDownstream(e);
1347             }
1348         }
1349     }
1350 
1351     private static final class PendingWrite {
1352         final ChannelFuture future;
1353         final ByteBuffer outAppBuf;
1354 
1355         PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1356             this.future = future;
1357             this.outAppBuf = outAppBuf;
1358         }
1359     }
1360 
1361     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1362 
1363         private final ChannelHandlerContext context;
1364         private final ChannelStateEvent e;
1365 
1366         ClosingChannelFutureListener(
1367                 ChannelHandlerContext context, ChannelStateEvent e) {
1368             this.context = context;
1369             this.e = e;
1370         }
1371 
1372         public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1373             if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1374                 Channels.close(context, e.getFuture());
1375             } else {
1376                 e.getFuture().setSuccess();
1377             }
1378         }
1379     }
1380 
1381     @Override
1382     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1383         super.beforeAdd(ctx);
1384         this.ctx = ctx;
1385     }
1386 
1387     /**
1388      * Fail all pending writes which we were not able to flush out
1389      */
1390     @Override
1391     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1392 
1393         // there is no need for synchronization here as we do not receive downstream events anymore
1394         Throwable cause = null;
1395         for (;;) {
1396             PendingWrite pw = pendingUnencryptedWrites.poll();
1397             if (pw == null) {
1398                 break;
1399             }
1400             if (cause == null) {
1401                 cause = new IOException("Unable to write data");
1402             }
1403             pw.future.setFailure(cause);
1404 
1405         }
1406 
1407         for (;;) {
1408             MessageEvent ev = pendingEncryptedWrites.poll();
1409             if (ev == null) {
1410                 break;
1411             }
1412             if (cause == null) {
1413                 cause = new IOException("Unable to write data");
1414             }
1415             ev.getFuture().setFailure(cause);
1416 
1417         }
1418 
1419         if (cause != null) {
1420             fireExceptionCaughtLater(ctx, cause);
1421         }
1422     }
1423 
1424 
1425     /**
1426      * Calls {@link #handshake()} once the {@link Channel} is connected
1427      */
1428     @Override
1429     public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1430         if (issueHandshake) {
1431             // issue and handshake and add a listener to it which will fire an exception event if
1432             // an exception was thrown while doing the handshake
1433             handshake().addListener(new ChannelFutureListener() {
1434 
1435                 public void operationComplete(ChannelFuture future) throws Exception {
1436                     if (!future.isSuccess()) {
1437                         fireExceptionCaught(future.getChannel(), future.getCause());
1438                     } else {
1439                         // Send the event upstream after the handshake was completed without an error.
1440                         //
1441                         // See https://github.com/netty/netty/issues/358
1442                         ctx.sendUpstream(e);
1443                     }
1444 
1445                 }
1446             });
1447         } else {
1448             super.channelConnected(ctx, e);
1449         }
1450     }
1451 
1452     /**
1453      * Loop over all the pending writes and fail them.
1454      *
1455      * See <a href="https://github.com/netty/netty/issues/305">#305</a> for more details.
1456      */
1457     @Override
1458     public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1459         Throwable cause = null;
1460         synchronized (pendingUnencryptedWrites) {
1461             for (;;) {
1462                 PendingWrite pw = pendingUnencryptedWrites.poll();
1463                 if (pw == null) {
1464                     break;
1465                 }
1466                 if (cause == null) {
1467                     cause = new ClosedChannelException();
1468                 }
1469                 pw.future.setFailure(cause);
1470 
1471             }
1472 
1473 
1474             for (;;) {
1475                 MessageEvent ev = pendingEncryptedWrites.poll();
1476                 if (ev == null) {
1477                     break;
1478                 }
1479                 if (cause == null) {
1480                     cause = new ClosedChannelException();
1481                 }
1482                 ev.getFuture().setFailure(cause);
1483 
1484             }
1485         }
1486 
1487         if (cause != null) {
1488             fireExceptionCaught(ctx, cause);
1489         }
1490 
1491         super.channelClosed(ctx, e);
1492     }
1493 
1494     private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1495         public SSLEngineInboundCloseFuture() {
1496             super(null, true);
1497         }
1498 
1499         void setClosed() {
1500             super.setSuccess();
1501         }
1502 
1503         @Override
1504         public Channel getChannel() {
1505             if (ctx == null) {
1506                 // Maybe we should better throw an IllegalStateException() ?
1507                 return null;
1508             } else {
1509                 return ctx.getChannel();
1510             }
1511         }
1512 
1513         @Override
1514         public boolean setSuccess() {
1515             return false;
1516         }
1517 
1518         @Override
1519         public boolean setFailure(Throwable cause) {
1520             return false;
1521         }
1522     }
1523 }