1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.channel.local;
17
18 import io.netty.channel.IoEventLoop;
19 import io.netty.channel.IoExecutionContext;
20 import io.netty.channel.IoHandle;
21 import io.netty.channel.IoHandler;
22 import io.netty.channel.IoHandlerFactory;
23 import io.netty.channel.IoOps;
24 import io.netty.channel.IoRegistration;
25 import io.netty.util.concurrent.Future;
26 import io.netty.util.concurrent.Promise;
27 import io.netty.util.internal.StringUtil;
28
29 import java.util.HashSet;
30 import java.util.Set;
31 import java.util.concurrent.locks.LockSupport;
32
33 public final class LocalIoHandler implements IoHandler {
34 private final Set<LocalIoHandle> registeredChannels = new HashSet<LocalIoHandle>(64);
35 private volatile Thread executionThread;
36
37 private LocalIoHandler() { }
38
39
40
41
42 public static IoHandlerFactory newFactory() {
43 return new IoHandlerFactory() {
44 @Override
45 public IoHandler newHandler() {
46 return new LocalIoHandler();
47 }
48 };
49 }
50
51 private static LocalIoHandle cast(IoHandle handle) {
52 if (handle instanceof LocalIoHandle) {
53 return (LocalIoHandle) handle;
54 }
55 throw new IllegalArgumentException("IoHandle of type " + StringUtil.simpleClassName(handle) + " not supported");
56 }
57
58 @Override
59 public int run(IoExecutionContext runner) {
60 if (executionThread == null) {
61 executionThread = Thread.currentThread();
62 }
63 if (runner.canBlock()) {
64
65 LockSupport.parkNanos(this, runner.delayNanos(System.nanoTime()));
66 }
67 return 0;
68 }
69
70 @Override
71 public void wakeup(IoEventLoop eventLoop) {
72 if (!eventLoop.inEventLoop()) {
73 Thread thread = executionThread;
74 if (thread != null) {
75
76 LockSupport.unpark(thread);
77 }
78 }
79 }
80
81 @Override
82 public void prepareToDestroy() {
83 for (LocalIoHandle handle : registeredChannels) {
84 handle.closeNow();
85 }
86 registeredChannels.clear();
87 }
88
89 @Override
90 public void destroy() {
91 }
92
93 @Override
94 public IoRegistration register(IoEventLoop eventLoop, IoHandle handle) {
95 LocalIoHandle localHandle = cast(handle);
96 if (registeredChannels.add(localHandle)) {
97 LocalIoRegistration registration = new LocalIoRegistration(eventLoop, localHandle);
98 localHandle.registerNow();
99 return registration;
100 }
101 throw new IllegalStateException();
102 }
103
104 @Override
105 public boolean isCompatible(Class<? extends IoHandle> handleType) {
106 return LocalIoHandle.class.isAssignableFrom(handleType);
107 }
108
109 private final class LocalIoRegistration implements IoRegistration {
110 private final Promise<?> cancellationPromise;
111 private final IoEventLoop eventLoop;
112 private final LocalIoHandle handle;
113
114 LocalIoRegistration(IoEventLoop eventLoop, LocalIoHandle handle) {
115 this.eventLoop = eventLoop;
116 this.handle = handle;
117 this.cancellationPromise = eventLoop.newPromise();
118 }
119
120 @Override
121 public long submit(IoOps ops) {
122 throw new UnsupportedOperationException();
123 }
124
125 @Override
126 public void cancel() {
127 if (!cancellationPromise.trySuccess(null)) {
128 return;
129 }
130 if (eventLoop.inEventLoop()) {
131 cancel0();
132 } else {
133 eventLoop.execute(this::cancel0);
134 }
135 }
136
137 @Override
138 public Future<?> cancelFuture() {
139 return cancellationPromise;
140 }
141
142 private void cancel0() {
143 if (registeredChannels.remove(handle)) {
144 handle.deregisterNow();
145 }
146 }
147
148 @Override
149 public IoHandler ioHandler() {
150 return LocalIoHandler.this;
151 }
152 }
153 }