blob: 8e91d6129bbcb62eeb848d7b85ac73659f205b69 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080031import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080032import org.apache.felix.scr.annotations.Activate;
33import org.apache.felix.scr.annotations.Component;
34import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080035import org.apache.felix.scr.annotations.Reference;
36import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080037import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080038import org.onlab.metrics.MetricsService;
Jonathan Hartc0363672015-01-20 16:21:08 -080039import org.onlab.util.KryoNamespace;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Jonathan Hartc0363672015-01-20 16:21:08 -080042import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080043import org.onosproject.net.intent.Intent;
44import org.onosproject.net.intent.IntentEvent;
45import org.onosproject.net.intent.IntentId;
46import org.onosproject.net.intent.IntentState;
47import org.onosproject.net.intent.IntentStore;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
Ray Milkeyf9af43c2015-02-09 16:45:48 -080049import org.onosproject.net.intent.Key;
Brian O'Connorabafb502014-12-02 22:26:20 -080050import org.onosproject.store.hz.AbstractHazelcastStore;
51import org.onosproject.store.hz.SMap;
52import org.onosproject.store.serializers.KryoNamespaces;
53import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080054import org.slf4j.Logger;
55
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080056import java.util.ArrayList;
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -080057import java.util.Collections;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080058import java.util.EnumSet;
59import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080063import java.util.concurrent.ExecutionException;
64import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080065
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080066import static com.google.common.base.Preconditions.checkArgument;
Jonathan Hartc0363672015-01-20 16:21:08 -080067import static org.onlab.metrics.MetricsUtil.startTimer;
68import static org.onlab.metrics.MetricsUtil.stopTimer;
Brian O'Connorabafb502014-12-02 22:26:20 -080069import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080070import static org.slf4j.LoggerFactory.getLogger;
71
Jonathan Hart9e817ec2015-02-04 08:52:00 -080072@Component(immediate = true, enabled = false)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080073@Service
74public class HazelcastIntentStore
75 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080076 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080077
78 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080079 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080080
81 /** Valid parking state, which can transition to WITHDRAWN. */
82 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
83
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080084 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080085
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080086 private final Logger log = getLogger(getClass());
87
88 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080089 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080090 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080091 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080092 private SMap<IntentId, IntentState> states;
93
94 // Map to store instance local intermediate state transition
95 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
96
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080097 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080098 private SMap<IntentId, List<Intent>> installable;
99
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected MetricsService metricsService;
102
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800103 private boolean onlyLogTransitionError = true;
104
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800105 private Timer getInstallableIntentsTimer;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800106 private Timer getIntentCountTimer;
107 private Timer getIntentsTimer;
108 private Timer getIntentTimer;
109 private Timer getIntentStateTimer;
110
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800111 // manual near cache of Intent
112 // (Note: IntentId -> Intent is expected to be immutable)
113 // entry will be evicted, when state for that IntentId is removed.
114 private Map<IntentId, Intent> localIntents;
115
116 private String stateListenerId;
117
118 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800119
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800120 private Timer createResponseTimer(String methodName) {
121 return createTimer("IntentStore", methodName, "responseTime");
122 }
123
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800124 @Override
125 @Activate
126 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800127 localIntents = new ConcurrentHashMap<>();
128
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800129 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800130 getIntentCountTimer = createResponseTimer("getIntentCount");
131 getIntentsTimer = createResponseTimer("getIntents");
132 getIntentTimer = createResponseTimer("getIntent");
133 getIntentStateTimer = createResponseTimer("getIntentState");
134
Brian O'Connor44008532014-12-04 16:41:36 -0800135 // We need a way to add serializer for intents which has been plugged-in.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800136 // As a short term workaround, relax Kryo config to
137 // registrationRequired=false
138 super.activate();
139 super.serializer = new KryoSerializer() {
140
141 @Override
142 protected void setupKryoPool() {
143 serializerPool = KryoNamespace.newBuilder()
144 .setRegistrationRequired(false)
145 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800146 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
147 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800148 }
149
150 };
151
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800152 final Config config = theInstance.getConfig();
153
154 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
155 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
156
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800157 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800158 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800159 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800160
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800161 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
162 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
163
164 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800165 states = new SMap<>(rawStates , super.serializer);
166 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800167 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800168
169 transientStates.clear();
170
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800171 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
172 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
173
174 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800175 installable = new SMap<>(rawInstallables , super.serializer);
176
177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800182 intents.removeEntryListener(intentsListenerId);
183 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800184 log.info("Stopped");
185 }
186
187 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800188 public MetricsService metricsService() {
189 return metricsService;
190 }
191
192 @Override
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800193 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800194 Context timer = startTimer(getIntentCountTimer);
195 try {
196 return intents.size();
197 } finally {
198 stopTimer(timer);
199 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800200 }
201
202 @Override
203 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800204 Context timer = startTimer(getIntentsTimer);
205 try {
206 return ImmutableSet.copyOf(intents.values());
207 } finally {
208 stopTimer(timer);
209 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800210 }
211
212 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800213 public Intent getIntent(Key intentKey) {
214 return null;
215 }
216
217
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800218 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800219 Context timer = startTimer(getIntentTimer);
220 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800221 Intent intent = localIntents.get(intentId);
222 if (intent != null) {
223 return intent;
224 }
225 intent = intents.get(intentId);
226 if (intent != null) {
227 localIntents.put(intentId, intent);
228 }
229 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800230 } finally {
231 stopTimer(timer);
232 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800233 }
234
235 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800236 public IntentState getIntentState(Key key) {
237 // TODO: either implement this or remove this class
238 return IntentState.FAILED;
239 /*
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800240 Context timer = startTimer(getIntentStateTimer);
241 try {
242 final IntentState localState = transientStates.get(id);
243 if (localState != null) {
244 return localState;
245 }
246 return states.get(id);
247 } finally {
248 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800249 }
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800250 */
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800251 }
252
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800253 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
254 if (onlyLogTransitionError) {
255 if (!expression) {
256 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
257 }
258 } else {
259 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
260 }
261 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800262
263 @Override
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800264 public List<Intent> getInstallableIntents(Key intentKey) {
265 // TODO: implement this or delete class
266 return null;
267
268 /*
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800269 Context timer = startTimer(getInstallableIntentsTimer);
270 try {
271 return installable.get(intentId);
272 } finally {
273 stopTimer(timer);
274 }
Ray Milkeyf9af43c2015-02-09 16:45:48 -0800275 */
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800276 }
277
278 @Override
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800279 public List<Operation> batchWrite(BatchWrite batch) {
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -0800280 if (batch.isEmpty()) {
281 return Collections.emptyList();
282 }
283
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800284 // Hazelcast version will never fail for conditional failure now.
285 List<Operation> failed = new ArrayList<>();
286
287 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800288 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800289
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800290 batchWriteAsync(batch, failed, futures);
291
292 // verify result
293 verifyAsyncWrites(futures, failed, events);
294
295 notifyDelegate(events);
296
297 return failed;
298 }
299
300 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
301 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800302 for (Operation op : batch.operations()) {
303 switch (op.type()) {
304 case CREATE_INTENT:
305 checkArgument(op.args().size() == 1,
306 "CREATE_INTENT takes 1 argument. %s", op);
307 Intent intent = op.arg(0);
308 futures.add(Pair.of(op,
309 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800310 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800311 break;
312
313 case REMOVE_INTENT:
314 checkArgument(op.args().size() == 1,
315 "REMOVE_INTENT takes 1 argument. %s", op);
316 IntentId intentId = (IntentId) op.arg(0);
317 futures.add(Pair.of(op,
318 ImmutableList.of(intents.removeAsync(intentId),
319 states.removeAsync(intentId),
320 installable.removeAsync(intentId))));
321 break;
322
323 case SET_STATE:
324 checkArgument(op.args().size() == 2,
325 "SET_STATE takes 2 arguments. %s", op);
326 intent = op.arg(0);
327 IntentState newState = op.arg(1);
328 futures.add(Pair.of(op,
329 ImmutableList.of(states.putAsync(intent.id(), newState))));
330 break;
331
332 case SET_INSTALLABLE:
333 checkArgument(op.args().size() == 2,
334 "SET_INSTALLABLE takes 2 arguments. %s", op);
335 intentId = op.arg(0);
336 List<Intent> installableIntents = op.arg(1);
337 futures.add(Pair.of(op,
338 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
339 break;
340
341 case REMOVE_INSTALLED:
342 checkArgument(op.args().size() == 1,
343 "REMOVE_INSTALLED takes 1 argument. %s", op);
344 intentId = op.arg(0);
345 futures.add(Pair.of(op,
346 ImmutableList.of(installable.removeAsync(intentId))));
347 break;
348
349 default:
350 log.warn("Unknown Operation encountered: {}", op);
351 failed.add(op);
352 break;
353 }
354 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800355 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800356
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800357 /**
358 * Checks the async write result Futures and prepare Events to post.
359 *
360 * @param futures async write Futures
361 * @param failed list to output failed batch write operations
362 * @param events list to output events to post as result of writes
363 */
364 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
365 List<Operation> failed,
366 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800367 for (Pair<Operation, List<Future<?>>> future : futures) {
368 final Operation op = future.getLeft();
369 final List<Future<?>> subops = future.getRight();
370
371 switch (op.type()) {
372
373 case CREATE_INTENT:
374 {
375 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800376 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800377
378 try {
379 Intent prevIntent = (Intent) subops.get(0).get();
380 IntentState prevIntentState = (IntentState) subops.get(1).get();
381
382 if (prevIntent != null || prevIntentState != null) {
383 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
384 prevIntent, prevIntentState,
385 intent, newIntentState);
386 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800387 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800388 } catch (InterruptedException e) {
389 log.error("Batch write was interrupted while processing {}", op, e);
390 failed.add(op);
391 Thread.currentThread().interrupt();
392 } catch (ExecutionException e) {
393 log.error("Batch write failed processing {}", op, e);
394 failed.add(op);
395 }
396 break;
397 }
398
399 case REMOVE_INTENT:
400 {
401 IntentId intentId = op.arg(0);
402
403 try {
404 Intent prevIntent = (Intent) subops.get(0).get();
405 IntentState prevIntentState = (IntentState) subops.get(1).get();
406 @SuppressWarnings("unchecked")
407 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
408
409 if (prevIntent == null) {
410 log.warn("Intent {} was already removed.", intentId);
411 }
412 if (prevIntentState == null) {
413 log.warn("Intent {} state was already removed", intentId);
414 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800415 if (prevInstallable != null) {
416 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800417 }
418 } catch (InterruptedException e) {
419 log.error("Batch write was interrupted while processing {}", op, e);
420 failed.add(op);
421 Thread.currentThread().interrupt();
422 } catch (ExecutionException e) {
423 log.error("Batch write failed processing {}", op, e);
424 failed.add(op);
425 }
426 break;
427 }
428
429 case SET_STATE:
430 {
431 Intent intent = op.arg(0);
432 IntentId intentId = intent.id();
433 IntentState newState = op.arg(1);
434
435 try {
436 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800437
438 if (PARKING.contains(newState)) {
439 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800440 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800441 }
alshabiba9819bf2014-11-30 18:15:52 -0800442
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800443 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800444 } catch (InterruptedException e) {
445 log.error("Batch write was interrupted while processing {}", op, e);
446 failed.add(op);
447 Thread.currentThread().interrupt();
448 } catch (ExecutionException e) {
449 log.error("Batch write failed processing {}", op, e);
450 failed.add(op);
451 }
452 break;
453 }
454
455 case SET_INSTALLABLE:
456 {
457 IntentId intentId = op.arg(0);
458 List<Intent> installableIntents = op.arg(1);
459
460 try {
461 @SuppressWarnings("unchecked")
462 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
463
464 if (prevInstallable != null) {
465 log.warn("Overwriting Intent {} installable {} -> {}",
466 intentId, prevInstallable, installableIntents);
467 }
468 } catch (InterruptedException e) {
469 log.error("Batch write was interrupted while processing {}", op, e);
470 failed.add(op);
471 Thread.currentThread().interrupt();
472 } catch (ExecutionException e) {
473 log.error("Batch write failed processing {}", op, e);
474 failed.add(op);
475 }
476 break;
477 }
478
479 case REMOVE_INSTALLED:
480 {
481 IntentId intentId = op.arg(0);
482
483 try {
484 @SuppressWarnings("unchecked")
485 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
486
487 if (prevInstallable == null) {
488 log.warn("Intent {} installable was already removed", intentId);
489 }
490 } catch (InterruptedException e) {
491 log.error("Batch write was interrupted while processing {}", op, e);
492 failed.add(op);
493 Thread.currentThread().interrupt();
494 } catch (ExecutionException e) {
495 log.error("Batch write failed processing {}", op, e);
496 failed.add(op);
497 }
498 break;
499 }
500
501 default:
502 log.warn("Unknown Operation encountered: {}", op);
503 if (!failed.contains(op)) {
504 failed.add(op);
505 }
506 break;
507 }
508 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800509 }
510
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800511 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
512
513 @Override
514 public void entryAdded(EntryEvent<IntentId, Intent> event) {
515 localIntents.put(event.getKey(), event.getValue());
516 }
517
518 @Override
519 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
520 entryAdded(event);
521 }
522 }
523
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800524 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
525
526 @Override
527 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800528 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800529 final Member myself = theInstance.getCluster().getLocalMember();
530 if (!myself.equals(event.getMember())) {
531 // When Intent state was modified by remote node,
532 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800533 IntentState oldState = transientStates.remove(intentId);
534 if (oldState != null) {
535 log.debug("{} state updated remotely, removing transient state {}",
536 intentId, oldState);
537 }
alshabiba9819bf2014-11-30 18:15:52 -0800538
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800539 if (event.getValue() != null) {
540 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800541
542 final Intent intent = getIntent(intentId);
543 if (intent == null) {
544 log.warn("no Intent found for {} on Event {}", intentId, event);
545 return;
546 }
547 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
548 // remove IntentCache
549 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800550 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800551 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800552
553 // populate manual near cache, to prepare for
554 // transition event to WITHDRAWN
555 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800556 }
557 }
558}