1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.pcap;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufAllocator;
20 import io.netty.channel.Channel;
21 import io.netty.channel.ChannelDuplexHandler;
22 import io.netty.channel.ChannelHandlerContext;
23 import io.netty.channel.ChannelInboundHandlerAdapter;
24 import io.netty.channel.ChannelPromise;
25 import io.netty.channel.ServerChannel;
26 import io.netty.channel.socket.DatagramChannel;
27 import io.netty.channel.socket.DatagramPacket;
28 import io.netty.channel.socket.ServerSocketChannel;
29 import io.netty.channel.socket.SocketChannel;
30 import io.netty.util.NetUtil;
31 import io.netty.util.ReferenceCountUtil;
32 import io.netty.util.internal.logging.InternalLogger;
33 import io.netty.util.internal.logging.InternalLoggerFactory;
34
35 import java.io.Closeable;
36 import java.io.IOException;
37 import java.io.OutputStream;
38 import java.net.Inet4Address;
39 import java.net.Inet6Address;
40 import java.net.InetAddress;
41 import java.net.InetSocketAddress;
42 import java.net.UnknownHostException;
43 import java.util.concurrent.atomic.AtomicReference;
44
45 import static io.netty.util.internal.ObjectUtil.checkNotNull;
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public final class PcapWriteHandler extends ChannelDuplexHandler implements Closeable {
72
73
74
75
76 private final InternalLogger logger = InternalLoggerFactory.getInstance(PcapWriteHandler.class);
77
78
79
80
81 private PcapWriter pCapWriter;
82
83
84
85
86 private final OutputStream outputStream;
87
88
89
90
91 private final boolean captureZeroByte;
92
93
94
95
96
97 private final boolean writePcapGlobalHeader;
98
99
100
101
102
103 private final boolean sharedOutputStream;
104
105
106
107
108
109 private long sendSegmentNumber = 1;
110
111
112
113
114
115 private long receiveSegmentNumber = 1;
116
117
118
119
120 private ChannelType channelType;
121
122
123
124
125 private InetSocketAddress initiatorAddr;
126
127
128
129
130 private InetSocketAddress handlerAddr;
131
132
133
134
135 private boolean isServerPipeline;
136
137
138
139
140 private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
141
142
143
144
145
146
147
148
149
150
151
152
/**
 * Creates a handler that does not capture zero-byte payloads and has the global-header flag
 * enabled.
 *
 * @param outputStream stream the capture is written to; must not be {@code null}
 * @deprecated use {@link #builder()} instead
 */
@Deprecated
public PcapWriteHandler(OutputStream outputStream) {
    this(outputStream, false, true);
}
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
 * Creates a handler with an unshared output stream.
 *
 * @param outputStream          stream the capture is written to
 * @param captureZeroByte       {@code true} to also capture payloads of zero readable bytes
 * @param writePcapGlobalHeader value later reported by {@code writePcapGlobalHeader()}
 * @throws NullPointerException if {@code outputStream} is {@code null}
 * @deprecated use {@link #builder()} instead
 */
@Deprecated
public PcapWriteHandler(OutputStream outputStream, boolean captureZeroByte, boolean writePcapGlobalHeader) {
    // Validate first so that no field is assigned when the stream is missing.
    this.outputStream = checkNotNull(outputStream, "OutputStream");
    this.captureZeroByte = captureZeroByte;
    this.writePcapGlobalHeader = writePcapGlobalHeader;
    this.sharedOutputStream = false;
}
180
/**
 * Creates a handler from the settings collected by a {@link Builder}.
 *
 * @param builder      source of all configuration flags and of any forced channel information
 * @param outputStream stream the capture is written to (validated by {@link Builder#build})
 */
private PcapWriteHandler(Builder builder, OutputStream outputStream) {
    this.outputStream = outputStream;

    // Capture options.
    this.captureZeroByte = builder.captureZeroByte;
    this.sharedOutputStream = builder.sharedOutputStream;
    this.writePcapGlobalHeader = builder.writePcapGlobalHeader;

    // Forced channel information; channelType stays null when nothing was forced.
    this.channelType = builder.channelType;
    this.handlerAddr = builder.handlerAddr;
    this.initiatorAddr = builder.initiatorAddr;
    this.isServerPipeline = builder.isServerPipeline;
}
191
192
193
194
195
196
197
198 public static void writeGlobalHeader(OutputStream outputStream) throws IOException {
199 PcapHeaders.writeGlobalHeader(outputStream);
200 }
201
202 private void initializeIfNecessary(ChannelHandlerContext ctx) throws Exception {
203
204 if (state.get() != State.INIT) {
205 return;
206 }
207
208 pCapWriter = new PcapWriter(this);
209
210 if (channelType == null) {
211
212 if (ctx.channel() instanceof SocketChannel) {
213 channelType = ChannelType.TCP;
214
215
216
217 if (ctx.channel().parent() instanceof ServerSocketChannel) {
218 isServerPipeline = true;
219 initiatorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
220 handlerAddr = getLocalAddress(ctx.channel(), initiatorAddr);
221 } else {
222 isServerPipeline = false;
223 handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
224 initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
225 }
226 } else if (ctx.channel() instanceof DatagramChannel) {
227 channelType = ChannelType.UDP;
228
229 DatagramChannel datagramChannel = (DatagramChannel) ctx.channel();
230
231
232
233 if (datagramChannel.isConnected()) {
234 handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
235 initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
236 }
237 }
238 }
239
240 if (channelType == ChannelType.TCP) {
241 logger.debug("Initiating Fake TCP 3-Way Handshake");
242
243 ByteBuf tcpBuf = ctx.alloc().buffer();
244
245 try {
246
247 TCPPacket.writePacket(tcpBuf, null, 0, 0,
248 initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
249 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
250
251
252 TCPPacket.writePacket(tcpBuf, null, 0, 1,
253 handlerAddr.getPort(), initiatorAddr.getPort(), TCPPacket.TCPFlag.SYN,
254 TCPPacket.TCPFlag.ACK);
255 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, ctx.alloc(), ctx);
256
257
258 TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatorAddr.getPort(),
259 handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
260 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
261 } finally {
262 tcpBuf.release();
263 }
264
265 logger.debug("Finished Fake TCP 3-Way Handshake");
266 }
267
268 state.set(State.WRITING);
269 }
270
271 @Override
272 public void channelActive(ChannelHandlerContext ctx) throws Exception {
273 initializeIfNecessary(ctx);
274 super.channelActive(ctx);
275 }
276
277 @Override
278 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
279
280 if (state.get() == State.INIT) {
281 try {
282 initializeIfNecessary(ctx);
283 } catch (Exception ex) {
284 ReferenceCountUtil.release(msg);
285 throw ex;
286 }
287 }
288
289
290 if (state.get() == State.WRITING) {
291 if (channelType == ChannelType.TCP) {
292 handleTCP(ctx, msg, false);
293 } else if (channelType == ChannelType.UDP) {
294 handleUDP(ctx, msg, false);
295 } else {
296 logDiscard();
297 }
298 }
299 super.channelRead(ctx, msg);
300 }
301
302 @Override
303 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
304
305 if (state.get() == State.INIT) {
306 try {
307 initializeIfNecessary(ctx);
308 } catch (Exception ex) {
309 ReferenceCountUtil.release(msg);
310 promise.setFailure(ex);
311 return;
312 }
313 }
314
315
316 if (state.get() == State.WRITING) {
317 if (channelType == ChannelType.TCP) {
318 handleTCP(ctx, msg, true);
319 } else if (channelType == ChannelType.UDP) {
320 handleUDP(ctx, msg, true);
321 } else {
322 logDiscard();
323 }
324 }
325 super.write(ctx, msg, promise);
326 }
327
328
329
330
331
332
333
334
335
336
337 private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
338 if (msg instanceof ByteBuf) {
339
340
341 int totalBytes = ((ByteBuf) msg).readableBytes();
342 if (totalBytes == 0 && !captureZeroByte) {
343 logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
344 return;
345 }
346
347 ByteBufAllocator byteBufAllocator = ctx.alloc();
348 if (totalBytes == 0) {
349 handleTcpPacket(ctx, (ByteBuf) msg, isWriteOperation, byteBufAllocator);
350 return;
351 }
352
353
354 int maxTcpPayload = 65495;
355
356 for (int i = 0; i < totalBytes; i += maxTcpPayload) {
357 ByteBuf packet = ((ByteBuf) msg).slice(i, Math.min(maxTcpPayload, totalBytes - i));
358 handleTcpPacket(ctx, packet, isWriteOperation, byteBufAllocator);
359 }
360 } else {
361 logger.debug("Discarding Pcap Write for TCP Object: {}", msg);
362 }
363 }
364
365 private void handleTcpPacket(ChannelHandlerContext ctx, ByteBuf packet, boolean isWriteOperation,
366 ByteBufAllocator byteBufAllocator) {
367 ByteBuf tcpBuf = byteBufAllocator.buffer();
368 int bytes = packet.readableBytes();
369
370 try {
371 if (isWriteOperation) {
372 final InetSocketAddress srcAddr;
373 final InetSocketAddress dstAddr;
374 if (isServerPipeline) {
375 srcAddr = handlerAddr;
376 dstAddr = initiatorAddr;
377 } else {
378 srcAddr = initiatorAddr;
379 dstAddr = handlerAddr;
380 }
381
382 TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber,
383 srcAddr.getPort(),
384 dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
385 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
386 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, false);
387
388 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, bytes);
389
390 TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
391 srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
392 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
393 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
394 } else {
395 final InetSocketAddress srcAddr;
396 final InetSocketAddress dstAddr;
397 if (isServerPipeline) {
398 srcAddr = initiatorAddr;
399 dstAddr = handlerAddr;
400 } else {
401 srcAddr = handlerAddr;
402 dstAddr = initiatorAddr;
403 }
404
405 TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber,
406 srcAddr.getPort(),
407 dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
408 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
409 logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);
410
411 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, bytes);
412
413 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
414 srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
415 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
416 logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
417 }
418 } finally {
419 tcpBuf.release();
420 }
421 }
422
423
424
425
426
427
428
429
430
431
432 private void completeTCPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf tcpBuf,
433 ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
434
435 ByteBuf ipBuf = byteBufAllocator.buffer();
436 ByteBuf ethernetBuf = byteBufAllocator.buffer();
437 ByteBuf pcap = byteBufAllocator.buffer();
438
439 try {
440 if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
441 IPPacket.writeTCPv4(ipBuf, tcpBuf,
442 NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
443 NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
444
445 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
446 } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
447 IPPacket.writeTCPv6(ipBuf, tcpBuf,
448 srcAddr.getAddress().getAddress(),
449 dstAddr.getAddress().getAddress());
450
451 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
452 } else {
453 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
454 "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
455 return;
456 }
457
458
459 pCapWriter.writePacket(pcap, ethernetBuf);
460 } catch (IOException ex) {
461 logger.error("Caught Exception While Writing Packet into Pcap", ex);
462 ctx.fireExceptionCaught(ex);
463 } finally {
464 ipBuf.release();
465 ethernetBuf.release();
466 pcap.release();
467 }
468 }
469
470 private static long incrementUintSegmentNumber(long sequenceNumber, int value) {
471
472 return (sequenceNumber + value) % (0xFFFFFFFFL + 1);
473 }
474
475
476
477
478
479
480
481
482 private void handleUDP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
483 ByteBuf udpBuf = ctx.alloc().buffer();
484
485 try {
486 if (msg instanceof DatagramPacket) {
487
488
489 if (((DatagramPacket) msg).content().readableBytes() == 0 && !captureZeroByte) {
490 logger.debug("Discarding Zero Byte UDP Packet");
491 return;
492 }
493
494 if (((DatagramPacket) msg).content().readableBytes() > 65507) {
495 logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
496 return;
497 }
498
499 DatagramPacket datagramPacket = ((DatagramPacket) msg).duplicate();
500 InetSocketAddress srcAddr = datagramPacket.sender();
501 InetSocketAddress dstAddr = datagramPacket.recipient();
502
503
504
505 if (srcAddr == null) {
506 srcAddr = getLocalAddress(ctx.channel(), dstAddr);
507 }
508
509 logger.debug("Writing UDP Data of {} Bytes, isWriteOperation {}, Src Addr {}, Dst Addr {}",
510 datagramPacket.content().readableBytes(), isWriteOperation, srcAddr, dstAddr);
511
512 UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
513 completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
514 } else if (msg instanceof ByteBuf &&
515 (!(ctx.channel() instanceof DatagramChannel) ||
516 ((DatagramChannel) ctx.channel()).isConnected())) {
517
518
519 if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
520 logger.debug("Discarding Zero Byte UDP Packet");
521 return;
522 }
523
524 if (((ByteBuf) msg).readableBytes() > 65507) {
525 logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
526 return;
527 }
528
529 ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
530
531 InetSocketAddress sourceAddr = isWriteOperation? initiatorAddr : handlerAddr;
532 InetSocketAddress destinationAddr = isWriteOperation? handlerAddr : initiatorAddr;
533
534 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
535 byteBuf.readableBytes(), sourceAddr, destinationAddr);
536
537 UDPPacket.writePacket(udpBuf, byteBuf, sourceAddr.getPort(), destinationAddr.getPort());
538 completeUDPWrite(sourceAddr, destinationAddr, udpBuf, ctx.alloc(), ctx);
539 } else {
540 logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
541 }
542 } finally {
543 udpBuf.release();
544 }
545 }
546
547
548
549
550
551
552
553
554
555
556 private void completeUDPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf udpBuf,
557 ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
558
559 ByteBuf ipBuf = byteBufAllocator.buffer();
560 ByteBuf ethernetBuf = byteBufAllocator.buffer();
561 ByteBuf pcap = byteBufAllocator.buffer();
562
563 try {
564 if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
565 IPPacket.writeUDPv4(ipBuf, udpBuf,
566 NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
567 NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
568
569 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
570 } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
571 IPPacket.writeUDPv6(ipBuf, udpBuf,
572 srcAddr.getAddress().getAddress(),
573 dstAddr.getAddress().getAddress());
574
575 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
576 } else {
577 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
578 "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
579 return;
580 }
581
582
583 pCapWriter.writePacket(pcap, ethernetBuf);
584 } catch (IOException ex) {
585 logger.error("Caught Exception While Writing Packet into Pcap", ex);
586 ctx.fireExceptionCaught(ex);
587 } finally {
588 ipBuf.release();
589 ethernetBuf.release();
590 pcap.release();
591 }
592 }
593
594
595
596
597
598
599
600
601
602
603 private static InetSocketAddress getLocalAddress(Channel ch, InetSocketAddress remote) {
604 InetSocketAddress local = (InetSocketAddress) ch.localAddress();
605 if (remote != null && local.getAddress().isAnyLocalAddress()) {
606 if (local.getAddress() instanceof Inet4Address && remote.getAddress() instanceof Inet6Address) {
607 return new InetSocketAddress(WildcardAddressHolder.wildcard6, local.getPort());
608 }
609 if (local.getAddress() instanceof Inet6Address && remote.getAddress() instanceof Inet4Address) {
610 return new InetSocketAddress(WildcardAddressHolder.wildcard4, local.getPort());
611 }
612 }
613 return local;
614 }
615
616 @Override
617 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
618
619
620 if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
621 logger.debug("Starting Fake TCP FIN+ACK Flow to close connection");
622
623 ByteBufAllocator byteBufAllocator = ctx.alloc();
624 ByteBuf tcpBuf = byteBufAllocator.buffer();
625
626 try {
627 long initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
628 long initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
629
630 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber, initiatorAddr.getPort(),
631 handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
632 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
633
634
635 TCPPacket.writePacket(tcpBuf, null, initiatorAckNumber, initiatorSegmentNumber, handlerAddr.getPort(),
636 initiatorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
637 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, byteBufAllocator, ctx);
638
639
640 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, 1);
641 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, 1);
642 initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
643 initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
644
645
646 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber,
647 initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
648 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
649 } finally {
650 tcpBuf.release();
651 }
652
653 logger.debug("Finished Fake TCP FIN+ACK Flow to close connection");
654 }
655
656 close();
657 super.handlerRemoved(ctx);
658 }
659
660 @Override
661 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
662
663 if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
664 ByteBuf tcpBuf = ctx.alloc().buffer();
665
666 try {
667
668 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
669 handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
670 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
671 } finally {
672 tcpBuf.release();
673 }
674
675 logger.debug("Sent Fake TCP RST to close connection");
676 }
677
678 close();
679 ctx.fireExceptionCaught(cause);
680 }
681
682
683
684
685 private void logTCP(boolean isWriteOperation, int bytes, long sendSegmentNumber, long receiveSegmentNumber,
686 InetSocketAddress srcAddr, InetSocketAddress dstAddr, boolean ackOnly) {
687
688
689 if (logger.isDebugEnabled()) {
690 if (ackOnly) {
691 logger.debug("Writing TCP ACK, isWriteOperation {}, Segment Number {}, Ack Number {}, Src Addr {}, "
692 + "Dst Addr {}", isWriteOperation, sendSegmentNumber, receiveSegmentNumber, dstAddr,
693 srcAddr);
694 } else {
695 logger.debug("Writing TCP Data of {} Bytes, isWriteOperation {}, Segment Number {}, Ack Number {}, " +
696 "Src Addr {}, Dst Addr {}", bytes, isWriteOperation, sendSegmentNumber,
697 receiveSegmentNumber, srcAddr, dstAddr);
698 }
699 }
700 }
701
702 OutputStream outputStream() {
703 return outputStream;
704 }
705
706 boolean sharedOutputStream() {
707 return sharedOutputStream;
708 }
709
710 boolean writePcapGlobalHeader() {
711 return writePcapGlobalHeader;
712 }
713
714
715
716
717
718 public boolean isWriting() {
719 return state.get() == State.WRITING;
720 }
721
722 State state() {
723 return state.get();
724 }
725
726
727
728
729 public void pause() {
730 if (!state.compareAndSet(State.WRITING, State.PAUSED)) {
731 throw new IllegalStateException("State must be 'STARTED' to pause but current state is: " + state);
732 }
733 }
734
735
736
737
738 public void resume() {
739 if (!state.compareAndSet(State.PAUSED, State.WRITING)) {
740 throw new IllegalStateException("State must be 'PAUSED' to resume but current state is: " + state);
741 }
742 }
743
744 void markClosed() {
745 if (state.get() != State.CLOSED) {
746 state.set(State.CLOSED);
747 }
748 }
749
750
751 PcapWriter pCapWriter() {
752 return pCapWriter;
753 }
754
755 private void logDiscard() {
756 logger.warn("Discarding pcap write because channel type is unknown. The channel this handler is registered " +
757 "on is not a SocketChannel or DatagramChannel, so the inference does not work. Please call " +
758 "forceTcpChannel or forceUdpChannel before registering the handler.");
759 }
760
761 @Override
762 public String toString() {
763 return "PcapWriteHandler{" +
764 "captureZeroByte=" + captureZeroByte +
765 ", writePcapGlobalHeader=" + writePcapGlobalHeader +
766 ", sharedOutputStream=" + sharedOutputStream +
767 ", sendSegmentNumber=" + sendSegmentNumber +
768 ", receiveSegmentNumber=" + receiveSegmentNumber +
769 ", channelType=" + channelType +
770 ", initiatorAddr=" + initiatorAddr +
771 ", handlerAddr=" + handlerAddr +
772 ", isServerPipeline=" + isServerPipeline +
773 ", state=" + state +
774 '}';
775 }
776
777
778
779
780
781
782
783
784 @Override
785 public void close() throws IOException {
786 if (state.get() == State.CLOSED) {
787 logger.debug("PcapWriterHandler is already closed");
788 } else {
789
790 if (pCapWriter == null) {
791 pCapWriter = new PcapWriter(this);
792 }
793 pCapWriter.close();
794 markClosed();
795 logger.debug("PcapWriterHandler is now closed");
796 }
797 }
798
799 private enum ChannelType {
800 TCP, UDP
801 }
802
803 public static Builder builder() {
804 return new Builder();
805 }
806
807
808
809
810 public static final class Builder {
811 private boolean captureZeroByte;
812 private boolean sharedOutputStream;
813 private boolean writePcapGlobalHeader = true;
814
815 private ChannelType channelType;
816 private InetSocketAddress initiatorAddr;
817 private InetSocketAddress handlerAddr;
818 private boolean isServerPipeline;
819
820 private Builder() {
821 }
822
823
824
825
826
827
828
829
830 public Builder captureZeroByte(boolean captureZeroByte) {
831 this.captureZeroByte = captureZeroByte;
832 return this;
833 }
834
835
836
837
838
839
840
841
842
843
844
845
846 public Builder sharedOutputStream(boolean sharedOutputStream) {
847 this.sharedOutputStream = sharedOutputStream;
848 return this;
849 }
850
851
852
853
854
855
856
857
858
859 public Builder writePcapGlobalHeader(boolean writePcapGlobalHeader) {
860 this.writePcapGlobalHeader = writePcapGlobalHeader;
861 return this;
862 }
863
864
865
866
867
868
869
870
871
872
873 public Builder forceTcpChannel(InetSocketAddress serverAddress, InetSocketAddress clientAddress,
874 boolean isServerPipeline) {
875 channelType = ChannelType.TCP;
876 handlerAddr = checkNotNull(serverAddress, "serverAddress");
877 initiatorAddr = checkNotNull(clientAddress, "clientAddress");
878 this.isServerPipeline = isServerPipeline;
879 return this;
880 }
881
882
883
884
885
886
887
888
889
890
891
892
893 public Builder forceUdpChannel(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
894 channelType = ChannelType.UDP;
895 handlerAddr = checkNotNull(remoteAddress, "remoteAddress");
896 initiatorAddr = checkNotNull(localAddress, "localAddress");
897 return this;
898 }
899
900
901
902
903
904
905
906 public PcapWriteHandler build(OutputStream outputStream) {
907 checkNotNull(outputStream, "outputStream");
908 return new PcapWriteHandler(this, outputStream);
909 }
910 }
911
912 private static final class WildcardAddressHolder {
913 static final InetAddress wildcard4;
914 static final InetAddress wildcard6;
915
916 static {
917 try {
918 wildcard4 = InetAddress.getByAddress(new byte[4]);
919 wildcard6 = InetAddress.getByAddress(new byte[16]);
920 } catch (UnknownHostException e) {
921
922 throw new AssertionError(e);
923 }
924 }
925 }
926 }