View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.uring;
17  
18  import io.netty.util.internal.PlatformDependent;
19  import io.netty.util.internal.logging.InternalLogger;
20  import io.netty.util.internal.logging.InternalLoggerFactory;
21  
22  import java.util.StringJoiner;
23  import java.util.function.IntSupplier;
24  
25  import static io.netty.channel.uring.UserData.decode;
26  
27  /**
28   * Completion queue implementation for io_uring.
29   */
30  final class CompletionQueue implements IntSupplier {
31      private static final InternalLogger logger = InternalLoggerFactory.getInstance(CompletionQueue.class);
32  
33      //these offsets are used to access specific properties
34      //CQE (https://github.com/axboe/liburing/blob/master/src/include/liburing/io_uring.h#L162)
35      private static final int CQE_USER_DATA_FIELD = 0;
36      private static final int CQE_RES_FIELD = 8;
37      private static final int CQE_FLAGS_FIELD = 12;
38  
39      private static final long CQE_SIZE = 16;
40  
41      //these unsigned integer pointers(shared with the kernel) will be changed by the kernel
42      private final long kHeadAddress;
43      private final long kTailAddress;
44  
45      private final long completionQueueArrayAddress;
46  
47      final int ringSize;
48      final long ringAddress;
49      final int ringFd;
50  
51      private final int ringEntries;
52      private final int ringMask;
53      private int ringHead;
54  
55      CompletionQueue(long kHeadAddress, long kTailAddress, long kRingMaskAddress, long kRingEntriesAddress,
56                      long kOverflowAddress, long completionQueueArrayAddress, int ringSize, long ringAddress,
57                      int ringFd) {
58          this.kHeadAddress = kHeadAddress;
59          this.kTailAddress = kTailAddress;
60          this.completionQueueArrayAddress = completionQueueArrayAddress;
61          this.ringSize = ringSize;
62          this.ringAddress = ringAddress;
63          this.ringFd = ringFd;
64  
65          ringEntries = PlatformDependent.getIntVolatile(kRingEntriesAddress);
66          ringMask = PlatformDependent.getIntVolatile(kRingMaskAddress);
67          ringHead = PlatformDependent.getIntVolatile(kHeadAddress);
68      }
69  
70      /**
71       * Returns {@code true} if any completion event is ready to be processed by
72       * {@link #process(CompletionCallback)}, {@code false} otherwise.
73       */
74      boolean hasCompletions() {
75          return ringHead != PlatformDependent.getIntVolatile(kTailAddress);
76      }
77  
78      @Override
79      public int getAsInt() {
80          return count();
81      }
82  
83      int count() {
84          return PlatformDependent.getIntVolatile(kTailAddress) - ringHead;
85      }
86  
87      /**
88       * Process the completion events in the {@link CompletionQueue} and return the number of processed
89       * events.
90       */
91      int process(CompletionCallback callback) {
92          int tail = PlatformDependent.getIntVolatile(kTailAddress);
93          int i = 0;
94          boolean isTraceEnabled = logger.isTraceEnabled();
95          while (ringHead != tail) {
96              long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
97  
98              long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
99              int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
100             int flags = PlatformDependent.getInt(cqeAddress + CQE_FLAGS_FIELD);
101 
102             //Ensure that the kernel only sees the new value of the head index after the CQEs have been read.
103             ringHead++;
104             PlatformDependent.putIntOrdered(kHeadAddress, ringHead);
105 
106             i++;
107 
108             if (isTraceEnabled) {
109                 logger.trace("completed(ring {}): {}(id={}, res={})",
110                         ringFd, Native.opToStr(UserData.decodeOp(udata)), UserData.decodeId(udata), res);
111             }
112             try {
113                 decode(res, flags, udata, callback);
114             } catch (Error e) {
115                 throw e;
116             } catch (Throwable throwable) {
117                 handleLoopException(throwable);
118             }
119         }
120         return i;
121     }
122 
123     private static void handleLoopException(Throwable throwable) {
124         logger.warn("Unexpected exception in the IO event loop.", throwable);
125 
126         // Prevent possible consecutive immediate failures that lead to
127         // excessive CPU consumption.
128         try {
129             Thread.sleep(100);
130         } catch (InterruptedException ignore) {
131         }
132     }
133 
134     @Override
135     public String toString() {
136         StringJoiner sb = new StringJoiner(", ", "CompletionQueue [", "]");
137         int tail = PlatformDependent.getIntVolatile(kTailAddress);
138         int head = ringHead;
139         while (head != tail) {
140             long cqeAddress = completionQueueArrayAddress + (ringHead & ringMask) * CQE_SIZE;
141             long udata = PlatformDependent.getLong(cqeAddress + CQE_USER_DATA_FIELD);
142             int res = PlatformDependent.getInt(cqeAddress + CQE_RES_FIELD);
143             sb.add(Native.opToStr(UserData.decodeOp(udata)) + "(id=" + UserData.decodeId(udata) + ",res=" + res + ')');
144             head++;
145         }
146         return sb.toString();
147     }
148 
149     /**
150      * Block until there is at least one completion ready to be processed.
151      */
152     void ioUringWaitCqe() {
153         int ret = Native.ioUringEnter(ringFd, 0, 1, Native.IORING_ENTER_GETEVENTS);
154         if (logger.isTraceEnabled()) {
155             logger.trace("completed(ring {}): {}", ringFd, this);
156         }
157         if (ret < 0) {
158             throw new RuntimeException("ioUringEnter syscall returned " + ret);
159         }
160     }
161 }