View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package org.jboss.netty.handler.ssl;
17  
18  import org.jboss.netty.buffer.ChannelBuffer;
19  import org.jboss.netty.buffer.ChannelBufferFactory;
20  import org.jboss.netty.buffer.ChannelBuffers;
21  import org.jboss.netty.channel.Channel;
22  import org.jboss.netty.channel.ChannelDownstreamHandler;
23  import org.jboss.netty.channel.ChannelEvent;
24  import org.jboss.netty.channel.ChannelFuture;
25  import org.jboss.netty.channel.ChannelFutureListener;
26  import org.jboss.netty.channel.ChannelHandlerContext;
27  import org.jboss.netty.channel.ChannelPipeline;
28  import org.jboss.netty.channel.ChannelStateEvent;
29  import org.jboss.netty.channel.Channels;
30  import org.jboss.netty.channel.DefaultChannelFuture;
31  import org.jboss.netty.channel.DownstreamMessageEvent;
32  import org.jboss.netty.channel.ExceptionEvent;
33  import org.jboss.netty.channel.MessageEvent;
34  import org.jboss.netty.handler.codec.frame.FrameDecoder;
35  import org.jboss.netty.logging.InternalLogger;
36  import org.jboss.netty.logging.InternalLoggerFactory;
37  import org.jboss.netty.util.Timeout;
38  import org.jboss.netty.util.Timer;
39  import org.jboss.netty.util.TimerTask;
40  import org.jboss.netty.util.internal.DetectionUtil;
41  import org.jboss.netty.util.internal.NonReentrantLock;
42  
43  import javax.net.ssl.SSLEngine;
44  import javax.net.ssl.SSLEngineResult;
45  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
46  import javax.net.ssl.SSLEngineResult.Status;
47  import javax.net.ssl.SSLException;
48  import java.io.IOException;
49  import java.nio.ByteBuffer;
50  import java.nio.channels.ClosedChannelException;
51  import java.nio.channels.DatagramChannel;
52  import java.nio.channels.SocketChannel;
53  import java.util.ArrayList;
54  import java.util.LinkedList;
55  import java.util.List;
56  import java.util.Queue;
57  import java.util.concurrent.ConcurrentLinkedQueue;
58  import java.util.concurrent.CountDownLatch;
59  import java.util.concurrent.Executor;
60  import java.util.concurrent.TimeUnit;
61  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
62  import java.util.regex.Pattern;
63  
64  import static org.jboss.netty.channel.Channels.*;
65  
66  /**
67   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
68   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
69   * to the <strong>"SecureChat"</strong> example in the distribution or the web
70   * site for the detailed usage.
71   *
72   * <h3>Beginning the handshake</h3>
73   * <p>
74   * You must make sure not to write a message while the
75   * {@linkplain #handshake() handshake} is in progress unless you are
76   * renegotiating.  You will be notified by the {@link ChannelFuture} which is
77   * returned by the {@link #handshake()} method when the handshake
78   * process succeeds or fails.
79   *
80   * <h3>Handshake</h3>
81   * <p>
82   * If {@link #isIssueHandshake()} is {@code false}
83   * (default) you will need to take care of calling {@link #handshake()} yourself. In most
84   * situations where {@link SslHandler} is used in 'client mode' you want to issue a handshake once
85   * the connection is established. If {@link #setIssueHandshake(boolean)} is set to {@code true}
86   * you don't need to worry about this as the {@link SslHandler} will take care of it.
87   * <p>
88   *
89   * <h3>Renegotiation</h3>
90   * <p>
91   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code true}
92   * (default) and the initial handshake has been done successfully, you can call
93   * {@link #handshake()} to trigger the renegotiation.
94   * <p>
95   * If {@link #isEnableRenegotiation() enableRenegotiation} is {@code false},
96   * an attempt to trigger renegotiation will result in the connection closure.
97   * <p>
98   * Please note that TLS renegotiation had a security issue before.  If your
99   * runtime environment did not fix it, please make sure to disable TLS
100  * renegotiation by calling {@link #setEnableRenegotiation(boolean)} with
101  * {@code false}.  For more information, please refer to the following documents:
102  * <ul>
103  *   <li><a href="http://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2009-3555">CVE-2009-3555</a></li>
104  *   <li><a href="http://www.ietf.org/rfc/rfc5746.txt">RFC5746</a></li>
105  *   <li><a href="http://www.oracle.com/technetwork/java/javase/documentation/tlsreadme2-176330.html">Phased
106  *       Approach to Fixing the TLS Renegotiation Issue</a></li>
107  * </ul>
108  *
109  * <h3>Closing the session</h3>
110  * <p>
111  * To close the SSL session, the {@link #close()} method should be
112  * called to send the {@code close_notify} message to the remote peer.  One
113  * exception is when you close the {@link Channel} - {@link SslHandler}
114  * intercepts the close request and sends the {@code close_notify} message
115  * before the channel closure automatically.  Once the SSL session is closed,
116  * it is not reusable, and consequently you should create a new
117  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
118  * following section.
119  *
120  * <h3>Restarting the session</h3>
121  * <p>
122  * To restart the SSL session, you must remove the existing closed
123  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
124  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
125  * and start the handshake process as described in the first section.
126  *
127  * <h3>Implementing StartTLS</h3>
128  * <p>
129  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
130  * communication pattern that secures the wire in the middle of the plaintext
131  * connection.  Please note that it is different from SSL &middot; TLS, that
132  * secures the wire from the beginning of the connection.  Typically, StartTLS
133  * is composed of three steps:
134  * <ol>
135  * <li>Client sends a StartTLS request to server.</li>
136  * <li>Server sends a StartTLS response to client.</li>
137  * <li>Client begins SSL handshake.</li>
138  * </ol>
139  * If you implement a server, you need to:
140  * <ol>
141  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
142  *     to {@code true},</li>
143  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
144  * <li>write a StartTLS response.</li>
145  * </ol>
146  * Please note that you must insert {@link SslHandler} <em>before</em> sending
147  * the StartTLS response.  Otherwise the client can begin the SSL handshake
148  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
149  * data corruption.
150  * <p>
151  * The client-side implementation is much simpler.
152  * <ol>
153  * <li>Write a StartTLS request,</li>
154  * <li>wait for the StartTLS response,</li>
155  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
156  *     to {@code false},</li>
157  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
158  * <li>Initiate SSL handshake by calling {@link SslHandler#handshake()}.</li>
159  * </ol>
160  *
161  * <h3>Known issues</h3>
162  * <p>
163  * Because of a known issue with the current implementation of the SslEngine that comes
164  * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
165  * <p>
166  * So if you are affected you can work around this problem by adjusting the cache settings
167  * as shown below:
168  *
169  * <pre>
170  *     SSLContext context = ...;
171  *     context.getServerSessionContext().setSessionCacheSize(someSaneSize);
172  *     context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
173  * </pre>
174  * <p>
175  * What values to use here depends on the nature of your application and should be set
176  * based on monitoring and debugging of it.
177  * For more details see
178  * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
179  * @apiviz.landmark
180  * @apiviz.uses org.jboss.netty.handler.ssl.SslBufferPool
181  */
182 public class SslHandler extends FrameDecoder
183                         implements ChannelDownstreamHandler {
184 
    /** Logger shared by every {@link SslHandler} instance. */
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(SslHandler.class);

    /** Shared zero-capacity {@link ByteBuffer}. */
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);

    // Matches class names that contain SocketChannel, DatagramChannel, SctpChannel or UdtChannel;
    // ignoreException(Throwable) applies it to the class names found in a stack trace.
    private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
            "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
    // Case-insensitively matches messages such as "connection reset" or "broken pipe";
    // ignoreException(Throwable) applies it to the exception message.
    private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
            "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

    // Created on first use by getDefaultBufferPool().
    private static SslBufferPool defaultBufferPool;
