View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.ssl;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.CompositeByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractCoalescingBufferQueue;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelException;
27  import io.netty.channel.ChannelFuture;
28  import io.netty.channel.ChannelFutureListener;
29  import io.netty.channel.ChannelHandlerContext;
30  import io.netty.channel.ChannelInboundHandler;
31  import io.netty.channel.ChannelOutboundHandler;
32  import io.netty.channel.ChannelPipeline;
33  import io.netty.channel.ChannelPromise;
34  import io.netty.channel.ChannelPromiseNotifier;
35  import io.netty.handler.codec.ByteToMessageDecoder;
36  import io.netty.handler.codec.UnsupportedMessageTypeException;
37  import io.netty.util.ReferenceCountUtil;
38  import io.netty.util.ReferenceCounted;
39  import io.netty.util.concurrent.DefaultPromise;
40  import io.netty.util.concurrent.EventExecutor;
41  import io.netty.util.concurrent.Future;
42  import io.netty.util.concurrent.FutureListener;
43  import io.netty.util.concurrent.ImmediateExecutor;
44  import io.netty.util.concurrent.Promise;
45  import io.netty.util.concurrent.PromiseNotifier;
46  import io.netty.util.internal.PlatformDependent;
47  import io.netty.util.internal.ThrowableUtil;
48  import io.netty.util.internal.UnstableApi;
49  import io.netty.util.internal.logging.InternalLogger;
50  import io.netty.util.internal.logging.InternalLoggerFactory;
51  
52  import java.io.IOException;
53  import java.net.SocketAddress;
54  import java.nio.ByteBuffer;
55  import java.nio.channels.ClosedChannelException;
56  import java.nio.channels.DatagramChannel;
57  import java.nio.channels.SocketChannel;
58  import java.util.ArrayList;
59  import java.util.List;
60  import java.util.concurrent.CountDownLatch;
61  import java.util.concurrent.Executor;
62  import java.util.concurrent.ScheduledFuture;
63  import java.util.concurrent.TimeUnit;
64  import java.util.regex.Pattern;
65  
66  import javax.net.ssl.SSLEngine;
67  import javax.net.ssl.SSLEngineResult;
68  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
69  import javax.net.ssl.SSLEngineResult.Status;
70  import javax.net.ssl.SSLException;
71  import javax.net.ssl.SSLSession;
72  
73  import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
74  import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
75  
76  /**
77   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
78   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
79   * to the <strong>"SecureChat"</strong> example in the distribution or the web
80   * site for the detailed usage.
81   *
82   * <h3>Beginning the handshake</h3>
83   * <p>
84   * Besides using the handshake {@link ChannelFuture} to get notified about the completion of the handshake, it's
85   * also possible to detect it by implementing the
86   * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
87   * method and checking for a {@link SslHandshakeCompletionEvent}.
88   *
89   * <h3>Handshake</h3>
90   * <p>
91   * The handshake will be automatically issued for you once the {@link Channel} is active and
92   * {@link SSLEngine#getUseClientMode()} returns {@code true}.
93   * So there is no need to start it yourself.
94   *
95   * <h3>Closing the session</h3>
96   * <p>
97   * To close the SSL session, the {@link #closeOutbound()} method should be
98   * called to send the {@code close_notify} message to the remote peer. One
99   * exception is when you close the {@link Channel} - {@link SslHandler}
100  * intercepts the close request and sends the {@code close_notify} message
101  * before the channel closure automatically.  Once the SSL session is closed,
102  * it is not reusable, and consequently you should create a new
103  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
104  * following section.
105  *
106  * <h3>Restarting the session</h3>
107  * <p>
108  * To restart the SSL session, you must remove the existing closed
109  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
110  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
111  * and start the handshake process as described in the first section.
112  *
113  * <h3>Implementing StartTLS</h3>
114  * <p>
115  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
116  * communication pattern that secures the wire in the middle of the plaintext
117  * connection.  Please note that it is different from SSL &middot; TLS, that
118  * secures the wire from the beginning of the connection.  Typically, StartTLS
119  * is composed of three steps:
120  * <ol>
121  * <li>Client sends a StartTLS request to server.</li>
122  * <li>Server sends a StartTLS response to client.</li>
123  * <li>Client begins SSL handshake.</li>
124  * </ol>
125  * If you implement a server, you need to:
126  * <ol>
127  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
128  *     to {@code true},</li>
129  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
130  * <li>write a StartTLS response.</li>
131  * </ol>
132  * Please note that you must insert {@link SslHandler} <em>before</em> sending
133  * the StartTLS response.  Otherwise the client can begin the SSL handshake
134  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
135  * data corruption.
136  * <p>
137  * The client-side implementation is much simpler.
138  * <ol>
139  * <li>Write a StartTLS request,</li>
140  * <li>wait for the StartTLS response,</li>
141  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
142  *     to {@code false},</li>
143  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
144  * <li>Initiate SSL handshake.</li>
145  * </ol>
146  *
147  * <h3>Known issues</h3>
148  * <p>
149  * Because of a known issue with the current implementation of the SslEngine that comes
150  * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
151  * <p>
152  * So if you are affected you can work around this problem by adjusting the cache settings
153  * as shown below:
154  *
155  * <pre>
156  *     SSLContext context = ...;
157  *     context.getServerSessionContext().setSessionCacheSize(someSaneSize);
158  *     context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
159  * </pre>
160  * <p>
161  * What values to use here depends on the nature of your application and should be set
162  * based on monitoring and debugging of it.
163  * For more details see
164  * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
165  */
166 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
167 
168     private static final InternalLogger logger =
169             InternalLoggerFactory.getInstance(SslHandler.class);
170 
171     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
172             "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
173     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
174             "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
175 
176     /**
177      * Used in {@link #unwrapNonAppData(ChannelHandlerContext)} as input for
178      * {@link #unwrap(ChannelHandlerContext, ByteBuf, int,  int)}.  Using this static instance reduce object
179      * creation as {@link Unpooled#EMPTY_BUFFER#nioBuffer()} creates a new {@link ByteBuffer} everytime.
180      */
181     private static final SSLException SSLENGINE_CLOSED = ThrowableUtil.unknownStackTrace(
182             new SSLException("SSLEngine closed already"), SslHandler.class, "wrap(...)");
183     private static final SSLException HANDSHAKE_TIMED_OUT = ThrowableUtil.unknownStackTrace(
184             new SSLException("handshake timed out"), SslHandler.class, "handshake(...)");
185     private static final ClosedChannelException CHANNEL_CLOSED = ThrowableUtil.unknownStackTrace(
186             new ClosedChannelException(), SslHandler.class, "channelInactive(...)");
187 
188     /**
189      * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
190      * allowed by the TLS RFC.
191      */
192     private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
193 
194     private enum SslEngineType {
195         TCNATIVE(true, COMPOSITE_CUMULATOR) {
196             @Override
197             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
198                     throws SSLException {
199                 int nioBufferCount = in.nioBufferCount();
200                 int writerIndex = out.writerIndex();
201                 final SSLEngineResult result;
202                 if (nioBufferCount > 1) {
203                     /*
204                      * If {@link OpenSslEngine} is in use,
205                      * we can use a special {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} method
206                      * that accepts multiple {@link ByteBuffer}s without additional memory copies.
207                      */
208                     ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
209                     try {
210                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex,
211                             out.writableBytes());
212                         result = opensslEngine.unwrap(in.nioBuffers(readerIndex, len), handler.singleBuffer);
213                     } finally {
214                         handler.singleBuffer[0] = null;
215                     }
216                 } else {
217                     result = handler.engine.unwrap(toByteBuffer(in, readerIndex, len),
218                         toByteBuffer(out, writerIndex, out.writableBytes()));
219                 }
220                 out.writerIndex(writerIndex + result.bytesProduced());
221                 return result;
222             }
223 
224             @Override
225             int getPacketBufferSize(SslHandler handler) {
226                 return ((ReferenceCountedOpenSslEngine) handler.engine).maxEncryptedPacketLength0();
227             }
228 
229             @Override
230             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
231                 return ((ReferenceCountedOpenSslEngine) handler.engine).calculateMaxLengthForWrap(pendingBytes,
232                                                                                                   numComponents);
233             }
234 
235             @Override
236             int calculatePendingData(SslHandler handler, int guess) {
237                 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
238                 return sslPending > 0 ? sslPending : guess;
239             }
240 
241             @Override
242             boolean jdkCompatibilityMode(SSLEngine engine) {
243                 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
244             }
245         },
246         CONSCRYPT(true, COMPOSITE_CUMULATOR) {
247             @Override
248             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
249                     throws SSLException {
250                 int nioBufferCount = in.nioBufferCount();
251                 int writerIndex = out.writerIndex();
252                 final SSLEngineResult result;
253                 if (nioBufferCount > 1) {
254                     /*
255                      * Use a special unwrap method without additional memory copies.
256                      */
257                     try {
258                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
259                         result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
260                                 in.nioBuffers(readerIndex, len),
261                                 handler.singleBuffer);
262                     } finally {
263                         handler.singleBuffer[0] = null;
264                     }
265                 } else {
266                     result = handler.engine.unwrap(toByteBuffer(in, readerIndex, len),
267                             toByteBuffer(out, writerIndex, out.writableBytes()));
268                 }
269                 out.writerIndex(writerIndex + result.bytesProduced());
270                 return result;
271             }
272 
273             @Override
274             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
275                 return ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents);
276             }
277 
278             @Override
279             int calculatePendingData(SslHandler handler, int guess) {
280                 return guess;
281             }
282 
283             @Override
284             boolean jdkCompatibilityMode(SSLEngine engine) {
285                 return true;
286             }
287         },
288         JDK(false, MERGE_CUMULATOR) {
289             @Override
290             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
291                     throws SSLException {
292                 int writerIndex = out.writerIndex();
293                 ByteBuffer inNioBuffer = toByteBuffer(in, readerIndex, len);
294                 int position = inNioBuffer.position();
295                 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
296                     toByteBuffer(out, writerIndex, out.writableBytes()));
297                 out.writerIndex(writerIndex + result.bytesProduced());
298 
299                 // This is a workaround for a bug in Android 5.0. Android 5.0 does not correctly update the
300                 // SSLEngineResult.bytesConsumed() in some cases and just return 0.
301                 //
302                 // See:
303                 //     - https://android-review.googlesource.com/c/platform/external/conscrypt/+/122080
304                 //     - https://github.com/netty/netty/issues/7758
305                 if (result.bytesConsumed() == 0) {
306                     int consumed = inNioBuffer.position() - position;
307                     if (consumed != result.bytesConsumed()) {
308                         // Create a new SSLEngineResult with the correct bytesConsumed().
309                         return new SSLEngineResult(
310                                 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
311                     }
312                 }
313                 return result;
314             }
315 
316             @Override
317             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
318                 return handler.engine.getSession().getPacketBufferSize();
319             }
320 
321             @Override
322             int calculatePendingData(SslHandler handler, int guess) {
323                 return guess;
324             }
325 
326             @Override
327             boolean jdkCompatibilityMode(SSLEngine engine) {
328                 return true;
329             }
330         };
331 
332         static SslEngineType forEngine(SSLEngine engine) {
333             return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
334                    engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
335         }
336 
337         SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
338             this.wantsDirectBuffer = wantsDirectBuffer;
339             this.cumulator = cumulator;
340         }
341 
342         int getPacketBufferSize(SslHandler handler) {
343             return handler.engine.getSession().getPacketBufferSize();
344         }
345 
346         abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
347                 throws SSLException;
348 
349         abstract int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents);
350 
351         abstract int calculatePendingData(SslHandler handler, int guess);
352 
353         abstract boolean jdkCompatibilityMode(SSLEngine engine);
354 
355         // BEGIN Platform-dependent flags
356 
357         /**
358          * {@code true} if and only if {@link SSLEngine} expects a direct buffer.
359          */
360         final boolean wantsDirectBuffer;
361 
362         // END Platform-dependent flags
363 
364         /**
365          * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
366          * one {@link ByteBuffer}.
367          *
368          * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
369          * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
370          * and which does not need to do extra memory copies.
371          */
372         final Cumulator cumulator;
373     }
374 
375     private volatile ChannelHandlerContext ctx;
376     private final SSLEngine engine;
377     private final SslEngineType engineType;
378     private final Executor delegatedTaskExecutor;
379     private final boolean jdkCompatibilityMode;
380 
381     /**
382      * Used if {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)} and {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer[])}
383      * should be called with a {@link ByteBuf} that is only backed by one {@link ByteBuffer} to reduce the object
384      * creation.
385      */
386     private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
387 
388     private final boolean startTls;
389     private boolean sentFirstMessage;
390     private boolean flushedBeforeHandshake;
391     private boolean readDuringHandshake;
392     private boolean handshakeStarted;
393     private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
394     private Promise<Channel> handshakePromise = new LazyChannelPromise();
395     private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
396 
397     /**
398      * Set by wrap*() methods when something is produced.
399      * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
400      */
401     private boolean needsFlush;
402 
403     private boolean outboundClosed;
404     private boolean closeNotify;
405 
406     private int packetLength;
407 
408     /**
409      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
410      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
411      */
412     private boolean firedChannelRead;
413 
414     private volatile long handshakeTimeoutMillis = 10000;
415     private volatile long closeNotifyFlushTimeoutMillis = 3000;
416     private volatile long closeNotifyReadTimeoutMillis;
417     volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
418 
419     /**
420      * Creates a new instance.
421      *
422      * @param engine  the {@link SSLEngine} this handler will use
423      */
424     public SslHandler(SSLEngine engine) {
425         this(engine, false);
426     }
427 
428     /**
429      * Creates a new instance.
430      *
431      * @param engine    the {@link SSLEngine} this handler will use
432      * @param startTls  {@code true} if the first write request shouldn't be
433      *                  encrypted by the {@link SSLEngine}
434      */
435     @SuppressWarnings("deprecation")
436     public SslHandler(SSLEngine engine, boolean startTls) {
437         this(engine, startTls, ImmediateExecutor.INSTANCE);
438     }
439 
440     /**
441      * @deprecated Use {@link #SslHandler(SSLEngine)} instead.
442      */
443     @Deprecated
444     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
445         this(engine, false, delegatedTaskExecutor);
446     }
447 
448     /**
449      * @deprecated Use {@link #SslHandler(SSLEngine, boolean)} instead.
450      */
451     @Deprecated
452     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
453         if (engine == null) {
454             throw new NullPointerException("engine");
455         }
456         if (delegatedTaskExecutor == null) {
457             throw new NullPointerException("delegatedTaskExecutor");
458         }
459         this.engine = engine;
460         engineType = SslEngineType.forEngine(engine);
461         this.delegatedTaskExecutor = delegatedTaskExecutor;
462         this.startTls = startTls;
463         this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
464         setCumulator(engineType.cumulator);
465     }
466 
467     public long getHandshakeTimeoutMillis() {
468         return handshakeTimeoutMillis;
469     }
470 
471     public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
472         if (unit == null) {
473             throw new NullPointerException("unit");
474         }
475 
476         setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
477     }
478 
479     public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
480         if (handshakeTimeoutMillis < 0) {
481             throw new IllegalArgumentException(
482                     "handshakeTimeoutMillis: " + handshakeTimeoutMillis + " (expected: >= 0)");
483         }
484         this.handshakeTimeoutMillis = handshakeTimeoutMillis;
485     }
486 
487     /**
488      * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
489      * <p>
490      * This value will partition data which is passed to write
491      * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
492      * <ul>
493      * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
494      * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
495      * <li>If {@code wrapDataSize > data size}  Else if {@code wrapDataSize <= data size} then we will divide the data
496      * into chunks of {@code wrapDataSize} when writing.</li>
497      * </ul>
498      * <p>
499      * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
500      * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
501      * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
502      * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
503      * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
504      * @param wrapDataSize the number of bytes which will be passed to each
505      *      {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
506      */
507     @UnstableApi
508     public final void setWrapDataSize(int wrapDataSize) {
509         this.wrapDataSize = wrapDataSize;
510     }
511 
512     /**
513      * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
514      */
515     @Deprecated
516     public long getCloseNotifyTimeoutMillis() {
517         return getCloseNotifyFlushTimeoutMillis();
518     }
519 
520     /**
521      * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
522      */
523     @Deprecated
524     public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
525         setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
526     }
527 
528     /**
529      * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
530      */
531     @Deprecated
532     public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
533         setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
534     }
535 
536     /**
537      * Gets the timeout for flushing the close_notify that was triggered by closing the
538      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
539      * forcibly.
540      */
541     public final long getCloseNotifyFlushTimeoutMillis() {
542         return closeNotifyFlushTimeoutMillis;
543     }
544 
545     /**
546      * Sets the timeout for flushing the close_notify that was triggered by closing the
547      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
548      * forcibly.
549      */
550     public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
551         setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
552     }
553 
554     /**
555      * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
556      */
557     public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
558         if (closeNotifyFlushTimeoutMillis < 0) {
559             throw new IllegalArgumentException(
560                     "closeNotifyFlushTimeoutMillis: " + closeNotifyFlushTimeoutMillis + " (expected: >= 0)");
561         }
562         this.closeNotifyFlushTimeoutMillis = closeNotifyFlushTimeoutMillis;
563     }
564 
565     /**
566      * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
567      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
568      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
569      */
570     public final long getCloseNotifyReadTimeoutMillis() {
571         return closeNotifyReadTimeoutMillis;
572     }
573 
574     /**
575      * Sets the timeout  for receiving the response for the close_notify that was triggered by closing the
576      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
577      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
578      */
579     public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
580         setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
581     }
582 
583     /**
584      * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
585      */
586     public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
587         if (closeNotifyReadTimeoutMillis < 0) {
588             throw new IllegalArgumentException(
589                     "closeNotifyReadTimeoutMillis: " + closeNotifyReadTimeoutMillis + " (expected: >= 0)");
590         }
591         this.closeNotifyReadTimeoutMillis = closeNotifyReadTimeoutMillis;
592     }
593 
594     /**
595      * Returns the {@link SSLEngine} which is used by this handler.
596      */
597     public SSLEngine engine() {
598         return engine;
599     }
600 
601     /**
602      * Returns the name of the current application-level protocol.
603      *
604      * @return the protocol name or {@code null} if application-level protocol has not been negotiated
605      */
606     public String applicationProtocol() {
607         SSLEngine engine = engine();
608         if (!(engine instanceof ApplicationProtocolAccessor)) {
609             return null;
610         }
611 
612         return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
613     }
614 
615     /**
616      * Returns a {@link Future} that will get notified once the current TLS handshake completes.
617      *
618      * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
619      *         The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
620      */
621     public Future<Channel> handshakeFuture() {
622         return handshakePromise;
623     }
624 
625     /**
626      * Use {@link #closeOutbound()}
627      */
628     @Deprecated
629     public ChannelFuture close() {
630         return closeOutbound();
631     }
632 
633     /**
634      * Use {@link #closeOutbound(ChannelPromise)}
635      */
636     @Deprecated
637     public ChannelFuture close(ChannelPromise promise) {
638         return closeOutbound(promise);
639     }
640 
641     /**
642      * Sends an SSL {@code close_notify} message to the specified channel and
643      * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
644      * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
645      * {@link ChannelHandlerContext#close()}
646      */
647     public ChannelFuture closeOutbound() {
648         return closeOutbound(ctx.newPromise());
649     }
650 
651     /**
652      * Sends an SSL {@code close_notify} message to the specified channel and
653      * destroys the underlying {@link SSLEngine}. This will <strong>not</strong> close the underlying
654      * {@link Channel}. If you want to also close the {@link Channel} use {@link Channel#close()} or
655      * {@link ChannelHandlerContext#close()}
656      */
657     public ChannelFuture closeOutbound(final ChannelPromise promise) {
658         final ChannelHandlerContext ctx = this.ctx;
659         if (ctx.executor().inEventLoop()) {
660             closeOutbound0(promise);
661         } else {
662             ctx.executor().execute(new Runnable() {
663                 @Override
664                 public void run() {
665                     closeOutbound0(promise);
666                 }
667             });
668         }
669         return promise;
670     }
671 
672     private void closeOutbound0(ChannelPromise promise) {
673         outboundClosed = true;
674         engine.closeOutbound();
675         try {
676             flush(ctx, promise);
677         } catch (Exception e) {
678             if (!promise.tryFailure(e)) {
679                 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
680             }
681         }
682     }
683 
684     /**
685      * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
686      *
687      * This method will return the same {@link Future} all the time.
688      *
689      * @see SSLEngine
690      */
691     public Future<Channel> sslCloseFuture() {
692         return sslClosePromise;
693     }
694 
695     @Override
696     public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
697         if (!pendingUnencryptedWrites.isEmpty()) {
698             // Check if queue is not empty first because create a new ChannelException is expensive
699             pendingUnencryptedWrites.releaseAndFailAll(ctx,
700                     new ChannelException("Pending write on removal of SslHandler"));
701         }
702         pendingUnencryptedWrites = null;
703         if (engine instanceof ReferenceCounted) {
704             ((ReferenceCounted) engine).release();
705         }
706     }
707 
708     @Override
709     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
710         ctx.bind(localAddress, promise);
711     }
712 
713     @Override
714     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
715                         ChannelPromise promise) throws Exception {
716         ctx.connect(remoteAddress, localAddress, promise);
717     }
718 
719     @Override
720     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
721         ctx.deregister(promise);
722     }
723 
724     @Override
725     public void disconnect(final ChannelHandlerContext ctx,
726                            final ChannelPromise promise) throws Exception {
727         closeOutboundAndChannel(ctx, promise, true);
728     }
729 
730     @Override
731     public void close(final ChannelHandlerContext ctx,
732                       final ChannelPromise promise) throws Exception {
733         closeOutboundAndChannel(ctx, promise, false);
734     }
735 
736     @Override
737     public void read(ChannelHandlerContext ctx) throws Exception {
738         if (!handshakePromise.isDone()) {
739             readDuringHandshake = true;
740         }
741 
742         ctx.read();
743     }
744 
745     private static IllegalStateException newPendingWritesNullException() {
746         return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
747     }
748 
749     @Override
750     public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
751         if (!(msg instanceof ByteBuf)) {
752             UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
753             ReferenceCountUtil.safeRelease(msg);
754             promise.setFailure(exception);
755         } else if (pendingUnencryptedWrites == null) {
756             ReferenceCountUtil.safeRelease(msg);
757             promise.setFailure(newPendingWritesNullException());
758         } else {
759             pendingUnencryptedWrites.add((ByteBuf) msg, promise);
760         }
761     }
762 
763     @Override
764     public void flush(ChannelHandlerContext ctx) throws Exception {
765         // Do not encrypt the first write request if this handler is
766         // created with startTLS flag turned on.
767         if (startTls && !sentFirstMessage) {
768             sentFirstMessage = true;
769             pendingUnencryptedWrites.writeAndRemoveAll(ctx);
770             forceFlush(ctx);
771             // Explicit start handshake processing once we send the first message. This will also ensure
772             // we will schedule the timeout if needed.
773             startHandshakeProcessing();
774             return;
775         }
776 
777         try {
778             wrapAndFlush(ctx);
779         } catch (Throwable cause) {
780             setHandshakeFailure(ctx, cause);
781             PlatformDependent.throwException(cause);
782         }
783     }
784 
785     private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
786         if (pendingUnencryptedWrites.isEmpty()) {
787             // It's important to NOT use a voidPromise here as the user
788             // may want to add a ChannelFutureListener to the ChannelPromise later.
789             //
790             // See https://github.com/netty/netty/issues/3364
791             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
792         }
793         if (!handshakePromise.isDone()) {
794             flushedBeforeHandshake = true;
795         }
796         try {
797             wrap(ctx, false);
798         } finally {
799             // We may have written some parts of data before an exception was thrown so ensure we always flush.
800             // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
801             forceFlush(ctx);
802         }
803     }
804 
    /**
     * Encrypts buffers taken from {@code pendingUnencryptedWrites} and writes the produced data to {@code ctx}.
     * The loop runs until the handler is removed, the queue yields no further buffer, the engine reports
     * {@code Status.CLOSED} or the engine requests an unwrap.
     * <p>
     * This method will not call setHandshakeFailure(...) !
     *
     * @param ctx      the context used to allocate buffers, create promises and write the encrypted data
     * @param inUnwrap forwarded to {@code finishWrap(...)}, which sets {@code needsFlush} when it is {@code true}
     * @throws SSLException if the {@link SSLEngine} fails while wrapping
     */
    private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
        // 'out', 'promise' and 'buf' are tracked outside the loop so the finally block can deal with whatever
        // is still owned by this method when it exits (normally or exceptionally).
        ByteBuf out = null;
        ChannelPromise promise = null;
        ByteBufAllocator alloc = ctx.alloc();
        boolean needUnwrap = false;
        ByteBuf buf = null;
        try {
            final int wrapDataSize = this.wrapDataSize;
            // Only continue to loop if the handler was not removed in the meantime.
            // See https://github.com/netty/netty/issues/5860
            while (!ctx.isRemoved()) {
                promise = ctx.newPromise();
                // With a positive wrapDataSize the queue is asked for up to that many bytes, otherwise only the
                // first queued buffer is taken.
                buf = wrapDataSize > 0 ?
                        pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
                        pendingUnencryptedWrites.removeFirst(promise);
                if (buf == null) {
                    // The queue did not hand out a buffer, nothing left to wrap.
                    break;
                }

                if (out == null) {
                    // Lazily allocate the destination buffer, sized from the buffer that is about to be wrapped.
                    out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
                }

                SSLEngineResult result = wrap(alloc, engine, buf, out);

                if (result.getStatus() == Status.CLOSED) {
                    // Null out buf and promise after handling them so the finally block does not touch them again.
                    buf.release();
                    buf = null;
                    promise.tryFailure(SSLENGINE_CLOSED);
                    promise = null;
                    // SSLEngine has been closed already.
                    // Any further write attempts should be denied.
                    pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED);
                    return;
                } else {
                    if (buf.isReadable()) {
                        // Not everything was consumed: put the remainder back at the head of the queue.
                        pendingUnencryptedWrites.addFirst(buf, promise);
                        // When we add the buffer/promise pair back we need to be sure we don't complete the promise
                        // later in finishWrap. We only complete the promise if the buffer is completely consumed.
                        promise = null;
                    } else {
                        buf.release();
                    }
                    buf = null;

                    switch (result.getHandshakeStatus()) {
                        case NEED_TASK:
                            runDelegatedTasks();
                            break;
                        case FINISHED:
                            setHandshakeSuccess();
                            // deliberate fall-through
                        case NOT_HANDSHAKING:
                            setHandshakeSuccessIfStillHandshaking();
                            // deliberate fall-through
                        case NEED_WRAP:
                            // Hand 'out' and 'promise' over to finishWrap and forget about them so that the next
                            // iteration allocates a fresh 'out' and the finally block does not write them twice.
                            finishWrap(ctx, out, promise, inUnwrap, false);
                            promise = null;
                            out = null;
                            break;
                        case NEED_UNWRAP:
                            // Remember the request so the finishWrap call in the finally block sees it.
                            needUnwrap = true;
                            return;
                        default:
                            throw new IllegalStateException(
                                    "Unknown handshake status: " + result.getHandshakeStatus());
                    }
                }
            }
        } finally {
            // Ownership of buffer was not transferred, release it.
            if (buf != null) {
                buf.release();
            }
            // Always runs: passes on whatever 'out' / 'promise' are still held (possibly null) together with
            // the unwrap request flag.
            finishWrap(ctx, out, promise, inUnwrap, needUnwrap);
        }
    }
