View Javadoc
1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.ssl;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.buffer.ByteBufUtil;
21  import io.netty.buffer.CompositeByteBuf;
22  import io.netty.buffer.Unpooled;
23  import io.netty.channel.AbstractCoalescingBufferQueue;
24  import io.netty.channel.Channel;
25  import io.netty.channel.ChannelConfig;
26  import io.netty.channel.ChannelException;
27  import io.netty.channel.ChannelFuture;
28  import io.netty.channel.ChannelFutureListener;
29  import io.netty.channel.ChannelHandlerContext;
30  import io.netty.channel.ChannelInboundHandler;
31  import io.netty.channel.ChannelOutboundHandler;
32  import io.netty.channel.ChannelPipeline;
33  import io.netty.channel.ChannelPromise;
34  import io.netty.channel.ChannelPromiseNotifier;
35  import io.netty.handler.codec.ByteToMessageDecoder;
36  import io.netty.handler.codec.UnsupportedMessageTypeException;
37  import io.netty.util.ReferenceCountUtil;
38  import io.netty.util.ReferenceCounted;
39  import io.netty.util.concurrent.DefaultPromise;
40  import io.netty.util.concurrent.EventExecutor;
41  import io.netty.util.concurrent.Future;
42  import io.netty.util.concurrent.FutureListener;
43  import io.netty.util.concurrent.ImmediateExecutor;
44  import io.netty.util.concurrent.Promise;
45  import io.netty.util.internal.PlatformDependent;
46  import io.netty.util.internal.ThrowableUtil;
47  import io.netty.util.internal.UnstableApi;
48  import io.netty.util.internal.logging.InternalLogger;
49  import io.netty.util.internal.logging.InternalLoggerFactory;
50  
51  import java.io.IOException;
52  import java.net.SocketAddress;
53  import java.nio.ByteBuffer;
54  import java.nio.channels.ClosedChannelException;
55  import java.nio.channels.DatagramChannel;
56  import java.nio.channels.SocketChannel;
57  import java.util.ArrayList;
58  import java.util.List;
59  import java.util.concurrent.CountDownLatch;
60  import java.util.concurrent.Executor;
61  import java.util.concurrent.ScheduledFuture;
62  import java.util.concurrent.TimeUnit;
63  import java.util.regex.Pattern;
64  
65  import javax.net.ssl.SSLEngine;
66  import javax.net.ssl.SSLEngineResult;
67  import javax.net.ssl.SSLEngineResult.HandshakeStatus;
68  import javax.net.ssl.SSLEngineResult.Status;
69  import javax.net.ssl.SSLException;
70  import javax.net.ssl.SSLSession;
71  
72  import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
73  import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
74  
75  /**
76   * Adds <a href="http://en.wikipedia.org/wiki/Transport_Layer_Security">SSL
77   * &middot; TLS</a> and StartTLS support to a {@link Channel}.  Please refer
78   * to the <strong>"SecureChat"</strong> example in the distribution or the web
79   * site for the detailed usage.
80   *
81   * <h3>Beginning the handshake</h3>
82   * <p>
83   * Besides using the handshake {@link ChannelFuture} to get notified about the completion of the handshake, it's
84   * also possible to detect it by implementing the
85   * {@link ChannelInboundHandler#userEventTriggered(ChannelHandlerContext, Object)}
86   * method and checking for a {@link SslHandshakeCompletionEvent}.
87   *
88   * <h3>Handshake</h3>
89   * <p>
90   * The handshake will be automatically issued for you once the {@link Channel} is active and
91   * {@link SSLEngine#getUseClientMode()} returns {@code true}.
92   * So there is no need to start it yourself.
93   *
94   * <h3>Closing the session</h3>
95   * <p>
96   * To close the SSL session, the {@link #close()} method should be
97   * called to send the {@code close_notify} message to the remote peer.  One
98   * exception is when you close the {@link Channel} - {@link SslHandler}
99   * intercepts the close request and send the {@code close_notify} message
100  * before the channel closure automatically.  Once the SSL session is closed,
101  * it is not reusable, and consequently you should create a new
102  * {@link SslHandler} with a new {@link SSLEngine} as explained in the
103  * following section.
104  *
105  * <h3>Restarting the session</h3>
106  * <p>
107  * To restart the SSL session, you must remove the existing closed
108  * {@link SslHandler} from the {@link ChannelPipeline}, insert a new
109  * {@link SslHandler} with a new {@link SSLEngine} into the pipeline,
110  * and start the handshake process as described in the first section.
111  *
112  * <h3>Implementing StartTLS</h3>
113  * <p>
114  * <a href="http://en.wikipedia.org/wiki/STARTTLS">StartTLS</a> is the
115  * communication pattern that secures the wire in the middle of the plaintext
116  * connection.  Please note that it is different from SSL &middot; TLS, that
117  * secures the wire from the beginning of the connection.  Typically, StartTLS
118  * is composed of three steps:
119  * <ol>
120  * <li>Client sends a StartTLS request to server.</li>
121  * <li>Server sends a StartTLS response to client.</li>
122  * <li>Client begins SSL handshake.</li>
123  * </ol>
124  * If you implement a server, you need to:
125  * <ol>
126  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
127  *     to {@code true},</li>
128  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
129  * <li>write a StartTLS response.</li>
130  * </ol>
131  * Please note that you must insert {@link SslHandler} <em>before</em> sending
132  * the StartTLS response.  Otherwise the client can begin the SSL handshake
133  * before {@link SslHandler} is inserted to the {@link ChannelPipeline}, causing
134  * data corruption.
135  * <p>
136  * The client-side implementation is much simpler.
137  * <ol>
138  * <li>Write a StartTLS request,</li>
139  * <li>wait for the StartTLS response,</li>
140  * <li>create a new {@link SslHandler} instance with {@code startTls} flag set
141  *     to {@code false},</li>
142  * <li>insert the {@link SslHandler} to the {@link ChannelPipeline}, and</li>
143  * <li>Initiate SSL handshake.</li>
144  * </ol>
145  *
146  * <h3>Known issues</h3>
147  * <p>
148  * Because of a known issue with the current implementation of the SslEngine that comes
149  * with Java it may be possible that you see blocked IO-Threads while a full GC is done.
150  * <p>
151  * So if you are affected you can work around this problem by adjusting the cache settings
152  * as shown below:
153  *
154  * <pre>
155  *     SslContext context = ...;
156  *     context.getServerSessionContext().setSessionCacheSize(someSaneSize);
157  *     context.getServerSessionContext().setSessionTimeout(someSaneTimeout);
158  * </pre>
159  * <p>
160  * What values to use here depends on the nature of your application and should be set
161  * based on monitoring and debugging of it.
162  * For more details see
163  * <a href="https://github.com/netty/netty/issues/832">#832</a> in our issue tracker.
164  */
165 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
166 
167     private static final InternalLogger logger =
168             InternalLoggerFactory.getInstance(SslHandler.class);
169 
170     private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
171             "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
172     private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
173             "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
174 
175     /**
176      * Shared exception instances, each created once through
177      * {@link ThrowableUtil#unknownStackTrace(Throwable, Class, String)} and tagged with the method they are
178      * attributed to, so that a new exception does not have to be allocated every time one of them is needed.
179      */
180     private static final SSLException SSLENGINE_CLOSED = ThrowableUtil.unknownStackTrace(
181             new SSLException("SSLEngine closed already"), SslHandler.class, "wrap(...)");
182     private static final SSLException HANDSHAKE_TIMED_OUT = ThrowableUtil.unknownStackTrace(
183             new SSLException("handshake timed out"), SslHandler.class, "handshake(...)");
184     private static final ClosedChannelException CHANNEL_CLOSED = ThrowableUtil.unknownStackTrace(
185             new ClosedChannelException(), SslHandler.class, "channelInactive(...)");
186 
187     /**
188      * <a href="https://tools.ietf.org/html/rfc5246#section-6.2">2^14</a> which is the maximum sized plaintext chunk
189      * allowed by the TLS RFC.
190      */
191     private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
192 
193     private enum SslEngineType {
194         TCNATIVE(true, COMPOSITE_CUMULATOR) {
195             @Override
196             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
197                     throws SSLException {
198                 int nioBufferCount = in.nioBufferCount();
199                 int writerIndex = out.writerIndex();
200                 final SSLEngineResult result;
201                 if (nioBufferCount > 1) {
202                     /*
203                      * If {@link OpenSslEngine} is in use,
204                      * we can use a special {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} method
205                      * that accepts multiple {@link ByteBuffer}s without additional memory copies.
206                      */
207                     ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
208                     try {
209                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex,
210                             out.writableBytes());
211                         result = opensslEngine.unwrap(in.nioBuffers(readerIndex, len), handler.singleBuffer);
212                     } finally {
213                         handler.singleBuffer[0] = null;
214                     }
215                 } else {
216                     result = handler.engine.unwrap(toByteBuffer(in, readerIndex, len),
217                         toByteBuffer(out, writerIndex, out.writableBytes()));
218                 }
219                 out.writerIndex(writerIndex + result.bytesProduced());
220                 return result;
221             }
222 
223             @Override
224             int getPacketBufferSize(SslHandler handler) {
225                 return ((ReferenceCountedOpenSslEngine) handler.engine).maxEncryptedPacketLength0();
226             }
227 
228             @Override
229             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
230                 return ((ReferenceCountedOpenSslEngine) handler.engine).calculateMaxLengthForWrap(pendingBytes,
231                                                                                                   numComponents);
232             }
233 
234             @Override
235             int calculatePendingData(SslHandler handler, int guess) {
236                 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
237                 return sslPending > 0 ? sslPending : guess;
238             }
239 
240             @Override
241             boolean jdkCompatibilityMode(SSLEngine engine) {
242                 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
243             }
244         },
245         CONSCRYPT(true, COMPOSITE_CUMULATOR) {
246             @Override
247             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
248                     throws SSLException {
249                 int nioBufferCount = in.nioBufferCount();
250                 int writerIndex = out.writerIndex();
251                 final SSLEngineResult result;
252                 if (nioBufferCount > 1) {
253                     /*
254                      * Use a special unwrap method without additional memory copies.
255                      */
256                     try {
257                         handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
258                         result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
259                                 in.nioBuffers(readerIndex, len),
260                                 handler.singleBuffer);
261                     } finally {
262                         handler.singleBuffer[0] = null;
263                     }
264                 } else {
265                     result = handler.engine.unwrap(toByteBuffer(in, readerIndex, len),
266                             toByteBuffer(out, writerIndex, out.writableBytes()));
267                 }
268                 out.writerIndex(writerIndex + result.bytesProduced());
269                 return result;
270             }
271 
272             @Override
273             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
274                 return ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents);
275             }
276 
277             @Override
278             int calculatePendingData(SslHandler handler, int guess) {
279                 return guess;
280             }
281 
282             @Override
283             boolean jdkCompatibilityMode(SSLEngine engine) {
284                 return true;
285             }
286         },
287         JDK(false, MERGE_CUMULATOR) {
288             @Override
289             SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
290                     throws SSLException {
291                 int writerIndex = out.writerIndex();
292                 ByteBuffer inNioBuffer = toByteBuffer(in, readerIndex, len);
293                 int position = inNioBuffer.position();
294                 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
295                     toByteBuffer(out, writerIndex, out.writableBytes()));
296                 out.writerIndex(writerIndex + result.bytesProduced());
297 
298                 // This is a workaround for a bug in Android 5.0. Android 5.0 does not correctly update the
299                 // SSLEngineResult.bytesConsumed() in some cases and just return 0.
300                 //
301                 // See:
302                 //     - https://android-review.googlesource.com/c/platform/external/conscrypt/+/122080
303                 //     - https://github.com/netty/netty/issues/7758
304                 if (result.bytesConsumed() == 0) {
305                     int consumed = inNioBuffer.position() - position;
306                     if (consumed != result.bytesConsumed()) {
307                         // Create a new SSLEngineResult with the correct bytesConsumed().
308                         return new SSLEngineResult(
309                                 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
310                     }
311                 }
312                 return result;
313             }
314 
315             @Override
316             int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents) {
317                 return handler.engine.getSession().getPacketBufferSize();
318             }
319 
320             @Override
321             int calculatePendingData(SslHandler handler, int guess) {
322                 return guess;
323             }
324 
325             @Override
326             boolean jdkCompatibilityMode(SSLEngine engine) {
327                 return true;
328             }
329         };
330 
331         static SslEngineType forEngine(SSLEngine engine) {
332             return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
333                    engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
334         }
335 
336         SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
337             this.wantsDirectBuffer = wantsDirectBuffer;
338             this.cumulator = cumulator;
339         }
340 
341         int getPacketBufferSize(SslHandler handler) {
342             return handler.engine.getSession().getPacketBufferSize();
343         }
344 
345         abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int readerIndex, int len, ByteBuf out)
346                 throws SSLException;
347 
348         abstract int calculateWrapBufferCapacity(SslHandler handler, int pendingBytes, int numComponents);
349 
350         abstract int calculatePendingData(SslHandler handler, int guess);
351 
352         abstract boolean jdkCompatibilityMode(SSLEngine engine);
353 
354         // BEGIN Platform-dependent flags
355 
356         /**
357          * {@code true} if and only if {@link SSLEngine} expects a direct buffer.
358          */
359         final boolean wantsDirectBuffer;
360 
361         // END Platform-dependent flags
362 
363         /**
364          * When using JDK {@link SSLEngine}, we use {@link #MERGE_CUMULATOR} because it works only with
365          * one {@link ByteBuffer}.
366          *
367          * When using {@link OpenSslEngine}, we can use {@link #COMPOSITE_CUMULATOR} because it has
368          * {@link OpenSslEngine#unwrap(ByteBuffer[], ByteBuffer[])} which works with multiple {@link ByteBuffer}s
369          * and which does not need to do extra memory copies.
370          */
371         final Cumulator cumulator;
372     }
373 
374     private volatile ChannelHandlerContext ctx;
375     private final SSLEngine engine;
376     private final SslEngineType engineType;
377     private final Executor delegatedTaskExecutor;
378     private final boolean jdkCompatibilityMode;
379 
380     /**
381      * Used if {@link SSLEngine#wrap(ByteBuffer[], ByteBuffer)} and {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer[])}
382      * should be called with a {@link ByteBuf} that is only backed by one {@link ByteBuffer} to reduce the object
383      * creation.
384      */
385     private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
386 
387     private final boolean startTls;
388     private boolean sentFirstMessage;
389     private boolean flushedBeforeHandshake;
390     private boolean readDuringHandshake;
391     private boolean handshakeStarted;
392     private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
393     private Promise<Channel> handshakePromise = new LazyChannelPromise();
394     private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
395 
396     /**
397      * Set by wrap*() methods when something is produced.
398      * {@link #channelReadComplete(ChannelHandlerContext)} will check this flag, clear it, and call ctx.flush().
399      */
400     private boolean needsFlush;
401 
402     private boolean outboundClosed;
403     private boolean closeNotify;
404 
405     private int packetLength;
406 
407     /**
408      * This flag is used to determine if we need to call {@link ChannelHandlerContext#read()} to consume more data
409      * when {@link ChannelConfig#isAutoRead()} is {@code false}.
410      */
411     private boolean firedChannelRead;
412 
413     private volatile long handshakeTimeoutMillis = 10000;
414     private volatile long closeNotifyFlushTimeoutMillis = 3000;
415     private volatile long closeNotifyReadTimeoutMillis;
416     volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
417 
418     /**
419      * Creates a new instance.
420      *
421      * @param engine  the {@link SSLEngine} this handler will use
422      */
423     public SslHandler(SSLEngine engine) {
424         this(engine, false);
425     }
426 
427     /**
428      * Creates a new instance.
429      *
430      * @param engine    the {@link SSLEngine} this handler will use
431      * @param startTls  {@code true} if the first write request shouldn't be
432      *                  encrypted by the {@link SSLEngine}
433      */
434     @SuppressWarnings("deprecation")
435     public SslHandler(SSLEngine engine, boolean startTls) {
436         this(engine, startTls, ImmediateExecutor.INSTANCE);
437     }
438 
439     /**
440      * @deprecated Use {@link #SslHandler(SSLEngine)} instead.
441      */
442     @Deprecated
443     public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
444         this(engine, false, delegatedTaskExecutor);
445     }
446 
447     /**
448      * @deprecated Use {@link #SslHandler(SSLEngine, boolean)} instead.
449      */
450     @Deprecated
451     public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
452         if (engine == null) {
453             throw new NullPointerException("engine");
454         }
455         if (delegatedTaskExecutor == null) {
456             throw new NullPointerException("delegatedTaskExecutor");
457         }
458         this.engine = engine;
459         engineType = SslEngineType.forEngine(engine);
460         this.delegatedTaskExecutor = delegatedTaskExecutor;
461         this.startTls = startTls;
462         this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
463         setCumulator(engineType.cumulator);
464     }
465 
466     public long getHandshakeTimeoutMillis() {
467         return handshakeTimeoutMillis;
468     }
469 
470     public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
471         if (unit == null) {
472             throw new NullPointerException("unit");
473         }
474 
475         setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
476     }
477 
478     public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
479         if (handshakeTimeoutMillis < 0) {
480             throw new IllegalArgumentException(
481                     "handshakeTimeoutMillis: " + handshakeTimeoutMillis + " (expected: >= 0)");
482         }
483         this.handshakeTimeoutMillis = handshakeTimeoutMillis;
484     }
485 
486     /**
487      * Sets the number of bytes to pass to each {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
488      * <p>
489      * This value will partition data which is passed to write
490      * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}. The partitioning will work as follows:
491      * <ul>
492      * <li>If {@code wrapDataSize <= 0} then we will write each data chunk as is.</li>
493      * <li>If {@code wrapDataSize > data size} then we will attempt to aggregate multiple data chunks together.</li>
494      * <li>If {@code wrapDataSize > data size}  Else if {@code wrapDataSize <= data size} then we will divide the data
495      * into chunks of {@code wrapDataSize} when writing.</li>
496      * </ul>
497      * <p>
498      * If the {@link SSLEngine} doesn't support a gather wrap operation (e.g. {@link SslProvider#OPENSSL}) then
499      * aggregating data before wrapping can help reduce the ratio between TLS overhead vs data payload which will lead
500      * to better goodput. Writing fixed chunks of data can also help target the underlying transport's (e.g. TCP)
501      * frame size. Under lossy/congested network conditions this may help the peer get full TLS packets earlier and
502      * be able to do work sooner, as opposed to waiting for the all the pieces of the TLS packet to arrive.
503      * @param wrapDataSize the number of bytes which will be passed to each
504      *      {@link SSLEngine#wrap(ByteBuffer[], int, int, ByteBuffer)} call.
505      */
506     @UnstableApi
507     public final void setWrapDataSize(int wrapDataSize) {
508         this.wrapDataSize = wrapDataSize;
509     }
510 
511     /**
512      * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
513      */
514     @Deprecated
515     public long getCloseNotifyTimeoutMillis() {
516         return getCloseNotifyFlushTimeoutMillis();
517     }
518 
519     /**
520      * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
521      */
522     @Deprecated
523     public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
524         setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
525     }
526 
527     /**
528      * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
529      */
530     @Deprecated
531     public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
532         setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
533     }
534 
535     /**
536      * Gets the timeout for flushing the close_notify that was triggered by closing the
537      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
538      * forcibly.
539      */
540     public final long getCloseNotifyFlushTimeoutMillis() {
541         return closeNotifyFlushTimeoutMillis;
542     }
543 
544     /**
545      * Sets the timeout for flushing the close_notify that was triggered by closing the
546      * {@link Channel}. If the close_notify was not flushed in the given timeout the {@link Channel} will be closed
547      * forcibly.
548      */
549     public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
550         setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
551     }
552 
553     /**
554      * See {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
555      */
556     public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
557         if (closeNotifyFlushTimeoutMillis < 0) {
558             throw new IllegalArgumentException(
559                     "closeNotifyFlushTimeoutMillis: " + closeNotifyFlushTimeoutMillis + " (expected: >= 0)");
560         }
561         this.closeNotifyFlushTimeoutMillis = closeNotifyFlushTimeoutMillis;
562     }
563 
564     /**
565      * Gets the timeout (in ms) for receiving the response for the close_notify that was triggered by closing the
566      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
567      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
568      */
569     public final long getCloseNotifyReadTimeoutMillis() {
570         return closeNotifyReadTimeoutMillis;
571     }
572 
573     /**
574      * Sets the timeout  for receiving the response for the close_notify that was triggered by closing the
575      * {@link Channel}. This timeout starts after the close_notify message was successfully written to the
576      * remote peer. Use {@code 0} to directly close the {@link Channel} and not wait for the response.
577      */
578     public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
579         setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
580     }
581 
582     /**
583      * See {@link #setCloseNotifyReadTimeout(long, TimeUnit)}.
584      */
585     public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
586         if (closeNotifyReadTimeoutMillis < 0) {
587             throw new IllegalArgumentException(
588                     "closeNotifyReadTimeoutMillis: " + closeNotifyReadTimeoutMillis + " (expected: >= 0)");
589         }
590         this.closeNotifyReadTimeoutMillis = closeNotifyReadTimeoutMillis;
591     }
592 
593     /**
594      * Returns the {@link SSLEngine} which is used by this handler.
595      */
596     public SSLEngine engine() {
597         return engine;
598     }
599 
600     /**
601      * Returns the name of the current application-level protocol.
602      *
603      * @return the protocol name or {@code null} if application-level protocol has not been negotiated
604      */
605     public String applicationProtocol() {
606         SSLEngine engine = engine();
607         if (!(engine instanceof ApplicationProtocolAccessor)) {
608             return null;
609         }
610 
611         return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
612     }
613 
614     /**
615      * Returns a {@link Future} that will get notified once the current TLS handshake completes.
616      *
617      * @return the {@link Future} for the initial TLS handshake if {@link #renegotiate()} was not invoked.
618      *         The {@link Future} for the most recent {@linkplain #renegotiate() TLS renegotiation} otherwise.
619      */
620     public Future<Channel> handshakeFuture() {
621         return handshakePromise;
622     }
623 
624     /**
625      * Sends an SSL {@code close_notify} message to the specified channel and
626      * destroys the underlying {@link SSLEngine}.
627      *
628      * @deprecated use {@link Channel#close()} or {@link ChannelHandlerContext#close()}
629      */
630     @Deprecated
631     public ChannelFuture close() {
632         return close(ctx.newPromise());
633     }
634 
635     /**
636      * See {@link #close()}
637      *
638      * @deprecated use {@link Channel#close()} or {@link ChannelHandlerContext#close()}
639      */
640     @Deprecated
641     public ChannelFuture close(final ChannelPromise promise) {
642         final ChannelHandlerContext ctx = this.ctx;
643         ctx.executor().execute(new Runnable() {
644             @Override
645             public void run() {
646                 outboundClosed = true;
647                 engine.closeOutbound();
648                 try {
649                     flush(ctx, promise);
650                 } catch (Exception e) {
651                     if (!promise.tryFailure(e)) {
652                         logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
653                     }
654                 }
655             }
656         });
657 
658         return promise;
659     }
660 
661     /**
662      * Return the {@link Future} that will get notified if the inbound of the {@link SSLEngine} is closed.
663      *
664      * This method will return the same {@link Future} all the time.
665      *
666      * @see SSLEngine
667      */
668     public Future<Channel> sslCloseFuture() {
669         return sslClosePromise;
670     }
671 
672     @Override
673     public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
674         if (!pendingUnencryptedWrites.isEmpty()) {
675             // Check if queue is not empty first because create a new ChannelException is expensive
676             pendingUnencryptedWrites.releaseAndFailAll(ctx,
677                     new ChannelException("Pending write on removal of SslHandler"));
678         }
679         pendingUnencryptedWrites = null;
680         if (engine instanceof ReferenceCounted) {
681             ((ReferenceCounted) engine).release();
682         }
683     }
684 
685     @Override
686     public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
687         ctx.bind(localAddress, promise);
688     }
689 
690     @Override
691     public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
692                         ChannelPromise promise) throws Exception {
693         ctx.connect(remoteAddress, localAddress, promise);
694     }
695 
696     @Override
697     public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
            // Forwarded unchanged; nothing in this method touches the TLS state.
