1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.Channel;
24 import io.netty.channel.ChannelConfig;
25 import io.netty.channel.ChannelException;
26 import io.netty.channel.ChannelFuture;
27 import io.netty.channel.ChannelFutureListener;
28 import io.netty.channel.ChannelHandlerContext;
29 import io.netty.channel.ChannelInboundHandler;
30 import io.netty.channel.ChannelOption;
31 import io.netty.channel.ChannelOutboundBuffer;
32 import io.netty.channel.ChannelOutboundHandler;
33 import io.netty.channel.ChannelPipeline;
34 import io.netty.channel.ChannelPromise;
35 import io.netty.channel.unix.UnixChannel;
36 import io.netty.handler.codec.ByteToMessageDecoder;
37 import io.netty.handler.codec.DecoderException;
38 import io.netty.handler.codec.UnsupportedMessageTypeException;
39 import io.netty.util.ReferenceCountUtil;
40 import io.netty.util.concurrent.DefaultPromise;
41 import io.netty.util.concurrent.EventExecutor;
42 import io.netty.util.concurrent.Future;
43 import io.netty.util.concurrent.FutureListener;
44 import io.netty.util.concurrent.ImmediateExecutor;
45 import io.netty.util.concurrent.Promise;
46 import io.netty.util.concurrent.PromiseNotifier;
47 import io.netty.util.internal.ObjectUtil;
48 import io.netty.util.internal.PlatformDependent;
49 import io.netty.util.internal.ThrowableUtil;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.util.List;
61 import java.util.concurrent.Executor;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.TimeUnit;
64 import java.util.regex.Pattern;
65
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLEngineResult;
68 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
69 import javax.net.ssl.SSLEngineResult.Status;
70 import javax.net.ssl.SSLException;
71 import javax.net.ssl.SSLHandshakeException;
72 import javax.net.ssl.SSLSession;
73
74 import static io.netty.handler.ssl.SslUtils.NOT_ENOUGH_DATA;
75 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
76 import static io.netty.util.internal.ObjectUtil.checkNotNull;
77 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
170 private static final InternalLogger logger =
171 InternalLoggerFactory.getInstance(SslHandler.class);
172 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
173 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
174 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
175 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
176 private static final int STATE_SENT_FIRST_MESSAGE = 1;
177 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
178 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
179 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
180
181
182
183
184 private static final int STATE_NEEDS_FLUSH = 1 << 4;
185 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
186 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
187 private static final int STATE_PROCESS_TASK = 1 << 7;
188
189
190
191
192 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
193 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
194
195
196
197
198
199 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
200
201 private enum SslEngineType {
202 TCNATIVE(true, COMPOSITE_CUMULATOR) {
203 @Override
204 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
205 int nioBufferCount = in.nioBufferCount();
206 int writerIndex = out.writerIndex();
207 final SSLEngineResult result;
208 if (nioBufferCount > 1) {
209
210
211
212
213
214 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
215 try {
216 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
217 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
218 } finally {
219 handler.singleBuffer[0] = null;
220 }
221 } else {
222 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
223 toByteBuffer(out, writerIndex, out.writableBytes()));
224 }
225 out.writerIndex(writerIndex + result.bytesProduced());
226 return result;
227 }
228
229 @Override
230 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
231 int pendingBytes, int numComponents) {
232 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
233 .calculateOutNetBufSize(pendingBytes, numComponents));
234 }
235
236 @Override
237 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
238 return ((ReferenceCountedOpenSslEngine) handler.engine)
239 .calculateMaxLengthForWrap(pendingBytes, numComponents);
240 }
241
242 @Override
243 int calculatePendingData(SslHandler handler, int guess) {
244 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
245 return sslPending > 0 ? sslPending : guess;
246 }
247
248 @Override
249 boolean jdkCompatibilityMode(SSLEngine engine) {
250 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
251 }
252 },
253 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
254 @Override
255 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
256 int nioBufferCount = in.nioBufferCount();
257 int writerIndex = out.writerIndex();
258 final SSLEngineResult result;
259 if (nioBufferCount > 1) {
260
261
262
263 try {
264 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
265 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
266 in.nioBuffers(in.readerIndex(), len),
267 handler.singleBuffer);
268 } finally {
269 handler.singleBuffer[0] = null;
270 }
271 } else {
272 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
273 toByteBuffer(out, writerIndex, out.writableBytes()));
274 }
275 out.writerIndex(writerIndex + result.bytesProduced());
276 return result;
277 }
278
279 @Override
280 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
281 int pendingBytes, int numComponents) {
282 return allocator.directBuffer(
283 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
284 }
285
286 @Override
287 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
288 return ((ConscryptAlpnSslEngine) handler.engine)
289 .calculateRequiredOutBufSpace(pendingBytes, numComponents);
290 }
291
292 @Override
293 int calculatePendingData(SslHandler handler, int guess) {
294 return guess;
295 }
296
297 @Override
298 boolean jdkCompatibilityMode(SSLEngine engine) {
299 return true;
300 }
301 },
302 JDK(false, MERGE_CUMULATOR) {
303 @Override
304 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
305 int writerIndex = out.writerIndex();
306 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
307 int position = inNioBuffer.position();
308 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
309 toByteBuffer(out, writerIndex, out.writableBytes()));
310 out.writerIndex(writerIndex + result.bytesProduced());
311
312
313
314
315
316
317
318 if (result.bytesConsumed() == 0) {
319 int consumed = inNioBuffer.position() - position;
320 if (consumed != result.bytesConsumed()) {
321
322 return new SSLEngineResult(
323 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
324 }
325 }
326 return result;
327 }
328
329 @Override
330 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
331 int pendingBytes, int numComponents) {
332
333
334 return allocator.heapBuffer(Math.max(pendingBytes, handler.engine.getSession().getPacketBufferSize()));
335 }
336
337 @Override
338 int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents) {
339
340
341
342
343
344
345 return handler.engine.getSession().getPacketBufferSize();
346 }
347
348 @Override
349 int calculatePendingData(SslHandler handler, int guess) {
350 return guess;
351 }
352
353 @Override
354 boolean jdkCompatibilityMode(SSLEngine engine) {
355 return true;
356 }
357 };
358
359 static SslEngineType forEngine(SSLEngine engine) {
360 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
361 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
362 }
363
364 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
365 this.wantsDirectBuffer = wantsDirectBuffer;
366 this.cumulator = cumulator;
367 }
368
369 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
370
371 abstract int calculatePendingData(SslHandler handler, int guess);
372
373 abstract boolean jdkCompatibilityMode(SSLEngine engine);
374
375 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
376 int pendingBytes, int numComponents);
377
378 abstract int calculateRequiredOutBufSpace(SslHandler handler, int pendingBytes, int numComponents);
379
380
381
382
383
384
385
386 final boolean wantsDirectBuffer;
387
388
389
390
391
392
393
394
395
396
397
398 final Cumulator cumulator;
399 }
400
401 private volatile ChannelHandlerContext ctx;
402 private final SSLEngine engine;
403 private final SslEngineType engineType;
404 private final Executor delegatedTaskExecutor;
405 private final boolean jdkCompatibilityMode;
406
407
408
409
410
411
412 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
413
414 private final boolean startTls;
415
416 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
417 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
418
419 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
420 private Promise<Channel> handshakePromise = new LazyChannelPromise();
421 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
422
423 private int packetLength;
424 private short state;
425
426 private volatile long handshakeTimeoutMillis = 10000;
427 private volatile long closeNotifyFlushTimeoutMillis = 3000;
428 private volatile long closeNotifyReadTimeoutMillis;
429 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
430
431
432
433
434
435
436 public SslHandler(SSLEngine engine) {
437 this(engine, false);
438 }
439
440
441
442
443
444
445
446
447 public SslHandler(SSLEngine engine, boolean startTls) {
448 this(engine, startTls, ImmediateExecutor.INSTANCE);
449 }
450
451
452
453
454
455
456
457
458 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
459 this(engine, false, delegatedTaskExecutor);
460 }
461
462
463
464
465
466
467
468
469
470
471 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
472 this.engine = ObjectUtil.checkNotNull(engine, "engine");
473 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
474 engineType = SslEngineType.forEngine(engine);
475 this.startTls = startTls;
476 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
477 setCumulator(engineType.cumulator);
478 }
479
480 public long getHandshakeTimeoutMillis() {
481 return handshakeTimeoutMillis;
482 }
483
484 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
485 checkNotNull(unit, "unit");
486 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
487 }
488
489 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
490 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
491 }
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513 @UnstableApi
514 public final void setWrapDataSize(int wrapDataSize) {
515 this.wrapDataSize = wrapDataSize;
516 }
517
518
519
520
521 @Deprecated
522 public long getCloseNotifyTimeoutMillis() {
523 return getCloseNotifyFlushTimeoutMillis();
524 }
525
526
527
528
529 @Deprecated
530 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
531 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
532 }
533
534
535
536
537 @Deprecated
538 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
539 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
540 }
541
542
543
544
545
546
547 public final long getCloseNotifyFlushTimeoutMillis() {
548 return closeNotifyFlushTimeoutMillis;
549 }
550
551
552
553
554
555
556 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
557 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
558 }
559
560
561
562
563 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
564 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
565 "closeNotifyFlushTimeoutMillis");
566 }
567
568
569
570
571
572
573 public final long getCloseNotifyReadTimeoutMillis() {
574 return closeNotifyReadTimeoutMillis;
575 }
576
577
578
579
580
581
582 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
583 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
584 }
585
586
587
588
589 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
590 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
591 "closeNotifyReadTimeoutMillis");
592 }
593
594
595
596
597 public SSLEngine engine() {
598 return engine;
599 }
600
601
602
603
604
605
606 public String applicationProtocol() {
607 SSLEngine engine = engine();
608 if (!(engine instanceof ApplicationProtocolAccessor)) {
609 return null;
610 }
611
612 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
613 }
614
615
616
617
618
619
620
621 public Future<Channel> handshakeFuture() {
622 return handshakePromise;
623 }
624
625
626
627
628 @Deprecated
629 public ChannelFuture close() {
630 return closeOutbound();
631 }
632
633
634
635
636 @Deprecated
637 public ChannelFuture close(ChannelPromise promise) {
638 return closeOutbound(promise);
639 }
640
641
642
643
644
645
646
647 public ChannelFuture closeOutbound() {
648 return closeOutbound(ctx.newPromise());
649 }
650
651
652
653
654
655
656
657 public ChannelFuture closeOutbound(final ChannelPromise promise) {
658 final ChannelHandlerContext ctx = this.ctx;
659 if (ctx.executor().inEventLoop()) {
660 closeOutbound0(promise);
661 } else {
662 ctx.executor().execute(new Runnable() {
663 @Override
664 public void run() {
665 closeOutbound0(promise);
666 }
667 });
668 }
669 return promise;
670 }
671
672 private void closeOutbound0(ChannelPromise promise) {
673 setState(STATE_OUTBOUND_CLOSED);
674 engine.closeOutbound();
675 try {
676 flush(ctx, promise);
677 } catch (Exception e) {
678 if (!promise.tryFailure(e)) {
679 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
680 }
681 }
682 }
683
684
685
686
687
688
689
690
691 public Future<Channel> sslCloseFuture() {
692 return sslClosePromise;
693 }
694
695 @Override
696 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
697 try {
698 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
699
700 pendingUnencryptedWrites.releaseAndFailAll(ctx,
701 new ChannelException("Pending write on removal of SslHandler"));
702 }
703 pendingUnencryptedWrites = null;
704
705 SSLException cause = null;
706
707
708
709
710 if (!handshakePromise.isDone()) {
711 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
712 if (handshakePromise.tryFailure(cause)) {
713 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
714 }
715 }
716 if (!sslClosePromise.isDone()) {
717 if (cause == null) {
718 cause = new SSLException("SslHandler removed before SSLEngine was closed");
719 }
720 notifyClosePromise(cause);
721 }
722 } finally {
723 ReferenceCountUtil.release(engine);
724 }
725 }
726
727 @Override
728 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
729 ctx.bind(localAddress, promise);
730 }
731
732 @Override
733 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
734 ChannelPromise promise) throws Exception {
735 ctx.connect(remoteAddress, localAddress, promise);
736 }
737
738 @Override
739 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
740 ctx.deregister(promise);
741 }
742
743 @Override
744 public void disconnect(final ChannelHandlerContext ctx,
745 final ChannelPromise promise) throws Exception {
746 closeOutboundAndChannel(ctx, promise, true);
747 }
748
749 @Override
750 public void close(final ChannelHandlerContext ctx,
751 final ChannelPromise promise) throws Exception {
752 closeOutboundAndChannel(ctx, promise, false);
753 }
754
755 @Override
756 public void read(ChannelHandlerContext ctx) throws Exception {
757 if (!handshakePromise.isDone()) {
758 setState(STATE_READ_DURING_HANDSHAKE);
759 }
760
761 ctx.read();
762 }
763
764 private static IllegalStateException newPendingWritesNullException() {
765 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
766 }
767
768 @Override
769 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
770 if (!(msg instanceof ByteBuf)) {
771 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
772 ReferenceCountUtil.safeRelease(msg);
773 promise.setFailure(exception);
774 } else if (pendingUnencryptedWrites == null) {
775 ReferenceCountUtil.safeRelease(msg);
776 promise.setFailure(newPendingWritesNullException());
777 } else {
778 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
779 }
780 }
781
782 @Override
783 public void flush(ChannelHandlerContext ctx) throws Exception {
784
785
786 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
787 setState(STATE_SENT_FIRST_MESSAGE);
788 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
789 forceFlush(ctx);
790
791
792 startHandshakeProcessing(true);
793 return;
794 }
795
796 if (isStateSet(STATE_PROCESS_TASK)) {
797 return;
798 }
799
800 try {
801 wrapAndFlush(ctx);
802 } catch (Throwable cause) {
803 setHandshakeFailure(ctx, cause);
804 PlatformDependent.throwException(cause);
805 }
806 }
807
808 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
809 if (pendingUnencryptedWrites.isEmpty()) {
810
811
812
813
814 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
815 }
816 if (!handshakePromise.isDone()) {
817 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
818 }
819 try {
820 wrap(ctx, false);
821 } finally {
822
823
824 forceFlush(ctx);
825 }
826 }
827
828
829 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
830 ByteBuf out = null;
831 ByteBufAllocator alloc = ctx.alloc();
832 try {
833 final int wrapDataSize = this.wrapDataSize;
834
835
836 outer: while (!ctx.isRemoved()) {
837 ChannelPromise promise = ctx.newPromise();
838 ByteBuf buf = wrapDataSize > 0 ?
839 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
840 pendingUnencryptedWrites.removeFirst(promise);
841 if (buf == null) {
842 break;
843 }
844
845 SSLEngineResult result;
846
847 if (buf.readableBytes() > MAX_PLAINTEXT_LENGTH) {
848
849
850
851
852
853 int readableBytes = buf.readableBytes();
854 int numPackets = readableBytes / MAX_PLAINTEXT_LENGTH;
855 if (readableBytes % MAX_PLAINTEXT_LENGTH != 0) {
856 numPackets += 1;
857 }
858
859 if (out == null) {
860 out = allocateOutNetBuf(ctx, readableBytes, buf.nioBufferCount() + numPackets);
861 }
862 result = wrapMultiple(alloc, engine, buf, out);
863 } else {
864 if (out == null) {
865 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
866 }
867 result = wrap(alloc, engine, buf, out);
868 }
869
870 if (buf.isReadable()) {
871 pendingUnencryptedWrites.addFirst(buf, promise);
872
873
874 promise = null;
875 } else {
876 buf.release();
877 }
878
879
880
881 if (out.isReadable()) {
882 final ByteBuf b = out;
883 out = null;
884 if (promise != null) {
885 ctx.write(b, promise);
886 } else {
887 ctx.write(b);
888 }
889 } else if (promise != null) {
890 ctx.write(Unpooled.EMPTY_BUFFER, promise);
891 }
892
893
894 if (result.getStatus() == Status.CLOSED) {
895
896
897 if (!pendingUnencryptedWrites.isEmpty()) {
898
899
900 Throwable exception = handshakePromise.cause();
901 if (exception == null) {
902 exception = sslClosePromise.cause();
903 if (exception == null) {
904 exception = new SslClosedEngineException("SSLEngine closed already");
905 }
906 }
907 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
908 }
909
910 return;
911 } else {
912 switch (result.getHandshakeStatus()) {
913 case NEED_TASK:
914 if (!runDelegatedTasks(inUnwrap)) {
915
916
917 break outer;
918 }
919 break;
920 case FINISHED:
921 case NOT_HANDSHAKING:
922 setHandshakeSuccess();
923 break;
924 case NEED_WRAP:
925
926
927
928
929 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
930 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
931 }
932 break;
933 case NEED_UNWRAP:
934
935
936 readIfNeeded(ctx);
937 return;
938 default:
939 throw new IllegalStateException(
940 "Unknown handshake status: " + result.getHandshakeStatus());
941 }
942 }
943 }
944 } finally {
945 if (out != null) {
946 out.release();
947 }
948 if (inUnwrap) {
949 setState(STATE_NEEDS_FLUSH);
950 }
951 }
952 }
953
954
955
956
957
958
959
960 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
961 ByteBuf out = null;
962 ByteBufAllocator alloc = ctx.alloc();
963 try {
964
965
966 outer: while (!ctx.isRemoved()) {
967 if (out == null) {
968
969
970
971 out = allocateOutNetBuf(ctx, 2048, 1);
972 }
973 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
974 if (result.bytesProduced() > 0) {
975 ctx.write(out).addListener(new ChannelFutureListener() {
976 @Override
977 public void operationComplete(ChannelFuture future) {
978 Throwable cause = future.cause();
979 if (cause != null) {
980 setHandshakeFailureTransportFailure(ctx, cause);
981 }
982 }
983 });
984 if (inUnwrap) {
985 setState(STATE_NEEDS_FLUSH);
986 }
987 out = null;
988 }
989
990 HandshakeStatus status = result.getHandshakeStatus();
991 switch (status) {
992 case FINISHED:
993
994
995
996
997 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
998 wrap(ctx, true);
999 }
1000 return false;
1001 case NEED_TASK:
1002 if (!runDelegatedTasks(inUnwrap)) {
1003
1004
1005 break outer;
1006 }
1007 break;
1008 case NEED_UNWRAP:
1009 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
1010
1011
1012
1013 return false;
1014 }
1015 break;
1016 case NEED_WRAP:
1017 break;
1018 case NOT_HANDSHAKING:
1019 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
1020 wrap(ctx, true);
1021 }
1022
1023
1024 if (!inUnwrap) {
1025 unwrapNonAppData(ctx);
1026 }
1027 return true;
1028 default:
1029 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
1030 }
1031
1032
1033
1034 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
1035 break;
1036 }
1037
1038
1039
1040 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
1041 break;
1042 }
1043 }
1044 } finally {
1045 if (out != null) {
1046 out.release();
1047 }
1048 }
1049 return false;
1050 }
1051
1052 private SSLEngineResult wrapMultiple(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1053 throws SSLException {
1054 SSLEngineResult result = null;
1055
1056 do {
1057 int nextSliceSize = Math.min(MAX_PLAINTEXT_LENGTH, in.readableBytes());
1058
1059
1060 int nextOutSize = engineType.calculateRequiredOutBufSpace(this, nextSliceSize, in.nioBufferCount());
1061
1062 if (!out.isWritable(nextOutSize)) {
1063 if (result != null) {
1064
1065
1066 break;
1067 }
1068
1069
1070 out.ensureWritable(nextOutSize);
1071 }
1072
1073 ByteBuf wrapBuf = in.readSlice(nextSliceSize);
1074 result = wrap(alloc, engine, wrapBuf, out);
1075
1076 if (result.getStatus() == Status.CLOSED) {
1077
1078
1079 break;
1080 }
1081
1082 if (wrapBuf.isReadable()) {
1083
1084
1085 in.readerIndex(in.readerIndex() - wrapBuf.readableBytes());
1086 }
1087 } while (in.readableBytes() > 0);
1088
1089 return result;
1090 }
1091
1092 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1093 throws SSLException {
1094 ByteBuf newDirectIn = null;
1095 try {
1096 int readerIndex = in.readerIndex();
1097 int readableBytes = in.readableBytes();
1098
1099
1100
1101 final ByteBuffer[] in0;
1102 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1103
1104
1105
1106
1107 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1108 in0 = singleBuffer;
1109
1110
1111 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1112 } else {
1113 in0 = in.nioBuffers();
1114 }
1115 } else {
1116
1117
1118
1119 newDirectIn = alloc.directBuffer(readableBytes);
1120 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1121 in0 = singleBuffer;
1122 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1123 }
1124
1125 for (;;) {
1126
1127
1128 ByteBuffer out0 = toByteBuffer(out, out.writerIndex(), out.writableBytes());
1129 SSLEngineResult result = engine.wrap(in0, out0);
1130 in.skipBytes(result.bytesConsumed());
1131 out.writerIndex(out.writerIndex() + result.bytesProduced());
1132
1133 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1134 out.ensureWritable(engine.getSession().getPacketBufferSize());
1135 } else {
1136 return result;
1137 }
1138 }
1139 } finally {
1140
1141 singleBuffer[0] = null;
1142
1143 if (newDirectIn != null) {
1144 newDirectIn.release();
1145 }
1146 }
1147 }
1148
1149 @Override
1150 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1151 boolean handshakeFailed = handshakePromise.cause() != null;
1152
1153
1154 ClosedChannelException exception = new ClosedChannelException();
1155
1156
1157 if (isStateSet(STATE_HANDSHAKE_STARTED) && !handshakePromise.isDone()) {
1158 ThrowableUtil.addSuppressed(exception, StacklessSSLHandshakeException.newInstance(
1159 "Connection closed while SSL/TLS handshake was in progress",
1160 SslHandler.class, "channelInactive"));
1161 }
1162
1163
1164
1165 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1166 false);
1167
1168
1169 notifyClosePromise(exception);
1170
1171 try {
1172 super.channelInactive(ctx);
1173 } catch (DecoderException e) {
1174 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1175
1176
1177
1178
1179
1180 throw e;
1181 }
1182 }
1183 }
1184
1185 @Override
1186 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1187 if (ignoreException(cause)) {
1188
1189
1190 if (logger.isDebugEnabled()) {
1191 logger.debug(
1192 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1193 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1194 }
1195
1196
1197
1198 if (ctx.channel().isActive()) {
1199 ctx.close();
1200 }
1201 } else {
1202 ctx.fireExceptionCaught(cause);
1203 }
1204 }
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215 private boolean ignoreException(Throwable t) {
1216 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1217 String message = t.getMessage();
1218
1219
1220
1221 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1222 return true;
1223 }
1224
1225
1226 StackTraceElement[] elements = t.getStackTrace();
1227 for (StackTraceElement element: elements) {
1228 String classname = element.getClassName();
1229 String methodname = element.getMethodName();
1230
1231
1232 if (classname.startsWith("io.netty.")) {
1233 continue;
1234 }
1235
1236
1237 if (!"read".equals(methodname)) {
1238 continue;
1239 }
1240
1241
1242
1243 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1244 return true;
1245 }
1246
1247 try {
1248
1249
1250
1251 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1252
1253 if (SocketChannel.class.isAssignableFrom(clazz)
1254 || DatagramChannel.class.isAssignableFrom(clazz)) {
1255 return true;
1256 }
1257
1258
1259 if (PlatformDependent.javaVersion() >= 7
1260 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1261 return true;
1262 }
1263 } catch (Throwable cause) {
1264 if (logger.isDebugEnabled()) {
1265 logger.debug("Unexpected exception while loading class {} classname {}",
1266 getClass(), classname, cause);
1267 }
1268 }
1269 }
1270 }
1271
1272 return false;
1273 }
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287 public static boolean isEncrypted(ByteBuf buffer) {
1288 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1289 throw new IllegalArgumentException(
1290 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1291 }
1292 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1293 }
1294
1295 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1296 int packetLength = this.packetLength;
1297
1298 if (packetLength > 0) {
1299 if (in.readableBytes() < packetLength) {
1300 return;
1301 }
1302 } else {
1303
1304 final int readableBytes = in.readableBytes();
1305 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1306 return;
1307 }
1308 packetLength = getEncryptedPacketLength(in, in.readerIndex());
1309 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1310
1311 NotSslRecordException e = new NotSslRecordException(
1312 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1313 in.skipBytes(in.readableBytes());
1314
1315
1316
1317 setHandshakeFailure(ctx, e);
1318
1319 throw e;
1320 }
1321 if (packetLength == NOT_ENOUGH_DATA) {
1322 return;
1323 }
1324 assert packetLength > 0;
1325 if (packetLength > readableBytes) {
1326
1327 this.packetLength = packetLength;
1328 return;
1329 }
1330 }
1331
1332
1333
1334 this.packetLength = 0;
1335 try {
1336 final int bytesConsumed = unwrap(ctx, in, packetLength);
1337 assert bytesConsumed == packetLength || engine.isInboundDone() :
1338 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1339 bytesConsumed;
1340 } catch (Throwable cause) {
1341 handleUnwrapThrowable(ctx, cause);
1342 }
1343 }
1344
1345 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1346 try {
1347 unwrap(ctx, in, in.readableBytes());
1348 } catch (Throwable cause) {
1349 handleUnwrapThrowable(ctx, cause);
1350 }
1351 }
1352
1353 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1354 try {
1355
1356
1357
1358
1359 if (handshakePromise.tryFailure(cause)) {
1360 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1361 }
1362
1363
1364 if (pendingUnencryptedWrites != null) {
1365
1366
1367 wrapAndFlush(ctx);
1368 }
1369 } catch (SSLException ex) {
1370 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1371 " because of an previous SSLException, ignoring...", ex);
1372 } finally {
1373
1374 setHandshakeFailure(ctx, cause, true, false, true);
1375 }
1376 PlatformDependent.throwException(cause);
1377 }
1378
1379 @Override
1380 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1381 if (isStateSet(STATE_PROCESS_TASK)) {
1382 return;
1383 }
1384 if (jdkCompatibilityMode) {
1385 decodeJdkCompatible(ctx, in);
1386 } else {
1387 decodeNonJdkCompatible(ctx, in);
1388 }
1389 }
1390
1391 @Override
1392 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1393 channelReadComplete0(ctx);
1394 }
1395
1396 private void channelReadComplete0(ChannelHandlerContext ctx) {
1397
1398 discardSomeReadBytes();
1399
1400 flushIfNeeded(ctx);
1401 readIfNeeded(ctx);
1402
1403 clearState(STATE_FIRE_CHANNEL_READ);
1404 ctx.fireChannelReadComplete();
1405 }
1406
1407 private void readIfNeeded(ChannelHandlerContext ctx) {
1408
1409 if (!ctx.channel().config().isAutoRead() &&
1410 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1411
1412
1413 ctx.read();
1414 }
1415 }
1416
1417 private void flushIfNeeded(ChannelHandlerContext ctx) {
1418 if (isStateSet(STATE_NEEDS_FLUSH)) {
1419 forceFlush(ctx);
1420 }
1421 }
1422
1423
1424
1425
1426 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1427 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1428 }
1429
1430
1431
1432
1433 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1434 final int originalLength = length;
1435 boolean wrapLater = false;
1436 boolean notifyClosure = false;
1437 boolean executedRead = false;
1438 ByteBuf decodeOut = allocate(ctx, length);
1439 try {
1440
1441
1442 do {
1443 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1444 final Status status = result.getStatus();
1445 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1446 final int produced = result.bytesProduced();
1447 final int consumed = result.bytesConsumed();
1448
1449
1450
1451
1452 packet.skipBytes(consumed);
1453 length -= consumed;
1454
1455
1456
1457
1458 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1459 wrapLater |= (decodeOut.isReadable() ?
1460 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1461 handshakeStatus == HandshakeStatus.FINISHED;
1462 }
1463
1464
1465
1466
1467 if (decodeOut.isReadable()) {
1468 setState(STATE_FIRE_CHANNEL_READ);
1469 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1470 executedRead = true;
1471 executeChannelRead(ctx, decodeOut);
1472 } else {
1473 ctx.fireChannelRead(decodeOut);
1474 }
1475 decodeOut = null;
1476 }
1477
1478 if (status == Status.CLOSED) {
1479 notifyClosure = true;
1480 } else if (status == Status.BUFFER_OVERFLOW) {
1481 if (decodeOut != null) {
1482 decodeOut.release();
1483 }
1484 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1485
1486
1487
1488
1489 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1490 applicationBufferSize : applicationBufferSize - produced));
1491 continue;
1492 }
1493
1494 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1495 boolean pending = runDelegatedTasks(true);
1496 if (!pending) {
1497
1498
1499
1500
1501
1502 wrapLater = false;
1503 break;
1504 }
1505 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1506
1507
1508
1509 if (wrapNonAppData(ctx, true) && length == 0) {
1510 break;
1511 }
1512 }
1513
1514 if (status == Status.BUFFER_UNDERFLOW ||
1515
1516 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1517 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1518 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1519
1520
1521 readIfNeeded(ctx);
1522 }
1523
1524 break;
1525 } else if (decodeOut == null) {
1526 decodeOut = allocate(ctx, length);
1527 }
1528 } while (!ctx.isRemoved());
1529
1530 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1531
1532
1533
1534
1535 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1536 wrapLater = true;
1537 }
1538
1539 if (wrapLater) {
1540 wrap(ctx, true);
1541 }
1542 } finally {
1543 if (decodeOut != null) {
1544 decodeOut.release();
1545 }
1546
1547 if (notifyClosure) {
1548 if (executedRead) {
1549 executeNotifyClosePromise(ctx);
1550 } else {
1551 notifyClosePromise(null);
1552 }
1553 }
1554 }
1555 return originalLength - length;
1556 }
1557
1558 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1559
1560
1561 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1562 if (setReentryState) {
1563 setState(STATE_UNWRAP_REENTRY);
1564 }
1565 try {
1566 return setHandshakeSuccess();
1567 } finally {
1568
1569
1570 if (setReentryState) {
1571 clearState(STATE_UNWRAP_REENTRY);
1572 }
1573 }
1574 }
1575
1576 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1577 try {
1578 ctx.executor().execute(new Runnable() {
1579 @Override
1580 public void run() {
1581 notifyClosePromise(null);
1582 }
1583 });
1584 } catch (RejectedExecutionException e) {
1585 notifyClosePromise(e);
1586 }
1587 }
1588
1589 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1590 try {
1591 ctx.executor().execute(new Runnable() {
1592 @Override
1593 public void run() {
1594 ctx.fireChannelRead(decodedOut);
1595 }
1596 });
1597 } catch (RejectedExecutionException e) {
1598 decodedOut.release();
1599 throw e;
1600 }
1601 }
1602
1603 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1604 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1605 out.nioBuffer(index, len);
1606 }
1607
1608 private static boolean inEventLoop(Executor executor) {
1609 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1610 }
1611
1612
1613
1614
1615
1616
1617
1618
1619 private boolean runDelegatedTasks(boolean inUnwrap) {
1620 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1621
1622
1623 for (;;) {
1624 Runnable task = engine.getDelegatedTask();
1625 if (task == null) {
1626 return true;
1627 }
1628 setState(STATE_PROCESS_TASK);
1629 if (task instanceof AsyncRunnable) {
1630
1631 boolean pending = false;
1632 try {
1633 AsyncRunnable asyncTask = (AsyncRunnable) task;
1634 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1635 asyncTask.run(completionHandler);
1636 pending = completionHandler.resumeLater();
1637 if (pending) {
1638 return false;
1639 }
1640 } finally {
1641 if (!pending) {
1642
1643
1644 clearState(STATE_PROCESS_TASK);
1645 }
1646 }
1647 } else {
1648 try {
1649 task.run();
1650 } finally {
1651 clearState(STATE_PROCESS_TASK);
1652 }
1653 }
1654 }
1655 } else {
1656 executeDelegatedTask(inUnwrap);
1657 return false;
1658 }
1659 }
1660
1661 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1662 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1663 }
1664
1665 private void executeDelegatedTask(boolean inUnwrap) {
1666 executeDelegatedTask(getTaskRunner(inUnwrap));
1667 }
1668
1669 private void executeDelegatedTask(SslTasksRunner task) {
1670 setState(STATE_PROCESS_TASK);
1671 try {
1672 delegatedTaskExecutor.execute(task);
1673 } catch (RejectedExecutionException e) {
1674 clearState(STATE_PROCESS_TASK);
1675 throw e;
1676 }
1677 }
1678
1679 private final class AsyncTaskCompletionHandler implements Runnable {
1680 private final boolean inUnwrap;
1681 boolean didRun;
1682 boolean resumeLater;
1683
1684 AsyncTaskCompletionHandler(boolean inUnwrap) {
1685 this.inUnwrap = inUnwrap;
1686 }
1687
1688 @Override
1689 public void run() {
1690 didRun = true;
1691 if (resumeLater) {
1692 getTaskRunner(inUnwrap).runComplete();
1693 }
1694 }
1695
1696 boolean resumeLater() {
1697 if (!didRun) {
1698 resumeLater = true;
1699 return true;
1700 }
1701 return false;
1702 }
1703 }
1704
1705
1706
1707
1708
1709 private final class SslTasksRunner implements Runnable {
1710 private final boolean inUnwrap;
1711 private final Runnable runCompleteTask = new Runnable() {
1712 @Override
1713 public void run() {
1714 runComplete();
1715 }
1716 };
1717
1718 SslTasksRunner(boolean inUnwrap) {
1719 this.inUnwrap = inUnwrap;
1720 }
1721
1722
1723 private void taskError(Throwable e) {
1724 if (inUnwrap) {
1725
1726
1727
1728
1729 try {
1730 handleUnwrapThrowable(ctx, e);
1731 } catch (Throwable cause) {
1732 safeExceptionCaught(cause);
1733 }
1734 } else {
1735 setHandshakeFailure(ctx, e);
1736 forceFlush(ctx);
1737 }
1738 }
1739
1740
1741 private void safeExceptionCaught(Throwable cause) {
1742 try {
1743 exceptionCaught(ctx, wrapIfNeeded(cause));
1744 } catch (Throwable error) {
1745 ctx.fireExceptionCaught(error);
1746 }
1747 }
1748
1749 private Throwable wrapIfNeeded(Throwable cause) {
1750 if (!inUnwrap) {
1751
1752 return cause;
1753 }
1754
1755
1756 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1757 }
1758
1759 private void tryDecodeAgain() {
1760 try {
1761 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1762 } catch (Throwable cause) {
1763 safeExceptionCaught(cause);
1764 } finally {
1765
1766
1767
1768 channelReadComplete0(ctx);
1769 }
1770 }
1771
1772
1773
1774
1775
1776 private void resumeOnEventExecutor() {
1777 assert ctx.executor().inEventLoop();
1778 clearState(STATE_PROCESS_TASK);
1779 try {
1780 HandshakeStatus status = engine.getHandshakeStatus();
1781 switch (status) {
1782
1783
1784 case NEED_TASK:
1785 executeDelegatedTask(this);
1786
1787 break;
1788
1789
1790 case FINISHED:
1791
1792 case NOT_HANDSHAKING:
1793 setHandshakeSuccess();
1794 try {
1795
1796
1797 wrap(ctx, inUnwrap);
1798 } catch (Throwable e) {
1799 taskError(e);
1800 return;
1801 }
1802 if (inUnwrap) {
1803
1804
1805 unwrapNonAppData(ctx);
1806 }
1807
1808
1809 forceFlush(ctx);
1810
1811 tryDecodeAgain();
1812 break;
1813
1814
1815
1816 case NEED_UNWRAP:
1817 try {
1818 unwrapNonAppData(ctx);
1819 } catch (SSLException e) {
1820 handleUnwrapThrowable(ctx, e);
1821 return;
1822 }
1823 tryDecodeAgain();
1824 break;
1825
1826
1827
1828 case NEED_WRAP:
1829 try {
1830 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1831
1832
1833
1834
1835 unwrapNonAppData(ctx);
1836 }
1837
1838
1839 forceFlush(ctx);
1840 } catch (Throwable e) {
1841 taskError(e);
1842 return;
1843 }
1844
1845
1846 tryDecodeAgain();
1847 break;
1848
1849 default:
1850
1851 throw new AssertionError();
1852 }
1853 } catch (Throwable cause) {
1854 safeExceptionCaught(cause);
1855 }
1856 }
1857
1858 void runComplete() {
1859 EventExecutor executor = ctx.executor();
1860
1861
1862
1863
1864
1865
1866
1867 executor.execute(new Runnable() {
1868 @Override
1869 public void run() {
1870 resumeOnEventExecutor();
1871 }
1872 });
1873 }
1874
1875 @Override
1876 public void run() {
1877 try {
1878 Runnable task = engine.getDelegatedTask();
1879 if (task == null) {
1880
1881 return;
1882 }
1883 if (task instanceof AsyncRunnable) {
1884 AsyncRunnable asyncTask = (AsyncRunnable) task;
1885 asyncTask.run(runCompleteTask);
1886 } else {
1887 task.run();
1888 runComplete();
1889 }
1890 } catch (final Throwable cause) {
1891 handleException(cause);
1892 }
1893 }
1894
1895 private void handleException(final Throwable cause) {
1896 EventExecutor executor = ctx.executor();
1897 if (executor.inEventLoop()) {
1898 clearState(STATE_PROCESS_TASK);
1899 safeExceptionCaught(cause);
1900 } else {
1901 try {
1902 executor.execute(new Runnable() {
1903 @Override
1904 public void run() {
1905 clearState(STATE_PROCESS_TASK);
1906 safeExceptionCaught(cause);
1907 }
1908 });
1909 } catch (RejectedExecutionException ignore) {
1910 clearState(STATE_PROCESS_TASK);
1911
1912
1913 ctx.fireExceptionCaught(cause);
1914 }
1915 }
1916 }
1917 }
1918
1919
1920
1921
1922
1923
1924 private boolean setHandshakeSuccess() {
1925
1926
1927
1928 final boolean notified;
1929 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1930 if (logger.isDebugEnabled()) {
1931 SSLSession session = engine.getSession();
1932 logger.debug(
1933 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1934 ctx.channel(),
1935 session.getProtocol(),
1936 session.getCipherSuite());
1937 }
1938 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1939 }
1940 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1941 clearState(STATE_READ_DURING_HANDSHAKE);
1942 if (!ctx.channel().config().isAutoRead()) {
1943 ctx.read();
1944 }
1945 }
1946 return notified;
1947 }
1948
1949
1950
1951
1952 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1953 setHandshakeFailure(ctx, cause, true, true, false);
1954 }
1955
1956
1957
1958
1959 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1960 boolean notify, boolean alwaysFlushAndClose) {
1961 try {
1962
1963 setState(STATE_OUTBOUND_CLOSED);
1964 engine.closeOutbound();
1965
1966 if (closeInbound) {
1967 try {
1968 engine.closeInbound();
1969 } catch (SSLException e) {
1970 if (logger.isDebugEnabled()) {
1971
1972
1973
1974
1975 String msg = e.getMessage();
1976 if (msg == null || !(msg.contains("possible truncation attack") ||
1977 msg.contains("closing inbound before receiving peer's close_notify"))) {
1978 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1979 }
1980 }
1981 }
1982 }
1983 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1984 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1985 }
1986 } finally {
1987
1988 releaseAndFailAll(ctx, cause);
1989 }
1990 }
1991
1992 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1993
1994
1995
1996 try {
1997 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1998 releaseAndFailAll(ctx, transportFailure);
1999 if (handshakePromise.tryFailure(transportFailure)) {
2000 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
2001 }
2002 } finally {
2003 ctx.close();
2004 }
2005 }
2006
2007 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
2008 if (pendingUnencryptedWrites != null) {
2009 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
2010 }
2011 }
2012
2013 private void notifyClosePromise(Throwable cause) {
2014 if (cause == null) {
2015 if (sslClosePromise.trySuccess(ctx.channel())) {
2016 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
2017 }
2018 } else {
2019 if (sslClosePromise.tryFailure(cause)) {
2020 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
2021 }
2022 }
2023 }
2024
2025 private void closeOutboundAndChannel(
2026 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
2027 setState(STATE_OUTBOUND_CLOSED);
2028 engine.closeOutbound();
2029
2030 if (!ctx.channel().isActive()) {
2031 if (disconnect) {
2032 ctx.disconnect(promise);
2033 } else {
2034 ctx.close(promise);
2035 }
2036 return;
2037 }
2038
2039 ChannelPromise closeNotifyPromise = ctx.newPromise();
2040 try {
2041 flush(ctx, closeNotifyPromise);
2042 } finally {
2043 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
2044 setState(STATE_CLOSE_NOTIFY);
2045
2046
2047
2048
2049
2050
2051
2052
2053 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
2054 } else {
2055
2056 sslClosePromise.addListener(new FutureListener<Channel>() {
2057 @Override
2058 public void operationComplete(Future<Channel> future) {
2059 promise.setSuccess();
2060 }
2061 });
2062 }
2063 }
2064 }
2065
2066 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
2067 if (pendingUnencryptedWrites != null) {
2068 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
2069 } else {
2070 promise.setFailure(newPendingWritesNullException());
2071 }
2072 flush(ctx);
2073 }
2074
2075 @Override
2076 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
2077 this.ctx = ctx;
2078 Channel channel = ctx.channel();
2079 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16, engineType.wantsDirectBuffer) {
2080 @Override
2081 protected int wrapDataSize() {
2082 return SslHandler.this.wrapDataSize;
2083 }
2084 };
2085
2086 setOpensslEngineSocketFd(channel);
2087 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
2088 boolean active = channel.isActive();
2089 if (active || fastOpen) {
2090
2091
2092
2093 startHandshakeProcessing(active);
2094
2095
2096 final ChannelOutboundBuffer outboundBuffer;
2097 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
2098 outboundBuffer.totalPendingWriteBytes() > 0)) {
2099 setState(STATE_NEEDS_FLUSH);
2100 }
2101 }
2102 }
2103
2104 private void startHandshakeProcessing(boolean flushAtEnd) {
2105 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2106 setState(STATE_HANDSHAKE_STARTED);
2107 if (engine.getUseClientMode()) {
2108
2109
2110
2111 handshake(flushAtEnd);
2112 }
2113 applyHandshakeTimeout();
2114 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2115 forceFlush(ctx);
2116 }
2117 }
2118
2119
2120
2121
2122 public Future<Channel> renegotiate() {
2123 ChannelHandlerContext ctx = this.ctx;
2124 if (ctx == null) {
2125 throw new IllegalStateException();
2126 }
2127
2128 return renegotiate(ctx.executor().<Channel>newPromise());
2129 }
2130
2131
2132
2133
2134 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2135 ObjectUtil.checkNotNull(promise, "promise");
2136
2137 ChannelHandlerContext ctx = this.ctx;
2138 if (ctx == null) {
2139 throw new IllegalStateException();
2140 }
2141
2142 EventExecutor executor = ctx.executor();
2143 if (!executor.inEventLoop()) {
2144 executor.execute(new Runnable() {
2145 @Override
2146 public void run() {
2147 renegotiateOnEventLoop(promise);
2148 }
2149 });
2150 return promise;
2151 }
2152
2153 renegotiateOnEventLoop(promise);
2154 return promise;
2155 }
2156
2157 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2158 final Promise<Channel> oldHandshakePromise = handshakePromise;
2159 if (!oldHandshakePromise.isDone()) {
2160
2161
2162 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2163 } else {
2164 handshakePromise = newHandshakePromise;
2165 handshake(true);
2166 applyHandshakeTimeout();
2167 }
2168 }
2169
2170
2171
2172
2173
2174
2175
2176 private void handshake(boolean flushAtEnd) {
2177 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2178
2179
2180 return;
2181 }
2182 if (handshakePromise.isDone()) {
2183
2184
2185
2186
2187
2188 return;
2189 }
2190
2191
2192 final ChannelHandlerContext ctx = this.ctx;
2193 try {
2194 engine.beginHandshake();
2195 wrapNonAppData(ctx, false);
2196 } catch (Throwable e) {
2197 setHandshakeFailure(ctx, e);
2198 } finally {
2199 if (flushAtEnd) {
2200 forceFlush(ctx);
2201 }
2202 }
2203 }
2204
2205 private void applyHandshakeTimeout() {
2206 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2207
2208
2209 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2210 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2211 return;
2212 }
2213
2214 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2215 @Override
2216 public void run() {
2217 if (localHandshakePromise.isDone()) {
2218 return;
2219 }
2220 SSLException exception =
2221 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2222 try {
2223 if (localHandshakePromise.tryFailure(exception)) {
2224 SslUtils.handleHandshakeFailure(ctx, exception, true);
2225 }
2226 } finally {
2227 releaseAndFailAll(ctx, exception);
2228 }
2229 }
2230 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2231
2232
2233 localHandshakePromise.addListener(new FutureListener<Channel>() {
2234 @Override
2235 public void operationComplete(Future<Channel> f) throws Exception {
2236 timeoutFuture.cancel(false);
2237 }
2238 });
2239 }
2240
2241 private void forceFlush(ChannelHandlerContext ctx) {
2242 clearState(STATE_NEEDS_FLUSH);
2243 ctx.flush();
2244 }
2245
2246 private void setOpensslEngineSocketFd(Channel c) {
2247 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2248 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2249 }
2250 }
2251
2252
2253
2254
2255 @Override
2256 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2257 setOpensslEngineSocketFd(ctx.channel());
2258 if (!startTls) {
2259 startHandshakeProcessing(true);
2260 }
2261 ctx.fireChannelActive();
2262 }
2263
2264 private void safeClose(
2265 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2266 final ChannelPromise promise) {
2267 if (!ctx.channel().isActive()) {
2268 ctx.close(promise);
2269 return;
2270 }
2271
2272 final Future<?> timeoutFuture;
2273 if (!flushFuture.isDone()) {
2274 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2275 if (closeNotifyTimeout > 0) {
2276
2277 timeoutFuture = ctx.executor().schedule(new Runnable() {
2278 @Override
2279 public void run() {
2280
2281 if (!flushFuture.isDone()) {
2282 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2283 ctx.channel());
2284 addCloseListener(ctx.close(ctx.newPromise()), promise);
2285 }
2286 }
2287 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2288 } else {
2289 timeoutFuture = null;
2290 }
2291 } else {
2292 timeoutFuture = null;
2293 }
2294
2295
2296 flushFuture.addListener(new ChannelFutureListener() {
2297 @Override
2298 public void operationComplete(ChannelFuture f) {
2299 if (timeoutFuture != null) {
2300 timeoutFuture.cancel(false);
2301 }
2302 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2303 if (closeNotifyReadTimeout <= 0) {
2304
2305
2306 addCloseListener(ctx.close(ctx.newPromise()), promise);
2307 } else {
2308 final Future<?> closeNotifyReadTimeoutFuture;
2309
2310 if (!sslClosePromise.isDone()) {
2311 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2312 @Override
2313 public void run() {
2314 if (!sslClosePromise.isDone()) {
2315 logger.debug(
2316 "{} did not receive close_notify in {}ms; force-closing the connection.",
2317 ctx.channel(), closeNotifyReadTimeout);
2318
2319
2320 addCloseListener(ctx.close(ctx.newPromise()), promise);
2321 }
2322 }
2323 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2324 } else {
2325 closeNotifyReadTimeoutFuture = null;
2326 }
2327
2328
2329 sslClosePromise.addListener(new FutureListener<Channel>() {
2330 @Override
2331 public void operationComplete(Future<Channel> future) throws Exception {
2332 if (closeNotifyReadTimeoutFuture != null) {
2333 closeNotifyReadTimeoutFuture.cancel(false);
2334 }
2335 addCloseListener(ctx.close(ctx.newPromise()), promise);
2336 }
2337 });
2338 }
2339 }
2340 });
2341 }
2342
2343 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2344
2345
2346
2347
2348
2349
2350 PromiseNotifier.cascade(false, future, promise);
2351 }
2352
2353
2354
2355
2356
2357 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2358 ByteBufAllocator alloc = ctx.alloc();
2359 if (engineType.wantsDirectBuffer) {
2360 return alloc.directBuffer(capacity);
2361 } else {
2362 return alloc.buffer(capacity);
2363 }
2364 }
2365
2366
2367
2368
2369
2370 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2371 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2372 }
2373
2374 private boolean isStateSet(int bit) {
2375 return (state & bit) == bit;
2376 }
2377
2378 private void setState(int bit) {
2379 state |= bit;
2380 }
2381
2382 private void clearState(int bit) {
2383 state &= ~bit;
2384 }
2385
2386 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2387
2388 @Override
2389 protected EventExecutor executor() {
2390 if (ctx == null) {
2391 throw new IllegalStateException();
2392 }
2393 return ctx.executor();
2394 }
2395
2396 @Override
2397 protected void checkDeadLock() {
2398 if (ctx == null) {
2399
2400
2401
2402
2403
2404
2405 return;
2406 }
2407 super.checkDeadLock();
2408 }
2409 }
2410 }