1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.security.cert.CertificateException;
61 import java.util.List;
62 import java.util.concurrent.Executor;
63 import java.util.concurrent.RejectedExecutionException;
64 import java.util.concurrent.TimeUnit;
65 import java.util.regex.Pattern;
66
67 import javax.net.ssl.SSLEngine;
68 import javax.net.ssl.SSLEngineResult;
69 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
70 import javax.net.ssl.SSLEngineResult.Status;
71 import javax.net.ssl.SSLException;
72 import javax.net.ssl.SSLHandshakeException;
73 import javax.net.ssl.SSLSession;
74
75 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
76 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
77 import static io.netty.util.internal.ObjectUtil.checkNotNull;
78 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
// Logger shared by all SslHandler instances.
private static final InternalLogger logger =
        InternalLoggerFactory.getInstance(SslHandler.class);
// Class-name pattern tested against stack frames by ignoreException(...): any class name containing
// "SocketChannel", "DatagramChannel", "SctpChannel" or "UdtChannel".
private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
        "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
// Case-insensitive exception-message pattern tested by ignoreException(...): "connection ... reset / closed /
// abort / broken" or "broken ... pipe".
private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
        "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);

// Single-bit flags passed to setState(...) / isStateSet(...) / clearState(...).
// NOTE(review): those helpers are outside this view; presumably they combine the bits in the 'state' field.

// Set by flush(...) the first time it runs on a handler created with startTls == true.
private static final int STATE_SENT_FIRST_MESSAGE = 1;
// Set by wrapAndFlush(...) when it runs before the handshake promise is done.
private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
// Set by read(...) when a read is requested before the handshake promise is done.
private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
// Checked by channelInactive(...) to decide whether a handshake was in progress when the channel closed.
private static final int STATE_HANDSHAKE_STARTED = 1 << 3;

// Set by wrap(...) and wrapNonAppData(...) when they are invoked with inUnwrap == true, i.e. when data was
// written without an immediate flush.
private static final int STATE_NEEDS_FLUSH = 1 << 4;
// Set by closeOutbound0(...) just before SSLEngine.closeOutbound() is called.
private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
// NOTE(review): not referenced in the visible part of the file; the name suggests it tracks close_notify.
private static final int STATE_CLOSE_NOTIFY = 1 << 6;
// While set, flush(...) and decode(...) return immediately without touching the engine.
private static final int STATE_PROCESS_TASK = 1 << 7;

// Cleared by channelReadComplete0(...).
private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
// NOTE(review): not referenced in the visible part of the file.
private static final int STATE_UNWRAP_REENTRY = 1 << 9;

// 16 KiB. Buffers larger than this are wrapped in slices of at most this size (see wrapMultiple(...)); it is
// also the initial value of wrapDataSize.
private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
201
/**
 * Strategy object selected per {@link SSLEngine} implementation (see {@link #forEngine(SSLEngine)}).
 * Each constant knows how to unwrap into a {@link ByteBuf}, how to size/allocate the buffer used for wrapping,
 * and whether inbound data must be framed into single records before it is handed to the engine.
 */
private enum SslEngineType {
    /** Used when the engine is a {@code ReferenceCountedOpenSslEngine}. */
    TCNATIVE(true, COMPOSITE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int nioBufferCount = in.nioBufferCount();
            int writerIndex = out.writerIndex();
            final SSLEngineResult result;
            if (nioBufferCount > 1) {
                // The input spans several NIO buffers: use the engine's unwrap(ByteBuffer[], ByteBuffer[])
                // overload so all of them can be passed in one call. The destination is placed in the handler's
                // reusable one-element array instead of allocating a new array for every call.
                ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
                try {
                    handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
                    result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
                } finally {
                    // Never keep a reference to the destination buffer in the shared scratch array.
                    handler.singleBuffer[0] = null;
                }
            } else {
                result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
                    toByteBuffer(out, writerIndex, out.writableBytes()));
            }
            // The engine wrote through a ByteBuffer view, so the ByteBuf writer index is advanced by hand.
            out.writerIndex(writerIndex + result.bytesProduced());
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                                   int pendingBytes, int numComponents) {
            // Direct buffer sized by the engine's own estimate.
            return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
                    .calculateOutNetBufSize(pendingBytes, numComponents));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            return ((ReferenceCountedOpenSslEngine) handler.engine)
                    .calculateMaxLengthForWrap(pendingBytes, numComponents);
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            // Prefer the engine-reported pending byte count; fall back to the caller's guess when it is not positive.
            int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
            return sslPending > 0 ? sslPending : guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
        }
    },
    /** Used when the engine is a {@code ConscryptAlpnSslEngine}. */
    CONSCRYPT(true, COMPOSITE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int nioBufferCount = in.nioBufferCount();
            int writerIndex = out.writerIndex();
            final SSLEngineResult result;
            if (nioBufferCount > 1) {
                // Multi-buffer input: use the array-based unwrap overload with the handler's reusable
                // one-element destination array.
                try {
                    handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
                    result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
                            in.nioBuffers(in.readerIndex(), len),
                            handler.singleBuffer);
                } finally {
                    // Drop the reference to the destination buffer once the call is over.
                    handler.singleBuffer[0] = null;
                }
            } else {
                result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
                        toByteBuffer(out, writerIndex, out.writableBytes()));
            }
            // Reflect the bytes the engine produced in the ByteBuf writer index.
            out.writerIndex(writerIndex + result.bytesProduced());
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                                   int pendingBytes, int numComponents) {
            // Direct buffer sized by the engine's own estimate.
            return allocator.directBuffer(
                    ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            return ((ConscryptAlpnSslEngine) handler.engine)
                    .calculateRequiredOutBufSpace(pendingBytes, numComponents);
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            // No engine-side figure is consulted here; the caller's guess is returned unchanged.
            return guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return true;
        }
    },
    /** Fallback for every other {@link SSLEngine} implementation. */
    JDK(false, MERGE_CUMULATOR) {
        @Override
        SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
            int writerIndex = out.writerIndex();
            ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
            // Remember where the input started so the real consumption can be measured afterwards.
            int position = inNioBuffer.position();
            final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
                toByteBuffer(out, writerIndex, out.writableBytes()));
            out.writerIndex(writerIndex + result.bytesProduced());

            // If the engine reports zero consumed bytes although the input buffer's position moved, return a
            // corrected result whose bytesConsumed is derived from the position delta.
            // NOTE(review): presumably a workaround for engines that under-report bytesConsumed — confirm.
            if (result.bytesConsumed() == 0) {
                int consumed = inNioBuffer.position() - position;
                if (consumed != result.bytesConsumed()) {
                    // Same status and produced count, corrected consumed count.
                    return new SSLEngineResult(
                            result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
                }
            }
            return result;
        }

        @Override
        ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                                   int pendingBytes, int numComponents) {
            // Heap buffer that is at least one packet-buffer large, or larger if more plaintext is pending.
            return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
        }

        @Override
        int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
            // The session's packet buffer size is used regardless of pendingBytes / numComponents.
            return handler.engine.getSession().getPacketBufferSize();
        }

        @Override
        int calculatePendingData(SslHandler handler, int guess) {
            return guess;
        }

        @Override
        boolean jdkCompatibilityMode(SSLEngine engine) {
            return true;
        }
    };

    /**
     * Picks the constant matching the runtime type of {@code engine}: {@link #TCNATIVE} for
     * {@code ReferenceCountedOpenSslEngine}, {@link #CONSCRYPT} for {@code ConscryptAlpnSslEngine},
     * otherwise {@link #JDK}.
     */
    static SslEngineType forEngine(SSLEngine engine) {
        return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
               engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
    }

    SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
        this.wantsDirectBuffer = wantsDirectBuffer;
        this.cumulator = cumulator;
    }

    /**
     * Unwraps {@code len} bytes of {@code in}, starting at its reader index, into the writable region of
     * {@code out} and advances {@code out}'s writer index by the number of bytes produced.
     */
    abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;

    /** Returns the number of pending bytes, or {@code guess} when the engine cannot provide a figure. */
    abstract int calculatePendingData(SslHandler handler, int guess);

    /**
     * Whether inbound data must be framed into single records before unwrapping
     * (see {@code decode(...)}, which chooses between the two decode paths based on this value).
     */
    abstract boolean jdkCompatibilityMode(SSLEngine engine);

    /** Allocates the destination buffer used when wrapping {@code pendingBytes} of plaintext. */
    abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
                                        int pendingBytes, int numComponents);

    /** Returns the output space required to wrap {@code pendingBytes} spread over {@code numComponents} buffers. */
    abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);

    /**
     * {@code true} if the engine should be fed direct buffers. When set, the low-level wrap method copies
     * non-direct input into a temporary direct buffer before calling the engine.
     */
    final boolean wantsDirectBuffer;

    /**
     * Cumulator handed to {@code setCumulator(...)} in the {@code SslHandler} constructor.
     * NOTE(review): COMPOSITE_CUMULATOR / MERGE_CUMULATOR are inherited constants not shown here; presumably
     * the former avoids copying while the latter merges into one contiguous buffer — confirm.
     */
    final Cumulator cumulator;
}
401
// Context of this handler, read by closeOutbound(...) and closeOutbound0(...).
// NOTE(review): the assignment is outside this view (presumably in handlerAdded); it is null until then.
private volatile ChannelHandlerContext ctx;
private final SSLEngine engine;
// Strategy matching the engine implementation, resolved once in the constructor.
private final SslEngineType engineType;
private final Executor delegatedTaskExecutor;
// Cached result of engineType.jdkCompatibilityMode(engine); selects the decode path in decode(...).
private final boolean jdkCompatibilityMode;