698         ctx.deregister(promise);
699     }
700 
701     @Override
702     public void disconnect(final ChannelHandlerContext ctx,
703                            final ChannelPromise promise) throws Exception {
            // Same helper as close(...), but with the last argument set to true.
            // NOTE(review): presumably the flag selects disconnect over close — confirm in closeOutboundAndChannel.
704         closeOutboundAndChannel(ctx, promise, true);
705     }
706 
707     @Override
708     public void close(final ChannelHandlerContext ctx,
709                       final ChannelPromise promise) throws Exception {
            // Same helper as disconnect(...), but with the last argument set to false.
            // NOTE(review): presumably the flag selects disconnect over close — confirm in closeOutboundAndChannel.
710         closeOutboundAndChannel(ctx, promise, false);
711     }
712 
713     @Override
714     public void read(ChannelHandlerContext ctx) throws Exception {
            // Record that a read was requested while the handshake promise was not yet done, then forward
            // the read request through the context.
715         if (!handshakePromise.isDone()) {
716             readDuringHandshake = true;
717         }
718 
719         ctx.read();
720     }
721 
722     private static IllegalStateException newPendingWritesNullException() {
            // Builds the exception used by write(...) when the pending-write queue is already null.
723         return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
724     }
725 
726     @Override
727     public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            // Queues a ByteBuf together with its promise for later encryption. This method only queues; it does
            // not call wrap(...) itself. Rejected messages are released and their promise is failed.
