blob: c537a72c21947f6df202692f8c17191f61463121 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080024import com.hazelcast.core.EntryAdapter;
25import com.hazelcast.core.EntryEvent;
26import com.hazelcast.core.EntryListener;
27import com.hazelcast.core.IMap;
28import com.hazelcast.core.Member;
29
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080030import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080031import org.apache.felix.scr.annotations.Activate;
32import org.apache.felix.scr.annotations.Component;
33import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080034import org.apache.felix.scr.annotations.Reference;
35import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080036import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080037import org.onlab.metrics.MetricsService;
38import org.onlab.onos.core.MetricsHelper;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080039import org.onlab.onos.net.intent.Intent;
40import org.onlab.onos.net.intent.IntentEvent;
41import org.onlab.onos.net.intent.IntentId;
42import org.onlab.onos.net.intent.IntentState;
43import org.onlab.onos.net.intent.IntentStore;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080044import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080045import org.onlab.onos.net.intent.IntentStoreDelegate;
46import org.onlab.onos.store.hz.AbstractHazelcastStore;
47import org.onlab.onos.store.hz.SMap;
48import org.onlab.onos.store.serializers.KryoNamespaces;
49import org.onlab.onos.store.serializers.KryoSerializer;
50import org.onlab.util.KryoNamespace;
51import org.slf4j.Logger;
52
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080053import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080054import java.util.EnumSet;
55import java.util.List;
56import java.util.Map;
57import java.util.Set;
58import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080059import java.util.concurrent.ExecutionException;
60import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080061
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080063import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064import static org.onlab.onos.net.intent.IntentState.*;
65import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080066import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080067
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080068@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080069@Service
70public class HazelcastIntentStore
71 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080072 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080073
74 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080075 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to WITHDRAWN. */
78 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
79
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080080 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080081
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080082 private final Logger log = getLogger(getClass());
83
84 // Assumption: IntentId will not have synonyms
85 private SMap<IntentId, Intent> intents;
86 private SMap<IntentId, IntentState> states;
87
88 // Map to store instance local intermediate state transition
89 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
90
91 private SMap<IntentId, List<Intent>> installable;
92
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080093 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected MetricsService metricsService;
95
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080096 // TODO make this configurable
97 private boolean onlyLogTransitionError = true;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 private Timer createIntentTimer;
100 private Timer removeIntentTimer;
101 private Timer setInstallableIntentsTimer;
102 private Timer getInstallableIntentsTimer;
103 private Timer removeInstalledIntentsTimer;
104 private Timer setStateTimer;
105 private Timer getIntentCountTimer;
106 private Timer getIntentsTimer;
107 private Timer getIntentTimer;
108 private Timer getIntentStateTimer;
109
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800110 private String listenerId;
111
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800112 private Timer createResponseTimer(String methodName) {
113 return createTimer("IntentStore", methodName, "responseTime");
114 }
115
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800116 @Override
117 @Activate
118 public void activate() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800119 createIntentTimer = createResponseTimer("createIntent");
120 removeIntentTimer = createResponseTimer("removeIntent");
121 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
122 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
123 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
124 setStateTimer = createResponseTimer("setState");
125 getIntentCountTimer = createResponseTimer("getIntentCount");
126 getIntentsTimer = createResponseTimer("getIntents");
127 getIntentTimer = createResponseTimer("getIntent");
128 getIntentStateTimer = createResponseTimer("getIntentState");
129
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800130 // FIXME: We need a way to add serializer for intents which has been plugged-in.
131 // As a short term workaround, relax Kryo config to
132 // registrationRequired=false
133 super.activate();
134 super.serializer = new KryoSerializer() {
135
136 @Override
137 protected void setupKryoPool() {
138 serializerPool = KryoNamespace.newBuilder()
139 .setRegistrationRequired(false)
140 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800141 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
142 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800143 }
144
145 };
146
147 // TODO: enable near cache, allow read from backup for this IMap
148 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap("intents");
149 intents = new SMap<>(rawIntents , super.serializer);
150
151 // TODO: disable near cache, disable read from backup for this IMap
152 IMap<byte[], byte[]> rawStates = super.theInstance.getMap("intent-states");
153 states = new SMap<>(rawStates , super.serializer);
154 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800155 listenerId = states.addEntryListener(listener , true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800156
157 transientStates.clear();
158
159 // TODO: disable near cache, disable read from backup for this IMap
160 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap("installable-intents");
161 installable = new SMap<>(rawInstallables , super.serializer);
162
163 log.info("Started");
164 }
165
166 @Deactivate
167 public void deactivate() {
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800168 states.removeEntryListener(listenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800169 log.info("Stopped");
170 }
171
172 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800173 public MetricsService metricsService() {
174 return metricsService;
175 }
176
177 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800178 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800179 Context timer = startTimer(createIntentTimer);
180 try {
181 Intent existing = intents.putIfAbsent(intent.id(), intent);
182 if (existing != null) {
183 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800184 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800185 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800186 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800187 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800188 }
189 } finally {
190 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800191 }
192 }
193
194 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800195 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800196 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800197 checkState(getIntentState(intentId) == WITHDRAWN,
198 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800199 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800200 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800201 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800202 states.remove(intentId);
203 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800204 } finally {
205 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800206 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800207 }
208
209 @Override
210 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800211 Context timer = startTimer(getIntentCountTimer);
212 try {
213 return intents.size();
214 } finally {
215 stopTimer(timer);
216 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800217 }
218
219 @Override
220 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800221 Context timer = startTimer(getIntentsTimer);
222 try {
223 return ImmutableSet.copyOf(intents.values());
224 } finally {
225 stopTimer(timer);
226 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800227 }
228
229 @Override
230 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800231 Context timer = startTimer(getIntentTimer);
232 try {
233 return intents.get(intentId);
234 } finally {
235 stopTimer(timer);
236 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800237 }
238
239 @Override
240 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800241 Context timer = startTimer(getIntentStateTimer);
242 try {
243 final IntentState localState = transientStates.get(id);
244 if (localState != null) {
245 return localState;
246 }
247 return states.get(id);
248 } finally {
249 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800250 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800251 }
252
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800253 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
254 if (onlyLogTransitionError) {
255 if (!expression) {
256 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
257 }
258 } else {
259 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
260 }
261 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800262
263 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800264 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800265 Context timer = startTimer(setStateTimer);
266 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800267 final IntentId id = intent.id();
268 IntentEvent.Type type = null;
269 final IntentState prevParking;
270 boolean transientStateChangeOnly = false;
271
272 // parking state transition
273 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800274 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800275 prevParking = states.get(id);
276 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800277 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
278 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800279 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800280 verify(PRE_INSTALLED.contains(prevParking),
281 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800282 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800283 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
284 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800285 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800286 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800287 break;
288 case INSTALLED:
289 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800290 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800291 "Illegal state transition attempted from %s to INSTALLED",
292 prevParking);
293 type = IntentEvent.Type.INSTALLED;
294 break;
295 case FAILED:
296 prevParking = states.replace(id, FAILED);
297 type = IntentEvent.Type.FAILED;
298 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800299 case WITHDRAW_REQ:
300 prevParking = states.replace(id, WITHDRAW_REQ);
301 verify(PRE_WITHDRAWN.contains(prevParking),
302 "Illegal state transition attempted from %s to WITHDRAW_REQ",
303 prevParking);
304 type = IntentEvent.Type.WITHDRAW_REQ;
305 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800306 case WITHDRAWN:
307 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800308 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800309 "Illegal state transition attempted from %s to WITHDRAWN",
310 prevParking);
311 type = IntentEvent.Type.WITHDRAWN;
312 break;
313 default:
314 transientStateChangeOnly = true;
315 prevParking = null;
316 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800317 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800318 if (!transientStateChangeOnly) {
319 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
320 }
321 // Update instance local state, which includes non-parking state transition
322 final IntentState prevTransient = transientStates.put(id, state);
323 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800324
alshabiba9819bf2014-11-30 18:15:52 -0800325 if (type != null) {
326 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800327 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800328 } finally {
329 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800330 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800331 }
332
333 @Override
334 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800335 Context timer = startTimer(setInstallableIntentsTimer);
336 try {
337 installable.put(intentId, result);
338 } finally {
339 stopTimer(timer);
340 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800341 }
342
343 @Override
344 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800345 Context timer = startTimer(getInstallableIntentsTimer);
346 try {
347 return installable.get(intentId);
348 } finally {
349 stopTimer(timer);
350 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800351 }
352
353 @Override
354 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800355 Context timer = startTimer(removeInstalledIntentsTimer);
356 try {
357 installable.remove(intentId);
358 } finally {
359 stopTimer(timer);
360 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800361 }
362
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800363 // TODO slice out methods after merging Ali's patch
364 // CHECKSTYLE IGNORE MethodLength FOR NEXT 1 LINES
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800365 @Override
366 public List<Operation> batchWrite(BatchWrite batch) {
367 // Hazelcast version will never fail for conditional failure now.
368 List<Operation> failed = new ArrayList<>();
369
370 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800371 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800372
373 for (Operation op : batch.operations()) {
374 switch (op.type()) {
375 case CREATE_INTENT:
376 checkArgument(op.args().size() == 1,
377 "CREATE_INTENT takes 1 argument. %s", op);
378 Intent intent = op.arg(0);
379 futures.add(Pair.of(op,
380 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800381 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800382 break;
383
384 case REMOVE_INTENT:
385 checkArgument(op.args().size() == 1,
386 "REMOVE_INTENT takes 1 argument. %s", op);
387 IntentId intentId = (IntentId) op.arg(0);
388 futures.add(Pair.of(op,
389 ImmutableList.of(intents.removeAsync(intentId),
390 states.removeAsync(intentId),
391 installable.removeAsync(intentId))));
392 break;
393
394 case SET_STATE:
395 checkArgument(op.args().size() == 2,
396 "SET_STATE takes 2 arguments. %s", op);
397 intent = op.arg(0);
398 IntentState newState = op.arg(1);
399 futures.add(Pair.of(op,
400 ImmutableList.of(states.putAsync(intent.id(), newState))));
401 break;
402
403 case SET_INSTALLABLE:
404 checkArgument(op.args().size() == 2,
405 "SET_INSTALLABLE takes 2 arguments. %s", op);
406 intentId = op.arg(0);
407 List<Intent> installableIntents = op.arg(1);
408 futures.add(Pair.of(op,
409 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
410 break;
411
412 case REMOVE_INSTALLED:
413 checkArgument(op.args().size() == 1,
414 "REMOVE_INSTALLED takes 1 argument. %s", op);
415 intentId = op.arg(0);
416 futures.add(Pair.of(op,
417 ImmutableList.of(installable.removeAsync(intentId))));
418 break;
419
420 default:
421 log.warn("Unknown Operation encountered: {}", op);
422 failed.add(op);
423 break;
424 }
425 }
426
427 // verify result
428 for (Pair<Operation, List<Future<?>>> future : futures) {
429 final Operation op = future.getLeft();
430 final List<Future<?>> subops = future.getRight();
431
432 switch (op.type()) {
433
434 case CREATE_INTENT:
435 {
436 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800437 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800438
439 try {
440 Intent prevIntent = (Intent) subops.get(0).get();
441 IntentState prevIntentState = (IntentState) subops.get(1).get();
442
443 if (prevIntent != null || prevIntentState != null) {
444 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
445 prevIntent, prevIntentState,
446 intent, newIntentState);
447 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800448 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800449 } catch (InterruptedException e) {
450 log.error("Batch write was interrupted while processing {}", op, e);
451 failed.add(op);
452 Thread.currentThread().interrupt();
453 } catch (ExecutionException e) {
454 log.error("Batch write failed processing {}", op, e);
455 failed.add(op);
456 }
457 break;
458 }
459
460 case REMOVE_INTENT:
461 {
462 IntentId intentId = op.arg(0);
463
464 try {
465 Intent prevIntent = (Intent) subops.get(0).get();
466 IntentState prevIntentState = (IntentState) subops.get(1).get();
467 @SuppressWarnings("unchecked")
468 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
469
470 if (prevIntent == null) {
471 log.warn("Intent {} was already removed.", intentId);
472 }
473 if (prevIntentState == null) {
474 log.warn("Intent {} state was already removed", intentId);
475 }
476 if (prevInstallable == null) {
477 log.info("Intent {} installable was already removed", intentId);
478 }
479 } catch (InterruptedException e) {
480 log.error("Batch write was interrupted while processing {}", op, e);
481 failed.add(op);
482 Thread.currentThread().interrupt();
483 } catch (ExecutionException e) {
484 log.error("Batch write failed processing {}", op, e);
485 failed.add(op);
486 }
487 break;
488 }
489
490 case SET_STATE:
491 {
492 Intent intent = op.arg(0);
493 IntentId intentId = intent.id();
494 IntentState newState = op.arg(1);
495
496 try {
497 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800498
499 if (PARKING.contains(newState)) {
500 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800501 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800502 }
alshabiba9819bf2014-11-30 18:15:52 -0800503
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800504 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
505 // TODO sanity check and log?
506 } catch (InterruptedException e) {
507 log.error("Batch write was interrupted while processing {}", op, e);
508 failed.add(op);
509 Thread.currentThread().interrupt();
510 } catch (ExecutionException e) {
511 log.error("Batch write failed processing {}", op, e);
512 failed.add(op);
513 }
514 break;
515 }
516
517 case SET_INSTALLABLE:
518 {
519 IntentId intentId = op.arg(0);
520 List<Intent> installableIntents = op.arg(1);
521
522 try {
523 @SuppressWarnings("unchecked")
524 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
525
526 if (prevInstallable != null) {
527 log.warn("Overwriting Intent {} installable {} -> {}",
528 intentId, prevInstallable, installableIntents);
529 }
530 } catch (InterruptedException e) {
531 log.error("Batch write was interrupted while processing {}", op, e);
532 failed.add(op);
533 Thread.currentThread().interrupt();
534 } catch (ExecutionException e) {
535 log.error("Batch write failed processing {}", op, e);
536 failed.add(op);
537 }
538 break;
539 }
540
541 case REMOVE_INSTALLED:
542 {
543 IntentId intentId = op.arg(0);
544
545 try {
546 @SuppressWarnings("unchecked")
547 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
548
549 if (prevInstallable == null) {
550 log.warn("Intent {} installable was already removed", intentId);
551 }
552 } catch (InterruptedException e) {
553 log.error("Batch write was interrupted while processing {}", op, e);
554 failed.add(op);
555 Thread.currentThread().interrupt();
556 } catch (ExecutionException e) {
557 log.error("Batch write failed processing {}", op, e);
558 failed.add(op);
559 }
560 break;
561 }
562
563 default:
564 log.warn("Unknown Operation encountered: {}", op);
565 if (!failed.contains(op)) {
566 failed.add(op);
567 }
568 break;
569 }
570 }
alshabiba9819bf2014-11-30 18:15:52 -0800571
572 notifyDelegate(events);
573
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800574 return failed;
575 }
576
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800577 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
578
579 @Override
580 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
581 final Member myself = theInstance.getCluster().getLocalMember();
582 if (!myself.equals(event.getMember())) {
583 // When Intent state was modified by remote node,
584 // clear local transient state.
585 final IntentId intentId = event.getKey();
586 IntentState oldState = transientStates.remove(intentId);
587 if (oldState != null) {
588 log.debug("{} state updated remotely, removing transient state {}",
589 intentId, oldState);
590 }
alshabiba9819bf2014-11-30 18:15:52 -0800591
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800592 if (event.getValue() != null) {
593 // notify if this is not entry removed event
594 notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
595 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800596 }
597 }
598 }
599}