blob: cb400e16c66958c1025cab29172f9fcc9b4d534c [file] [log] [blame]
Felix Meschberger9e600592010-10-10 21:14:10 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19package org.apache.felix.coordination.impl;
20
21import java.util.ArrayList;
22import java.util.Collection;
Felix Meschberger0055b312010-10-13 07:39:32 +000023import java.util.Collections;
Felix Meschberger9e600592010-10-10 21:14:10 +000024import java.util.HashMap;
25import java.util.Map;
Felix Meschberger0055b312010-10-13 07:39:32 +000026import java.util.TimerTask;
Felix Meschberger9e600592010-10-10 21:14:10 +000027
28import org.apache.felix.service.coordination.Coordination;
29import org.apache.felix.service.coordination.Participant;
30
31@SuppressWarnings("deprecation")
32public class CoordinationImpl implements Coordination {
33
34 private enum State {
35 /** Active */
36 ACTIVE,
37
38 /** Coordination termination started */
39 TERMINATING,
40
41 /** Coordination completed */
42 TERMINATED,
43
44 /** Coordination failed */
45 FAILED;
46 }
47
Felix Meschberger0055b312010-10-13 07:39:32 +000048 private final CoordinationMgr mgr;
49
Felix Meschberger9e600592010-10-10 21:14:10 +000050 private final long id;
51
52 private final String name;
53
54 // TODO: timeout must be enforced
55 private long timeOutInMs;
56
Felix Meschberger0055b312010-10-13 07:39:32 +000057 /**
58 * Access to this field must be synchronized as long as the expected state
59 * is {@link State#ACTIVE}. Once the state has changed, further updates to
60 * this instance will not take place any more and the state will only be
61 * modified by the thread successfully setting the state to
62 * {@link State#TERMINATING}.
63 */
64 private volatile State state;
Felix Meschberger9e600592010-10-10 21:14:10 +000065
Felix Meschberger0055b312010-10-13 07:39:32 +000066 private int mustFail;
Felix Meschberger9e600592010-10-10 21:14:10 +000067
Felix Meschberger0055b312010-10-13 07:39:32 +000068 private Throwable failReason;
Felix Meschberger9e600592010-10-10 21:14:10 +000069
70 private ArrayList<Participant> participants;
71
72 private HashMap<Class<?>, Object> variables;
73
Felix Meschberger0055b312010-10-13 07:39:32 +000074 private TimerTask timeoutTask;
75
76 private Thread initiatorThread;
77
78 public CoordinationImpl(final CoordinationMgr mgr, final long id,
79 final String name, final long defaultTimeOutInMs) {
80 this.mgr = mgr;
Felix Meschberger9e600592010-10-10 21:14:10 +000081 this.id = id;
82 this.name = name;
Felix Meschberger0055b312010-10-13 07:39:32 +000083 this.mustFail = 0;
Felix Meschberger9e600592010-10-10 21:14:10 +000084 this.state = State.ACTIVE;
85 this.participants = new ArrayList<Participant>();
86 this.variables = new HashMap<Class<?>, Object>();
Felix Meschberger0055b312010-10-13 07:39:32 +000087 this.timeOutInMs = -defaultTimeOutInMs;
88 this.initiatorThread = Thread.currentThread();
89
90 scheduleTimeout(defaultTimeOutInMs);
Felix Meschberger9e600592010-10-10 21:14:10 +000091 }
92
93 public String getName() {
94 return name;
95 }
96
97 long getId() {
98 return this.id;
99 }
100
Felix Meschberger0055b312010-10-13 07:39:32 +0000101 void mustFail(final Throwable reason) {
102 this.mustFail = FAILED;
103 this.failReason = reason;
Felix Meschberger9e600592010-10-10 21:14:10 +0000104 }
105
Felix Meschberger0055b312010-10-13 07:39:32 +0000106 /**
107 * Initiates a coordination timeout. Called from the timer task scheduled by
108 * the {@link #scheduleTimeout(long)} method.
109 * <p>
110 * This method is inteded to only be called from the scheduled timer task.
111 */
Felix Meschberger9e600592010-10-10 21:14:10 +0000112 void timeout() {
Felix Meschberger0055b312010-10-13 07:39:32 +0000113 // If a timeout happens, the coordination thread is set to always fail
114 this.mustFail = TIMEOUT;
115
116 // and interrupted and a small delay happens to allow the initiator to
117 // clean up by reacting on the interrupt. If the initiator can do this
118 // clean up normally, the end() method will return TIMEOUT.
119 try {
120 initiatorThread.interrupt();
121 Thread.sleep(500); // half a second for now
122 } catch (SecurityException se) {
123 // thrown by interrupt -- no need to wait if interrupt fails
124 } catch (InterruptedException ie) {
125 // someone interrupted us while delaying, just continue
126 }
127
128 // After this delay the coordination is forcefully failed.
129 CoordinationImpl.this.fail(null);
Felix Meschberger9e600592010-10-10 21:14:10 +0000130 }
131
132 long getTimeOut() {
133 return this.timeOutInMs;
134 }
135
136 public int end() throws IllegalStateException {
Felix Meschberger0055b312010-10-13 07:39:32 +0000137 if (startTermination()) {
138 if (mustFail != 0) {
139 failInternal();
140 return mustFail;
Felix Meschberger9e600592010-10-10 21:14:10 +0000141 }
Felix Meschberger0055b312010-10-13 07:39:32 +0000142 return endInternal();
Felix Meschberger9e600592010-10-10 21:14:10 +0000143 }
144
145 // already terminated
146 throw new IllegalStateException();
147 }
148
149 public boolean fail(Throwable reason) {
Felix Meschberger0055b312010-10-13 07:39:32 +0000150 if (startTermination()) {
151 this.failReason = reason;
152 failInternal();
Felix Meschberger9e600592010-10-10 21:14:10 +0000153 return true;
154 }
155 return false;
156 }
157
158 public boolean terminate() {
159 if (state == State.ACTIVE) {
Felix Meschberger0055b312010-10-13 07:39:32 +0000160 try {
161 end();
162 return true;
163 } catch (IllegalStateException ise) {
164 // another thread might have started the termination just
165 // after the current thread checked the state but before the
166 // end() method called on this thread was able to change the
167 // state. Just ignore this exception and continue.
168 }
Felix Meschberger9e600592010-10-10 21:14:10 +0000169 }
170 return false;
171 }
172
Felix Meschberger0055b312010-10-13 07:39:32 +0000173 /**
174 * Returns whether the coordination has ended in failure.
175 * <p>
176 * The return value of <code>false</code> may be a transient situation if
177 * the coordination is in the process of terminating due to a failure.
178 */
Felix Meschberger9e600592010-10-10 21:14:10 +0000179 public boolean isFailed() {
180 return state == State.FAILED;
181 }
182
183 public void addTimeout(long timeOutInMs) {
Felix Meschberger0055b312010-10-13 07:39:32 +0000184 if (this.timeOutInMs > 0) {
185 // already set, ignore
186 }
187
Felix Meschberger9e600592010-10-10 21:14:10 +0000188 this.timeOutInMs = timeOutInMs;
Felix Meschberger0055b312010-10-13 07:39:32 +0000189 scheduleTimeout(timeOutInMs);
Felix Meschberger9e600592010-10-10 21:14:10 +0000190 }
191
Felix Meschberger0055b312010-10-13 07:39:32 +0000192 /**
193 * Adds the participant to the end of the list of participants of this
194 * coordination.
195 * <p>
196 * This method blocks if the given participant is currently participating in
197 * another coordination.
198 * <p>
199 * Participants can only be added to a coordination if it is active.
200 *
201 * @throws org.apache.felix.service.coordination.CoordinationException if
202 * the participant cannot currently participate in this
203 * coordination
204 */
Felix Meschberger9e600592010-10-10 21:14:10 +0000205 public boolean participate(Participant p) {
Felix Meschberger0055b312010-10-13 07:39:32 +0000206
207 // ensure participant only pariticipates on a single coordination
208 // this blocks until the participant can participate or until
209 // a timeout occurrs (or a deadlock is detected)
210 mgr.lockParticipant(p, this);
211
212 // synchronize access to the state to prevent it from being changed
213 // while adding the participant
214 synchronized (this) {
215 if (state == State.ACTIVE) {
216 if (!participants.contains(p)) {
217 participants.add(p);
218 }
219 return true;
Felix Meschberger9e600592010-10-10 21:14:10 +0000220 }
Felix Meschberger0055b312010-10-13 07:39:32 +0000221 return false;
Felix Meschberger9e600592010-10-10 21:14:10 +0000222 }
Felix Meschberger9e600592010-10-10 21:14:10 +0000223 }
224
225 public Collection<Participant> getParticipants() {
Felix Meschberger0055b312010-10-13 07:39:32 +0000226 // synchronize access to the state to prevent it from being changed
227 // while we create a copy of the participant list
228 synchronized (this) {
229 if (state == State.ACTIVE) {
230 return new ArrayList<Participant>(participants);
231 }
232 }
233
234 return Collections.<Participant> emptyList();
Felix Meschberger9e600592010-10-10 21:14:10 +0000235 }
236
237 public Map<Class<?>, ?> getVariables() {
238 return variables;
239 }
240
Felix Meschberger0055b312010-10-13 07:39:32 +0000241 /**
242 * If this coordination is still active, this method initiates the
243 * termination of the coordination by setting the state to
244 * {@value State#TERMINATING}, unregistering from the
245 * {@link CoordinationMgr} and ensuring there is no timeout task active any
246 * longer to timeout this coordination.
247 *
248 * @return <code>true</code> If the coordination was active and termination
249 * can continue. If <code>false</code> is returned, the coordination
250 * must be considered terminated (or terminating) in the current
251 * thread and no further termination processing must take place.
252 */
253 private synchronized boolean startTermination() {
254 if (state == State.ACTIVE) {
255 state = State.TERMINATING;
256 mgr.unregister(this);
257 scheduleTimeout(-1);
258 return true;
259 }
260
261 // this coordination is not active any longer, nothing to do
262 return false;
263 }
264
265 /**
266 * Internal implemenation of successful termination of the coordination.
267 * <p>
268 * This method must only be called after the {@link #state} field has been
269 * set to {@link State#TERMINATING} and only be the method successfully
270 * setting this state.
271 *
272 * @return OK or PARTIALLY_ENDED depending on whether all participants
273 * succeeded or some of them failed ending the coordination.
274 */
275 private int endInternal() {
276 int reason = OK;
277 for (Participant part : participants) {
278 try {
279 part.ended(this);
280 } catch (Exception e) {
281 // TODO: log
282 reason = PARTIALLY_ENDED;
283 }
284
285 // release the participant for other coordinations
286 mgr.releaseParticipant(part);
287 }
288 state = State.TERMINATED;
289 return reason;
290 }
291
292 /**
293 * Internal implemenation of coordination failure.
294 * <p>
295 * This method must only be called after the {@link #state} field has been
296 * set to {@link State#TERMINATING} and only be the method successfully
297 * setting this state.
298 */
299 private void failInternal() {
300 // consider failure reason (if not null)
301 for (Participant part : participants) {
302 try {
303 part.failed(this);
304 } catch (Exception e) {
305 // TODO: log
306 }
307
308 // release the participant for other coordinations
309 mgr.releaseParticipant(part);
310 }
311 state = State.FAILED;
312 }
313
314 /**
315 * Helper method for timeout scheduling. If a timer is currently scheduled
316 * it is canceled. If the new timeout value is a positive value a new timer
317 * is scheduled to fire of so many milliseconds from now.
318 *
319 * @param timeout The new timeout value
320 */
321 private void scheduleTimeout(final long timeout) {
322 if (timeoutTask != null) {
323 mgr.schedule(timeoutTask, -1);
324 timeoutTask = null;
325 }
326
327 if (timeout > 0) {
328 timeoutTask = new TimerTask() {
329 @Override
330 public void run() {
331 CoordinationImpl.this.timeout();
332 }
333 };
334
335 mgr.schedule(timeoutTask, timeout);
336 }
337 }
Felix Meschberger9e600592010-10-10 21:14:10 +0000338}