728         if (!(msg instanceof ByteBuf)) {
                // Only ByteBuf messages are accepted.
729             UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
730             ReferenceCountUtil.safeRelease(msg);
731             promise.setFailure(exception);
732         } else if (pendingUnencryptedWrites == null) {
                // The queue is set to null in handlerRemoved0(...), so the write can no longer be accepted.
733             ReferenceCountUtil.safeRelease(msg);
734             promise.setFailure(newPendingWritesNullException());
735         } else {
736             pendingUnencryptedWrites.add((ByteBuf) msg, promise);
737         }
738     }
739 
740     @Override
741     public void flush(ChannelHandlerContext ctx) throws Exception {
742         // Do not encrypt the first write request if this handler is
743         // created with startTLS flag turned on.
744         if (startTls && !sentFirstMessage) {
745             sentFirstMessage = true;
                // Write the queued buffers without calling wrap(...) and flush them right away.
746             pendingUnencryptedWrites.writeAndRemoveAll(ctx);
747             forceFlush(ctx);
748             return;
749         }
750 
751         try {
752             wrapAndFlush(ctx);
753         } catch (Throwable cause) {
                // Report the failure through setHandshakeFailure(...) first, then hand the same cause to
                // PlatformDependent.throwException(...).
754             setHandshakeFailure(ctx, cause);
755             PlatformDependent.throwException(cause);
756         }
757     }
758 
759     private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
            // Encrypts what is queued via wrap(ctx, false) and always forces a flush afterwards, even when
            // wrap(...) throws.
760         if (pendingUnencryptedWrites.isEmpty()) {
761             // It's important to NOT use a voidPromise here as the user
762             // may want to add a ChannelFutureListener to the ChannelPromise later.
763             //
764             // See https://github.com/netty/netty/issues/3364
765             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
766         }
767         if (!handshakePromise.isDone()) {
                // unwrap(...) checks this flag once the engine reports NOT_HANDSHAKING and then requests a wrap.
768             flushedBeforeHandshake = true;
769         }
770         try {
771             wrap(ctx, false);
772         } finally {
773             // We may have written some parts of data before an exception was thrown so ensure we always flush.
774             // See https://github.com/netty/netty/issues/3900#issuecomment-172481830
775             forceFlush(ctx);
776         }
777     }
778 
779     // This method will not call setHandshakeFailure(...) !
        //
        // Encrypts the queued unencrypted writes: buffers are taken from pendingUnencryptedWrites one at a time,
        // passed through the SSLEngine, and the encrypted output is handed to finishWrap(...). The loop ends when
        // the queue returns nothing, the handler is removed, the engine reports CLOSED or it asks for NEED_UNWRAP.
        // inUnwrap is only forwarded to finishWrap(...).
