blob: fc1028496ea12aed03172dd4953c18d092e75c7b [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
31
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080032import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080033import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080039import org.onlab.metrics.MetricsService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentId;
45import org.onosproject.net.intent.IntentState;
46import org.onosproject.net.intent.IntentStore;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080047import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
49import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.onlab.util.KryoNamespace;
54import org.slf4j.Logger;
55
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080056import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080057import java.util.EnumSet;
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import java.util.concurrent.ExecutionException;
63import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080065import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080066import static com.google.common.base.Preconditions.checkState;
Brian O'Connorabafb502014-12-02 22:26:20 -080067import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080068import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080069import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080070
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080071@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080072@Service
73public class HazelcastIntentStore
74 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080075 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080078 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080079
80 /** Valid parking state, which can transition to WITHDRAWN. */
81 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
82
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080083 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080084
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080085 private final Logger log = getLogger(getClass());
86
87 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080088 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080089 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080090 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080091 private SMap<IntentId, IntentState> states;
92
93 // Map to store instance local intermediate state transition
94 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
95
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080096 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080097 private SMap<IntentId, List<Intent>> installable;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected MetricsService metricsService;
101
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800102 // TODO make this configurable
103 private boolean onlyLogTransitionError = true;
104
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800105 private Timer createIntentTimer;
106 private Timer removeIntentTimer;
107 private Timer setInstallableIntentsTimer;
108 private Timer getInstallableIntentsTimer;
109 private Timer removeInstalledIntentsTimer;
110 private Timer setStateTimer;
111 private Timer getIntentCountTimer;
112 private Timer getIntentsTimer;
113 private Timer getIntentTimer;
114 private Timer getIntentStateTimer;
115
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800116 private String listenerId;
117
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800118 private Timer createResponseTimer(String methodName) {
119 return createTimer("IntentStore", methodName, "responseTime");
120 }
121
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800122 @Override
123 @Activate
124 public void activate() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800125 createIntentTimer = createResponseTimer("createIntent");
126 removeIntentTimer = createResponseTimer("removeIntent");
127 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
128 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
129 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
130 setStateTimer = createResponseTimer("setState");
131 getIntentCountTimer = createResponseTimer("getIntentCount");
132 getIntentsTimer = createResponseTimer("getIntents");
133 getIntentTimer = createResponseTimer("getIntent");
134 getIntentStateTimer = createResponseTimer("getIntentState");
135
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800136 // FIXME: We need a way to add serializer for intents which has been plugged-in.
137 // As a short term workaround, relax Kryo config to
138 // registrationRequired=false
139 super.activate();
140 super.serializer = new KryoSerializer() {
141
142 @Override
143 protected void setupKryoPool() {
144 serializerPool = KryoNamespace.newBuilder()
145 .setRegistrationRequired(false)
146 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800147 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
148 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800149 }
150
151 };
152
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800153 final Config config = theInstance.getConfig();
154
155 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
156 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
157
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800158 // TODO: enable near cache, allow read from backup for this IMap
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800159 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800160 intents = new SMap<>(rawIntents , super.serializer);
161
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800162 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
163 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
164
165 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800166 states = new SMap<>(rawStates , super.serializer);
167 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800168 listenerId = states.addEntryListener(listener , true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800169
170 transientStates.clear();
171
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800172 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
173 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
174
175 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800176 installable = new SMap<>(rawInstallables , super.serializer);
177
178 log.info("Started");
179 }
180
181 @Deactivate
182 public void deactivate() {
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800183 states.removeEntryListener(listenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800184 log.info("Stopped");
185 }
186
187 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800188 public MetricsService metricsService() {
189 return metricsService;
190 }
191
192 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800193 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800194 Context timer = startTimer(createIntentTimer);
195 try {
196 Intent existing = intents.putIfAbsent(intent.id(), intent);
197 if (existing != null) {
198 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800199 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800200 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800201 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800202 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800203 }
204 } finally {
205 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800206 }
207 }
208
209 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800210 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800211 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800212 checkState(getIntentState(intentId) == WITHDRAWN,
213 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800214 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800215 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800216 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800217 states.remove(intentId);
218 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800219 } finally {
220 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800221 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800222 }
223
224 @Override
225 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800226 Context timer = startTimer(getIntentCountTimer);
227 try {
228 return intents.size();
229 } finally {
230 stopTimer(timer);
231 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800232 }
233
234 @Override
235 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800236 Context timer = startTimer(getIntentsTimer);
237 try {
238 return ImmutableSet.copyOf(intents.values());
239 } finally {
240 stopTimer(timer);
241 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800242 }
243
244 @Override
245 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800246 Context timer = startTimer(getIntentTimer);
247 try {
248 return intents.get(intentId);
249 } finally {
250 stopTimer(timer);
251 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800252 }
253
254 @Override
255 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800256 Context timer = startTimer(getIntentStateTimer);
257 try {
258 final IntentState localState = transientStates.get(id);
259 if (localState != null) {
260 return localState;
261 }
262 return states.get(id);
263 } finally {
264 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800265 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800266 }
267
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800268 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
269 if (onlyLogTransitionError) {
270 if (!expression) {
271 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
272 }
273 } else {
274 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
275 }
276 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800277
278 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800279 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800280 Context timer = startTimer(setStateTimer);
281 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800282 final IntentId id = intent.id();
283 IntentEvent.Type type = null;
284 final IntentState prevParking;
285 boolean transientStateChangeOnly = false;
286
287 // parking state transition
288 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800289 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800290 prevParking = states.get(id);
291 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800292 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
293 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800294 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800295 verify(PRE_INSTALLED.contains(prevParking),
296 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800297 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800298 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
299 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800300 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800301 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800302 break;
303 case INSTALLED:
304 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800305 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800306 "Illegal state transition attempted from %s to INSTALLED",
307 prevParking);
308 type = IntentEvent.Type.INSTALLED;
309 break;
310 case FAILED:
311 prevParking = states.replace(id, FAILED);
312 type = IntentEvent.Type.FAILED;
313 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800314 case WITHDRAW_REQ:
315 prevParking = states.replace(id, WITHDRAW_REQ);
316 verify(PRE_WITHDRAWN.contains(prevParking),
317 "Illegal state transition attempted from %s to WITHDRAW_REQ",
318 prevParking);
319 type = IntentEvent.Type.WITHDRAW_REQ;
320 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800321 case WITHDRAWN:
322 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800323 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800324 "Illegal state transition attempted from %s to WITHDRAWN",
325 prevParking);
326 type = IntentEvent.Type.WITHDRAWN;
327 break;
328 default:
329 transientStateChangeOnly = true;
330 prevParking = null;
331 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800332 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800333 if (!transientStateChangeOnly) {
334 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
335 }
336 // Update instance local state, which includes non-parking state transition
337 final IntentState prevTransient = transientStates.put(id, state);
338 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800339
alshabiba9819bf2014-11-30 18:15:52 -0800340 if (type != null) {
341 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800342 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800343 } finally {
344 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800345 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800346 }
347
348 @Override
349 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800350 Context timer = startTimer(setInstallableIntentsTimer);
351 try {
352 installable.put(intentId, result);
353 } finally {
354 stopTimer(timer);
355 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800356 }
357
358 @Override
359 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800360 Context timer = startTimer(getInstallableIntentsTimer);
361 try {
362 return installable.get(intentId);
363 } finally {
364 stopTimer(timer);
365 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800366 }
367
368 @Override
369 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800370 Context timer = startTimer(removeInstalledIntentsTimer);
371 try {
372 installable.remove(intentId);
373 } finally {
374 stopTimer(timer);
375 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800376 }
377
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800378 @Override
379 public List<Operation> batchWrite(BatchWrite batch) {
380 // Hazelcast version will never fail for conditional failure now.
381 List<Operation> failed = new ArrayList<>();
382
383 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800384 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800385
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800386 batchWriteAsync(batch, failed, futures);
387
388 // verify result
389 verifyAsyncWrites(futures, failed, events);
390
391 notifyDelegate(events);
392
393 return failed;
394 }
395
396 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
397 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800398 for (Operation op : batch.operations()) {
399 switch (op.type()) {
400 case CREATE_INTENT:
401 checkArgument(op.args().size() == 1,
402 "CREATE_INTENT takes 1 argument. %s", op);
403 Intent intent = op.arg(0);
404 futures.add(Pair.of(op,
405 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800406 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800407 break;
408
409 case REMOVE_INTENT:
410 checkArgument(op.args().size() == 1,
411 "REMOVE_INTENT takes 1 argument. %s", op);
412 IntentId intentId = (IntentId) op.arg(0);
413 futures.add(Pair.of(op,
414 ImmutableList.of(intents.removeAsync(intentId),
415 states.removeAsync(intentId),
416 installable.removeAsync(intentId))));
417 break;
418
419 case SET_STATE:
420 checkArgument(op.args().size() == 2,
421 "SET_STATE takes 2 arguments. %s", op);
422 intent = op.arg(0);
423 IntentState newState = op.arg(1);
424 futures.add(Pair.of(op,
425 ImmutableList.of(states.putAsync(intent.id(), newState))));
426 break;
427
428 case SET_INSTALLABLE:
429 checkArgument(op.args().size() == 2,
430 "SET_INSTALLABLE takes 2 arguments. %s", op);
431 intentId = op.arg(0);
432 List<Intent> installableIntents = op.arg(1);
433 futures.add(Pair.of(op,
434 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
435 break;
436
437 case REMOVE_INSTALLED:
438 checkArgument(op.args().size() == 1,
439 "REMOVE_INSTALLED takes 1 argument. %s", op);
440 intentId = op.arg(0);
441 futures.add(Pair.of(op,
442 ImmutableList.of(installable.removeAsync(intentId))));
443 break;
444
445 default:
446 log.warn("Unknown Operation encountered: {}", op);
447 failed.add(op);
448 break;
449 }
450 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800451 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800452
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800453 /**
454 * Checks the async write result Futures and prepare Events to post.
455 *
456 * @param futures async write Futures
457 * @param failed list to output failed batch write operations
458 * @param events list to output events to post as result of writes
459 */
460 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
461 List<Operation> failed,
462 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800463 for (Pair<Operation, List<Future<?>>> future : futures) {
464 final Operation op = future.getLeft();
465 final List<Future<?>> subops = future.getRight();
466
467 switch (op.type()) {
468
469 case CREATE_INTENT:
470 {
471 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800472 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800473
474 try {
475 Intent prevIntent = (Intent) subops.get(0).get();
476 IntentState prevIntentState = (IntentState) subops.get(1).get();
477
478 if (prevIntent != null || prevIntentState != null) {
479 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
480 prevIntent, prevIntentState,
481 intent, newIntentState);
482 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800483 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800484 } catch (InterruptedException e) {
485 log.error("Batch write was interrupted while processing {}", op, e);
486 failed.add(op);
487 Thread.currentThread().interrupt();
488 } catch (ExecutionException e) {
489 log.error("Batch write failed processing {}", op, e);
490 failed.add(op);
491 }
492 break;
493 }
494
495 case REMOVE_INTENT:
496 {
497 IntentId intentId = op.arg(0);
498
499 try {
500 Intent prevIntent = (Intent) subops.get(0).get();
501 IntentState prevIntentState = (IntentState) subops.get(1).get();
502 @SuppressWarnings("unchecked")
503 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
504
505 if (prevIntent == null) {
506 log.warn("Intent {} was already removed.", intentId);
507 }
508 if (prevIntentState == null) {
509 log.warn("Intent {} state was already removed", intentId);
510 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800511 if (prevInstallable != null) {
512 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800513 }
514 } catch (InterruptedException e) {
515 log.error("Batch write was interrupted while processing {}", op, e);
516 failed.add(op);
517 Thread.currentThread().interrupt();
518 } catch (ExecutionException e) {
519 log.error("Batch write failed processing {}", op, e);
520 failed.add(op);
521 }
522 break;
523 }
524
525 case SET_STATE:
526 {
527 Intent intent = op.arg(0);
528 IntentId intentId = intent.id();
529 IntentState newState = op.arg(1);
530
531 try {
532 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800533
534 if (PARKING.contains(newState)) {
535 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800536 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800537 }
alshabiba9819bf2014-11-30 18:15:52 -0800538
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800539 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
540 // TODO sanity check and log?
541 } catch (InterruptedException e) {
542 log.error("Batch write was interrupted while processing {}", op, e);
543 failed.add(op);
544 Thread.currentThread().interrupt();
545 } catch (ExecutionException e) {
546 log.error("Batch write failed processing {}", op, e);
547 failed.add(op);
548 }
549 break;
550 }
551
552 case SET_INSTALLABLE:
553 {
554 IntentId intentId = op.arg(0);
555 List<Intent> installableIntents = op.arg(1);
556
557 try {
558 @SuppressWarnings("unchecked")
559 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
560
561 if (prevInstallable != null) {
562 log.warn("Overwriting Intent {} installable {} -> {}",
563 intentId, prevInstallable, installableIntents);
564 }
565 } catch (InterruptedException e) {
566 log.error("Batch write was interrupted while processing {}", op, e);
567 failed.add(op);
568 Thread.currentThread().interrupt();
569 } catch (ExecutionException e) {
570 log.error("Batch write failed processing {}", op, e);
571 failed.add(op);
572 }
573 break;
574 }
575
576 case REMOVE_INSTALLED:
577 {
578 IntentId intentId = op.arg(0);
579
580 try {
581 @SuppressWarnings("unchecked")
582 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
583
584 if (prevInstallable == null) {
585 log.warn("Intent {} installable was already removed", intentId);
586 }
587 } catch (InterruptedException e) {
588 log.error("Batch write was interrupted while processing {}", op, e);
589 failed.add(op);
590 Thread.currentThread().interrupt();
591 } catch (ExecutionException e) {
592 log.error("Batch write failed processing {}", op, e);
593 failed.add(op);
594 }
595 break;
596 }
597
598 default:
599 log.warn("Unknown Operation encountered: {}", op);
600 if (!failed.contains(op)) {
601 failed.add(op);
602 }
603 break;
604 }
605 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800606 }
607
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800608 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
609
610 @Override
611 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
612 final Member myself = theInstance.getCluster().getLocalMember();
613 if (!myself.equals(event.getMember())) {
614 // When Intent state was modified by remote node,
615 // clear local transient state.
616 final IntentId intentId = event.getKey();
617 IntentState oldState = transientStates.remove(intentId);
618 if (oldState != null) {
619 log.debug("{} state updated remotely, removing transient state {}",
620 intentId, oldState);
621 }
alshabiba9819bf2014-11-30 18:15:52 -0800622
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800623 if (event.getValue() != null) {
624 // notify if this is not entry removed event
625 notifyDelegate(IntentEvent.getEvent(event.getValue(), getIntent(intentId)));
626 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800627 }
628 }
629 }
630}