1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.ssl;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.buffer.ByteBufUtil;
21 import io.netty.buffer.CompositeByteBuf;
22 import io.netty.buffer.Unpooled;
23 import io.netty.channel.AbstractCoalescingBufferQueue;
24 import io.netty.channel.Channel;
25 import io.netty.channel.ChannelConfig;
26 import io.netty.channel.ChannelException;
27 import io.netty.channel.ChannelFuture;
28 import io.netty.channel.ChannelFutureListener;
29 import io.netty.channel.ChannelHandlerContext;
30 import io.netty.channel.ChannelInboundHandler;
31 import io.netty.channel.ChannelOption;
32 import io.netty.channel.ChannelOutboundBuffer;
33 import io.netty.channel.ChannelOutboundHandler;
34 import io.netty.channel.ChannelPipeline;
35 import io.netty.channel.ChannelPromise;
36 import io.netty.channel.unix.UnixChannel;
37 import io.netty.handler.codec.ByteToMessageDecoder;
38 import io.netty.handler.codec.DecoderException;
39 import io.netty.handler.codec.UnsupportedMessageTypeException;
40 import io.netty.util.ReferenceCountUtil;
41 import io.netty.util.concurrent.DefaultPromise;
42 import io.netty.util.concurrent.EventExecutor;
43 import io.netty.util.concurrent.Future;
44 import io.netty.util.concurrent.FutureListener;
45 import io.netty.util.concurrent.ImmediateExecutor;
46 import io.netty.util.concurrent.Promise;
47 import io.netty.util.concurrent.PromiseNotifier;
48 import io.netty.util.internal.ObjectUtil;
49 import io.netty.util.internal.PlatformDependent;
50 import io.netty.util.internal.UnstableApi;
51 import io.netty.util.internal.logging.InternalLogger;
52 import io.netty.util.internal.logging.InternalLoggerFactory;
53
54 import java.io.IOException;
55 import java.net.SocketAddress;
56 import java.nio.ByteBuffer;
57 import java.nio.channels.ClosedChannelException;
58 import java.nio.channels.DatagramChannel;
59 import java.nio.channels.SocketChannel;
60 import java.util.List;
61 import java.util.concurrent.Executor;
62 import java.util.concurrent.RejectedExecutionException;
63 import java.util.concurrent.TimeUnit;
64 import java.util.regex.Pattern;
65
66 import javax.net.ssl.SSLEngine;
67 import javax.net.ssl.SSLEngineResult;
68 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
69 import javax.net.ssl.SSLEngineResult.Status;
70 import javax.net.ssl.SSLException;
71 import javax.net.ssl.SSLHandshakeException;
72 import javax.net.ssl.SSLSession;
73
74 import static io.netty.buffer.ByteBufUtil.ensureWritableSuccess;
75 import static io.netty.handler.ssl.SslUtils.getEncryptedPacketLength;
76 import static io.netty.util.internal.ObjectUtil.checkNotNull;
77 import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169 public class SslHandler extends ByteToMessageDecoder implements ChannelOutboundHandler {
170 private static final InternalLogger logger =
171 InternalLoggerFactory.getInstance(SslHandler.class);
172 private static final Pattern IGNORABLE_CLASS_IN_STACK = Pattern.compile(
173 "^.*(?:Socket|Datagram|Sctp|Udt)Channel.*$");
174 private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
175 "^.*(?:connection.*(?:reset|closed|abort|broken)|broken.*pipe).*$", Pattern.CASE_INSENSITIVE);
176 private static final int STATE_SENT_FIRST_MESSAGE = 1;
177 private static final int STATE_FLUSHED_BEFORE_HANDSHAKE = 1 << 1;
178 private static final int STATE_READ_DURING_HANDSHAKE = 1 << 2;
179 private static final int STATE_HANDSHAKE_STARTED = 1 << 3;
180
181
182
183
184 private static final int STATE_NEEDS_FLUSH = 1 << 4;
185 private static final int STATE_OUTBOUND_CLOSED = 1 << 5;
186 private static final int STATE_CLOSE_NOTIFY = 1 << 6;
187 private static final int STATE_PROCESS_TASK = 1 << 7;
188
189
190
191
192 private static final int STATE_FIRE_CHANNEL_READ = 1 << 8;
193 private static final int STATE_UNWRAP_REENTRY = 1 << 9;
194
195
196
197
198
199 private static final int MAX_PLAINTEXT_LENGTH = 16 * 1024;
200
201 private enum SslEngineType {
202 TCNATIVE(true, COMPOSITE_CUMULATOR) {
203 @Override
204 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
205 int nioBufferCount = in.nioBufferCount();
206 int writerIndex = out.writerIndex();
207 final SSLEngineResult result;
208 if (nioBufferCount > 1) {
209
210
211
212
213
214 ReferenceCountedOpenSslEngine opensslEngine = (ReferenceCountedOpenSslEngine) handler.engine;
215 try {
216 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
217 result = opensslEngine.unwrap(in.nioBuffers(in.readerIndex(), len), handler.singleBuffer);
218 } finally {
219 handler.singleBuffer[0] = null;
220 }
221 } else {
222 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
223 toByteBuffer(out, writerIndex, out.writableBytes()));
224 }
225 out.writerIndex(writerIndex + result.bytesProduced());
226 return result;
227 }
228
229 @Override
230 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
231 int pendingBytes, int numComponents) {
232 return allocator.directBuffer(((ReferenceCountedOpenSslEngine) handler.engine)
233 .calculateMaxLengthForWrap(pendingBytes, numComponents));
234 }
235
236 @Override
237 int calculatePendingData(SslHandler handler, int guess) {
238 int sslPending = ((ReferenceCountedOpenSslEngine) handler.engine).sslPending();
239 return sslPending > 0 ? sslPending : guess;
240 }
241
242 @Override
243 boolean jdkCompatibilityMode(SSLEngine engine) {
244 return ((ReferenceCountedOpenSslEngine) engine).jdkCompatibilityMode;
245 }
246 },
247 CONSCRYPT(true, COMPOSITE_CUMULATOR) {
248 @Override
249 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
250 int nioBufferCount = in.nioBufferCount();
251 int writerIndex = out.writerIndex();
252 final SSLEngineResult result;
253 if (nioBufferCount > 1) {
254
255
256
257 try {
258 handler.singleBuffer[0] = toByteBuffer(out, writerIndex, out.writableBytes());
259 result = ((ConscryptAlpnSslEngine) handler.engine).unwrap(
260 in.nioBuffers(in.readerIndex(), len),
261 handler.singleBuffer);
262 } finally {
263 handler.singleBuffer[0] = null;
264 }
265 } else {
266 result = handler.engine.unwrap(toByteBuffer(in, in.readerIndex(), len),
267 toByteBuffer(out, writerIndex, out.writableBytes()));
268 }
269 out.writerIndex(writerIndex + result.bytesProduced());
270 return result;
271 }
272
273 @Override
274 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
275 int pendingBytes, int numComponents) {
276 return allocator.directBuffer(
277 ((ConscryptAlpnSslEngine) handler.engine).calculateOutNetBufSize(pendingBytes, numComponents));
278 }
279
280 @Override
281 int calculatePendingData(SslHandler handler, int guess) {
282 return guess;
283 }
284
285 @Override
286 boolean jdkCompatibilityMode(SSLEngine engine) {
287 return true;
288 }
289 },
290 JDK(false, MERGE_CUMULATOR) {
291 @Override
292 SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException {
293 int writerIndex = out.writerIndex();
294 ByteBuffer inNioBuffer = toByteBuffer(in, in.readerIndex(), len);
295 int position = inNioBuffer.position();
296 final SSLEngineResult result = handler.engine.unwrap(inNioBuffer,
297 toByteBuffer(out, writerIndex, out.writableBytes()));
298 out.writerIndex(writerIndex + result.bytesProduced());
299
300
301
302
303
304
305
306 if (result.bytesConsumed() == 0) {
307 int consumed = inNioBuffer.position() - position;
308 if (consumed != result.bytesConsumed()) {
309
310 return new SSLEngineResult(
311 result.getStatus(), result.getHandshakeStatus(), consumed, result.bytesProduced());
312 }
313 }
314 return result;
315 }
316
317 @Override
318 ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
319 int pendingBytes, int numComponents) {
320
321
322
323
324
325
326 return allocator.heapBuffer(handler.engine.getSession().getPacketBufferSize());
327 }
328
329 @Override
330 int calculatePendingData(SslHandler handler, int guess) {
331 return guess;
332 }
333
334 @Override
335 boolean jdkCompatibilityMode(SSLEngine engine) {
336 return true;
337 }
338 };
339
340 static SslEngineType forEngine(SSLEngine engine) {
341 return engine instanceof ReferenceCountedOpenSslEngine ? TCNATIVE :
342 engine instanceof ConscryptAlpnSslEngine ? CONSCRYPT : JDK;
343 }
344
345 SslEngineType(boolean wantsDirectBuffer, Cumulator cumulator) {
346 this.wantsDirectBuffer = wantsDirectBuffer;
347 this.cumulator = cumulator;
348 }
349
350 abstract SSLEngineResult unwrap(SslHandler handler, ByteBuf in, int len, ByteBuf out) throws SSLException;
351
352 abstract int calculatePendingData(SslHandler handler, int guess);
353
354 abstract boolean jdkCompatibilityMode(SSLEngine engine);
355
356 abstract ByteBuf allocateWrapBuffer(SslHandler handler, ByteBufAllocator allocator,
357 int pendingBytes, int numComponents);
358
359
360
361
362
363
364
365 final boolean wantsDirectBuffer;
366
367
368
369
370
371
372
373
374
375
376
377 final Cumulator cumulator;
378 }
379
380 private volatile ChannelHandlerContext ctx;
381 private final SSLEngine engine;
382 private final SslEngineType engineType;
383 private final Executor delegatedTaskExecutor;
384 private final boolean jdkCompatibilityMode;
385
386
387
388
389
390
391 private final ByteBuffer[] singleBuffer = new ByteBuffer[1];
392
393 private final boolean startTls;
394
395 private final SslTasksRunner sslTaskRunnerForUnwrap = new SslTasksRunner(true);
396 private final SslTasksRunner sslTaskRunner = new SslTasksRunner(false);
397
398 private SslHandlerCoalescingBufferQueue pendingUnencryptedWrites;
399 private Promise<Channel> handshakePromise = new LazyChannelPromise();
400 private final LazyChannelPromise sslClosePromise = new LazyChannelPromise();
401
402 private int packetLength;
403 private short state;
404
405 private volatile long handshakeTimeoutMillis = 10000;
406 private volatile long closeNotifyFlushTimeoutMillis = 3000;
407 private volatile long closeNotifyReadTimeoutMillis;
408 volatile int wrapDataSize = MAX_PLAINTEXT_LENGTH;
409
410
411
412
413
414
415 public SslHandler(SSLEngine engine) {
416 this(engine, false);
417 }
418
419
420
421
422
423
424
425
426 public SslHandler(SSLEngine engine, boolean startTls) {
427 this(engine, startTls, ImmediateExecutor.INSTANCE);
428 }
429
430
431
432
433
434
435
436
437 public SslHandler(SSLEngine engine, Executor delegatedTaskExecutor) {
438 this(engine, false, delegatedTaskExecutor);
439 }
440
441
442
443
444
445
446
447
448
449
450 public SslHandler(SSLEngine engine, boolean startTls, Executor delegatedTaskExecutor) {
451 this.engine = ObjectUtil.checkNotNull(engine, "engine");
452 this.delegatedTaskExecutor = ObjectUtil.checkNotNull(delegatedTaskExecutor, "delegatedTaskExecutor");
453 engineType = SslEngineType.forEngine(engine);
454 this.startTls = startTls;
455 this.jdkCompatibilityMode = engineType.jdkCompatibilityMode(engine);
456 setCumulator(engineType.cumulator);
457 }
458
459 public long getHandshakeTimeoutMillis() {
460 return handshakeTimeoutMillis;
461 }
462
463 public void setHandshakeTimeout(long handshakeTimeout, TimeUnit unit) {
464 checkNotNull(unit, "unit");
465 setHandshakeTimeoutMillis(unit.toMillis(handshakeTimeout));
466 }
467
468 public void setHandshakeTimeoutMillis(long handshakeTimeoutMillis) {
469 this.handshakeTimeoutMillis = checkPositiveOrZero(handshakeTimeoutMillis, "handshakeTimeoutMillis");
470 }
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492 @UnstableApi
493 public final void setWrapDataSize(int wrapDataSize) {
494 this.wrapDataSize = wrapDataSize;
495 }
496
497
498
499
500 @Deprecated
501 public long getCloseNotifyTimeoutMillis() {
502 return getCloseNotifyFlushTimeoutMillis();
503 }
504
505
506
507
508 @Deprecated
509 public void setCloseNotifyTimeout(long closeNotifyTimeout, TimeUnit unit) {
510 setCloseNotifyFlushTimeout(closeNotifyTimeout, unit);
511 }
512
513
514
515
516 @Deprecated
517 public void setCloseNotifyTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
518 setCloseNotifyFlushTimeoutMillis(closeNotifyFlushTimeoutMillis);
519 }
520
521
522
523
524
525
526 public final long getCloseNotifyFlushTimeoutMillis() {
527 return closeNotifyFlushTimeoutMillis;
528 }
529
530
531
532
533
534
535 public final void setCloseNotifyFlushTimeout(long closeNotifyFlushTimeout, TimeUnit unit) {
536 setCloseNotifyFlushTimeoutMillis(unit.toMillis(closeNotifyFlushTimeout));
537 }
538
539
540
541
542 public final void setCloseNotifyFlushTimeoutMillis(long closeNotifyFlushTimeoutMillis) {
543 this.closeNotifyFlushTimeoutMillis = checkPositiveOrZero(closeNotifyFlushTimeoutMillis,
544 "closeNotifyFlushTimeoutMillis");
545 }
546
547
548
549
550
551
552 public final long getCloseNotifyReadTimeoutMillis() {
553 return closeNotifyReadTimeoutMillis;
554 }
555
556
557
558
559
560
561 public final void setCloseNotifyReadTimeout(long closeNotifyReadTimeout, TimeUnit unit) {
562 setCloseNotifyReadTimeoutMillis(unit.toMillis(closeNotifyReadTimeout));
563 }
564
565
566
567
568 public final void setCloseNotifyReadTimeoutMillis(long closeNotifyReadTimeoutMillis) {
569 this.closeNotifyReadTimeoutMillis = checkPositiveOrZero(closeNotifyReadTimeoutMillis,
570 "closeNotifyReadTimeoutMillis");
571 }
572
573
574
575
576 public SSLEngine engine() {
577 return engine;
578 }
579
580
581
582
583
584
585 public String applicationProtocol() {
586 SSLEngine engine = engine();
587 if (!(engine instanceof ApplicationProtocolAccessor)) {
588 return null;
589 }
590
591 return ((ApplicationProtocolAccessor) engine).getNegotiatedApplicationProtocol();
592 }
593
594
595
596
597
598
599
600 public Future<Channel> handshakeFuture() {
601 return handshakePromise;
602 }
603
604
605
606
607 @Deprecated
608 public ChannelFuture close() {
609 return closeOutbound();
610 }
611
612
613
614
615 @Deprecated
616 public ChannelFuture close(ChannelPromise promise) {
617 return closeOutbound(promise);
618 }
619
620
621
622
623
624
625
626 public ChannelFuture closeOutbound() {
627 return closeOutbound(ctx.newPromise());
628 }
629
630
631
632
633
634
635
636 public ChannelFuture closeOutbound(final ChannelPromise promise) {
637 final ChannelHandlerContext ctx = this.ctx;
638 if (ctx.executor().inEventLoop()) {
639 closeOutbound0(promise);
640 } else {
641 ctx.executor().execute(new Runnable() {
642 @Override
643 public void run() {
644 closeOutbound0(promise);
645 }
646 });
647 }
648 return promise;
649 }
650
651 private void closeOutbound0(ChannelPromise promise) {
652 setState(STATE_OUTBOUND_CLOSED);
653 engine.closeOutbound();
654 try {
655 flush(ctx, promise);
656 } catch (Exception e) {
657 if (!promise.tryFailure(e)) {
658 logger.warn("{} flush() raised a masked exception.", ctx.channel(), e);
659 }
660 }
661 }
662
663
664
665
666
667
668
669
670 public Future<Channel> sslCloseFuture() {
671 return sslClosePromise;
672 }
673
674 @Override
675 public void handlerRemoved0(ChannelHandlerContext ctx) throws Exception {
676 try {
677 if (pendingUnencryptedWrites != null && !pendingUnencryptedWrites.isEmpty()) {
678
679 pendingUnencryptedWrites.releaseAndFailAll(ctx,
680 new ChannelException("Pending write on removal of SslHandler"));
681 }
682 pendingUnencryptedWrites = null;
683
684 SSLException cause = null;
685
686
687
688
689 if (!handshakePromise.isDone()) {
690 cause = new SSLHandshakeException("SslHandler removed before handshake completed");
691 if (handshakePromise.tryFailure(cause)) {
692 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
693 }
694 }
695 if (!sslClosePromise.isDone()) {
696 if (cause == null) {
697 cause = new SSLException("SslHandler removed before SSLEngine was closed");
698 }
699 notifyClosePromise(cause);
700 }
701 } finally {
702 ReferenceCountUtil.release(engine);
703 }
704 }
705
706 @Override
707 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
708 ctx.bind(localAddress, promise);
709 }
710
711 @Override
712 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress,
713 ChannelPromise promise) throws Exception {
714 ctx.connect(remoteAddress, localAddress, promise);
715 }
716
717 @Override
718 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
719 ctx.deregister(promise);
720 }
721
722 @Override
723 public void disconnect(final ChannelHandlerContext ctx,
724 final ChannelPromise promise) throws Exception {
725 closeOutboundAndChannel(ctx, promise, true);
726 }
727
728 @Override
729 public void close(final ChannelHandlerContext ctx,
730 final ChannelPromise promise) throws Exception {
731 closeOutboundAndChannel(ctx, promise, false);
732 }
733
734 @Override
735 public void read(ChannelHandlerContext ctx) throws Exception {
736 if (!handshakePromise.isDone()) {
737 setState(STATE_READ_DURING_HANDSHAKE);
738 }
739
740 ctx.read();
741 }
742
743 private static IllegalStateException newPendingWritesNullException() {
744 return new IllegalStateException("pendingUnencryptedWrites is null, handlerRemoved0 called?");
745 }
746
747 @Override
748 public void write(final ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
749 if (!(msg instanceof ByteBuf)) {
750 UnsupportedMessageTypeException exception = new UnsupportedMessageTypeException(msg, ByteBuf.class);
751 ReferenceCountUtil.safeRelease(msg);
752 promise.setFailure(exception);
753 } else if (pendingUnencryptedWrites == null) {
754 ReferenceCountUtil.safeRelease(msg);
755 promise.setFailure(newPendingWritesNullException());
756 } else {
757 pendingUnencryptedWrites.add((ByteBuf) msg, promise);
758 }
759 }
760
761 @Override
762 public void flush(ChannelHandlerContext ctx) throws Exception {
763
764
765 if (startTls && !isStateSet(STATE_SENT_FIRST_MESSAGE)) {
766 setState(STATE_SENT_FIRST_MESSAGE);
767 pendingUnencryptedWrites.writeAndRemoveAll(ctx);
768 forceFlush(ctx);
769
770
771 startHandshakeProcessing(true);
772 return;
773 }
774
775 if (isStateSet(STATE_PROCESS_TASK)) {
776 return;
777 }
778
779 try {
780 wrapAndFlush(ctx);
781 } catch (Throwable cause) {
782 setHandshakeFailure(ctx, cause);
783 PlatformDependent.throwException(cause);
784 }
785 }
786
787 private void wrapAndFlush(ChannelHandlerContext ctx) throws SSLException {
788 if (pendingUnencryptedWrites.isEmpty()) {
789
790
791
792
793 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, ctx.newPromise());
794 }
795 if (!handshakePromise.isDone()) {
796 setState(STATE_FLUSHED_BEFORE_HANDSHAKE);
797 }
798 try {
799 wrap(ctx, false);
800 } finally {
801
802
803 forceFlush(ctx);
804 }
805 }
806
807
808 private void wrap(ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
809 ByteBuf out = null;
810 ByteBufAllocator alloc = ctx.alloc();
811 try {
812 final int wrapDataSize = this.wrapDataSize;
813
814
815 outer: while (!ctx.isRemoved()) {
816 ChannelPromise promise = ctx.newPromise();
817 ByteBuf buf = wrapDataSize > 0 ?
818 pendingUnencryptedWrites.remove(alloc, wrapDataSize, promise) :
819 pendingUnencryptedWrites.removeFirst(promise);
820 if (buf == null) {
821 break;
822 }
823
824 if (out == null) {
825 out = allocateOutNetBuf(ctx, buf.readableBytes(), buf.nioBufferCount());
826 }
827
828 SSLEngineResult result = wrap(alloc, engine, buf, out);
829 if (buf.isReadable()) {
830 pendingUnencryptedWrites.addFirst(buf, promise);
831
832
833 promise = null;
834 } else {
835 buf.release();
836 }
837
838
839
840 if (out.isReadable()) {
841 final ByteBuf b = out;
842 out = null;
843 if (promise != null) {
844 ctx.write(b, promise);
845 } else {
846 ctx.write(b);
847 }
848 } else if (promise != null) {
849 ctx.write(Unpooled.EMPTY_BUFFER, promise);
850 }
851
852
853 if (result.getStatus() == Status.CLOSED) {
854
855
856 Throwable exception = handshakePromise.cause();
857 if (exception == null) {
858 exception = sslClosePromise.cause();
859 if (exception == null) {
860 exception = new SslClosedEngineException("SSLEngine closed already");
861 }
862 }
863 pendingUnencryptedWrites.releaseAndFailAll(ctx, exception);
864 return;
865 } else {
866 switch (result.getHandshakeStatus()) {
867 case NEED_TASK:
868 if (!runDelegatedTasks(inUnwrap)) {
869
870
871 break outer;
872 }
873 break;
874 case FINISHED:
875 case NOT_HANDSHAKING:
876 setHandshakeSuccess();
877 break;
878 case NEED_WRAP:
879
880
881
882
883 if (result.bytesProduced() > 0 && pendingUnencryptedWrites.isEmpty()) {
884 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER);
885 }
886 break;
887 case NEED_UNWRAP:
888
889
890 readIfNeeded(ctx);
891 return;
892 default:
893 throw new IllegalStateException(
894 "Unknown handshake status: " + result.getHandshakeStatus());
895 }
896 }
897 }
898 } finally {
899 if (out != null) {
900 out.release();
901 }
902 if (inUnwrap) {
903 setState(STATE_NEEDS_FLUSH);
904 }
905 }
906 }
907
908
909
910
911
912
913
914 private boolean wrapNonAppData(final ChannelHandlerContext ctx, boolean inUnwrap) throws SSLException {
915 ByteBuf out = null;
916 ByteBufAllocator alloc = ctx.alloc();
917 try {
918
919
920 outer: while (!ctx.isRemoved()) {
921 if (out == null) {
922
923
924
925 out = allocateOutNetBuf(ctx, 2048, 1);
926 }
927 SSLEngineResult result = wrap(alloc, engine, Unpooled.EMPTY_BUFFER, out);
928 if (result.bytesProduced() > 0) {
929 ctx.write(out).addListener(new ChannelFutureListener() {
930 @Override
931 public void operationComplete(ChannelFuture future) {
932 Throwable cause = future.cause();
933 if (cause != null) {
934 setHandshakeFailureTransportFailure(ctx, cause);
935 }
936 }
937 });
938 if (inUnwrap) {
939 setState(STATE_NEEDS_FLUSH);
940 }
941 out = null;
942 }
943
944 HandshakeStatus status = result.getHandshakeStatus();
945 switch (status) {
946 case FINISHED:
947
948
949
950
951 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
952 wrap(ctx, true);
953 }
954 return false;
955 case NEED_TASK:
956 if (!runDelegatedTasks(inUnwrap)) {
957
958
959 break outer;
960 }
961 break;
962 case NEED_UNWRAP:
963 if (inUnwrap || unwrapNonAppData(ctx) <= 0) {
964
965
966
967 return false;
968 }
969 break;
970 case NEED_WRAP:
971 break;
972 case NOT_HANDSHAKING:
973 if (setHandshakeSuccess() && inUnwrap && !pendingUnencryptedWrites.isEmpty()) {
974 wrap(ctx, true);
975 }
976
977
978 if (!inUnwrap) {
979 unwrapNonAppData(ctx);
980 }
981 return true;
982 default:
983 throw new IllegalStateException("Unknown handshake status: " + result.getHandshakeStatus());
984 }
985
986
987
988 if (result.bytesProduced() == 0 && status != HandshakeStatus.NEED_TASK) {
989 break;
990 }
991
992
993
994 if (result.bytesConsumed() == 0 && result.getHandshakeStatus() == HandshakeStatus.NOT_HANDSHAKING) {
995 break;
996 }
997 }
998 } finally {
999 if (out != null) {
1000 out.release();
1001 }
1002 }
1003 return false;
1004 }
1005
1006 private SSLEngineResult wrap(ByteBufAllocator alloc, SSLEngine engine, ByteBuf in, ByteBuf out)
1007 throws SSLException {
1008 ByteBuf newDirectIn = null;
1009 try {
1010 int readerIndex = in.readerIndex();
1011 int readableBytes = in.readableBytes();
1012
1013
1014
1015 final ByteBuffer[] in0;
1016 if (in.isDirect() || !engineType.wantsDirectBuffer) {
1017
1018
1019
1020
1021 if (!(in instanceof CompositeByteBuf) && in.nioBufferCount() == 1) {
1022 in0 = singleBuffer;
1023
1024
1025 in0[0] = in.internalNioBuffer(readerIndex, readableBytes);
1026 } else {
1027 in0 = in.nioBuffers();
1028 }
1029 } else {
1030
1031
1032
1033 newDirectIn = alloc.directBuffer(readableBytes);
1034 newDirectIn.writeBytes(in, readerIndex, readableBytes);
1035 in0 = singleBuffer;
1036 in0[0] = newDirectIn.internalNioBuffer(newDirectIn.readerIndex(), readableBytes);
1037 }
1038
1039 for (;;) {
1040 ByteBuffer out0 = out.nioBuffer(out.writerIndex(), out.writableBytes());
1041 SSLEngineResult result = engine.wrap(in0, out0);
1042 in.skipBytes(result.bytesConsumed());
1043 out.writerIndex(out.writerIndex() + result.bytesProduced());
1044
1045 if (result.getStatus() == Status.BUFFER_OVERFLOW) {
1046 out.ensureWritable(engine.getSession().getPacketBufferSize());
1047 } else {
1048 return result;
1049 }
1050 }
1051 } finally {
1052
1053 singleBuffer[0] = null;
1054
1055 if (newDirectIn != null) {
1056 newDirectIn.release();
1057 }
1058 }
1059 }
1060
1061 @Override
1062 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
1063 boolean handshakeFailed = handshakePromise.cause() != null;
1064
1065 ClosedChannelException exception = new ClosedChannelException();
1066
1067
1068 setHandshakeFailure(ctx, exception, !isStateSet(STATE_OUTBOUND_CLOSED), isStateSet(STATE_HANDSHAKE_STARTED),
1069 false);
1070
1071
1072 notifyClosePromise(exception);
1073
1074 try {
1075 super.channelInactive(ctx);
1076 } catch (DecoderException e) {
1077 if (!handshakeFailed || !(e.getCause() instanceof SSLException)) {
1078
1079
1080
1081
1082
1083 throw e;
1084 }
1085 }
1086 }
1087
1088 @Override
1089 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
1090 if (ignoreException(cause)) {
1091
1092
1093 if (logger.isDebugEnabled()) {
1094 logger.debug(
1095 "{} Swallowing a harmless 'connection reset by peer / broken pipe' error that occurred " +
1096 "while writing close_notify in response to the peer's close_notify", ctx.channel(), cause);
1097 }
1098
1099
1100
1101 if (ctx.channel().isActive()) {
1102 ctx.close();
1103 }
1104 } else {
1105 ctx.fireExceptionCaught(cause);
1106 }
1107 }
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118 private boolean ignoreException(Throwable t) {
1119 if (!(t instanceof SSLException) && t instanceof IOException && sslClosePromise.isDone()) {
1120 String message = t.getMessage();
1121
1122
1123
1124 if (message != null && IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
1125 return true;
1126 }
1127
1128
1129 StackTraceElement[] elements = t.getStackTrace();
1130 for (StackTraceElement element: elements) {
1131 String classname = element.getClassName();
1132 String methodname = element.getMethodName();
1133
1134
1135 if (classname.startsWith("io.netty.")) {
1136 continue;
1137 }
1138
1139
1140 if (!"read".equals(methodname)) {
1141 continue;
1142 }
1143
1144
1145
1146 if (IGNORABLE_CLASS_IN_STACK.matcher(classname).matches()) {
1147 return true;
1148 }
1149
1150 try {
1151
1152
1153
1154 Class<?> clazz = PlatformDependent.getClassLoader(getClass()).loadClass(classname);
1155
1156 if (SocketChannel.class.isAssignableFrom(clazz)
1157 || DatagramChannel.class.isAssignableFrom(clazz)) {
1158 return true;
1159 }
1160
1161
1162 if (PlatformDependent.javaVersion() >= 7
1163 && "com.sun.nio.sctp.SctpChannel".equals(clazz.getSuperclass().getName())) {
1164 return true;
1165 }
1166 } catch (Throwable cause) {
1167 if (logger.isDebugEnabled()) {
1168 logger.debug("Unexpected exception while loading class {} classname {}",
1169 getClass(), classname, cause);
1170 }
1171 }
1172 }
1173 }
1174
1175 return false;
1176 }
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190 public static boolean isEncrypted(ByteBuf buffer) {
1191 if (buffer.readableBytes() < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1192 throw new IllegalArgumentException(
1193 "buffer must have at least " + SslUtils.SSL_RECORD_HEADER_LENGTH + " readable bytes");
1194 }
1195 return getEncryptedPacketLength(buffer, buffer.readerIndex()) != SslUtils.NOT_ENCRYPTED;
1196 }
1197
1198 private void decodeJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) throws NotSslRecordException {
1199 int packetLength = this.packetLength;
1200
1201 if (packetLength > 0) {
1202 if (in.readableBytes() < packetLength) {
1203 return;
1204 }
1205 } else {
1206
1207 final int readableBytes = in.readableBytes();
1208 if (readableBytes < SslUtils.SSL_RECORD_HEADER_LENGTH) {
1209 return;
1210 }
1211 packetLength = getEncryptedPacketLength(in, in.readerIndex());
1212 if (packetLength == SslUtils.NOT_ENCRYPTED) {
1213
1214 NotSslRecordException e = new NotSslRecordException(
1215 "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
1216 in.skipBytes(in.readableBytes());
1217
1218
1219
1220 setHandshakeFailure(ctx, e);
1221
1222 throw e;
1223 }
1224 assert packetLength > 0;
1225 if (packetLength > readableBytes) {
1226
1227 this.packetLength = packetLength;
1228 return;
1229 }
1230 }
1231
1232
1233
1234 this.packetLength = 0;
1235 try {
1236 final int bytesConsumed = unwrap(ctx, in, packetLength);
1237 assert bytesConsumed == packetLength || engine.isInboundDone() :
1238 "we feed the SSLEngine a packets worth of data: " + packetLength + " but it only consumed: " +
1239 bytesConsumed;
1240 } catch (Throwable cause) {
1241 handleUnwrapThrowable(ctx, cause);
1242 }
1243 }
1244
1245 private void decodeNonJdkCompatible(ChannelHandlerContext ctx, ByteBuf in) {
1246 try {
1247 unwrap(ctx, in, in.readableBytes());
1248 } catch (Throwable cause) {
1249 handleUnwrapThrowable(ctx, cause);
1250 }
1251 }
1252
1253 private void handleUnwrapThrowable(ChannelHandlerContext ctx, Throwable cause) {
1254 try {
1255
1256
1257
1258
1259 if (handshakePromise.tryFailure(cause)) {
1260 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(cause));
1261 }
1262
1263
1264 if (pendingUnencryptedWrites != null) {
1265
1266
1267 wrapAndFlush(ctx);
1268 }
1269 } catch (SSLException ex) {
1270 logger.debug("SSLException during trying to call SSLEngine.wrap(...)" +
1271 " because of an previous SSLException, ignoring...", ex);
1272 } finally {
1273
1274 setHandshakeFailure(ctx, cause, true, false, true);
1275 }
1276 PlatformDependent.throwException(cause);
1277 }
1278
1279 @Override
1280 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
1281 if (isStateSet(STATE_PROCESS_TASK)) {
1282 return;
1283 }
1284 if (jdkCompatibilityMode) {
1285 decodeJdkCompatible(ctx, in);
1286 } else {
1287 decodeNonJdkCompatible(ctx, in);
1288 }
1289 }
1290
1291 @Override
1292 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
1293 channelReadComplete0(ctx);
1294 }
1295
1296 private void channelReadComplete0(ChannelHandlerContext ctx) {
1297
1298 discardSomeReadBytes();
1299
1300 flushIfNeeded(ctx);
1301 readIfNeeded(ctx);
1302
1303 clearState(STATE_FIRE_CHANNEL_READ);
1304 ctx.fireChannelReadComplete();
1305 }
1306
1307 private void readIfNeeded(ChannelHandlerContext ctx) {
1308
1309 if (!ctx.channel().config().isAutoRead() &&
1310 (!isStateSet(STATE_FIRE_CHANNEL_READ) || !handshakePromise.isDone())) {
1311
1312
1313 ctx.read();
1314 }
1315 }
1316
1317 private void flushIfNeeded(ChannelHandlerContext ctx) {
1318 if (isStateSet(STATE_NEEDS_FLUSH)) {
1319 forceFlush(ctx);
1320 }
1321 }
1322
1323
1324
1325
1326 private int unwrapNonAppData(ChannelHandlerContext ctx) throws SSLException {
1327 return unwrap(ctx, Unpooled.EMPTY_BUFFER, 0);
1328 }
1329
1330
1331
1332
1333 private int unwrap(ChannelHandlerContext ctx, ByteBuf packet, int length) throws SSLException {
1334 final int originalLength = length;
1335 boolean wrapLater = false;
1336 boolean notifyClosure = false;
1337 boolean executedRead = false;
1338 ByteBuf decodeOut = allocate(ctx, length);
1339 try {
1340
1341
1342 do {
1343 final SSLEngineResult result = engineType.unwrap(this, packet, length, decodeOut);
1344 final Status status = result.getStatus();
1345 final HandshakeStatus handshakeStatus = result.getHandshakeStatus();
1346 final int produced = result.bytesProduced();
1347 final int consumed = result.bytesConsumed();
1348
1349
1350
1351
1352 packet.skipBytes(consumed);
1353 length -= consumed;
1354
1355
1356
1357
1358 if (handshakeStatus == HandshakeStatus.FINISHED || handshakeStatus == HandshakeStatus.NOT_HANDSHAKING) {
1359 wrapLater |= (decodeOut.isReadable() ?
1360 setHandshakeSuccessUnwrapMarkReentry() : setHandshakeSuccess()) ||
1361 handshakeStatus == HandshakeStatus.FINISHED;
1362 }
1363
1364
1365
1366
1367 if (decodeOut.isReadable()) {
1368 setState(STATE_FIRE_CHANNEL_READ);
1369 if (isStateSet(STATE_UNWRAP_REENTRY)) {
1370 executedRead = true;
1371 executeChannelRead(ctx, decodeOut);
1372 } else {
1373 ctx.fireChannelRead(decodeOut);
1374 }
1375 decodeOut = null;
1376 }
1377
1378 if (status == Status.CLOSED) {
1379 notifyClosure = true;
1380 } else if (status == Status.BUFFER_OVERFLOW) {
1381 if (decodeOut != null) {
1382 decodeOut.release();
1383 }
1384 final int applicationBufferSize = engine.getSession().getApplicationBufferSize();
1385
1386
1387
1388
1389 decodeOut = allocate(ctx, engineType.calculatePendingData(this, applicationBufferSize < produced ?
1390 applicationBufferSize : applicationBufferSize - produced));
1391 continue;
1392 }
1393
1394 if (handshakeStatus == HandshakeStatus.NEED_TASK) {
1395 boolean pending = runDelegatedTasks(true);
1396 if (!pending) {
1397
1398
1399
1400
1401
1402 wrapLater = false;
1403 break;
1404 }
1405 } else if (handshakeStatus == HandshakeStatus.NEED_WRAP) {
1406
1407
1408
1409 if (wrapNonAppData(ctx, true) && length == 0) {
1410 break;
1411 }
1412 }
1413
1414 if (status == Status.BUFFER_UNDERFLOW ||
1415
1416 handshakeStatus != HandshakeStatus.NEED_TASK && (consumed == 0 && produced == 0 ||
1417 (length == 0 && handshakeStatus == HandshakeStatus.NOT_HANDSHAKING))) {
1418 if (handshakeStatus == HandshakeStatus.NEED_UNWRAP) {
1419
1420
1421 readIfNeeded(ctx);
1422 }
1423
1424 break;
1425 } else if (decodeOut == null) {
1426 decodeOut = allocate(ctx, length);
1427 }
1428 } while (!ctx.isRemoved());
1429
1430 if (isStateSet(STATE_FLUSHED_BEFORE_HANDSHAKE) && handshakePromise.isDone()) {
1431
1432
1433
1434
1435 clearState(STATE_FLUSHED_BEFORE_HANDSHAKE);
1436 wrapLater = true;
1437 }
1438
1439 if (wrapLater) {
1440 wrap(ctx, true);
1441 }
1442 } finally {
1443 if (decodeOut != null) {
1444 decodeOut.release();
1445 }
1446
1447 if (notifyClosure) {
1448 if (executedRead) {
1449 executeNotifyClosePromise(ctx);
1450 } else {
1451 notifyClosePromise(null);
1452 }
1453 }
1454 }
1455 return originalLength - length;
1456 }
1457
1458 private boolean setHandshakeSuccessUnwrapMarkReentry() {
1459
1460
1461 final boolean setReentryState = !isStateSet(STATE_UNWRAP_REENTRY);
1462 if (setReentryState) {
1463 setState(STATE_UNWRAP_REENTRY);
1464 }
1465 try {
1466 return setHandshakeSuccess();
1467 } finally {
1468
1469
1470 if (setReentryState) {
1471 clearState(STATE_UNWRAP_REENTRY);
1472 }
1473 }
1474 }
1475
1476 private void executeNotifyClosePromise(final ChannelHandlerContext ctx) {
1477 try {
1478 ctx.executor().execute(new Runnable() {
1479 @Override
1480 public void run() {
1481 notifyClosePromise(null);
1482 }
1483 });
1484 } catch (RejectedExecutionException e) {
1485 notifyClosePromise(e);
1486 }
1487 }
1488
1489 private void executeChannelRead(final ChannelHandlerContext ctx, final ByteBuf decodedOut) {
1490 try {
1491 ctx.executor().execute(new Runnable() {
1492 @Override
1493 public void run() {
1494 ctx.fireChannelRead(decodedOut);
1495 }
1496 });
1497 } catch (RejectedExecutionException e) {
1498 decodedOut.release();
1499 throw e;
1500 }
1501 }
1502
1503 private static ByteBuffer toByteBuffer(ByteBuf out, int index, int len) {
1504 return out.nioBufferCount() == 1 ? out.internalNioBuffer(index, len) :
1505 out.nioBuffer(index, len);
1506 }
1507
1508 private static boolean inEventLoop(Executor executor) {
1509 return executor instanceof EventExecutor && ((EventExecutor) executor).inEventLoop();
1510 }
1511
1512
1513
1514
1515
1516
1517
1518
1519 private boolean runDelegatedTasks(boolean inUnwrap) {
1520 if (delegatedTaskExecutor == ImmediateExecutor.INSTANCE || inEventLoop(delegatedTaskExecutor)) {
1521
1522
1523 for (;;) {
1524 Runnable task = engine.getDelegatedTask();
1525 if (task == null) {
1526 return true;
1527 }
1528 setState(STATE_PROCESS_TASK);
1529 if (task instanceof AsyncRunnable) {
1530
1531 boolean pending = false;
1532 try {
1533 AsyncRunnable asyncTask = (AsyncRunnable) task;
1534 AsyncTaskCompletionHandler completionHandler = new AsyncTaskCompletionHandler(inUnwrap);
1535 asyncTask.run(completionHandler);
1536 pending = completionHandler.resumeLater();
1537 if (pending) {
1538 return false;
1539 }
1540 } finally {
1541 if (!pending) {
1542
1543
1544 clearState(STATE_PROCESS_TASK);
1545 }
1546 }
1547 } else {
1548 try {
1549 task.run();
1550 } finally {
1551 clearState(STATE_PROCESS_TASK);
1552 }
1553 }
1554 }
1555 } else {
1556 executeDelegatedTask(inUnwrap);
1557 return false;
1558 }
1559 }
1560
1561 private SslTasksRunner getTaskRunner(boolean inUnwrap) {
1562 return inUnwrap ? sslTaskRunnerForUnwrap : sslTaskRunner;
1563 }
1564
1565 private void executeDelegatedTask(boolean inUnwrap) {
1566 executeDelegatedTask(getTaskRunner(inUnwrap));
1567 }
1568
1569 private void executeDelegatedTask(SslTasksRunner task) {
1570 setState(STATE_PROCESS_TASK);
1571 try {
1572 delegatedTaskExecutor.execute(task);
1573 } catch (RejectedExecutionException e) {
1574 clearState(STATE_PROCESS_TASK);
1575 throw e;
1576 }
1577 }
1578
1579 private final class AsyncTaskCompletionHandler implements Runnable {
1580 private final boolean inUnwrap;
1581 boolean didRun;
1582 boolean resumeLater;
1583
1584 AsyncTaskCompletionHandler(boolean inUnwrap) {
1585 this.inUnwrap = inUnwrap;
1586 }
1587
1588 @Override
1589 public void run() {
1590 didRun = true;
1591 if (resumeLater) {
1592 getTaskRunner(inUnwrap).runComplete();
1593 }
1594 }
1595
1596 boolean resumeLater() {
1597 if (!didRun) {
1598 resumeLater = true;
1599 return true;
1600 }
1601 return false;
1602 }
1603 }
1604
1605
1606
1607
1608
1609 private final class SslTasksRunner implements Runnable {
1610 private final boolean inUnwrap;
1611 private final Runnable runCompleteTask = new Runnable() {
1612 @Override
1613 public void run() {
1614 runComplete();
1615 }
1616 };
1617
1618 SslTasksRunner(boolean inUnwrap) {
1619 this.inUnwrap = inUnwrap;
1620 }
1621
1622
1623 private void taskError(Throwable e) {
1624 if (inUnwrap) {
1625
1626
1627
1628
1629 try {
1630 handleUnwrapThrowable(ctx, e);
1631 } catch (Throwable cause) {
1632 safeExceptionCaught(cause);
1633 }
1634 } else {
1635 setHandshakeFailure(ctx, e);
1636 forceFlush(ctx);
1637 }
1638 }
1639
1640
1641 private void safeExceptionCaught(Throwable cause) {
1642 try {
1643 exceptionCaught(ctx, wrapIfNeeded(cause));
1644 } catch (Throwable error) {
1645 ctx.fireExceptionCaught(error);
1646 }
1647 }
1648
1649 private Throwable wrapIfNeeded(Throwable cause) {
1650 if (!inUnwrap) {
1651
1652 return cause;
1653 }
1654
1655
1656 return cause instanceof DecoderException ? cause : new DecoderException(cause);
1657 }
1658
1659 private void tryDecodeAgain() {
1660 try {
1661 channelRead(ctx, Unpooled.EMPTY_BUFFER);
1662 } catch (Throwable cause) {
1663 safeExceptionCaught(cause);
1664 } finally {
1665
1666
1667
1668 channelReadComplete0(ctx);
1669 }
1670 }
1671
1672
1673
1674
1675
1676 private void resumeOnEventExecutor() {
1677 assert ctx.executor().inEventLoop();
1678 clearState(STATE_PROCESS_TASK);
1679 try {
1680 HandshakeStatus status = engine.getHandshakeStatus();
1681 switch (status) {
1682
1683
1684 case NEED_TASK:
1685 executeDelegatedTask(this);
1686
1687 break;
1688
1689
1690 case FINISHED:
1691
1692 case NOT_HANDSHAKING:
1693 setHandshakeSuccess();
1694 try {
1695
1696
1697 wrap(ctx, inUnwrap);
1698 } catch (Throwable e) {
1699 taskError(e);
1700 return;
1701 }
1702 if (inUnwrap) {
1703
1704
1705 unwrapNonAppData(ctx);
1706 }
1707
1708
1709 forceFlush(ctx);
1710
1711 tryDecodeAgain();
1712 break;
1713
1714
1715
1716 case NEED_UNWRAP:
1717 try {
1718 unwrapNonAppData(ctx);
1719 } catch (SSLException e) {
1720 handleUnwrapThrowable(ctx, e);
1721 return;
1722 }
1723 tryDecodeAgain();
1724 break;
1725
1726
1727
1728 case NEED_WRAP:
1729 try {
1730 if (!wrapNonAppData(ctx, false) && inUnwrap) {
1731
1732
1733
1734
1735 unwrapNonAppData(ctx);
1736 }
1737
1738
1739 forceFlush(ctx);
1740 } catch (Throwable e) {
1741 taskError(e);
1742 return;
1743 }
1744
1745
1746 tryDecodeAgain();
1747 break;
1748
1749 default:
1750
1751 throw new AssertionError();
1752 }
1753 } catch (Throwable cause) {
1754 safeExceptionCaught(cause);
1755 }
1756 }
1757
1758 void runComplete() {
1759 EventExecutor executor = ctx.executor();
1760
1761
1762
1763
1764
1765
1766
1767 executor.execute(new Runnable() {
1768 @Override
1769 public void run() {
1770 resumeOnEventExecutor();
1771 }
1772 });
1773 }
1774
1775 @Override
1776 public void run() {
1777 try {
1778 Runnable task = engine.getDelegatedTask();
1779 if (task == null) {
1780
1781 return;
1782 }
1783 if (task instanceof AsyncRunnable) {
1784 AsyncRunnable asyncTask = (AsyncRunnable) task;
1785 asyncTask.run(runCompleteTask);
1786 } else {
1787 task.run();
1788 runComplete();
1789 }
1790 } catch (final Throwable cause) {
1791 handleException(cause);
1792 }
1793 }
1794
1795 private void handleException(final Throwable cause) {
1796 EventExecutor executor = ctx.executor();
1797 if (executor.inEventLoop()) {
1798 clearState(STATE_PROCESS_TASK);
1799 safeExceptionCaught(cause);
1800 } else {
1801 try {
1802 executor.execute(new Runnable() {
1803 @Override
1804 public void run() {
1805 clearState(STATE_PROCESS_TASK);
1806 safeExceptionCaught(cause);
1807 }
1808 });
1809 } catch (RejectedExecutionException ignore) {
1810 clearState(STATE_PROCESS_TASK);
1811
1812
1813 ctx.fireExceptionCaught(cause);
1814 }
1815 }
1816 }
1817 }
1818
1819
1820
1821
1822
1823
1824 private boolean setHandshakeSuccess() {
1825
1826
1827
1828 final boolean notified;
1829 if (notified = !handshakePromise.isDone() && handshakePromise.trySuccess(ctx.channel())) {
1830 if (logger.isDebugEnabled()) {
1831 SSLSession session = engine.getSession();
1832 logger.debug(
1833 "{} HANDSHAKEN: protocol:{} cipher suite:{}",
1834 ctx.channel(),
1835 session.getProtocol(),
1836 session.getCipherSuite());
1837 }
1838 ctx.fireUserEventTriggered(SslHandshakeCompletionEvent.SUCCESS);
1839 }
1840 if (isStateSet(STATE_READ_DURING_HANDSHAKE)) {
1841 clearState(STATE_READ_DURING_HANDSHAKE);
1842 if (!ctx.channel().config().isAutoRead()) {
1843 ctx.read();
1844 }
1845 }
1846 return notified;
1847 }
1848
1849
1850
1851
1852 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause) {
1853 setHandshakeFailure(ctx, cause, true, true, false);
1854 }
1855
1856
1857
1858
1859 private void setHandshakeFailure(ChannelHandlerContext ctx, Throwable cause, boolean closeInbound,
1860 boolean notify, boolean alwaysFlushAndClose) {
1861 try {
1862
1863 setState(STATE_OUTBOUND_CLOSED);
1864 engine.closeOutbound();
1865
1866 if (closeInbound) {
1867 try {
1868 engine.closeInbound();
1869 } catch (SSLException e) {
1870 if (logger.isDebugEnabled()) {
1871
1872
1873
1874
1875 String msg = e.getMessage();
1876 if (msg == null || !(msg.contains("possible truncation attack") ||
1877 msg.contains("closing inbound before receiving peer's close_notify"))) {
1878 logger.debug("{} SSLEngine.closeInbound() raised an exception.", ctx.channel(), e);
1879 }
1880 }
1881 }
1882 }
1883 if (handshakePromise.tryFailure(cause) || alwaysFlushAndClose) {
1884 SslUtils.handleHandshakeFailure(ctx, cause, notify);
1885 }
1886 } finally {
1887
1888 releaseAndFailAll(ctx, cause);
1889 }
1890 }
1891
1892 private void setHandshakeFailureTransportFailure(ChannelHandlerContext ctx, Throwable cause) {
1893
1894
1895
1896 try {
1897 SSLException transportFailure = new SSLException("failure when writing TLS control frames", cause);
1898 releaseAndFailAll(ctx, transportFailure);
1899 if (handshakePromise.tryFailure(transportFailure)) {
1900 ctx.fireUserEventTriggered(new SslHandshakeCompletionEvent(transportFailure));
1901 }
1902 } finally {
1903 ctx.close();
1904 }
1905 }
1906
1907 private void releaseAndFailAll(ChannelHandlerContext ctx, Throwable cause) {
1908 if (pendingUnencryptedWrites != null) {
1909 pendingUnencryptedWrites.releaseAndFailAll(ctx, cause);
1910 }
1911 }
1912
1913 private void notifyClosePromise(Throwable cause) {
1914 if (cause == null) {
1915 if (sslClosePromise.trySuccess(ctx.channel())) {
1916 ctx.fireUserEventTriggered(SslCloseCompletionEvent.SUCCESS);
1917 }
1918 } else {
1919 if (sslClosePromise.tryFailure(cause)) {
1920 ctx.fireUserEventTriggered(new SslCloseCompletionEvent(cause));
1921 }
1922 }
1923 }
1924
1925 private void closeOutboundAndChannel(
1926 final ChannelHandlerContext ctx, final ChannelPromise promise, boolean disconnect) throws Exception {
1927 setState(STATE_OUTBOUND_CLOSED);
1928 engine.closeOutbound();
1929
1930 if (!ctx.channel().isActive()) {
1931 if (disconnect) {
1932 ctx.disconnect(promise);
1933 } else {
1934 ctx.close(promise);
1935 }
1936 return;
1937 }
1938
1939 ChannelPromise closeNotifyPromise = ctx.newPromise();
1940 try {
1941 flush(ctx, closeNotifyPromise);
1942 } finally {
1943 if (!isStateSet(STATE_CLOSE_NOTIFY)) {
1944 setState(STATE_CLOSE_NOTIFY);
1945
1946
1947
1948
1949
1950
1951
1952
1953 safeClose(ctx, closeNotifyPromise, PromiseNotifier.cascade(false, ctx.newPromise(), promise));
1954 } else {
1955
1956 sslClosePromise.addListener(new FutureListener<Channel>() {
1957 @Override
1958 public void operationComplete(Future<Channel> future) {
1959 promise.setSuccess();
1960 }
1961 });
1962 }
1963 }
1964 }
1965
1966 private void flush(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
1967 if (pendingUnencryptedWrites != null) {
1968 pendingUnencryptedWrites.add(Unpooled.EMPTY_BUFFER, promise);
1969 } else {
1970 promise.setFailure(newPendingWritesNullException());
1971 }
1972 flush(ctx);
1973 }
1974
1975 @Override
1976 public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
1977 this.ctx = ctx;
1978 Channel channel = ctx.channel();
1979 pendingUnencryptedWrites = new SslHandlerCoalescingBufferQueue(channel, 16);
1980
1981 setOpensslEngineSocketFd(channel);
1982 boolean fastOpen = Boolean.TRUE.equals(channel.config().getOption(ChannelOption.TCP_FASTOPEN_CONNECT));
1983 boolean active = channel.isActive();
1984 if (active || fastOpen) {
1985
1986
1987
1988 startHandshakeProcessing(active);
1989
1990
1991 final ChannelOutboundBuffer outboundBuffer;
1992 if (fastOpen && ((outboundBuffer = channel.unsafe().outboundBuffer()) == null ||
1993 outboundBuffer.totalPendingWriteBytes() > 0)) {
1994 setState(STATE_NEEDS_FLUSH);
1995 }
1996 }
1997 }
1998
1999 private void startHandshakeProcessing(boolean flushAtEnd) {
2000 if (!isStateSet(STATE_HANDSHAKE_STARTED)) {
2001 setState(STATE_HANDSHAKE_STARTED);
2002 if (engine.getUseClientMode()) {
2003
2004
2005
2006 handshake(flushAtEnd);
2007 }
2008 applyHandshakeTimeout();
2009 } else if (isStateSet(STATE_NEEDS_FLUSH)) {
2010 forceFlush(ctx);
2011 }
2012 }
2013
2014
2015
2016
2017 public Future<Channel> renegotiate() {
2018 ChannelHandlerContext ctx = this.ctx;
2019 if (ctx == null) {
2020 throw new IllegalStateException();
2021 }
2022
2023 return renegotiate(ctx.executor().<Channel>newPromise());
2024 }
2025
2026
2027
2028
2029 public Future<Channel> renegotiate(final Promise<Channel> promise) {
2030 ObjectUtil.checkNotNull(promise, "promise");
2031
2032 ChannelHandlerContext ctx = this.ctx;
2033 if (ctx == null) {
2034 throw new IllegalStateException();
2035 }
2036
2037 EventExecutor executor = ctx.executor();
2038 if (!executor.inEventLoop()) {
2039 executor.execute(new Runnable() {
2040 @Override
2041 public void run() {
2042 renegotiateOnEventLoop(promise);
2043 }
2044 });
2045 return promise;
2046 }
2047
2048 renegotiateOnEventLoop(promise);
2049 return promise;
2050 }
2051
2052 private void renegotiateOnEventLoop(final Promise<Channel> newHandshakePromise) {
2053 final Promise<Channel> oldHandshakePromise = handshakePromise;
2054 if (!oldHandshakePromise.isDone()) {
2055
2056
2057 PromiseNotifier.cascade(oldHandshakePromise, newHandshakePromise);
2058 } else {
2059 handshakePromise = newHandshakePromise;
2060 handshake(true);
2061 applyHandshakeTimeout();
2062 }
2063 }
2064
2065
2066
2067
2068
2069
2070
2071 private void handshake(boolean flushAtEnd) {
2072 if (engine.getHandshakeStatus() != HandshakeStatus.NOT_HANDSHAKING) {
2073
2074
2075 return;
2076 }
2077 if (handshakePromise.isDone()) {
2078
2079
2080
2081
2082
2083 return;
2084 }
2085
2086
2087 final ChannelHandlerContext ctx = this.ctx;
2088 try {
2089 engine.beginHandshake();
2090 wrapNonAppData(ctx, false);
2091 } catch (Throwable e) {
2092 setHandshakeFailure(ctx, e);
2093 } finally {
2094 if (flushAtEnd) {
2095 forceFlush(ctx);
2096 }
2097 }
2098 }
2099
2100 private void applyHandshakeTimeout() {
2101 final Promise<Channel> localHandshakePromise = this.handshakePromise;
2102
2103
2104 final long handshakeTimeoutMillis = this.handshakeTimeoutMillis;
2105 if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
2106 return;
2107 }
2108
2109 final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
2110 @Override
2111 public void run() {
2112 if (localHandshakePromise.isDone()) {
2113 return;
2114 }
2115 SSLException exception =
2116 new SslHandshakeTimeoutException("handshake timed out after " + handshakeTimeoutMillis + "ms");
2117 try {
2118 if (localHandshakePromise.tryFailure(exception)) {
2119 SslUtils.handleHandshakeFailure(ctx, exception, true);
2120 }
2121 } finally {
2122 releaseAndFailAll(ctx, exception);
2123 }
2124 }
2125 }, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
2126
2127
2128 localHandshakePromise.addListener(new FutureListener<Channel>() {
2129 @Override
2130 public void operationComplete(Future<Channel> f) throws Exception {
2131 timeoutFuture.cancel(false);
2132 }
2133 });
2134 }
2135
2136 private void forceFlush(ChannelHandlerContext ctx) {
2137 clearState(STATE_NEEDS_FLUSH);
2138 ctx.flush();
2139 }
2140
2141 private void setOpensslEngineSocketFd(Channel c) {
2142 if (c instanceof UnixChannel && engine instanceof ReferenceCountedOpenSslEngine) {
2143 ((ReferenceCountedOpenSslEngine) engine).bioSetFd(((UnixChannel) c).fd().intValue());
2144 }
2145 }
2146
2147
2148
2149
2150 @Override
2151 public void channelActive(final ChannelHandlerContext ctx) throws Exception {
2152 setOpensslEngineSocketFd(ctx.channel());
2153 if (!startTls) {
2154 startHandshakeProcessing(true);
2155 }
2156 ctx.fireChannelActive();
2157 }
2158
2159 private void safeClose(
2160 final ChannelHandlerContext ctx, final ChannelFuture flushFuture,
2161 final ChannelPromise promise) {
2162 if (!ctx.channel().isActive()) {
2163 ctx.close(promise);
2164 return;
2165 }
2166
2167 final Future<?> timeoutFuture;
2168 if (!flushFuture.isDone()) {
2169 long closeNotifyTimeout = closeNotifyFlushTimeoutMillis;
2170 if (closeNotifyTimeout > 0) {
2171
2172 timeoutFuture = ctx.executor().schedule(new Runnable() {
2173 @Override
2174 public void run() {
2175
2176 if (!flushFuture.isDone()) {
2177 logger.warn("{} Last write attempt timed out; force-closing the connection.",
2178 ctx.channel());
2179 addCloseListener(ctx.close(ctx.newPromise()), promise);
2180 }
2181 }
2182 }, closeNotifyTimeout, TimeUnit.MILLISECONDS);
2183 } else {
2184 timeoutFuture = null;
2185 }
2186 } else {
2187 timeoutFuture = null;
2188 }
2189
2190
2191 flushFuture.addListener(new ChannelFutureListener() {
2192 @Override
2193 public void operationComplete(ChannelFuture f) {
2194 if (timeoutFuture != null) {
2195 timeoutFuture.cancel(false);
2196 }
2197 final long closeNotifyReadTimeout = closeNotifyReadTimeoutMillis;
2198 if (closeNotifyReadTimeout <= 0) {
2199
2200
2201 addCloseListener(ctx.close(ctx.newPromise()), promise);
2202 } else {
2203 final Future<?> closeNotifyReadTimeoutFuture;
2204
2205 if (!sslClosePromise.isDone()) {
2206 closeNotifyReadTimeoutFuture = ctx.executor().schedule(new Runnable() {
2207 @Override
2208 public void run() {
2209 if (!sslClosePromise.isDone()) {
2210 logger.debug(
2211 "{} did not receive close_notify in {}ms; force-closing the connection.",
2212 ctx.channel(), closeNotifyReadTimeout);
2213
2214
2215 addCloseListener(ctx.close(ctx.newPromise()), promise);
2216 }
2217 }
2218 }, closeNotifyReadTimeout, TimeUnit.MILLISECONDS);
2219 } else {
2220 closeNotifyReadTimeoutFuture = null;
2221 }
2222
2223
2224 sslClosePromise.addListener(new FutureListener<Channel>() {
2225 @Override
2226 public void operationComplete(Future<Channel> future) throws Exception {
2227 if (closeNotifyReadTimeoutFuture != null) {
2228 closeNotifyReadTimeoutFuture.cancel(false);
2229 }
2230 addCloseListener(ctx.close(ctx.newPromise()), promise);
2231 }
2232 });
2233 }
2234 }
2235 });
2236 }
2237
2238 private static void addCloseListener(ChannelFuture future, ChannelPromise promise) {
2239
2240
2241
2242
2243
2244
2245 PromiseNotifier.cascade(false, future, promise);
2246 }
2247
2248
2249
2250
2251
2252 private ByteBuf allocate(ChannelHandlerContext ctx, int capacity) {
2253 ByteBufAllocator alloc = ctx.alloc();
2254 if (engineType.wantsDirectBuffer) {
2255 return alloc.directBuffer(capacity);
2256 } else {
2257 return alloc.buffer(capacity);
2258 }
2259 }
2260
2261
2262
2263
2264
2265 private ByteBuf allocateOutNetBuf(ChannelHandlerContext ctx, int pendingBytes, int numComponents) {
2266 return engineType.allocateWrapBuffer(this, ctx.alloc(), pendingBytes, numComponents);
2267 }
2268
2269 private boolean isStateSet(int bit) {
2270 return (state & bit) == bit;
2271 }
2272
2273 private void setState(int bit) {
2274 state |= bit;
2275 }
2276
2277 private void clearState(int bit) {
2278 state &= ~bit;
2279 }
2280
2281
2282
2283
2284
2285
2286 private final class SslHandlerCoalescingBufferQueue extends AbstractCoalescingBufferQueue {
2287
2288 SslHandlerCoalescingBufferQueue(Channel channel, int initSize) {
2289 super(channel, initSize);
2290 }
2291
2292 @Override
2293 protected ByteBuf compose(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf next) {
2294 final int wrapDataSize = SslHandler.this.wrapDataSize;
2295 if (cumulation instanceof CompositeByteBuf) {
2296 CompositeByteBuf composite = (CompositeByteBuf) cumulation;
2297 int numComponents = composite.numComponents();
2298 if (numComponents == 0 ||
2299 !attemptCopyToCumulation(composite.internalComponent(numComponents - 1), next, wrapDataSize)) {
2300 composite.addComponent(true, next);
2301 }
2302 return composite;
2303 }
2304 return attemptCopyToCumulation(cumulation, next, wrapDataSize) ? cumulation :
2305 copyAndCompose(alloc, cumulation, next);
2306 }
2307
2308 @Override
2309 protected ByteBuf composeFirst(ByteBufAllocator allocator, ByteBuf first) {
2310 if (first instanceof CompositeByteBuf) {
2311 CompositeByteBuf composite = (CompositeByteBuf) first;
2312 if (engineType.wantsDirectBuffer) {
2313 first = allocator.directBuffer(composite.readableBytes());
2314 } else {
2315 first = allocator.heapBuffer(composite.readableBytes());
2316 }
2317 try {
2318 first.writeBytes(composite);
2319 } catch (Throwable cause) {
2320 first.release();
2321 PlatformDependent.throwException(cause);
2322 }
2323 composite.release();
2324 }
2325 return first;
2326 }
2327
2328 @Override
2329 protected ByteBuf removeEmptyValue() {
2330 return null;
2331 }
2332 }
2333
2334 private static boolean attemptCopyToCumulation(ByteBuf cumulation, ByteBuf next, int wrapDataSize) {
2335 final int inReadableBytes = next.readableBytes();
2336 final int cumulationCapacity = cumulation.capacity();
2337 if (wrapDataSize - cumulation.readableBytes() >= inReadableBytes &&
2338
2339
2340
2341 (cumulation.isWritable(inReadableBytes) && cumulationCapacity >= wrapDataSize ||
2342 cumulationCapacity < wrapDataSize &&
2343 ensureWritableSuccess(cumulation.ensureWritable(inReadableBytes, false)))) {
2344 cumulation.writeBytes(next);
2345 next.release();
2346 return true;
2347 }
2348 return false;
2349 }
2350
2351 private final class LazyChannelPromise extends DefaultPromise<Channel> {
2352
2353 @Override
2354 protected EventExecutor executor() {
2355 if (ctx == null) {
2356 throw new IllegalStateException();
2357 }
2358 return ctx.executor();
2359 }
2360
2361 @Override
2362 protected void checkDeadLock() {
2363 if (ctx == null) {
2364
2365
2366
2367
2368
2369
2370 return;
2371 }
2372 super.checkDeadLock();
2373 }
2374 }
2375 }