1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.IoEventLoop;
19 import io.netty.channel.IoExecutionContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.util.internal.StringUtil;
26
27 import java.util.HashSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicBoolean;
30 import java.util.concurrent.locks.LockSupport;
31
32 public final class LocalIoHandler implements IoHandler {
33 private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
34 private volatile Thread executionThread;
35
36 private LocalIoHandler() { }
37
38
39
40
41 public static IoHandlerFactory newFactory() {
42 return new IoHandlerFactory() {
43 @Override
44 public IoHandler newHandler() {
45 return new LocalIoHandler();
46 }
47 };
48 }
49
50 private static LocalIoHandle cast(IoHandle handle) {
51 if (handle instanceof LocalIoHandle) {
52 return (LocalIoHandle) handle;
53 }
54 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
55 }
56
57 @Override
58 public int run(IoExecutionContext runner) {
59 if (executionThread == null) {
60 executionThread = Thread.currentThread();
61 }
62 if (runner.canBlock()) {
63
64 LockSupport.parkNanos(this, runner.delayNanos(System.nanoTime()));
65 }
66 return 0;
67 }
68
69 @Override
70 public void wakeup(IoEventLoop eventLoop) {
71 if (!eventLoop.inEventLoop()) {
72 Thread thread = executionThread;
73 if (thread != null) {
74
75 LockSupport.unpark(thread);
76 }
77 }
78 }
79
80 @Override
81 public void prepareToDestroy() {
82 for (LocalIoHandle handle : registeredChannels) {
83 handle.closeNow();
84 }
85 registeredChannels.clear();
86 }
87
88 @Override
89 public void destroy() {
90 }
91
92 @Override
93 public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
94 LocalIoHandle localHandle = cast(handle);
95 if (registeredChannels.add(localHandle)) {
96 LocalIoRegistration registration = new LocalIoRegistration(eventLoop, localHandle);
97 localHandle.registerNow();
98 return registration;
99 }
100 throw new IllegalStateException();
101 }
102
103 @Override
104 public boolean isCompatible(Class<? extends IoHandle> handleType) {
105 return LocalIoHandle.class.isAssignableFrom(handleType);
106 }
107
108 private final class LocalIoRegistration extends AtomicBoolean implements IoRegistration {
109 private final IoEventLoop eventLoop;
110 private final LocalIoHandle handle;
111
112 LocalIoRegistration(IoEventLoop eventLoop, LocalIoHandle handle) {
113 this.eventLoop = eventLoop;
114 this.handle = handle;
115 }
116
117 @Override
118 public long submit(IoOps ops) {
119 throw new UnsupportedOperationException();
120 }
121
122 @Override
123 public boolean isValid() {
124 return !get();
125 }
126
127 @Override
128 public void cancel() {
129 if (getAndSet(true)) {
130 return;
131 }
132 if (eventLoop.inEventLoop()) {
133 cancel0();
134 } else {
135 eventLoop.execute(this::cancel0);
136 }
137 }
138
139 private void cancel0() {
140 if (registeredChannels.remove(handle)) {
141 handle.deregisterNow();
142 }
143 }
144
145 @Override
146 public IoHandler ioHandler() {
147 return LocalIoHandler.this;
148 }
149 }
150 }