// Reusable one-element array used to pass a single ByteBuffer to the array-based SSLEngine wrap/unwrap
// overloads without allocating a new array each time. Every user resets slot 0 to null in a finally block.
private final ByteBuffer[] singleBuffer = new ByteBuffer[1];

// When true, the first flush(...) forwards the queued writes without encrypting them.
private final boolean startTls;
// May be null (the public constructors pass null).
private final ResumptionController resumptionController;

private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);

// Queue of plaintext writes waiting to be wrapped. Set to null by handlerRemoved0(...); write(...) fails
// promises when it finds the queue null.
private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
private Promise<Channel> handshakePromise = new LazyChannelPromise();
private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();

// Length of the record decodeJdkCompatible(...) is still waiting for; 0 when no partial record is pending.
private int packetLength;
// NOTE(review): presumably the bit set manipulated by setState/isStateSet/clearState using the STATE_* flags.
private short state;

// Timeouts in milliseconds. Defaults: 10 s handshake, 3 s close_notify flush, 0 for close_notify read.
private volatile long handshakeTimeoutMillis = 10000;
private volatile long closeNotifyFlushTimeoutMillis = 3000;
private volatile long closeNotifyReadTimeoutMillis;
// Maximum number of plaintext bytes pulled from the pending queue per wrap iteration; see wrap(...).
volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
432
433
434
435
436
437
/**
 * Creates a new instance with {@code startTls} set to {@code false}.
 *
 * @param engine the {@link SSLEngine} this handler will use; must not be {@code null}
 */
public SslHandler(SSLEngine engine) {
    this(engine, false);
}
441
442
443
444
445
446
447
448
/**
 * Creates a new instance that uses {@link ImmediateExecutor#INSTANCE} as the delegated-task executor.
 *
 * @param engine   the {@link SSLEngine} this handler will use; must not be {@code null}
 * @param startTls {@code true} if the first flush should forward the queued writes without encrypting them
 */
public SslHandler(SSLEngine engine, boolean startTls) {
    this(engine, startTls, ImmediateExecutor.INSTANCE);
}
452
453
454
455
456
457
458
459
/**
 * Creates a new instance with {@code startTls} set to {@code false} and the given delegated-task executor.
 *
 * @param engine                the {@link SSLEngine} this handler will use; must not be {@code null}
 * @param delegatedTaskExecutor the {@link Executor} stored for delegated tasks; must not be {@code null}
 */
public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
    this(engine, false, delegatedTaskExecutor);
}
463
464
465
466
467
468
469
470
471
472
/**
 * Creates a new instance without a resumption controller.
 *
 * @param engine                the {@link SSLEngine} this handler will use; must not be {@code null}
 * @param startTls              {@code true} if the first flush should forward the queued writes without
 *                              encrypting them
 * @param delegatedTaskExecutor the {@link Executor} stored for delegated tasks; must not be {@code null}
 */
public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
    this(engine, startTls, delegatedTaskExecutor, null);
}
476
477 SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor,
478 ResumptionController resumptionController) {
479 this.engine = ObjectUtil.checkNotNull(engine, "engine");
480 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
481 engineType = SslEngineType.forEngine(engine);
482 this.startTls = startTls;
483 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
484 setCumulator(engineType.cumulator);
485 this.resumptionController = resumptionController;
486 }
487
/**
 * Returns the configured handshake timeout in milliseconds (initially 10000).
 */
public long getHandshakeTimeoutMillis() {
    return handshakeTimeoutMillis;
}
491
/**
 * Sets the handshake timeout, converting it to milliseconds first.
 *
 * @param handshakeTimeout the timeout expressed in {@code unit}
 * @param unit             the unit of {@code handshakeTimeout}; must not be {@code null}
 */
public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
    checkNotNull(unit, "unit");
    setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
}
496
/**
 * Sets the handshake timeout in milliseconds.
 *
 * @param handshakeTimeoutMillis the timeout; validated with {@code checkPositiveOrZero}
 *                               (NOTE(review): presumably rejects negative values — confirm)
 */
public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
    this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
}
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
/**
 * Sets how many plaintext bytes are pulled from the pending-write queue for each wrap iteration.
 * <p>
 * The wrap loop reads this value once per invocation: when it is greater than {@code 0}, up to that many
 * bytes are removed from the queue at a time; otherwise the queue's first buffer is removed as-is.
 * No validation is performed on the value.
 *
 * @param wrapDataSize the number of bytes to pull per wrap iteration, or a value {@code <= 0} to take the
 *                     queued buffers one by one
 */
@UnstableApi
public final void setWrapDataSize(int wrapDataSize) {
    this.wrapDataSize = wrapDataSize;
}
525
526
527
528
/**
 * Returns the same value as {@link #getCloseNotifyFlushTimeoutMillis()}.
 *
 * @deprecated use {@link #getCloseNotifyFlushTimeoutMillis()}
 */
@Deprecated
public long getCloseNotifyTimeoutMillis() {
    return getCloseNotifyFlushTimeoutMillis();
}
533
534
535
536
/**
 * Delegates to {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}.
 *
 * @deprecated use {@link #setCloseNotifyFlushTimeout(long, TimeUnit)}
 */
@Deprecated
public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
    setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
}
541
542
543
544
/**
 * Delegates to {@link #setCloseNotifyFlushTimeoutMillis(long)}.
 *
 * @deprecated use {@link #setCloseNotifyFlushTimeoutMillis(long)}
 */
@Deprecated
public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
    setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
}
549
550
551
552
553
554
/**
 * Returns the configured close_notify flush timeout in milliseconds (initially 3000).
 */
public final long getCloseNotifyFlushTimeoutMillis() {
    return closeNotifyFlushTimeoutMillis;
}
558
559
560
561
562
563
564 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
565 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
566 }
567
568
569
570
/**
 * Sets the close_notify flush timeout in milliseconds.
 *
 * @param closeNotifyFlushTimeoutMillis the timeout; validated with {@code checkPositiveOrZero}
 *                                      (NOTE(review): presumably rejects negative values — confirm)
 */
public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
    this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
            "closeNotifyFlushTimeoutMillis");
}
575
576
577
578
579
580
/**
 * Returns the configured close_notify read timeout in milliseconds (initially 0).
 */
public final long getCloseNotifyReadTimeoutMillis() {
    return closeNotifyReadTimeoutMillis;
}
584
585
586
587
588
589
590 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
591 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
592 }
593
594
595
596
/**
 * Sets the close_notify read timeout in milliseconds.
 *
 * @param closeNotifyReadTimeoutMillis the timeout; validated with {@code checkPositiveOrZero}
 *                                     (NOTE(review): presumably rejects negative values — confirm)
 */
