/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.onos.store.intent.impl;
17
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080024import com.hazelcast.core.EntryAdapter;
25import com.hazelcast.core.EntryEvent;
26import com.hazelcast.core.EntryListener;
27import com.hazelcast.core.IMap;
28import com.hazelcast.core.Member;
29
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080030import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080031import org.apache.felix.scr.annotations.Activate;
32import org.apache.felix.scr.annotations.Component;
33import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080034import org.apache.felix.scr.annotations.Reference;
35import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080036import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080037import org.onlab.metrics.MetricsService;
38import org.onlab.onos.core.MetricsHelper;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080039import org.onlab.onos.net.intent.Intent;
40import org.onlab.onos.net.intent.IntentEvent;
41import org.onlab.onos.net.intent.IntentId;
42import org.onlab.onos.net.intent.IntentState;
43import org.onlab.onos.net.intent.IntentStore;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080044import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080045import org.onlab.onos.net.intent.IntentStoreDelegate;
46import org.onlab.onos.store.hz.AbstractHazelcastStore;
47import org.onlab.onos.store.hz.SMap;
48import org.onlab.onos.store.serializers.KryoNamespaces;
49import org.onlab.onos.store.serializers.KryoSerializer;
50import org.onlab.util.KryoNamespace;
51import org.slf4j.Logger;
52
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080053import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080054import java.util.EnumSet;
55import java.util.List;
56import java.util.Map;
57import java.util.Set;
58import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080059import java.util.concurrent.ExecutionException;
60import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080061
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080063import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064import static org.onlab.onos.net.intent.IntentState.*;
65import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080066import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080067
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080068@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080069@Service
70public class HazelcastIntentStore
71 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080072 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080073
74 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080075 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to WITHDRAWN. */
78 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
79
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080080 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080081
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080082 private final Logger log = getLogger(getClass());
83
84 // Assumption: IntentId will not have synonyms
85 private SMap<IntentId, Intent> intents;
86 private SMap<IntentId, IntentState> states;
87
88 // Map to store instance local intermediate state transition
89 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
90
91 private SMap<IntentId, List<Intent>> installable;
92
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080093 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected MetricsService metricsService;
95
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080096 // TODO make this configurable
97 private boolean onlyLogTransitionError = true;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 private Timer createIntentTimer;
100 private Timer removeIntentTimer;
101 private Timer setInstallableIntentsTimer;
102 private Timer getInstallableIntentsTimer;
103 private Timer removeInstalledIntentsTimer;
104 private Timer setStateTimer;
105 private Timer getIntentCountTimer;
106 private Timer getIntentsTimer;
107 private Timer getIntentTimer;
108 private Timer getIntentStateTimer;
109
110 private Timer createResponseTimer(String methodName) {
111 return createTimer("IntentStore", methodName, "responseTime");
112 }
113
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800114 @Override
115 @Activate
116 public void activate() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800117 createIntentTimer = createResponseTimer("createIntent");
118 removeIntentTimer = createResponseTimer("removeIntent");
119 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
120 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
121 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
122 setStateTimer = createResponseTimer("setState");
123 getIntentCountTimer = createResponseTimer("getIntentCount");
124 getIntentsTimer = createResponseTimer("getIntents");
125 getIntentTimer = createResponseTimer("getIntent");
126 getIntentStateTimer = createResponseTimer("getIntentState");
127
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800128 // FIXME: We need a way to add serializer for intents which has been plugged-in.
129 // As a short term workaround, relax Kryo config to
130 // registrationRequired=false
131 super.activate();
132 super.serializer = new KryoSerializer() {
133
134 @Override
135 protected void setupKryoPool() {
136 serializerPool = KryoNamespace.newBuilder()
137 .setRegistrationRequired(false)
138 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800139 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
140 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800141 }
142
143 };
144
145 // TODO: enable near cache, allow read from backup for this IMap
146 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
147 intents = new SMap<>(rawIntents , super.serializer);
148
149 // TODO: disable near cache, disable read from backup for this IMap
150 IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
151 states = new SMap<>(rawStates , super.serializer);
152 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800153 states.addEntryListener(listener , true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800154
155 transientStates.clear();
156
157 // TODO: disable near cache, disable read from backup for this IMap
158 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
159 installable = new SMap<>(rawInstallables , super.serializer);
160
161 log.info("Started");
162 }
163
164 @Deactivate
165 public void deactivate() {
166 log.info("Stopped");
167 }
168
169 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800170 public MetricsService metricsService() {
171 return metricsService;
172 }
173
174 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800175 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800176 Context timer = startTimer(createIntentTimer);
177 try {
178 Intent existing = intents.putIfAbsent(intent.id(), intent);
179 if (existing != null) {
180 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800181 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800182 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800183 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800184 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800185 }
186 } finally {
187 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800188 }
189 }
190
191 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800192 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800193 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800194 checkState(getIntentState(intentId) == WITHDRAWN,
195 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800196 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800197 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800198 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800199 states.remove(intentId);
200 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800201 } finally {
202 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800203 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800204 }
205
206 @Override
207 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800208 Context timer = startTimer(getIntentCountTimer);
209 try {
210 return intents.size();
211 } finally {
212 stopTimer(timer);
213 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800214 }
215
216 @Override
217 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800218 Context timer = startTimer(getIntentsTimer);
219 try {
220 return ImmutableSet.copyOf(intents.values());
221 } finally {
222 stopTimer(timer);
223 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800224 }
225
226 @Override
227 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800228 Context timer = startTimer(getIntentTimer);
229 try {
230 return intents.get(intentId);
231 } finally {
232 stopTimer(timer);
233 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800234 }
235
236 @Override
237 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800238 Context timer = startTimer(getIntentStateTimer);
239 try {
240 final IntentState localState = transientStates.get(id);
241 if (localState != null) {
242 return localState;
243 }
244 return states.get(id);
245 } finally {
246 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800247 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800248 }
249
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800250 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
251 if (onlyLogTransitionError) {
252 if (!expression) {
253 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
254 }
255 } else {
256 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
257 }
258 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800259
260 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800261 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800262 Context timer = startTimer(setStateTimer);
263 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800264 final IntentId id = intent.id();
265 IntentEvent.Type type = null;
266 final IntentState prevParking;
267 boolean transientStateChangeOnly = false;
268
269 // parking state transition
270 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800271 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800272 prevParking = states.get(id);
273 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800274 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
275 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800276 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800277 verify(PRE_INSTALLED.contains(prevParking),
278 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800279 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800280 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
281 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800282 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800283 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800284 break;
285 case INSTALLED:
286 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800287 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800288 "Illegal state transition attempted from %s to INSTALLED",
289 prevParking);
290 type = IntentEvent.Type.INSTALLED;
291 break;
292 case FAILED:
293 prevParking = states.replace(id, FAILED);
294 type = IntentEvent.Type.FAILED;
295 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800296 case WITHDRAW_REQ:
297 prevParking = states.replace(id, WITHDRAW_REQ);
298 verify(PRE_WITHDRAWN.contains(prevParking),
299 "Illegal state transition attempted from %s to WITHDRAW_REQ",
300 prevParking);
301 type = IntentEvent.Type.WITHDRAW_REQ;
302 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800303 case WITHDRAWN:
304 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800305 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800306 "Illegal state transition attempted from %s to WITHDRAWN",
307 prevParking);
308 type = IntentEvent.Type.WITHDRAWN;
309 break;
310 default:
311 transientStateChangeOnly = true;
312 prevParking = null;
313 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800314 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800315 if (!transientStateChangeOnly) {
316 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
317 }
318 // Update instance local state, which includes non-parking state transition
319 final IntentState prevTransient = transientStates.put(id, state);
320 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800321
alshabiba9819bf2014-11-30 18:15:52 -0800322 if (type != null) {
323 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800324 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800325 } finally {
326 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800327 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800328 }
329
330 @Override
331 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800332 Context timer = startTimer(setInstallableIntentsTimer);
333 try {
334 installable.put(intentId, result);
335 } finally {
336 stopTimer(timer);
337 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800338 }
339
340 @Override
341 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800342 Context timer = startTimer(getInstallableIntentsTimer);
343 try {
344 return installable.get(intentId);
345 } finally {
346 stopTimer(timer);
347 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800348 }
349
350 @Override
351 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800352 Context timer = startTimer(removeInstalledIntentsTimer);
353 try {
354 installable.remove(intentId);
355 } finally {
356 stopTimer(timer);
357 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800358 }
359
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800360 // TODO slice out methods after merging Ali's patch
361 // CHECKSTYLE IGNORE MethodLength FOR NEXT 1 LINES
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800362 @Override
363 public List<Operation> batchWrite(BatchWrite batch) {
364 // Hazelcast version will never fail for conditional failure now.
365 List<Operation> failed = new ArrayList<>();
366
367 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800368 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800369
370 for (Operation op : batch.operations()) {
371 switch (op.type()) {
372 case CREATE_INTENT:
373 checkArgument(op.args().size() == 1,
374 "CREATE_INTENT takes 1 argument. %s", op);
375 Intent intent = op.arg(0);
376 futures.add(Pair.of(op,
377 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800378 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800379 break;
380
381 case REMOVE_INTENT:
382 checkArgument(op.args().size() == 1,
383 "REMOVE_INTENT takes 1 argument. %s", op);
384 IntentId intentId = (IntentId) op.arg(0);
385 futures.add(Pair.of(op,
386 ImmutableList.of(intents.removeAsync(intentId),
387 states.removeAsync(intentId),
388 installable.removeAsync(intentId))));
389 break;
390
391 case SET_STATE:
392 checkArgument(op.args().size() == 2,
393 "SET_STATE takes 2 arguments. %s", op);
394 intent = op.arg(0);
395 IntentState newState = op.arg(1);
396 futures.add(Pair.of(op,
397 ImmutableList.of(states.putAsync(intent.id(), newState))));
398 break;
399
400 case SET_INSTALLABLE:
401 checkArgument(op.args().size() == 2,
402 "SET_INSTALLABLE takes 2 arguments. %s", op);
403 intentId = op.arg(0);
404 List<Intent> installableIntents = op.arg(1);
405 futures.add(Pair.of(op,
406 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
407 break;
408
409 case REMOVE_INSTALLED:
410 checkArgument(op.args().size() == 1,
411 "REMOVE_INSTALLED takes 1 argument. %s", op);
412 intentId = op.arg(0);
413 futures.add(Pair.of(op,
414 ImmutableList.of(installable.removeAsync(intentId))));
415 break;
416
417 default:
418 log.warn("Unknown Operation encountered: {}", op);
419 failed.add(op);
420 break;
421 }
422 }
423
424 // verify result
425 for (Pair<Operation, List<Future<?>>> future : futures) {
426 final Operation op = future.getLeft();
427 final List<Future<?>> subops = future.getRight();
428
429 switch (op.type()) {
430
431 case CREATE_INTENT:
432 {
433 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800434 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800435
436 try {
437 Intent prevIntent = (Intent) subops.get(0).get();
438 IntentState prevIntentState = (IntentState) subops.get(1).get();
439
440 if (prevIntent != null || prevIntentState != null) {
441 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
442 prevIntent, prevIntentState,
443 intent, newIntentState);
444 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800445 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800446 } catch (InterruptedException e) {
447 log.error("Batch write was interrupted while processing {}", op, e);
448 failed.add(op);
449 Thread.currentThread().interrupt();
450 } catch (ExecutionException e) {
451 log.error("Batch write failed processing {}", op, e);
452 failed.add(op);
453 }
454 break;
455 }
456
457 case REMOVE_INTENT:
458 {
459 IntentId intentId = op.arg(0);
460
461 try {
462 Intent prevIntent = (Intent) subops.get(0).get();
463 IntentState prevIntentState = (IntentState) subops.get(1).get();
464 @SuppressWarnings("unchecked")
465 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
466
467 if (prevIntent == null) {
468 log.warn("Intent {} was already removed.", intentId);
469 }
470 if (prevIntentState == null) {
471 log.warn("Intent {} state was already removed", intentId);
472 }
473 if (prevInstallable == null) {
474 log.info("Intent {} installable was already removed", intentId);
475 }
476 } catch (InterruptedException e) {
477 log.error("Batch write was interrupted while processing {}", op, e);
478 failed.add(op);
479 Thread.currentThread().interrupt();
480 } catch (ExecutionException e) {
481 log.error("Batch write failed processing {}", op, e);
482 failed.add(op);
483 }
484 break;
485 }
486
487 case SET_STATE:
488 {
489 Intent intent = op.arg(0);
490 IntentId intentId = intent.id();
491 IntentState newState = op.arg(1);
492
493 try {
494 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800495
496 if (PARKING.contains(newState)) {
497 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800498 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800499 }
alshabiba9819bf2014-11-30 18:15:52 -0800500
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800501 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
502 // TODO sanity check and log?
503 } catch (InterruptedException e) {
504 log.error("Batch write was interrupted while processing {}", op, e);
505 failed.add(op);
506 Thread.currentThread().interrupt();
507 } catch (ExecutionException e) {
508 log.error("Batch write failed processing {}", op, e);
509 failed.add(op);
510 }
511 break;
512 }
513
514 case SET_INSTALLABLE:
515 {
516 IntentId intentId = op.arg(0);
517 List<Intent> installableIntents = op.arg(1);
518
519 try {
520 @SuppressWarnings("unchecked")
521 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
522
523 if (prevInstallable != null) {
524 log.warn("Overwriting Intent {} installable {} -> {}",
525 intentId, prevInstallable, installableIntents);
526 }
527 } catch (InterruptedException e) {
528 log.error("Batch write was interrupted while processing {}", op, e);
529 failed.add(op);
530 Thread.currentThread().interrupt();
531 } catch (ExecutionException e) {
532 log.error("Batch write failed processing {}", op, e);
533 failed.add(op);
534 }
535 break;
536 }
537
538 case REMOVE_INSTALLED:
539 {
540 IntentId intentId = op.arg(0);
541
542 try {
543 @SuppressWarnings("unchecked")
544 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
545
546 if (prevInstallable == null) {
547 log.warn("Intent {} installable was already removed", intentId);
548 }
549 } catch (InterruptedException e) {
550 log.error("Batch write was interrupted while processing {}", op, e);
551 failed.add(op);
552 Thread.currentThread().interrupt();
553 } catch (ExecutionException e) {
554 log.error("Batch write failed processing {}", op, e);
555 failed.add(op);
556 }
557 break;
558 }
559
560 default:
561 log.warn("Unknown Operation encountered: {}", op);
562 if (!failed.contains(op)) {
563 failed.add(op);
564 }
565 break;
566 }
567 }
alshabiba9819bf2014-11-30 18:15:52 -0800568
569 notifyDelegate(events);
570
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800571 return failed;
572 }
573
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800574 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
575
576 @Override
577 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
578 final Member myself = theInstance.getCluster().getLocalMember();
579 if (!myself.equals(event.getMember())) {
580 // When Intent state was modified by remote node,
581 // clear local transient state.
582 final IntentId intentId = event.getKey();
583 IntentState oldState = transientStates.remove(intentId);
584 if (oldState != null) {
585 log.debug("{} state updated remotely, removing transient state {}",
586 intentId, oldState);
587 }
alshabiba9819bf2014-11-30 18:15:52 -0800588
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800589 if (event.getValue() != null) {
590 // notify if this is not entry removed event
591 notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
592 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800593 }
594 }
595 }
596}