blob: 926686a88ca9e03d15c16c4b37c3e9a46add9b24 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.onos.store.intent.impl;
17
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
31
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080032import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080033import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080039import org.onlab.metrics.MetricsService;
40import org.onlab.onos.core.MetricsHelper;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080041import org.onlab.onos.net.intent.Intent;
42import org.onlab.onos.net.intent.IntentEvent;
43import org.onlab.onos.net.intent.IntentId;
44import org.onlab.onos.net.intent.IntentState;
45import org.onlab.onos.net.intent.IntentStore;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080046import org.onlab.onos.net.intent.IntentStore.BatchWrite.Operation;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080047import org.onlab.onos.net.intent.IntentStoreDelegate;
48import org.onlab.onos.store.hz.AbstractHazelcastStore;
49import org.onlab.onos.store.hz.SMap;
50import org.onlab.onos.store.serializers.KryoNamespaces;
51import org.onlab.onos.store.serializers.KryoSerializer;
52import org.onlab.util.KryoNamespace;
53import org.slf4j.Logger;
54
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080055import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080056import java.util.EnumSet;
57import java.util.List;
58import java.util.Map;
59import java.util.Set;
60import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080061import java.util.concurrent.ExecutionException;
62import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080063
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080064import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080065import static com.google.common.base.Preconditions.checkState;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080066import static org.onlab.onos.net.intent.IntentState.*;
67import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080068import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080069
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080070@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080071@Service
72public class HazelcastIntentStore
73 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080074 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080075
76 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080077 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080078
79 /** Valid parking state, which can transition to WITHDRAWN. */
80 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
81
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080082 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080083
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080084 private final Logger log = getLogger(getClass());
85
86 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080087 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080088 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080089 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080090 private SMap<IntentId, IntentState> states;
91
92 // Map to store instance local intermediate state transition
93 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
94
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080095 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080096 private SMap<IntentId, List<Intent>> installable;
97
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080098 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99 protected MetricsService metricsService;
100
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800101 // TODO make this configurable
102 private boolean onlyLogTransitionError = true;
103
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800104 private Timer createIntentTimer;
105 private Timer removeIntentTimer;
106 private Timer setInstallableIntentsTimer;
107 private Timer getInstallableIntentsTimer;
108 private Timer removeInstalledIntentsTimer;
109 private Timer setStateTimer;
110 private Timer getIntentCountTimer;
111 private Timer getIntentsTimer;
112 private Timer getIntentTimer;
113 private Timer getIntentStateTimer;
114
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800115 private String listenerId;
116
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800117 private Timer createResponseTimer(String methodName) {
118 return createTimer("IntentStore", methodName, "responseTime");
119 }
120
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800121 @Override
122 @Activate
123 public void activate() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800124 createIntentTimer = createResponseTimer("createIntent");
125 removeIntentTimer = createResponseTimer("removeIntent");
126 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
127 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
128 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
129 setStateTimer = createResponseTimer("setState");
130 getIntentCountTimer = createResponseTimer("getIntentCount");
131 getIntentsTimer = createResponseTimer("getIntents");
132 getIntentTimer = createResponseTimer("getIntent");
133 getIntentStateTimer = createResponseTimer("getIntentState");
134
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800135 // FIXME: We need a way to add serializer for intents which has been plugged-in.
136 // As a short term workaround, relax Kryo config to
137 // registrationRequired=false
138 super.activate();
139 super.serializer = new KryoSerializer() {
140
141 @Override
142 protected void setupKryoPool() {
143 serializerPool = KryoNamespace.newBuilder()
144 .setRegistrationRequired(false)
145 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800146 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
147 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800148 }
149
150 };
151
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800152 final Config config = theInstance.getConfig();
153
154 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
155 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
156
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800157 // TODO: enable near cache, allow read from backup for this IMap
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800158 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800159 intents = new SMap<>(rawIntents , super.serializer);
160
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800161 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
162 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
163
164 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800165 states = new SMap<>(rawStates , super.serializer);
166 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800167 listenerId = states.addEntryListener(listener , true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800168
169 transientStates.clear();
170
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800171 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
172 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
173
174 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800175 installable = new SMap<>(rawInstallables , super.serializer);
176
177 log.info("Started");
178 }
179
180 @Deactivate
181 public void deactivate() {
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800182 states.removeEntryListener(listenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800183 log.info("Stopped");
184 }
185
186 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800187 public MetricsService metricsService() {
188 return metricsService;
189 }
190
191 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800192 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800193 Context timer = startTimer(createIntentTimer);
194 try {
195 Intent existing = intents.putIfAbsent(intent.id(), intent);
196 if (existing != null) {
197 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800198 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800199 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800200 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800201 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800202 }
203 } finally {
204 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800205 }
206 }
207
208 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800209 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800210 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800211 checkState(getIntentState(intentId) == WITHDRAWN,
212 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800213 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800214 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800215 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800216 states.remove(intentId);
217 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800218 } finally {
219 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800220 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800221 }
222
223 @Override
224 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800225 Context timer = startTimer(getIntentCountTimer);
226 try {
227 return intents.size();
228 } finally {
229 stopTimer(timer);
230 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800231 }
232
233 @Override
234 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800235 Context timer = startTimer(getIntentsTimer);
236 try {
237 return ImmutableSet.copyOf(intents.values());
238 } finally {
239 stopTimer(timer);
240 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800241 }
242
243 @Override
244 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800245 Context timer = startTimer(getIntentTimer);
246 try {
247 return intents.get(intentId);
248 } finally {
249 stopTimer(timer);
250 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800251 }
252
253 @Override
254 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800255 Context timer = startTimer(getIntentStateTimer);
256 try {
257 final IntentState localState = transientStates.get(id);
258 if (localState != null) {
259 return localState;
260 }
261 return states.get(id);
262 } finally {
263 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800264 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800265 }
266
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800267 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
268 if (onlyLogTransitionError) {
269 if (!expression) {
270 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
271 }
272 } else {
273 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
274 }
275 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800276
277 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800278 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800279 Context timer = startTimer(setStateTimer);
280 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800281 final IntentId id = intent.id();
282 IntentEvent.Type type = null;
283 final IntentState prevParking;
284 boolean transientStateChangeOnly = false;
285
286 // parking state transition
287 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800288 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800289 prevParking = states.get(id);
290 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800291 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
292 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800293 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800294 verify(PRE_INSTALLED.contains(prevParking),
295 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800296 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800297 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
298 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800299 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800300 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800301 break;
302 case INSTALLED:
303 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800304 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800305 "Illegal state transition attempted from %s to INSTALLED",
306 prevParking);
307 type = IntentEvent.Type.INSTALLED;
308 break;
309 case FAILED:
310 prevParking = states.replace(id, FAILED);
311 type = IntentEvent.Type.FAILED;
312 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800313 case WITHDRAW_REQ:
314 prevParking = states.replace(id, WITHDRAW_REQ);
315 verify(PRE_WITHDRAWN.contains(prevParking),
316 "Illegal state transition attempted from %s to WITHDRAW_REQ",
317 prevParking);
318 type = IntentEvent.Type.WITHDRAW_REQ;
319 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800320 case WITHDRAWN:
321 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800322 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800323 "Illegal state transition attempted from %s to WITHDRAWN",
324 prevParking);
325 type = IntentEvent.Type.WITHDRAWN;
326 break;
327 default:
328 transientStateChangeOnly = true;
329 prevParking = null;
330 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800331 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800332 if (!transientStateChangeOnly) {
333 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
334 }
335 // Update instance local state, which includes non-parking state transition
336 final IntentState prevTransient = transientStates.put(id, state);
337 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800338
alshabiba9819bf2014-11-30 18:15:52 -0800339 if (type != null) {
340 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800341 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800342 } finally {
343 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800344 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800345 }
346
347 @Override
348 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800349 Context timer = startTimer(setInstallableIntentsTimer);
350 try {
351 installable.put(intentId, result);
352 } finally {
353 stopTimer(timer);
354 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800355 }
356
357 @Override
358 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800359 Context timer = startTimer(getInstallableIntentsTimer);
360 try {
361 return installable.get(intentId);
362 } finally {
363 stopTimer(timer);
364 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800365 }
366
367 @Override
368 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800369 Context timer = startTimer(removeInstalledIntentsTimer);
370 try {
371 installable.remove(intentId);
372 } finally {
373 stopTimer(timer);
374 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800375 }
376
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800377 // TODO slice out methods after merging Ali's patch
378 // CHECKSTYLE IGNORE MethodLength FOR NEXT 1 LINES
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800379 @Override
380 public List<Operation> batchWrite(BatchWrite batch) {
381 // Hazelcast version will never fail for conditional failure now.
382 List<Operation> failed = new ArrayList<>();
383
384 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800385 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800386
387 for (Operation op : batch.operations()) {
388 switch (op.type()) {
389 case CREATE_INTENT:
390 checkArgument(op.args().size() == 1,
391 "CREATE_INTENT takes 1 argument. %s", op);
392 Intent intent = op.arg(0);
393 futures.add(Pair.of(op,
394 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800395 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800396 break;
397
398 case REMOVE_INTENT:
399 checkArgument(op.args().size() == 1,
400 "REMOVE_INTENT takes 1 argument. %s", op);
401 IntentId intentId = (IntentId) op.arg(0);
402 futures.add(Pair.of(op,
403 ImmutableList.of(intents.removeAsync(intentId),
404 states.removeAsync(intentId),
405 installable.removeAsync(intentId))));
406 break;
407
408 case SET_STATE:
409 checkArgument(op.args().size() == 2,
410 "SET_STATE takes 2 arguments. %s", op);
411 intent = op.arg(0);
412 IntentState newState = op.arg(1);
413 futures.add(Pair.of(op,
414 ImmutableList.of(states.putAsync(intent.id(), newState))));
415 break;
416
417 case SET_INSTALLABLE:
418 checkArgument(op.args().size() == 2,
419 "SET_INSTALLABLE takes 2 arguments. %s", op);
420 intentId = op.arg(0);
421 List<Intent> installableIntents = op.arg(1);
422 futures.add(Pair.of(op,
423 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
424 break;
425
426 case REMOVE_INSTALLED:
427 checkArgument(op.args().size() == 1,
428 "REMOVE_INSTALLED takes 1 argument. %s", op);
429 intentId = op.arg(0);
430 futures.add(Pair.of(op,
431 ImmutableList.of(installable.removeAsync(intentId))));
432 break;
433
434 default:
435 log.warn("Unknown Operation encountered: {}", op);
436 failed.add(op);
437 break;
438 }
439 }
440
441 // verify result
442 for (Pair<Operation, List<Future<?>>> future : futures) {
443 final Operation op = future.getLeft();
444 final List<Future<?>> subops = future.getRight();
445
446 switch (op.type()) {
447
448 case CREATE_INTENT:
449 {
450 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800451 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800452
453 try {
454 Intent prevIntent = (Intent) subops.get(0).get();
455 IntentState prevIntentState = (IntentState) subops.get(1).get();
456
457 if (prevIntent != null || prevIntentState != null) {
458 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
459 prevIntent, prevIntentState,
460 intent, newIntentState);
461 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800462 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800463 } catch (InterruptedException e) {
464 log.error("Batch write was interrupted while processing {}", op, e);
465 failed.add(op);
466 Thread.currentThread().interrupt();
467 } catch (ExecutionException e) {
468 log.error("Batch write failed processing {}", op, e);
469 failed.add(op);
470 }
471 break;
472 }
473
474 case REMOVE_INTENT:
475 {
476 IntentId intentId = op.arg(0);
477
478 try {
479 Intent prevIntent = (Intent) subops.get(0).get();
480 IntentState prevIntentState = (IntentState) subops.get(1).get();
481 @SuppressWarnings("unchecked")
482 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
483
484 if (prevIntent == null) {
485 log.warn("Intent {} was already removed.", intentId);
486 }
487 if (prevIntentState == null) {
488 log.warn("Intent {} state was already removed", intentId);
489 }
490 if (prevInstallable == null) {
491 log.info("Intent {} installable was already removed", intentId);
492 }
493 } catch (InterruptedException e) {
494 log.error("Batch write was interrupted while processing {}", op, e);
495 failed.add(op);
496 Thread.currentThread().interrupt();
497 } catch (ExecutionException e) {
498 log.error("Batch write failed processing {}", op, e);
499 failed.add(op);
500 }
501 break;
502 }
503
504 case SET_STATE:
505 {
506 Intent intent = op.arg(0);
507 IntentId intentId = intent.id();
508 IntentState newState = op.arg(1);
509
510 try {
511 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800512
513 if (PARKING.contains(newState)) {
514 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800515 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800516 }
alshabiba9819bf2014-11-30 18:15:52 -0800517
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800518 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
519 // TODO sanity check and log?
520 } catch (InterruptedException e) {
521 log.error("Batch write was interrupted while processing {}", op, e);
522 failed.add(op);
523 Thread.currentThread().interrupt();
524 } catch (ExecutionException e) {
525 log.error("Batch write failed processing {}", op, e);
526 failed.add(op);
527 }
528 break;
529 }
530
531 case SET_INSTALLABLE:
532 {
533 IntentId intentId = op.arg(0);
534 List<Intent> installableIntents = op.arg(1);
535
536 try {
537 @SuppressWarnings("unchecked")
538 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
539
540 if (prevInstallable != null) {
541 log.warn("Overwriting Intent {} installable {} -> {}",
542 intentId, prevInstallable, installableIntents);
543 }
544 } catch (InterruptedException e) {
545 log.error("Batch write was interrupted while processing {}", op, e);
546 failed.add(op);
547 Thread.currentThread().interrupt();
548 } catch (ExecutionException e) {
549 log.error("Batch write failed processing {}", op, e);
550 failed.add(op);
551 }
552 break;
553 }
554
555 case REMOVE_INSTALLED:
556 {
557 IntentId intentId = op.arg(0);
558
559 try {
560 @SuppressWarnings("unchecked")
561 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
562
563 if (prevInstallable == null) {
564 log.warn("Intent {} installable was already removed", intentId);
565 }
566 } catch (InterruptedException e) {
567 log.error("Batch write was interrupted while processing {}", op, e);
568 failed.add(op);
569 Thread.currentThread().interrupt();
570 } catch (ExecutionException e) {
571 log.error("Batch write failed processing {}", op, e);
572 failed.add(op);
573 }
574 break;
575 }
576
577 default:
578 log.warn("Unknown Operation encountered: {}", op);
579 if (!failed.contains(op)) {
580 failed.add(op);
581 }
582 break;
583 }
584 }
alshabiba9819bf2014-11-30 18:15:52 -0800585
586 notifyDelegate(events);
587
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800588 return failed;
589 }
590
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800591 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
592
593 @Override
594 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
595 final Member myself = theInstance.getCluster().getLocalMember();
596 if (!myself.equals(event.getMember())) {
597 // When Intent state was modified by remote node,
598 // clear local transient state.
599 final IntentId intentId = event.getKey();
600 IntentState oldState = transientStates.remove(intentId);
601 if (oldState != null) {
602 log.debug("{} state updated remotely, removing transient state {}",
603 intentId, oldState);
604 }
alshabiba9819bf2014-11-30 18:15:52 -0800605
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800606 if (event.getValue() != null) {
607 // notify if this is not entry removed event
608 notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
609 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800610 }
611 }
612 }
613}