public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
    this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
            "closeNotifyReadTimeoutMillis");
}
601
602
603
604
/**
 * Returns the {@link SSLEngine} this handler was constructed with.
 */
public SSLEngine engine() {
    return engine;
}
608
609
610
611
612
613
614 public String applicationProtocol() {
615 SSLEngine engine = engine();
616 if (!(engine instanceof ApplicationProtocolAccessor)) {
617 return null;
618 }
619
620 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
621 }
622
623
624
625
626
627
628
/**
 * Returns the current handshake promise as a {@link Future}. The promise is failed by
 * {@code handlerRemoved0(...)} if the handler is removed before it completes.
 */
public Future<Channel> handshakeFuture() {
    return handshakePromise;
}
632
633
634
635
/**
 * Delegates to {@link #closeOutbound()}.
 *
 * @deprecated use {@link #closeOutbound()}
 */
@Deprecated
public ChannelFuture close() {
    return closeOutbound();
}
640
641
642
643
/**
 * Delegates to {@link #closeOutbound(ChannelPromise)}.
 *
 * @deprecated use {@link #closeOutbound(ChannelPromise)}
 */
@Deprecated
public ChannelFuture close(ChannelPromise promise) {
    return closeOutbound(promise);
}
648
649
650
651
652
653
654
/**
 * Closes the outbound side of the engine using a freshly created promise.
 * Dereferences the handler's context, so the context must already have been assigned.
 *
 * @return the future that {@link #closeOutbound(ChannelPromise)} returns for the new promise
 */
public ChannelFuture closeOutbound() {
    return closeOutbound(ctx.newPromise());
}
658
659
660
661
662
663
664
665 public ChannelFuture closeOutbound(final ChannelPromise promise) {
666 final ChannelHandlerContext ctx = this.ctx;
667 if (ctx.executor().inEventLoop()) {
668 closeOutbound0(promise);
669 } else {
670 ctx.executor().execute(new Runnable() {
671 @Override
672 public void run() {
673 closeOutbound0(promise);
674 }
675 });
676 }
677 return promise;
678 }
679
680 private void closeOutbound0(ChannelPromise promise) {
681 setState(STATE_OUTBOUND_CLOSED);
682 engine.closeOutbound();
683 try {
684 flush(ctx, promise);
685 } catch (Exception e) {
686 if (!promise.tryFailure(e)) {
687 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
688 }
689 }
690 }
691
692
693
694
695
696
697
698
/**
 * Returns the SSL close promise as a {@link Future}. {@code channelInactive(...)} and
 * {@code handlerRemoved0(...)} both notify it when it is not yet done.
 */
public Future<Channel> sslCloseFuture() {
    return sslClosePromise;
}
702
703 @Override
704 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
705 try {
706 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
707
708 pendingUnencryptedWrites.releaseAndFailAll(ctx,
709 new ChannelException("Pending write on removal of SslHandler"));
710 }
711 pendingUnencryptedWrites = null;
712
713 SSLException cause = null;
714
715
716
717
718 if (!handshakePromise.isDone()) {
719 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
720 if (handshakePromise.tryFailure(cause)) {
721 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
722 }
723 }
724 if (!sslClosePromise.isDone()) {
725 if (cause == null) {
726 cause = new SSLException("SslHandler removed before SSLEngine was closed");
727 }
728 notifyClosePromise(cause);
729 }
730 } finally {
731 ReferenceCountUtil.release(engine);
732 }
733 }
734
/** Passes the bind request on to the next outbound handler unchanged. */
@Override
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.bind(localAddress, promise);
}
739
/** Passes the connect request on to the next outbound handler unchanged. */
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}
745
/** Passes the deregister request on to the next outbound handler unchanged. */
@Override
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
    ctx.deregister(promise);
}
750
/**
 * Handles a disconnect request by delegating to {@code closeOutboundAndChannel} with the
 * {@code disconnect} flag set to {@code true}.
 */
@Override
public void disconnect(final ChannelHandlerContext ctx,
                       final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, true);
}
756
/**
 * Handles a close request by delegating to {@code closeOutboundAndChannel} with the
 * {@code disconnect} flag set to {@code false}.
 */
@Override
public void close(final ChannelHandlerContext ctx,
                  final ChannelPromise promise) throws Exception {
    closeOutboundAndChannel(ctx, promise, false);
}
762
763 @Override
764 public void read(ChannelHandlerContext ctx) throws Exception {
765 if (!handshakePromise.isDone()) {
766 setState(STATE_READ_DURING_HANDSHAKE);
767 }
768
769 ctx.read();
770 }
771
/**
 * Creates the exception used to fail a write that arrives while the pending-write queue is {@code null}
 * (the queue is set to {@code null} by {@code handlerRemoved0}).
 */
private static IllegalStateException newPendingWritesNullException() {
    return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
}
775
776 @Override
777 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
778 if (!(msg instanceof ByteBuf)) {
779 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
780 ReferenceCountUtil.safeRelease(msg);
781 promise.setFailure(exception);
782 } else if (pendingUnencryptedWrites == null) {
783 ReferenceCountUtil.safeRelease(msg);
784 promise.setFailure(newPendingWritesNullException());
785 } else {
786 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
787 }
788 }
789
790 @Override
791 public void flush(ChannelHandlerContext ctx) throws Exception {
792
793
794 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
795 setState(STATE_SENT_FIRST_MESSAGE);
796 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
797 forceFlush(ctx);
798
799
800 startHandshakeProcessing(true);
801 return;
802 }
803
804 if (isStateSet(STATE_PROCESS_TASK)) {
805 return;
806 }
807
808 try {
809 wrapAndFlush(ctx);
810 } catch (Throwable cause) {
811 setHandshakeFailure(ctx, cause);
812 PlatformDependent.throwException(cause);
813 }
814 }
815
/**
 * Wraps everything in the pending-write queue and then flushes the context.
 *
 * @throws SSLException if wrapping fails; the flush in the {@code finally} block still runs
 */
private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
    if (pendingUnencryptedWrites.isEmpty()) {
        // Queue an empty buffer so the wrap loop below runs at least once even though there is no
        // application data; this lets the engine emit any non-application data it has ready.
        pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
    }
    if (!handshakePromise.isDone()) {
        // Remember that a flush was requested before the handshake finished.
        setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
    }
    try {
        wrap(ctx, false);
    } finally {
        // Flush whatever was written, even if wrap(...) threw part-way through.
        forceFlush(ctx);
    }
}
835
836
/**
 * Drains the pending-write queue through the engine and writes the encrypted result to the context.
 * Nothing is flushed here; when {@code inUnwrap} is {@code true} the {@code STATE_NEEDS_FLUSH} flag is set
 * on exit instead.
 *
 * @param ctx      the handler context used for allocation and writing
 * @param inUnwrap {@code true} when called from within the unwrap path
 * @throws SSLException if the engine fails to wrap
 */
