blob: 8514dedf75ec7a12f488d6c6aa9ec596300e48a5 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080023import com.hazelcast.config.Config;
24import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080025import com.hazelcast.core.EntryAdapter;
26import com.hazelcast.core.EntryEvent;
27import com.hazelcast.core.EntryListener;
28import com.hazelcast.core.IMap;
29import com.hazelcast.core.Member;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080030import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080031import org.apache.felix.scr.annotations.Activate;
32import org.apache.felix.scr.annotations.Component;
33import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080034import org.apache.felix.scr.annotations.Reference;
35import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080036import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080037import org.onlab.metrics.MetricsService;
Jonathan Hartc0363672015-01-20 16:21:08 -080038import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080039import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080040import org.onosproject.net.intent.BatchWrite;
Jonathan Hartc0363672015-01-20 16:21:08 -080041import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentId;
45import org.onosproject.net.intent.IntentState;
46import org.onosproject.net.intent.IntentStore;
Brian O'Connorabafb502014-12-02 22:26:20 -080047import org.onosproject.net.intent.IntentStoreDelegate;
Ray Milkeyf9af43c2015-02-09 16:45:48 -080048import org.onosproject.net.intent.Key;
Brian O'Connorabafb502014-12-02 22:26:20 -080049import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.slf4j.Logger;
54
55import java.util.EnumSet;
56import java.util.List;
57import java.util.Map;
58import java.util.Set;
59import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080060import java.util.concurrent.ExecutionException;
61import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080062
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080063import static com.google.common.base.Preconditions.checkArgument;
Jonathan Hartc0363672015-01-20 16:21:08 -080064import static org.onlab.metrics.MetricsUtil.startTimer;
65import static org.onlab.metrics.MetricsUtil.stopTimer;
Jonathan Hart2085e072015-02-12 11:44:03 -080066import static org.onosproject.net.intent.IntentState.FAILED;
67import static org.onosproject.net.intent.IntentState.INSTALLED;
68import static org.onosproject.net.intent.IntentState.INSTALL_REQ;
69import static org.onosproject.net.intent.IntentState.WITHDRAWN;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080070import static org.slf4j.LoggerFactory.getLogger;
71
Brian O'Connor2ba63fd2015-02-09 22:48:11 -080072//TODO Note: this store will be removed
73
Jonathan Hart9e817ec2015-02-04 08:52:00 -080074@Component(immediate = true, enabled = false)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080075@Service
76public class HazelcastIntentStore
77 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080078 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080079
80 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080081 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080082
83 /** Valid parking state, which can transition to WITHDRAWN. */
84 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
85
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080086 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080087
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080088 private final Logger log = getLogger(getClass());
89
90 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080091 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080092 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080093 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080094 private SMap<IntentId, IntentState> states;
95
96 // Map to store instance local intermediate state transition
97 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
98
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080099 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800100 private SMap<IntentId, List<Intent>> installable;
101
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected MetricsService metricsService;
104
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800105 private boolean onlyLogTransitionError = true;
106
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800107 private Timer getInstallableIntentsTimer;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800108 private Timer getIntentCountTimer;
109 private Timer getIntentsTimer;
110 private Timer getIntentTimer;
111 private Timer getIntentStateTimer;
112
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800113 // manual near cache of Intent
114 // (Note: IntentId -> Intent is expected to be immutable)
115 // entry will be evicted, when state for that IntentId is removed.
116 private Map<IntentId, Intent> localIntents;
117
118 private String stateListenerId;
119
120 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800121
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800122 private Timer createResponseTimer(String methodName) {
123 return createTimer("IntentStore", methodName, "responseTime");
124 }
125
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800126 @Override
127 @Activate
128 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800129 localIntents = new ConcurrentHashMap<>();
130
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800131 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800132 getIntentCountTimer = createResponseTimer("getIntentCount");
133 getIntentsTimer = createResponseTimer("getIntents");
134 getIntentTimer = createResponseTimer("getIntent");
135 getIntentStateTimer = createResponseTimer("getIntentState");
136
Brian O'Connor44008532014-12-04 16:41:36 -0800137 // We need a way to add serializer for intents which has been plugged-in.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800138 // As a short term workaround, relax Kryo config to
139 // registrationRequired=false
140 super.activate();
141 super.serializer = new KryoSerializer() {
142
143 @Override
144 protected void setupKryoPool() {
145 serializerPool = KryoNamespace.newBuilder()
146 .setRegistrationRequired(false)
147 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800148 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
149 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800150 }
151
152 };
153
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800154 final Config config = theInstance.getConfig();
155
156 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
157 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
158
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800159 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800160 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800161 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800162
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800163 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
164 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
165
166 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800167 states = new SMap<>(rawStates , super.serializer);
168 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800169 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800170
171 transientStates.clear();
172
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800173 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
174 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
175
176 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800177 installable = new SMap<>(rawInstallables , super.serializer);
178
179 log.info("Started");
180 }
181
182 @Deactivate
183 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800184 intents.removeEntryListener(intentsListenerId);
185 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800186 log.info("Stopped");
187 }
188
189 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800190 public MetricsService metricsService() {
191 return metricsService;
192 }
193
194 @Override
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800195 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800196 Context timer = startTimer(getIntentCountTimer);
197 try {
198 return intents.size();
199 } finally {
200 stopTimer(timer);
201 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800202 }
203
204 @Override
205 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800206 Context timer = startTimer(getIntentsTimer);
207 try {
208 return ImmutableSet.copyOf(intents.values());
209 } finally {
210 stopTimer(timer);
211 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800212 }
213
214 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800215 public Intent getIntent(Key intentKey) {
216 return null;
217 }
218
219
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800220 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800221 Context timer = startTimer(getIntentTimer);
222 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800223 Intent intent = localIntents.get(intentId);
224 if (intent != null) {
225 return intent;
226 }
227 intent = intents.get(intentId);
228 if (intent != null) {
229 localIntents.put(intentId, intent);
230 }
231 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800232 } finally {
233 stopTimer(timer);
234 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800235 }
236
237 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800238 public IntentState getIntentState(Key key) {
239 // TODO: either implement this or remove this class
240 return IntentState.FAILED;
241 /*
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800242 Context timer = startTimer(getIntentStateTimer);
243 try {
244 final IntentState localState = transientStates.get(id);
245 if (localState != null) {
246 return localState;
247 }
248 return states.get(id);
249 } finally {
250 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800251 }
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800252 */
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800253 }
254
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800255 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
256 if (onlyLogTransitionError) {
257 if (!expression) {
258 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
259 }
260 } else {
261 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
262 }
263 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800264
265 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800266 public List<Intent> getInstallableIntents(Key intentKey) {
267 // TODO: implement this or delete class
268 return null;
269
270 /*
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800271 Context timer = startTimer(getInstallableIntentsTimer);
272 try {
273 return installable.get(intentId);
274 } finally {
275 stopTimer(timer);
276 }
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800277 */
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800278 }
279
Jonathan Hart2085e072015-02-12 11:44:03 -0800280 /*@Override
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800281 public List<Operation> batchWrite(BatchWrite batch) {
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -0800282 if (batch.isEmpty()) {
283 return Collections.emptyList();
284 }
285
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800286 // Hazelcast version will never fail for conditional failure now.
287 List<Operation> failed = new ArrayList<>();
288
289 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800290 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800291
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800292 batchWriteAsync(batch, failed, futures);
293
294 // verify result
295 verifyAsyncWrites(futures, failed, events);
296
297 notifyDelegate(events);
298
299 return failed;
Jonathan Hart2085e072015-02-12 11:44:03 -0800300 }*/
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800301
302 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
303 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800304 for (Operation op : batch.operations()) {
305 switch (op.type()) {
306 case CREATE_INTENT:
307 checkArgument(op.args().size() == 1,
308 "CREATE_INTENT takes 1 argument. %s", op);
309 Intent intent = op.arg(0);
310 futures.add(Pair.of(op,
311 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800312 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800313 break;
314
315 case REMOVE_INTENT:
316 checkArgument(op.args().size() == 1,
317 "REMOVE_INTENT takes 1 argument. %s", op);
318 IntentId intentId = (IntentId) op.arg(0);
319 futures.add(Pair.of(op,
320 ImmutableList.of(intents.removeAsync(intentId),
321 states.removeAsync(intentId),
322 installable.removeAsync(intentId))));
323 break;
324
325 case SET_STATE:
326 checkArgument(op.args().size() == 2,
327 "SET_STATE takes 2 arguments. %s", op);
328 intent = op.arg(0);
329 IntentState newState = op.arg(1);
330 futures.add(Pair.of(op,
331 ImmutableList.of(states.putAsync(intent.id(), newState))));
332 break;
333
334 case SET_INSTALLABLE:
335 checkArgument(op.args().size() == 2,
336 "SET_INSTALLABLE takes 2 arguments. %s", op);
337 intentId = op.arg(0);
338 List<Intent> installableIntents = op.arg(1);
339 futures.add(Pair.of(op,
340 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
341 break;
342
343 case REMOVE_INSTALLED:
344 checkArgument(op.args().size() == 1,
345 "REMOVE_INSTALLED takes 1 argument. %s", op);
346 intentId = op.arg(0);
347 futures.add(Pair.of(op,
348 ImmutableList.of(installable.removeAsync(intentId))));
349 break;
350
351 default:
352 log.warn("Unknown Operation encountered: {}", op);
353 failed.add(op);
354 break;
355 }
356 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800357 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800358
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800359 /**
360 * Checks the async write result Futures and prepare Events to post.
361 *
362 * @param futures async write Futures
363 * @param failed list to output failed batch write operations
364 * @param events list to output events to post as result of writes
365 */
366 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
367 List<Operation> failed,
368 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800369 for (Pair<Operation, List<Future<?>>> future : futures) {
370 final Operation op = future.getLeft();
371 final List<Future<?>> subops = future.getRight();
372
373 switch (op.type()) {
374
375 case CREATE_INTENT:
376 {
377 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800378 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800379
380 try {
381 Intent prevIntent = (Intent) subops.get(0).get();
382 IntentState prevIntentState = (IntentState) subops.get(1).get();
383
384 if (prevIntent != null || prevIntentState != null) {
385 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
386 prevIntent, prevIntentState,
387 intent, newIntentState);
388 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800389 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800390 } catch (InterruptedException e) {
391 log.error("Batch write was interrupted while processing {}", op, e);
392 failed.add(op);
393 Thread.currentThread().interrupt();
394 } catch (ExecutionException e) {
395 log.error("Batch write failed processing {}", op, e);
396 failed.add(op);
397 }
398 break;
399 }
400
401 case REMOVE_INTENT:
402 {
403 IntentId intentId = op.arg(0);
404
405 try {
406 Intent prevIntent = (Intent) subops.get(0).get();
407 IntentState prevIntentState = (IntentState) subops.get(1).get();
408 @SuppressWarnings("unchecked")
409 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
410
411 if (prevIntent == null) {
412 log.warn("Intent {} was already removed.", intentId);
413 }
414 if (prevIntentState == null) {
415 log.warn("Intent {} state was already removed", intentId);
416 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800417 if (prevInstallable != null) {
418 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800419 }
420 } catch (InterruptedException e) {
421 log.error("Batch write was interrupted while processing {}", op, e);
422 failed.add(op);
423 Thread.currentThread().interrupt();
424 } catch (ExecutionException e) {
425 log.error("Batch write failed processing {}", op, e);
426 failed.add(op);
427 }
428 break;
429 }
430
431 case SET_STATE:
432 {
433 Intent intent = op.arg(0);
434 IntentId intentId = intent.id();
435 IntentState newState = op.arg(1);
436
437 try {
438 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800439
440 if (PARKING.contains(newState)) {
441 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800442 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800443 }
alshabiba9819bf2014-11-30 18:15:52 -0800444
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800445 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800446 } catch (InterruptedException e) {
447 log.error("Batch write was interrupted while processing {}", op, e);
448 failed.add(op);
449 Thread.currentThread().interrupt();
450 } catch (ExecutionException e) {
451 log.error("Batch write failed processing {}", op, e);
452 failed.add(op);
453 }
454 break;
455 }
456
457 case SET_INSTALLABLE:
458 {
459 IntentId intentId = op.arg(0);
460 List<Intent> installableIntents = op.arg(1);
461
462 try {
463 @SuppressWarnings("unchecked")
464 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
465
466 if (prevInstallable != null) {
467 log.warn("Overwriting Intent {} installable {} -> {}",
468 intentId, prevInstallable, installableIntents);
469 }
470 } catch (InterruptedException e) {
471 log.error("Batch write was interrupted while processing {}", op, e);
472 failed.add(op);
473 Thread.currentThread().interrupt();
474 } catch (ExecutionException e) {
475 log.error("Batch write failed processing {}", op, e);
476 failed.add(op);
477 }
478 break;
479 }
480
481 case REMOVE_INSTALLED:
482 {
483 IntentId intentId = op.arg(0);
484
485 try {
486 @SuppressWarnings("unchecked")
487 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
488
489 if (prevInstallable == null) {
490 log.warn("Intent {} installable was already removed", intentId);
491 }
492 } catch (InterruptedException e) {
493 log.error("Batch write was interrupted while processing {}", op, e);
494 failed.add(op);
495 Thread.currentThread().interrupt();
496 } catch (ExecutionException e) {
497 log.error("Batch write failed processing {}", op, e);
498 failed.add(op);
499 }
500 break;
501 }
502
503 default:
504 log.warn("Unknown Operation encountered: {}", op);
505 if (!failed.contains(op)) {
506 failed.add(op);
507 }
508 break;
509 }
510 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800511 }
512
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800513 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
514
515 @Override
516 public void entryAdded(EntryEvent<IntentId, Intent> event) {
517 localIntents.put(event.getKey(), event.getValue());
518 }
519
520 @Override
521 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
522 entryAdded(event);
523 }
524 }
525
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800526 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
527
528 @Override
529 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800530 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800531 final Member myself = theInstance.getCluster().getLocalMember();
532 if (!myself.equals(event.getMember())) {
533 // When Intent state was modified by remote node,
534 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800535 IntentState oldState = transientStates.remove(intentId);
536 if (oldState != null) {
537 log.debug("{} state updated remotely, removing transient state {}",
538 intentId, oldState);
539 }
alshabiba9819bf2014-11-30 18:15:52 -0800540
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800541 if (event.getValue() != null) {
542 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800543
544 final Intent intent = getIntent(intentId);
545 if (intent == null) {
546 log.warn("no Intent found for {} on Event {}", intentId, event);
547 return;
548 }
549 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
550 // remove IntentCache
551 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800552 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800553 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800554
555 // populate manual near cache, to prepare for
556 // transition event to WITHDRAWN
557 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800558 }
559 }
560}