883 
884     private void finishWrap(ChannelHandlerContext ctx, ByteBuf out, ChannelPromise promise, boolean inUnwrap,
885             boolean needUnwrap) {
886         if (out == null) {
887             out = Unpooled.EMPTY_BUFFER;
888         } else if (!out.isReadable()) {
889             out.release();
890             out = Unpooled.EMPTY_BUFFER;
891         }
892 
893         if (promise != null) {
894             ctx.write(out, promise);
895         } else {
896             ctx.write(out);
897         }
898 
899         if (inUnwrap) {
900             needsFlush = true;
901         }
902 
903         if (needUnwrap) {
904             // The underlying engine is starving so we need to feed it with more data.
905             // See https://github.com/netty/netty/pull/5039
906             readIfNeeded(ctx);
907         }
908     }
909 
    /**
     * Repeatedly wraps an empty buffer so that the {@link SSLEngine} can emit non-application data, writes
     * anything it produces to {@code ctx} and reacts to the handshake status reported after each wrap.
     * <p>
     * This method will not call
     * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
     * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
     *
     * @param ctx      the context used to allocate and write buffers
     * @param inUnwrap {@code true} if invoked from within unwrap; in that case {@code needsFlush} is set when
     *                 data was written and no nested unwrap is triggered from here
     * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
     * @throws SSLException if the {@link SSLEngine} fails while wrapping or unwrapping
     */
    private boolean wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
        ByteBuf out = null;
        ByteBufAllocator alloc = ctx.alloc();
        try {
            // Only continue to loop if the handler was not removed in the meantime.
            // See https://github.com/netty/netty/issues/5860
            while (!ctx.isRemoved()) {
                if (out == null) {
                    // As this is called for the handshake we have no real idea how big the buffer needs to be.
                    // That said 2048 should give us enough room to include everything like ALPN / NPN data.
                    // If this is not enough we will increase the buffer in wrap(...).
                    out = allocateOutNetBuf(ctx, 2048, 1);
                }
                SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);

                if (result.bytesProduced() > 0) {
                    // The buffer is handed to the pipeline; null it out so it is neither reused nor released
                    // by the finally block.
                    ctx.write(out);
                    if (inUnwrap) {
                        needsFlush = true;
                    }
                    out = null;
                }

                switch (result.getHandshakeStatus()) {
                    case FINISHED:
                        setHandshakeSuccess();
                        return false;
                    case NEED_TASK:
                        runDelegatedTasks();
                        break;
                    case NEED_UNWRAP:
                        if (inUnwrap) {
                            // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
                            // no use in trying to call wrap again because we have already attempted (or will after we
                            // return) to feed more data to the engine.
                            return false;
                        }

                        unwrapNonAppData(ctx);
                        break;
                    case NEED_WRAP:
                        break;
                    case NOT_HANDSHAKING:
                        setHandshakeSuccessIfStillHandshaking();
                        // Workaround for TLS False Start problem reported at:
                        // https://github.com/netty/netty/issues/1108#issuecomment-14266970
                        if (!inUnwrap) {
                            unwrapNonAppData(ctx);
                        }
                        return true;
                    default:
                        throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
                }

                // The engine produced nothing in this round, so stop looping.
                if (result.bytesProduced() == 0) {
                    break;
                }

                // It should not consume empty buffers when it is not handshaking
                // Fix for Android, where it was encrypting empty buffers even when not handshaking
                if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                    break;
                }
            }
        }  finally {
            // Release the buffer if it was allocated but never written.
            if (out != null) {
                out.release();
            }
        }
        return false;
    }
