blob: b67c99e8c473deffd7c431f93ffab81a22dad6c1 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19package org.apache.felix.coordination.impl;
20
21import java.io.IOException;
22import java.util.ArrayList;
23import java.util.Collection;
24import java.util.HashMap;
25import java.util.Map;
26import java.util.Stack;
Felix Meschberger0055b312010-10-13 07:39:32 +000027import java.util.Timer;
28import java.util.TimerTask;
Felix Meschberger9e600592010-10-10 21:14:10 +000029import java.util.concurrent.atomic.AtomicLong;
30import java.util.regex.Pattern;
31
32import javax.management.openmbean.CompositeData;
33import javax.management.openmbean.CompositeDataSupport;
34import javax.management.openmbean.OpenDataException;
35import javax.management.openmbean.TabularData;
36import javax.management.openmbean.TabularDataSupport;
37
38import org.apache.felix.jmx.service.coordination.CoordinatorMBean;
39import org.apache.felix.service.coordination.Coordination;
40import org.apache.felix.service.coordination.CoordinationException;
41import org.apache.felix.service.coordination.Participant;
42
Felix Meschberger0055b312010-10-13 07:39:32 +000043/**
44 * The <code>CoordinationMgr</code> is the actual backend manager of all
45 * Coordinations created by the Coordinator implementation. The methods in this
46 * class fall into three categories:
47 * <ul>
48 * <li>Actual implementations of the Coordinator interface on behalf of the
49 * per-bundle Coordinator service instances</li>
50 * <li>Implementation of the CoordinatorMBean interface allowing JMX management
51 * of the coordinations</li>
52 * <li>Management support to timeout and cleanup coordinations</li>
53 * </ul>
54 */
Felix Meschberger9e600592010-10-10 21:14:10 +000055@SuppressWarnings("deprecation")
56public class CoordinationMgr implements CoordinatorMBean {
57
58 private ThreadLocal<Stack<Coordination>> threadStacks;
59
60 private final AtomicLong ctr;
61
62 private final Map<Long, CoordinationImpl> coordinations;
63
Felix Meschberger0055b312010-10-13 07:39:32 +000064 private final Map<Participant, CoordinationImpl> participants;
65
66 private final Timer coordinationTimer;
67
68 /**
69 * Default coordination timeout. Currently hard coded to be 30s (the
70 * specified minimum timeout). Should be made configurable, but not less
71 * than 30s.
72 */
73 private long defaultTimeOut = 30 * 1000L;
74
75 /**
76 * Wait at most 60 seconds for participant to be eligible for participation
77 * in a coordination.
78 *
79 * @see #singularizeParticipant(Participant, CoordinationImpl)
80 */
81 private long participationTimeOut = 60 * 1000L;
82
Felix Meschberger9e600592010-10-10 21:14:10 +000083 CoordinationMgr() {
84 ctr = new AtomicLong(-1);
85 coordinations = new HashMap<Long, CoordinationImpl>();
Felix Meschberger0055b312010-10-13 07:39:32 +000086 participants = new HashMap<Participant, CoordinationImpl>();
87 coordinationTimer = new Timer("Coordination Timer", true);
Felix Meschberger9e600592010-10-10 21:14:10 +000088 }
89
90 void unregister(final CoordinationImpl c) {
91 coordinations.remove(c.getId());
92 Stack<Coordination> stack = threadStacks.get();
93 if (stack != null) {
94 stack.remove(c);
95 }
96 }
97
98 void cleanUp() {
Felix Meschberger0055b312010-10-13 07:39:32 +000099 // terminate coordination timeout timer
100 coordinationTimer.purge();
101 coordinationTimer.cancel();
102
103 // terminate all active coordinations
Felix Meschberger9e600592010-10-10 21:14:10 +0000104 final Exception reason = new Exception();
105 for (Coordination c : coordinations.values()) {
106 c.fail(reason);
107 }
108 coordinations.clear();
Felix Meschberger0055b312010-10-13 07:39:32 +0000109
110 // release all participants
111 participants.clear();
Felix Meschberger9e600592010-10-10 21:14:10 +0000112 }
113
Felix Meschberger0055b312010-10-13 07:39:32 +0000114 void configure(final long coordinationTimeout,
115 final long participationTimeout) {
116 this.defaultTimeOut = coordinationTimeout;
117 this.participationTimeOut = participationTimeout;
118 }
119
120 void schedule(final TimerTask task, final long delay) {
121 if (delay < 0) {
122 task.cancel();
123 } else {
124 coordinationTimer.schedule(task, delay);
125 }
126 }
127
128 void lockParticipant(final Participant p, final CoordinationImpl c) {
129 synchronized (participants) {
130 // wait for participant to be released
131 long cutOff = System.currentTimeMillis() + participationTimeOut;
132 while (participants.containsKey(p)) {
133 try {
134 participants.wait(participationTimeOut / 500);
135 } catch (InterruptedException ie) {
136 // don't worry, just keep on waiting
137 }
138
139 // timeout waiting for participation
140 if (System.currentTimeMillis() > cutOff) {
141 throw new CoordinationException(
142 "Timed out waiting to join coordinaton", c.getName(),
143 CoordinationException.TIMEOUT);
144 }
145 }
146
147 // lock participant into coordination
148 participants.put(p, c);
149 }
150 }
151
152 void releaseParticipant(final Participant p) {
153 synchronized (participants) {
154 participants.remove(p);
155 participants.notifyAll();
156 }
157 }
158
159 // ---------- Coordinator back end implementation
160
161 Coordination create(String name) {
Felix Meschberger9e600592010-10-10 21:14:10 +0000162 long id = ctr.incrementAndGet();
Felix Meschberger0055b312010-10-13 07:39:32 +0000163 CoordinationImpl c = new CoordinationImpl(this, id, name,
164 defaultTimeOut);
Felix Meschberger9e600592010-10-10 21:14:10 +0000165 coordinations.put(id, c);
166 return c;
167 }
168
Felix Meschberger0055b312010-10-13 07:39:32 +0000169 Coordination begin(String name) {
Felix Meschberger9e600592010-10-10 21:14:10 +0000170 return push(create(name));
171 }
172
Felix Meschberger0055b312010-10-13 07:39:32 +0000173 Coordination push(Coordination c) {
Felix Meschberger9e600592010-10-10 21:14:10 +0000174 Stack<Coordination> stack = threadStacks.get();
175 if (stack == null) {
176 stack = new Stack<Coordination>();
177 threadStacks.set(stack);
178 }
179 return stack.push(c);
180 }
181
Felix Meschberger0055b312010-10-13 07:39:32 +0000182 Coordination pop() {
Felix Meschberger9e600592010-10-10 21:14:10 +0000183 Stack<Coordination> stack = threadStacks.get();
184 if (stack != null && !stack.isEmpty()) {
185 return stack.pop();
186 }
187 return null;
188 }
189
Felix Meschberger0055b312010-10-13 07:39:32 +0000190 Coordination getCurrentCoordination() {
Felix Meschberger9e600592010-10-10 21:14:10 +0000191 Stack<Coordination> stack = threadStacks.get();
192 if (stack != null && !stack.isEmpty()) {
193 return stack.peek();
194 }
195 return null;
196 }
197
Felix Meschberger0055b312010-10-13 07:39:32 +0000198 boolean alwaysFail(Throwable reason) {
Felix Meschberger9e600592010-10-10 21:14:10 +0000199 CoordinationImpl current = (CoordinationImpl) getCurrentCoordination();
200 if (current != null) {
Felix Meschberger0055b312010-10-13 07:39:32 +0000201 current.mustFail(reason);
Felix Meschberger9e600592010-10-10 21:14:10 +0000202 return true;
203 }
204 return false;
205 }
206
Felix Meschberger0055b312010-10-13 07:39:32 +0000207 Collection<Coordination> getCoordinations() {
Felix Meschberger9e600592010-10-10 21:14:10 +0000208 ArrayList<Coordination> result = new ArrayList<Coordination>();
209 Stack<Coordination> stack = threadStacks.get();
210 if (stack != null) {
211 result.addAll(stack);
212 }
213 return result;
214 }
215
Felix Meschberger0055b312010-10-13 07:39:32 +0000216 boolean participate(Participant participant) throws CoordinationException {
Felix Meschberger9e600592010-10-10 21:14:10 +0000217 Coordination current = getCurrentCoordination();
218 if (current != null) {
219 current.participate(participant);
220 return true;
221 }
222 return false;
223 }
224
Felix Meschberger0055b312010-10-13 07:39:32 +0000225 Coordination participateOrBegin(Participant ifActive) {
Felix Meschberger9e600592010-10-10 21:14:10 +0000226 Coordination current = getCurrentCoordination();
227 if (current == null) {
228 current = begin("implicit");
229 }
230 current.participate(ifActive);
231 return current;
232 }
233
234 // ---------- CoordinatorMBean interface
235
236 public TabularData listCoordinations(String regexFilter) {
237 Pattern p = Pattern.compile(regexFilter);
238 TabularData td = new TabularDataSupport(COORDINATIONS_TYPE);
239 for (CoordinationImpl c : coordinations.values()) {
240 if (p.matcher(c.getName()).matches()) {
241 try {
242 td.put(fromCoordination(c));
243 } catch (OpenDataException e) {
244 // TODO: log
245 }
246 }
247 }
248 return td;
249 }
250
251 public CompositeData getCoordination(long id) throws IOException {
252 CoordinationImpl c = coordinations.get(id);
253 if (c != null) {
254 try {
255 return fromCoordination(c);
256 } catch (OpenDataException e) {
257 throw new IOException(e.toString());
258 }
259 }
260 throw new IOException("No such Coordination " + id);
261 }
262
263 public boolean fail(long id, String reason) {
264 Coordination c = coordinations.get(id);
265 if (c != null) {
266 return c.fail(new Exception(reason));
267 }
268 return false;
269 }
270
271 public void addTimeout(long id, long timeout) {
272 Coordination c = coordinations.get(id);
273 if (c != null) {
274 c.addTimeout(timeout);
275 }
276 }
277
278 private CompositeData fromCoordination(final CoordinationImpl c)
279 throws OpenDataException {
280 return new CompositeDataSupport(COORDINATION_TYPE, new String[] { ID,
281 NAME, TIMEOUT }, new Object[] { c.getId(), c.getName(),
282 c.getTimeOut() });
283 }
284}