private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Read the volatile field once so the whole loop uses a consistent value.
        final int wrapDataSize = this.wrapDataSize;
        // Stop looping as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            ChannelPromise promise = ctx.newPromise();
            // A positive wrapDataSize pulls up to that many bytes from the queue; otherwise the first queued
            // buffer is taken as-is.
            ByteBuf buf = wrapDataSize > 0 ?
                    pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
                    pendingUnencryptedWrites.removeFirst(promise);
            if (buf == null) {
                // Queue is drained.
                break;
            }

            SSLEngineResult result;

            if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
                // More plaintext than MAX_PLAINTEXT_LENGTH: it will be wrapped in several slices, so size the
                // output buffer for ceil(readableBytes / MAX_PLAINTEXT_LENGTH) additional components.
                int readableBytes = buf.readableBytes();
                int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
                if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
                    numPackets += 1;
                }

                if (out == null) {
                    out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
                }
                result = wrapMultiple(alloc, engine, buf, out);
            } else {
                if (out == null) {
                    out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
                }
                result = wrap(alloc, engine, buf, out);
            }

            if (buf.isReadable()) {
                // Not everything was consumed: put the remainder back at the head of the queue together with
                // its promise. The promise must then not be completed by the write below, hence the null.
                pendingUnencryptedWrites.addFirst(buf, promise);
                promise = null;
            } else {
                buf.release();
            }

            // Write the produced bytes before calling anything further down that may re-enter this handler,
            // so encrypted records reach the channel in the order the engine produced them.
            if (out.isReadable()) {
                final ByteBuf b = out;
                // Ownership moves to the write; a new buffer is allocated on the next iteration if needed.
                out = null;
                if (promise != null) {
                    ctx.write(b, promise);
                } else {
                    ctx.write(b);
                }
            } else if (promise != null) {
                // Nothing was produced, but the promise still has to travel down the pipeline.
                ctx.write(Unpooled.EMPTY_BUFFER, promise);
            }
            // Otherwise 'out' is still empty and is reused by the next iteration.

            if (result.getStatus() == Status.CLOSED) {
                // The engine is closed: nothing queued can ever be encrypted. Only build or look up an
                // exception if there actually is something left to fail.
                if (!pendingUnencryptedWrites.isEmpty()) {
                    // Prefer an earlier handshake failure, then an earlier close failure, then a generic
                    // "closed" exception.
                    Throwable exception = handshakePromise.cause();
                    if (exception == null) {
                        exception = sslClosePromise.cause();
                        if (exception == null) {
                            exception = new SslClosedEngineException("SSLEngine closed already");
                        }
                    }
                    pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
                }

                return;
            } else {
                switch (result.getHandshakeStatus()) {
                    case NEED_TASK:
                        if (!runDelegatedTasks(inUnwrap)) {
                            // runDelegatedTasks returned false: stop processing here.
                            // NOTE(review): presumably the task was handed to the delegated executor and
                            // processing resumes when it completes — confirm in runDelegatedTasks.
                            break outer;
                        }
                        break;
                    case FINISHED:
                    case NOT_HANDSHAKING:
                        // Both statuses are treated as a completed handshake.
                        setHandshakeSuccess();
                        break;
                    case NEED_WRAP:
                        // The engine wants to wrap again and has just produced data; make sure the queue is
                        // not empty so the loop performs another iteration instead of exiting.
                        if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
                            pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
                        }
                        break;
                    case NEED_UNWRAP:
                        // The engine needs inbound data before it can continue; request a read if necessary
                        // and stop wrapping.
                        readIfNeeded(ctx);
                        return;
                    default:
                        throw new IllegalStateException(
                                "Unknown handshake status: " + result.getHandshakeStatus());
                }
            }
        }
    } finally {
        if (out != null) {
            // Allocated but never handed to ctx.write(...).
            out.release();
        }
        if (inUnwrap) {
            setState(STATE_NEEDS_FLUSH);
        }
    }
}
961
962
963
964
965
966
967
/**
 * Repeatedly wraps an empty buffer so the engine can emit non-application data (for example handshake
 * records), writing whatever it produces to the context.
 *
 * @param ctx      the handler context used for allocation and writing
 * @param inUnwrap {@code true} when called from within the unwrap path; in that case no nested unwrap is
 *                 attempted and {@code STATE_NEEDS_FLUSH} is set whenever data was written
 * @return {@code true} only when the engine reported {@code NOT_HANDSHAKING}; {@code false} in every other
 *         case
 * @throws SSLException if the engine fails to wrap
 */
private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
    ByteBuf out = null;
    ByteBufAllocator alloc = ctx.alloc();
    try {
        // Stop looping as soon as the handler has been removed from the pipeline.
        outer: while (!ctx.isRemoved()) {
            if (out == null) {
                // Start with a small (2048 byte) request; the low-level wrap grows the buffer on
                // BUFFER_OVERFLOW if that turns out to be too small.
                out = allocateOutNetBuf(ctx, 2048, 1);
            }
            SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
            if (result.bytesProduced() > 0) {
                // A failed write of non-application data is reported as a transport-level handshake failure.
                ctx.write(out).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) {
                        Throwable cause = future.cause();
                        if (cause != null) {
                            setHandshakeFailureTransportFailure(ctx, cause);
                        }
                    }
                });
                if (inUnwrap) {
                    setState(STATE_NEEDS_FLUSH);
                }
                // Ownership moved to the write.
                out = null;
            }

            HandshakeStatus status = result.getHandshakeStatus();
            switch (status) {
                case FINISHED:
                    // If this call is the one that completed the handshake while unwrapping, wrap any
                    // application data that was queued in the meantime.
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    return false;
                case NEED_TASK:
                    if (!runDelegatedTasks(inUnwrap)) {
                        // runDelegatedTasks returned false: stop processing here.
                        // NOTE(review): presumably processing resumes when the delegated task completes.
                        break outer;
                    }
                    break;
                case NEED_UNWRAP:
                    if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
                        // Either the caller is already unwrapping (it will continue itself), or the nested
                        // unwrap returned a non-positive value; in both cases there is nothing more to wrap
                        // right now.
                        return false;
                    }
                    break;
                case NEED_WRAP:
                    break;
                case NOT_HANDSHAKING:
                    if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
                        wrap(ctx, true);
                    }
                    // When not already inside unwrap, give the engine a chance to process inbound
                    // non-application data as well.
                    if (!inUnwrap) {
                        unwrapNonAppData(ctx);
                    }
                    return true;
                default:
                    throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
            }

            // Nothing was produced and the last action was not a delegated task: looping again would not
            // make progress.
            if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
                break;
            }

            // An engine that is not handshaking should not be asked to wrap empty buffers again.
            if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
                break;
            }
        }
    } finally {
        if (out != null) {
            // Allocated but never handed to ctx.write(...).
            out.release();
        }
    }
    return false;
}
1059
/**
 * Wraps {@code in} in slices of at most {@code MAX_PLAINTEXT_LENGTH} bytes, appending the encrypted
 * output to {@code out}.
 *
 * @return the result of the last engine call that was made. The loop may stop before {@code in} is fully
 *         consumed (output buffer too small after at least one slice, or engine closed); the caller checks
 *         {@code in.isReadable()} to detect left-over data.
 * @throws SSLException if the engine fails to wrap
 */
private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
        throws SSLException {
    SSLEngineResult result = null;

    do {
        int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
        // The component count of the whole input is used for the estimate, so it is an upper bound: a single
        // slice may span fewer NIO buffers than the full input.
        int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());

        if (!out.isWritable(nextOutSize)) {
            if (result != null) {
                // At least one slice has been wrapped already; stop here and let the caller re-queue the
                // remainder of 'in'.
                break;
            }
            // Nothing has been wrapped yet, so make room rather than returning a null result.
            out.ensureWritable(nextOutSize);
        }

        ByteBuf wrapBuf = in.readSlice(nextSliceSize);
        result = wrap(alloc, engine, wrapBuf, out);

        if (result.getStatus() == Status.CLOSED) {
            // The engine is closed; the caller handles that status.
            break;
        }

        if (wrapBuf.isReadable()) {
            // The engine did not consume the whole slice: rewind the reader index of 'in' by the left-over
            // amount so those bytes are part of the next slice.
            in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
        }
    } while (in.readableBytes() > 0);

    return result;
}
1099
/**
 * Performs a single logical {@link SSLEngine#wrap} of {@code in} into {@code out}, growing {@code out} and
 * retrying for as long as the engine reports {@code BUFFER_OVERFLOW}.
 * The reader index of {@code in} and the writer index of {@code out} are advanced by the amounts the engine
 * reports as consumed and produced.
 *
 * @return the first engine result whose status is not {@code BUFFER_OVERFLOW}
 * @throws SSLException if the engine fails to wrap
 */
