View Javadoc
1   /*
2    * Copyright 2020 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    * https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.pcap;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.buffer.ByteBufAllocator;
20  import io.netty.channel.ChannelDuplexHandler;
21  import io.netty.channel.ChannelHandlerContext;
22  import io.netty.channel.ChannelInboundHandlerAdapter;
23  import io.netty.channel.ChannelPromise;
24  import io.netty.channel.ServerChannel;
25  import io.netty.channel.socket.DatagramChannel;
26  import io.netty.channel.socket.DatagramPacket;
27  import io.netty.channel.socket.ServerSocketChannel;
28  import io.netty.channel.socket.SocketChannel;
29  import io.netty.util.NetUtil;
30  import io.netty.util.internal.logging.InternalLogger;
31  import io.netty.util.internal.logging.InternalLoggerFactory;
32  
33  import java.io.Closeable;
34  import java.io.IOException;
35  import java.io.OutputStream;
36  import java.net.Inet4Address;
37  import java.net.Inet6Address;
38  import java.net.InetSocketAddress;
39  import java.util.concurrent.atomic.AtomicReference;
40  
41  import static io.netty.util.internal.ObjectUtil.checkNotNull;
42  
43  /**
44   * <p> {@link PcapWriteHandler} captures {@link ByteBuf} from {@link SocketChannel} / {@link ServerChannel}
45   * or {@link DatagramPacket} and writes it into Pcap {@link OutputStream}. </p>
46   *
47   * <p>
48   * Things to keep in mind when using {@link PcapWriteHandler} with TCP:
49   *
50   *    <ul>
51   *        <li> Whenever {@link ChannelInboundHandlerAdapter#channelActive(ChannelHandlerContext)} is called,
52   *        a fake TCP 3-way handshake (SYN, SYN+ACK, ACK) is simulated as new connection in Pcap. </li>
53   *
54   *        <li> Whenever {@link ChannelInboundHandlerAdapter#handlerRemoved(ChannelHandlerContext)} is called,
55   *        a fake TCP 3-way handshake (FIN+ACK, FIN+ACK, ACK) is simulated as connection shutdown in Pcap.  </li>
56   *
57   *        <li> Whenever {@link ChannelInboundHandlerAdapter#exceptionCaught(ChannelHandlerContext, Throwable)}
58   *        is called, a fake TCP RST is sent to simulate connection Reset in Pcap. </li>
59   *
60   *        <li> ACK is sent each time data is send / received. </li>
61   *
62   *        <li> Zero Length Data Packets can cause TCP Double ACK error in Wireshark. To tackle this,
63   *        set {@code captureZeroByte} to {@code false}. </li>
64   *    </ul>
65   * </p>
66   */
67  public final class PcapWriteHandler extends ChannelDuplexHandler implements Closeable {
68  
69      /**
70       * Logger for logging events
71       */
72      private final InternalLogger logger = InternalLoggerFactory.getInstance(PcapWriteHandler.class);
73  
74      /**
75       * {@link PcapWriter} Instance
76       */
77      private PcapWriter pCapWriter;
78  
79      /**
80       * {@link OutputStream} where we'll write Pcap data.
81       */
82      private final OutputStream outputStream;
83  
84      /**
85       * {@code true} if we want to capture packets with zero bytes else {@code false}.
86       */
87      private final boolean captureZeroByte;
88  
89      /**
90       * {@code true} if we want to write Pcap Global Header on initialization of
91       * {@link PcapWriter} else {@code false}.
92       */
93      private final boolean writePcapGlobalHeader;
94  
95      /**
96       * {@code true} if we want to synchronize on the {@link OutputStream} while writing
97       * else {@code false}.
98       */
99      private final boolean sharedOutputStream;
100 
101     /**
102      * TCP Sender Segment Number.
103      * It'll start with 1 and keep incrementing with number of bytes read/sent.
104      */
105     private int sendSegmentNumber = 1;
106 
107     /**
108      * TCP Receiver Segment Number.
109      * It'll start with 1 and keep incrementing with number of bytes read/sent.
110      */
111     private int receiveSegmentNumber = 1;
112 
113     /**
114      * Type of the channel this handler is registered on
115      */
116     private ChannelType channelType;
117 
118     /**
119      * Address of the initiator of the connection
120      */
121     private InetSocketAddress initiatorAddr;
122 
123     /**
124      * Address of the receiver of the connection
125      */
126     private InetSocketAddress handlerAddr;
127 
128     /**
129      * Set to {@code true} if this handler is registered on a server pipeline
130      */
131     private boolean isServerPipeline;
132 
133     /**
134      * Current of this {@link PcapWriteHandler}
135      */
136     private final AtomicReference<State> state = new AtomicReference<State>(State.INIT);
137 
138     /**
139      * Create new {@link PcapWriteHandler} Instance.
140      * {@code captureZeroByte} is set to {@code false} and
141      * {@code writePcapGlobalHeader} is set to {@code true}.
142      *
143      * @param outputStream OutputStream where Pcap data will be written. Call {@link #close()} to close this
144      *                     OutputStream.
145      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
146      *                              {@link NullPointerException}
147      *
148      * @deprecated Use {@link Builder} instead.
149      */
150     @Deprecated
151     public PcapWriteHandler(OutputStream outputStream) {
152         this(outputStream, false, true);
153     }
154 
155     /**
156      * Create new {@link PcapWriteHandler} Instance
157      *
158      * @param outputStream          OutputStream where Pcap data will be written. Call {@link #close()} to close this
159      *                              OutputStream.
160      * @param captureZeroByte       Set to {@code true} to enable capturing packets with empty (0 bytes) payload.
161      *                              Otherwise, if set to {@code false}, empty packets will be filtered out.
162      * @param writePcapGlobalHeader Set to {@code true} to write Pcap Global Header on initialization.
163      *                              Otherwise, if set to {@code false}, Pcap Global Header will not be written
164      *                              on initialization. This could when writing Pcap data on a existing file where
165      *                              Pcap Global Header is already present.
166      * @throws NullPointerException If {@link OutputStream} is {@code null} then we'll throw an
167      *                              {@link NullPointerException}
168      *
169      * @deprecated Use {@link Builder} instead.
170      */
171     @Deprecated
172     public PcapWriteHandler(OutputStream outputStream, boolean captureZeroByte, boolean writePcapGlobalHeader) {
173         this.outputStream = checkNotNull(outputStream, "OutputStream");
174         this.captureZeroByte = captureZeroByte;
175         this.writePcapGlobalHeader = writePcapGlobalHeader;
176         sharedOutputStream = false;
177     }
178 
179     private PcapWriteHandler(Builder builder, OutputStream outputStream) {
180         this.outputStream = outputStream;
181         captureZeroByte = builder.captureZeroByte;
182         sharedOutputStream = builder.sharedOutputStream;
183         writePcapGlobalHeader = builder.writePcapGlobalHeader;
184         channelType = builder.channelType;
185         handlerAddr = builder.handlerAddr;
186         initiatorAddr = builder.initiatorAddr;
187         isServerPipeline = builder.isServerPipeline;
188     }
189 
190     /**
191      * Writes the Pcap Global Header to the provided {@code OutputStream}
192      *
193      * @param outputStream OutputStream where Pcap data will be written.
194      * @throws IOException if there is an error writing to the {@code OutputStream}
195      */
196     public static void writeGlobalHeader(OutputStream outputStream) throws IOException {
197         PcapHeaders.writeGlobalHeader(outputStream);
198     }
199 
200     private void initializeIfNecessary(ChannelHandlerContext ctx) throws Exception {
201         // If State is not 'INIT' then it means we're already initialized so then no need to initiaize again.
202         if (state.get() != State.INIT) {
203             return;
204         }
205 
206         pCapWriter = new PcapWriter(this);
207 
208         if (channelType == null) {
209             // infer channel type
210             if (ctx.channel() instanceof SocketChannel) {
211                 channelType = ChannelType.TCP;
212 
213                 // If Channel belongs to `SocketChannel` then we're handling TCP.
214                 // Capture correct `localAddress` and `remoteAddress`
215                 if (ctx.channel().parent() instanceof ServerSocketChannel) {
216                     isServerPipeline = true;
217                     initiatorAddr = (InetSocketAddress) ctx.channel().remoteAddress();
218                     handlerAddr = (InetSocketAddress) ctx.channel().localAddress();
219                 } else {
220                     isServerPipeline = false;
221                     initiatorAddr = (InetSocketAddress) ctx.channel().localAddress();
222                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
223                 }
224             } else if (ctx.channel() instanceof DatagramChannel) {
225                 channelType = ChannelType.UDP;
226 
227                 DatagramChannel datagramChannel = (DatagramChannel) ctx.channel();
228 
229                 // If `DatagramChannel` is connected then we can get
230                 // `localAddress` and `remoteAddress` from Channel.
231                 if (datagramChannel.isConnected()) {
232                     initiatorAddr = (InetSocketAddress) ctx.channel().localAddress();
233                     handlerAddr = (InetSocketAddress) ctx.channel().remoteAddress();
234                 }
235             }
236         }
237 
238         if (channelType == ChannelType.TCP) {
239             logger.debug("Initiating Fake TCP 3-Way Handshake");
240 
241             ByteBuf tcpBuf = ctx.alloc().buffer();
242 
243             try {
244                 // Write SYN with Normal Source and Destination Address
245                 TCPPacket.writePacket(tcpBuf, null, 0, 0,
246                         initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.SYN);
247                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
248 
249                 // Write SYN+ACK with Reversed Source and Destination Address
250                 TCPPacket.writePacket(tcpBuf, null, 0, 1,
251                         handlerAddr.getPort(), initiatorAddr.getPort(), TCPPacket.TCPFlag.SYN, TCPPacket.TCPFlag.ACK);
252                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, ctx.alloc(), ctx);
253 
254                 // Write ACK with Normal Source and Destination Address
255                 TCPPacket.writePacket(tcpBuf, null, 1, 1, initiatorAddr.getPort(),
256                         handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
257                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
258             } finally {
259                 tcpBuf.release();
260             }
261 
262             logger.debug("Finished Fake TCP 3-Way Handshake");
263         }
264 
265         state.set(State.WRITING);
266     }
267 
268     @Override
269     public void channelActive(ChannelHandlerContext ctx) throws Exception {
270         initializeIfNecessary(ctx);
271         super.channelActive(ctx);
272     }
273 
274     @Override
275     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
276         // Initialize if needed
277         if (state.get() == State.INIT) {
278             initializeIfNecessary(ctx);
279         }
280 
281         // Only write if State is STARTED
282         if (state.get() == State.WRITING) {
283             if (channelType == ChannelType.TCP) {
284                 handleTCP(ctx, msg, false);
285             } else if (channelType == ChannelType.UDP) {
286                 handleUDP(ctx, msg);
287             } else {
288                 logDiscard();
289             }
290         }
291         super.channelRead(ctx, msg);
292     }
293 
294     @Override
295     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
296         // Initialize if needed
297         if (state.get() == State.INIT) {
298             initializeIfNecessary(ctx);
299         }
300 
301         // Only write if State is STARTED
302         if (state.get() == State.WRITING) {
303             if (channelType == ChannelType.TCP) {
304                 handleTCP(ctx, msg, true);
305             } else if (channelType == ChannelType.UDP) {
306                 handleUDP(ctx, msg);
307             } else {
308                 logDiscard();
309             }
310         }
311         super.write(ctx, msg, promise);
312     }
313 
314     /**
315      * Handle TCP L4
316      *
317      * @param ctx              {@link ChannelHandlerContext} for {@link ByteBuf} allocation and
318      *                         {@code fireExceptionCaught}
319      * @param msg              {@link Object} must be {@link ByteBuf} else it'll be discarded
320      * @param isWriteOperation Set {@code true} if we have to process packet when packets are being sent out
321      *                         else set {@code false}
322      */
323     private void handleTCP(ChannelHandlerContext ctx, Object msg, boolean isWriteOperation) {
324         if (msg instanceof ByteBuf) {
325 
326             // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
327             if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
328                 logger.debug("Discarding Zero Byte TCP Packet. isWriteOperation {}", isWriteOperation);
329                 return;
330             }
331 
332             ByteBufAllocator byteBufAllocator = ctx.alloc();
333             ByteBuf packet = ((ByteBuf) msg).duplicate();
334             ByteBuf tcpBuf = byteBufAllocator.buffer();
335             int bytes = packet.readableBytes();
336 
337             try {
338                 if (isWriteOperation) {
339                     final InetSocketAddress srcAddr;
340                     final InetSocketAddress dstAddr;
341                     if (isServerPipeline) {
342                         srcAddr = handlerAddr;
343                         dstAddr = initiatorAddr;
344                     } else {
345                         srcAddr = initiatorAddr;
346                         dstAddr = handlerAddr;
347                     }
348 
349                     TCPPacket.writePacket(tcpBuf, packet, sendSegmentNumber, receiveSegmentNumber, srcAddr.getPort(),
350                             dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
351                     completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
352                     logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, srcAddr, dstAddr, false);
353 
354                     sendSegmentNumber += bytes;
355 
356                     TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, dstAddr.getPort(),
357                             srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
358                     completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
359                     logTCP(true, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
360                 } else {
361                     final InetSocketAddress srcAddr;
362                     final InetSocketAddress dstAddr;
363                     if (isServerPipeline) {
364                         srcAddr = initiatorAddr;
365                         dstAddr = handlerAddr;
366                     } else {
367                         srcAddr = handlerAddr;
368                         dstAddr = initiatorAddr;
369                     }
370 
371                     TCPPacket.writePacket(tcpBuf, packet, receiveSegmentNumber, sendSegmentNumber, srcAddr.getPort(),
372                             dstAddr.getPort(), TCPPacket.TCPFlag.ACK);
373                     completeTCPWrite(srcAddr, dstAddr, tcpBuf, byteBufAllocator, ctx);
374                     logTCP(false, bytes, receiveSegmentNumber, sendSegmentNumber, srcAddr, dstAddr, false);
375 
376                     receiveSegmentNumber += bytes;
377 
378                     TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, dstAddr.getPort(),
379                             srcAddr.getPort(), TCPPacket.TCPFlag.ACK);
380                     completeTCPWrite(dstAddr, srcAddr, tcpBuf, byteBufAllocator, ctx);
381                     logTCP(false, bytes, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr, true);
382                 }
383             } finally {
384                 tcpBuf.release();
385             }
386         } else {
387             logger.debug("Discarding Pcap Write for TCP Object: {}", msg);
388         }
389     }
390 
391     /**
392      * Write TCP/IP L3 and L2 here.
393      *
394      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
395      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
396      * @param tcpBuf           {@link ByteBuf} containing TCP L4 Data
397      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for TCP/IP L3 and L2 data.
398      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
399      */
400     private void completeTCPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf tcpBuf,
401                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
402 
403         ByteBuf ipBuf = byteBufAllocator.buffer();
404         ByteBuf ethernetBuf = byteBufAllocator.buffer();
405         ByteBuf pcap = byteBufAllocator.buffer();
406 
407         try {
408             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
409                 IPPacket.writeTCPv4(ipBuf, tcpBuf,
410                         NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
411                         NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
412 
413                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
414             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
415                 IPPacket.writeTCPv6(ipBuf, tcpBuf,
416                         srcAddr.getAddress().getAddress(),
417                         dstAddr.getAddress().getAddress());
418 
419                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
420             } else {
421                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
422                         "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
423                 return;
424             }
425 
426             // Write Packet into Pcap
427             pCapWriter.writePacket(pcap, ethernetBuf);
428         } catch (IOException ex) {
429             logger.error("Caught Exception While Writing Packet into Pcap", ex);
430             ctx.fireExceptionCaught(ex);
431         } finally {
432             ipBuf.release();
433             ethernetBuf.release();
434             pcap.release();
435         }
436     }
437 
438     /**
439      * Handle UDP l4
440      *
441      * @param ctx {@link ChannelHandlerContext} for {@code localAddress} / {@code remoteAddress},
442      *            {@link ByteBuf} allocation and {@code fireExceptionCaught}
443      * @param msg {@link DatagramPacket} or {@link ByteBuf}
444      */
445     private void handleUDP(ChannelHandlerContext ctx, Object msg) {
446         ByteBuf udpBuf = ctx.alloc().buffer();
447 
448         try {
449             if (msg instanceof DatagramPacket) {
450 
451                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
452                 if (((DatagramPacket) msg).content().readableBytes() == 0 && !captureZeroByte) {
453                     logger.debug("Discarding Zero Byte UDP Packet");
454                     return;
455                 }
456 
457                 DatagramPacket datagramPacket = ((DatagramPacket) msg).duplicate();
458                 InetSocketAddress srcAddr = datagramPacket.sender();
459                 InetSocketAddress dstAddr = datagramPacket.recipient();
460 
461                 // If `datagramPacket.sender()` is `null` then DatagramPacket is initialized
462                 // `sender` (local) address. In this case, we'll get source address from Channel.
463                 if (srcAddr == null) {
464                     srcAddr = (InetSocketAddress) ctx.channel().localAddress();
465                 }
466 
467                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
468                         datagramPacket.content().readableBytes(), srcAddr, dstAddr);
469 
470                 UDPPacket.writePacket(udpBuf, datagramPacket.content(), srcAddr.getPort(), dstAddr.getPort());
471                 completeUDPWrite(srcAddr, dstAddr, udpBuf, ctx.alloc(), ctx);
472             } else if (msg instanceof ByteBuf &&
473                     (!(ctx.channel() instanceof DatagramChannel) || ((DatagramChannel) ctx.channel()).isConnected())) {
474 
475                 // If bytes are 0 and `captureZeroByte` is false, we won't capture this.
476                 if (((ByteBuf) msg).readableBytes() == 0 && !captureZeroByte) {
477                     logger.debug("Discarding Zero Byte UDP Packet");
478                     return;
479                 }
480 
481                 ByteBuf byteBuf = ((ByteBuf) msg).duplicate();
482 
483                 logger.debug("Writing UDP Data of {} Bytes, Src Addr {}, Dst Addr {}",
484                         byteBuf.readableBytes(), initiatorAddr, handlerAddr);
485 
486                 UDPPacket.writePacket(udpBuf, byteBuf, initiatorAddr.getPort(), handlerAddr.getPort());
487                 completeUDPWrite(initiatorAddr, handlerAddr, udpBuf, ctx.alloc(), ctx);
488             } else {
489                 logger.debug("Discarding Pcap Write for UDP Object: {}", msg);
490             }
491         } finally {
492             udpBuf.release();
493         }
494     }
495 
496     /**
497      * Write UDP/IP L3 and L2 here.
498      *
499      * @param srcAddr          {@link InetSocketAddress} Source Address of this Packet
500      * @param dstAddr          {@link InetSocketAddress} Destination Address of this Packet
501      * @param udpBuf           {@link ByteBuf} containing UDP L4 Data
502      * @param byteBufAllocator {@link ByteBufAllocator} for allocating bytes for UDP/IP L3 and L2 data.
503      * @param ctx              {@link ChannelHandlerContext} for {@code fireExceptionCaught}
504      */
505     private void completeUDPWrite(InetSocketAddress srcAddr, InetSocketAddress dstAddr, ByteBuf udpBuf,
506                                   ByteBufAllocator byteBufAllocator, ChannelHandlerContext ctx) {
507 
508         ByteBuf ipBuf = byteBufAllocator.buffer();
509         ByteBuf ethernetBuf = byteBufAllocator.buffer();
510         ByteBuf pcap = byteBufAllocator.buffer();
511 
512         try {
513             if (srcAddr.getAddress() instanceof Inet4Address && dstAddr.getAddress() instanceof Inet4Address) {
514                 IPPacket.writeUDPv4(ipBuf, udpBuf,
515                         NetUtil.ipv4AddressToInt((Inet4Address) srcAddr.getAddress()),
516                         NetUtil.ipv4AddressToInt((Inet4Address) dstAddr.getAddress()));
517 
518                 EthernetPacket.writeIPv4(ethernetBuf, ipBuf);
519             } else if (srcAddr.getAddress() instanceof Inet6Address && dstAddr.getAddress() instanceof Inet6Address) {
520                 IPPacket.writeUDPv6(ipBuf, udpBuf,
521                         srcAddr.getAddress().getAddress(),
522                         dstAddr.getAddress().getAddress());
523 
524                 EthernetPacket.writeIPv6(ethernetBuf, ipBuf);
525             } else {
526                 logger.error("Source and Destination IP Address versions are not same. Source Address: {}, " +
527                         "Destination Address: {}", srcAddr.getAddress(), dstAddr.getAddress());
528                 return;
529             }
530 
531             // Write Packet into Pcap
532             pCapWriter.writePacket(pcap, ethernetBuf);
533         } catch (IOException ex) {
534             logger.error("Caught Exception While Writing Packet into Pcap", ex);
535             ctx.fireExceptionCaught(ex);
536         } finally {
537             ipBuf.release();
538             ethernetBuf.release();
539             pcap.release();
540         }
541     }
542 
543     @Override
544     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
545 
546         // If `isTCP` is true, then we'll simulate a `FIN` flow.
547         if (channelType == ChannelType.TCP) {
548             logger.debug("Starting Fake TCP FIN+ACK Flow to close connection");
549 
550             ByteBufAllocator byteBufAllocator = ctx.alloc();
551             ByteBuf tcpBuf = byteBufAllocator.buffer();
552 
553             try {
554                 // Write FIN+ACK with Normal Source and Destination Address
555                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
556                         handlerAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
557                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
558 
559                 // Write FIN+ACK with Reversed Source and Destination Address
560                 TCPPacket.writePacket(tcpBuf, null, receiveSegmentNumber, sendSegmentNumber, handlerAddr.getPort(),
561                         initiatorAddr.getPort(), TCPPacket.TCPFlag.FIN, TCPPacket.TCPFlag.ACK);
562                 completeTCPWrite(handlerAddr, initiatorAddr, tcpBuf, byteBufAllocator, ctx);
563 
564                 // Write ACK with Normal Source and Destination Address
565                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber + 1, receiveSegmentNumber + 1,
566                         initiatorAddr.getPort(), handlerAddr.getPort(), TCPPacket.TCPFlag.ACK);
567                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, byteBufAllocator, ctx);
568             } finally {
569                 tcpBuf.release();
570             }
571 
572             logger.debug("Finished Fake TCP FIN+ACK Flow to close connection");
573         }
574 
575         close();
576         super.handlerRemoved(ctx);
577     }
578 
579     @Override
580     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
581 
582         if (channelType == ChannelType.TCP) {
583             ByteBuf tcpBuf = ctx.alloc().buffer();
584 
585             try {
586                 // Write RST with Normal Source and Destination Address
587                 TCPPacket.writePacket(tcpBuf, null, sendSegmentNumber, receiveSegmentNumber, initiatorAddr.getPort(),
588                         handlerAddr.getPort(), TCPPacket.TCPFlag.RST, TCPPacket.TCPFlag.ACK);
589                 completeTCPWrite(initiatorAddr, handlerAddr, tcpBuf, ctx.alloc(), ctx);
590             } finally {
591                 tcpBuf.release();
592             }
593 
594             logger.debug("Sent Fake TCP RST to close connection");
595         }
596 
597         close();
598         ctx.fireExceptionCaught(cause);
599     }
600 
601     /**
602      * Logger for TCP
603      */
604     private void logTCP(boolean isWriteOperation, int bytes, int sendSegmentNumber, int receiveSegmentNumber,
605                         InetSocketAddress srcAddr, InetSocketAddress dstAddr, boolean ackOnly) {
606         // If `ackOnly` is `true` when we don't need to write any data so we'll not
607         // log number of bytes being written and mark the operation as "TCP ACK".
608         if (logger.isDebugEnabled()) {
609             if (ackOnly) {
610                 logger.debug("Writing TCP ACK, isWriteOperation {}, Segment Number {}, Ack Number {}, Src Addr {}, "
611                         + "Dst Addr {}", isWriteOperation, sendSegmentNumber, receiveSegmentNumber, dstAddr, srcAddr);
612             } else {
613                 logger.debug("Writing TCP Data of {} Bytes, isWriteOperation {}, Segment Number {}, Ack Number {}, " +
614                                 "Src Addr {}, Dst Addr {}", bytes, isWriteOperation, sendSegmentNumber,
615                         receiveSegmentNumber, srcAddr, dstAddr);
616             }
617         }
618     }
619 
620     OutputStream outputStream() {
621         return outputStream;
622     }
623 
624     boolean sharedOutputStream() {
625         return sharedOutputStream;
626     }
627 
628     /**
629      * Returns {@code true} if the {@link PcapWriteHandler} is currently
630      * writing packets to the {@link OutputStream} else returns {@code false}.
631      */
632     public boolean isWriting() {
633         return state.get() == State.WRITING;
634     }
635 
636     State state() {
637         return state.get();
638     }
639 
640     /**
641      * Pause the {@link PcapWriteHandler} from writing packets to the {@link OutputStream}.
642      */
643     public void pause() {
644         if (!state.compareAndSet(State.WRITING, State.PAUSED)) {
645             throw new IllegalStateException("State must be 'STARTED' to pause but current state is: " + state);
646         }
647     }
648 
649     /**
650      * Resume the {@link PcapWriteHandler} to writing packets to the {@link OutputStream}.
651      */
652     public void resume() {
653         if (!state.compareAndSet(State.PAUSED, State.WRITING)) {
654             throw new IllegalStateException("State must be 'PAUSED' to resume but current state is: " + state);
655         }
656     }
657 
658     void markClosed() {
659         if (state.get() != State.CLOSED) {
660             state.set(State.CLOSED);
661         }
662     }
663 
664     // Visible for testing only.
665     PcapWriter pCapWriter() {
666         return pCapWriter;
667     }
668 
669     private void logDiscard() {
670         logger.warn("Discarding pcap write because channel type is unknown. The channel this handler is registered " +
671                 "on is not a SocketChannel or DatagramChannel, so the inference does not work. Please call " +
672                 "forceTcpChannel or forceUdpChannel before registering the handler.");
673     }
674 
675     @Override
676     public String toString() {
677         return "PcapWriteHandler{" +
678                 "captureZeroByte=" + captureZeroByte +
679                 ", writePcapGlobalHeader=" + writePcapGlobalHeader +
680                 ", sharedOutputStream=" + sharedOutputStream +
681                 ", sendSegmentNumber=" + sendSegmentNumber +
682                 ", receiveSegmentNumber=" + receiveSegmentNumber +
683                 ", channelType=" + channelType +
684                 ", initiatorAddr=" + initiatorAddr +
685                 ", handlerAddr=" + handlerAddr +
686                 ", isServerPipeline=" + isServerPipeline +
687                 ", state=" + state +
688                 '}';
689     }
690 
691     /**
692      * <p> Close {@code PcapWriter} and {@link OutputStream}. </p>
693      * <p> Note: Calling this method does not close {@link PcapWriteHandler}.
694      * Only Pcap Writes are closed. </p>
695      *
696      * @throws IOException If {@link OutputStream#close()} throws an exception
697      */
698     @Override
699     public void close() throws IOException {
700         if (state.get() == State.CLOSED) {
701             logger.debug("PcapWriterHandler is already closed");
702         } else {
703             markClosed();
704             pCapWriter.close();
705             logger.debug("PcapWriterHandler is now closed");
706         }
707     }
708 
709     private enum ChannelType {
710         TCP, UDP
711     }
712 
713     public static Builder builder() {
714         return new Builder();
715     }
716 
717     /**
718      * Builder for {@link PcapWriteHandler}
719      */
720     public static final class Builder {
721         private boolean captureZeroByte;
722         private boolean sharedOutputStream;
723         private boolean writePcapGlobalHeader = true;
724 
725         private ChannelType channelType;
726         private InetSocketAddress initiatorAddr;
727         private InetSocketAddress handlerAddr;
728         private boolean isServerPipeline;
729 
730         private Builder() {
731         }
732 
733         /**
734          * Set to {@code true} to enable capturing packets with empty (0 bytes) payload. Otherwise, if set to
735          * {@code false}, empty packets will be filtered out.
736          *
737          * @param captureZeroByte Whether to filter out empty packets.
738          * @return this builder
739          */
740         public Builder captureZeroByte(boolean captureZeroByte) {
741             this.captureZeroByte = captureZeroByte;
742             return this;
743         }
744 
745         /**
746          * Set to {@code true} if multiple {@link PcapWriteHandler} instances will be
747          * writing to the same {@link OutputStream} concurrently, and write locking is
748          * required. Otherwise, if set to {@code false}, no locking will be done.
749          * Additionally, {@link #close} will not close the underlying {@code OutputStream}.
750          * Note: it is probably an error to have both {@code writePcapGlobalHeader} and
751          * {@code sharedOutputStream} set to {@code true} at the same time.
752          *
753          * @param sharedOutputStream Whether {@link OutputStream} is shared or not
754          * @return this builder
755          */
756         public Builder sharedOutputStream(boolean sharedOutputStream) {
757             this.sharedOutputStream = sharedOutputStream;
758             return this;
759         }
760 
761         /**
762          * Set to {@code true} to write Pcap Global Header on initialization. Otherwise, if set to {@code false}, Pcap
763          * Global Header will not be written on initialization. This could when writing Pcap data on a existing file
764          * where Pcap Global Header is already present.
765          *
766          * @param writePcapGlobalHeader Whether to write the pcap global header.
767          * @return this builder
768          */
769         public Builder writePcapGlobalHeader(boolean writePcapGlobalHeader) {
770             this.writePcapGlobalHeader = writePcapGlobalHeader;
771             return this;
772         }
773 
774         /**
775          * Force this handler to write data as if they were TCP packets, with the given connection metadata. If this
776          * method isn't called, we determine the metadata from the channel.
777          *
778          * @param serverAddress The address of the TCP server (handler)
779          * @param clientAddress The address of the TCP client (initiator)
780          * @param isServerPipeline Whether the handler is part of the server channel
781          * @return this builder
782          */
783         public Builder forceTcpChannel(InetSocketAddress serverAddress, InetSocketAddress clientAddress,
784                                        boolean isServerPipeline) {
785             channelType = ChannelType.TCP;
786             handlerAddr = checkNotNull(serverAddress, "serverAddress");
787             initiatorAddr = checkNotNull(clientAddress, "clientAddress");
788             this.isServerPipeline = isServerPipeline;
789             return this;
790         }
791 
792         /**
793          * Force this handler to write data as if they were UDP packets, with the given connection metadata. If this
794          * method isn't called, we determine the metadata from the channel.
795          * <br>
796          * Note that even if this method is called, the address information on {@link DatagramPacket} takes precedence
797          * if it is present.
798          *
799          * @param localAddress  The address of the UDP local
800          * @param remoteAddress The address of the UDP remote
801          * @return this builder
802          */
803         public Builder forceUdpChannel(InetSocketAddress localAddress, InetSocketAddress remoteAddress) {
804             channelType = ChannelType.UDP;
805             handlerAddr = checkNotNull(remoteAddress, "remoteAddress");
806             initiatorAddr = checkNotNull(localAddress, "localAddress");
807             return this;
808         }
809 
810         /**
811          * Build the {@link PcapWriteHandler}.
812          *
813          * @param outputStream The output stream to write the pcap data to.
814          * @return The handler.
815          */
816         public PcapWriteHandler build(OutputStream outputStream) {
817             checkNotNull(outputStream, "outputStream");
818             return new PcapWriteHandler(this, outputStream);
819         }
820     }
821 }