/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.onos.store.intent.impl;
17
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
23import com.hazelcast.core.EntryAdapter;
24import com.hazelcast.core.EntryEvent;
25import com.hazelcast.core.EntryListener;
26import com.hazelcast.core.IMap;
27import com.hazelcast.core.Member;
28
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080029import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080030import org.apache.felix.scr.annotations.Activate;
31import org.apache.felix.scr.annotations.Component;
32import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080033import org.apache.felix.scr.annotations.Reference;
34import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080035import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.onlab.metrics.MetricsService;
37import org.onlab.onos.core.MetricsHelper;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.onlab.onos.net.intent.Intent;
39import org.onlab.onos.net.intent.IntentEvent;
40import org.onlab.onos.net.intent.IntentId;
41import org.onlab.onos.net.intent.IntentState;
42import org.onlab.onos.net.intent.IntentStore;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080043import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080044import org.onlab.onos.net.intent.IntentStoreDelegate;
45import org.onlab.onos.store.hz.AbstractHazelcastStore;
46import org.onlab.onos.store.hz.SMap;
47import org.onlab.onos.store.serializers.KryoNamespaces;
48import org.onlab.onos.store.serializers.KryoSerializer;
49import org.onlab.util.KryoNamespace;
50import org.slf4j.Logger;
51
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080052import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import java.util.EnumSet;
54import java.util.List;
55import java.util.Map;
56import java.util.Set;
57import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080058import java.util.concurrent.ExecutionException;
59import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080060
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080061import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080062import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080063import static org.onlab.onos.net.intent.IntentState.*;
64import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080065import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080066
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080067@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080068@Service
69public class HazelcastIntentStore
70 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080071 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080072
73 /** Valid parking state, which can transition to INSTALLED. */
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -080074 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(SUBMITTED, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080075
76 /** Valid parking state, which can transition to WITHDRAWN. */
77 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
78
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080079 private static final Set<IntentState> PARKING = EnumSet.of(SUBMITTED, INSTALLED, WITHDRAWN, FAILED);
80
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080081 private final Logger log = getLogger(getClass());
82
83 // Assumption: IntentId will not have synonyms
84 private SMap<IntentId, Intent> intents;
85 private SMap<IntentId, IntentState> states;
86
87 // Map to store instance local intermediate state transition
88 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
89
90 private SMap<IntentId, List<Intent>> installable;
91
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080092 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93 protected MetricsService metricsService;
94
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080095 // TODO make this configurable
96 private boolean onlyLogTransitionError = true;
97
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080098 private Timer createIntentTimer;
99 private Timer removeIntentTimer;
100 private Timer setInstallableIntentsTimer;
101 private Timer getInstallableIntentsTimer;
102 private Timer removeInstalledIntentsTimer;
103 private Timer setStateTimer;
104 private Timer getIntentCountTimer;
105 private Timer getIntentsTimer;
106 private Timer getIntentTimer;
107 private Timer getIntentStateTimer;
108
109 private Timer createResponseTimer(String methodName) {
110 return createTimer("IntentStore", methodName, "responseTime");
111 }
112
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800113 @Override
114 @Activate
115 public void activate() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800116 createIntentTimer = createResponseTimer("createIntent");
117 removeIntentTimer = createResponseTimer("removeIntent");
118 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
119 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
120 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
121 setStateTimer = createResponseTimer("setState");
122 getIntentCountTimer = createResponseTimer("getIntentCount");
123 getIntentsTimer = createResponseTimer("getIntents");
124 getIntentTimer = createResponseTimer("getIntent");
125 getIntentStateTimer = createResponseTimer("getIntentState");
126
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800127 // FIXME: We need a way to add serializer for intents which has been plugged-in.
128 // As a short term workaround, relax Kryo config to
129 // registrationRequired=false
130 super.activate();
131 super.serializer = new KryoSerializer() {
132
133 @Override
134 protected void setupKryoPool() {
135 serializerPool = KryoNamespace.newBuilder()
136 .setRegistrationRequired(false)
137 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800138 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
139 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800140 }
141
142 };
143
144 // TODO: enable near cache, allow read from backup for this IMap
145 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
146 intents = new SMap<>(rawIntents , super.serializer);
147
148 // TODO: disable near cache, disable read from backup for this IMap
149 IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
150 states = new SMap<>(rawStates , super.serializer);
151 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
152 states.addEntryListener(listener , false);
153
154 transientStates.clear();
155
156 // TODO: disable near cache, disable read from backup for this IMap
157 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
158 installable = new SMap<>(rawInstallables , super.serializer);
159
160 log.info("Started");
161 }
162
163 @Deactivate
164 public void deactivate() {
165 log.info("Stopped");
166 }
167
168 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800169 public MetricsService metricsService() {
170 return metricsService;
171 }
172
173 @Override
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800174 public IntentEvent createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800175 Context timer = startTimer(createIntentTimer);
176 try {
177 Intent existing = intents.putIfAbsent(intent.id(), intent);
178 if (existing != null) {
179 // duplicate, ignore
180 return null;
181 } else {
182 return this.setState(intent, IntentState.SUBMITTED);
183 }
184 } finally {
185 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800186 }
187 }
188
189 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800190 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800191 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800192 checkState(getIntentState(intentId) == WITHDRAWN,
193 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800194 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800195 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800196 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800197 states.remove(intentId);
198 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800199 } finally {
200 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800201 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800202 }
203
204 @Override
205 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800206 Context timer = startTimer(getIntentCountTimer);
207 try {
208 return intents.size();
209 } finally {
210 stopTimer(timer);
211 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800212 }
213
214 @Override
215 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800216 Context timer = startTimer(getIntentsTimer);
217 try {
218 return ImmutableSet.copyOf(intents.values());
219 } finally {
220 stopTimer(timer);
221 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800222 }
223
224 @Override
225 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800226 Context timer = startTimer(getIntentTimer);
227 try {
228 return intents.get(intentId);
229 } finally {
230 stopTimer(timer);
231 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800232 }
233
234 @Override
235 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800236 Context timer = startTimer(getIntentStateTimer);
237 try {
238 final IntentState localState = transientStates.get(id);
239 if (localState != null) {
240 return localState;
241 }
242 return states.get(id);
243 } finally {
244 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800245 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800246 }
247
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800248 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
249 if (onlyLogTransitionError) {
250 if (!expression) {
251 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
252 }
253 } else {
254 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
255 }
256 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800257
258 @Override
259 public IntentEvent setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800260 Context timer = startTimer(setStateTimer);
261 try {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800262
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800263 final IntentId id = intent.id();
264 IntentEvent.Type type = null;
265 final IntentState prevParking;
266 boolean transientStateChangeOnly = false;
267
268 // parking state transition
269 switch (state) {
270 case SUBMITTED:
271 prevParking = states.get(id);
272 if (prevParking == null) {
273 IntentState existing = states.putIfAbsent(id, SUBMITTED);
274 verify(existing == null, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
275 } else {
276 verify(prevParking == WITHDRAWN,
277 "Illegal state transition attempted from %s to SUBMITTED",
278 prevParking);
279 boolean updated = states.replace(id, prevParking, SUBMITTED);
280 verify(updated, "Conditional replace %s => %s failed", prevParking, SUBMITTED);
281 }
282 type = IntentEvent.Type.SUBMITTED;
283 break;
284 case INSTALLED:
285 prevParking = states.replace(id, INSTALLED);
286 verify(PRE_INSTALLED.contains(prevParking),
287 "Illegal state transition attempted from %s to INSTALLED",
288 prevParking);
289 type = IntentEvent.Type.INSTALLED;
290 break;
291 case FAILED:
292 prevParking = states.replace(id, FAILED);
293 type = IntentEvent.Type.FAILED;
294 break;
295 case WITHDRAWN:
296 prevParking = states.replace(id, WITHDRAWN);
297 verify(PRE_WITHDRAWN.contains(prevParking),
298 "Illegal state transition attempted from %s to WITHDRAWN",
299 prevParking);
300 type = IntentEvent.Type.WITHDRAWN;
301 break;
302 default:
303 transientStateChangeOnly = true;
304 prevParking = null;
305 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800306 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800307 if (!transientStateChangeOnly) {
308 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
309 }
310 // Update instance local state, which includes non-parking state transition
311 final IntentState prevTransient = transientStates.put(id, state);
312 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800313
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800314 if (type == null) {
315 return null;
316 }
317 return new IntentEvent(type, intent);
318 } finally {
319 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800320 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800321 }
322
323 @Override
324 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800325 Context timer = startTimer(setInstallableIntentsTimer);
326 try {
327 installable.put(intentId, result);
328 } finally {
329 stopTimer(timer);
330 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800331 }
332
333 @Override
334 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800335 Context timer = startTimer(getInstallableIntentsTimer);
336 try {
337 return installable.get(intentId);
338 } finally {
339 stopTimer(timer);
340 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800341 }
342
343 @Override
344 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800345 Context timer = startTimer(removeInstalledIntentsTimer);
346 try {
347 installable.remove(intentId);
348 } finally {
349 stopTimer(timer);
350 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800351 }
352
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800353 // TODO slice out methods after merging Ali's patch
354 // CHECKSTYLE IGNORE MethodLength FOR NEXT 1 LINES
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800355 @Override
356 public List<Operation> batchWrite(BatchWrite batch) {
357 // Hazelcast version will never fail for conditional failure now.
358 List<Operation> failed = new ArrayList<>();
359
360 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
361
362 for (Operation op : batch.operations()) {
363 switch (op.type()) {
364 case CREATE_INTENT:
365 checkArgument(op.args().size() == 1,
366 "CREATE_INTENT takes 1 argument. %s", op);
367 Intent intent = op.arg(0);
368 futures.add(Pair.of(op,
369 ImmutableList.of(intents.putAsync(intent.id(), intent),
370 states.putAsync(intent.id(), SUBMITTED))));
371 break;
372
373 case REMOVE_INTENT:
374 checkArgument(op.args().size() == 1,
375 "REMOVE_INTENT takes 1 argument. %s", op);
376 IntentId intentId = (IntentId) op.arg(0);
377 futures.add(Pair.of(op,
378 ImmutableList.of(intents.removeAsync(intentId),
379 states.removeAsync(intentId),
380 installable.removeAsync(intentId))));
381 break;
382
383 case SET_STATE:
384 checkArgument(op.args().size() == 2,
385 "SET_STATE takes 2 arguments. %s", op);
386 intent = op.arg(0);
387 IntentState newState = op.arg(1);
388 futures.add(Pair.of(op,
389 ImmutableList.of(states.putAsync(intent.id(), newState))));
390 break;
391
392 case SET_INSTALLABLE:
393 checkArgument(op.args().size() == 2,
394 "SET_INSTALLABLE takes 2 arguments. %s", op);
395 intentId = op.arg(0);
396 List<Intent> installableIntents = op.arg(1);
397 futures.add(Pair.of(op,
398 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
399 break;
400
401 case REMOVE_INSTALLED:
402 checkArgument(op.args().size() == 1,
403 "REMOVE_INSTALLED takes 1 argument. %s", op);
404 intentId = op.arg(0);
405 futures.add(Pair.of(op,
406 ImmutableList.of(installable.removeAsync(intentId))));
407 break;
408
409 default:
410 log.warn("Unknown Operation encountered: {}", op);
411 failed.add(op);
412 break;
413 }
414 }
415
416 // verify result
417 for (Pair<Operation, List<Future<?>>> future : futures) {
418 final Operation op = future.getLeft();
419 final List<Future<?>> subops = future.getRight();
420
421 switch (op.type()) {
422
423 case CREATE_INTENT:
424 {
425 Intent intent = op.arg(0);
426 IntentState newIntentState = SUBMITTED;
427
428 try {
429 Intent prevIntent = (Intent) subops.get(0).get();
430 IntentState prevIntentState = (IntentState) subops.get(1).get();
431
432 if (prevIntent != null || prevIntentState != null) {
433 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
434 prevIntent, prevIntentState,
435 intent, newIntentState);
436 }
437 } catch (InterruptedException e) {
438 log.error("Batch write was interrupted while processing {}", op, e);
439 failed.add(op);
440 Thread.currentThread().interrupt();
441 } catch (ExecutionException e) {
442 log.error("Batch write failed processing {}", op, e);
443 failed.add(op);
444 }
445 break;
446 }
447
448 case REMOVE_INTENT:
449 {
450 IntentId intentId = op.arg(0);
451
452 try {
453 Intent prevIntent = (Intent) subops.get(0).get();
454 IntentState prevIntentState = (IntentState) subops.get(1).get();
455 @SuppressWarnings("unchecked")
456 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
457
458 if (prevIntent == null) {
459 log.warn("Intent {} was already removed.", intentId);
460 }
461 if (prevIntentState == null) {
462 log.warn("Intent {} state was already removed", intentId);
463 }
464 if (prevInstallable == null) {
465 log.info("Intent {} installable was already removed", intentId);
466 }
467 } catch (InterruptedException e) {
468 log.error("Batch write was interrupted while processing {}", op, e);
469 failed.add(op);
470 Thread.currentThread().interrupt();
471 } catch (ExecutionException e) {
472 log.error("Batch write failed processing {}", op, e);
473 failed.add(op);
474 }
475 break;
476 }
477
478 case SET_STATE:
479 {
480 Intent intent = op.arg(0);
481 IntentId intentId = intent.id();
482 IntentState newState = op.arg(1);
483
484 try {
485 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800486
487 if (PARKING.contains(newState)) {
488 transientStates.remove(intentId);
489 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800490 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
491 // TODO sanity check and log?
492 } catch (InterruptedException e) {
493 log.error("Batch write was interrupted while processing {}", op, e);
494 failed.add(op);
495 Thread.currentThread().interrupt();
496 } catch (ExecutionException e) {
497 log.error("Batch write failed processing {}", op, e);
498 failed.add(op);
499 }
500 break;
501 }
502
503 case SET_INSTALLABLE:
504 {
505 IntentId intentId = op.arg(0);
506 List<Intent> installableIntents = op.arg(1);
507
508 try {
509 @SuppressWarnings("unchecked")
510 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
511
512 if (prevInstallable != null) {
513 log.warn("Overwriting Intent {} installable {} -> {}",
514 intentId, prevInstallable, installableIntents);
515 }
516 } catch (InterruptedException e) {
517 log.error("Batch write was interrupted while processing {}", op, e);
518 failed.add(op);
519 Thread.currentThread().interrupt();
520 } catch (ExecutionException e) {
521 log.error("Batch write failed processing {}", op, e);
522 failed.add(op);
523 }
524 break;
525 }
526
527 case REMOVE_INSTALLED:
528 {
529 IntentId intentId = op.arg(0);
530
531 try {
532 @SuppressWarnings("unchecked")
533 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
534
535 if (prevInstallable == null) {
536 log.warn("Intent {} installable was already removed", intentId);
537 }
538 } catch (InterruptedException e) {
539 log.error("Batch write was interrupted while processing {}", op, e);
540 failed.add(op);
541 Thread.currentThread().interrupt();
542 } catch (ExecutionException e) {
543 log.error("Batch write failed processing {}", op, e);
544 failed.add(op);
545 }
546 break;
547 }
548
549 default:
550 log.warn("Unknown Operation encountered: {}", op);
551 if (!failed.contains(op)) {
552 failed.add(op);
553 }
554 break;
555 }
556 }
557 return failed;
558 }
559
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800560 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
561
562 @Override
563 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
564 final Member myself = theInstance.getCluster().getLocalMember();
565 if (!myself.equals(event.getMember())) {
566 // When Intent state was modified by remote node,
567 // clear local transient state.
568 final IntentId intentId = event.getKey();
569 IntentState oldState = transientStates.remove(intentId);
570 if (oldState != null) {
571 log.debug("{} state updated remotely, removing transient state {}",
572 intentId, oldState);
573 }
574 }
575 }
576 }
577}