private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
        throws SSLException {
    ByteBuf newDirectIn = null;
    try {
        int readerIndex = in.readerIndex();
        int readableBytes = in.readableBytes();

        // Build the ByteBuffer[] view of the input that is handed to the engine.
        final ByteBuffer[] in0;
        if (in.isDirect() || !engineType.wantsDirectBuffer) {
            // The input can be used as-is. For a non-composite buffer backed by exactly one NIO buffer, use
            // its internal NIO buffer together with the reusable one-element array, which avoids creating
            // both a new ByteBuffer and a new array.
            if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
                in0 = singleBuffer;
                // The internal buffer is only used for the duration of this call; the array slot is cleared
                // again in the finally block.
                in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
            } else {
                in0 = in.nioBuffers();
            }
        } else {
            // The engine wants direct memory but the input is not direct: copy the readable bytes into a
            // temporary direct buffer (released in the finally block).
            newDirectIn = alloc.directBuffer(readableBytes);
            newDirectIn.writeBytes(in, readerIndex, readableBytes);
            in0 = singleBuffer;
            in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
        }

        for (;;) {
            // Re-derive the output view on every attempt because 'out' may have been grown (and its writer
            // index moved) by the previous iteration.
            ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
            SSLEngineResult result = engine.wrap(in0, out0);
            in.skipBytes(result.bytesConsumed());
            out.writerIndex(out.writerIndex() + result.bytesProduced());

            if (result.getStatus() == Status.BUFFER_OVERFLOW) {
                // Make room for at least one more packet and try again.
                out.ensureWritable(engine.getSession().getPacketBufferSize());
            } else {
                return result;
            }
        }
    } finally {
        // Do not keep a reference to the input buffer in the shared scratch array.
        singleBuffer[0] = null;

        if (newDirectIn != null) {
            newDirectIn.release();
        }
    }
}
1156
/**
 * Fails the handshake (if applicable) and the close promise with a {@link ClosedChannelException}, then
 * lets the superclass process the event.
 * A {@link DecoderException} thrown by the superclass is swallowed only when the handshake had already
 * failed before this call and the exception's cause is an {@link SSLException}.
 */
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    // Captured before setHandshakeFailure(...) below can change the promise, so it reflects whether the
    // handshake had failed before the channel became inactive.
    boolean handshakeFailed = handshakePromise.cause() != null;

    // The channel is closed; this is the exception reported to the promises.
    ClosedChannelException exception = new ClosedChannelException();

    // If a handshake was started but never completed, attach a more specific suppressed exception.
    if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
        ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
                "Connection closed while SSL/TLS handshake was in progress",
                SslHandler.class, "channelInactive"));
    }

    // The third and fourth arguments depend on whether the outbound side was already closed and whether a
    // handshake had started. NOTE(review): their exact meaning is defined by setHandshakeFailure, which is
    // outside this view.
    setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
            false);

    // The close promise is always notified as well.
    notifyClosePromise(exception);

    try {
        super.channelInactive(ctx);
    } catch (DecoderException e) {
        if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
            // Rethrow unless this is an SSLException surfacing after a handshake that had already failed;
            // in that case the failure has been reported before and is not propagated again.
            throw e;
        }
    }
}
1192
1193 @Override
1194 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1195 if (ignoreException(cause)) {
1196
1197
1198 if (logger.isDebugEnabled()) {
1199 logger.debug(
1200 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1201 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1202 }
1203
1204
1205
1206 if (ctx.channel().isActive()) {
1207 ctx.close();
1208 }
1209 } else {
1210 ctx.fireExceptionCaught(cause);
1211 }
1212 }
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
/**
 * Decides whether {@code t} can be ignored. Only an {@link IOException} that is not an {@link SSLException}
 * and that arrives after the SSL close promise is done is considered; it is ignored if either its message
 * matches {@code IGNORABLE_ERROR_MESSAGE} or its stack trace contains a {@code read} frame in a channel
 * class outside the {@code io.netty.} packages.
 *
 * @return {@code true} if the exception should be swallowed
 */
private boolean ignoreException(Throwable t) {
    if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
        String message = t.getMessage();

        // First try the cheap check: match the message against the known "connection reset" / "broken pipe"
        // wordings.
        if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
            return true;
        }

        // The message did not match; inspect where the exception was raised instead.
        StackTraceElement[] elements = t.getStackTrace();
        for (StackTraceElement element: elements) {
            String classname = element.getClassName();
            String methodname = element.getMethodName();

            // Frames inside Netty itself are skipped.
            if (classname.startsWith("io.netty.")) {
                continue;
            }

            // Only frames of a method named "read" are of interest.
            if (!"read".equals(methodname)) {
                continue;
            }

            // Match by class name first, which needs no class loading.
            if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
                return true;
            }

            try {
                // Fall back to loading the class and checking its type hierarchy. Loading may fail, which is
                // handled by the catch block below.
                Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);

                if (SocketChannel.class.isAssignableFrom(clazz)
                        || DatagramChannel.class.isAssignableFrom(clazz)) {
                    return true;
                }

                // SctpChannel is compared by superclass name rather than by class reference.
                // NOTE(review): presumably because the type is not available on every runtime — confirm.
                if (PlatformDependent.javaVersion() >= 7
                        && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
                    return true;
                }
            } catch (Throwable cause) {
                // Best effort only: any failure here (including a null superclass) just means this frame
                // does not qualify.
                if (logger.isDebugEnabled()) {
                    logger.debug("Unexpected exception while loading class {} classname {}",
                            getClass(), classname, cause);
                }
            }
        }
    }

    return false;
}
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
/**
 * Equivalent to {@link #isEncrypted(ByteBuf, boolean) isEncrypted(buffer, false)}.
 *
 * @deprecated use {@link #isEncrypted(ByteBuf, boolean)}
 */
@Deprecated
public static boolean isEncrypted(ByteBuf buffer) {
    return isEncrypted(buffer, false);
}
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317 public static boolean isEncrypted(ByteBuf buffer, boolean probeSSLv2) {
1318 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1319 throw new IllegalArgumentException(
1320 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1321 }
1322 return getEncryptedPacketLength(buffer, buffer.readerIndex(), probeSSLv2) != SslUtils.NOT_ENCRYPTED;
1323 }
1324
/**
 * Decode path used in JDK compatibility mode: waits until one complete record is available and then hands
 * exactly that many bytes to {@code unwrap}.
 *
 * @throws NotSslRecordException if the readable bytes do not start with an SSL/TLS record
 */