780     private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
781         ByteBuf out = null;
782         ChannelPromise promise = null;
783         ByteBufAllocator alloc = ctx.alloc();
784         boolean needUnwrap = false;
785         ByteBuf buf = null;
786         try {
787             final int wrapDataSize = this.wrapDataSize;
788             // Only continue to loop if the handler was not removed in the meantime.
789             // See https://github.com/netty/netty/issues/5860
790             while (!ctx.isRemoved()) {
791                 promise = ctx.newPromise();
                    // With a positive wrapDataSize the queue is asked for up to that many bytes, otherwise for its
                    // first buffer. Either call is handed the fresh promise.
792                 buf = wrapDataSize > 0 ?
793                         pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
794                         pendingUnencryptedWrites.removeFirst(promise);
795                 if (buf == null) {
796                     break;
797                 }
798 
799                 if (out == null) {
                        // Allocated lazily and kept across iterations until it has been passed to finishWrap(...).
800                     out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
801                 }
802 
803                 SSLEngineResult result = wrap(alloc, engine, buf, out);
804 
805                 if (result.getStatus() == Status.CLOSED) {
806                     buf.release();
807                     buf = null;
808                     promise.tryFailure(SSLENGINE_CLOSED);
809                     promise = null;
810                     // SSLEngine has been closed already.
811                     // Any further write attempts should be denied.
812                     pendingUnencryptedWrites.releaseAndFailAll(ctx, SSLENGINE_CLOSED);
813                     return;
814                 } else {
815                     if (buf.isReadable()) {
                            // Not fully consumed: put the remainder back at the head of the queue.
816                         pendingUnencryptedWrites.addFirst(buf, promise);
817                         // When we add the buffer/promise pair back we need to be sure we don't complete the promise
818                         // later in finishWrap. We only complete the promise if the buffer is completely consumed.
819                         promise = null;
820                     } else {
821                         buf.release();
822                     }
823                     buf = null;
824 
825                     switch (result.getHandshakeStatus()) {
826                         case NEED_TASK:
                                // NOTE(review): promise is neither written nor cleared on this path and is replaced
                                // by a new one at the top of the next iteration — confirm that the promise of a
                                // fully consumed buffer cannot be left unnotified here.
827                             runDelegatedTasks();
828                             break;
829                         case FINISHED:
830                             setHandshakeSuccess();
831                             // deliberate fall-through
832                         case NOT_HANDSHAKING:
833                             setHandshakeSuccessIfStillHandshaking();
834                             // deliberate fall-through
835                         case NEED_WRAP:
                                // Hand out and promise over; both are nulled so the finally block does not reuse them.
836                             finishWrap(ctx, out, promise, inUnwrap, false);
837                             promise = null;
838                             out = null;
839                             break;
840                         case NEED_UNWRAP:
                                // The finally block passes this flag to finishWrap(...), which then calls
                                // readIfNeeded(...).
841                             needUnwrap = true;
842                             return;
843                         default:
844                             throw new IllegalStateException(
845                                     "Unknown handshake status: " + result.getHandshakeStatus());
846                     }
847                 }
848             }
849         } finally {
850             // Ownership of buffer was not transferred, release it.
851             if (buf != null) {
852                 buf.release();
853             }
                // Runs on every exit path; out and promise are null here if they were already handed over above.
854             finishWrap(ctx, out, promise, inUnwrap, needUnwrap);
855         }
856     }
857 
858     private void finishWrap(ChannelHandlerContext ctx, ByteBuf out, ChannelPromise promise, boolean inUnwrap,
859             boolean needUnwrap) {
            // Writes the result of a wrap pass. out and promise may both be null. inUnwrap marks that a flush is
            // needed later; needUnwrap triggers readIfNeeded(...).
860         if (out == null) {
861             out = Unpooled.EMPTY_BUFFER;
862         } else if (!out.isReadable()) {
                // Nothing readable in out: release it and write the shared empty buffer instead.
863             out.release();
864             out = Unpooled.EMPTY_BUFFER;
865         }
866 
            // A write is issued even for the empty buffer, so a non-null promise is always attached to a write.
867         if (promise != null) {
868             ctx.write(out, promise);
869         } else {
870             ctx.write(out);
871         }
872 
873         if (inUnwrap) {
                // Picked up by flushIfNeeded(...), which channelReadComplete(...) calls.
874             needsFlush = true;
875         }
876 
877         if (needUnwrap) {
878             // The underlying engine is starving so we need to feed it with more data.
879             // See https://github.com/netty/netty/pull/5039
880             readIfNeeded(ctx);
881         }
882     }
883 
884     /**
         * Calls wrap with an empty source buffer so the engine can emit non-application data, and writes
         * whatever it produces through the context.
         *
885      * This method will not call
886      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable, boolean, boolean, boolean)} or
887      * {@link #setHandshakeFailure(ChannelHandlerContext, Throwable)}.
         *
         * @param inUnwrap when {@code true}, {@code needsFlush} is set after a write and this method does not call
         *                 {@link #unwrapNonAppData(ChannelHandlerContext)} itself.
888      * @return {@code true} if this method ends on {@link SSLEngineResult.HandshakeStatus#NOT_HANDSHAKING}.
889      */
890     private boolean wrapNonAppData(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
891         ByteBuf out = null;
892         ByteBufAllocator alloc = ctx.alloc();
893         try {
894             // Only continue to loop if the handler was not removed in the meantime.
895             // See https://github.com/netty/netty/issues/5860
896             while (!ctx.isRemoved()) {
897                 if (out == null) {
898                     // As this is called for the handshake we have no real idea how big the buffer needs to be.
899                     // That said 2048 should give us enough room to include everything like ALPN / NPN data.
900                     // If this is not enough we will increase the buffer in wrap(...).
901                     out = allocateOutNetBuf(ctx, 2048, 1);
902                 }
903                 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
904 
905                 if (result.bytesProduced() > 0) {
                        // Hand the produced bytes to the pipeline. out is nulled so the finally block does not
                        // release it and the next iteration allocates a new buffer.
906                     ctx.write(out);
907                     if (inUnwrap) {
908                         needsFlush = true;
909                     }
910                     out = null;
911                 }
912 
913                 switch (result.getHandshakeStatus()) {
914                     case FINISHED:
915                         setHandshakeSuccess();
916                         return false;
917                     case NEED_TASK:
918                         runDelegatedTasks();
919                         break;
920                     case NEED_UNWRAP:
921                         if (inUnwrap) {
922                             // If we asked for a wrap, the engine requested an unwrap, and we are in unwrap there is
923                             // no use in trying to call wrap again because we have already attempted (or will after we
924                             // return) to feed more data to the engine.
925                             return false;
926                         }
927 
928                         unwrapNonAppData(ctx);
929                         break;
930                     case NEED_WRAP:
931                         break;
932                     case NOT_HANDSHAKING:
933                         setHandshakeSuccessIfStillHandshaking();
934                         // Workaround for TLS False Start problem reported at:
935                         // https://github.com/netty/netty/issues/1108#issuecomment-14266970
936                         if (!inUnwrap) {
937                             unwrapNonAppData(ctx);
938                         }
939                         return true;
940                     default:
941                         throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
942                 }
943 
                    // The engine produced nothing in this round: stop looping.
944                 if (result.bytesProduced() == 0) {
945                     break;
946                 }
947 
948                 // It should not consume empty buffers when it is not handshaking
949                 // Fix for Android, where it was encrypting empty buffers even when not handshaking
950                 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
951                     break;
952                 }
953             }
954         }  finally {
                // A buffer that is still referenced here was never written, so it is released.
955             if (out != null) {
956                 out.release();
957             }
958         }
959         return false;
960     }
961 
962     private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
963             throws SSLException {
            // Runs SSLEngine.wrap(...) on the readable bytes of in and appends the output to out. The reader index
            // of in and the writer index of out are advanced by what the engine reports as consumed / produced.
            // Returns the first result whose status is not BUFFER_OVERFLOW.
964         ByteBuf newDirectIn = null;
965         try {
966             int readerIndex = in.readerIndex();
967             int readableBytes = in.readableBytes();
968 
969             // We will call SslEngine.wrap(ByteBuffer[], ByteBuffer) to allow efficient handling of
970             // CompositeByteBuf without forcing an extra memory copy when CompositeByteBuffer.nioBuffer() is called.
971             final ByteBuffer[] in0;
972             if (in.isDirect() || !engineType.wantsDirectBuffer) {
973                 // As CompositeByteBuf.nioBufferCount() can be expensive (as it needs to check all composed ByteBuf
974                 // to calculate the count) we will just assume a CompositeByteBuf contains more than 1 ByteBuf.
975                 // The worst that can happen is that we allocate an extra ByteBuffer[] in CompositeByteBuf.nioBuffers()
976                 // which is better than walking the composed ByteBuf in most cases.
977                 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
                        // singleBuffer is a field that is reused as the one-element array; its slot is cleared in
                        // the finally block.
978                     in0 = singleBuffer;
979                     // We know it is only backed by 1 ByteBuffer so use internalNioBuffer to keep object allocation
980                     // to a minimum.
981                     in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
982                 } else {
983                     in0 = in.nioBuffers();
984                 }
985             } else {
                    // The engine type wants direct buffers but in is not direct: copy the readable bytes into a
                    // temporary direct buffer, which is released in the finally block.
986                 // We could even go further here and check if it is a CompositeByteBuf and if so try to decompose it
987                 // and only replace the ByteBuffers that are not direct. At the moment we will just replace the whole
988                 // CompositeByteBuf to keep the complexity to a minimum
989                 newDirectIn = alloc.directBuffer(readableBytes);
990                 newDirectIn.writeBytes(in, readerIndex, readableBytes);
991                 in0 = singleBuffer;
992                 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
993             }
994 
995             for (;;) {
                    // A new view of the writable region of out is created for every attempt.
996                 ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes());
997                 SSLEngineResult result = engine.wrap(in0, out0);
                    // The indexes are always updated on the original in / out, also when a direct copy was wrapped.
998                 in.skipBytes(result.bytesConsumed());
999                 out.writerIndex(out.writerIndex() + result.bytesProduced());
1000 
1001                 switch (result.getStatus()) {
1002                 case BUFFER_OVERFLOW:
                         // Make room for at least one more packet (session packet buffer size) and try again.
1003                     out.ensureWritable(engine.getSession().getPacketBufferSize());
1004                     break;
1005                 default:
1006                     return result;
1007                 }
1008             }
1009         } finally {
1010             // Null out to allow GC of ByteBuffer
1011             singleBuffer[0] = null;
1012 
1013             if (newDirectIn != null) {
1014                 newDirectIn.release();
1015             }
1016         }
1017     }
1018 
1019     @Override
1020     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
             // Channel went inactive: report CHANNEL_CLOSED through setHandshakeFailure(...), notify the close
             // promise, then delegate to the superclass.
