View Javadoc
1   /*
2    * Copyright 2024 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   https://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.channel.local;
17  
18  import io.netty.channel.IoHandlerContext;
19  import io.netty.channel.IoHandle;
20  import io.netty.channel.IoHandler;
21  import io.netty.channel.IoHandlerFactory;
22  import io.netty.channel.IoOps;
23  import io.netty.channel.IoRegistration;
24  import io.netty.util.concurrent.ThreadAwareExecutor;
25  import io.netty.util.internal.StringUtil;
26  
27  import java.util.HashSet;
28  import java.util.Objects;
29  import java.util.Set;
30  import java.util.concurrent.atomic.AtomicBoolean;
31  import java.util.concurrent.locks.LockSupport;
32  
33  public final class LocalIoHandler implements IoHandler {
34      private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
35      private final ThreadAwareExecutor executor;
36      private volatile Thread executionThread;
37  
38      private LocalIoHandler(ThreadAwareExecutor executor) {
39          this.executor = Objects.requireNonNull(executor, "executor");
40      }
41  
42      /**
43       * Returns a new {@link IoHandlerFactory} that creates {@link LocalIoHandler} instances.
44       */
45      public static IoHandlerFactory newFactory() {
46          return LocalIoHandler::new;
47      }
48  
49      private static LocalIoHandle cast(IoHandle handle) {
50          if (handle instanceof LocalIoHandle) {
51              return (LocalIoHandle) handle;
52          }
53          throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
54      }
55  
56      @Override
57      public int run(IoHandlerContext context) {
58          if (executionThread == null) {
59              executionThread = Thread.currentThread();
60          }
61          if (context.canBlock()) {
62              // Just block until there is a task ready to process or wakeup(...) is called.
63              LockSupport.parkNanos(this, context.delayNanos(System.nanoTime()));
64          }
65          return 0;
66      }
67  
68      @Override
69      public void wakeup() {
70          if (!executor.isExecutorThread(Thread.currentThread())) {
71              Thread thread = executionThread;
72              if (thread != null) {
73                  // Wakeup if we block at the moment.
74                  LockSupport.unpark(thread);
75              }
76          }
77      }
78  
79      @Override
80      public void prepareToDestroy() {
81          for (LocalIoHandle handle : registeredChannels) {
82              handle.closeNow();
83          }
84          registeredChannels.clear();
85      }
86  
87      @Override
88      public void destroy() {
89      }
90  
91      @Override
92      public IoRegistration register(IoHandle handle) {
93          LocalIoHandle localHandle = cast(handle);
94          if (registeredChannels.add(localHandle)) {
95              LocalIoRegistration registration = new LocalIoRegistration(executor, localHandle);
96              localHandle.registerNow();
97              return registration;
98          }
99          throw new IllegalStateException();
100     }
101 
102     @Override
103     public boolean isCompatible(Class<? extends IoHandle> handleType) {
104         return LocalIoHandle.class.isAssignableFrom(handleType);
105     }
106 
107     private final class LocalIoRegistration implements IoRegistration {
108         private final AtomicBoolean canceled = new AtomicBoolean();
109         private final ThreadAwareExecutor executor;
110         private final LocalIoHandle handle;
111 
112         LocalIoRegistration(ThreadAwareExecutor executor, LocalIoHandle handle) {
113             this.executor = executor;
114             this.handle = handle;
115         }
116 
117         @Override
118         public <T> T attachment() {
119             return null;
120         }
121 
122         @Override
123         public long submit(IoOps ops) {
124             throw new UnsupportedOperationException();
125         }
126 
127         @Override
128         public boolean isValid() {
129             return !canceled.get();
130         }
131 
132         @Override
133         public boolean cancel() {
134             if (!canceled.compareAndSet(false, true)) {
135                 return false;
136             }
137             if (executor.isExecutorThread(Thread.currentThread())) {
138                 cancel0();
139             } else {
140                 executor.execute(this::cancel0);
141             }
142             return true;
143         }
144 
145         private void cancel0() {
146             if (registeredChannels.remove(handle)) {
147                 handle.deregisterNow();
148             }
149         }
150     }
151 }