blob: c353b9f5002942714932211a6b99faad07b717b1 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
31
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080032import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080033import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080039import org.onlab.metrics.MetricsService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentId;
45import org.onosproject.net.intent.IntentState;
46import org.onosproject.net.intent.IntentStore;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080047import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
49import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.onlab.util.KryoNamespace;
54import org.slf4j.Logger;
55
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080056import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080057import java.util.EnumSet;
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import java.util.concurrent.ExecutionException;
63import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080065import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080066import static com.google.common.base.Preconditions.checkState;
Brian O'Connorabafb502014-12-02 22:26:20 -080067import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080068import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080069import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080070
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080071@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080072@Service
73public class HazelcastIntentStore
74 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080075 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080078 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080079
80 /** Valid parking state, which can transition to WITHDRAWN. */
81 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
82
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080083 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080084
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080085 private final Logger log = getLogger(getClass());
86
87 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080088 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080089 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080090 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080091 private SMap<IntentId, IntentState> states;
92
93 // Map to store instance local intermediate state transition
94 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
95
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080096 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080097 private SMap<IntentId, List<Intent>> installable;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected MetricsService metricsService;
101
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800102 // TODO make this configurable
103 private boolean onlyLogTransitionError = true;
104
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800105 private Timer createIntentTimer;
106 private Timer removeIntentTimer;
107 private Timer setInstallableIntentsTimer;
108 private Timer getInstallableIntentsTimer;
109 private Timer removeInstalledIntentsTimer;
110 private Timer setStateTimer;
111 private Timer getIntentCountTimer;
112 private Timer getIntentsTimer;
113 private Timer getIntentTimer;
114 private Timer getIntentStateTimer;
115
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800116 // manual near cache of Intent
117 // (Note: IntentId -> Intent is expected to be immutable)
118 // entry will be evicted, when state for that IntentId is removed.
119 private Map<IntentId, Intent> localIntents;
120
121 private String stateListenerId;
122
123 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800124
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800125 private Timer createResponseTimer(String methodName) {
126 return createTimer("IntentStore", methodName, "responseTime");
127 }
128
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800129 @Override
130 @Activate
131 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800132 localIntents = new ConcurrentHashMap<>();
133
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800134 createIntentTimer = createResponseTimer("createIntent");
135 removeIntentTimer = createResponseTimer("removeIntent");
136 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
137 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
138 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
139 setStateTimer = createResponseTimer("setState");
140 getIntentCountTimer = createResponseTimer("getIntentCount");
141 getIntentsTimer = createResponseTimer("getIntents");
142 getIntentTimer = createResponseTimer("getIntent");
143 getIntentStateTimer = createResponseTimer("getIntentState");
144
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800145 // FIXME: We need a way to add serializer for intents which has been plugged-in.
146 // As a short term workaround, relax Kryo config to
147 // registrationRequired=false
148 super.activate();
149 super.serializer = new KryoSerializer() {
150
151 @Override
152 protected void setupKryoPool() {
153 serializerPool = KryoNamespace.newBuilder()
154 .setRegistrationRequired(false)
155 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800156 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
157 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800158 }
159
160 };
161
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800162 final Config config = theInstance.getConfig();
163
164 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
165 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
166
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800167 // TODO: enable near cache, allow read from backup for this IMap
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800168 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800169 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800170 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800171
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800172 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
173 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
174
175 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800176 states = new SMap<>(rawStates , super.serializer);
177 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800178 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800179
180 transientStates.clear();
181
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800182 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
183 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
184
185 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800186 installable = new SMap<>(rawInstallables , super.serializer);
187
188 log.info("Started");
189 }
190
191 @Deactivate
192 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800193 intents.removeEntryListener(intentsListenerId);
194 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800195 log.info("Stopped");
196 }
197
198 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800199 public MetricsService metricsService() {
200 return metricsService;
201 }
202
203 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800204 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800205 Context timer = startTimer(createIntentTimer);
206 try {
207 Intent existing = intents.putIfAbsent(intent.id(), intent);
208 if (existing != null) {
209 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800210 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800211 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800212 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800213 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800214 }
215 } finally {
216 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800217 }
218 }
219
220 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800221 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800222 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800223 checkState(getIntentState(intentId) == WITHDRAWN,
224 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800225 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800226 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800227 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800228 states.remove(intentId);
229 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800230 } finally {
231 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800232 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800233 }
234
235 @Override
236 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800237 Context timer = startTimer(getIntentCountTimer);
238 try {
239 return intents.size();
240 } finally {
241 stopTimer(timer);
242 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800243 }
244
245 @Override
246 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800247 Context timer = startTimer(getIntentsTimer);
248 try {
249 return ImmutableSet.copyOf(intents.values());
250 } finally {
251 stopTimer(timer);
252 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800253 }
254
255 @Override
256 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800257 Context timer = startTimer(getIntentTimer);
258 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800259 Intent intent = localIntents.get(intentId);
260 if (intent != null) {
261 return intent;
262 }
263 intent = intents.get(intentId);
264 if (intent != null) {
265 localIntents.put(intentId, intent);
266 }
267 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800268 } finally {
269 stopTimer(timer);
270 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800271 }
272
273 @Override
274 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800275 Context timer = startTimer(getIntentStateTimer);
276 try {
277 final IntentState localState = transientStates.get(id);
278 if (localState != null) {
279 return localState;
280 }
281 return states.get(id);
282 } finally {
283 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800284 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800285 }
286
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800287 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
288 if (onlyLogTransitionError) {
289 if (!expression) {
290 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
291 }
292 } else {
293 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
294 }
295 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800296
297 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800298 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800299 Context timer = startTimer(setStateTimer);
300 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800301 final IntentId id = intent.id();
302 IntentEvent.Type type = null;
303 final IntentState prevParking;
304 boolean transientStateChangeOnly = false;
305
306 // parking state transition
307 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800308 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800309 prevParking = states.get(id);
310 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800311 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
312 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800313 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800314 verify(PRE_INSTALLED.contains(prevParking),
315 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800316 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800317 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
318 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800319 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800320 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800321 break;
322 case INSTALLED:
323 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800324 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800325 "Illegal state transition attempted from %s to INSTALLED",
326 prevParking);
327 type = IntentEvent.Type.INSTALLED;
328 break;
329 case FAILED:
330 prevParking = states.replace(id, FAILED);
331 type = IntentEvent.Type.FAILED;
332 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800333 case WITHDRAW_REQ:
334 prevParking = states.replace(id, WITHDRAW_REQ);
335 verify(PRE_WITHDRAWN.contains(prevParking),
336 "Illegal state transition attempted from %s to WITHDRAW_REQ",
337 prevParking);
338 type = IntentEvent.Type.WITHDRAW_REQ;
339 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800340 case WITHDRAWN:
341 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800342 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800343 "Illegal state transition attempted from %s to WITHDRAWN",
344 prevParking);
345 type = IntentEvent.Type.WITHDRAWN;
346 break;
347 default:
348 transientStateChangeOnly = true;
349 prevParking = null;
350 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800351 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800352 if (!transientStateChangeOnly) {
353 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
354 }
355 // Update instance local state, which includes non-parking state transition
356 final IntentState prevTransient = transientStates.put(id, state);
357 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800358
alshabiba9819bf2014-11-30 18:15:52 -0800359 if (type != null) {
360 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800361 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800362 } finally {
363 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800364 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800365 }
366
367 @Override
368 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800369 Context timer = startTimer(setInstallableIntentsTimer);
370 try {
371 installable.put(intentId, result);
372 } finally {
373 stopTimer(timer);
374 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800375 }
376
377 @Override
378 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800379 Context timer = startTimer(getInstallableIntentsTimer);
380 try {
381 return installable.get(intentId);
382 } finally {
383 stopTimer(timer);
384 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800385 }
386
387 @Override
388 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800389 Context timer = startTimer(removeInstalledIntentsTimer);
390 try {
391 installable.remove(intentId);
392 } finally {
393 stopTimer(timer);
394 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800395 }
396
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800397 @Override
398 public List<Operation> batchWrite(BatchWrite batch) {
399 // Hazelcast version will never fail for conditional failure now.
400 List<Operation> failed = new ArrayList<>();
401
402 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800403 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800404
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800405 batchWriteAsync(batch, failed, futures);
406
407 // verify result
408 verifyAsyncWrites(futures, failed, events);
409
410 notifyDelegate(events);
411
412 return failed;
413 }
414
415 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
416 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800417 for (Operation op : batch.operations()) {
418 switch (op.type()) {
419 case CREATE_INTENT:
420 checkArgument(op.args().size() == 1,
421 "CREATE_INTENT takes 1 argument. %s", op);
422 Intent intent = op.arg(0);
423 futures.add(Pair.of(op,
424 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800425 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800426 break;
427
428 case REMOVE_INTENT:
429 checkArgument(op.args().size() == 1,
430 "REMOVE_INTENT takes 1 argument. %s", op);
431 IntentId intentId = (IntentId) op.arg(0);
432 futures.add(Pair.of(op,
433 ImmutableList.of(intents.removeAsync(intentId),
434 states.removeAsync(intentId),
435 installable.removeAsync(intentId))));
436 break;
437
438 case SET_STATE:
439 checkArgument(op.args().size() == 2,
440 "SET_STATE takes 2 arguments. %s", op);
441 intent = op.arg(0);
442 IntentState newState = op.arg(1);
443 futures.add(Pair.of(op,
444 ImmutableList.of(states.putAsync(intent.id(), newState))));
445 break;
446
447 case SET_INSTALLABLE:
448 checkArgument(op.args().size() == 2,
449 "SET_INSTALLABLE takes 2 arguments. %s", op);
450 intentId = op.arg(0);
451 List<Intent> installableIntents = op.arg(1);
452 futures.add(Pair.of(op,
453 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
454 break;
455
456 case REMOVE_INSTALLED:
457 checkArgument(op.args().size() == 1,
458 "REMOVE_INSTALLED takes 1 argument. %s", op);
459 intentId = op.arg(0);
460 futures.add(Pair.of(op,
461 ImmutableList.of(installable.removeAsync(intentId))));
462 break;
463
464 default:
465 log.warn("Unknown Operation encountered: {}", op);
466 failed.add(op);
467 break;
468 }
469 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800470 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800471
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800472 /**
473 * Checks the async write result Futures and prepare Events to post.
474 *
475 * @param futures async write Futures
476 * @param failed list to output failed batch write operations
477 * @param events list to output events to post as result of writes
478 */
479 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
480 List<Operation> failed,
481 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800482 for (Pair<Operation, List<Future<?>>> future : futures) {
483 final Operation op = future.getLeft();
484 final List<Future<?>> subops = future.getRight();
485
486 switch (op.type()) {
487
488 case CREATE_INTENT:
489 {
490 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800491 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800492
493 try {
494 Intent prevIntent = (Intent) subops.get(0).get();
495 IntentState prevIntentState = (IntentState) subops.get(1).get();
496
497 if (prevIntent != null || prevIntentState != null) {
498 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
499 prevIntent, prevIntentState,
500 intent, newIntentState);
501 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800502 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800503 } catch (InterruptedException e) {
504 log.error("Batch write was interrupted while processing {}", op, e);
505 failed.add(op);
506 Thread.currentThread().interrupt();
507 } catch (ExecutionException e) {
508 log.error("Batch write failed processing {}", op, e);
509 failed.add(op);
510 }
511 break;
512 }
513
514 case REMOVE_INTENT:
515 {
516 IntentId intentId = op.arg(0);
517
518 try {
519 Intent prevIntent = (Intent) subops.get(0).get();
520 IntentState prevIntentState = (IntentState) subops.get(1).get();
521 @SuppressWarnings("unchecked")
522 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
523
524 if (prevIntent == null) {
525 log.warn("Intent {} was already removed.", intentId);
526 }
527 if (prevIntentState == null) {
528 log.warn("Intent {} state was already removed", intentId);
529 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800530 if (prevInstallable != null) {
531 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800532 }
533 } catch (InterruptedException e) {
534 log.error("Batch write was interrupted while processing {}", op, e);
535 failed.add(op);
536 Thread.currentThread().interrupt();
537 } catch (ExecutionException e) {
538 log.error("Batch write failed processing {}", op, e);
539 failed.add(op);
540 }
541 break;
542 }
543
544 case SET_STATE:
545 {
546 Intent intent = op.arg(0);
547 IntentId intentId = intent.id();
548 IntentState newState = op.arg(1);
549
550 try {
551 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800552
553 if (PARKING.contains(newState)) {
554 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800555 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800556 }
alshabiba9819bf2014-11-30 18:15:52 -0800557
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800558 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
559 // TODO sanity check and log?
560 } catch (InterruptedException e) {
561 log.error("Batch write was interrupted while processing {}", op, e);
562 failed.add(op);
563 Thread.currentThread().interrupt();
564 } catch (ExecutionException e) {
565 log.error("Batch write failed processing {}", op, e);
566 failed.add(op);
567 }
568 break;
569 }
570
571 case SET_INSTALLABLE:
572 {
573 IntentId intentId = op.arg(0);
574 List<Intent> installableIntents = op.arg(1);
575
576 try {
577 @SuppressWarnings("unchecked")
578 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
579
580 if (prevInstallable != null) {
581 log.warn("Overwriting Intent {} installable {} -> {}",
582 intentId, prevInstallable, installableIntents);
583 }
584 } catch (InterruptedException e) {
585 log.error("Batch write was interrupted while processing {}", op, e);
586 failed.add(op);
587 Thread.currentThread().interrupt();
588 } catch (ExecutionException e) {
589 log.error("Batch write failed processing {}", op, e);
590 failed.add(op);
591 }
592 break;
593 }
594
595 case REMOVE_INSTALLED:
596 {
597 IntentId intentId = op.arg(0);
598
599 try {
600 @SuppressWarnings("unchecked")
601 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
602
603 if (prevInstallable == null) {
604 log.warn("Intent {} installable was already removed", intentId);
605 }
606 } catch (InterruptedException e) {
607 log.error("Batch write was interrupted while processing {}", op, e);
608 failed.add(op);
609 Thread.currentThread().interrupt();
610 } catch (ExecutionException e) {
611 log.error("Batch write failed processing {}", op, e);
612 failed.add(op);
613 }
614 break;
615 }
616
617 default:
618 log.warn("Unknown Operation encountered: {}", op);
619 if (!failed.contains(op)) {
620 failed.add(op);
621 }
622 break;
623 }
624 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800625 }
626
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800627 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
628
629 @Override
630 public void entryAdded(EntryEvent<IntentId, Intent> event) {
631 localIntents.put(event.getKey(), event.getValue());
632 }
633
634 @Override
635 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
636 entryAdded(event);
637 }
638 }
639
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800640 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
641
642 @Override
643 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800644 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800645 final Member myself = theInstance.getCluster().getLocalMember();
646 if (!myself.equals(event.getMember())) {
647 // When Intent state was modified by remote node,
648 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800649 IntentState oldState = transientStates.remove(intentId);
650 if (oldState != null) {
651 log.debug("{} state updated remotely, removing transient state {}",
652 intentId, oldState);
653 }
alshabiba9819bf2014-11-30 18:15:52 -0800654
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800655 if (event.getValue() != null) {
656 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800657
658 final Intent intent = getIntent(intentId);
659 if (intent == null) {
660 log.warn("no Intent found for {} on Event {}", intentId, event);
661 return;
662 }
663 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
664 // remove IntentCache
665 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800666 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800667 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800668
669 // populate manual near cache, to prepare for
670 // transition event to WITHDRAWN
671 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800672 }
673 }
674}