1021         // Make sure to release SSLEngine,
1022         // and notify the handshake future if the connection has been closed during handshake.
1023         setHandshakeFailure(ctx, CHANNEL_CLOSED, !outboundClosed, handshakeStarted, false);
1024 
1025         // Ensure we always notify the sslClosePromise as well
1026         notifyClosePromise(CHANNEL_CLOSED);
1027 
1028         super.channelInactive(ctx);
1029     }
1030 
1031     @Override
1032     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
             // Exceptions accepted by ignoreException(...) are swallowed (logged at debug level, channel closed if
             // still active); every other exception is passed on through fireExceptionCaught(...).
1033         if (ignoreException(cause)) {
1034             // It is safe to ignore the 'connection reset by peer' or
1035             // 'broken pipe' error after sending close_notify.
1036             if (logger.isDebugEnabled()) {
1037                 logger.debug(
1038                         "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1039                         "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1040             }
1041 
1042             // Close the connection explicitly just in case the transport
1043             // did not close the connection automatically.
1044             if (ctx.channel().isActive()) {
1045                 ctx.close();
1046             }
1047         } else {
1048             ctx.fireExceptionCaught(cause);
1049         }
1050     }
1051 
1052     /**
1053      * Checks if the given {@link Throwable} can be ignored and just "swallowed".
1054      *
1055      * When an ssl connection is closed a close_notify message is sent.
1056      * After that the peer also sends close_notify; however, it's not mandatory to receive
1057      * the close_notify. The party who sent the initial close_notify can close the connection immediately,
1058      * then the peer will get a connection reset error.
1059      *
          * <p>Only an {@link IOException} that is not an {@link SSLException} is considered, and only once
          * {@code sslClosePromise} is done. The exception is ignorable when its message matches
          * {@code IGNORABLE_ERROR_MESSAGE} or when a non-Netty {@code read} frame in its stack trace belongs to a
          * class that matches {@code IGNORABLE_CLASS_IN_STACK}, is a {@link SocketChannel} or
          * {@link DatagramChannel}, or directly extends {@code com.sun.nio.sctp.SctpChannel}.
          *
          * @param t the exception to inspect
          * @return {@code true} if the exception can be swallowed, {@code false} otherwise
1060      */
1061     private boolean ignoreException(Throwable t) {
1062         if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1063             String message = t.getMessage();
1064 
1065             // first try to match connection reset / broken pipe based on the regex. This is the fastest way
1066             // but may fail on different jdk impls or OS's
1067             if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1068                 return true;
1069             }
1070 
1071             // Inspect the StackTraceElements to see if it was a connection reset / broken pipe or not
1072             StackTraceElement[] elements = t.getStackTrace();
1073             for (StackTraceElement element: elements) {
1074                 String classname = element.getClassName();
1075                 String methodname = element.getMethodName();
1076 
1077                 // skip all classes that belong to the io.netty package
1078                 if (classname.startsWith("io.netty.")) {
1079                     continue;
1080                 }
1081 
1082                 // check if the method name is read if not skip it
1083                 if (!"read".equals(methodname)) {
1084                     continue;
1085                 }
1086 
1087                 // This will also match against SocketInputStream which is used by openjdk 7 and maybe
1088                 // also others
1089                 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1090                     return true;
1091                 }
1092 
1093                 try {
1094                     // No match by now.. Try to load the class via classloader and inspect it.
1095                     // This is mainly done as other JDK implementations may differ in name of
1096                     // the impl.
1097                     Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1098 
1099                     if (SocketChannel.class.isAssignableFrom(clazz)
1100                             || DatagramChannel.class.isAssignableFrom(clazz)) {
1101                         return true;
1102                     }
1103 
1104                     // also match against SctpChannel via String matching as it may not be present.
                          // getSuperclass() returns null for interfaces and java.lang.Object, so check it explicitly
                          // instead of relying on the catch block below to absorb a NullPointerException.
1105                     if (PlatformDependent.javaVersion() >= 7 && clazz.getSuperclass() != null
1106                             && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1107                         return true;
1108                     }
1109                 } catch (Throwable cause) {
1110                     logger.debug("Unexpected exception while loading class {} classname {}",
1111                                  getClass(), classname, cause);
1112                 }
1113             }
1114         }
1115 
1116         return false;
1117     }
1118 
1119     /**
1120      * Returns {@code true} if the given {@link ByteBuf} is encrypted. Be aware that this method
1121      * will not increase the readerIndex of the given {@link ByteBuf}.
1122      *
          * <p>The check inspects the buffer at its current reader index and reports "encrypted" whenever
          * {@code getEncryptedPacketLength(...)} does not return {@link SslUtils#NOT_ENCRYPTED}.
          *
1123      * @param   buffer
1124      *                  The {@link ByteBuf} to read from. Be aware that it must have at least 5 bytes to read,
1125      *                  otherwise it will throw an {@link IllegalArgumentException}.
1126      * @return encrypted
1127      *                  {@code true} if the {@link ByteBuf} is encrypted, {@code false} otherwise.
1128      * @throws IllegalArgumentException
1129      *                  Is thrown if the given {@link ByteBuf} has fewer than 5 bytes to read.
1130      */
1131     public static boolean isEncrypted(ByteBuf buffer) {
1132         if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1133             throw new IllegalArgumentException(
1134                     "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1135         }
1136         return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1137     }
1138 
1139     private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
             // Passes one complete record at a time to unwrap(...). The record length is determined with
             // getEncryptedPacketLength(...) and cached in this.packetLength while the record is still incomplete.
1140         int packetLength = this.packetLength;
1141         // If we calculated the length of the current SSL record before, use that information.
1142         if (packetLength > 0) {
1143             if (in.readableBytes() < packetLength) {
1144                 return;
1145             }
1146         } else {
1147             // Get the packet length and wait until we get a packet's worth of data to unwrap.
1148             final int readableBytes = in.readableBytes();
1149             if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1150                 return;
1151             }
1152             packetLength = getEncryptedPacketLength(in, in.readerIndex());
1153             if (packetLength == SslUtils.NOT_ENCRYPTED) {
1154                 // Not an SSL/TLS packet
1155                 NotSslRecordException e = new NotSslRecordException(
1156                         "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
                     // Discard everything that is readable; the hex dump above was taken before the skip.
1157                 in.skipBytes(in.readableBytes());
1158 
1159                 // First fail the handshake promise as we may need to have access to the SSLEngine which may
1160                 // be released because the user will remove the SslHandler in an exceptionCaught(...) implementation.
1161                 setHandshakeFailure(ctx, e);
1162 
1163                 throw e;
1164             }
1165             assert packetLength > 0;
1166             if (packetLength > readableBytes) {
1167                 // wait until the whole packet can be read
1168                 this.packetLength = packetLength;
1169                 return;
1170             }
1171         }
1172 
1173         // Reset the state of this class so we can get the length of the next packet. We assume the entire packet will
1174         // be consumed by the SSLEngine.
1175         this.packetLength = 0;
1176         try {
1177             int bytesConsumed = unwrap(ctx, in, in.readerIndex(), packetLength);
1178             assert bytesConsumed == packetLength || engine.isInboundDone() :
1179                     "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1180                             bytesConsumed;
                 // unwrap(...) is given an offset and a length; the reader index is advanced here.
1181             in.skipBytes(bytesConsumed);
1182         } catch (Throwable cause) {
1183             handleUnwrapThrowable(ctx, cause);
1184         }
1185     }
1186 
1187     private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
             // No record framing here: all readable bytes are offered to unwrap(...) and the reader index is
             // advanced by the number of bytes it returns.
1188         try {
1189             in.skipBytes(unwrap(ctx, in, in.readerIndex(), in.readableBytes()));
1190         } catch (Throwable cause) {
1191             handleUnwrapThrowable(ctx, cause);
1192         }
1193     }
1194 
1195     private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
             // Shared failure path of the decode methods. Order: fail the handshake promise, try one wrap-and-flush,
             // always call setHandshakeFailure(...), then hand the original cause to PlatformDependent.throwException.
