blob: 39255c818d732180592de342b338a82c0ae91efc [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
31
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080032import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080033import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080039import org.onlab.metrics.MetricsService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentId;
45import org.onosproject.net.intent.IntentState;
46import org.onosproject.net.intent.IntentStore;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080047import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
49import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.onlab.util.KryoNamespace;
54import org.slf4j.Logger;
55
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080056import java.util.ArrayList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080057import java.util.EnumSet;
58import java.util.List;
59import java.util.Map;
60import java.util.Set;
61import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080062import java.util.concurrent.ExecutionException;
63import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080064
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080065import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080066import static com.google.common.base.Preconditions.checkState;
Brian O'Connorabafb502014-12-02 22:26:20 -080067import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080068import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080069import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080070
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080071@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080072@Service
73public class HazelcastIntentStore
74 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080075 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080076
77 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080078 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080079
80 /** Valid parking state, which can transition to WITHDRAWN. */
81 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
82
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080083 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080084
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080085 private final Logger log = getLogger(getClass());
86
87 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080088 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080089 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080090 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080091 private SMap<IntentId, IntentState> states;
92
93 // Map to store instance local intermediate state transition
94 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
95
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080096 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080097 private SMap<IntentId, List<Intent>> installable;
98
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080099 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected MetricsService metricsService;
101
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800102 private boolean onlyLogTransitionError = true;
103
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800104 private Timer createIntentTimer;
105 private Timer removeIntentTimer;
106 private Timer setInstallableIntentsTimer;
107 private Timer getInstallableIntentsTimer;
108 private Timer removeInstalledIntentsTimer;
109 private Timer setStateTimer;
110 private Timer getIntentCountTimer;
111 private Timer getIntentsTimer;
112 private Timer getIntentTimer;
113 private Timer getIntentStateTimer;
114
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800115 // manual near cache of Intent
116 // (Note: IntentId -> Intent is expected to be immutable)
117 // entry will be evicted, when state for that IntentId is removed.
118 private Map<IntentId, Intent> localIntents;
119
120 private String stateListenerId;
121
122 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800123
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800124 private Timer createResponseTimer(String methodName) {
125 return createTimer("IntentStore", methodName, "responseTime");
126 }
127
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800128 @Override
129 @Activate
130 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800131 localIntents = new ConcurrentHashMap<>();
132
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800133 createIntentTimer = createResponseTimer("createIntent");
134 removeIntentTimer = createResponseTimer("removeIntent");
135 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
136 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
137 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
138 setStateTimer = createResponseTimer("setState");
139 getIntentCountTimer = createResponseTimer("getIntentCount");
140 getIntentsTimer = createResponseTimer("getIntents");
141 getIntentTimer = createResponseTimer("getIntent");
142 getIntentStateTimer = createResponseTimer("getIntentState");
143
Brian O'Connor44008532014-12-04 16:41:36 -0800144 // We need a way to add serializer for intents which has been plugged-in.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800145 // As a short term workaround, relax Kryo config to
146 // registrationRequired=false
147 super.activate();
148 super.serializer = new KryoSerializer() {
149
150 @Override
151 protected void setupKryoPool() {
152 serializerPool = KryoNamespace.newBuilder()
153 .setRegistrationRequired(false)
154 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800155 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
156 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800157 }
158
159 };
160
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800161 final Config config = theInstance.getConfig();
162
163 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
164 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
165
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800166 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800167 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800168 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800169
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800170 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
171 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
172
173 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800174 states = new SMap<>(rawStates , super.serializer);
175 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800176 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800177
178 transientStates.clear();
179
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800180 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
181 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
182
183 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800184 installable = new SMap<>(rawInstallables , super.serializer);
185
186 log.info("Started");
187 }
188
189 @Deactivate
190 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800191 intents.removeEntryListener(intentsListenerId);
192 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800193 log.info("Stopped");
194 }
195
196 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800197 public MetricsService metricsService() {
198 return metricsService;
199 }
200
201 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800202 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800203 Context timer = startTimer(createIntentTimer);
204 try {
205 Intent existing = intents.putIfAbsent(intent.id(), intent);
206 if (existing != null) {
207 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800208 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800209 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800210 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800211 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800212 }
213 } finally {
214 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800215 }
216 }
217
218 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800219 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800220 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800221 checkState(getIntentState(intentId) == WITHDRAWN,
222 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800223 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800224 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800225 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800226 states.remove(intentId);
227 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800228 } finally {
229 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800230 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800231 }
232
233 @Override
234 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800235 Context timer = startTimer(getIntentCountTimer);
236 try {
237 return intents.size();
238 } finally {
239 stopTimer(timer);
240 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800241 }
242
243 @Override
244 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800245 Context timer = startTimer(getIntentsTimer);
246 try {
247 return ImmutableSet.copyOf(intents.values());
248 } finally {
249 stopTimer(timer);
250 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800251 }
252
253 @Override
254 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800255 Context timer = startTimer(getIntentTimer);
256 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800257 Intent intent = localIntents.get(intentId);
258 if (intent != null) {
259 return intent;
260 }
261 intent = intents.get(intentId);
262 if (intent != null) {
263 localIntents.put(intentId, intent);
264 }
265 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800266 } finally {
267 stopTimer(timer);
268 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800269 }
270
271 @Override
272 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800273 Context timer = startTimer(getIntentStateTimer);
274 try {
275 final IntentState localState = transientStates.get(id);
276 if (localState != null) {
277 return localState;
278 }
279 return states.get(id);
280 } finally {
281 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800282 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800283 }
284
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800285 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
286 if (onlyLogTransitionError) {
287 if (!expression) {
288 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
289 }
290 } else {
291 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
292 }
293 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800294
295 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800296 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800297 Context timer = startTimer(setStateTimer);
298 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800299 final IntentId id = intent.id();
300 IntentEvent.Type type = null;
301 final IntentState prevParking;
302 boolean transientStateChangeOnly = false;
303
304 // parking state transition
305 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800306 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800307 prevParking = states.get(id);
308 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800309 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
310 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800311 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800312 verify(PRE_INSTALLED.contains(prevParking),
313 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800314 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800315 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
316 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800317 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800318 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800319 break;
320 case INSTALLED:
321 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800322 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800323 "Illegal state transition attempted from %s to INSTALLED",
324 prevParking);
325 type = IntentEvent.Type.INSTALLED;
326 break;
327 case FAILED:
328 prevParking = states.replace(id, FAILED);
329 type = IntentEvent.Type.FAILED;
330 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800331 case WITHDRAW_REQ:
332 prevParking = states.replace(id, WITHDRAW_REQ);
333 verify(PRE_WITHDRAWN.contains(prevParking),
334 "Illegal state transition attempted from %s to WITHDRAW_REQ",
335 prevParking);
336 type = IntentEvent.Type.WITHDRAW_REQ;
337 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800338 case WITHDRAWN:
339 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800340 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800341 "Illegal state transition attempted from %s to WITHDRAWN",
342 prevParking);
343 type = IntentEvent.Type.WITHDRAWN;
344 break;
345 default:
346 transientStateChangeOnly = true;
347 prevParking = null;
348 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800349 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800350 if (!transientStateChangeOnly) {
351 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
352 }
353 // Update instance local state, which includes non-parking state transition
354 final IntentState prevTransient = transientStates.put(id, state);
355 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800356
alshabiba9819bf2014-11-30 18:15:52 -0800357 if (type != null) {
358 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800359 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800360 } finally {
361 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800362 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800363 }
364
365 @Override
366 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800367 Context timer = startTimer(setInstallableIntentsTimer);
368 try {
369 installable.put(intentId, result);
370 } finally {
371 stopTimer(timer);
372 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800373 }
374
375 @Override
376 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800377 Context timer = startTimer(getInstallableIntentsTimer);
378 try {
379 return installable.get(intentId);
380 } finally {
381 stopTimer(timer);
382 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800383 }
384
385 @Override
386 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800387 Context timer = startTimer(removeInstalledIntentsTimer);
388 try {
389 installable.remove(intentId);
390 } finally {
391 stopTimer(timer);
392 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800393 }
394
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800395 @Override
396 public List<Operation> batchWrite(BatchWrite batch) {
397 // Hazelcast version will never fail for conditional failure now.
398 List<Operation> failed = new ArrayList<>();
399
400 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800401 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800402
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800403 batchWriteAsync(batch, failed, futures);
404
405 // verify result
406 verifyAsyncWrites(futures, failed, events);
407
408 notifyDelegate(events);
409
410 return failed;
411 }
412
413 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
414 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800415 for (Operation op : batch.operations()) {
416 switch (op.type()) {
417 case CREATE_INTENT:
418 checkArgument(op.args().size() == 1,
419 "CREATE_INTENT takes 1 argument. %s", op);
420 Intent intent = op.arg(0);
421 futures.add(Pair.of(op,
422 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800423 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800424 break;
425
426 case REMOVE_INTENT:
427 checkArgument(op.args().size() == 1,
428 "REMOVE_INTENT takes 1 argument. %s", op);
429 IntentId intentId = (IntentId) op.arg(0);
430 futures.add(Pair.of(op,
431 ImmutableList.of(intents.removeAsync(intentId),
432 states.removeAsync(intentId),
433 installable.removeAsync(intentId))));
434 break;
435
436 case SET_STATE:
437 checkArgument(op.args().size() == 2,
438 "SET_STATE takes 2 arguments. %s", op);
439 intent = op.arg(0);
440 IntentState newState = op.arg(1);
441 futures.add(Pair.of(op,
442 ImmutableList.of(states.putAsync(intent.id(), newState))));
443 break;
444
445 case SET_INSTALLABLE:
446 checkArgument(op.args().size() == 2,
447 "SET_INSTALLABLE takes 2 arguments. %s", op);
448 intentId = op.arg(0);
449 List<Intent> installableIntents = op.arg(1);
450 futures.add(Pair.of(op,
451 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
452 break;
453
454 case REMOVE_INSTALLED:
455 checkArgument(op.args().size() == 1,
456 "REMOVE_INSTALLED takes 1 argument. %s", op);
457 intentId = op.arg(0);
458 futures.add(Pair.of(op,
459 ImmutableList.of(installable.removeAsync(intentId))));
460 break;
461
462 default:
463 log.warn("Unknown Operation encountered: {}", op);
464 failed.add(op);
465 break;
466 }
467 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800468 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800469
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800470 /**
471 * Checks the async write result Futures and prepare Events to post.
472 *
473 * @param futures async write Futures
474 * @param failed list to output failed batch write operations
475 * @param events list to output events to post as result of writes
476 */
477 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
478 List<Operation> failed,
479 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800480 for (Pair<Operation, List<Future<?>>> future : futures) {
481 final Operation op = future.getLeft();
482 final List<Future<?>> subops = future.getRight();
483
484 switch (op.type()) {
485
486 case CREATE_INTENT:
487 {
488 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800489 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800490
491 try {
492 Intent prevIntent = (Intent) subops.get(0).get();
493 IntentState prevIntentState = (IntentState) subops.get(1).get();
494
495 if (prevIntent != null || prevIntentState != null) {
496 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
497 prevIntent, prevIntentState,
498 intent, newIntentState);
499 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800500 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800501 } catch (InterruptedException e) {
502 log.error("Batch write was interrupted while processing {}", op, e);
503 failed.add(op);
504 Thread.currentThread().interrupt();
505 } catch (ExecutionException e) {
506 log.error("Batch write failed processing {}", op, e);
507 failed.add(op);
508 }
509 break;
510 }
511
512 case REMOVE_INTENT:
513 {
514 IntentId intentId = op.arg(0);
515
516 try {
517 Intent prevIntent = (Intent) subops.get(0).get();
518 IntentState prevIntentState = (IntentState) subops.get(1).get();
519 @SuppressWarnings("unchecked")
520 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
521
522 if (prevIntent == null) {
523 log.warn("Intent {} was already removed.", intentId);
524 }
525 if (prevIntentState == null) {
526 log.warn("Intent {} state was already removed", intentId);
527 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800528 if (prevInstallable != null) {
529 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800530 }
531 } catch (InterruptedException e) {
532 log.error("Batch write was interrupted while processing {}", op, e);
533 failed.add(op);
534 Thread.currentThread().interrupt();
535 } catch (ExecutionException e) {
536 log.error("Batch write failed processing {}", op, e);
537 failed.add(op);
538 }
539 break;
540 }
541
542 case SET_STATE:
543 {
544 Intent intent = op.arg(0);
545 IntentId intentId = intent.id();
546 IntentState newState = op.arg(1);
547
548 try {
549 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800550
551 if (PARKING.contains(newState)) {
552 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800553 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800554 }
alshabiba9819bf2014-11-30 18:15:52 -0800555
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800556 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800557 } catch (InterruptedException e) {
558 log.error("Batch write was interrupted while processing {}", op, e);
559 failed.add(op);
560 Thread.currentThread().interrupt();
561 } catch (ExecutionException e) {
562 log.error("Batch write failed processing {}", op, e);
563 failed.add(op);
564 }
565 break;
566 }
567
568 case SET_INSTALLABLE:
569 {
570 IntentId intentId = op.arg(0);
571 List<Intent> installableIntents = op.arg(1);
572
573 try {
574 @SuppressWarnings("unchecked")
575 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
576
577 if (prevInstallable != null) {
578 log.warn("Overwriting Intent {} installable {} -> {}",
579 intentId, prevInstallable, installableIntents);
580 }
581 } catch (InterruptedException e) {
582 log.error("Batch write was interrupted while processing {}", op, e);
583 failed.add(op);
584 Thread.currentThread().interrupt();
585 } catch (ExecutionException e) {
586 log.error("Batch write failed processing {}", op, e);
587 failed.add(op);
588 }
589 break;
590 }
591
592 case REMOVE_INSTALLED:
593 {
594 IntentId intentId = op.arg(0);
595
596 try {
597 @SuppressWarnings("unchecked")
598 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
599
600 if (prevInstallable == null) {
601 log.warn("Intent {} installable was already removed", intentId);
602 }
603 } catch (InterruptedException e) {
604 log.error("Batch write was interrupted while processing {}", op, e);
605 failed.add(op);
606 Thread.currentThread().interrupt();
607 } catch (ExecutionException e) {
608 log.error("Batch write failed processing {}", op, e);
609 failed.add(op);
610 }
611 break;
612 }
613
614 default:
615 log.warn("Unknown Operation encountered: {}", op);
616 if (!failed.contains(op)) {
617 failed.add(op);
618 }
619 break;
620 }
621 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800622 }
623
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800624 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
625
626 @Override
627 public void entryAdded(EntryEvent<IntentId, Intent> event) {
628 localIntents.put(event.getKey(), event.getValue());
629 }
630
631 @Override
632 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
633 entryAdded(event);
634 }
635 }
636
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800637 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
638
639 @Override
640 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800641 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800642 final Member myself = theInstance.getCluster().getLocalMember();
643 if (!myself.equals(event.getMember())) {
644 // When Intent state was modified by remote node,
645 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800646 IntentState oldState = transientStates.remove(intentId);
647 if (oldState != null) {
648 log.debug("{} state updated remotely, removing transient state {}",
649 intentId, oldState);
650 }
alshabiba9819bf2014-11-30 18:15:52 -0800651
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800652 if (event.getValue() != null) {
653 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800654
655 final Intent intent = getIntent(intentId);
656 if (intent == null) {
657 log.warn("no Intent found for {} on Event {}", intentId, event);
658 return;
659 }
660 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
661 // remove IntentCache
662 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800663 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800664 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800665
666 // populate manual near cache, to prepare for
667 // transition event to WITHDRAWN
668 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800669 }
670 }
671}