987 
    /**
     * Wraps the readable bytes of {@code in} into {@code out} using {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)},
     * growing {@code out} and retrying whenever the engine reports {@code BUFFER_OVERFLOW}. After each engine
     * call the reader index of {@code in} is advanced by the consumed bytes and the writer index of {@code out}
     * by the produced bytes.
     *
     * @param alloc  used to allocate a temporary direct copy of {@code in} when {@code in} is not direct but
     *               {@code engineType.wantsDirectBuffer} is set
     * @param engine the engine that performs the wrap
     * @param in     the source data
     * @param out    the destination buffer
     * @return the first {@link SSLEngineResult} whose status is not {@code BUFFER_OVERFLOW}
     * @throws SSLException if the engine fails while wrapping
     */
    private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
            throws SSLException {
        // Temporary direct copy of 'in'; only set in the copy branch below and released in the finally block.
        ByteBuf newDirectIn = null;
        try {
            int readerIndex = in.readerIndex();
            int readableBytes = in.readableBytes();

            // We will call SslEngine.wrap(ByteBuffer[], ByteBuffer) to allow efficient handling of
            // CompositeByteBuf without force an extra memory copy when CompositeByteBuffer.nioBuffer() is called.
            final ByteBuffer[] in0;
            if (in.isDirect() || !engineType.wantsDirectBuffer) {
                // As CompositeByteBuf.nioBufferCount() can be expensive (as it needs to check all composed ByteBuf
                // to calculate the count) we will just assume a CompositeByteBuf contains more then 1 ByteBuf.
                // The worst that can happen is that we allocate an extra ByteBuffer[] in CompositeByteBuf.nioBuffers()
                // which is better then walking the composed ByteBuf in most cases.
                if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
                    // Reuse the shared single-element array instead of allocating a new one.
                    in0 = singleBuffer;
                    // We know its only backed by 1 ByteBuffer so use internalNioBuffer to keep object allocation
                    // to a minimum.
                    in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
                } else {
                    in0 = in.nioBuffers();
                }
            } else {
                // We could even go further here and check if its a CompositeByteBuf and if so try to decompose it and
                // only replace the ByteBuffer that are not direct. At the moment we just will replace the whole
                // CompositeByteBuf to keep the complexity to a minimum
                newDirectIn = alloc.directBuffer(readableBytes);
                newDirectIn.writeBytes(in, readerIndex, readableBytes);
                in0 = singleBuffer;
                in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
            }

            for (;;) {
                // View over the currently writable region of 'out'; recreated on every iteration because 'out'
                // may have been expanded after an overflow.
                ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes());
                SSLEngineResult result = engine.wrap(in0, out0);
                in.skipBytes(result.bytesConsumed());
                out.writerIndex(out.writerIndex() + result.bytesProduced());

                switch (result.getStatus()) {
                case BUFFER_OVERFLOW:
                    // Make room for one more packet as sized by the session and try again.
                    out.ensureWritable(engine.getSession().getPacketBufferSize());
                    break;
                default:
                    return result;
                }
            }
        } finally {
            // Null out to allow GC of ByteBuffer
            singleBuffer[0] = null;

            if (newDirectIn != null) {
                newDirectIn.release();
            }
        }
    }