1196         try {
1197             // We should attempt to notify the handshake failure before writing any pending data. If we are in unwrap
1198             // and failed during the handshake process, and we attempt to wrap, then promises will fail, and if
1199             // listeners immediately close the Channel then we may end up firing the handshake event after the Channel
1200             // has been closed.
1201             if (handshakePromise.tryFailure(cause)) {
                     // Fired only when this call was the one that failed the promise.
1202                 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1203             }
1204 
1205             // We need to flush one time as there may be an alert that we should send to the remote peer because
1206             // of the SSLException reported here.
1207             wrapAndFlush(ctx);
1208         } catch (SSLException ex) {
1209             logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1210                     " because of an previous SSLException, ignoring...", ex);
1211         } finally {
1212             // ensure we always flush and close the channel.
1213             setHandshakeFailure(ctx, cause, true, false, true);
1214         }
             // Reached when the try block completed or its SSLException was caught above.
1215         PlatformDependent.throwException(cause);
1216     }
1217 
1218     @Override
1219     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
             // Dispatches on jdkCompatibilityMode. The out list is not used by this method.
1220         if (jdkCompatibilityMode) {
1221             decodeJdkCompatible(ctx, in);
1222         } else {
1223             decodeNonJdkCompatible(ctx, in);
1224         }
1225     }
1226 
1227     @Override
1228     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1229         // Discard bytes of the cumulation buffer if needed.
1230         discardSomeReadBytes();
1231 
             // Flush what was written during unwrap and request more data if required. Both run before
             // firedChannelRead is reset, because readIfNeeded(...) reads that flag.
