blob: 2e82e5c923029616d7e5851a373eecec65ea1f24 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080031import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080032import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080035import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080037import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080038import org.onlab.metrics.MetricsService;
Jonathan Hartc0363672015-01-20 16:21:08 -080039import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Jonathan Hartc0363672015-01-20 16:21:08 -080042import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.net.intent.Intent;
44import org.onosproject.net.intent.IntentEvent;
45import org.onosproject.net.intent.IntentId;
46import org.onosproject.net.intent.IntentState;
47import org.onosproject.net.intent.IntentStore;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
49import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.slf4j.Logger;
54
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080055import java.util.ArrayList;
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -080056import java.util.Collections;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080057import java.util.EnumSet;
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import java.util.concurrent.ExecutionException;
63import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080065import static com.google.common.base.Preconditions.checkArgument;
Jonathan Hartc0363672015-01-20 16:21:08 -080066import static org.onlab.metrics.MetricsUtil.startTimer;
67import static org.onlab.metrics.MetricsUtil.stopTimer;
Brian O'Connorabafb502014-12-02 22:26:20 -080068import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080069import static org.slf4j.LoggerFactory.getLogger;
70
Jonathan Hart9e817ec2015-02-04 08:52:00 -080071@Component(immediate = true, enabled = false)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080072@Service
73public class HazelcastIntentStore
74 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080075 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080078 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080079
80 /** Valid parking state, which can transition to WITHDRAWN. */
81 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
82
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080083 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080084
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080085 private final Logger log = getLogger(getClass());
86
87 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080088 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080089 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080090 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080091 private SMap<IntentId, IntentState> states;
92
93 // Map to store instance local intermediate state transition
94 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
95
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080096 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080097 private SMap<IntentId, List<Intent>> installable;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected MetricsService metricsService;
101
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800102 private boolean onlyLogTransitionError = true;
103
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800104 private Timer getInstallableIntentsTimer;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800105 private Timer getIntentCountTimer;
106 private Timer getIntentsTimer;
107 private Timer getIntentTimer;
108 private Timer getIntentStateTimer;
109
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800110 // manual near cache of Intent
111 // (Note: IntentId -> Intent is expected to be immutable)
112 // entry will be evicted, when state for that IntentId is removed.
113 private Map<IntentId, Intent> localIntents;
114
115 private String stateListenerId;
116
117 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800118
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800119 private Timer createResponseTimer(String methodName) {
120 return createTimer("IntentStore", methodName, "responseTime");
121 }
122
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800123 @Override
124 @Activate
125 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800126 localIntents = new ConcurrentHashMap<>();
127
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800128 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800129 getIntentCountTimer = createResponseTimer("getIntentCount");
130 getIntentsTimer = createResponseTimer("getIntents");
131 getIntentTimer = createResponseTimer("getIntent");
132 getIntentStateTimer = createResponseTimer("getIntentState");
133
Brian O'Connor44008532014-12-04 16:41:36 -0800134 // We need a way to add serializer for intents which has been plugged-in.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800135 // As a short term workaround, relax Kryo config to
136 // registrationRequired=false
137 super.activate();
138 super.serializer = new KryoSerializer() {
139
140 @Override
141 protected void setupKryoPool() {
142 serializerPool = KryoNamespace.newBuilder()
143 .setRegistrationRequired(false)
144 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800145 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
146 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800147 }
148
149 };
150
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800151 final Config config = theInstance.getConfig();
152
153 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
154 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
155
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800156 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800157 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800158 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800159
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800160 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
161 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
162
163 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800164 states = new SMap<>(rawStates , super.serializer);
165 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800166 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800167
168 transientStates.clear();
169
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800170 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
171 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
172
173 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800174 installable = new SMap<>(rawInstallables , super.serializer);
175
176 log.info("Started");
177 }
178
179 @Deactivate
180 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800181 intents.removeEntryListener(intentsListenerId);
182 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800183 log.info("Stopped");
184 }
185
186 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800187 public MetricsService metricsService() {
188 return metricsService;
189 }
190
191 @Override
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800192 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800193 Context timer = startTimer(getIntentCountTimer);
194 try {
195 return intents.size();
196 } finally {
197 stopTimer(timer);
198 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800199 }
200
201 @Override
202 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800203 Context timer = startTimer(getIntentsTimer);
204 try {
205 return ImmutableSet.copyOf(intents.values());
206 } finally {
207 stopTimer(timer);
208 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800209 }
210
211 @Override
212 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800213 Context timer = startTimer(getIntentTimer);
214 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800215 Intent intent = localIntents.get(intentId);
216 if (intent != null) {
217 return intent;
218 }
219 intent = intents.get(intentId);
220 if (intent != null) {
221 localIntents.put(intentId, intent);
222 }
223 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800224 } finally {
225 stopTimer(timer);
226 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800227 }
228
229 @Override
230 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800231 Context timer = startTimer(getIntentStateTimer);
232 try {
233 final IntentState localState = transientStates.get(id);
234 if (localState != null) {
235 return localState;
236 }
237 return states.get(id);
238 } finally {
239 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800240 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800241 }
242
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800243 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
244 if (onlyLogTransitionError) {
245 if (!expression) {
246 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
247 }
248 } else {
249 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
250 }
251 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800252
253 @Override
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800254 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800255 Context timer = startTimer(getInstallableIntentsTimer);
256 try {
257 return installable.get(intentId);
258 } finally {
259 stopTimer(timer);
260 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800261 }
262
263 @Override
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800264 public List<Operation> batchWrite(BatchWrite batch) {
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -0800265 if (batch.isEmpty()) {
266 return Collections.emptyList();
267 }
268
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800269 // Hazelcast version will never fail for conditional failure now.
270 List<Operation> failed = new ArrayList<>();
271
272 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800273 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800274
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800275 batchWriteAsync(batch, failed, futures);
276
277 // verify result
278 verifyAsyncWrites(futures, failed, events);
279
280 notifyDelegate(events);
281
282 return failed;
283 }
284
285 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
286 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800287 for (Operation op : batch.operations()) {
288 switch (op.type()) {
289 case CREATE_INTENT:
290 checkArgument(op.args().size() == 1,
291 "CREATE_INTENT takes 1 argument. %s", op);
292 Intent intent = op.arg(0);
293 futures.add(Pair.of(op,
294 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800295 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800296 break;
297
298 case REMOVE_INTENT:
299 checkArgument(op.args().size() == 1,
300 "REMOVE_INTENT takes 1 argument. %s", op);
301 IntentId intentId = (IntentId) op.arg(0);
302 futures.add(Pair.of(op,
303 ImmutableList.of(intents.removeAsync(intentId),
304 states.removeAsync(intentId),
305 installable.removeAsync(intentId))));
306 break;
307
308 case SET_STATE:
309 checkArgument(op.args().size() == 2,
310 "SET_STATE takes 2 arguments. %s", op);
311 intent = op.arg(0);
312 IntentState newState = op.arg(1);
313 futures.add(Pair.of(op,
314 ImmutableList.of(states.putAsync(intent.id(), newState))));
315 break;
316
317 case SET_INSTALLABLE:
318 checkArgument(op.args().size() == 2,
319 "SET_INSTALLABLE takes 2 arguments. %s", op);
320 intentId = op.arg(0);
321 List<Intent> installableIntents = op.arg(1);
322 futures.add(Pair.of(op,
323 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
324 break;
325
326 case REMOVE_INSTALLED:
327 checkArgument(op.args().size() == 1,
328 "REMOVE_INSTALLED takes 1 argument. %s", op);
329 intentId = op.arg(0);
330 futures.add(Pair.of(op,
331 ImmutableList.of(installable.removeAsync(intentId))));
332 break;
333
334 default:
335 log.warn("Unknown Operation encountered: {}", op);
336 failed.add(op);
337 break;
338 }
339 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800340 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800341
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800342 /**
343 * Checks the async write result Futures and prepare Events to post.
344 *
345 * @param futures async write Futures
346 * @param failed list to output failed batch write operations
347 * @param events list to output events to post as result of writes
348 */
349 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
350 List<Operation> failed,
351 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800352 for (Pair<Operation, List<Future<?>>> future : futures) {
353 final Operation op = future.getLeft();
354 final List<Future<?>> subops = future.getRight();
355
356 switch (op.type()) {
357
358 case CREATE_INTENT:
359 {
360 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800361 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800362
363 try {
364 Intent prevIntent = (Intent) subops.get(0).get();
365 IntentState prevIntentState = (IntentState) subops.get(1).get();
366
367 if (prevIntent != null || prevIntentState != null) {
368 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
369 prevIntent, prevIntentState,
370 intent, newIntentState);
371 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800372 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800373 } catch (InterruptedException e) {
374 log.error("Batch write was interrupted while processing {}", op, e);
375 failed.add(op);
376 Thread.currentThread().interrupt();
377 } catch (ExecutionException e) {
378 log.error("Batch write failed processing {}", op, e);
379 failed.add(op);
380 }
381 break;
382 }
383
384 case REMOVE_INTENT:
385 {
386 IntentId intentId = op.arg(0);
387
388 try {
389 Intent prevIntent = (Intent) subops.get(0).get();
390 IntentState prevIntentState = (IntentState) subops.get(1).get();
391 @SuppressWarnings("unchecked")
392 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
393
394 if (prevIntent == null) {
395 log.warn("Intent {} was already removed.", intentId);
396 }
397 if (prevIntentState == null) {
398 log.warn("Intent {} state was already removed", intentId);
399 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800400 if (prevInstallable != null) {
401 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800402 }
403 } catch (InterruptedException e) {
404 log.error("Batch write was interrupted while processing {}", op, e);
405 failed.add(op);
406 Thread.currentThread().interrupt();
407 } catch (ExecutionException e) {
408 log.error("Batch write failed processing {}", op, e);
409 failed.add(op);
410 }
411 break;
412 }
413
414 case SET_STATE:
415 {
416 Intent intent = op.arg(0);
417 IntentId intentId = intent.id();
418 IntentState newState = op.arg(1);
419
420 try {
421 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800422
423 if (PARKING.contains(newState)) {
424 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800425 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800426 }
alshabiba9819bf2014-11-30 18:15:52 -0800427
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800428 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800429 } catch (InterruptedException e) {
430 log.error("Batch write was interrupted while processing {}", op, e);
431 failed.add(op);
432 Thread.currentThread().interrupt();
433 } catch (ExecutionException e) {
434 log.error("Batch write failed processing {}", op, e);
435 failed.add(op);
436 }
437 break;
438 }
439
440 case SET_INSTALLABLE:
441 {
442 IntentId intentId = op.arg(0);
443 List<Intent> installableIntents = op.arg(1);
444
445 try {
446 @SuppressWarnings("unchecked")
447 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
448
449 if (prevInstallable != null) {
450 log.warn("Overwriting Intent {} installable {} -> {}",
451 intentId, prevInstallable, installableIntents);
452 }
453 } catch (InterruptedException e) {
454 log.error("Batch write was interrupted while processing {}", op, e);
455 failed.add(op);
456 Thread.currentThread().interrupt();
457 } catch (ExecutionException e) {
458 log.error("Batch write failed processing {}", op, e);
459 failed.add(op);
460 }
461 break;
462 }
463
464 case REMOVE_INSTALLED:
465 {
466 IntentId intentId = op.arg(0);
467
468 try {
469 @SuppressWarnings("unchecked")
470 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
471
472 if (prevInstallable == null) {
473 log.warn("Intent {} installable was already removed", intentId);
474 }
475 } catch (InterruptedException e) {
476 log.error("Batch write was interrupted while processing {}", op, e);
477 failed.add(op);
478 Thread.currentThread().interrupt();
479 } catch (ExecutionException e) {
480 log.error("Batch write failed processing {}", op, e);
481 failed.add(op);
482 }
483 break;
484 }
485
486 default:
487 log.warn("Unknown Operation encountered: {}", op);
488 if (!failed.contains(op)) {
489 failed.add(op);
490 }
491 break;
492 }
493 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800494 }
495
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800496 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
497
498 @Override
499 public void entryAdded(EntryEvent<IntentId, Intent> event) {
500 localIntents.put(event.getKey(), event.getValue());
501 }
502
503 @Override
504 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
505 entryAdded(event);
506 }
507 }
508
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800509 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
510
511 @Override
512 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800513 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800514 final Member myself = theInstance.getCluster().getLocalMember();
515 if (!myself.equals(event.getMember())) {
516 // When Intent state was modified by remote node,
517 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800518 IntentState oldState = transientStates.remove(intentId);
519 if (oldState != null) {
520 log.debug("{} state updated remotely, removing transient state {}",
521 intentId, oldState);
522 }
alshabiba9819bf2014-11-30 18:15:52 -0800523
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800524 if (event.getValue() != null) {
525 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800526
527 final Intent intent = getIntent(intentId);
528 if (intent == null) {
529 log.warn("no Intent found for {} on Event {}", intentId, event);
530 return;
531 }
532 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
533 // remove IntentCache
534 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800535 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800536 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800537
538 // populate manual near cache, to prepare for
539 // transition event to WITHDRAWN
540 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800541 }
542 }
543}