195 
196     /**
197      * Returns the default {@link SslBufferPool} used when no pool is
198      * specified in the constructor.
199      */
200     public static synchronized SslBufferPool getDefaultBufferPool() {
201         if (defaultBufferPool == null) {
202             defaultBufferPool = new SslBufferPool();
203         }
204         return defaultBufferPool;
205     }
206 
    // Context of this handler; dereferenced by handshake() and close(). setCloseOnSSLException()
    // treats a non-null value as "already attached to a ChannelPipeline".
    private volatile ChannelHandlerContext ctx;
    private final SSLEngine engine;
    private final SslBufferPool bufferPool;
    private final Executor delegatedTaskExecutor;
    // When true, handleDownstream() lets the first ChannelBuffer write through unencrypted.
    private final boolean startTls;

    private volatile boolean enableRenegotiation = true;

    // Monitor held while 'handshaking' is read or written in handshake() and channelDisconnected().
    final Object handshakeLock = new Object();
    private boolean handshaking;
    private volatile boolean handshaken;
    // Future of the most recently started handshake; null until handshake() has been called.
    private volatile ChannelFuture handshakeFuture;

    // Int flags paired with the AtomicIntegerFieldUpdaters below. The updaters look the fields up
    // by name, so the string literals must be kept in sync with the field names.
    @SuppressWarnings("UnusedDeclaration")
    private volatile int sentFirstMessage;
    @SuppressWarnings("UnusedDeclaration")
    private volatile int sentCloseNotify;
    @SuppressWarnings("UnusedDeclaration")
    private volatile int closedOutboundAndChannel;

    private static final AtomicIntegerFieldUpdater<SslHandler> SENT_FIRST_MESSAGE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentFirstMessage");
    private static final AtomicIntegerFieldUpdater<SslHandler> SENT_CLOSE_NOTIFY_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "sentCloseNotify");
    private static final AtomicIntegerFieldUpdater<SslHandler> CLOSED_OUTBOUND_AND_CHANNEL_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(SslHandler.class, "closedOutboundAndChannel");

    // Number of ClosedChannelExceptions that exceptionCaught() will still swallow;
    // accessed while holding ignoreClosedChannelExceptionLock.
    int ignoreClosedChannelException;
    final Object ignoreClosedChannelExceptionLock = new Object();
    // Plaintext writes queued by handleDownstream(); the LinkedList is not thread-safe, so it is
    // accessed while holding pendingUnencryptedWritesLock.
    private final Queue<PendingWrite> pendingUnencryptedWrites = new LinkedList<PendingWrite>();
    private final NonReentrantLock pendingUnencryptedWritesLock = new NonReentrantLock();
    private final Queue<MessageEvent> pendingEncryptedWrites = new ConcurrentLinkedQueue<MessageEvent>();
    private final NonReentrantLock pendingEncryptedWritesLock = new NonReentrantLock();

    private volatile boolean issueHandshake;
    // Set by handleDownstream() when a write arrives while the handshake future is absent or not done.
    private volatile boolean writeBeforeHandshakeDone;
    // The single future handed out by getSSLEngineInboundCloseFuture().
    private final SSLEngineInboundCloseFuture sslEngineCloseFuture = new SSLEngineInboundCloseFuture();

    // When true, the channel is closed after an SSLException has been reported.
    private boolean closeOnSslException;

    private int packetLength;

    // Timer and timeout used by handshake(); the constructor only requires a Timer when the
    // timeout is greater than zero.
    private final Timer timer;
    private final long handshakeTimeoutInMillis;
    // Handle of the scheduled handshake-timeout task; cancelled by cancelHandshakeTimeout().
    private Timeout handshakeTimeout;
