View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.pcap;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.Channel;
21  import io.netty.channel.ChannelDuplexHandler;
22  import io.netty.channel.ChannelHandlerContext;
23  import io.netty.channel.ChannelInboundHandlerAdapter;
24  import io.netty.channel.ChannelPromise;
25  import io.netty.channel.ServerChannel;
26  import io.netty.channel.socket.DatagramChannel;
27  import io.netty.channel.socket.DatagramPacket;
28  import io.netty.channel.socket.ServerSocketChannel;
29  import io.netty.channel.socket.SocketChannel;
30  import io.netty.util.NetUtil;
31  import io.netty.util.ReferenceCountUtil;
32  import io.netty.util.internal.logging.InternalLogger;
33  import io.netty.util.internal.logging.InternalLoggerFactory;
34  
35  import java.io.Closeable;
36  import java.io.IOException;
37  import java.io.OutputStream;
38  import java.net.Inet4Address;
39  import java.net.Inet6Address;
40  import java.net.InetAddress;
41  import java.net.InetSocketAddress;
42  import java.net.UnknownHostException;
43  import java.util.concurrent.atomic.AtomicReference;
44  
45  import static io.netty.util.internal.ObjectUtil.checkNotNull;
46  
47  /**
48   * <p> {@link PcapWriteHandler} captures {@link ByteBuf} from {@link SocketChannel} / {@link ServerChannel}
49   * or {@link DatagramPacket} and writes it into Pcap {@link OutputStream}. </p>
50   *
51   * <p>
52   * Things to keep in mind when using {@link PcapWriteHandler} with TCP:
53   *
54   *    <ul>
55   *        <li> Whenever {@link ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)} is called,
56   *        a fake TCP 3-way handshake (SYN, SYN+ACK, ACK) is simulated as new connection in Pcap. </li>
57   *
58   *        <li> Whenever {@link ChannelInboundHandlerAdapter#handlerRemoved(ChannelHandlerContext)} is called,
59   *        a fake TCP 3-way handshake (FIN+ACK, FIN+ACK, ACK) is simulated as connection shutdown in Pcap.  </li>
60   *
61   *        <li> Whenever {@link ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)}
62   *        is called, a fake TCP RST is sent to simulate connection Reset in Pcap. </li>
63   *
64   *        <li> ACK is sent each time data is sent / received. </li>
65   *
66   *        <li> Zero Length Data Packets can cause TCP Double ACK error in Wireshark. To tackle this,
67   *        set {@code captureZeroByte} to {@code false}. </li>
68   *    </ul>
69   * </p>
70   */
71  public final class PcapWriteHandler extends ChannelDuplexHandler implements Closeable {
72  
    /**
     * Logger used by this handler for debug, warning and error messages.
     */
    private final InternalLogger logger = InternalLoggerFactory.getInstance(PcapWriteHandler.class);

    /**
     * {@link PcapWriter} instance. It is created lazily by {@code initializeIfNecessary}.
     */
    private PcapWriter pCapWriter;

    /**
     * {@link OutputStream} where we'll write Pcap data.
     */
    private final OutputStream outputStream;

    /**
     * {@code true} if we want to capture packets with zero bytes else {@code false}.
     */
    private final boolean captureZeroByte;

    /**
     * {@code true} if we want to write Pcap Global Header on initialization of
     * {@link PcapWriter} else {@code false}.
     */
    private final boolean writePcapGlobalHeader;

    /**
     * {@code true} if we want to synchronize on the {@link OutputStream} while writing
     * else {@code false}.
     */
    private final boolean sharedOutputStream;

    /**
     * TCP Sender Segment Number.
     * It starts at 1, is advanced by the number of bytes written through this handler and wraps
     * at the uint32 max.
     */
    private long sendSegmentNumber = 1;

    /**
     * TCP Receiver Segment Number.
     * It starts at 1, is advanced by the number of bytes read through this handler and wraps
     * at the uint32 max.
     */
    private long receiveSegmentNumber = 1;

    /**
     * Type of the channel this handler is registered on. May be {@code null} until it is inferred
     * from the channel during initialization.
     */
    private ChannelType channelType;

    /**
     * Address of the initiator of the connection
     */
    private InetSocketAddress initiatorAddr;

    /**
     * Address of the receiver of the connection
     */
    private InetSocketAddress handlerAddr;

    /**
     * Set to {@code true} if this handler is registered on a server pipeline
     */
    private boolean isServerPipeline;

    /**
     * Current state of this {@link PcapWriteHandler}. Starts as {@code State.INIT}.
     */
    private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
141 
142     /**
143      * Create new {@link PcapWriteHandler} Instance.
144      * {@code captureZeroByte} is set to {@code false} and
145      * {@code writePcapGlobalHeader} is set to {@code true}.
146      *
147      * @param outputStream OutputStream where Pcap data will be written. Call {@link #close()} to close this
148      *                     OutputStream.
149      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
150      *                              {@link NullPointerException}
151      * @deprecated Use {@link Builder} instead.
152      */
153     @Deprecated
154     public PcapWriteHandler(OutputStream outputStream) {
155         this(outputStream, false, true);
156     }
157 
158     /**
159      * Create new {@link PcapWriteHandler} Instance
160      *
161      * @param outputStream          OutputStream where Pcap data will be written. Call {@link #close()} to close this
162      *                              OutputStream.
163      * @param captureZeroByte       Set to {@code true} to enable capturing packets with empty (0 bytes) payload.
164      *                              Otherwise, if set to {@code false}, empty packets will be filtered out.
165      * @param writePcapGlobalHeader Set to {@code true} to write Pcap Global Header on initialization.
166      *                              Otherwise, if set to {@code false}, Pcap Global Header will not be written
167      *                              on initialization. This could when writing Pcap data on a existing file where
168      *                              Pcap Global Header is already present.
169      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
170      *                              {@link NullPointerException}
171      * @deprecated Use {@link Builder} instead.
172      */
173     @Deprecated
174     public PcapWriteHandler(OutputStream outputStream, boolean captureZeroByte, boolean writePcapGlobalHeader) {
175         this.outputStream = checkNotNull(outputStream, "OutputStream");
176         this.captureZeroByte = captureZeroByte;
177         this.writePcapGlobalHeader = writePcapGlobalHeader;
178         sharedOutputStream = false;
179     }
180 
181     private PcapWriteHandler(Builder builder, OutputStream outputStream) {
182         this.outputStream = outputStream;
183         captureZeroByte = builder.captureZeroByte;
184         sharedOutputStream = builder.sharedOutputStream;
185         writePcapGlobalHeader = builder.writePcapGlobalHeader;
186         channelType = builder.channelType;
187         handlerAddr = builder.handlerAddr;
188         initiatorAddr = builder.initiatorAddr;
189         isServerPipeline = builder.isServerPipeline;
190     }
191 
192     /**
193      * Writes the Pcap Global Header to the provided {@code OutputStream}
194      *
195      * @param outputStream OutputStream where Pcap data will be written.
196      * @throws IOException if there is an error writing to the {@code OutputStream}
197      */
198     public static void writeGlobalHeader(OutputStream outputStream) throws IOException {
199         PcapHeaders.writeGlobalHeader(outputStream);
200     }
201 
202     private void initializeIfNecessary(ChannelHandlerContext ctx) throws Exception {
203         // If State is not 'INIT' then it means we're already initialized so then no need to initiaize again.
204         if (state.get() != State.INIT) {
205             return;
206         }
207 
208         pCapWriter = new PcapWriter(this);
209 
210         if (channelType == null) {
211             // infer channel type
212             if (ctx.channel() instanceof SocketChannel) {
213                 channelType = ChannelType.TCP;
214 
215                 // If Channel belongs to `SocketChannel` then we're handling TCP.
216                 // Capture correct `localAddress` and `remoteAddress`
217                 if (ctx.channel().parent() instanceof ServerSocketChannel) {
218                     isServerPipeline = true;
219                     initiatorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
220                     handlerAddr = getLocalAddress(ctx.channel(), initiatorAddr);
221                 } else {
222                     isServerPipeline = false;
223                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
224                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
225                 }
226             } else if (ctx.channel() instanceof DatagramChannel) {
227                 channelType = ChannelType.UDP;
228 
229                 DatagramChannel datagramChannel = (DatagramChannel) ctx.channel();
230 
231                 // If `DatagramChannel` is connected then we can get
232                 // `localAddress` and `remoteAddress` from Channel.
233                 if (datagramChannel.isConnected()) {
234                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
235                     initiatorAddr = getLocalAddress(ctx.channel(), handlerAddr);
236                 }
237             }
238         }
239 
240         if (channelType == ChannelType.TCP) {
241             logger.debug("Initiating Fake TCP 3-Way Handshake");
242 
243             ByteBuf tcpBuf = ctx.alloc().buffer();
244 
245             try {
246                 // Write SYN with Normal Source and Destination Address
247                 TCPPacket.writePacket(tcpBuf, null, 0, 0,
248                                       initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
249                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
250 
251                 // Write SYN+ACK with Reversed Source and Destination Address
252                 TCPPacket.writePacket(tcpBuf, null, 0, 1,
253                                       handlerAddr.getPort(), initiatorAddr.getPort(), TCPPacket.TCPFlag.SYN,
254                                       TCPPacket.TCPFlag.ACK);
255                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, ctx.alloc(), ctx);
256 
257                 // Write ACK with Normal Source and Destination Address
258                 TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatorAddr.getPort(),
259                                       handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
260                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
261             } finally {
262                 tcpBuf.release();
263             }
264 
265             logger.debug("Finished Fake TCP 3-Way Handshake");
266         }
267 
268         state.set(State.WRITING);
269     }
270 
271     @Override
272     public void channelActive(ChannelHandlerContext ctx) throws Exception {
273         initializeIfNecessary(ctx);
274         super.channelActive(ctx);
275     }
276 
277     @Override
278     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
279         // Initialize if needed
280         if (state.get() == State.INIT) {
281             try {
282                 initializeIfNecessary(ctx);
283             } catch (Exception ex) {
284                 ReferenceCountUtil.release(msg);
285                 throw ex;
286             }
287         }
288 
289         // Only write if State is STARTED
290         if (state.get() == State.WRITING) {
291             if (channelType == ChannelType.TCP) {
292                 handleTCP(ctx, msg, false);
293             } else if (channelType == ChannelType.UDP) {
294                 handleUDP(ctx, msg, false);
295             } else {
296                 logDiscard();
297             }
298         }
299         super.channelRead(ctx, msg);
300     }
301 
302     @Override
303     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
304         // Initialize if needed
305         if (state.get() == State.INIT) {
306             try {
307                 initializeIfNecessary(ctx);
308             } catch (Exception ex) {
309                 ReferenceCountUtil.release(msg);
310                 promise.setFailure(ex);
311                 return;
312             }
313         }
314 
315         // Only write if State is STARTED
316         if (state.get() == State.WRITING) {
317             if (channelType == ChannelType.TCP) {
318                 handleTCP(ctx, msg, true);
319             } else if (channelType == ChannelType.UDP) {
320                 handleUDP(ctx, msg, true);
321             } else {
322                 logDiscard();
323             }
324         }
325         super.write(ctx, msg, promise);
326     }
327 
328     /**
329      * Handle TCP L4
330      *
331      * @param ctx              {@link ChannelHandlerContext} for {@link ByteBuf} allocation and
332      *                         {@code fireExceptionCaught}
333      * @param msg              {@link Object} must be {@link ByteBuf} else it'll be discarded
334      * @param isWriteOperation Set {@code true} if we have to process packet when packets are being sent out
335      *                         else set {@code false}
336      */
337     private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
338         if (msg instanceof ByteBuf) {
339 
340             // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
341             int totalBytes = ((ByteBuf) msg).readableBytes();
342             if (totalBytes == 0 && !captureZeroByte) {
343                 logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
344                 return;
345             }
346 
347             ByteBufAllocator byteBufAllocator = ctx.alloc();
348             if (totalBytes == 0) {
349                 handleTcpPacket(ctx, (ByteBuf) msg, isWriteOperation, byteBufAllocator);
350                 return;
351             }
352 
353             // If the payload exceeds the max size of that can fit in a single TCP IPv4 packet, fragment the payload
354             int maxTcpPayload = 65495;
355 
356             for (int i = 0; i < totalBytes; i += maxTcpPayload) {
357                 ByteBuf packet = ((ByteBuf) msg).slice(i, Math.min(maxTcpPayload, totalBytes - i));
358                 handleTcpPacket(ctx, packet, isWriteOperation, byteBufAllocator);
359             }
360         } else {
361             logger.debug("Discarding Pcap Write for TCP Object: {}", msg);
362         }
363     }
364 
365     private void handleTcpPacket(ChannelHandlerContext ctx, ByteBuf packet, boolean isWriteOperation,
366                                  ByteBufAllocator byteBufAllocator) {
367         ByteBuf tcpBuf = byteBufAllocator.buffer();
368         int bytes = packet.readableBytes();
369 
370         try {
371             if (isWriteOperation) {
372                 final InetSocketAddress srcAddr;
373                 final InetSocketAddress dstAddr;
374                 if (isServerPipeline) {
375                     srcAddr = handlerAddr;
376                     dstAddr = initiatorAddr;
377                 } else {
378                     srcAddr = initiatorAddr;
379                     dstAddr = handlerAddr;
380                 }
381 
382                 TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber,
383                                       srcAddr.getPort(),
384                                       dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
385                 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
386                 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, false);
387 
388                 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, bytes);
389 
390                 TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
391                                       srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
392                 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
393                 logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
394             } else {
395                 final InetSocketAddress srcAddr;
396                 final InetSocketAddress dstAddr;
397                 if (isServerPipeline) {
398                     srcAddr = initiatorAddr;
399                     dstAddr = handlerAddr;
400                 } else {
401                     srcAddr = handlerAddr;
402                     dstAddr = initiatorAddr;
403                 }
404 
405                 TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber,
406                                       srcAddr.getPort(),
407                                       dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
408                 completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
409                 logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);
410 
411                 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, bytes);
412 
413                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
414                                       srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
415                 completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
416                 logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
417             }
418         } finally {
419             tcpBuf.release();
420         }
421     }
422 
423     /**
424      * Write TCP/IP L3 and L2 here.
425      *
426      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
427      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
428      * @param tcpBuf           {@link ByteBuf} containing TCP L4 Data
429      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for TCP/IP L3 and L2 data.
430      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
431      */
432     private void completeTCPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf tcpBuf,
433                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
434 
435         ByteBuf ipBuf = byteBufAllocator.buffer();
436         ByteBuf ethernetBuf = byteBufAllocator.buffer();
437         ByteBuf pcap = byteBufAllocator.buffer();
438 
439         try {
440             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
441                 IPPacket.writeTCPv4(ipBuf, tcpBuf,
442                                     NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
443                                     NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
444 
445                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
446             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
447                 IPPacket.writeTCPv6(ipBuf, tcpBuf,
448                                     srcAddr.getAddress().getAddress(),
449                                     dstAddr.getAddress().getAddress());
450 
451                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
452             } else {
453                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
454                              "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
455                 return;
456             }
457 
458             // Write Packet into Pcap
459             pCapWriter.writePacket(pcap, ethernetBuf);
460         } catch (IOException ex) {
461             logger.error("Caught Exception While Writing Packet into Pcap", ex);
462             ctx.fireExceptionCaught(ex);
463         } finally {
464             ipBuf.release();
465             ethernetBuf.release();
466             pcap.release();
467         }
468     }
469 
470     private static long incrementUintSegmentNumber(long sequenceNumber, int value) {
471         // If the sequence number would go above the max for uint32, wrap around
472         return (sequenceNumber + value) % (0xFFFFFFFFL + 1);
473     }
474 
475     /**
476      * Handle UDP l4
477      *
478      * @param ctx {@link ChannelHandlerContext} for {@code localAddress} / {@code remoteAddress},
479      *            {@link ByteBuf} allocation and {@code fireExceptionCaught}
480      * @param msg {@link DatagramPacket} or {@link ByteBuf}
481      */
482     private void handleUDP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
483         ByteBuf udpBuf = ctx.alloc().buffer();
484 
485         try {
486             if (msg instanceof DatagramPacket) {
487 
488                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
489                 if (((DatagramPacket) msg).content().readableBytes() == 0 && !captureZeroByte) {
490                     logger.debug("Discarding Zero Byte UDP Packet");
491                     return;
492                 }
493 
494                 if (((DatagramPacket) msg).content().readableBytes() > 65507) {
495                     logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
496                     return;
497                 }
498 
499                 DatagramPacket datagramPacket = ((DatagramPacket) msg).duplicate();
500                 InetSocketAddress srcAddr = datagramPacket.sender();
501                 InetSocketAddress dstAddr = datagramPacket.recipient();
502 
503                 // If `datagramPacket.sender()` is `null` then DatagramPacket is initialized
504                 // `sender` (local) address. In this case, we'll get source address from Channel.
505                 if (srcAddr == null) {
506                     srcAddr = getLocalAddress(ctx.channel(), dstAddr);
507                 }
508 
509                 logger.debug("Writing UDP Data of {} Bytes, isWriteOperation {}, Src Addr {}, Dst Addr {}",
510                              datagramPacket.content().readableBytes(), isWriteOperation, srcAddr, dstAddr);
511 
512                 UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
513                 completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
514             } else if (msg instanceof ByteBuf &&
515                        (!(ctx.channel() instanceof DatagramChannel) ||
516                         ((DatagramChannel) ctx.channel()).isConnected())) {
517 
518                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
519                 if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
520                     logger.debug("Discarding Zero Byte UDP Packet");
521                     return;
522                 }
523 
524                 if (((ByteBuf) msg).readableBytes() > 65507) {
525                     logger.warn("Unable to write UDP packet to PCAP. Payload of size {} exceeds max size of 65507");
526                     return;
527                 }
528 
529                 ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
530 
531                 InetSocketAddress sourceAddr = isWriteOperation? initiatorAddr : handlerAddr;
532                 InetSocketAddress destinationAddr = isWriteOperation? handlerAddr : initiatorAddr;
533 
534                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
535                              byteBuf.readableBytes(), sourceAddr, destinationAddr);
536 
537                 UDPPacket.writePacket(udpBuf, byteBuf, sourceAddr.getPort(), destinationAddr.getPort());
538                 completeUDPWrite(sourceAddr, destinationAddr, udpBuf, ctx.alloc(), ctx);
539             } else {
540                 logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
541             }
542         } finally {
543             udpBuf.release();
544         }
545     }
546 
547     /**
548      * Write UDP/IP L3 and L2 here.
549      *
550      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
551      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
552      * @param udpBuf           {@link ByteBuf} containing UDP L4 Data
553      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for UDP/IP L3 and L2 data.
554      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
555      */
556     private void completeUDPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf udpBuf,
557                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
558 
559         ByteBuf ipBuf = byteBufAllocator.buffer();
560         ByteBuf ethernetBuf = byteBufAllocator.buffer();
561         ByteBuf pcap = byteBufAllocator.buffer();
562 
563         try {
564             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
565                 IPPacket.writeUDPv4(ipBuf, udpBuf,
566                                     NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
567                                     NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
568 
569                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
570             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
571                 IPPacket.writeUDPv6(ipBuf, udpBuf,
572                                     srcAddr.getAddress().getAddress(),
573                                     dstAddr.getAddress().getAddress());
574 
575                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
576             } else {
577                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
578                              "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
579                 return;
580             }
581 
582             // Write Packet into Pcap
583             pCapWriter.writePacket(pcap, ethernetBuf);
584         } catch (IOException ex) {
585             logger.error("Caught Exception While Writing Packet into Pcap", ex);
586             ctx.fireExceptionCaught(ex);
587         } finally {
588             ipBuf.release();
589             ethernetBuf.release();
590             pcap.release();
591         }
592     }
593 
594     /**
595      * Get the local address of a channel. If the address is a wildcard address ({@code 0.0.0.0} or {@code ::}), and
596      * the address family does not match that of the {@code remote}, return the wildcard address of the {@code remote}'s
597      * family instead.
598      *
599      * @param ch     The channel to get the local address from
600      * @param remote The remote address
601      * @return The fixed local address
602      */
603     private static InetSocketAddress getLocalAddress(Channel ch, InetSocketAddress remote) {
604         InetSocketAddress local = (InetSocketAddress) ch.localAddress();
605         if (remote != null && local.getAddress().isAnyLocalAddress()) {
606             if (local.getAddress() instanceof Inet4Address && remote.getAddress() instanceof Inet6Address) {
607                 return new InetSocketAddress(WildcardAddressHolder.wildcard6, local.getPort());
608             }
609             if (local.getAddress() instanceof Inet6Address && remote.getAddress() instanceof Inet4Address) {
610                 return new InetSocketAddress(WildcardAddressHolder.wildcard4, local.getPort());
611             }
612         }
613         return local;
614     }
615 
616     @Override
617     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
618 
619         // If `isTCP` is true and state is WRITING, then we'll simulate a `FIN` flow.
620         if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
621             logger.debug("Starting Fake TCP FIN+ACK Flow to close connection");
622 
623             ByteBufAllocator byteBufAllocator = ctx.alloc();
624             ByteBuf tcpBuf = byteBufAllocator.buffer();
625 
626             try {
627                 long initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
628                 long initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
629                 // Write FIN+ACK with Normal Source and Destination Address
630                 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber, initiatorAddr.getPort(),
631                                       handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
632                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
633 
634                 // Write FIN+ACK with Reversed Source and Destination Address
635                 TCPPacket.writePacket(tcpBuf, null, initiatorAckNumber, initiatorSegmentNumber, handlerAddr.getPort(),
636                                       initiatorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
637                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, byteBufAllocator, ctx);
638 
639                 // Increment by 1 when responding to FIN
640                 sendSegmentNumber = incrementUintSegmentNumber(sendSegmentNumber, 1);
641                 receiveSegmentNumber = incrementUintSegmentNumber(receiveSegmentNumber, 1);
642                 initiatorSegmentNumber = isServerPipeline? receiveSegmentNumber : sendSegmentNumber;
643                 initiatorAckNumber = isServerPipeline? sendSegmentNumber : receiveSegmentNumber;
644 
645                 // Write ACK with Normal Source and Destination Address
646                 TCPPacket.writePacket(tcpBuf, null, initiatorSegmentNumber, initiatorAckNumber,
647                                       initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
648                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
649             } finally {
650                 tcpBuf.release();
651             }
652 
653             logger.debug("Finished Fake TCP FIN+ACK Flow to close connection");
654         }
655 
656         close();
657         super.handlerRemoved(ctx);
658     }
659 
660     @Override
661     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
662         // Only write RST if this is an initialized TCP stream
663         if (channelType == ChannelType.TCP && state.get() == State.WRITING) {
664             ByteBuf tcpBuf = ctx.alloc().buffer();
665 
666             try {
667                 // Write RST with Normal Source and Destination Address
668                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
669                                       handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
670                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
671             } finally {
672                 tcpBuf.release();
673             }
674 
675             logger.debug("Sent Fake TCP RST to close connection");
676         }
677 
678         close();
679         ctx.fireExceptionCaught(cause);
680     }
681 
682     /**
683      * Logger for TCP
684      */
685     private void logTCP(boolean isWriteOperation, int bytes, long sendSegmentNumber, long receiveSegmentNumber,
686                         InetSocketAddress srcAddr, InetSocketAddress dstAddr, boolean ackOnly) {
687         // If `ackOnly` is `true` when we don't need to write any data so we'll not
688         // log number of bytes being written and mark the operation as "TCP ACK".
689         if (logger.isDebugEnabled()) {
690             if (ackOnly) {
691                 logger.debug("Writing TCP ACK, isWriteOperation {}, Segment Number {}, Ack Number {}, Src Addr {}, "
692                              + "Dst Addr {}", isWriteOperation, sendSegmentNumber, receiveSegmentNumber, dstAddr,
693                              srcAddr);
694             } else {
695                 logger.debug("Writing TCP Data of {} Bytes, isWriteOperation {}, Segment Number {}, Ack Number {}, " +
696                              "Src Addr {}, Dst Addr {}", bytes, isWriteOperation, sendSegmentNumber,
697                              receiveSegmentNumber, srcAddr, dstAddr);
698             }
699         }
700     }
701 
702     OutputStream outputStream() {
703         return outputStream;
704     }
705 
706     boolean sharedOutputStream() {
707         return sharedOutputStream;
708     }
709 
710     boolean writePcapGlobalHeader() {
711         return writePcapGlobalHeader;
712     }
713 
714     /**
715      * Returns {@code true} if the {@link PcapWriteHandler} is currently
716      * writing packets to the {@link OutputStream} else returns {@code false}.
717      */
718     public boolean isWriting() {
719         return state.get() == State.WRITING;
720     }
721 
722     State state() {
723         return state.get();
724     }
725 
726     /**
727      * Pause the {@link PcapWriteHandler} from writing packets to the {@link OutputStream}.
728      */
729     public void pause() {
730         if (!state.compareAndSet(State.WRITING, State.PAUSED)) {
731             throw new IllegalStateException("State must be 'STARTED' to pause but current state is: " + state);
732         }
733     }
734 
735     /**
736      * Resume the {@link PcapWriteHandler} to writing packets to the {@link OutputStream}.
737      */
738     public void resume() {
739         if (!state.compareAndSet(State.PAUSED, State.WRITING)) {
740             throw new IllegalStateException("State must be 'PAUSED' to resume but current state is: " + state);
741         }
742     }
743 
744     void markClosed() {
745         if (state.get() != State.CLOSED) {
746             state.set(State.CLOSED);
747         }
748     }
749 
750     // Visible for testing only.
751     PcapWriter pCapWriter() {
752         return pCapWriter;
753     }
754 
755     private void logDiscard() {
756         logger.warn("Discarding pcap write because channel type is unknown. The channel this handler is registered " +
757                     "on is not a SocketChannel or DatagramChannel, so the inference does not work. Please call " +
758                     "forceTcpChannel or forceUdpChannel before registering the handler.");
759     }
760 
761     @Override
762     public String toString() {
763         return "PcapWriteHandler{" +
764                "captureZeroByte=" + captureZeroByte +
765                ", writePcapGlobalHeader=" + writePcapGlobalHeader +
766                ", sharedOutputStream=" + sharedOutputStream +
767                ", sendSegmentNumber=" + sendSegmentNumber +
768                ", receiveSegmentNumber=" + receiveSegmentNumber +
769                ", channelType=" + channelType +
770                ", initiatorAddr=" + initiatorAddr +
771                ", handlerAddr=" + handlerAddr +
772                ", isServerPipeline=" + isServerPipeline +
773                ", state=" + state +
774                '}';
775     }
776 
777     /**
778      * <p> Close {@code PcapWriter} and {@link OutputStream}. </p>
779      * <p> Note: Calling this method does not close {@link PcapWriteHandler}.
780      * Only Pcap Writes are closed. </p>
781      *
782      * @throws IOException If {@link OutputStream#close()} throws an exception
783      */
784     @Override
785     public void close() throws IOException {
786         if (state.get() == State.CLOSED) {
787             logger.debug("PcapWriterHandler is already closed");
788         } else {
789             // If close is called prematurely, create writer to close output stream
790             if (pCapWriter == null) {
791                 pCapWriter = new PcapWriter(this);
792             }
793             pCapWriter.close();
794             markClosed();
795             logger.debug("PcapWriterHandler is now closed");
796         }
797     }
798 
799     private enum ChannelType {
800         TCP, UDP
801     }
802 
803     public static Builder builder() {
804         return new Builder();
805     }
806 
807     /**
808      * Builder for {@link PcapWriteHandler}
809      */
810     public static final class Builder {
811         private boolean captureZeroByte;
812         private boolean sharedOutputStream;
813         private boolean writePcapGlobalHeader = true;
814 
815         private ChannelType channelType;
816         private InetSocketAddress initiatorAddr;
817         private InetSocketAddress handlerAddr;
818         private boolean isServerPipeline;
819 
820         private Builder() {
821         }
822 
823         /**
824          * Set to {@code true} to enable capturing packets with empty (0 bytes) payload. Otherwise, if set to
825          * {@code false}, empty packets will be filtered out.
826          *
827          * @param captureZeroByte Whether to filter out empty packets.
828          * @return this builder
829          */
830         public Builder captureZeroByte(boolean captureZeroByte) {
831             this.captureZeroByte = captureZeroByte;
832             return this;
833         }
834 
835         /**
836          * Set to {@code true} if multiple {@link PcapWriteHandler} instances will be
837          * writing to the same {@link OutputStream} concurrently, and write locking is
838          * required. Otherwise, if set to {@code false}, no locking will be done.
839          * Additionally, {@link #close} will not close the underlying {@code OutputStream}.
840          * Note: it is probably an error to have both {@code writePcapGlobalHeader} and
841          * {@code sharedOutputStream} set to {@code true} at the same time.
842          *
843          * @param sharedOutputStream Whether {@link OutputStream} is shared or not
844          * @return this builder
845          */
846         public Builder sharedOutputStream(boolean sharedOutputStream) {
847             this.sharedOutputStream = sharedOutputStream;
848             return this;
849         }
850 
851         /**
852          * Set to {@code true} to write Pcap Global Header on initialization. Otherwise, if set to {@code false}, Pcap
853          * Global Header will not be written on initialization. This could when writing Pcap data on a existing file
854          * where Pcap Global Header is already present.
855          *
856          * @param writePcapGlobalHeader Whether to write the pcap global header.
857          * @return this builder
858          */
859         public Builder writePcapGlobalHeader(boolean writePcapGlobalHeader) {
860             this.writePcapGlobalHeader = writePcapGlobalHeader;
861             return this;
862         }
863 
864         /**
865          * Force this handler to write data as if they were TCP packets, with the given connection metadata. If this
866          * method isn't called, we determine the metadata from the channel.
867          *
868          * @param serverAddress    The address of the TCP server (handler)
869          * @param clientAddress    The address of the TCP client (initiator)
870          * @param isServerPipeline Whether the handler is part of the server channel
871          * @return this builder
872          */
873         public Builder forceTcpChannel(InetSocketAddress serverAddress, InetSocketAddress clientAddress,
874                                        boolean isServerPipeline) {
875             channelType = ChannelType.TCP;
876             handlerAddr = checkNotNull(serverAddress, "serverAddress");
877             initiatorAddr = checkNotNull(clientAddress, "clientAddress");
878             this.isServerPipeline = isServerPipeline;
879             return this;
880         }
881 
882         /**
883          * Force this handler to write data as if they were UDP packets, with the given connection metadata. If this
884          * method isn't called, we determine the metadata from the channel.
885          * <br>
886          * Note that even if this method is called, the address information on {@link DatagramPacket} takes precedence
887          * if it is present.
888          *
889          * @param localAddress  The address of the UDP local
890          * @param remoteAddress The address of the UDP remote
891          * @return this builder
892          */
893         public Builder forceUdpChannel(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
894             channelType = ChannelType.UDP;
895             handlerAddr = checkNotNull(remoteAddress, "remoteAddress");
896             initiatorAddr = checkNotNull(localAddress, "localAddress");
897             return this;
898         }
899 
900         /**
901          * Build the {@link PcapWriteHandler}.
902          *
903          * @param outputStream The output stream to write the pcap data to.
904          * @return The handler.
905          */
906         public PcapWriteHandler build(OutputStream outputStream) {
907             checkNotNull(outputStream, "outputStream");
908             return new PcapWriteHandler(this, outputStream);
909         }
910     }
911 
912     private static final class WildcardAddressHolder {
913         static final InetAddress wildcard4; // 0.0.0.0
914         static final InetAddress wildcard6; // ::
915 
916         static {
917             try {
918                 wildcard4 = InetAddress.getByAddress(new byte[4]);
919                 wildcard6 = InetAddress.getByAddress(new byte[16]);
920             } catch (UnknownHostException e) {
921                 // would only happen if the byte array was of incorrect size
922                 throw new AssertionError(e);
923             }
924         }
925     }
926 }