1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.IoHandlerContext;
19 import io.netty.channel.IoHandle;
20 import io.netty.channel.IoHandler;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.util.concurrent.ThreadAwareExecutor;
25 import io.netty.util.internal.StringUtil;
26
27 import java.util.HashSet;
28 import java.util.Objects;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.locks.LockSupport;
32
33 public final class LocalIoHandler implements IoHandler {
34 private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
35 private final ThreadAwareExecutor executor;
36 private volatile Thread executionThread;
37
38 private LocalIoHandler(ThreadAwareExecutor executor) {
39 this.executor = Objects.requireNonNull(executor, "executor");
40 }
41
42
43
44
45 public static IoHandlerFactory newFactory() {
46 return LocalIoHandler::new;
47 }
48
49 private static LocalIoHandle cast(IoHandle handle) {
50 if (handle instanceof LocalIoHandle) {
51 return (LocalIoHandle) handle;
52 }
53 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
54 }
55
56 @Override
57 public int run(IoHandlerContext context) {
58 if (executionThread == null) {
59 executionThread = Thread.currentThread();
60 }
61 if (context.canBlock()) {
62
63 LockSupport.parkNanos(this, context.delayNanos(System.nanoTime()));
64 }
65 return 0;
66 }
67
68 @Override
69 public void wakeup() {
70 if (!executor.isExecutorThread(Thread.currentThread())) {
71 Thread thread = executionThread;
72 if (thread != null) {
73
74 LockSupport.unpark(thread);
75 }
76 }
77 }
78
79 @Override
80 public void prepareToDestroy() {
81 for (LocalIoHandle handle : registeredChannels) {
82 handle.closeNow();
83 }
84 registeredChannels.clear();
85 }
86
87 @Override
88 public void destroy() {
89 }
90
91 @Override
92 public IoRegistration register(IoHandle handle) {
93 LocalIoHandle localHandle = cast(handle);
94 if (registeredChannels.add(localHandle)) {
95 LocalIoRegistration registration = new LocalIoRegistration(executor, localHandle);
96 localHandle.registerNow();
97 return registration;
98 }
99 throw new IllegalStateException();
100 }
101
102 @Override
103 public boolean isCompatible(Class<? extends IoHandle> handleType) {
104 return LocalIoHandle.class.isAssignableFrom(handleType);
105 }
106
107 private final class LocalIoRegistration implements IoRegistration {
108 private final AtomicBoolean canceled = new AtomicBoolean();
109 private final ThreadAwareExecutor executor;
110 private final LocalIoHandle handle;
111
112 LocalIoRegistration(ThreadAwareExecutor executor, LocalIoHandle handle) {
113 this.executor = executor;
114 this.handle = handle;
115 }
116
117 @Override
118 public <T> T attachment() {
119 return null;
120 }
121
122 @Override
123 public long submit(IoOps ops) {
124 throw new UnsupportedOperationException();
125 }
126
127 @Override
128 public boolean isValid() {
129 return !canceled.get();
130 }
131
132 @Override
133 public boolean cancel() {
134 if (!canceled.compareAndSet(false, true)) {
135 return false;
136 }
137 if (executor.isExecutorThread(Thread.currentThread())) {
138 cancel0();
139 } else {
140 executor.execute(this::cancel0);
141 }
142 return true;
143 }
144
145 private void cancel0() {
146 if (registeredChannels.remove(handle)) {
147 handle.deregisterNow();
148 }
149 }
150 }
151 }