1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.IoHandlerContext;
19 import io.netty.channel.IoHandle;
20 import io.netty.channel.IoHandler;
21 import io.netty.channel.IoHandlerFactory;
22 import io.netty.channel.IoOps;
23 import io.netty.channel.IoRegistration;
24 import io.netty.util.concurrent.ThreadAwareExecutor;
25 import io.netty.util.internal.StringUtil;
26
27 import java.util.HashSet;
28 import java.util.Objects;
29 import java.util.Set;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.locks.LockSupport;
32
33 public final class LocalIoHandler implements IoHandler {
34 private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
35 private final ThreadAwareExecutor executor;
36 private volatile Thread executionThread;
37
38 private LocalIoHandler(ThreadAwareExecutor executor) {
39 this.executor = Objects.requireNonNull(executor, "executor");
40 }
41
42
43
44
45 public static IoHandlerFactory newFactory() {
46 return LocalIoHandler::new;
47 }
48
49 private static LocalIoHandle cast(IoHandle handle) {
50 if (handle instanceof LocalIoHandle) {
51 return (LocalIoHandle) handle;
52 }
53 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
54 }
55
56 @Override
57 public int run(IoHandlerContext context) {
58 if (executionThread == null) {
59 executionThread = Thread.currentThread();
60 }
61 if (context.canBlock()) {
62
63 LockSupport.parkNanos(this, context.delayNanos(System.nanoTime()));
64 }
65
66 if (context.shouldReportActiveIoTime()) {
67 context.reportActiveIoTime(0);
68 }
69 return 0;
70 }
71
72 @Override
73 public void wakeup() {
74 if (!executor.isExecutorThread(Thread.currentThread())) {
75 Thread thread = executionThread;
76 if (thread != null) {
77
78 LockSupport.unpark(thread);
79 }
80 }
81 }
82
83 @Override
84 public void prepareToDestroy() {
85 for (LocalIoHandle handle : registeredChannels) {
86 handle.closeNow();
87 }
88 registeredChannels.clear();
89 }
90
91 @Override
92 public void destroy() {
93 }
94
95 @Override
96 public IoRegistration register(IoHandle handle) {
97 LocalIoHandle localHandle = cast(handle);
98 if (registeredChannels.add(localHandle)) {
99 LocalIoRegistration registration = new LocalIoRegistration(executor, localHandle);
100 localHandle.registerNow();
101 return registration;
102 }
103 throw new IllegalStateException();
104 }
105
106 @Override
107 public boolean isCompatible(Class<? extends IoHandle> handleType) {
108 return LocalIoHandle.class.isAssignableFrom(handleType);
109 }
110
111 private final class LocalIoRegistration implements IoRegistration {
112 private final AtomicBoolean canceled = new AtomicBoolean();
113 private final ThreadAwareExecutor executor;
114 private final LocalIoHandle handle;
115
116 LocalIoRegistration(ThreadAwareExecutor executor, LocalIoHandle handle) {
117 this.executor = executor;
118 this.handle = handle;
119 }
120
121 @Override
122 public <T> T attachment() {
123 return null;
124 }
125
126 @Override
127 public long submit(IoOps ops) {
128 throw new UnsupportedOperationException();
129 }
130
131 @Override
132 public boolean isValid() {
133 return !canceled.get();
134 }
135
136 @Override
137 public boolean cancel() {
138 if (!canceled.compareAndSet(false, true)) {
139 return false;
140 }
141 if (executor.isExecutorThread(Thread.currentThread())) {
142 cancel0();
143 } else {
144 executor.execute(this::cancel0);
145 }
146 return true;
147 }
148
149 private void cancel0() {
150 if (registeredChannels.remove(handle)) {
151 handle.deregisterNow();
152 }
153 }
154 }
155 }