1044 
1045     @Override
1046     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1047         // Make sure to release SSLEngine,
1048         // and notify the handshake future if the connection has been closed during handshake.
1049         setHandshakeFailure(ctx, CHANNEL_CLOSED, !outboundClosed, handshakeStarted, false);
1050 
1051         // Ensure we always notify the sslClosePromise as well
1052         notifyClosePromise(CHANNEL_CLOSED);
1053 
1054         super.channelInactive(ctx);
1055     }
1056 
1057     @Override
1058     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1059         if (ignoreException(cause)) {
1060             // It is safe to ignore the 'connection reset by peer' or
1061             // 'broken pipe' error after sending close_notify.
1062             if (logger.isDebugEnabled()) {
1063                 logger.debug(
1064                         "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1065                         "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1066             }
1067 
1068             // Close the connection explicitly just in case the transport
1069             // did not close the connection automatically.
1070             if (ctx.channel().isActive()) {
1071                 ctx.close();
1072             }
1073         } else {
1074             ctx.fireExceptionCaught(cause);
1075         }
1076     }
1077 
1078     /**
1079      * Checks if the given {@link Throwable} can be ignore and just "swallowed"
1080      *
1081      * When an ssl connection is closed a close_notify message is sent.
1082      * After that the peer also sends close_notify however, it's not mandatory to receive
1083      * the close_notify. The party who sent the initial close_notify can close the connection immediately
1084      * then the peer will get connection reset error.
1085      *
1086      */
1087     private boolean ignoreException(Throwable t) {
1088         if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1089             String message = t.getMessage();
1090 
1091             // first try to match connection reset / broke peer based on the regex. This is the fastest way
1092             // but may fail on different jdk impls or OS's
1093             if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1094                 return true;
1095             }
1096 
1097             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
1098             StackTraceElement[] elements = t.getStackTrace();
1099             for (StackTraceElement element: elements) {
1100                 String classname = element.getClassName();
1101                 String methodname = element.getMethodName();
1102 
1103                 // skip all classes that belong to the io.netty package
1104                 if (classname.startsWith("io.netty.")) {
1105                     continue;
1106                 }
1107 
1108                 // check if the method name is read if not skip it
1109                 if (!"read".equals(methodname)) {
1110                     continue;
1111                 }
1112 
1113                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
1114                 // also others
1115                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1116                     return true;
1117                 }
1118 
1119                 try {
1120                     // No match by now.. Try to load the class via classloader and inspect it.
1121                     // This is mainly done as other JDK implementations may differ in name of
1122                     // the impl.
1123                     Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1124 
1125                     if (SocketChannel.class.isAssignableFrom(clazz)
1126                             || DatagramChannel.class.isAssignableFrom(clazz)) {
1127                         return true;
1128                     }
1129 
1130                     // also match against SctpChannel via String matching as it may not present.
1131                     if (PlatformDependent.javaVersion() >= 7
1132                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1133                         return true;
1134                     }
1135                 } catch (Throwable cause) {
1136                     logger.debug("Unexpected exception while loading class {} classname {}",
1137                                  getClass(), classname, cause);
1138                 }
1139             }
1140         }
1141 
1142         return false;
1143     }
1144 
1145     /**
1146      * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1147      * will not increase the readerIndex of the given {@link ByteBuf}.
1148      *
1149      * @param   buffer
1150      *                  The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1151      *                  otherwise it will throw an {@link IllegalArgumentException}.
1152      * @return encrypted
1153      *                  {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1154      * @throws IllegalArgumentException
1155      *                  Is thrown if the given {@link ByteBuf} has not at least 5 bytes to read.
1156      */
1157     public static boolean isEncrypted(ByteBuf buffer) {
1158         if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1159             throw new IllegalArgumentException(
1160                     "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1161         }
1162         return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1163     }
1164 
1165     private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1166         int packetLength = this.packetLength;
1167         // If we calculated the length of the current SSL record before, use that information.
1168         if (packetLength > 0) {
1169             if (in.readableBytes() < packetLength) {
1170                 return;
1171             }
1172         } else {
1173             // Get the packet length and wait until we get a packets worth of data to unwrap.
1174             final int readableBytes = in.readableBytes();
1175             if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1176                 return;
1177             }
1178             packetLength = getEncryptedPacketLength(in, in.readerIndex());
1179             if (packetLength == SslUtils.NOT_ENCRYPTED) {
1180                 // Not an SSL/TLS packet
1181                 NotSslRecordException e = new NotSslRecordException(
1182                         "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1183                 in.skipBytes(in.readableBytes());
1184 
1185                 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1186                 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1187                 setHandshakeFailure(ctx, e);
1188 
1189                 throw e;
1190             }
1191             assert packetLength > 0;
1192             if (packetLength > readableBytes) {
1193                 // wait until the whole packet can be read
1194                 this.packetLength = packetLength;
1195                 return;
1196             }
1197         }
1198 
1199         // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1200         // be consumed by the SSLEngine.
1201         this.packetLength = 0;
1202         try {
1203             int bytesConsumed = unwrap(ctx, in, in.readerIndex(), packetLength);
1204             assert bytesConsumed == packetLength || engine.isInboundDone() :
1205                     "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1206                             bytesConsumed;
1207             in.skipBytes(bytesConsumed);
1208         } catch (Throwable cause) {
1209             handleUnwrapThrowable(ctx, cause);
1210         }
1211     }
1212 
1213     private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1214         try {
1215             in.skipBytes(unwrap(ctx, in, in.readerIndex(), in.readableBytes()));
1216         } catch (Throwable cause) {
1217             handleUnwrapThrowable(ctx, cause);
1218         }
1219     }
1220 
1221     private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1222         try {
1223             // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1224             // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1225             // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1226             // has been closed.
1227             if (handshakePromise.tryFailure(cause)) {
1228                 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1229             }
1230 
1231             // We need to flush one time as there may be an alert that we should send to the remote peer because
1232             // of the SSLException reported here.
1233             wrapAndFlush(ctx);
1234         } catch (SSLException ex) {
1235             logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1236                     " because of an previous SSLException, ignoring...", ex);
1237         } finally {
1238             // ensure we always flush and close the channel.
1239             setHandshakeFailure(ctx, cause, true, false, true);
1240         }
1241         PlatformDependent.throwException(cause);
1242     }
1243 
1244     @Override
1245     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1246         if (jdkCompatibilityMode) {
1247             decodeJdkCompatible(ctx, in);
1248         } else {
1249             decodeNonJdkCompatible(ctx, in);
1250         }
1251     }
1252 
1253     @Override
1254     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1255         // Discard bytes of the cumulation buffer if needed.
1256         discardSomeReadBytes();
1257 
1258         flushIfNeeded(ctx);
1259         readIfNeeded(ctx);
1260 
1261         firedChannelRead = false;
1262         ctx.fireChannelReadComplete();
1263     }
1264 
1265     private void readIfNeeded(ChannelHandlerContext ctx) {
1266         // If handshake is not finished yet, we need more data.
1267         if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) {
1268             // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1269             // yet, which means we need to trigger the read to ensure we not encounter any stalls.
1270             ctx.read();
1271         }
1272     }
1273 
1274     private void flushIfNeeded(ChannelHandlerContext ctx) {
1275         if (needsFlush) {
1276             forceFlush(ctx);
1277         }
1278     }
1279 
1280     /**
1281      * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
1282      */
1283     private void unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1284         unwrap(ctx, Unpooled.EMPTY_BUFFER, 0, 0);
1285     }
1286 
1287     /**
1288      * Unwraps inbound SSL records.
1289      */
1290     private int unwrap(
1291             ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
1292         final int originalLength = length;
1293         boolean wrapLater = false;
1294         boolean notifyClosure = false;
1295         int overflowReadableBytes = -1;
1296         ByteBuf decodeOut = allocate(ctx, length);
1297         try {
1298             // Only continue to loop if the handler was not removed in the meantime.
1299             // See https://github.com/netty/netty/issues/5860
1300             unwrapLoop: while (!ctx.isRemoved()) {
1301                 final SSLEngineResult result = engineType.unwrap(this, packet, offset, length, decodeOut);
1302                 final Status status = result.getStatus();
1303                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1304                 final int produced = result.bytesProduced();
1305                 final int consumed = result.bytesConsumed();
1306 
1307                 // Update indexes for the next iteration
1308                 offset += consumed;
1309                 length -= consumed;
1310 
1311                 switch (status) {
1312                 case BUFFER_OVERFLOW:
1313                     final int readableBytes = decodeOut.readableBytes();
1314                     final int previousOverflowReadableBytes = overflowReadableBytes;
1315                     overflowReadableBytes = readableBytes;
1316                     int bufferSize = engine.getSession().getApplicationBufferSize() - readableBytes;
1317                     if (readableBytes > 0) {
1318                         firedChannelRead = true;
1319                         ctx.fireChannelRead(decodeOut);
1320 
1321                         // This buffer was handled, null it out.
1322                         decodeOut = null;
1323                         if (bufferSize <= 0) {
1324                             // It may happen that readableBytes >= engine.getSession().getApplicationBufferSize()
1325                             // while there is still more to unwrap, in this case we will just allocate a new buffer
1326                             // with the capacity of engine.getSession().getApplicationBufferSize() and call unwrap
1327                             // again.
1328                             bufferSize = engine.getSession().getApplicationBufferSize();
1329                         }
1330                     } else {
1331                         // This buffer was handled, null it out.
1332                         decodeOut.release();
1333                         decodeOut = null;
1334                     }
1335                     if (readableBytes == 0 && previousOverflowReadableBytes == 0) {
1336                         // If there is two consecutive loops where we overflow and are not able to consume any data,
1337                         // assume the amount of data exceeds the maximum amount for the engine and bail
1338                         throw new IllegalStateException("Two consecutive overflows but no content was consumed. " +
1339                                  SSLSession.class.getSimpleName() + " getApplicationBufferSize: " +
1340                                  engine.getSession().getApplicationBufferSize() + " maybe too small.");
1341                     }
1342                     // Allocate a new buffer which can hold all the rest data and loop again.
1343                     // TODO: We may want to reconsider how we calculate the length here as we may
1344                     // have more then one ssl message to decode.
1345                     decodeOut = allocate(ctx, engineType.calculatePendingData(this, bufferSize));
1346                     continue;
1347                 case CLOSED:
1348                     // notify about the CLOSED state of the SSLEngine. See #137
1349                     notifyClosure = true;
1350                     overflowReadableBytes = -1;
1351                     break;
1352                 default:
1353                     overflowReadableBytes = -1;
1354                     break;
1355                 }
1356 
1357                 switch (handshakeStatus) {
1358                     case NEED_UNWRAP:
1359                         break;
1360                     case NEED_WRAP:
1361                         // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1362                         // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1363                         // costly unwrap operation and break out of the loop.
1364                         if (wrapNonAppData(ctx, true) && length == 0) {
1365                             break unwrapLoop;
1366                         }
1367                         break;
1368                     case NEED_TASK:
1369                         runDelegatedTasks();
1370                         break;
1371                     case FINISHED:
1372                         setHandshakeSuccess();
1373                         wrapLater = true;
1374 
1375                         // We 'break' here and NOT 'continue' as android API version 21 has a bug where they consume
1376                         // data from the buffer but NOT correctly set the SSLEngineResult.bytesConsumed().
1377                         // Because of this it will raise an exception on the next iteration of the for loop on android
1378                         // API version 21. Just doing a break will work here as produced and consumed will both be 0
1379                         // and so we break out of the complete for (;;) loop and so call decode(...) again later on.
1380                         // On other platforms this will have no negative effect as we will just continue with the
1381                         // for (;;) loop if something was either consumed or produced.
1382                         //
1383                         // See:
1384                         //  - https://github.com/netty/netty/issues/4116
1385                         //  - https://code.google.com/p/android/issues/detail?id=198639&thanks=198639&ts=1452501203
1386                         break;
1387                     case NOT_HANDSHAKING:
1388                         if (setHandshakeSuccessIfStillHandshaking()) {
1389                             wrapLater = true;
1390                             continue;
1391                         }
1392 
1393                         // If we are not handshaking and there is no more data to unwrap then the next call to unwrap
1394                         // will not produce any data. We can avoid the potentially costly unwrap operation and break
1395                         // out of the loop.
1396                         if (length == 0) {
1397                             break unwrapLoop;
1398                         }
1399                         break;
1400                     default:
1401                         throw new IllegalStateException("unknown handshake status: " + handshakeStatus);
1402                 }
1403 
1404                 if (status == Status.BUFFER_UNDERFLOW || consumed == 0 && produced == 0) {
1405                     if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1406                         // The underlying engine is starving so we need to feed it with more data.
1407                         // See https://github.com/netty/netty/pull/5039
1408                         readIfNeeded(ctx);
1409                     }
1410 
1411                     break;
1412                 }
1413             }
1414 
1415             if (flushedBeforeHandshake && handshakePromise.isDone()) {
1416                 // We need to call wrap(...) in case there was a flush done before the handshake completed to ensure
1417                 // we do not stale.
1418                 //
1419                 // See https://github.com/netty/netty/pull/2437
1420                 flushedBeforeHandshake = false;
1421                 wrapLater = true;
1422             }
1423 
1424             if (wrapLater) {
1425                 wrap(ctx, true);
1426             }
1427 
1428             if (notifyClosure) {
1429                 notifyClosePromise(null);
1430             }
1431         } finally {
1432             if (decodeOut != null) {
1433                 if (decodeOut.isReadable()) {
1434                     firedChannelRead = true;
1435 
1436                     ctx.fireChannelRead(decodeOut);
1437                 } else {
1438                     decodeOut.release();
1439                 }
1440             }
1441         }
1442         return originalLength - length;
1443     }
1444 
1445     private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1446         return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1447                 out.nioBuffer(index, len);
1448     }
1449 
1450     /**
1451      * Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}.
1452      * If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly
1453      * instead of using {@link Executor#execute(Runnable)}.  Otherwise, run the tasks via
1454      * the {@link #delegatedTaskExecutor} and wait until the tasks are finished.
1455      */
1456     private void runDelegatedTasks() {
1457         if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
1458             for (;;) {
1459                 Runnable task = engine.getDelegatedTask();
1460                 if (task == null) {
1461                     break;
1462                 }
1463 
1464                 task.run();
1465             }
1466         } else {
1467             final List<Runnable> tasks = new ArrayList<Runnable>(2);
1468             for (;;) {
1469                 final Runnable task = engine.getDelegatedTask();
1470                 if (task == null) {
1471                     break;
1472                 }
1473 
1474                 tasks.add(task);
1475             }
1476 
1477             if (tasks.isEmpty()) {
1478                 return;
1479             }
1480 
1481             final CountDownLatch latch = new CountDownLatch(1);
1482             delegatedTaskExecutor.execute(new Runnable() {
1483                 @Override
1484                 public void run() {
1485                     try {
1486                         for (Runnable task: tasks) {
1487                             task.run();
1488                         }
1489                     } catch (Exception e) {
1490                         ctx.fireExceptionCaught(e);
1491                     } finally {
1492                         latch.countDown();
1493                     }
1494                 }
1495             });
1496 
1497             boolean interrupted = false;
1498             while (latch.getCount() != 0) {
1499                 try {
1500                     latch.await();
1501                 } catch (InterruptedException e) {
1502                     // Interrupt later.
1503                     interrupted = true;
1504                 }
1505             }
1506 
1507             if (interrupted) {
1508                 Thread.currentThread().interrupt();
1509             }
1510         }
1511     }
1512 
1513     /**
1514      * Works around some Android {@link SSLEngine} implementations that skip {@link HandshakeStatus#FINISHED} and
1515      * go straight into {@link HandshakeStatus#NOT_HANDSHAKING} when handshake is finished.
1516      *
1517      * @return {@code true} if and only if the workaround has been applied and thus {@link #handshakeFuture} has been
1518      *         marked as success by this method
1519      */
1520     private boolean setHandshakeSuccessIfStillHandshaking() {
1521         if (!handshakePromise.isDone()) {
1522             setHandshakeSuccess();
1523             return true;
1524         }
1525         return false;
1526     }
1527 
1528     /**
1529      * Notify all the handshake futures about the successfully handshake
1530      */
1531     private void setHandshakeSuccess() {
1532         handshakePromise.trySuccess(ctx.channel());
1533 
1534         if (logger.isDebugEnabled()) {
1535             logger.debug("{} HANDSHAKEN: {}", ctx.channel(), engine.getSession().getCipherSuite());
1536         }
1537         ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1538 
1539         if (readDuringHandshake && !ctx.channel().config().isAutoRead()) {
1540             readDuringHandshake = false;
1541             ctx.read();
1542         }
1543     }
1544 
1545     /**
1546      * Notify all the handshake futures about the failure during the handshake.
1547      */
1548     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1549         setHandshakeFailure(ctx, cause, true, true, false);
1550     }
1551 
1552     /**
1553      * Notify all the handshake futures about the failure during the handshake.
1554      */
1555     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1556                                      boolean notify, boolean alwaysFlushAndClose) {
1557         try {
1558             // Release all resources such as internal buffers that SSLEngine
1559             // is managing.
1560             outboundClosed = true;
1561             engine.closeOutbound();
1562 
1563             if (closeInbound) {
1564                 try {
1565                     engine.closeInbound();
1566                 } catch (SSLException e) {
1567                     if (logger.isDebugEnabled()) {
1568                         // only log in debug mode as it most likely harmless and latest chrome still trigger
1569                         // this all the time.
1570                         //
1571                         // See https://github.com/netty/netty/issues/1340
1572                         String msg = e.getMessage();
1573                         if (msg == null || !(msg.contains("possible truncation attack") ||
1574                                 msg.contains("closing inbound before receiving peer's close_notify"))) {
1575                             logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1576                         }
1577                     }
1578                 }
1579             }
1580             if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1581                 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1582             }
1583         } finally {
1584             // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1585             releaseAndFailAll(cause);
1586         }
1587     }
1588 
1589     private void releaseAndFailAll(Throwable cause) {
1590         if (pendingUnencryptedWrites != null) {
1591             pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1592         }
1593     }
1594 
1595     private void notifyClosePromise(Throwable cause) {
1596         if (cause == null) {
1597             if (sslClosePromise.trySuccess(ctx.channel())) {
1598                 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
1599             }
1600         } else {
1601             if (sslClosePromise.tryFailure(cause)) {
1602                 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
1603             }
1604         }
1605     }
1606 
1607     private void closeOutboundAndChannel(
1608             final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
1609         outboundClosed = true;
1610         engine.closeOutbound();
1611 
1612         if (!ctx.channel().isActive()) {
1613             if (disconnect) {
1614                 ctx.disconnect(promise);
1615             } else {
1616                 ctx.close(promise);
1617             }
1618             return;
1619         }
1620 
1621         ChannelPromise closeNotifyPromise = ctx.newPromise();
1622         try {
1623             flush(ctx, closeNotifyPromise);
1624         } finally {
1625             if (!closeNotify) {
1626                 closeNotify = true;
1627                 // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
1628                 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
1629                 // to fail the promise because of this. This will then fail as it was already completed by
1630                 // safeClose(...). We create a new ChannelPromise and try to notify the original ChannelPromise
1631                 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
1632                 // because of a propagated Exception.
1633                 //
1634                 // See https://github.com/netty/netty/issues/5931
1635                 safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener(
1636                         new ChannelPromiseNotifier(false, promise)));
1637             } else {
1638                 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
1639                 sslClosePromise.addListener(new FutureListener<Channel>() {
1640                     @Override
1641                     public void operationComplete(Future<Channel> future) {
1642                         promise.setSuccess();
1643                     }
1644                 });
1645             }
1646         }
1647     }
1648 
1649     private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1650         if (pendingUnencryptedWrites != null) {
1651             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
1652         } else {
1653             promise.setFailure(newPendingWritesNullException());
1654         }
1655         flush(ctx);
1656     }
1657 
1658     @Override
1659     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
1660         this.ctx = ctx;
1661 
1662         pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16);
1663         if (ctx.channel().isActive()) {
1664             startHandshakeProcessing();
1665         }
1666     }
1667 
1668     private void startHandshakeProcessing() {
1669         if (!handshakeStarted) {
1670             handshakeStarted = true;
1671             if (engine.getUseClientMode()) {
1672                 // Begin the initial handshake.
1673                 // channelActive() event has been fired already, which means this.channelActive() will
1674                 // not be invoked. We have to initialize here instead.
1675                 handshake();
1676             }
1677             applyHandshakeTimeout();
1678         }
1679     }
1680 
1681     /**
1682      * Performs TLS renegotiation.
1683      */
1684     public Future<Channel> renegotiate() {
1685         ChannelHandlerContext ctx = this.ctx;
1686         if (ctx == null) {
1687             throw new IllegalStateException();
1688         }
1689 
1690         return renegotiate(ctx.executor().<Channel>newPromise());
1691     }
1692 
1693     /**
1694      * Performs TLS renegotiation.
1695      */
1696     public Future<Channel> renegotiate(final Promise<Channel> promise) {
1697         if (promise == null) {
1698             throw new NullPointerException("promise");
1699         }
1700 
1701         ChannelHandlerContext ctx = this.ctx;
1702         if (ctx == null) {
1703             throw new IllegalStateException();
1704         }
1705 
1706         EventExecutor executor = ctx.executor();
1707         if (!executor.inEventLoop()) {
1708             executor.execute(new Runnable() {
1709                 @Override
1710                 public void run() {
1711                     renegotiateOnEventLoop(promise);
1712                 }
1713             });
1714             return promise;
1715         }
1716 
1717         renegotiateOnEventLoop(promise);
1718         return promise;
1719     }
1720 
1721     private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
1722         final Promise<Channel> oldHandshakePromise = handshakePromise;
1723         if (!oldHandshakePromise.isDone()) {
1724             // There's no need to handshake because handshake is in progress already.
1725             // Merge the new promise into the old one.
1726             oldHandshakePromise.addListener(new PromiseNotifier<Channel, Future<Channel>>(newHandshakePromise));
1727         } else {
1728             handshakePromise = newHandshakePromise;
1729             handshake();
1730             applyHandshakeTimeout();
1731         }
1732     }
1733 
1734     /**
1735      * Performs TLS (re)negotiation.
1736      */
1737     private void handshake() {
1738         if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
1739             // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
1740             // is in progress. See https://github.com/netty/netty/issues/4718.
1741             return;
1742         } else {
1743             if (handshakePromise.isDone()) {
1744                 // If the handshake is done already lets just return directly as there is no need to trigger it again.
1745                 // This can happen if the handshake(...) was triggered before we called channelActive(...) by a
1746                 // flush() that was triggered by a ChannelFutureListener that was added to the ChannelFuture returned
1747                 // from the connect(...) method. In this case we will see the flush() happen before we had a chance to
1748                 // call fireChannelActive() on the pipeline.
1749                 return;
1750             }
1751         }
1752 
1753         // Begin handshake.
1754         final ChannelHandlerContext ctx = this.ctx;
1755         try {
1756             engine.beginHandshake();
1757             wrapNonAppData(ctx, false);
1758         } catch (Throwable e) {
1759             setHandshakeFailure(ctx, e);
1760         } finally {
1761             forceFlush(ctx);
1762         }
1763     }
1764 
1765     private void applyHandshakeTimeout() {
1766         final Promise<Channel> localHandshakePromise = this.handshakePromise;
1767 
1768         // Set timeout if necessary.
1769         final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
1770         if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
1771             return;
1772         }
1773 
1774         final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
1775             @Override
1776             public void run() {
1777                 if (localHandshakePromise.isDone()) {
1778                     return;
1779                 }
1780                 try {
1781                     if (localHandshakePromise.tryFailure(HANDSHAKE_TIMED_OUT)) {
1782                         SslUtils.handleHandshakeFailure(ctx, HANDSHAKE_TIMED_OUT, true);
1783                     }
1784                 } finally {
1785                     releaseAndFailAll(HANDSHAKE_TIMED_OUT);
1786                 }
1787             }
1788         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1789 
1790         // Cancel the handshake timeout when handshake is finished.
1791         localHandshakePromise.addListener(new FutureListener<Channel>() {
1792             @Override
1793             public void operationComplete(Future<Channel> f) throws Exception {
1794                 timeoutFuture.cancel(false);
1795             }
1796         });
1797     }
1798 
1799     private void forceFlush(ChannelHandlerContext ctx) {
1800         needsFlush = false;
1801         ctx.flush();
1802     }
1803 
1804     /**
1805      * Issues an initial TLS handshake once connected when used in client-mode
1806      */
1807     @Override
1808     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
1809         if (!startTls) {
1810             startHandshakeProcessing();
1811         }
1812         ctx.fireChannelActive();
1813     }
1814 
1815     private void safeClose(
1816             final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
1817             final ChannelPromise promise) {
1818         if (!ctx.channel().isActive()) {
1819             ctx.close(promise);
1820             return;
1821         }
1822 
1823         final ScheduledFuture<?> timeoutFuture;
1824         if (!flushFuture.isDone()) {
1825             long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
1826             if (closeNotifyTimeout > 0) {
1827                 // Force-close the connection if close_notify is not fully sent in time.
1828                 timeoutFuture = ctx.executor().schedule(new Runnable() {
1829                     @Override
1830                     public void run() {
1831                         // May be done in the meantime as cancel(...) is only best effort.
1832                         if (!flushFuture.isDone()) {
1833                             logger.warn("{} Last write attempt timed out; force-closing the connection.",
1834                                     ctx.channel());
1835                             addCloseListener(ctx.close(ctx.newPromise()), promise);
1836                         }
1837                     }
1838                 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
1839             } else {
1840                 timeoutFuture = null;
1841             }
1842         } else {
1843             timeoutFuture = null;
1844         }
1845 
1846         // Close the connection if close_notify is sent in time.
1847         flushFuture.addListener(new ChannelFutureListener() {
1848             @Override
1849             public void operationComplete(ChannelFuture f)
1850                     throws Exception {
1851                 if (timeoutFuture != null) {
1852                     timeoutFuture.cancel(false);
1853                 }
1854                 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
1855                 if (closeNotifyReadTimeout <= 0) {
1856                     // Trigger the close in all cases to make sure the promise is notified
1857                     // See https://github.com/netty/netty/issues/2358
1858                     addCloseListener(ctx.close(ctx.newPromise()), promise);
1859                 } else {
1860                     final ScheduledFuture<?> closeNotifyReadTimeoutFuture;
1861 
1862                     if (!sslClosePromise.isDone()) {
1863                         closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
1864                             @Override
1865                             public void run() {
1866                                 if (!sslClosePromise.isDone()) {
1867                                     logger.debug(
1868                                             "{} did not receive close_notify in {}ms; force-closing the connection.",
1869                                             ctx.channel(), closeNotifyReadTimeout);
1870 
1871                                     // Do the close now...
1872                                     addCloseListener(ctx.close(ctx.newPromise()), promise);
1873                                 }
1874                             }
1875                         }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
1876                     } else {
1877                         closeNotifyReadTimeoutFuture = null;
1878                     }
1879 
1880                     // Do the close once the we received the close_notify.
1881                     sslClosePromise.addListener(new FutureListener<Channel>() {
1882                         @Override
1883                         public void operationComplete(Future<Channel> future) throws Exception {
1884                             if (closeNotifyReadTimeoutFuture != null) {
1885                                 closeNotifyReadTimeoutFuture.cancel(false);
1886                             }
1887                             addCloseListener(ctx.close(ctx.newPromise()), promise);
1888                         }
1889                     });
1890                 }
1891             }
1892         });
1893     }
1894 
1895     private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
1896         // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
1897         // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
1898         // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
1899         // IllegalStateException.
1900         // Also we not want to log if the notification happens as this is expected in some cases.
1901         // See https://github.com/netty/netty/issues/5598
1902         future.addListener(new ChannelPromiseNotifier(false, promise));
1903     }
1904 
1905     /**
1906      * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
1907      * in {@link OpenSslEngine}.
1908      */
1909     private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
1910         ByteBufAllocator alloc = ctx.alloc();
1911         if (engineType.wantsDirectBuffer) {
1912             return alloc.directBuffer(capacity);
1913         } else {
1914             return alloc.buffer(capacity);
1915         }
1916     }
1917 
1918     /**
1919      * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
1920      * the specified amount of pending bytes.
1921      */
1922     private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
1923         return allocate(ctx, engineType.calculateWrapBufferCapacity(this, pendingBytes, numComponents));
1924     }
1925 
1926     /**
1927      * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
1928      * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
1929      * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
1930      */
1931     private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
1932 
1933         SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
1934             super(channel, initSize);
1935         }
1936 
1937         @Override
1938         protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
1939             final int wrapDataSize = SslHandler.this.wrapDataSize;
1940             if (cumulation instanceof CompositeByteBuf) {
1941                 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
1942                 int numComponents = composite.numComponents();
1943                 if (numComponents == 0 ||
1944                         !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
1945                     composite.addComponent(true, next);
1946                 }
1947                 return composite;
1948             }
1949             return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
1950                     copyAndCompose(alloc, cumulation, next);
1951         }
1952 
1953         @Override
1954         protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
1955             if (first instanceof CompositeByteBuf) {
1956                 CompositeByteBuf composite = (CompositeByteBuf) first;
1957                 first = allocator.directBuffer(composite.readableBytes());
1958                 try {
1959                     first.writeBytes(composite);
1960                 } catch (Throwable cause) {
1961                     first.release();
1962                     PlatformDependent.throwException(cause);
1963                 }
1964                 composite.release();
1965             }
1966             return first;
1967         }
1968 
1969         @Override
1970         protected ByteBuf removeEmptyValue() {
1971             return null;
1972         }
1973     }
1974 
1975     private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
1976         final int inReadableBytes = next.readableBytes();
1977         final int cumulationCapacity = cumulation.capacity();
1978         if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
1979                 // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
1980                 // Only copy if there is enough space available and the capacity is large enough, and attempt to
1981                 // resize if the capacity is small.
1982                 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
1983                         cumulationCapacity < wrapDataSize &&
1984                                 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
1985             cumulation.writeBytes(next);
1986             next.release();
1987             return true;
1988         }
1989         return false;
1990     }
1991 
1992     private final class LazyChannelPromise extends DefaultPromise<Channel> {
1993 
1994         @Override
1995         protected EventExecutor executor() {
1996             if (ctx == null) {
1997                 throw new IllegalStateException();
1998             }
1999             return ctx.executor();
2000         }
2001 
2002         @Override
2003         protected void checkDeadLock() {
2004             if (ctx == null) {
2005                 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
2006                 // method was called from another Thread then the one that is used by ctx.executor(). We need to
2007                 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
2008                 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
2009                 // ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an
2010                 // IllegalStateException when trying to call executor().
2011                 return;
2012             }
2013             super.checkDeadLock();
2014         }
2015     }
2016 }