252 
253     /**
254      * Creates a new instance.
255      *
256      * @param engine  the {@link SSLEngine} this handler will use
257      */
258     public SslHandler(SSLEngine engine) {
259         this(engine, getDefaultBufferPool(), false, null, 0);
260     }
261 
262     /**
263      * Creates a new instance.
264      *
265      * @param engine      the {@link SSLEngine} this handler will use
266      * @param bufferPool  the {@link SslBufferPool} where this handler will
267      *                    acquire the buffers required by the {@link SSLEngine}
268      */
269     public SslHandler(SSLEngine engine, SslBufferPool bufferPool) {
270         this(engine, bufferPool, false, null, 0);
271     }
272 
273     /**
274      * Creates a new instance.
275      *
276      * @param engine    the {@link SSLEngine} this handler will use
277      * @param startTls  {@code true} if the first write request shouldn't be
278      *                  encrypted by the {@link SSLEngine}
279      */
280     public SslHandler(SSLEngine engine, boolean startTls) {
281         this(engine, getDefaultBufferPool(), startTls);
282     }
283 
284     /**
285      * Creates a new instance.
286      *
287      * @param engine      the {@link SSLEngine} this handler will use
288      * @param bufferPool  the {@link SslBufferPool} where this handler will
289      *                    acquire the buffers required by the {@link SSLEngine}
290      * @param startTls    {@code true} if the first write request shouldn't be
291      *                    encrypted by the {@link SSLEngine}
292      */
293     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls) {
294         this(engine, bufferPool, startTls, null, 0);
295     }
296 
297     /**
298      * Creates a new instance.
299      *
300      * @param engine
301      *        the {@link SSLEngine} this handler will use
302      * @param bufferPool
303      *        the {@link SslBufferPool} where this handler will acquire
304      *        the buffers required by the {@link SSLEngine}
305      * @param startTls
306      *        {@code true} if the first write request shouldn't be encrypted
307      *        by the {@link SSLEngine}
308      * @param timer
309      *        the {@link Timer} which will be used to process the timeout of the {@link #handshake()}.
310      *        Be aware that the given {@link Timer} will not get stopped automaticly, so it is up to you to cleanup
311      *        once you not need it anymore
312      * @param handshakeTimeoutInMillis
313      *        the time in milliseconds after whic the {@link #handshake()}  will be failed, and so the future notified
314      */
315     @SuppressWarnings("deprecation")
316     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls,
317                       Timer timer, long handshakeTimeoutInMillis) {
318         this(engine, bufferPool, startTls, ImmediateExecutor.INSTANCE, timer, handshakeTimeoutInMillis);
319     }
320 
321     /**
322      * @deprecated Use {@link #SslHandler(SSLEngine)} instead.
323      */
324     @Deprecated
325     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
326         this(engine, getDefaultBufferPool(), delegatedTaskExecutor);
327     }
328 
329     /**
330      * @deprecated Use {@link #SslHandler(SSLEngine, boolean)} instead.
331      */
332     @Deprecated
333     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, Executor delegatedTaskExecutor) {
334         this(engine, bufferPool, false, delegatedTaskExecutor);
335     }
336 
337     /**
338      * @deprecated  Use {@link #SslHandler(SSLEngine, boolean)} instead.
339      */
340     @Deprecated
341     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
342         this(engine, getDefaultBufferPool(), startTls, delegatedTaskExecutor);
343     }
344 
345     /**
346      * @deprecated Use {@link #SslHandler(SSLEngine, SslBufferPool, boolean)} instead.
347      */
348     @Deprecated
349     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor) {
350         this(engine, bufferPool, startTls, delegatedTaskExecutor, null, 0);
351     }
352 
353     /**
354      * @deprecated Use {@link #SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Timer timer,
355      *             long handshakeTimeoutInMillis)} instead.
356      */
357     @Deprecated
358     public SslHandler(SSLEngine engine, SslBufferPool bufferPool, boolean startTls, Executor delegatedTaskExecutor,
359                       Timer timer, long handshakeTimeoutInMillis) {
360         if (engine == null) {
361             throw new NullPointerException("engine");
362         }
363         if (bufferPool == null) {
364             throw new NullPointerException("bufferPool");
365         }
366         if (delegatedTaskExecutor == null) {
367             throw new NullPointerException("delegatedTaskExecutor");
368         }
369         if (timer == null && handshakeTimeoutInMillis > 0) {
370             throw new IllegalArgumentException("No Timer was given but a handshakeTimeoutInMillis, need both or none");
371         }
372 
373         this.engine = engine;
374         this.bufferPool = bufferPool;
375         this.delegatedTaskExecutor = delegatedTaskExecutor;
376         this.startTls = startTls;
377         this.timer = timer;
378         this.handshakeTimeoutInMillis = handshakeTimeoutInMillis;
379     }
380 
381     /**
382      * Returns the {@link SSLEngine} which is used by this handler.
383      */
384     public SSLEngine getEngine() {
385         return engine;
386     }
387 
388     /**
389      * Starts an SSL / TLS handshake for the specified channel.
390      *
391      * @return a {@link ChannelFuture} which is notified when the handshake
392      *         succeeds or fails.
393      */
394     public ChannelFuture handshake() {
395         synchronized (handshakeLock) {
396             if (handshaken && !isEnableRenegotiation()) {
397                 throw new IllegalStateException("renegotiation disabled");
398             }
399 
400             final ChannelHandlerContext ctx = this.ctx;
401             final Channel channel = ctx.getChannel();
402             ChannelFuture handshakeFuture;
403             Exception exception = null;
404 
405             if (handshaking) {
406                 return this.handshakeFuture;
407             }
408 
409             handshaking = true;
410             try {
411                 engine.beginHandshake();
412                 runDelegatedTasks();
413                 handshakeFuture = this.handshakeFuture = future(channel);
414                 if (handshakeTimeoutInMillis > 0) {
415                     handshakeTimeout = timer.newTimeout(new TimerTask() {
416                             public void run(Timeout timeout) throws Exception {
417                             ChannelFuture future = SslHandler.this.handshakeFuture;
418                             if (future != null && future.isDone()) {
419                                 return;
420                             }
421 
422                             setHandshakeFailure(channel, new SSLException("Handshake did not complete within " +
423                                             handshakeTimeoutInMillis + "ms"));
424                         }
425                         }, handshakeTimeoutInMillis, TimeUnit.MILLISECONDS);
426                 }
427             } catch (Exception e) {
428                 handshakeFuture = this.handshakeFuture = failedFuture(channel, e);
429                 exception = e;
430             }
431 
432             if (exception == null) { // Began handshake successfully.
433                 try {
434                     final ChannelFuture hsFuture = handshakeFuture;
435                     wrapNonAppData(ctx, channel).addListener(new ChannelFutureListener() {
436                         public void operationComplete(ChannelFuture future) throws Exception {
437                             if (!future.isSuccess()) {
438                                 Throwable cause = future.getCause();
439                                 hsFuture.setFailure(cause);
440 
441                                 fireExceptionCaught(ctx, cause);
442                                 if (closeOnSslException) {
443                                     Channels.close(ctx, future(channel));
444                                 }
445                             }
446                         }
447                     });
448                 } catch (SSLException e) {
449                     handshakeFuture.setFailure(e);
450 
451                     fireExceptionCaught(ctx, e);
452                     if (closeOnSslException) {
453                         Channels.close(ctx, future(channel));
454                     }
455                 }
456             } else { // Failed to initiate handshake.
457                 fireExceptionCaught(ctx, exception);
458                 if (closeOnSslException) {
459                     Channels.close(ctx, future(channel));
460                 }
461             }
462             return handshakeFuture;
463         }
464     }
465 
466     /**
467      * @deprecated Use {@link #handshake()} instead.
468      */
469     @Deprecated
470     public ChannelFuture handshake(@SuppressWarnings("unused") Channel channel) {
471         return handshake();
472     }
473 
474     /**
475      * Sends an SSL {@code close_notify} message to the specified channel and
476      * destroys the underlying {@link SSLEngine}.
477      */
478     public ChannelFuture close() {
479         ChannelHandlerContext ctx = this.ctx;
480         Channel channel = ctx.getChannel();
481         try {
482             engine.closeOutbound();
483             return wrapNonAppData(ctx, channel);
484         } catch (SSLException e) {
485             fireExceptionCaught(ctx, e);
486             if (closeOnSslException) {
487                 Channels.close(ctx, future(channel));
488             }
489             return failedFuture(channel, e);
490         }
491     }
492 
493     /**
494      * @deprecated Use {@link #close()} instead.
495      */
496     @Deprecated
497     public ChannelFuture close(@SuppressWarnings("unused") Channel channel) {
498         return close();
499     }
500 
501     /**
502      * Returns {@code true} if and only if TLS renegotiation is enabled.
503      */
504     public boolean isEnableRenegotiation() {
505         return enableRenegotiation;
506     }
507 
508     /**
509      * Enables or disables TLS renegotiation.
510      */
511     public void setEnableRenegotiation(boolean enableRenegotiation) {
512         this.enableRenegotiation = enableRenegotiation;
513     }
514 
515     /**
516      * Enables or disables the automatic handshake once the {@link Channel} is
517      * connected. The value will only have affect if its set before the
518      * {@link Channel} is connected.
519      */
520     public void setIssueHandshake(boolean issueHandshake) {
521         this.issueHandshake = issueHandshake;
522     }
523 
524     /**
525      * Returns {@code true} if the automatic handshake is enabled
526      */
527     public boolean isIssueHandshake() {
528         return issueHandshake;
529     }
530 
531     /**
532      * Return the {@link ChannelFuture} that will get notified if the inbound of the {@link SSLEngine} will get closed.
533      *
534      * This method will return the same {@link ChannelFuture} all the time.
535      *
536      * For more informations see the apidocs of {@link SSLEngine}
537      *
538      */
539     public ChannelFuture getSSLEngineInboundCloseFuture() {
540         return sslEngineCloseFuture;
541     }
542 
543     /**
544      * Return the timeout (in ms) after which the {@link ChannelFuture} of {@link #handshake()} will be failed, while
545      * a handshake is in progress
546      */
547     public long getHandshakeTimeout() {
548         return handshakeTimeoutInMillis;
549     }
550 
551     /**
552      * If set to {@code true}, the {@link Channel} will automatically get closed
553      * one a {@link SSLException} was caught. This is most times what you want, as after this
554      * its almost impossible to recover.
555      *
556      * Anyway the default is {@code false} to not break compatibility with older releases. This
557      * will be changed to {@code true} in the next major release.
558      *
559      */
560     public void setCloseOnSSLException(boolean closeOnSslException) {
561         if (ctx != null) {
562             throw new IllegalStateException("Can only get changed before attached to ChannelPipeline");
563         }
564         this.closeOnSslException = closeOnSslException;
565     }
566 
567     public boolean getCloseOnSSLException() {
568         return closeOnSslException;
569     }
570 
571     public void handleDownstream(
572             final ChannelHandlerContext context, final ChannelEvent evt) throws Exception {
573         if (evt instanceof ChannelStateEvent) {
574             ChannelStateEvent e = (ChannelStateEvent) evt;
575             switch (e.getState()) {
576             case OPEN:
577             case CONNECTED:
578             case BOUND:
579                 if (Boolean.FALSE.equals(e.getValue()) || e.getValue() == null) {
580                     closeOutboundAndChannel(context, e);
581                     return;
582                 }
583             }
584         }
585         if (!(evt instanceof MessageEvent)) {
586             context.sendDownstream(evt);
587             return;
588         }
589 
590         MessageEvent e = (MessageEvent) evt;
591         if (!(e.getMessage() instanceof ChannelBuffer)) {
592             context.sendDownstream(evt);
593             return;
594         }
595 
596         // Do not encrypt the first write request if this handler is
597         // created with startTLS flag turned on.
598         if (startTls && SENT_FIRST_MESSAGE_UPDATER.compareAndSet(this, 0, 1)) {
599             context.sendDownstream(evt);
600             return;
601         }
602 
603         // Otherwise, all messages are encrypted.
604         ChannelBuffer msg = (ChannelBuffer) e.getMessage();
605         PendingWrite pendingWrite;
606 
607         if (msg.readable()) {
608             pendingWrite = new PendingWrite(evt.getFuture(), msg.toByteBuffer(msg.readerIndex(), msg.readableBytes()));
609         } else {
610             pendingWrite = new PendingWrite(evt.getFuture(), null);
611         }
612 
613         pendingUnencryptedWritesLock.lock();
614         try {
615             pendingUnencryptedWrites.add(pendingWrite);
616         } finally {
617             pendingUnencryptedWritesLock.unlock();
618         }
619 
620         if (handshakeFuture == null || !handshakeFuture.isDone()) {
621             writeBeforeHandshakeDone = true;
622         }
623         wrap(context, evt.getChannel());
624     }
625 
626     private void cancelHandshakeTimeout() {
627         if (handshakeTimeout != null) {
628             // cancel the task as we will fail the handshake future now
629             handshakeTimeout.cancel();
630         }
631     }
632 
633     @Override
634     public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
635 
636         // Make sure the handshake future is notified when a connection has
637         // been closed during handshake.
638         synchronized (handshakeLock) {
639             if (handshaking) {
640                 cancelHandshakeTimeout();
641                 handshakeFuture.setFailure(new ClosedChannelException());
642             }
643         }
644 
645         try {
646             super.channelDisconnected(ctx, e);
647         } finally {
648             unwrapNonAppData(ctx, e.getChannel(), false);
649             closeEngine();
650         }
651     }
652 
653     private void closeEngine() {
654         engine.closeOutbound();
655         if (sentCloseNotify == 0 && handshaken) {
656             try {
657                 engine.closeInbound();
658             } catch (SSLException ex) {
659                 if (logger.isDebugEnabled()) {
660                     logger.debug("Failed to clean up SSLEngine.", ex);
661                 }
662             }
663         }
664     }
665 
666     @Override
667     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
668             throws Exception {
669 
670         Throwable cause = e.getCause();
671         if (cause instanceof IOException) {
672             if (cause instanceof ClosedChannelException) {
673                 synchronized (ignoreClosedChannelExceptionLock) {
674                     if (ignoreClosedChannelException > 0) {
675                         ignoreClosedChannelException --;
676                         if (logger.isDebugEnabled()) {
677                             logger.debug(
678                                     "Swallowing an exception raised while " +
679                                     "writing non-app data", cause);
680                         }
681 
682                         return;
683                     }
684                 }
685             } else {
686                 if (ignoreException(cause)) {
687                     return;
688                 }
689             }
690         }
691 
692         ctx.sendUpstream(e);
693     }
694 
695     /**
696      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
697      *
698      * When an ssl connection is closed a close_notify message is sent.
699      * After that the peer also sends close_notify however, it's not mandatory to receive
700      * the close_notify. The party who sent the initial close_notify can close the connection immediately
701      * then the peer will get connection reset error.
702      *
703      */
704     private boolean ignoreException(Throwable t) {
705         if (!(t instanceof SSLException) && t instanceof IOException && engine.isOutboundDone()) {
706             String message = String.valueOf(t.getMessage()).toLowerCase();
707 
708             // first try to match connection reset / broke peer based on the regex. This is the fastest way
709             // but may fail on different jdk impls or OS's
710             if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
711                 return true;
712             }
713 
714             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
715             StackTraceElement[] elements = t.getStackTrace();
716             for (StackTraceElement element: elements) {
717                 String classname = element.getClassName();
718                 String methodname = element.getMethodName();
719 
720                 // skip all classes that belong to the io.netty package
721                 if (classname.startsWith("org.jboss.netty.")) {
722                     continue;
723                 }
724 
725                 // check if the method name is read if not skip it
726                 if (!"read".equals(methodname)) {
727                     continue;
728                 }
729 
730                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
731                 // also others
732                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
733                     return true;
734                 }
735 
736                 try {
737                     // No match by now.. Try to load the class via classloader and inspect it.
738                     // This is mainly done as other JDK implementations may differ in name of
739                     // the impl.
740                     Class<?> clazz = getClass().getClassLoader().loadClass(classname);
741 
742                     if (SocketChannel.class.isAssignableFrom(clazz)
743                             || DatagramChannel.class.isAssignableFrom(clazz)) {
744                         return true;
745                     }
746 
747                     // also match against SctpChannel via String matching as it may not present.
748                     if (DetectionUtil.javaVersion() >= 7
749                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
750                         return true;
751                     }
752                 } catch (ClassNotFoundException e) {
753                     // This should not happen just ignore
754                 }
755             }
756         }
757 
758         return false;
759     }
760 
761     /**
762      * Returns {@code true} if the given {@link ChannelBuffer} is encrypted. Be aware that this method
763      * will not increase the readerIndex of the given {@link ChannelBuffer}.
764      *
765      * @param   buffer
766      *                  The {@link ChannelBuffer} to read from. Be aware that it must have at least 5 bytes to read,
767      *                  otherwise it will throw an {@link IllegalArgumentException}.
768      * @return encrypted
769      *                  {@code true} if the {@link ChannelBuffer} is encrypted, {@code false} otherwise.
770      * @throws IllegalArgumentException
771      *                  Is thrown if the given {@link ChannelBuffer} has not at least 5 bytes to read.
772      */
773     public static boolean isEncrypted(ChannelBuffer buffer) {
774         return getEncryptedPacketLength(buffer, buffer.readerIndex()) != -1;
775     }
776 
777     /**
778      * Return how much bytes can be read out of the encrypted data. Be aware that this method will not increase
779      * the readerIndex of the given {@link ChannelBuffer}.
780      *
781      * @param   buffer
782      *                  The {@link ChannelBuffer} to read from. Be aware that it must have at least 5 bytes to read,
783      *                  otherwise it will throw an {@link IllegalArgumentException}.
784      * @return length
785      *                  The length of the encrypted packet that is included in the buffer. This will
786      *                  return {@code -1} if the given {@link ChannelBuffer} is not encrypted at all.
787      * @throws IllegalArgumentException
788      *                  Is thrown if the given {@link ChannelBuffer} has not at least 5 bytes to read.
789      */
790     private static int getEncryptedPacketLength(ChannelBuffer buffer, int offset) {
791         int packetLength = 0;
792 
793         // SSLv3 or TLS - Check ContentType
794         boolean tls;
795         switch (buffer.getUnsignedByte(offset)) {
796             case 20:  // change_cipher_spec
797             case 21:  // alert
798             case 22:  // handshake
799             case 23:  // application_data
800                 tls = true;
801                 break;
802             default:
803                 // SSLv2 or bad data
804                 tls = false;
805         }
806 
807         if (tls) {
808             // SSLv3 or TLS - Check ProtocolVersion
809             int majorVersion = buffer.getUnsignedByte(offset + 1);
810             if (majorVersion == 3) {
811                 // SSLv3 or TLS
812                 packetLength = (getShort(buffer, offset + 3) & 0xFFFF) + 5;
813                 if (packetLength <= 5) {
814                     // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
815                     tls = false;
816                 }
817             } else {
818                 // Neither SSLv3 or TLSv1 (i.e. SSLv2 or bad data)
819                 tls = false;
820             }
821         }
822 
823         if (!tls) {
824             // SSLv2 or bad data - Check the version
825             boolean sslv2 = true;
826             int headerLength = (buffer.getUnsignedByte(offset) & 0x80) != 0 ? 2 : 3;
827             int majorVersion = buffer.getUnsignedByte(offset + headerLength + 1);
828             if (majorVersion == 2 || majorVersion == 3) {
829                 // SSLv2
830                 if (headerLength == 2) {
831                     packetLength = (getShort(buffer, offset) & 0x7FFF) + 2;
832                 } else {
833                     packetLength = (getShort(buffer, offset) & 0x3FFF) + 3;
834                 }
835                 if (packetLength <= headerLength) {
836                     sslv2 = false;
837                 }
838             } else {
839                 sslv2 = false;
840             }
841 
842             if (!sslv2) {
843                 return -1;
844             }
845         }
846         return packetLength;
847     }
848 
849     @Override
850     protected Object decode(
851             final ChannelHandlerContext ctx, Channel channel, ChannelBuffer in) throws Exception {
852 
853         final int startOffset = in.readerIndex();
854         final int endOffset = in.writerIndex();
855         int offset = startOffset;
856         int totalLength = 0;
857 
858         // If we calculated the length of the current SSL record before, use that information.
859         if (packetLength > 0) {
860             if (endOffset - startOffset < packetLength) {
861                 return null;
862             } else {
863                 offset += packetLength;
864                 totalLength = packetLength;
865                 packetLength = 0;
866             }
867         }
868 
869         boolean nonSslRecord = false;
870 
871         while (totalLength < OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
872             final int readableBytes = endOffset - offset;
873             if (readableBytes < 5) {
874                 break;
875             }
876 
877             final int packetLength = getEncryptedPacketLength(in, offset);
878             if (packetLength == -1) {
879                 nonSslRecord = true;
880                 break;
881             }
882 
883             assert packetLength > 0;
884 
885             if (packetLength > readableBytes) {
886                 // wait until the whole packet can be read
887                 this.packetLength = packetLength;
888                 break;
889             }
890 
891             int newTotalLength = totalLength + packetLength;
892             if (newTotalLength > OpenSslEngine.MAX_ENCRYPTED_PACKET_LENGTH) {
893                 // Don't read too much.
894                 break;
895             }
896 
897             // We have a whole packet.
898             // Increment the offset to handle the next packet.
899             offset += packetLength;
900             totalLength = newTotalLength;
901         }
902 
903         ChannelBuffer unwrapped = null;
904         if (totalLength > 0) {
905             // The buffer contains one or more full SSL records.
906             // Slice out the whole packet so unwrap will only be called with complete packets.
907             // Also directly reset the packetLength. This is needed as unwrap(..) may trigger
908             // decode(...) again via:
909             // 1) unwrap(..) is called
910             // 2) wrap(...) is called from within unwrap(...)
911             // 3) wrap(...) calls unwrapLater(...)
912             // 4) unwrapLater(...) calls decode(...)
913             //
914             // See https://github.com/netty/netty/issues/1534
915 
916             in.skipBytes(totalLength);
917             final ByteBuffer inNetBuf = in.toByteBuffer(startOffset, totalLength);
918             unwrapped = unwrap(ctx, channel, inNetBuf, totalLength, true);
919         }
920 
921         if (nonSslRecord) {
922             // Not an SSL/TLS packet
923             NotSslRecordException e = new NotSslRecordException(
924                     "not an SSL/TLS record: " + ChannelBuffers.hexDump(in));
925             in.skipBytes(in.readableBytes());
926             if (closeOnSslException) {
927                 // first trigger the exception and then close the channel
928                 fireExceptionCaught(ctx, e);
929                 Channels.close(ctx, future(channel));
930 
931                 // just return null as we closed the channel before, that
932                 // will take care of cleanup etc
933                 return null;
934             } else {
935                 throw e;
936             }
937         }
938 
939         return unwrapped;
940     }
941 
942     /**
943      * Reads a big-endian short integer from the buffer.  Please note that we do not use
944      * {@link ChannelBuffer#getShort(int)} because it might be a little-endian buffer.
945      */
946     private static short getShort(ChannelBuffer buf, int offset) {
947         return (short) (buf.getByte(offset) << 8 | buf.getByte(offset + 1) & 0xFF);
948     }
949 
950     private void wrap(ChannelHandlerContext context, Channel channel) throws SSLException {
951         ChannelBuffer msg;
952         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
953         boolean success = true;
954         boolean offered = false;
955         boolean needsUnwrap = false;
956         PendingWrite pendingWrite = null;
957 
958         try {
959             loop:
960             for (;;) {
961                 // Acquire a lock to make sure unencrypted data is polled
962                 // in order and their encrypted counterpart is offered in
963                 // order.
964                 pendingUnencryptedWritesLock.lock();
965                 try {
966                     pendingWrite = pendingUnencryptedWrites.peek();
967                     if (pendingWrite == null) {
968                         break;
969                     }
970 
971                     ByteBuffer outAppBuf = pendingWrite.outAppBuf;
972                     if (outAppBuf == null) {
973                         // A write request with an empty buffer
974                         pendingUnencryptedWrites.remove();
975                         offerEncryptedWriteRequest(
976                                 new DownstreamMessageEvent(
977                                         channel, pendingWrite.future,
978                                         ChannelBuffers.EMPTY_BUFFER,
979                                         channel.getRemoteAddress()));
980                         offered = true;
981                     } else {
982                         synchronized (handshakeLock) {
983                             SSLEngineResult result = null;
984                             try {
985                                 result = engine.wrap(outAppBuf, outNetBuf);
986                             } finally {
987                                 if (!outAppBuf.hasRemaining()) {
988                                     pendingUnencryptedWrites.remove();
989                                 }
990                             }
991 
992                             if (result.bytesProduced() > 0) {
993                                 outNetBuf.flip();
994                                 int remaining = outNetBuf.remaining();
995                                 msg = ctx.getChannel().getConfig().getBufferFactory().getBuffer(remaining);
996 
997                                 // Transfer the bytes to the new ChannelBuffer using some safe method that will also
998                                 // work with "non" heap buffers
999                                 //
1000                                 // See https://github.com/netty/netty/issues/329
1001                                 msg.writeBytes(outNetBuf);
1002                                 outNetBuf.clear();
1003 
1004                                 ChannelFuture future;
1005                                 if (pendingWrite.outAppBuf.hasRemaining()) {
1006                                     // pendingWrite's future shouldn't be notified if
1007                                     // only partial data is written.
1008                                     future = succeededFuture(channel);
1009                                 } else {
1010                                     future = pendingWrite.future;
1011                                 }
1012 
1013                                 MessageEvent encryptedWrite = new DownstreamMessageEvent(
1014                                         channel, future, msg, channel.getRemoteAddress());
1015                                 offerEncryptedWriteRequest(encryptedWrite);
1016                                 offered = true;
1017                             } else if (result.getStatus() == Status.CLOSED) {
1018                                 // SSLEngine has been closed already.
1019                                 // Any further write attempts should be denied.
1020                                 success = false;
1021                                 break;
1022                             } else {
1023                                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1024                                 handleRenegotiation(handshakeStatus);
1025                                 switch (handshakeStatus) {
1026                                 case NEED_WRAP:
1027                                     if (outAppBuf.hasRemaining()) {
1028                                         break;
1029                                     } else {
1030                                         break loop;
1031                                     }
1032                                 case NEED_UNWRAP:
1033                                     needsUnwrap = true;
1034                                     break loop;
1035                                 case NEED_TASK:
1036                                     runDelegatedTasks();
1037                                     break;
1038                                 case FINISHED:
1039                                     setHandshakeSuccess(channel);
1040                                     if (result.getStatus() == Status.CLOSED) {
1041                                         success = false;
1042                                     }
1043                                     break loop;
1044                                 case NOT_HANDSHAKING:
1045                                     setHandshakeSuccessIfStillHandshaking(channel);
1046                                     if (result.getStatus() == Status.CLOSED) {
1047                                         success = false;
1048                                     }
1049                                     break loop;
1050                                 default:
1051                                     throw new IllegalStateException(
1052                                             "Unknown handshake status: " +
1053                                             handshakeStatus);
1054                                 }
1055                             }
1056                         }
1057                     }
1058                 } finally {
1059                     pendingUnencryptedWritesLock.unlock();
1060                 }
1061             }
1062         } catch (SSLException e) {
1063             success = false;
1064             setHandshakeFailure(channel, e);
1065             throw e;
1066         } finally {
1067             bufferPool.releaseBuffer(outNetBuf);
1068 
1069             if (offered) {
1070                 flushPendingEncryptedWrites(context);
1071             }
1072 
1073             if (!success) {
1074                 IllegalStateException cause =
1075                     new IllegalStateException("SSLEngine already closed");
1076 
1077                 // Check if we had a pendingWrite in process, if so we need to also notify as otherwise
1078                 // the ChannelFuture will never get notified
1079                 if (pendingWrite != null) {
1080                     pendingWrite.future.setFailure(cause);
1081                 }
1082 
1083                 // Mark all remaining pending writes as failure if anything
1084                 // wrong happened before the write requests are wrapped.
1085                 // Please note that we do not call setFailure while a lock is
1086                 // acquired, to avoid a potential dead lock.
1087                 for (;;) {
1088                     pendingUnencryptedWritesLock.lock();
1089                     try {
1090                         pendingWrite = pendingUnencryptedWrites.poll();
1091                         if (pendingWrite == null) {
1092                             break;
1093                         }
1094                     } finally {
1095                         pendingUnencryptedWritesLock.unlock();
1096                     }
1097 
1098                     pendingWrite.future.setFailure(cause);
1099                 }
1100             }
1101         }
1102 
1103         if (needsUnwrap) {
1104             unwrapNonAppData(ctx, channel, true);
1105         }
1106     }
1107 
1108     private void offerEncryptedWriteRequest(MessageEvent encryptedWrite) {
1109         final boolean locked = pendingEncryptedWritesLock.tryLock();
1110         try {
1111             pendingEncryptedWrites.add(encryptedWrite);
1112         } finally {
1113             if (locked) {
1114                 pendingEncryptedWritesLock.unlock();
1115             }
1116         }
1117     }
1118 
1119     private void flushPendingEncryptedWrites(ChannelHandlerContext ctx) {
1120         while (!pendingEncryptedWrites.isEmpty()) {
1121             // Avoid possible dead lock and data integrity issue
1122             // which is caused by cross communication between more than one channel
1123             // in the same VM.
1124             if (!pendingEncryptedWritesLock.tryLock()) {
1125                 return;
1126             }
1127 
1128             try {
1129                 MessageEvent e;
1130                 while ((e = pendingEncryptedWrites.poll()) != null) {
1131                     ctx.sendDownstream(e);
1132                 }
1133             } finally {
1134                 pendingEncryptedWritesLock.unlock();
1135             }
1136 
1137             // Other thread might have added more elements at this point, so we loop again if the queue got unempty.
1138         }
1139     }
1140 
1141     private ChannelFuture wrapNonAppData(ChannelHandlerContext ctx, Channel channel) throws SSLException {
1142         ChannelFuture future = null;
1143         ByteBuffer outNetBuf = bufferPool.acquireBuffer();
1144 
1145         SSLEngineResult result;
1146         try {
1147             for (;;) {
1148                 synchronized (handshakeLock) {
1149                     result = engine.wrap(EMPTY_BUFFER, outNetBuf);
1150                 }
1151 
1152                 if (result.bytesProduced() > 0) {
1153                     outNetBuf.flip();
1154                     ChannelBuffer msg =
1155                             ctx.getChannel().getConfig().getBufferFactory().getBuffer(outNetBuf.remaining());
1156 
1157                     // Transfer the bytes to the new ChannelBuffer using some safe method that will also
1158                     // work with "non" heap buffers
1159                     //
1160                     // See https://github.com/netty/netty/issues/329
1161                     msg.writeBytes(outNetBuf);
1162                     outNetBuf.clear();
1163 
1164                     future = future(channel);
1165                     future.addListener(new ChannelFutureListener() {
1166                         public void operationComplete(ChannelFuture future)
1167                                 throws Exception {
1168                             if (future.getCause() instanceof ClosedChannelException) {
1169                                 synchronized (ignoreClosedChannelExceptionLock) {
1170                                     ignoreClosedChannelException ++;
1171                                 }
1172                             }
1173                         }
1174                     });
1175 
1176                     write(ctx, future, msg);
1177                 }
1178 
1179                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1180                 handleRenegotiation(handshakeStatus);
1181                 switch (handshakeStatus) {
1182                 case FINISHED:
1183                     setHandshakeSuccess(channel);
1184                     runDelegatedTasks();
1185                     break;
1186                 case NEED_TASK:
1187                     runDelegatedTasks();
1188                     break;
1189                 case NEED_UNWRAP:
1190                     if (!Thread.holdsLock(handshakeLock)) {
1191                         // unwrap shouldn't be called when this method was
1192                         // called by unwrap - unwrap will keep running after
1193                         // this method returns.
1194                         unwrapNonAppData(ctx, channel, true);
1195                     }
1196                     break;
1197                 case NOT_HANDSHAKING:
1198                     if (setHandshakeSuccessIfStillHandshaking(channel)) {
1199                         runDelegatedTasks();
1200                     }
1201                     break;
1202                 case NEED_WRAP:
1203                     break;
1204                 default:
1205                     throw new IllegalStateException(
1206                             "Unexpected handshake status: " + handshakeStatus);
1207                 }
1208 
1209                 if (result.bytesProduced() == 0) {
1210                     break;
1211                 }
1212             }
1213         } catch (SSLException e) {
1214             setHandshakeFailure(channel, e);
1215             throw e;
1216         } finally {
1217             bufferPool.releaseBuffer(outNetBuf);
1218         }
1219 
1220         if (future == null) {
1221             future = succeededFuture(channel);
1222         }
1223 
1224         return future;
1225     }
1226 
1227     /**
1228      * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
1229      */
1230     private void unwrapNonAppData(
1231             ChannelHandlerContext ctx, Channel channel, boolean mightNeedHandshake) throws SSLException {
1232         unwrap(ctx, channel, EMPTY_BUFFER, -1, mightNeedHandshake);
1233     }
1234 
1235     /**
1236      * Unwraps inbound SSL records.
1237      */
1238     private ChannelBuffer unwrap(
1239             ChannelHandlerContext ctx, Channel channel,
1240             ByteBuffer nioInNetBuf,
1241             int initialNettyOutAppBufCapacity, boolean mightNeedHandshake) throws SSLException {
1242 
1243         final int nioInNetBufStartOffset = nioInNetBuf.position();
1244         final ByteBuffer nioOutAppBuf = bufferPool.acquireBuffer();
1245 
1246         ChannelBuffer nettyOutAppBuf = null;
1247 
1248         try {
1249             boolean needsWrap = false;
1250             for (;;) {
1251                 SSLEngineResult result;
1252                 boolean needsHandshake = false;
1253                 if (mightNeedHandshake) {
1254                     synchronized (handshakeLock) {
1255                         if (!handshaken && !handshaking &&
1256                             !engine.getUseClientMode() &&
1257                             !engine.isInboundDone() && !engine.isOutboundDone()) {
1258                             needsHandshake = true;
1259                         }
1260                     }
1261                 }
1262 
1263                 if (needsHandshake) {
1264                     handshake();
1265                 }
1266 
1267                 synchronized (handshakeLock) {
1268                     // Decrypt at least one record in the inbound network buffer.
1269                     // It is impossible to consume no record here because we made sure the inbound network buffer
1270                     // always contain at least one record in decode().  Therefore, if SSLEngine.unwrap() returns
1271                     // BUFFER_OVERFLOW, it is always resolved by retrying after emptying the application buffer.
1272                     for (;;) {
1273                         final int outAppBufSize = engine.getSession().getApplicationBufferSize();
1274                         final ByteBuffer outAppBuf;
1275                         if (nioOutAppBuf.capacity() < outAppBufSize) {
1276                             // SSLEngine wants a buffer larger than what the pool can provide.
1277                             // Allocate a temporary heap buffer.
1278                             outAppBuf = ByteBuffer.allocate(outAppBufSize);
1279                         } else {
1280                             outAppBuf = nioOutAppBuf;
1281                         }
1282 
1283                         try {
1284                             result = engine.unwrap(nioInNetBuf, outAppBuf);
1285                             switch (result.getStatus()) {
1286                                 case CLOSED:
1287                                     // notify about the CLOSED state of the SSLEngine. See #137
1288                                     sslEngineCloseFuture.setClosed();
1289                                     break;
1290                                 case BUFFER_OVERFLOW:
1291                                     // Flush the unwrapped data in the outAppBuf into frame and try again.
1292                                     // See the finally block.
1293                                     continue;
1294                             }
1295 
1296                             break;
1297                         } finally {
1298                             outAppBuf.flip();
1299 
1300                             // Copy the unwrapped data into a smaller buffer.
1301                             if (outAppBuf.hasRemaining()) {
1302                                 if (nettyOutAppBuf == null) {
1303                                     ChannelBufferFactory factory = ctx.getChannel().getConfig().getBufferFactory();
1304                                     nettyOutAppBuf = factory.getBuffer(initialNettyOutAppBufCapacity);
1305                                 }
1306                                 nettyOutAppBuf.writeBytes(outAppBuf);
1307                             }
1308                             outAppBuf.clear();
1309                         }
1310                     }
1311 
1312                     final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1313                     handleRenegotiation(handshakeStatus);
1314                     switch (handshakeStatus) {
1315                     case NEED_UNWRAP:
1316                         break;
1317                     case NEED_WRAP:
1318                         wrapNonAppData(ctx, channel);
1319                         break;
1320                     case NEED_TASK:
1321                         runDelegatedTasks();
1322                         break;
1323                     case FINISHED:
1324                         setHandshakeSuccess(channel);
1325                         needsWrap = true;
1326                         continue;
1327                     case NOT_HANDSHAKING:
1328                         if (setHandshakeSuccessIfStillHandshaking(channel)) {
1329                             needsWrap = true;
1330                             continue;
1331                         }
1332                         if (writeBeforeHandshakeDone) {
1333                             // We need to call wrap(...) in case there was a flush done before the handshake completed.
1334                             //
1335                             // See https://github.com/netty/netty/pull/2437
1336                             writeBeforeHandshakeDone = false;
1337                             needsWrap = true;
1338                         }
1339                         break;
1340                     default:
1341                         throw new IllegalStateException(
1342                                 "Unknown handshake status: " + handshakeStatus);
1343                     }
1344 
1345                     if (result.getStatus() == Status.BUFFER_UNDERFLOW ||
1346                         result.bytesConsumed() == 0 && result.bytesProduced() == 0) {
1347                         if (nioInNetBuf.hasRemaining() && !engine.isInboundDone()) {
1348                             // We expect SSLEngine to consume all the bytes we feed it, but
1349                             // empirical evidence indicates that we sometimes end up with leftovers
1350                             // Log when this happens to get a better understanding of this corner
1351                             // case.
1352                             // See https://github.com/netty/netty/pull/3584
1353                             logger.warn("Unexpected leftover data after SSLEngine.unwrap():"
1354                                     + " status=" + result.getStatus()
1355                                     + " handshakeStatus=" + result.getHandshakeStatus()
1356                                     + " consumed=" + result.bytesConsumed()
1357                                     + " produced=" + result.bytesProduced()
1358                                     + " remaining=" + nioInNetBuf.remaining()
1359                                     + " data=" + ChannelBuffers.hexDump(ChannelBuffers.wrappedBuffer(nioInNetBuf)));
1360                         }
1361                         break;
1362                     }
1363                 }
1364             }
1365 
1366             if (needsWrap) {
1367                 // wrap() acquires pendingUnencryptedWrites first and then
1368                 // handshakeLock.  If handshakeLock is already hold by the
1369                 // current thread, calling wrap() will lead to a dead lock
1370                 // i.e. pendingUnencryptedWrites -> handshakeLock vs.
1371                 //      handshakeLock -> pendingUnencryptedLock -> handshakeLock
1372                 //
1373                 // There is also the same issue between pendingEncryptedWrites
1374                 // and pendingUnencryptedWrites.
1375                 if (!Thread.holdsLock(handshakeLock) && !pendingEncryptedWritesLock.isHeldByCurrentThread()) {
1376                     wrap(ctx, channel);
1377                 }
1378             }
1379         } catch (SSLException e) {
1380             setHandshakeFailure(channel, e);
1381             throw e;
1382         } finally {
1383             bufferPool.releaseBuffer(nioOutAppBuf);
1384         }
1385 
1386         if (nettyOutAppBuf != null && nettyOutAppBuf.readable()) {
1387             return nettyOutAppBuf;
1388         } else {
1389             return null;
1390         }
1391     }
1392 
1393     private void handleRenegotiation(HandshakeStatus handshakeStatus) {
1394         synchronized (handshakeLock) {
1395             if (handshakeStatus == HandshakeStatus.NOT_HANDSHAKING ||
1396                 handshakeStatus == HandshakeStatus.FINISHED) {
1397                 // Not handshaking
1398                 return;
1399             }
1400 
1401             if (!handshaken) {
1402                 // Not renegotiation
1403                 return;
1404             }
1405 
1406             final boolean renegotiate;
1407             if (handshaking) {
1408                 // Renegotiation in progress or failed already.
1409                 // i.e. Renegotiation check has been done already below.
1410                 return;
1411             }
1412 
1413             if (engine.isInboundDone() || engine.isOutboundDone()) {
1414                 // Not handshaking but closing.
1415                 return;
1416             }
1417 
1418             if (isEnableRenegotiation()) {
1419                 // Continue renegotiation.
1420                 renegotiate = true;
1421             } else {
1422                 // Do not renegotiate.
1423                 renegotiate = false;
1424                 // Prevent reentrance of this method.
1425                 handshaking = true;
1426             }
1427 
1428             if (renegotiate) {
1429                 // Renegotiate.
1430                 handshake();
1431             } else {
1432                 // Raise an exception.
1433                 fireExceptionCaught(
1434                         ctx, new SSLException(
1435                                 "renegotiation attempted by peer; " +
1436                                 "closing the connection"));
1437 
1438                 // Close the connection to stop renegotiation.
1439                 Channels.close(ctx, succeededFuture(ctx.getChannel()));
1440             }
1441         }
1442     }
1443 
1444     /**
1445      * Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}.
1446      * If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly
1447      * instead of using {@link Executor#execute(Runnable)}.  Otherwise, run the tasks via
1448      * the {@link #delegatedTaskExecutor} and wait until the tasks are finished.
1449      */
1450     private void runDelegatedTasks() {
1451         if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
1452             for (;;) {
1453                 final Runnable task;
1454                 synchronized (handshakeLock) {
1455                     task = engine.getDelegatedTask();
1456                 }
1457 
1458                 if (task == null) {
1459                     break;
1460                 }
1461 
1462                 delegatedTaskExecutor.execute(task);
1463             }
1464         } else {
1465             final List<Runnable> tasks = new ArrayList<Runnable>(2);
1466             for (;;) {
1467                 final Runnable task;
1468                 synchronized (handshakeLock) {
1469                     task = engine.getDelegatedTask();
1470                 }
1471 
1472                 if (task == null) {
1473                     break;
1474                 }
1475 
1476                 tasks.add(task);
1477             }
1478 
1479             if (tasks.isEmpty()) {
1480                 return;
1481             }
1482 
1483             final CountDownLatch latch = new CountDownLatch(1);
1484             delegatedTaskExecutor.execute(new Runnable() {
1485                 public void run() {
1486                     try {
1487                         for (Runnable task: tasks) {
1488                             task.run();
1489                         }
1490                     } catch (Exception e) {
1491                         fireExceptionCaught(ctx, e);
1492                     } finally {
1493                         latch.countDown();
1494                     }
1495                 }
1496             });
1497 
1498             boolean interrupted = false;
1499             while (latch.getCount() != 0) {
1500                 try {
1501                     latch.await();
1502                 } catch (InterruptedException e) {
1503                     // Interrupt later.
1504                     interrupted = true;
1505                 }
1506             }
1507 
1508             if (interrupted) {
1509                 Thread.currentThread().interrupt();
1510             }
1511         }
1512     }
1513 
1514     /**
1515      * Works around some Android {@link SSLEngine} implementations that skip {@link HandshakeStatus#FINISHED} and
1516      * go straight into {@link HandshakeStatus#NOT_HANDSHAKING} when handshake is finished.
1517      *
1518      * @return {@code true} if and only if the workaround has been applied and thus {@link #handshakeFuture} has been
1519      *         marked as success by this method
1520      */
1521     private boolean setHandshakeSuccessIfStillHandshaking(Channel channel) {
1522         if (handshaking && !handshakeFuture.isDone()) {
1523             setHandshakeSuccess(channel);
1524             return true;
1525         }
1526         return false;
1527     }
1528 
1529     private void setHandshakeSuccess(Channel channel) {
1530         synchronized (handshakeLock) {
1531             handshaking = false;
1532             handshaken = true;
1533 
1534             if (handshakeFuture == null) {
1535                 handshakeFuture = future(channel);
1536             }
1537             cancelHandshakeTimeout();
1538         }
1539 
1540         if (logger.isDebugEnabled()) {
1541             logger.debug(channel + " HANDSHAKEN: " + engine.getSession().getCipherSuite());
1542         }
1543 
1544         handshakeFuture.setSuccess();
1545     }
1546 
1547     private void setHandshakeFailure(Channel channel, SSLException cause) {
1548         synchronized (handshakeLock) {
1549             if (!handshaking) {
1550                 return;
1551             }
1552             handshaking = false;
1553             handshaken = false;
1554 
1555             if (handshakeFuture == null) {
1556                 handshakeFuture = future(channel);
1557             }
1558 
1559             // cancel the timeout now
1560             cancelHandshakeTimeout();
1561 
1562             // Release all resources such as internal buffers that SSLEngine
1563             // is managing.
1564 
1565             engine.closeOutbound();
1566 
1567             try {
1568                 engine.closeInbound();
1569             } catch (SSLException e) {
1570                 if (logger.isDebugEnabled()) {
1571                     logger.debug(
1572                             "SSLEngine.closeInbound() raised an exception after " +
1573                             "a handshake failure.", e);
1574                 }
1575             }
1576         }
1577 
1578         handshakeFuture.setFailure(cause);
1579         if (closeOnSslException) {
1580             Channels.close(ctx, future(channel));
1581         }
1582     }
1583 
1584     private void closeOutboundAndChannel(
1585             final ChannelHandlerContext context, final ChannelStateEvent e) {
1586         if (!e.getChannel().isConnected()) {
1587             context.sendDownstream(e);
1588             return;
1589         }
1590 
1591         // Ensure that the tear-down logic beyond this point is never invoked concurrently nor multiple times.
1592         if (!CLOSED_OUTBOUND_AND_CHANNEL_UPDATER.compareAndSet(this, 0, 1)) {
1593             // The other thread called this method already, and thus the connection will be closed eventually.
1594             // So, just wait until the connection is closed, and then forward the event so that the sink handles
1595             // the duplicate close attempt.
1596             e.getChannel().getCloseFuture().addListener(new ChannelFutureListener() {
1597                 public void operationComplete(ChannelFuture future) throws Exception {
1598                     context.sendDownstream(e);
1599                 }
1600             });
1601             return;
1602         }
1603 
1604         boolean passthrough = true;
1605         try {
1606             try {
1607                 unwrapNonAppData(ctx, e.getChannel(), false);
1608             } catch (SSLException ex) {
1609                 if (logger.isDebugEnabled()) {
1610                     logger.debug("Failed to unwrap before sending a close_notify message", ex);
1611                 }
1612             }
1613 
1614             if (!engine.isOutboundDone()) {
1615                 if (SENT_CLOSE_NOTIFY_UPDATER.compareAndSet(this, 0, 1)) {
1616                     engine.closeOutbound();
1617                     try {
1618                         ChannelFuture closeNotifyFuture = wrapNonAppData(context, e.getChannel());
1619                         closeNotifyFuture.addListener(
1620                                 new ClosingChannelFutureListener(context, e));
1621                         passthrough = false;
1622                     } catch (SSLException ex) {
1623                         if (logger.isDebugEnabled()) {
1624                             logger.debug("Failed to encode a close_notify message", ex);
1625                         }
1626                     }
1627                 }
1628             }
1629         } finally {
1630             if (passthrough) {
1631                 context.sendDownstream(e);
1632             }
1633         }
1634     }
1635 
1636     private static final class PendingWrite {
1637         final ChannelFuture future;
1638         final ByteBuffer outAppBuf;
1639 
1640         PendingWrite(ChannelFuture future, ByteBuffer outAppBuf) {
1641             this.future = future;
1642             this.outAppBuf = outAppBuf;
1643         }
1644     }
1645 
1646     private static final class ClosingChannelFutureListener implements ChannelFutureListener {
1647 
1648         private final ChannelHandlerContext context;
1649         private final ChannelStateEvent e;
1650 
1651         ClosingChannelFutureListener(
1652                 ChannelHandlerContext context, ChannelStateEvent e) {
1653             this.context = context;
1654             this.e = e;
1655         }
1656 
1657         public void operationComplete(ChannelFuture closeNotifyFuture) throws Exception {
1658             if (!(closeNotifyFuture.getCause() instanceof ClosedChannelException)) {
1659                 Channels.close(context, e.getFuture());
1660             } else {
1661                 e.getFuture().setSuccess();
1662             }
1663         }
1664     }
1665 
1666     @Override
1667     public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
1668         super.beforeAdd(ctx);
1669         this.ctx = ctx;
1670     }
1671 
1672     /**
1673      * Fail all pending writes which we were not able to flush out
1674      */
1675     @Override
1676     public void afterRemove(ChannelHandlerContext ctx) throws Exception {
1677         closeEngine();
1678 
1679         // there is no need for synchronization here as we do not receive downstream events anymore
1680         Throwable cause = null;
1681         for (;;) {
1682             PendingWrite pw = pendingUnencryptedWrites.poll();
1683             if (pw == null) {
1684                 break;
1685             }
1686             if (cause == null) {
1687                 cause = new IOException("Unable to write data");
1688             }
1689             pw.future.setFailure(cause);
1690         }
1691 
1692         for (;;) {
1693             MessageEvent ev = pendingEncryptedWrites.poll();
1694             if (ev == null) {
1695                 break;
1696             }
1697             if (cause == null) {
1698                 cause = new IOException("Unable to write data");
1699             }
1700             ev.getFuture().setFailure(cause);
1701         }
1702 
1703         if (cause != null) {
1704             fireExceptionCaughtLater(ctx, cause);
1705         }
1706     }
1707 
1708     /**
1709      * Calls {@link #handshake()} once the {@link Channel} is connected
1710      */
1711     @Override
1712     public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
1713         if (issueHandshake) {
1714             // issue and handshake and add a listener to it which will fire an exception event if
1715             // an exception was thrown while doing the handshake
1716             handshake().addListener(new ChannelFutureListener() {
1717 
1718                 public void operationComplete(ChannelFuture future) throws Exception {
1719                     if (future.isSuccess()) {
1720                         // Send the event upstream after the handshake was completed without an error.
1721                         //
1722                         // See https://github.com/netty/netty/issues/358
1723                         ctx.sendUpstream(e);
1724                     }
1725                 }
1726             });
1727         } else {
1728             super.channelConnected(ctx, e);
1729         }
1730     }
1731 
1732     /**
1733      * Loop over all the pending writes and fail them.
1734      *
1735      * See <a href="https://github.com/netty/netty/issues/305">#305</a> for more details.
1736      */
1737     @Override
1738     public void channelClosed(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
1739         // Move the fail of the writes to the IO-Thread to prevent possible deadlock
1740         // See https://github.com/netty/netty/issues/989
1741         ctx.getPipeline().execute(new Runnable() {
1742             public void run() {
1743                 if (!pendingUnencryptedWritesLock.tryLock()) {
1744                     return;
1745                 }
1746 
1747                 List<ChannelFuture> futures = null;
1748                 try {
1749                     for (;;) {
1750                         PendingWrite pw = pendingUnencryptedWrites.poll();
1751                         if (pw == null) {
1752                             break;
1753                         }
1754                         if (futures == null) {
1755                             futures = new ArrayList<ChannelFuture>();
1756                         }
1757                         futures.add(pw.future);
1758                     }
1759 
1760                     for (;;) {
1761                         MessageEvent ev = pendingEncryptedWrites.poll();
1762                         if (ev == null) {
1763                             break;
1764                         }
1765                         if (futures != null) {
1766                             futures = new ArrayList<ChannelFuture>();
1767                         }
1768                         futures.add(ev.getFuture());
1769                     }
1770                 } finally {
1771                     pendingUnencryptedWritesLock.unlock();
1772                 }
1773 
1774                 if (futures != null) {
1775                     final ClosedChannelException cause = new ClosedChannelException();
1776                     final int size = futures.size();
1777                     for (int i = 0; i < size; i ++) {
1778                         futures.get(i).setFailure(cause);
1779                     }
1780                     fireExceptionCaught(ctx, cause);
1781                 }
1782             }
1783         });
1784 
1785         super.channelClosed(ctx, e);
1786     }
1787 
1788     private final class SSLEngineInboundCloseFuture extends DefaultChannelFuture {
1789         SSLEngineInboundCloseFuture() {
1790             super(null, true);
1791         }
1792 
1793         void setClosed() {
1794             super.setSuccess();
1795         }
1796 
1797         @Override
1798         public Channel getChannel() {
1799             if (ctx == null) {
1800                 // Maybe we should better throw an IllegalStateException() ?
1801                 return null;
1802             } else {
1803                 return ctx.getChannel();
1804             }
1805         }
1806 
1807         @Override
1808         public boolean setSuccess() {
1809             return false;
1810         }
1811 
1812         @Override
1813         public boolean setFailure(Throwable cause) {
1814             return false;
1815         }
1816     }
1817 }