1232         flushIfNeeded(ctx);
1233         readIfNeeded(ctx);
1234 
1235         firedChannelRead = false;
1236         ctx.fireChannelReadComplete();
1237     }
1238 
1239     private void readIfNeeded(ChannelHandlerContext ctx) {
             // Requests a read through the context when auto-read is off and either firedChannelRead is false or
             // the handshake promise is not done yet.
1240         // If handshake is not finished yet, we need more data.
1241         if (!ctx.channel().config().isAutoRead() && (!firedChannelRead || !handshakePromise.isDone())) {
1242             // No auto-read used and no message passed through the ChannelPipeline or the handshake was not complete
1243             // yet, which means we need to trigger the read to ensure we do not encounter any stalls.
1244             ctx.read();
1245         }
1246     }
1247 
1248     private void flushIfNeeded(ChannelHandlerContext ctx) {
             // needsFlush is set by finishWrap(...) and wrapNonAppData(...) when they write while in unwrap.
             // The flag is not reset in this method.
1249         if (needsFlush) {
1250             forceFlush(ctx);
1251         }
1252     }
1253 
1254     /**
1255      * Calls {@link SSLEngine#unwrap(ByteBuffer, ByteBuffer)} with an empty buffer to handle handshakes, etc.
          *
          * <p>This is done by calling this handler's {@code unwrap(ctx, packet, offset, length)} with
          * {@link Unpooled#EMPTY_BUFFER}, offset {@code 0} and length {@code 0}; the returned count is ignored.
          *
          * @throws SSLException if the delegated unwrap call throws it
1256      */
1257     private void unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1258         unwrap(ctx, Unpooled.EMPTY_BUFFER, 0, 0);
1259     }
1260 
1261     /**
1262      * Unwraps inbound SSL records.
1263      */
1264     private int unwrap(
1265             ChannelHandlerContext ctx, ByteBuf packet, int offset, int length) throws SSLException {
1266         final int originalLength = length;
1267         boolean wrapLater = false;
1268         boolean notifyClosure = false;
1269         int overflowReadableBytes = -1;
1270         ByteBuf decodeOut = allocate(ctx, length);
1271         try {
1272             // Only continue to loop if the handler was not removed in the meantime.
1273             // See https://github.com/netty/netty/issues/5860
1274             unwrapLoop: while (!ctx.isRemoved()) {
1275                 final SSLEngineResult result = engineType.unwrap(this, packet, offset, length, decodeOut);
1276                 final Status status = result.getStatus();
1277                 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1278                 final int produced = result.bytesProduced();
1279                 final int consumed = result.bytesConsumed();
1280 
1281                 // Update indexes for the next iteration
1282                 offset += consumed;
1283                 length -= consumed;
1284 
1285                 switch (status) {
1286                 case BUFFER_OVERFLOW:
1287                     final int readableBytes = decodeOut.readableBytes();
1288                     final int previousOverflowReadableBytes = overflowReadableBytes;
1289                     overflowReadableBytes = readableBytes;
1290                     int bufferSize = engine.getSession().getApplicationBufferSize() - readableBytes;
1291                     if (readableBytes > 0) {
1292                         firedChannelRead = true;
1293                         ctx.fireChannelRead(decodeOut);
1294 
1295                         // This buffer was handled, null it out.
1296                         decodeOut = null;
1297                         if (bufferSize <= 0) {
1298                             // It may happen that readableBytes >= engine.getSession().getApplicationBufferSize()
1299                             // while there is still more to unwrap, in this case we will just allocate a new buffer
1300                             // with the capacity of engine.getSession().getApplicationBufferSize() and call unwrap
1301                             // again.
1302                             bufferSize = engine.getSession().getApplicationBufferSize();
1303                         }
1304                     } else {
1305                         // This buffer was handled, null it out.
1306                         decodeOut.release();
1307                         decodeOut = null;
1308                     }
1309                     if (readableBytes == 0 && previousOverflowReadableBytes == 0) {
1310                         // If there is two consecutive loops where we overflow and are not able to consume any data,
1311                         // assume the amount of data exceeds the maximum amount for the engine and bail
1312                         throw new IllegalStateException("Two consecutive overflows but no content was consumed. " +
1313                                  SSLSession.class.getSimpleName() + " getApplicationBufferSize: " +
1314                                  engine.getSession().getApplicationBufferSize() + " maybe too small.");
1315                     }
1316                     // Allocate a new buffer which can hold all the rest data and loop again.
1317                     // TODO: We may want to reconsider how we calculate the length here as we may
1318                     // have more then one ssl message to decode.
1319                     decodeOut = allocate(ctx, engineType.calculatePendingData(this, bufferSize));
1320                     continue;
1321                 case CLOSED:
1322                     // notify about the CLOSED state of the SSLEngine. See #137
1323                     notifyClosure = true;
1324                     overflowReadableBytes = -1;
1325                     break;
1326                 default:
1327                     overflowReadableBytes = -1;
1328                     break;
1329                 }
1330 
1331                 switch (handshakeStatus) {
1332                     case NEED_UNWRAP:
1333                         break;
1334                     case NEED_WRAP:
1335                         // If the wrap operation transitions the status to NOT_HANDSHAKING and there is no more data to
1336                         // unwrap then the next call to unwrap will not produce any data. We can avoid the potentially
1337                         // costly unwrap operation and break out of the loop.
1338                         if (wrapNonAppData(ctx, true) && length == 0) {
1339                             break unwrapLoop;
1340                         }
1341                         break;
1342                     case NEED_TASK:
1343                         runDelegatedTasks();
1344                         break;
1345                     case FINISHED:
1346                         setHandshakeSuccess();
1347                         wrapLater = true;
1348 
1349                         // We 'break' here and NOT 'continue' as android API version 21 has a bug where they consume
1350                         // data from the buffer but NOT correctly set the SSLEngineResult.bytesConsumed().
1351                         // Because of this it will raise an exception on the next iteration of the for loop on android
1352                         // API version 21. Just doing a break will work here as produced and consumed will both be 0
1353                         // and so we break out of the complete for (;;) loop and so call decode(...) again later on.
1354                         // On other platforms this will have no negative effect as we will just continue with the
1355                         // for (;;) loop if something was either consumed or produced.
1356                         //
1357                         // See:
1358                         //  - https://github.com/netty/netty/issues/4116
1359                         //  - https://code.google.com/p/android/issues/detail?id=198639&thanks=198639&ts=1452501203
1360                         break;
1361                     case NOT_HANDSHAKING:
1362                         if (setHandshakeSuccessIfStillHandshaking()) {
1363                             wrapLater = true;
1364                             continue;
1365                         }
1366                         if (flushedBeforeHandshake) {
1367                             // We need to call wrap(...) in case there was a flush done before the handshake completed.
1368                             //
1369                             // See https://github.com/netty/netty/pull/2437
1370                             flushedBeforeHandshake = false;
1371                             wrapLater = true;
1372                         }
1373                         // If we are not handshaking and there is no more data to unwrap then the next call to unwrap
1374                         // will not produce any data. We can avoid the potentially costly unwrap operation and break
1375                         // out of the loop.
1376                         if (length == 0) {
1377                             break unwrapLoop;
1378                         }
1379                         break;
1380                     default:
1381                         throw new IllegalStateException("unknown handshake status: " + handshakeStatus);
1382                 }
1383 
1384                 if (status == Status.BUFFER_UNDERFLOW || consumed == 0 && produced == 0) {
1385                     if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1386                         // The underlying engine is starving so we need to feed it with more data.
1387                         // See https://github.com/netty/netty/pull/5039
1388                         readIfNeeded(ctx);
1389                     }
1390 
1391                     break;
1392                 }
1393             }
1394 
1395             if (wrapLater) {
1396                 wrap(ctx, true);
1397             }
1398 
1399             if (notifyClosure) {
1400                 notifyClosePromise(null);
1401             }
1402         } finally {
1403             if (decodeOut != null) {
1404                 if (decodeOut.isReadable()) {
1405                     firedChannelRead = true;
1406 
1407                     ctx.fireChannelRead(decodeOut);
1408                 } else {
1409                     decodeOut.release();
1410                 }
1411             }
1412         }
1413         return originalLength - length;
1414     }
1415 
1416     private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1417         return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1418                 out.nioBuffer(index, len);
1419     }
1420 
1421     /**
1422      * Fetches all delegated tasks from the {@link SSLEngine} and runs them via the {@link #delegatedTaskExecutor}.
1423      * If the {@link #delegatedTaskExecutor} is {@link ImmediateExecutor}, just call {@link Runnable#run()} directly
1424      * instead of using {@link Executor#execute(Runnable)}.  Otherwise, run the tasks via
1425      * the {@link #delegatedTaskExecutor} and wait until the tasks are finished.
1426      */
1427     private void runDelegatedTasks() {
1428         if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE) {
1429             for (;;) {
1430                 Runnable task = engine.getDelegatedTask();
1431                 if (task == null) {
1432                     break;
1433                 }
1434 
1435                 task.run();
1436             }
1437         } else {
1438             final List<Runnable> tasks = new ArrayList<Runnable>(2);
1439             for (;;) {
1440                 final Runnable task = engine.getDelegatedTask();
1441                 if (task == null) {
1442                     break;
1443                 }
1444 
1445                 tasks.add(task);
1446             }
1447 
1448             if (tasks.isEmpty()) {
1449                 return;
1450             }
1451 
1452             final CountDownLatch latch = new CountDownLatch(1);
1453             delegatedTaskExecutor.execute(new Runnable() {
1454                 @Override
1455                 public void run() {
1456                     try {
1457                         for (Runnable task: tasks) {
1458                             task.run();
1459                         }
1460                     } catch (Exception e) {
1461                         ctx.fireExceptionCaught(e);
1462                     } finally {
1463                         latch.countDown();
1464                     }
1465                 }
1466             });
1467 
1468             boolean interrupted = false;
1469             while (latch.getCount() != 0) {
1470                 try {
1471                     latch.await();
1472                 } catch (InterruptedException e) {
1473                     // Interrupt later.
1474                     interrupted = true;
1475                 }
1476             }
1477 
1478             if (interrupted) {
1479                 Thread.currentThread().interrupt();
1480             }
1481         }
1482     }
1483 
1484     /**
1485      * Works around some Android {@link SSLEngine} implementations that skip {@link HandshakeStatus#FINISHED} and
1486      * go straight into {@link HandshakeStatus#NOT_HANDSHAKING} when handshake is finished.
1487      *
1488      * @return {@code true} if and only if the workaround has been applied and thus {@link #handshakeFuture} has been
1489      *         marked as success by this method
1490      */
1491     private boolean setHandshakeSuccessIfStillHandshaking() {
1492         if (!handshakePromise.isDone()) {
1493             setHandshakeSuccess();
1494             return true;
1495         }
1496         return false;
1497     }
1498 
1499     /**
1500      * Notify all the handshake futures about the successfully handshake
1501      */
1502     private void setHandshakeSuccess() {
1503         handshakePromise.trySuccess(ctx.channel());
1504 
1505         if (logger.isDebugEnabled()) {
1506             logger.debug("{} HANDSHAKEN: {}", ctx.channel(), engine.getSession().getCipherSuite());
1507         }
1508         ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1509 
1510         if (readDuringHandshake && !ctx.channel().config().isAutoRead()) {
1511             readDuringHandshake = false;
1512             ctx.read();
1513         }
1514     }
1515 
1516     /**
1517      * Notify all the handshake futures about the failure during the handshake.
1518      */
1519     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1520         setHandshakeFailure(ctx, cause, true, true, false);
1521     }
1522 
1523     /**
1524      * Notify all the handshake futures about the failure during the handshake.
1525      */
1526     private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1527                                      boolean notify, boolean alwaysFlushAndClose) {
1528         try {
1529             // Release all resources such as internal buffers that SSLEngine
1530             // is managing.
1531             outboundClosed = true;
1532             engine.closeOutbound();
1533 
1534             if (closeInbound) {
1535                 try {
1536                     engine.closeInbound();
1537                 } catch (SSLException e) {
1538                     if (logger.isDebugEnabled()) {
1539                         // only log in debug mode as it most likely harmless and latest chrome still trigger
1540                         // this all the time.
1541                         //
1542                         // See https://github.com/netty/netty/issues/1340
1543                         String msg = e.getMessage();
1544                         if (msg == null || !msg.contains("possible truncation attack")) {
1545                             logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1546                         }
1547                     }
1548                 }
1549             }
1550             if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1551                 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1552             }
1553         } finally {
1554             // Ensure we remove and fail all pending writes in all cases and so release memory quickly.
1555             releaseAndFailAll(cause);
1556         }
1557     }
1558 
1559     private void releaseAndFailAll(Throwable cause) {
1560         if (pendingUnencryptedWrites != null) {
1561             pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1562         }
1563     }
1564 
1565     private void notifyClosePromise(Throwable cause) {
1566         if (cause == null) {
1567             if (sslClosePromise.trySuccess(ctx.channel())) {
1568                 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
1569             }
1570         } else {
1571             if (sslClosePromise.tryFailure(cause)) {
1572                 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
1573             }
1574         }
1575     }
1576 
1577     private void closeOutboundAndChannel(
1578             final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
1579         outboundClosed = true;
1580         engine.closeOutbound();
1581 
1582         if (!ctx.channel().isActive()) {
1583             if (disconnect) {
1584                 ctx.disconnect(promise);
1585             } else {
1586                 ctx.close(promise);
1587             }
1588             return;
1589         }
1590 
1591         ChannelPromise closeNotifyPromise = ctx.newPromise();
1592         try {
1593             flush(ctx, closeNotifyPromise);
1594         } finally {
1595             if (!closeNotify) {
1596                 closeNotify = true;
1597                 // It's important that we do not pass the original ChannelPromise to safeClose(...) as when flush(....)
1598                 // throws an Exception it will be propagated to the AbstractChannelHandlerContext which will try
1599                 // to fail the promise because of this. This will then fail as it was already completed by
1600                 // safeClose(...). We create a new ChannelPromise and try to notify the original ChannelPromise
1601                 // once it is complete. If we fail to do so we just ignore it as in this case it was failed already
1602                 // because of a propagated Exception.
1603                 //
1604                 // See https://github.com/netty/netty/issues/5931
1605                 safeClose(ctx, closeNotifyPromise, ctx.newPromise().addListener(
1606                         new ChannelPromiseNotifier(false, promise)));
1607             } else {
1608                 /// We already handling the close_notify so just attach the promise to the sslClosePromise.
1609                 sslClosePromise.addListener(new FutureListener<Channel>() {
1610                     @Override
1611                     public void operationComplete(Future<Channel> future) {
1612                         promise.setSuccess();
1613                     }
1614                 });
1615             }
1616         }
1617     }
1618 
1619     private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1620         if (pendingUnencryptedWrites != null) {
1621             pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
1622         } else {
1623             promise.setFailure(newPendingWritesNullException());
1624         }
1625         flush(ctx);
1626     }
1627 
1628     @Override
1629     public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
1630         this.ctx = ctx;
1631 
1632         pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(ctx.channel(), 16);
1633         if (ctx.channel().isActive()) {
1634             startHandshakeProcessing();
1635         }
1636     }
1637 
1638     private void startHandshakeProcessing() {
1639         handshakeStarted = true;
1640         if (engine.getUseClientMode()) {
1641             // Begin the initial handshake.
1642             // channelActive() event has been fired already, which means this.channelActive() will
1643             // not be invoked. We have to initialize here instead.
1644             handshake(null);
1645         } else {
1646             applyHandshakeTimeout(null);
1647         }
1648     }
1649 
1650     /**
1651      * Performs TLS renegotiation.
1652      */
1653     public Future<Channel> renegotiate() {
1654         ChannelHandlerContext ctx = this.ctx;
1655         if (ctx == null) {
1656             throw new IllegalStateException();
1657         }
1658 
1659         return renegotiate(ctx.executor().<Channel>newPromise());
1660     }
1661 
1662     /**
1663      * Performs TLS renegotiation.
1664      */
1665     public Future<Channel> renegotiate(final Promise<Channel> promise) {
1666         if (promise == null) {
1667             throw new NullPointerException("promise");
1668         }
1669 
1670         ChannelHandlerContext ctx = this.ctx;
1671         if (ctx == null) {
1672             throw new IllegalStateException();
1673         }
1674 
1675         EventExecutor executor = ctx.executor();
1676         if (!executor.inEventLoop()) {
1677             executor.execute(new Runnable() {
1678                 @Override
1679                 public void run() {
1680                     handshake(promise);
1681                 }
1682             });
1683             return promise;
1684         }
1685 
1686         handshake(promise);
1687         return promise;
1688     }
1689 
1690     /**
1691      * Performs TLS (re)negotiation.
1692      *
1693      * @param newHandshakePromise if {@code null}, use the existing {@link #handshakePromise},
1694      *                            assuming that the current negotiation has not been finished.
1695      *                            Currently, {@code null} is expected only for the initial handshake.
1696      */
1697     private void handshake(final Promise<Channel> newHandshakePromise) {
1698         final Promise<Channel> p;
1699         if (newHandshakePromise != null) {
1700             final Promise<Channel> oldHandshakePromise = handshakePromise;
1701             if (!oldHandshakePromise.isDone()) {
1702                 // There's no need to handshake because handshake is in progress already.
1703                 // Merge the new promise into the old one.
1704                 oldHandshakePromise.addListener(new FutureListener<Channel>() {
1705                     @Override
1706                     public void operationComplete(Future<Channel> future) throws Exception {
1707                         if (future.isSuccess()) {
1708                             newHandshakePromise.setSuccess(future.getNow());
1709                         } else {
1710                             newHandshakePromise.setFailure(future.cause());
1711                         }
1712                     }
1713                 });
1714                 return;
1715             }
1716 
1717             handshakePromise = p = newHandshakePromise;
1718         } else if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
1719             // Not all SSLEngine implementations support calling beginHandshake multiple times while a handshake
1720             // is in progress. See https://github.com/netty/netty/issues/4718.
1721             return;
1722         } else {
1723             // Forced to reuse the old handshake.
1724             p = handshakePromise;
1725             assert !p.isDone();
1726         }
1727 
1728         // Begin handshake.
1729         final ChannelHandlerContext ctx = this.ctx;
1730         try {
1731             engine.beginHandshake();
1732             wrapNonAppData(ctx, false);
1733         } catch (Throwable e) {
1734             setHandshakeFailure(ctx, e);
1735         } finally {
1736            forceFlush(ctx);
1737         }
1738         applyHandshakeTimeout(p);
1739     }
1740 
1741     private void applyHandshakeTimeout(Promise<Channel> p) {
1742         final Promise<Channel> promise = p == null ? handshakePromise : p;
1743         // Set timeout if necessary.
1744         final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
1745         if (handshakeTimeoutMillis <= 0 || promise.isDone()) {
1746             return;
1747         }
1748 
1749         final ScheduledFuture<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
1750             @Override
1751             public void run() {
1752                 if (promise.isDone()) {
1753                     return;
1754                 }
1755                 try {
1756                     if (handshakePromise.tryFailure(HANDSHAKE_TIMED_OUT)) {
1757                         SslUtils.handleHandshakeFailure(ctx, HANDSHAKE_TIMED_OUT, true);
1758                     }
1759                 } finally {
1760                     releaseAndFailAll(HANDSHAKE_TIMED_OUT);
1761                 }
1762             }
1763         }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
1764 
1765         // Cancel the handshake timeout when handshake is finished.
1766         promise.addListener(new FutureListener<Channel>() {
1767             @Override
1768             public void operationComplete(Future<Channel> f) throws Exception {
1769                 timeoutFuture.cancel(false);
1770             }
1771         });
1772     }
1773 
1774     private void forceFlush(ChannelHandlerContext ctx) {
1775         needsFlush = false;
1776         ctx.flush();
1777     }
1778 
1779     /**
1780      * Issues an initial TLS handshake once connected when used in client-mode
1781      */
1782     @Override
1783     public void channelActive(final ChannelHandlerContext ctx) throws Exception {
1784         if (!startTls) {
1785             startHandshakeProcessing();
1786         }
1787         ctx.fireChannelActive();
1788     }
1789 
1790     private void safeClose(
1791             final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
1792             final ChannelPromise promise) {
1793         if (!ctx.channel().isActive()) {
1794             ctx.close(promise);
1795             return;
1796         }
1797 
1798         final ScheduledFuture<?> timeoutFuture;
1799         if (!flushFuture.isDone()) {
1800             long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
1801             if (closeNotifyTimeout > 0) {
1802                 // Force-close the connection if close_notify is not fully sent in time.
1803                 timeoutFuture = ctx.executor().schedule(new Runnable() {
1804                     @Override
1805                     public void run() {
1806                         // May be done in the meantime as cancel(...) is only best effort.
1807                         if (!flushFuture.isDone()) {
1808                             logger.warn("{} Last write attempt timed out; force-closing the connection.",
1809                                     ctx.channel());
1810                             addCloseListener(ctx.close(ctx.newPromise()), promise);
1811                         }
1812                     }
1813                 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
1814             } else {
1815                 timeoutFuture = null;
1816             }
1817         } else {
1818             timeoutFuture = null;
1819         }
1820 
1821         // Close the connection if close_notify is sent in time.
1822         flushFuture.addListener(new ChannelFutureListener() {
1823             @Override
1824             public void operationComplete(ChannelFuture f)
1825                     throws Exception {
1826                 if (timeoutFuture != null) {
1827                     timeoutFuture.cancel(false);
1828                 }
1829                 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
1830                 if (closeNotifyReadTimeout <= 0) {
1831                     // Trigger the close in all cases to make sure the promise is notified
1832                     // See https://github.com/netty/netty/issues/2358
1833                     addCloseListener(ctx.close(ctx.newPromise()), promise);
1834                 } else {
1835                     final ScheduledFuture<?> closeNotifyReadTimeoutFuture;
1836 
1837                     if (!sslClosePromise.isDone()) {
1838                         closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
1839                             @Override
1840                             public void run() {
1841                                 if (!sslClosePromise.isDone()) {
1842                                     logger.debug(
1843                                             "{} did not receive close_notify in {}ms; force-closing the connection.",
1844                                             ctx.channel(), closeNotifyReadTimeout);
1845 
1846                                     // Do the close now...
1847                                     addCloseListener(ctx.close(ctx.newPromise()), promise);
1848                                 }
1849                             }
1850                         }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
1851                     } else {
1852                         closeNotifyReadTimeoutFuture = null;
1853                     }
1854 
1855                     // Do the close once the we received the close_notify.
1856                     sslClosePromise.addListener(new FutureListener<Channel>() {
1857                         @Override
1858                         public void operationComplete(Future<Channel> future) throws Exception {
1859                             if (closeNotifyReadTimeoutFuture != null) {
1860                                 closeNotifyReadTimeoutFuture.cancel(false);
1861                             }
1862                             addCloseListener(ctx.close(ctx.newPromise()), promise);
1863                         }
1864                     });
1865                 }
1866             }
1867         });
1868     }
1869 
1870     private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
1871         // We notify the promise in the ChannelPromiseNotifier as there is a "race" where the close(...) call
1872         // by the timeoutFuture and the close call in the flushFuture listener will be called. Because of
1873         // this we need to use trySuccess() and tryFailure(...) as otherwise we can cause an
1874         // IllegalStateException.
1875         // Also we not want to log if the notification happens as this is expected in some cases.
1876         // See https://github.com/netty/netty/issues/5598
1877         future.addListener(new ChannelPromiseNotifier(false, promise));
1878     }
1879 
1880     /**
1881      * Always prefer a direct buffer when it's pooled, so that we reduce the number of memory copies
1882      * in {@link OpenSslEngine}.
1883      */
1884     private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
1885         ByteBufAllocator alloc = ctx.alloc();
1886         if (engineType.wantsDirectBuffer) {
1887             return alloc.directBuffer(capacity);
1888         } else {
1889             return alloc.buffer(capacity);
1890         }
1891     }
1892 
1893     /**
1894      * Allocates an outbound network buffer for {@link SSLEngine#wrap(ByteBuffer, ByteBuffer)} which can encrypt
1895      * the specified amount of pending bytes.
1896      */
1897     private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
1898         return allocate(ctx, engineType.calculateWrapBufferCapacity(this, pendingBytes, numComponents));
1899     }
1900 
1901     /**
1902      * Each call to SSL_write will introduce about ~100 bytes of overhead. This coalescing queue attempts to increase
1903      * goodput by aggregating the plaintext in chunks of {@link #wrapDataSize}. If many small chunks are written
1904      * this can increase goodput, decrease the amount of calls to SSL_write, and decrease overall encryption operations.
1905      */
1906     private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
1907 
1908         SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
1909             super(channel, initSize);
1910         }
1911 
1912         @Override
1913         protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
1914             final int wrapDataSize = SslHandler.this.wrapDataSize;
1915             if (cumulation instanceof CompositeByteBuf) {
1916                 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
1917                 int numComponents = composite.numComponents();
1918                 if (numComponents == 0 ||
1919                         !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
1920                     composite.addComponent(true, next);
1921                 }
1922                 return composite;
1923             }
1924             return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
1925                     copyAndCompose(alloc, cumulation, next);
1926         }
1927 
1928         @Override
1929         protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
1930             if (first instanceof CompositeByteBuf) {
1931                 CompositeByteBuf composite = (CompositeByteBuf) first;
1932                 first = allocator.directBuffer(composite.readableBytes());
1933                 try {
1934                     first.writeBytes(composite);
1935                 } catch (Throwable cause) {
1936                     first.release();
1937                     PlatformDependent.throwException(cause);
1938                 }
1939                 composite.release();
1940             }
1941             return first;
1942         }
1943 
1944         @Override
1945         protected ByteBuf removeEmptyValue() {
1946             return null;
1947         }
1948     }
1949 
1950     private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
1951         final int inReadableBytes = next.readableBytes();
1952         final int cumulationCapacity = cumulation.capacity();
1953         if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
1954                 // Avoid using the same buffer if next's data would make cumulation exceed the wrapDataSize.
1955                 // Only copy if there is enough space available and the capacity is large enough, and attempt to
1956                 // resize if the capacity is small.
1957                 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
1958                         cumulationCapacity < wrapDataSize &&
1959                                 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
1960             cumulation.writeBytes(next);
1961             next.release();
1962             return true;
1963         }
1964         return false;
1965     }
1966 
1967     private final class LazyChannelPromise extends DefaultPromise<Channel> {
1968 
1969         @Override
1970         protected EventExecutor executor() {
1971             if (ctx == null) {
1972                 throw new IllegalStateException();
1973             }
1974             return ctx.executor();
1975         }
1976 
1977         @Override
1978         protected void checkDeadLock() {
1979             if (ctx == null) {
1980                 // If ctx is null the handlerAdded(...) callback was not called, in this case the checkDeadLock()
1981                 // method was called from another Thread then the one that is used by ctx.executor(). We need to
1982                 // guard against this as a user can see a race if handshakeFuture().sync() is called but the
1983                 // handlerAdded(..) method was not yet as it is called from the EventExecutor of the
1984                 // ChannelHandlerContext. If we not guard against this super.checkDeadLock() would cause an
1985                 // IllegalStateException when trying to call executor().
1986                 return;
1987             }
1988             super.checkDeadLock();
1989         }
1990     }
1991 }