View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.IoHandlerContext;
19  import io.netty.channel.IoHandle;
20  import io.netty.channel.IoHandler;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.util.concurrent.ThreadAwareExecutor;
25  import io.netty.util.internal.StringUtil;
26  
27  import java.util.HashSet;
28  import java.util.Objects;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.locks.LockSupport;
32  
33  public final class LocalIoHandler implements IoHandler {
34      private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
35      private final ThreadAwareExecutor executor;
36      private volatile Thread executionThread;
37  
38      private LocalIoHandler(ThreadAwareExecutor executor) {
39          this.executor = Objects.requireNonNull(executor, "executor");
40      }
41  
42      /**
43       * Returns a new {@link IoHandlerFactory} that creates {@link LocalIoHandler} instances.
44       */
45      public static IoHandlerFactory newFactory() {
46          return LocalIoHandler::new;
47      }
48  
49      private static LocalIoHandle cast(IoHandle handle) {
50          if (handle instanceof LocalIoHandle) {
51              return (LocalIoHandle) handle;
52          }
53          throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
54      }
55  
56      @Override
57      public int run(IoHandlerContext context) {
58          if (executionThread == null) {
59              executionThread = Thread.currentThread();
60          }
61          if (context.canBlock()) {
62              // Just block until there is a task ready to process or wakeup(...) is called.
63              LockSupport.parkNanos(this, context.delayNanos(System.nanoTime()));
64          }
65  
66          if (context.shouldReportActiveIoTime()) {
67              context.reportActiveIoTime(0);
68          }
69          return 0;
70      }
71  
72      @Override
73      public void wakeup() {
74          if (!executor.isExecutorThread(Thread.currentThread())) {
75              Thread thread = executionThread;
76              if (thread != null) {
77                  // Wakeup if we block at the moment.
78                  LockSupport.unpark(thread);
79              }
80          }
81      }
82  
83      @Override
84      public void prepareToDestroy() {
85          for (LocalIoHandle handle : registeredChannels) {
86              handle.closeNow();
87          }
88          registeredChannels.clear();
89      }
90  
91      @Override
92      public void destroy() {
93      }
94  
95      @Override
96      public IoRegistration register(IoHandle handle) {
97          LocalIoHandle localHandle = cast(handle);
98          if (registeredChannels.add(localHandle)) {
99              LocalIoRegistration registration = new LocalIoRegistration(executor, localHandle);
100             localHandle.registerNow();
101             return registration;
102         }
103         throw new IllegalStateException();
104     }
105 
106     @Override
107     public boolean isCompatible(Class<? extends IoHandle> handleType) {
108         return LocalIoHandle.class.isAssignableFrom(handleType);
109     }
110 
111     private final class LocalIoRegistration implements IoRegistration {
112         private final AtomicBoolean canceled = new AtomicBoolean();
113         private final ThreadAwareExecutor executor;
114         private final LocalIoHandle handle;
115 
116         LocalIoRegistration(ThreadAwareExecutor executor, LocalIoHandle handle) {
117             this.executor = executor;
118             this.handle = handle;
119         }
120 
121         @Override
122         public <T> T attachment() {
123             return null;
124         }
125 
126         @Override
127         public long submit(IoOps ops) {
128             throw new UnsupportedOperationException();
129         }
130 
131         @Override
132         public boolean isValid() {
133             return !canceled.get();
134         }
135 
136         @Override
137         public boolean cancel() {
138             if (!canceled.compareAndSet(false, true)) {
139                 return false;
140             }
141             if (executor.isExecutorThread(Thread.currentThread())) {
142                 cancel0();
143             } else {
144                 executor.execute(this::cancel0);
145             }
146             return true;
147         }
148 
149         private void cancel0() {
150             if (registeredChannels.remove(handle)) {
151                 handle.deregisterNow();
152             }
153         }
154     }
155 }