1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import javax.net.ssl.SSLEngine;
55 import javax.net.ssl.SSLEngineResult;
56 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
57 import javax.net.ssl.SSLEngineResult.Status;
58 import javax.net.ssl.SSLException;
59 import javax.net.ssl.SSLHandshakeException;
60 import javax.net.ssl.SSLSession;
61 import java.io.IOException;
62 import java.net.SocketAddress;
63 import java.nio.ByteBuffer;
64 import java.nio.channels.ClosedChannelException;
65 import java.nio.channels.DatagramChannel;
66 import java.nio.channels.SocketChannel;
67 import java.security.cert.CertificateException;
68 import java.util.List;
69 import java.util.concurrent.Executor;
70 import java.util.concurrent.RejectedExecutionException;
71 import java.util.concurrent.TimeUnit;
72 import java.util.regex.Pattern;
73
74 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
75 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
76 import static io.netty.util.internal.ObjectUtil.checkNotNull;
77 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
170 private static final InternalLogger logger =
171 InternalLoggerFactory.getInstance(SslHandler.class);
172 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
173 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
174 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
175 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
176 private static final int STATE_SENT_FIRST_MESSAGE = 1;
177 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
178 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
179 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
180
181
182
183
184 private static final int STATE_NEEDS_FLUSH = 1 << 4;
185 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
186 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
187 private static final int STATE_PROCESS_TASK = 1 << 7;
188
189
190
191
192 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
193 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
194
195
196
197
198
199 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
200
201 private enum SslEngineType {
202 TCNATIVE(true, COMPOSITE_CUMULATOR) {
203 @Override
204 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
205 int nioBufferCount = in.nioBufferCount();
206 int writerIndex = out.writerIndex();
207 final SSLEngineResult result;
208 if (nioBufferCount > 1) {
209
210
211
212
213
214 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
215 try {
216 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
217 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
218 } finally {
219 handler.singleBuffer[0] = null;
220 }
221 } else {
222 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
223 toByteBuffer(out, writerIndex, out.writableBytes()));
224 }
225 out.writerIndex(writerIndex + result.bytesProduced());
226 return result;
227 }
228
229 @Override
230 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
231 int pendingBytes, int numComponents) {
232 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
233 .calculateOutNetBufSize(pendingBytes, numComponents));
234 }
235
236 @Override
237 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
238 return ((ReferenceCountedOpenSslEngine) handler.engine)
239 .calculateMaxLengthForWrap(pendingBytes, numComponents);
240 }
241
242 @Override
243 int calculatePendingData(SslHandler handler, int guess) {
244 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
245 return sslPending > 0 ? sslPending : guess;
246 }
247
248 @Override
249 boolean jdkCompatibilityMode(SSLEngine engine) {
250 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
251 }
252 },
253 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
254 @Override
255 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
256 int nioBufferCount = in.nioBufferCount();
257 int writerIndex = out.writerIndex();
258 final SSLEngineResult result;
259 if (nioBufferCount > 1) {
260
261
262
263 try {
264 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
265 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
266 in.nioBuffers(in.readerIndex(), len),
267 handler.singleBuffer);
268 } finally {
269 handler.singleBuffer[0] = null;
270 }
271 } else {
272 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
273 toByteBuffer(out, writerIndex, out.writableBytes()));
274 }
275 out.writerIndex(writerIndex + result.bytesProduced());
276 return result;
277 }
278
279 @Override
280 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
281 int pendingBytes, int numComponents) {
282 return allocator.directBuffer(
283 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
284 }
285
286 @Override
287 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
288 return ((ConscryptAlpnSslEngine) handler.engine)
289 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
290 }
291
292 @Override
293 int calculatePendingData(SslHandler handler, int guess) {
294 return guess;
295 }
296
297 @Override
298 boolean jdkCompatibilityMode(SSLEngine engine) {
299 return true;
300 }
301 },
302 JDK(false, MERGE_CUMULATOR) {
303 @Override
304 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
305 int writerIndex = out.writerIndex();
306 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
307 int position = inNioBuffer.position();
308 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
309 toByteBuffer(out, writerIndex, out.writableBytes()));
310 out.writerIndex(writerIndex + result.bytesProduced());
311
312
313
314
315
316
317
318 if (result.bytesConsumed() == 0) {
319 int consumed = inNioBuffer.position() - position;
320 if (consumed != result.bytesConsumed()) {
321
322 return new SSLEngineResult(
323 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
324 }
325 }
326 return result;
327 }
328
329 @Override
330 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
331 int pendingBytes, int numComponents) {
332
333
334 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
335 }
336
337 @Override
338 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
339
340
341
342
343
344
345 return handler.engine.getSession().getPacketBufferSize();
346 }
347
348 @Override
349 int calculatePendingData(SslHandler handler, int guess) {
350 return guess;
351 }
352
353 @Override
354 boolean jdkCompatibilityMode(SSLEngine engine) {
355 return true;
356 }
357 };
358
359 static SslEngineType forEngine(SSLEngine engine) {
360 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
361 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
362 }
363
364 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
365 this.wantsDirectBuffer = wantsDirectBuffer;
366 this.cumulator = cumulator;
367 }
368
369 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
370
371 abstract int calculatePendingData(SslHandler handler, int guess);
372
373 abstract boolean jdkCompatibilityMode(SSLEngine engine);
374
375 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
376 int pendingBytes, int numComponents);
377
378 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
379
380
381
382
383
384
385
386 final boolean wantsDirectBuffer;
387
388
389
390
391
392
393
394
395
396
397
398 final Cumulator cumulator;
399 }
400
401 private volatile ChannelHandlerContext ctx;
402 private final SSLEngine engine;
403 private final SslEngineType engineType;
404 private final Executor delegatedTaskExecutor;
405 private final boolean jdkCompatibilityMode;
406
407
408
409
410
411
412 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
413
414 private final boolean startTls;
415 private final ResumptionController resumptionController;
416
417 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
418 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
419
420 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
421 private Promise<Channel> handshakePromise = new LazyChannelPromise();
422 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
423
424 private int packetLength;
425 private short state;
426
427 private volatile long handshakeTimeoutMillis = 10000;
428 private volatile long closeNotifyFlushTimeoutMillis = 3000;
429 private volatile long closeNotifyReadTimeoutMillis;
430 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
431
432
433
434
435
436
437 public SslHandler(SSLEngine engine) {
438 this(engine, false);
439 }
440
441
442
443
444
445
446
447
448 public SslHandler(SSLEngine engine, boolean startTls) {
449 this(engine, startTls, ImmediateExecutor.INSTANCE);
450 }
451
452
453
454
455
456
457
458
459 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
460 this(engine, false, delegatedTaskExecutor);
461 }
462
463
464
465
466
467
468
469
470
471
472 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
473 this(engine, startTls, delegatedTaskExecutor, null);
474 }
475
476 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
477 ResumptionController resumptionController) {
478 this.engine = ObjectUtil.checkNotNull(engine, "engine");
479 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
480 engineType = SslEngineType.forEngine(engine);
481 this.startTls = startTls;
482 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
483 setCumulator(engineType.cumulator);
484 this.resumptionController = resumptionController;
485 }
486
487 public long getHandshakeTimeoutMillis() {
488 return handshakeTimeoutMillis;
489 }
490
491 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
492 checkNotNull(unit, "unit");
493 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
494 }
495
496 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
497 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
498 }
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520 @UnstableApi
521 public final void setWrapDataSize(int wrapDataSize) {
522 this.wrapDataSize = wrapDataSize;
523 }
524
525
526
527
528 @Deprecated
529 public long getCloseNotifyTimeoutMillis() {
530 return getCloseNotifyFlushTimeoutMillis();
531 }
532
533
534
535
536 @Deprecated
537 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
538 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
539 }
540
541
542
543
544 @Deprecated
545 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
546 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
547 }
548
549
550
551
552
553
554 public final long getCloseNotifyFlushTimeoutMillis() {
555 return closeNotifyFlushTimeoutMillis;
556 }
557
558
559
560
561
562
563 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
564 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
565 }
566
567
568
569
570 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
571 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
572 "closeNotifyFlushTimeoutMillis");
573 }
574
575
576
577
578
579
580 public final long getCloseNotifyReadTimeoutMillis() {
581 return closeNotifyReadTimeoutMillis;
582 }
583
584
585
586
587
588
589 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
590 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
591 }
592
593
594
595
596 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
597 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
598 "closeNotifyReadTimeoutMillis");
599 }
600
601
602
603
604 public SSLEngine engine() {
605 return engine;
606 }
607
608
609
610
611
612
613 public String applicationProtocol() {
614 SSLEngine engine = engine();
615 if (!(engine instanceof ApplicationProtocolAccessor)) {
616 return null;
617 }
618
619 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
620 }
621
622
623
624
625
626
627
628 public Future<Channel> handshakeFuture() {
629 return handshakePromise;
630 }
631
632
633
634
635 @Deprecated
636 public ChannelFuture close() {
637 return closeOutbound();
638 }
639
640
641
642
643 @Deprecated
644 public ChannelFuture close(ChannelPromise promise) {
645 return closeOutbound(promise);
646 }
647
648
649
650
651
652
653
654 public ChannelFuture closeOutbound() {
655 return closeOutbound(ctx.newPromise());
656 }
657
658
659
660
661
662
663
664 public ChannelFuture closeOutbound(final ChannelPromise promise) {
665 final ChannelHandlerContext ctx = this.ctx;
666 if (ctx.executor().inEventLoop()) {
667 closeOutbound0(promise);
668 } else {
669 ctx.executor().execute(new Runnable() {
670 @Override
671 public void run() {
672 closeOutbound0(promise);
673 }
674 });
675 }
676 return promise;
677 }
678
679 private void closeOutbound0(ChannelPromise promise) {
680 setState(STATE_OUTBOUND_CLOSED);
681 engine.closeOutbound();
682 try {
683 flush(ctx, promise);
684 } catch (Exception e) {
685 if (!promise.tryFailure(e)) {
686 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
687 }
688 }
689 }
690
691
692
693
694
695
696
697
698 public Future<Channel> sslCloseFuture() {
699 return sslClosePromise;
700 }
701
702 @Override
703 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
704 try {
705 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
706
707 pendingUnencryptedWrites.releaseAndFailAll(ctx,
708 new ChannelException("Pending write on removal of SslHandler"));
709 }
710 pendingUnencryptedWrites = null;
711
712 SSLException cause = null;
713
714
715
716
717 if (!handshakePromise.isDone()) {
718 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
719 if (handshakePromise.tryFailure(cause)) {
720 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
721 }
722 }
723 if (!sslClosePromise.isDone()) {
724 if (cause == null) {
725 cause = new SSLException("SslHandler removed before SSLEngine was closed");
726 }
727 notifyClosePromise(cause);
728 }
729 } finally {
730 ReferenceCountUtil.release(engine);
731 }
732 }
733
734 @Override
735 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
736 ctx.bind(localAddress, promise);
737 }
738
739 @Override
740 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
741 ChannelPromise promise) throws Exception {
742 ctx.connect(remoteAddress, localAddress, promise);
743 }
744
745 @Override
746 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
747 ctx.deregister(promise);
748 }
749
750 @Override
751 public void disconnect(final ChannelHandlerContext ctx,
752 final ChannelPromise promise) throws Exception {
753 closeOutboundAndChannel(ctx, promise, true);
754 }
755
756 @Override
757 public void close(final ChannelHandlerContext ctx,
758 final ChannelPromise promise) throws Exception {
759 closeOutboundAndChannel(ctx, promise, false);
760 }
761
762 @Override
763 public void read(ChannelHandlerContext ctx) throws Exception {
764 if (!handshakePromise.isDone()) {
765 setState(STATE_READ_DURING_HANDSHAKE);
766 }
767
768 ctx.read();
769 }
770
771 private static IllegalStateException newPendingWritesNullException() {
772 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
773 }
774
775 @Override
776 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
777 if (!(msg instanceof ByteBuf)) {
778 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
779 ReferenceCountUtil.safeRelease(msg);
780 promise.setFailure(exception);
781 } else if (pendingUnencryptedWrites == null) {
782 ReferenceCountUtil.safeRelease(msg);
783 promise.setFailure(newPendingWritesNullException());
784 } else {
785 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
786 }
787 }
788
789 @Override
790 public void flush(ChannelHandlerContext ctx) throws Exception {
791
792
793 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
794 setState(STATE_SENT_FIRST_MESSAGE);
795 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
796 forceFlush(ctx);
797
798
799 startHandshakeProcessing(true);
800 return;
801 }
802
803 if (isStateSet(STATE_PROCESS_TASK)) {
804 return;
805 }
806
807 try {
808 wrapAndFlush(ctx);
809 } catch (Throwable cause) {
810 setHandshakeFailure(ctx, cause);
811 PlatformDependent.throwException(cause);
812 }
813 }
814
815 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
816 if (pendingUnencryptedWrites.isEmpty()) {
817
818
819
820
821 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
822 }
823 if (!handshakePromise.isDone()) {
824 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
825 }
826 try {
827 wrap(ctx, false);
828 } finally {
829
830
831 forceFlush(ctx);
832 }
833 }
834
835
836 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
837 ByteBuf out = null;
838 ByteBufAllocator alloc = ctx.alloc();
839 try {
840 final int wrapDataSize = this.wrapDataSize;
841
842
843 outer: while (!ctx.isRemoved()) {
844 ChannelPromise promise = ctx.newPromise();
845 ByteBuf buf = wrapDataSize > 0 ?
846 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
847 pendingUnencryptedWrites.removeFirst(promise);
848 if (buf == null) {
849 break;
850 }
851
852 SSLEngineResult result;
853
854 try {
855 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
856
857
858
859
860
861 int readableBytes = buf.readableBytes();
862 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
863 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
864 numPackets += 1;
865 }
866
867 if (out == null) {
868 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
869 }
870 result = wrapMultiple(alloc, engine, buf, out);
871 } else {
872 if (out == null) {
873 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
874 }
875 result = wrap(alloc, engine, buf, out);
876 }
877 } catch (SSLException e) {
878
879
880
881
882
883
884 buf.release();
885 promise.setFailure(e);
886 throw e;
887 }
888
889 if (buf.isReadable()) {
890 pendingUnencryptedWrites.addFirst(buf, promise);
891
892
893 promise = null;
894 } else {
895 buf.release();
896 }
897
898
899
900 if (out.isReadable()) {
901 final ByteBuf b = out;
902 out = null;
903 if (promise != null) {
904 ctx.write(b, promise);
905 } else {
906 ctx.write(b);
907 }
908 } else if (promise != null) {
909 ctx.write(Unpooled.EMPTY_BUFFER, promise);
910 }
911
912
913 if (result.getStatus() == Status.CLOSED) {
914
915
916 if (!pendingUnencryptedWrites.isEmpty()) {
917
918
919 Throwable exception = handshakePromise.cause();
920 if (exception == null) {
921 exception = sslClosePromise.cause();
922 if (exception == null) {
923 exception = new SslClosedEngineException("SSLEngine closed already");
924 }
925 }
926 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
927 }
928
929 return;
930 } else {
931 switch (result.getHandshakeStatus()) {
932 case NEED_TASK:
933 if (!runDelegatedTasks(inUnwrap)) {
934
935
936 break outer;
937 }
938 break;
939 case FINISHED:
940 case NOT_HANDSHAKING:
941 setHandshakeSuccess();
942 break;
943 case NEED_WRAP:
944
945
946
947
948 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
949 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
950 }
951 break;
952 case NEED_UNWRAP:
953
954
955 readIfNeeded(ctx);
956 return;
957 default:
958 throw new IllegalStateException(
959 "Unknown handshake status: " + result.getHandshakeStatus());
960 }
961 }
962 }
963 } finally {
964 if (out != null) {
965 out.release();
966 }
967 if (inUnwrap) {
968 setState(STATE_NEEDS_FLUSH);
969 }
970 }
971 }
972
973
974
975
976
977
978
979 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
980 ByteBuf out = null;
981 ByteBufAllocator alloc = ctx.alloc();
982 try {
983
984
985 outer: while (!ctx.isRemoved()) {
986 if (out == null) {
987
988
989
990 out = allocateOutNetBuf(ctx, 2048, 1);
991 }
992 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
993 if (result.bytesProduced() > 0) {
994 ctx.write(out).addListener(new ChannelFutureListener() {
995 @Override
996 public void operationComplete(ChannelFuture future) {
997 Throwable cause = future.cause();
998 if (cause != null) {
999 setHandshakeFailureTransportFailure(ctx, cause);
1000 }
1001 }
1002 });
1003 if (inUnwrap) {
1004 setState(STATE_NEEDS_FLUSH);
1005 }
1006 out = null;
1007 }
1008
1009 HandshakeStatus status = result.getHandshakeStatus();
1010 switch (status) {
1011 case FINISHED:
1012
1013
1014
1015
1016 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1017 wrap(ctx, true);
1018 }
1019 return false;
1020 case NEED_TASK:
1021 if (!runDelegatedTasks(inUnwrap)) {
1022
1023
1024 break outer;
1025 }
1026 break;
1027 case NEED_UNWRAP:
1028 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1029
1030
1031
1032 return false;
1033 }
1034 break;
1035 case NEED_WRAP:
1036 break;
1037 case NOT_HANDSHAKING:
1038 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1039 wrap(ctx, true);
1040 }
1041
1042
1043 if (!inUnwrap) {
1044 unwrapNonAppData(ctx);
1045 }
1046 return true;
1047 default:
1048 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1049 }
1050
1051
1052
1053 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1054 break;
1055 }
1056
1057
1058
1059 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1060 break;
1061 }
1062 }
1063 } finally {
1064 if (out != null) {
1065 out.release();
1066 }
1067 }
1068 return false;
1069 }
1070
1071 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1072 throws SSLException {
1073 SSLEngineResult result = null;
1074
1075 do {
1076 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1077
1078
1079 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1080
1081 if (!out.isWritable(nextOutSize)) {
1082 if (result != null) {
1083
1084
1085 break;
1086 }
1087
1088
1089 out.ensureWritable(nextOutSize);
1090 }
1091
1092 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1093 result = wrap(alloc, engine, wrapBuf, out);
1094
1095 if (result.getStatus() == Status.CLOSED) {
1096
1097
1098 break;
1099 }
1100
1101 if (wrapBuf.isReadable()) {
1102
1103
1104 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1105 }
1106 } while (in.readableBytes() > 0);
1107
1108 return result;
1109 }
1110
1111 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1112 throws SSLException {
1113 ByteBuf newDirectIn = null;
1114 try {
1115 int readerIndex = in.readerIndex();
1116 int readableBytes = in.readableBytes();
1117
1118
1119
1120 final ByteBuffer[] in0;
1121 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1122
1123
1124
1125
1126 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1127 in0 = singleBuffer;
1128
1129
1130 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1131 } else {
1132 in0 = in.nioBuffers();
1133 }
1134 } else {
1135
1136
1137
1138 newDirectIn = alloc.directBuffer(readableBytes);
1139 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1140 in0 = singleBuffer;
1141 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1142 }
1143
1144 for (;;) {
1145
1146
1147 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1148 SSLEngineResult result = engine.wrap(in0, out0);
1149 in.skipBytes(result.bytesConsumed());
1150 out.writerIndex(out.writerIndex() + result.bytesProduced());
1151
1152 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1153 out.ensureWritable(engine.getSession().getPacketBufferSize());
1154 } else {
1155 return result;
1156 }
1157 }
1158 } finally {
1159
1160 singleBuffer[0] = null;
1161
1162 if (newDirectIn != null) {
1163 newDirectIn.release();
1164 }
1165 }
1166 }
1167
1168 @Override
1169 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1170 boolean handshakeFailed = handshakePromise.cause() != null;
1171
1172
1173 ClosedChannelException exception = new ClosedChannelException();
1174
1175
1176 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1177 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1178 "Connection closed while SSL/TLS handshake was in progress",
1179 SslHandler.class, "channelInactive"));
1180 }
1181
1182
1183
1184 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1185 false);
1186
1187
1188 notifyClosePromise(exception);
1189
1190 try {
1191 super.channelInactive(ctx);
1192 } catch (DecoderException e) {
1193 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1194
1195
1196
1197
1198
1199 throw e;
1200 }
1201 }
1202 }
1203
1204 @Override
1205 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1206 if (ignoreException(cause)) {
1207
1208
1209 if (logger.isDebugEnabled()) {
1210 logger.debug(
1211 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1212 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1213 }
1214
1215
1216
1217 if (ctx.channel().isActive()) {
1218 ctx.close();
1219 }
1220 } else {
1221 ctx.fireExceptionCaught(cause);
1222 }
1223 }
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234 private boolean ignoreException(Throwable t) {
1235 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1236 String message = t.getMessage();
1237
1238
1239
1240 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1241 return true;
1242 }
1243
1244
1245 StackTraceElement[] elements = t.getStackTrace();
1246 for (StackTraceElement element: elements) {
1247 String classname = element.getClassName();
1248 String methodname = element.getMethodName();
1249
1250
1251 if (classname.startsWith("io.netty.")) {
1252 continue;
1253 }
1254
1255
1256 if (!"read".equals(methodname)) {
1257 continue;
1258 }
1259
1260
1261
1262 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1263 return true;
1264 }
1265
1266 try {
1267
1268
1269
1270 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1271
1272 if (SocketChannel.class.isAssignableFrom(clazz)
1273 || DatagramChannel.class.isAssignableFrom(clazz)) {
1274 return true;
1275 }
1276
1277
1278 if (PlatformDependent.javaVersion() >= 7
1279 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1280 return true;
1281 }
1282 } catch (Throwable cause) {
1283 if (logger.isDebugEnabled()) {
1284 logger.debug("Unexpected exception while loading class {} classname {}",
1285 getClass(), classname, cause);
1286 }
1287 }
1288 }
1289 }
1290
1291 return false;
1292 }
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307 @Deprecated
1308 public static boolean isEncrypted(ByteBuf buffer) {
1309 return isEncrypted(buffer, false);
1310 }
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1329 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1330 throw new IllegalArgumentException(
1331 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1332 }
1333 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1334 }
1335
1336 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1337 int packetLength = this.packetLength;
1338
1339 if (packetLength > 0) {
1340 if (in.readableBytes() < packetLength) {
1341 return;
1342 }
1343 } else {
1344
1345 final int readableBytes = in.readableBytes();
1346 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1347 return;
1348 }
1349 packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
1350 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1351
1352 NotSslRecordException e = new NotSslRecordException(
1353 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1354 in.skipBytes(in.readableBytes());
1355
1356
1357
1358 setHandshakeFailure(ctx, e);
1359
1360 throw e;
1361 }
1362 if (packetLength == NOT_ENOUGH_DATA) {
1363 return;
1364 }
1365 assert packetLength > 0;
1366 if (packetLength > readableBytes) {
1367
1368 this.packetLength = packetLength;
1369 return;
1370 }
1371 }
1372
1373
1374
1375 this.packetLength = 0;
1376 try {
1377 final int bytesConsumed = unwrap(ctx, in, packetLength);
1378 if (bytesConsumed != packetLength && !engine.isInboundDone()) {
1379
1380
1381 throw new NotSslRecordException();
1382 }
1383 } catch (Throwable cause) {
1384 handleUnwrapThrowable(ctx, cause);
1385 }
1386 }
1387
1388 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1389 try {
1390 unwrap(ctx, in, in.readableBytes());
1391 } catch (Throwable cause) {
1392 handleUnwrapThrowable(ctx, cause);
1393 }
1394 }
1395
1396 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1397 try {
1398
1399
1400
1401
1402 if (handshakePromise.tryFailure(cause)) {
1403 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1404 }
1405
1406
1407 if (pendingUnencryptedWrites != null) {
1408
1409
1410 wrapAndFlush(ctx);
1411 }
1412 } catch (SSLException ex) {
1413 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1414 " because of an previous SSLException, ignoring...", ex);
1415 } finally {
1416
1417 setHandshakeFailure(ctx, cause, true, false, true);
1418 }
1419 PlatformDependent.throwException(cause);
1420 }
1421
1422 @Override
1423 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1424 if (isStateSet(STATE_PROCESS_TASK)) {
1425 return;
1426 }
1427 if (jdkCompatibilityMode) {
1428 decodeJdkCompatible(ctx, in);
1429 } else {
1430 decodeNonJdkCompatible(ctx, in);
1431 }
1432 }
1433
1434 @Override
1435 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1436 channelReadComplete0(ctx);
1437 }
1438
1439 private void channelReadComplete0(ChannelHandlerContext ctx) {
1440
1441 discardSomeReadBytes();
1442
1443 flushIfNeeded(ctx);
1444 readIfNeeded(ctx);
1445
1446 clearState(STATE_FIRE_CHANNEL_READ);
1447 ctx.fireChannelReadComplete();
1448 }
1449
1450 private void readIfNeeded(ChannelHandlerContext ctx) {
1451
1452 if (!ctx.channel().config().isAutoRead() &&
1453 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1454
1455
1456 ctx.read();
1457 }
1458 }
1459
1460 private void flushIfNeeded(ChannelHandlerContext ctx) {
1461 if (isStateSet(STATE_NEEDS_FLUSH)) {
1462 forceFlush(ctx);
1463 }
1464 }
1465
1466
1467
1468
1469 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1470 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1471 }
1472
1473
1474
1475
1476 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1477 final int originalLength = length;
1478 boolean wrapLater = false;
1479 boolean notifyClosure = false;
1480 boolean executedRead = false;
1481 ByteBuf decodeOut = allocate(ctx, length);
1482 try {
1483
1484
1485 do {
1486 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1487 final Status status = result.getStatus();
1488 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1489 final int produced = result.bytesProduced();
1490 final int consumed = result.bytesConsumed();
1491
1492
1493
1494
1495 packet.skipBytes(consumed);
1496 length -= consumed;
1497
1498
1499
1500
1501 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1502 wrapLater |= (decodeOut.isReadable() ?
1503 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1504 handshakeStatus == HandshakeStatus.FINISHED ||
1505
1506
1507 (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty());
1508 }
1509
1510
1511
1512
1513 if (decodeOut.isReadable()) {
1514 setState(STATE_FIRE_CHANNEL_READ);
1515 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1516 executedRead = true;
1517 executeChannelRead(ctx, decodeOut);
1518 } else {
1519 ctx.fireChannelRead(decodeOut);
1520 }
1521 decodeOut = null;
1522 }
1523
1524 if (status == Status.CLOSED) {
1525 notifyClosure = true;
1526 } else if (status == Status.BUFFER_OVERFLOW) {
1527 if (decodeOut != null) {
1528 decodeOut.release();
1529 }
1530 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1531
1532
1533
1534
1535 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1536 applicationBufferSize : applicationBufferSize - produced));
1537 continue;
1538 }
1539
1540 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1541 boolean pending = runDelegatedTasks(true);
1542 if (!pending) {
1543
1544
1545
1546
1547
1548 wrapLater = false;
1549 break;
1550 }
1551 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1552
1553
1554
1555 if (wrapNonAppData(ctx, true) && length == 0) {
1556 break;
1557 }
1558 }
1559
1560 if (status == Status.BUFFER_UNDERFLOW ||
1561
1562 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1563 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1564 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1565
1566
1567 readIfNeeded(ctx);
1568 }
1569
1570 break;
1571 } else if (decodeOut == null) {
1572 decodeOut = allocate(ctx, length);
1573 }
1574 } while (!ctx.isRemoved());
1575
1576 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1577
1578
1579
1580
1581 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1582 wrapLater = true;
1583 }
1584
1585 if (wrapLater) {
1586 wrap(ctx, true);
1587 }
1588 } finally {
1589 if (decodeOut != null) {
1590 decodeOut.release();
1591 }
1592
1593 if (notifyClosure) {
1594 if (executedRead) {
1595 executeNotifyClosePromise(ctx);
1596 } else {
1597 notifyClosePromise(null);
1598 }
1599 }
1600 }
1601 return originalLength - length;
1602 }
1603
1604 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1605
1606
1607 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1608 if (setReentryState) {
1609 setState(STATE_UNWRAP_REENTRY);
1610 }
1611 try {
1612 return setHandshakeSuccess();
1613 } finally {
1614
1615
1616 if (setReentryState) {
1617 clearState(STATE_UNWRAP_REENTRY);
1618 }
1619 }
1620 }
1621
1622 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1623 try {
1624 ctx.executor().execute(new Runnable() {
1625 @Override
1626 public void run() {
1627 notifyClosePromise(null);
1628 }
1629 });
1630 } catch (RejectedExecutionException e) {
1631 notifyClosePromise(e);
1632 }
1633 }
1634
1635 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1636 try {
1637 ctx.executor().execute(new Runnable() {
1638 @Override
1639 public void run() {
1640 ctx.fireChannelRead(decodedOut);
1641 }
1642 });
1643 } catch (RejectedExecutionException e) {
1644 decodedOut.release();
1645 throw e;
1646 }
1647 }
1648
1649 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1650 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1651 out.nioBuffer(index, len);
1652 }
1653
1654 private static boolean inEventLoop(Executor executor) {
1655 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1656 }
1657
1658
1659
1660
1661
1662
1663
1664
1665 private boolean runDelegatedTasks(boolean inUnwrap) {
1666 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1667
1668
1669 for (;;) {
1670 Runnable task = engine.getDelegatedTask();
1671 if (task == null) {
1672 return true;
1673 }
1674 setState(STATE_PROCESS_TASK);
1675 if (task instanceof AsyncRunnable) {
1676
1677 boolean pending = false;
1678 try {
1679 AsyncRunnable asyncTask = (AsyncRunnable) task;
1680 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1681 asyncTask.run(completionHandler);
1682 pending = completionHandler.resumeLater();
1683 if (pending) {
1684 return false;
1685 }
1686 } finally {
1687 if (!pending) {
1688
1689
1690 clearState(STATE_PROCESS_TASK);
1691 }
1692 }
1693 } else {
1694 try {
1695 task.run();
1696 } finally {
1697 clearState(STATE_PROCESS_TASK);
1698 }
1699 }
1700 }
1701 } else {
1702 executeDelegatedTask(inUnwrap);
1703 return false;
1704 }
1705 }
1706
1707 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1708 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1709 }
1710
1711 private void executeDelegatedTask(boolean inUnwrap) {
1712 executeDelegatedTask(getTaskRunner(inUnwrap));
1713 }
1714
1715 private void executeDelegatedTask(SslTasksRunner task) {
1716 setState(STATE_PROCESS_TASK);
1717 try {
1718 delegatedTaskExecutor.execute(task);
1719 } catch (RejectedExecutionException e) {
1720 clearState(STATE_PROCESS_TASK);
1721 throw e;
1722 }
1723 }
1724
1725 private final class AsyncTaskCompletionHandler implements Runnable {
1726 private final boolean inUnwrap;
1727 boolean didRun;
1728 boolean resumeLater;
1729
1730 AsyncTaskCompletionHandler(boolean inUnwrap) {
1731 this.inUnwrap = inUnwrap;
1732 }
1733
1734 @Override
1735 public void run() {
1736 didRun = true;
1737 if (resumeLater) {
1738 getTaskRunner(inUnwrap).runComplete();
1739 }
1740 }
1741
1742 boolean resumeLater() {
1743 if (!didRun) {
1744 resumeLater = true;
1745 return true;
1746 }
1747 return false;
1748 }
1749 }
1750
1751
1752
1753
1754
1755 private final class SslTasksRunner implements Runnable {
1756 private final boolean inUnwrap;
1757 private final Runnable runCompleteTask = new Runnable() {
1758 @Override
1759 public void run() {
1760 runComplete();
1761 }
1762 };
1763
1764 SslTasksRunner(boolean inUnwrap) {
1765 this.inUnwrap = inUnwrap;
1766 }
1767
1768
1769 private void taskError(Throwable e) {
1770 if (inUnwrap) {
1771
1772
1773
1774
1775 try {
1776 handleUnwrapThrowable(ctx, e);
1777 } catch (Throwable cause) {
1778 safeExceptionCaught(cause);
1779 }
1780 } else {
1781 setHandshakeFailure(ctx, e);
1782 forceFlush(ctx);
1783 }
1784 }
1785
1786
1787 private void safeExceptionCaught(Throwable cause) {
1788 try {
1789 exceptionCaught(ctx, wrapIfNeeded(cause));
1790 } catch (Throwable error) {
1791 ctx.fireExceptionCaught(error);
1792 }
1793 }
1794
1795 private Throwable wrapIfNeeded(Throwable cause) {
1796 if (!inUnwrap) {
1797
1798 return cause;
1799 }
1800
1801
1802 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1803 }
1804
1805 private void tryDecodeAgain() {
1806 try {
1807 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1808 } catch (Throwable cause) {
1809 safeExceptionCaught(cause);
1810 } finally {
1811
1812
1813
1814 channelReadComplete0(ctx);
1815 }
1816 }
1817
1818
1819
1820
1821
1822 private void resumeOnEventExecutor() {
1823 assert ctx.executor().inEventLoop();
1824 clearState(STATE_PROCESS_TASK);
1825 try {
1826 HandshakeStatus status = engine.getHandshakeStatus();
1827 switch (status) {
1828
1829
1830 case NEED_TASK:
1831 executeDelegatedTask(this);
1832
1833 break;
1834
1835
1836 case FINISHED:
1837
1838 case NOT_HANDSHAKING:
1839 setHandshakeSuccess();
1840 try {
1841
1842
1843 wrap(ctx, inUnwrap);
1844 } catch (Throwable e) {
1845 taskError(e);
1846 return;
1847 }
1848 if (inUnwrap) {
1849
1850
1851 unwrapNonAppData(ctx);
1852 }
1853
1854
1855 forceFlush(ctx);
1856
1857 tryDecodeAgain();
1858 break;
1859
1860
1861
1862 case NEED_UNWRAP:
1863 try {
1864 unwrapNonAppData(ctx);
1865 } catch (SSLException e) {
1866 handleUnwrapThrowable(ctx, e);
1867 return;
1868 }
1869 tryDecodeAgain();
1870 break;
1871
1872
1873
1874 case NEED_WRAP:
1875 try {
1876 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1877
1878
1879
1880
1881 unwrapNonAppData(ctx);
1882 }
1883
1884
1885 forceFlush(ctx);
1886 } catch (Throwable e) {
1887 taskError(e);
1888 return;
1889 }
1890
1891
1892 tryDecodeAgain();
1893 break;
1894
1895 default:
1896
1897 throw new AssertionError();
1898 }
1899 } catch (Throwable cause) {
1900 safeExceptionCaught(cause);
1901 }
1902 }
1903
1904 void runComplete() {
1905 EventExecutor executor = ctx.executor();
1906
1907
1908
1909
1910
1911
1912
1913 executor.execute(new Runnable() {
1914 @Override
1915 public void run() {
1916 resumeOnEventExecutor();
1917 }
1918 });
1919 }
1920
1921 @Override
1922 public void run() {
1923 try {
1924 Runnable task = engine.getDelegatedTask();
1925 if (task == null) {
1926
1927 return;
1928 }
1929 if (task instanceof AsyncRunnable) {
1930 AsyncRunnable asyncTask = (AsyncRunnable) task;
1931 asyncTask.run(runCompleteTask);
1932 } else {
1933 task.run();
1934 runComplete();
1935 }
1936 } catch (final Throwable cause) {
1937 handleException(cause);
1938 }
1939 }
1940
1941 private void handleException(final Throwable cause) {
1942 EventExecutor executor = ctx.executor();
1943 if (executor.inEventLoop()) {
1944 clearState(STATE_PROCESS_TASK);
1945 safeExceptionCaught(cause);
1946 } else {
1947 try {
1948 executor.execute(new Runnable() {
1949 @Override
1950 public void run() {
1951 clearState(STATE_PROCESS_TASK);
1952 safeExceptionCaught(cause);
1953 }
1954 });
1955 } catch (RejectedExecutionException ignore) {
1956 clearState(STATE_PROCESS_TASK);
1957
1958
1959 ctx.fireExceptionCaught(cause);
1960 }
1961 }
1962 }
1963 }
1964
1965
1966
1967
1968
1969
1970 private boolean setHandshakeSuccess() throws SSLException {
1971
1972
1973
1974 final SSLSession session = engine.getSession();
1975 if (resumptionController != null && !handshakePromise.isDone()) {
1976 try {
1977 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1978 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
1979 }
1980 } catch (CertificateException e) {
1981 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
1982 exception.initCause(e);
1983 throw exception;
1984 }
1985 }
1986 final boolean notified;
1987 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1988 if (logger.isDebugEnabled()) {
1989 logger.debug(
1990 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1991 ctx.channel(),
1992 session.getProtocol(),
1993 session.getCipherSuite());
1994 }
1995 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1996 }
1997 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1998 clearState(STATE_READ_DURING_HANDSHAKE);
1999 if (!ctx.channel().config().isAutoRead()) {
2000 ctx.read();
2001 }
2002 }
2003 return notified;
2004 }
2005
2006
2007
2008
2009 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
2010 setHandshakeFailure(ctx, cause, true, true, false);
2011 }
2012
2013
2014
2015
2016 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2017 boolean notify, boolean alwaysFlushAndClose) {
2018 try {
2019
2020 setState(STATE_OUTBOUND_CLOSED);
2021 engine.closeOutbound();
2022
2023 if (closeInbound) {
2024 try {
2025 engine.closeInbound();
2026 } catch (SSLException e) {
2027 if (logger.isDebugEnabled()) {
2028
2029
2030
2031
2032 String msg = e.getMessage();
2033 if (msg == null || !(msg.contains("possible truncation attack") ||
2034 msg.contains("closing inbound before receiving peer's close_notify"))) {
2035 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2036 }
2037 }
2038 }
2039 }
2040 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2041 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2042 }
2043 } finally {
2044
2045 releaseAndFailAll(ctx, cause);
2046 }
2047 }
2048
2049 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2050
2051
2052
2053 try {
2054 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2055 releaseAndFailAll(ctx, transportFailure);
2056 if (handshakePromise.tryFailure(transportFailure)) {
2057 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2058 }
2059 } finally {
2060 ctx.close();
2061 }
2062 }
2063
2064 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2065 if (resumptionController != null &&
2066 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2067 resumptionController.remove(engine());
2068 }
2069 if (pendingUnencryptedWrites != null) {
2070 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2071 }
2072 }
2073
2074 private void notifyClosePromise(Throwable cause) {
2075 if (cause == null) {
2076 if (sslClosePromise.trySuccess(ctx.channel())) {
2077 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2078 }
2079 } else {
2080 if (sslClosePromise.tryFailure(cause)) {
2081 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2082 }
2083 }
2084 }
2085
2086 private void closeOutboundAndChannel(
2087 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2088 setState(STATE_OUTBOUND_CLOSED);
2089 engine.closeOutbound();
2090
2091 if (!ctx.channel().isActive()) {
2092 if (disconnect) {
2093 ctx.disconnect(promise);
2094 } else {
2095 ctx.close(promise);
2096 }
2097 return;
2098 }
2099
2100 ChannelPromise closeNotifyPromise = ctx.newPromise();
2101 try {
2102 flush(ctx, closeNotifyPromise);
2103 } finally {
2104 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2105 setState(STATE_CLOSE_NOTIFY);
2106
2107
2108
2109
2110
2111
2112
2113
2114 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2115 } else {
2116
2117 sslClosePromise.addListener(new FutureListener<Channel>() {
2118 @Override
2119 public void operationComplete(Future<Channel> future) {
2120 promise.setSuccess();
2121 }
2122 });
2123 }
2124 }
2125 }
2126
2127 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2128 if (pendingUnencryptedWrites != null) {
2129 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2130 } else {
2131 promise.setFailure(newPendingWritesNullException());
2132 }
2133 flush(ctx);
2134 }
2135
2136 @Override
2137 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2138 this.ctx = ctx;
2139 Channel channel = ctx.channel();
2140 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2141 @Override
2142 protected int wrapDataSize() {
2143 return SslHandler.this.wrapDataSize;
2144 }
2145 };
2146
2147 setOpensslEngineSocketFd(channel);
2148 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2149 boolean active = channel.isActive();
2150 if (active || fastOpen) {
2151
2152
2153
2154 startHandshakeProcessing(active);
2155
2156
2157 final ChannelOutboundBuffer outboundBuffer;
2158 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2159 outboundBuffer.totalPendingWriteBytes() > 0)) {
2160 setState(STATE_NEEDS_FLUSH);
2161 }
2162 }
2163 }
2164
2165 private void startHandshakeProcessing(boolean flushAtEnd) {
2166 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2167 setState(STATE_HANDSHAKE_STARTED);
2168 if (engine.getUseClientMode()) {
2169
2170
2171
2172 handshake(flushAtEnd);
2173 }
2174 applyHandshakeTimeout();
2175 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2176 forceFlush(ctx);
2177 }
2178 }
2179
2180
2181
2182
2183 public Future<Channel> renegotiate() {
2184 ChannelHandlerContext ctx = this.ctx;
2185 if (ctx == null) {
2186 throw new IllegalStateException();
2187 }
2188
2189 return renegotiate(ctx.executor().<Channel>newPromise());
2190 }
2191
2192
2193
2194
2195 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2196 ObjectUtil.checkNotNull(promise, "promise");
2197
2198 ChannelHandlerContext ctx = this.ctx;
2199 if (ctx == null) {
2200 throw new IllegalStateException();
2201 }
2202
2203 EventExecutor executor = ctx.executor();
2204 if (!executor.inEventLoop()) {
2205 executor.execute(new Runnable() {
2206 @Override
2207 public void run() {
2208 renegotiateOnEventLoop(promise);
2209 }
2210 });
2211 return promise;
2212 }
2213
2214 renegotiateOnEventLoop(promise);
2215 return promise;
2216 }
2217
2218 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2219 final Promise<Channel> oldHandshakePromise = handshakePromise;
2220 if (!oldHandshakePromise.isDone()) {
2221
2222
2223 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2224 } else {
2225 handshakePromise = newHandshakePromise;
2226 handshake(true);
2227 applyHandshakeTimeout();
2228 }
2229 }
2230
2231
2232
2233
2234
2235
2236
2237 private void handshake(boolean flushAtEnd) {
2238 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2239
2240
2241 return;
2242 }
2243 if (handshakePromise.isDone()) {
2244
2245
2246
2247
2248
2249 return;
2250 }
2251
2252
2253 final ChannelHandlerContext ctx = this.ctx;
2254 try {
2255 engine.beginHandshake();
2256 wrapNonAppData(ctx, false);
2257 } catch (Throwable e) {
2258 setHandshakeFailure(ctx, e);
2259 } finally {
2260 if (flushAtEnd) {
2261 forceFlush(ctx);
2262 }
2263 }
2264 }
2265
2266 private void applyHandshakeTimeout() {
2267 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2268
2269
2270 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2271 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2272 return;
2273 }
2274
2275 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2276 @Override
2277 public void run() {
2278 if (localHandshakePromise.isDone()) {
2279 return;
2280 }
2281 SSLException exception =
2282 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2283 try {
2284 if (localHandshakePromise.tryFailure(exception)) {
2285 SslUtils.handleHandshakeFailure(ctx, exception, true);
2286 }
2287 } finally {
2288 releaseAndFailAll(ctx, exception);
2289 }
2290 }
2291 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2292
2293
2294 localHandshakePromise.addListener(new FutureListener<Channel>() {
2295 @Override
2296 public void operationComplete(Future<Channel> f) throws Exception {
2297 timeoutFuture.cancel(false);
2298 }
2299 });
2300 }
2301
2302 private void forceFlush(ChannelHandlerContext ctx) {
2303 clearState(STATE_NEEDS_FLUSH);
2304 ctx.flush();
2305 }
2306
2307 private void setOpensslEngineSocketFd(Channel c) {
2308 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2309 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2310 }
2311 }
2312
2313
2314
2315
2316 @Override
2317 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2318 setOpensslEngineSocketFd(ctx.channel());
2319 if (!startTls) {
2320 startHandshakeProcessing(true);
2321 }
2322 ctx.fireChannelActive();
2323 }
2324
2325 private void safeClose(
2326 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2327 final ChannelPromise promise) {
2328 if (!ctx.channel().isActive()) {
2329 ctx.close(promise);
2330 return;
2331 }
2332
2333 final Future<?> timeoutFuture;
2334 if (!flushFuture.isDone()) {
2335 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2336 if (closeNotifyTimeout > 0) {
2337
2338 timeoutFuture = ctx.executor().schedule(new Runnable() {
2339 @Override
2340 public void run() {
2341
2342 if (!flushFuture.isDone()) {
2343 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2344 ctx.channel());
2345 addCloseListener(ctx.close(ctx.newPromise()), promise);
2346 }
2347 }
2348 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2349 } else {
2350 timeoutFuture = null;
2351 }
2352 } else {
2353 timeoutFuture = null;
2354 }
2355
2356
2357 flushFuture.addListener(new ChannelFutureListener() {
2358 @Override
2359 public void operationComplete(ChannelFuture f) {
2360 if (timeoutFuture != null) {
2361 timeoutFuture.cancel(false);
2362 }
2363 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2364 if (closeNotifyReadTimeout <= 0) {
2365
2366
2367 addCloseListener(ctx.close(ctx.newPromise()), promise);
2368 } else {
2369 final Future<?> closeNotifyReadTimeoutFuture;
2370
2371 if (!sslClosePromise.isDone()) {
2372 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2373 @Override
2374 public void run() {
2375 if (!sslClosePromise.isDone()) {
2376 logger.debug(
2377 "{} did not receive close_notify in {}ms; force-closing the connection.",
2378 ctx.channel(), closeNotifyReadTimeout);
2379
2380
2381 addCloseListener(ctx.close(ctx.newPromise()), promise);
2382 }
2383 }
2384 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2385 } else {
2386 closeNotifyReadTimeoutFuture = null;
2387 }
2388
2389
2390 sslClosePromise.addListener(new FutureListener<Channel>() {
2391 @Override
2392 public void operationComplete(Future<Channel> future) throws Exception {
2393 if (closeNotifyReadTimeoutFuture != null) {
2394 closeNotifyReadTimeoutFuture.cancel(false);
2395 }
2396 addCloseListener(ctx.close(ctx.newPromise()), promise);
2397 }
2398 });
2399 }
2400 }
2401 });
2402 }
2403
2404 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2405
2406
2407
2408
2409
2410
2411 PromiseNotifier.cascade(false, future, promise);
2412 }
2413
2414
2415
2416
2417
2418 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2419 ByteBufAllocator alloc = ctx.alloc();
2420 if (engineType.wantsDirectBuffer) {
2421 return alloc.directBuffer(capacity);
2422 } else {
2423 return alloc.buffer(capacity);
2424 }
2425 }
2426
2427
2428
2429
2430
2431 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2432 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2433 }
2434
2435 private boolean isStateSet(int bit) {
2436 return (state & bit) == bit;
2437 }
2438
2439 private void setState(int bit) {
2440 state |= bit;
2441 }
2442
2443 private void clearState(int bit) {
2444 state &= ~bit;
2445 }
2446
2447 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2448
2449 @Override
2450 protected EventExecutor executor() {
2451 if (ctx == null) {
2452 throw new IllegalStateException();
2453 }
2454 return ctx.executor();
2455 }
2456
2457 @Override
2458 protected void checkDeadLock() {
2459 if (ctx == null) {
2460
2461
2462
2463
2464
2465
2466 return;
2467 }
2468 super.checkDeadLock();
2469 }
2470 }
2471 }