private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
    // A positive value means an earlier call already determined the record length but the record was not yet
    // complete.
    int packetLength = this.packetLength;

    if (packetLength > 0) {
        if (in.readableBytes() < packetLength) {
            // Still waiting for the rest of the record.
            return;
        }
    } else {
        // No record length known yet: the header must be readable before the length can be determined.
        final int readableBytes = in.readableBytes();
        if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
            return;
        }
        packetLength = getEncryptedPacketLength(in, in.readerIndex(), true);
        if (packetLength == SslUtils.NOT_ENCRYPTED) {
            // Not an SSL/TLS record. Capture a hex dump for the message before discarding the data.
            NotSslRecordException e = new NotSslRecordException(
                    "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
            in.skipBytes(in.readableBytes());

            // Record the failure on the handshake before propagating the exception.
            setHandshakeFailure(ctx, e);

            throw e;
        }
        if (packetLength == NOT_ENOUGH_DATA) {
            return;
        }
        assert packetLength > 0;
        if (packetLength > readableBytes) {
            // The record is incomplete; remember its length so the header is not parsed again next time.
            this.packetLength = packetLength;
            return;
        }
    }

    // A full record is available. Reset the cached length before unwrapping so it is never left stale, even
    // if unwrap(...) throws.
    this.packetLength = 0;
    try {
        final int bytesConsumed = unwrap(ctx, in, packetLength);
        assert bytesConsumed == packetLength || engine.isInboundDone() :
                "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
                        bytesConsumed;
    } catch (Throwable cause) {
        handleUnwrapThrowable(ctx, cause);
    }
}
1374
1375 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1376 try {
1377 unwrap(ctx, in, in.readableBytes());
1378 } catch (Throwable cause) {
1379 handleUnwrapThrowable(ctx, cause);
1380 }
1381 }
1382
1383 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1384 try {
1385
1386
1387
1388
1389 if (handshakePromise.tryFailure(cause)) {
1390 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1391 }
1392
1393
1394 if (pendingUnencryptedWrites != null) {
1395
1396
1397 wrapAndFlush(ctx);
1398 }
1399 } catch (SSLException ex) {
1400 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1401 " because of an previous SSLException, ignoring...", ex);
1402 } finally {
1403
1404 setHandshakeFailure(ctx, cause, true, false, true);
1405 }
1406 PlatformDependent.throwException(cause);
1407 }
1408
1409 @Override
1410 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1411 if (isStateSet(STATE_PROCESS_TASK)) {
1412 return;
1413 }
1414 if (jdkCompatibilityMode) {
1415 decodeJdkCompatible(ctx, in);
1416 } else {
1417 decodeNonJdkCompatible(ctx, in);
1418 }
1419 }
1420
1421 @Override
1422 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1423 channelReadComplete0(ctx);
1424 }
1425
1426 private void channelReadComplete0(ChannelHandlerContext ctx) {
1427
1428 discardSomeReadBytes();
1429
1430 flushIfNeeded(ctx);
1431 readIfNeeded(ctx);
1432
1433 clearState(STATE_FIRE_CHANNEL_READ);
1434 ctx.fireChannelReadComplete();
1435 }
1436
1437 private void readIfNeeded(ChannelHandlerContext ctx) {
1438
1439 if (!ctx.channel().config().isAutoRead() &&
1440 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1441
1442
1443 ctx.read();
1444 }
1445 }
1446
1447 private void flushIfNeeded(ChannelHandlerContext ctx) {
1448 if (isStateSet(STATE_NEEDS_FLUSH)) {
1449 forceFlush(ctx);
1450 }
1451 }
1452
1453
1454
1455
1456 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1457 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1458 }
1459
1460
1461
1462
1463 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1464 final int originalLength = length;
1465 boolean wrapLater = false;
1466 boolean notifyClosure = false;
1467 boolean executedRead = false;
1468 ByteBuf decodeOut = allocate(ctx, length);
1469 try {
1470
1471
1472 do {
1473 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1474 final Status status = result.getStatus();
1475 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1476 final int produced = result.bytesProduced();
1477 final int consumed = result.bytesConsumed();
1478
1479
1480
1481
1482 packet.skipBytes(consumed);
1483 length -= consumed;
1484
1485
1486
1487
1488 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1489 wrapLater |= (decodeOut.isReadable() ?
1490 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1491 handshakeStatus == HandshakeStatus.FINISHED || !pendingUnencryptedWrites.isEmpty();
1492 }
1493
1494
1495
1496
1497 if (decodeOut.isReadable()) {
1498 setState(STATE_FIRE_CHANNEL_READ);
1499 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1500 executedRead = true;
1501 executeChannelRead(ctx, decodeOut);
1502 } else {
1503 ctx.fireChannelRead(decodeOut);
1504 }
1505 decodeOut = null;
1506 }
1507
1508 if (status == Status.CLOSED) {
1509 notifyClosure = true;
1510 } else if (status == Status.BUFFER_OVERFLOW) {
1511 if (decodeOut != null) {
1512 decodeOut.release();
1513 }
1514 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1515
1516
1517
1518
1519 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1520 applicationBufferSize : applicationBufferSize - produced));
1521 continue;
1522 }
1523
1524 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1525 boolean pending = runDelegatedTasks(true);
1526 if (!pending) {
1527
1528
1529
1530
1531
1532 wrapLater = false;
1533 break;
1534 }
1535 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1536
1537
1538
1539 if (wrapNonAppData(ctx, true) && length == 0) {
1540 break;
1541 }
1542 }
1543
1544 if (status == Status.BUFFER_UNDERFLOW ||
1545
1546 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1547 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1548 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1549
1550
1551 readIfNeeded(ctx);
1552 }
1553
1554 break;
1555 } else if (decodeOut == null) {
1556 decodeOut = allocate(ctx, length);
1557 }
1558 } while (!ctx.isRemoved());
1559
1560 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1561
1562
1563
1564
1565 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1566 wrapLater = true;
1567 }
1568
1569 if (wrapLater) {
1570 wrap(ctx, true);
1571 }
1572 } finally {
1573 if (decodeOut != null) {
1574 decodeOut.release();
1575 }
1576
1577 if (notifyClosure) {
1578 if (executedRead) {
1579 executeNotifyClosePromise(ctx);
1580 } else {
1581 notifyClosePromise(null);
1582 }
1583 }
1584 }
1585 return originalLength - length;
1586 }
1587
1588 private boolean setHandshakeSuccessUnwrapMarkReentry() throws SSLException {
1589
1590
1591 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1592 if (setReentryState) {
1593 setState(STATE_UNWRAP_REENTRY);
1594 }
1595 try {
1596 return setHandshakeSuccess();
1597 } finally {
1598
1599
1600 if (setReentryState) {
1601 clearState(STATE_UNWRAP_REENTRY);
1602 }
1603 }
1604 }
1605
1606 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1607 try {
1608 ctx.executor().execute(new Runnable() {
1609 @Override
1610 public void run() {
1611 notifyClosePromise(null);
1612 }
1613 });
1614 } catch (RejectedExecutionException e) {
1615 notifyClosePromise(e);
1616 }
1617 }
1618
1619 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1620 try {
1621 ctx.executor().execute(new Runnable() {
1622 @Override
1623 public void run() {
1624 ctx.fireChannelRead(decodedOut);
1625 }
1626 });
1627 } catch (RejectedExecutionException e) {
1628 decodedOut.release();
1629 throw e;
1630 }
1631 }
1632
1633 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1634 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1635 out.nioBuffer(index, len);
1636 }
1637
1638 private static boolean inEventLoop(Executor executor) {
1639 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1640 }
1641
1642
1643
1644
1645
1646
1647
1648
1649 private boolean runDelegatedTasks(boolean inUnwrap) {
1650 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1651
1652
1653 for (;;) {
1654 Runnable task = engine.getDelegatedTask();
1655 if (task == null) {
1656 return true;
1657 }
1658 setState(STATE_PROCESS_TASK);
1659 if (task instanceof AsyncRunnable) {
1660
1661 boolean pending = false;
1662 try {
1663 AsyncRunnable asyncTask = (AsyncRunnable) task;
1664 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1665 asyncTask.run(completionHandler);
1666 pending = completionHandler.resumeLater();
1667 if (pending) {
1668 return false;
1669 }
1670 } finally {
1671 if (!pending) {
1672
1673
1674 clearState(STATE_PROCESS_TASK);
1675 }
1676 }
1677 } else {
1678 try {
1679 task.run();
1680 } finally {
1681 clearState(STATE_PROCESS_TASK);
1682 }
1683 }
1684 }
1685 } else {
1686 executeDelegatedTask(inUnwrap);
1687 return false;
1688 }
1689 }
1690
1691 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1692 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1693 }
1694
1695 private void executeDelegatedTask(boolean inUnwrap) {
1696 executeDelegatedTask(getTaskRunner(inUnwrap));
1697 }
1698
1699 private void executeDelegatedTask(SslTasksRunner task) {
1700 setState(STATE_PROCESS_TASK);
1701 try {
1702 delegatedTaskExecutor.execute(task);
1703 } catch (RejectedExecutionException e) {
1704 clearState(STATE_PROCESS_TASK);
1705 throw e;
1706 }
1707 }
1708
1709 private final class AsyncTaskCompletionHandler implements Runnable {
1710 private final boolean inUnwrap;
1711 boolean didRun;
1712 boolean resumeLater;
1713
1714 AsyncTaskCompletionHandler(boolean inUnwrap) {
1715 this.inUnwrap = inUnwrap;
1716 }
1717
1718 @Override
1719 public void run() {
1720 didRun = true;
1721 if (resumeLater) {
1722 getTaskRunner(inUnwrap).runComplete();
1723 }
1724 }
1725
1726 boolean resumeLater() {
1727 if (!didRun) {
1728 resumeLater = true;
1729 return true;
1730 }
1731 return false;
1732 }
1733 }
1734
1735
1736
1737
1738
1739 private final class SslTasksRunner implements Runnable {
1740 private final boolean inUnwrap;
1741 private final Runnable runCompleteTask = new Runnable() {
1742 @Override
1743 public void run() {
1744 runComplete();
1745 }
1746 };
1747
1748 SslTasksRunner(boolean inUnwrap) {
1749 this.inUnwrap = inUnwrap;
1750 }
1751
1752
1753 private void taskError(Throwable e) {
1754 if (inUnwrap) {
1755
1756
1757
1758
1759 try {
1760 handleUnwrapThrowable(ctx, e);
1761 } catch (Throwable cause) {
1762 safeExceptionCaught(cause);
1763 }
1764 } else {
1765 setHandshakeFailure(ctx, e);
1766 forceFlush(ctx);
1767 }
1768 }
1769
1770
1771 private void safeExceptionCaught(Throwable cause) {
1772 try {
1773 exceptionCaught(ctx, wrapIfNeeded(cause));
1774 } catch (Throwable error) {
1775 ctx.fireExceptionCaught(error);
1776 }
1777 }
1778
1779 private Throwable wrapIfNeeded(Throwable cause) {
1780 if (!inUnwrap) {
1781
1782 return cause;
1783 }
1784
1785
1786 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1787 }
1788
1789 private void tryDecodeAgain() {
1790 try {
1791 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1792 } catch (Throwable cause) {
1793 safeExceptionCaught(cause);
1794 } finally {
1795
1796
1797
1798 channelReadComplete0(ctx);
1799 }
1800 }
1801
1802
1803
1804
1805
1806 private void resumeOnEventExecutor() {
1807 assert ctx.executor().inEventLoop();
1808 clearState(STATE_PROCESS_TASK);
1809 try {
1810 HandshakeStatus status = engine.getHandshakeStatus();
1811 switch (status) {
1812
1813
1814 case NEED_TASK:
1815 executeDelegatedTask(this);
1816
1817 break;
1818
1819
1820 case FINISHED:
1821
1822 case NOT_HANDSHAKING:
1823 setHandshakeSuccess();
1824 try {
1825
1826
1827 wrap(ctx, inUnwrap);
1828 } catch (Throwable e) {
1829 taskError(e);
1830 return;
1831 }
1832 if (inUnwrap) {
1833
1834
1835 unwrapNonAppData(ctx);
1836 }
1837
1838
1839 forceFlush(ctx);
1840
1841 tryDecodeAgain();
1842 break;
1843
1844
1845
1846 case NEED_UNWRAP:
1847 try {
1848 unwrapNonAppData(ctx);
1849 } catch (SSLException e) {
1850 handleUnwrapThrowable(ctx, e);
1851 return;
1852 }
1853 tryDecodeAgain();
1854 break;
1855
1856
1857
1858 case NEED_WRAP:
1859 try {
1860 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1861
1862
1863
1864
1865 unwrapNonAppData(ctx);
1866 }
1867
1868
1869 forceFlush(ctx);
1870 } catch (Throwable e) {
1871 taskError(e);
1872 return;
1873 }
1874
1875
1876 tryDecodeAgain();
1877 break;
1878
1879 default:
1880
1881 throw new AssertionError();
1882 }
1883 } catch (Throwable cause) {
1884 safeExceptionCaught(cause);
1885 }
1886 }
1887
1888 void runComplete() {
1889 EventExecutor executor = ctx.executor();
1890
1891
1892
1893
1894
1895
1896
1897 executor.execute(new Runnable() {
1898 @Override
1899 public void run() {
1900 resumeOnEventExecutor();
1901 }
1902 });
1903 }
1904
1905 @Override
1906 public void run() {
1907 try {
1908 Runnable task = engine.getDelegatedTask();
1909 if (task == null) {
1910
1911 return;
1912 }
1913 if (task instanceof AsyncRunnable) {
1914 AsyncRunnable asyncTask = (AsyncRunnable) task;
1915 asyncTask.run(runCompleteTask);
1916 } else {
1917 task.run();
1918 runComplete();
1919 }
1920 } catch (final Throwable cause) {
1921 handleException(cause);
1922 }
1923 }
1924
1925 private void handleException(final Throwable cause) {
1926 EventExecutor executor = ctx.executor();
1927 if (executor.inEventLoop()) {
1928 clearState(STATE_PROCESS_TASK);
1929 safeExceptionCaught(cause);
1930 } else {
1931 try {
1932 executor.execute(new Runnable() {
1933 @Override
1934 public void run() {
1935 clearState(STATE_PROCESS_TASK);
1936 safeExceptionCaught(cause);
1937 }
1938 });
1939 } catch (RejectedExecutionException ignore) {
1940 clearState(STATE_PROCESS_TASK);
1941
1942
1943 ctx.fireExceptionCaught(cause);
1944 }
1945 }
1946 }
1947 }
1948
1949
1950
1951
1952
1953
1954 private boolean setHandshakeSuccess() throws SSLException {
1955
1956
1957
1958 final SSLSession session = engine.getSession();
1959 if (resumptionController != null && !handshakePromise.isDone()) {
1960 try {
1961 if (resumptionController.validateResumeIfNeeded(engine) && logger.isDebugEnabled()) {
1962 logger.debug("{} Resumed and reauthenticated session", ctx.channel());
1963 }
1964 } catch (CertificateException e) {
1965 SSLHandshakeException exception = new SSLHandshakeException(e.getMessage());
1966 exception.initCause(e);
1967 throw exception;
1968 }
1969 }
1970 final boolean notified;
1971 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1972 if (logger.isDebugEnabled()) {
1973 logger.debug(
1974 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1975 ctx.channel(),
1976 session.getProtocol(),
1977 session.getCipherSuite());
1978 }
1979 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1980 }
1981 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1982 clearState(STATE_READ_DURING_HANDSHAKE);
1983 if (!ctx.channel().config().isAutoRead()) {
1984 ctx.read();
1985 }
1986 }
1987 return notified;
1988 }
1989
1990
1991
1992
1993 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1994 setHandshakeFailure(ctx, cause, true, true, false);
1995 }
1996
1997
1998
1999
2000 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
2001 boolean notify, boolean alwaysFlushAndClose) {
2002 try {
2003
2004 setState(STATE_OUTBOUND_CLOSED);
2005 engine.closeOutbound();
2006
2007 if (closeInbound) {
2008 try {
2009 engine.closeInbound();
2010 } catch (SSLException e) {
2011 if (logger.isDebugEnabled()) {
2012
2013
2014
2015
2016 String msg = e.getMessage();
2017 if (msg == null || !(msg.contains("possible truncation attack") ||
2018 msg.contains("closing inbound before receiving peer's close_notify"))) {
2019 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
2020 }
2021 }
2022 }
2023 }
2024 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
2025 SslUtils.handleHandshakeFailure(ctx, cause, notify);
2026 }
2027 } finally {
2028
2029 releaseAndFailAll(ctx, cause);
2030 }
2031 }
2032
2033 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
2034
2035
2036
2037 try {
2038 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
2039 releaseAndFailAll(ctx, transportFailure);
2040 if (handshakePromise.tryFailure(transportFailure)) {
2041 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2042 }
2043 } finally {
2044 ctx.close();
2045 }
2046 }
2047
2048 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2049 if (resumptionController != null &&
2050 (!engine.getSession().isValid() || cause instanceof SSLHandshakeException)) {
2051 resumptionController.remove(engine());
2052 }
2053 if (pendingUnencryptedWrites != null) {
2054 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2055 }
2056 }
2057
2058 private void notifyClosePromise(Throwable cause) {
2059 if (cause == null) {
2060 if (sslClosePromise.trySuccess(ctx.channel())) {
2061 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2062 }
2063 } else {
2064 if (sslClosePromise.tryFailure(cause)) {
2065 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2066 }
2067 }
2068 }
2069
2070 private void closeOutboundAndChannel(
2071 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2072 setState(STATE_OUTBOUND_CLOSED);
2073 engine.closeOutbound();
2074
2075 if (!ctx.channel().isActive()) {
2076 if (disconnect) {
2077 ctx.disconnect(promise);
2078 } else {
2079 ctx.close(promise);
2080 }
2081 return;
2082 }
2083
2084 ChannelPromise closeNotifyPromise = ctx.newPromise();
2085 try {
2086 flush(ctx, closeNotifyPromise);
2087 } finally {
2088 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2089 setState(STATE_CLOSE_NOTIFY);
2090
2091
2092
2093
2094
2095
2096
2097
2098 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2099 } else {
2100
2101 sslClosePromise.addListener(new FutureListener<Channel>() {
2102 @Override
2103 public void operationComplete(Future<Channel> future) {
2104 promise.setSuccess();
2105 }
2106 });
2107 }
2108 }
2109 }
2110
2111 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2112 if (pendingUnencryptedWrites != null) {
2113 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2114 } else {
2115 promise.setFailure(newPendingWritesNullException());
2116 }
2117 flush(ctx);
2118 }
2119
2120 @Override
2121 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2122 this.ctx = ctx;
2123 Channel channel = ctx.channel();
2124 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2125 @Override
2126 protected int wrapDataSize() {
2127 return SslHandler.this.wrapDataSize;
2128 }
2129 };
2130
2131 setOpensslEngineSocketFd(channel);
2132 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2133 boolean active = channel.isActive();
2134 if (active || fastOpen) {
2135
2136
2137
2138 startHandshakeProcessing(active);
2139
2140
2141 final ChannelOutboundBuffer outboundBuffer;
2142 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2143 outboundBuffer.totalPendingWriteBytes() > 0)) {
2144 setState(STATE_NEEDS_FLUSH);
2145 }
2146 }
2147 }
2148
2149 private void startHandshakeProcessing(boolean flushAtEnd) {
2150 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2151 setState(STATE_HANDSHAKE_STARTED);
2152 if (engine.getUseClientMode()) {
2153
2154
2155
2156 handshake(flushAtEnd);
2157 }
2158 applyHandshakeTimeout();
2159 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2160 forceFlush(ctx);
2161 }
2162 }
2163
2164
2165
2166
2167 public Future<Channel> renegotiate() {
2168 ChannelHandlerContext ctx = this.ctx;
2169 if (ctx == null) {
2170 throw new IllegalStateException();
2171 }
2172
2173 return renegotiate(ctx.executor().<Channel>newPromise());
2174 }
2175
2176
2177
2178
2179 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2180 ObjectUtil.checkNotNull(promise, "promise");
2181
2182 ChannelHandlerContext ctx = this.ctx;
2183 if (ctx == null) {
2184 throw new IllegalStateException();
2185 }
2186
2187 EventExecutor executor = ctx.executor();
2188 if (!executor.inEventLoop()) {
2189 executor.execute(new Runnable() {
2190 @Override
2191 public void run() {
2192 renegotiateOnEventLoop(promise);
2193 }
2194 });
2195 return promise;
2196 }
2197
2198 renegotiateOnEventLoop(promise);
2199 return promise;
2200 }
2201
2202 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2203 final Promise<Channel> oldHandshakePromise = handshakePromise;
2204 if (!oldHandshakePromise.isDone()) {
2205
2206
2207 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2208 } else {
2209 handshakePromise = newHandshakePromise;
2210 handshake(true);
2211 applyHandshakeTimeout();
2212 }
2213 }
2214
2215
2216
2217
2218
2219
2220
2221 private void handshake(boolean flushAtEnd) {
2222 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2223
2224
2225 return;
2226 }
2227 if (handshakePromise.isDone()) {
2228
2229
2230
2231
2232
2233 return;
2234 }
2235
2236
2237 final ChannelHandlerContext ctx = this.ctx;
2238 try {
2239 engine.beginHandshake();
2240 wrapNonAppData(ctx, false);
2241 } catch (Throwable e) {
2242 setHandshakeFailure(ctx, e);
2243 } finally {
2244 if (flushAtEnd) {
2245 forceFlush(ctx);
2246 }
2247 }
2248 }
2249
2250 private void applyHandshakeTimeout() {
2251 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2252
2253
2254 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2255 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2256 return;
2257 }
2258
2259 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2260 @Override
2261 public void run() {
2262 if (localHandshakePromise.isDone()) {
2263 return;
2264 }
2265 SSLException exception =
2266 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2267 try {
2268 if (localHandshakePromise.tryFailure(exception)) {
2269 SslUtils.handleHandshakeFailure(ctx, exception, true);
2270 }
2271 } finally {
2272 releaseAndFailAll(ctx, exception);
2273 }
2274 }
2275 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2276
2277
2278 localHandshakePromise.addListener(new FutureListener<Channel>() {
2279 @Override
2280 public void operationComplete(Future<Channel> f) throws Exception {
2281 timeoutFuture.cancel(false);
2282 }
2283 });
2284 }
2285
2286 private void forceFlush(ChannelHandlerContext ctx) {
2287 clearState(STATE_NEEDS_FLUSH);
2288 ctx.flush();
2289 }
2290
2291 private void setOpensslEngineSocketFd(Channel c) {
2292 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2293 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2294 }
2295 }
2296
2297
2298
2299
2300 @Override
2301 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2302 setOpensslEngineSocketFd(ctx.channel());
2303 if (!startTls) {
2304 startHandshakeProcessing(true);
2305 }
2306 ctx.fireChannelActive();
2307 }
2308
2309 private void safeClose(
2310 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2311 final ChannelPromise promise) {
2312 if (!ctx.channel().isActive()) {
2313 ctx.close(promise);
2314 return;
2315 }
2316
2317 final Future<?> timeoutFuture;
2318 if (!flushFuture.isDone()) {
2319 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2320 if (closeNotifyTimeout > 0) {
2321
2322 timeoutFuture = ctx.executor().schedule(new Runnable() {
2323 @Override
2324 public void run() {
2325
2326 if (!flushFuture.isDone()) {
2327 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2328 ctx.channel());
2329 addCloseListener(ctx.close(ctx.newPromise()), promise);
2330 }
2331 }
2332 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2333 } else {
2334 timeoutFuture = null;
2335 }
2336 } else {
2337 timeoutFuture = null;
2338 }
2339
2340
2341 flushFuture.addListener(new ChannelFutureListener() {
2342 @Override
2343 public void operationComplete(ChannelFuture f) {
2344 if (timeoutFuture != null) {
2345 timeoutFuture.cancel(false);
2346 }
2347 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2348 if (closeNotifyReadTimeout <= 0) {
2349
2350
2351 addCloseListener(ctx.close(ctx.newPromise()), promise);
2352 } else {
2353 final Future<?> closeNotifyReadTimeoutFuture;
2354
2355 if (!sslClosePromise.isDone()) {
2356 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2357 @Override
2358 public void run() {
2359 if (!sslClosePromise.isDone()) {
2360 logger.debug(
2361 "{} did not receive close_notify in {}ms; force-closing the connection.",
2362 ctx.channel(), closeNotifyReadTimeout);
2363
2364
2365 addCloseListener(ctx.close(ctx.newPromise()), promise);
2366 }
2367 }
2368 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2369 } else {
2370 closeNotifyReadTimeoutFuture = null;
2371 }
2372
2373
2374 sslClosePromise.addListener(new FutureListener<Channel>() {
2375 @Override
2376 public void operationComplete(Future<Channel> future) throws Exception {
2377 if (closeNotifyReadTimeoutFuture != null) {
2378 closeNotifyReadTimeoutFuture.cancel(false);
2379 }
2380 addCloseListener(ctx.close(ctx.newPromise()), promise);
2381 }
2382 });
2383 }
2384 }
2385 });
2386 }
2387
2388 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2389
2390
2391
2392
2393
2394
2395 PromiseNotifier.cascade(false, future, promise);
2396 }
2397
2398
2399
2400
2401
2402 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2403 ByteBufAllocator alloc = ctx.alloc();
2404 if (engineType.wantsDirectBuffer) {
2405 return alloc.directBuffer(capacity);
2406 } else {
2407 return alloc.buffer(capacity);
2408 }
2409 }
2410
2411
2412
2413
2414
2415 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2416 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2417 }
2418
2419 private boolean isStateSet(int bit) {
2420 return (state & bit) == bit;
2421 }
2422
2423 private void setState(int bit) {
2424 state |= bit;
2425 }
2426
2427 private void clearState(int bit) {
2428 state &= ~bit;
2429 }
2430
2431 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2432
2433 @Override
2434 protected EventExecutor executor() {
2435 if (ctx == null) {
2436 throw new IllegalStateException();
2437 }
2438 return ctx.executor();
2439 }
2440
2441 @Override
2442 protected void checkDeadLock() {
2443 if (ctx == null) {
2444
2445
2446
2447
2448
2449
2450 return;
2451 }
2452 super.checkDeadLock();
2453 }
2454 }
2455 }