blob: 667d2a8e8c9b804fa3177aa9533e2b73f3dc8f20 [file] [log] [blame]
Yuta HIGUCHI4490a732014-11-18 20:20:30 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.store.intent.impl;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080017
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080018import com.codahale.metrics.Timer;
19import com.codahale.metrics.Timer.Context;
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -080020import com.google.common.base.Verify;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080021import com.google.common.collect.ImmutableList;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080022import com.google.common.collect.ImmutableSet;
alshabiba9819bf2014-11-30 18:15:52 -080023import com.google.common.collect.Lists;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080024import com.hazelcast.config.Config;
25import com.hazelcast.config.MapConfig;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080026import com.hazelcast.core.EntryAdapter;
27import com.hazelcast.core.EntryEvent;
28import com.hazelcast.core.EntryListener;
29import com.hazelcast.core.IMap;
30import com.hazelcast.core.Member;
31
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080032import org.apache.commons.lang3.tuple.Pair;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080033import org.apache.felix.scr.annotations.Activate;
34import org.apache.felix.scr.annotations.Component;
35import org.apache.felix.scr.annotations.Deactivate;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080036import org.apache.felix.scr.annotations.Reference;
37import org.apache.felix.scr.annotations.ReferenceCardinality;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080038import org.apache.felix.scr.annotations.Service;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080039import org.onlab.metrics.MetricsService;
Brian O'Connorabafb502014-12-02 22:26:20 -080040import org.onosproject.core.MetricsHelper;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080041import org.onosproject.net.intent.BatchWrite;
Brian O'Connorabafb502014-12-02 22:26:20 -080042import org.onosproject.net.intent.Intent;
43import org.onosproject.net.intent.IntentEvent;
44import org.onosproject.net.intent.IntentId;
45import org.onosproject.net.intent.IntentState;
46import org.onosproject.net.intent.IntentStore;
Sho SHIMIZU64ae11c2014-12-03 15:17:47 -080047import org.onosproject.net.intent.BatchWrite.Operation;
Brian O'Connorabafb502014-12-02 22:26:20 -080048import org.onosproject.net.intent.IntentStoreDelegate;
49import org.onosproject.store.hz.AbstractHazelcastStore;
50import org.onosproject.store.hz.SMap;
51import org.onosproject.store.serializers.KryoNamespaces;
52import org.onosproject.store.serializers.KryoSerializer;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080053import org.onlab.util.KryoNamespace;
54import org.slf4j.Logger;
55
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080056import java.util.ArrayList;
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -080057import java.util.Collections;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080058import java.util.EnumSet;
59import java.util.List;
60import java.util.Map;
61import java.util.Set;
62import java.util.concurrent.ConcurrentHashMap;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080063import java.util.concurrent.ExecutionException;
64import java.util.concurrent.Future;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080065
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -080066import static com.google.common.base.Preconditions.checkArgument;
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -080067import static com.google.common.base.Preconditions.checkState;
Brian O'Connorabafb502014-12-02 22:26:20 -080068import static org.onosproject.net.intent.IntentState.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080069import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080070import static org.onlab.metrics.MetricsUtil.*;
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080071
Yuta HIGUCHI9b108b32014-12-01 11:10:26 -080072@Component(immediate = true, enabled = true)
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080073@Service
74public class HazelcastIntentStore
75 extends AbstractHazelcastStore<IntentEvent, IntentStoreDelegate>
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -080076 implements IntentStore, MetricsHelper {
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080077
78 /** Valid parking state, which can transition to INSTALLED. */
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080079 private static final Set<IntentState> PRE_INSTALLED = EnumSet.of(INSTALL_REQ, INSTALLED, FAILED);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080080
81 /** Valid parking state, which can transition to WITHDRAWN. */
82 private static final Set<IntentState> PRE_WITHDRAWN = EnumSet.of(INSTALLED, FAILED);
83
Brian O'Connor7a71d5d2014-12-02 00:12:27 -080084 private static final Set<IntentState> PARKING = EnumSet.of(INSTALL_REQ, INSTALLED, WITHDRAWN, FAILED);
Yuta HIGUCHIf5682452014-12-01 10:17:15 -080085
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080086 private final Logger log = getLogger(getClass());
87
88 // Assumption: IntentId will not have synonyms
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080089 private static final String INTENTS_MAP_NAME = "intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080090 private SMap<IntentId, Intent> intents;
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080091 private static final String INTENT_STATES_MAP_NAME = "intent-states";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080092 private SMap<IntentId, IntentState> states;
93
94 // Map to store instance local intermediate state transition
95 private transient Map<IntentId, IntentState> transientStates = new ConcurrentHashMap<>();
96
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -080097 private static final String INSTALLABLE_INTENTS_MAP_NAME = "installable-intents";
Yuta HIGUCHI4490a732014-11-18 20:20:30 -080098 private SMap<IntentId, List<Intent>> installable;
99
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected MetricsService metricsService;
102
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800103 private boolean onlyLogTransitionError = true;
104
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800105 private Timer createIntentTimer;
106 private Timer removeIntentTimer;
107 private Timer setInstallableIntentsTimer;
108 private Timer getInstallableIntentsTimer;
109 private Timer removeInstalledIntentsTimer;
110 private Timer setStateTimer;
111 private Timer getIntentCountTimer;
112 private Timer getIntentsTimer;
113 private Timer getIntentTimer;
114 private Timer getIntentStateTimer;
115
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800116 // manual near cache of Intent
117 // (Note: IntentId -> Intent is expected to be immutable)
118 // entry will be evicted, when state for that IntentId is removed.
119 private Map<IntentId, Intent> localIntents;
120
121 private String stateListenerId;
122
123 private String intentsListenerId;
Yuta HIGUCHId1a63e92014-12-02 13:14:28 -0800124
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800125 private Timer createResponseTimer(String methodName) {
126 return createTimer("IntentStore", methodName, "responseTime");
127 }
128
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800129 @Override
130 @Activate
131 public void activate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800132 localIntents = new ConcurrentHashMap<>();
133
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800134 createIntentTimer = createResponseTimer("createIntent");
135 removeIntentTimer = createResponseTimer("removeIntent");
136 setInstallableIntentsTimer = createResponseTimer("setInstallableIntents");
137 getInstallableIntentsTimer = createResponseTimer("getInstallableIntents");
138 removeInstalledIntentsTimer = createResponseTimer("removeInstalledIntents");
139 setStateTimer = createResponseTimer("setState");
140 getIntentCountTimer = createResponseTimer("getIntentCount");
141 getIntentsTimer = createResponseTimer("getIntents");
142 getIntentTimer = createResponseTimer("getIntent");
143 getIntentStateTimer = createResponseTimer("getIntentState");
144
Brian O'Connor44008532014-12-04 16:41:36 -0800145 // We need a way to add serializer for intents which has been plugged-in.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800146 // As a short term workaround, relax Kryo config to
147 // registrationRequired=false
148 super.activate();
149 super.serializer = new KryoSerializer() {
150
151 @Override
152 protected void setupKryoPool() {
153 serializerPool = KryoNamespace.newBuilder()
154 .setRegistrationRequired(false)
155 .register(KryoNamespaces.API)
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800156 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
157 .build();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800158 }
159
160 };
161
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800162 final Config config = theInstance.getConfig();
163
164 MapConfig intentsCfg = config.getMapConfig(INTENTS_MAP_NAME);
165 intentsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentsCfg.getBackupCount());
166
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800167 IMap<byte[], byte[]> rawIntents = super.theInstance.getMap(INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800168 intents = new SMap<>(rawIntents , super.serializer);
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800169 intentsListenerId = intents.addEntryListener(new RemoteIntentsListener(), true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800170
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800171 MapConfig statesCfg = config.getMapConfig(INTENT_STATES_MAP_NAME);
172 statesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - statesCfg.getBackupCount());
173
174 IMap<byte[], byte[]> rawStates = super.theInstance.getMap(INTENT_STATES_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800175 states = new SMap<>(rawStates , super.serializer);
176 EntryListener<IntentId, IntentState> listener = new RemoteIntentStateListener();
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800177 stateListenerId = states.addEntryListener(listener, true);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800178
179 transientStates.clear();
180
Yuta HIGUCHId36a58e2014-12-02 12:46:33 -0800181 MapConfig installableCfg = config.getMapConfig(INSTALLABLE_INTENTS_MAP_NAME);
182 installableCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - installableCfg.getBackupCount());
183
184 IMap<byte[], byte[]> rawInstallables = super.theInstance.getMap(INSTALLABLE_INTENTS_MAP_NAME);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800185 installable = new SMap<>(rawInstallables , super.serializer);
186
187 log.info("Started");
188 }
189
190 @Deactivate
191 public void deactivate() {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800192 intents.removeEntryListener(intentsListenerId);
193 states.removeEntryListener(stateListenerId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800194 log.info("Stopped");
195 }
196
197 @Override
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800198 public MetricsService metricsService() {
199 return metricsService;
200 }
201
202 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800203 public void createIntent(Intent intent) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800204 Context timer = startTimer(createIntentTimer);
205 try {
206 Intent existing = intents.putIfAbsent(intent.id(), intent);
207 if (existing != null) {
208 // duplicate, ignore
alshabiba9819bf2014-11-30 18:15:52 -0800209 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800210 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800211 this.setState(intent, IntentState.INSTALL_REQ);
alshabiba9819bf2014-11-30 18:15:52 -0800212 return;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800213 }
214 } finally {
215 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800216 }
217 }
218
219 @Override
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800220 public void removeIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800221 Context timer = startTimer(removeIntentTimer);
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800222 checkState(getIntentState(intentId) == WITHDRAWN,
223 "Intent state for {} is not WITHDRAWN.", intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800224 try {
Thomas Vachuskae4b6bb22014-11-25 17:09:43 -0800225 intents.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800226 installable.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800227 states.remove(intentId);
228 transientStates.remove(intentId);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800229 } finally {
230 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800231 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800232 }
233
234 @Override
235 public long getIntentCount() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800236 Context timer = startTimer(getIntentCountTimer);
237 try {
238 return intents.size();
239 } finally {
240 stopTimer(timer);
241 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800242 }
243
244 @Override
245 public Iterable<Intent> getIntents() {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800246 Context timer = startTimer(getIntentsTimer);
247 try {
248 return ImmutableSet.copyOf(intents.values());
249 } finally {
250 stopTimer(timer);
251 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800252 }
253
254 @Override
255 public Intent getIntent(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800256 Context timer = startTimer(getIntentTimer);
257 try {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800258 Intent intent = localIntents.get(intentId);
259 if (intent != null) {
260 return intent;
261 }
262 intent = intents.get(intentId);
263 if (intent != null) {
264 localIntents.put(intentId, intent);
265 }
266 return intent;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800267 } finally {
268 stopTimer(timer);
269 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800270 }
271
272 @Override
273 public IntentState getIntentState(IntentId id) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800274 Context timer = startTimer(getIntentStateTimer);
275 try {
276 final IntentState localState = transientStates.get(id);
277 if (localState != null) {
278 return localState;
279 }
280 return states.get(id);
281 } finally {
282 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800283 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800284 }
285
Yuta HIGUCHIc8f30262014-11-20 19:14:42 -0800286 private void verify(boolean expression, String errorMessageTemplate, Object... errorMessageArgs) {
287 if (onlyLogTransitionError) {
288 if (!expression) {
289 log.error(errorMessageTemplate.replace("%s", "{}"), errorMessageArgs);
290 }
291 } else {
292 Verify.verify(expression, errorMessageTemplate, errorMessageArgs);
293 }
294 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800295
296 @Override
alshabiba9819bf2014-11-30 18:15:52 -0800297 public void setState(Intent intent, IntentState state) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800298 Context timer = startTimer(setStateTimer);
299 try {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800300 final IntentId id = intent.id();
301 IntentEvent.Type type = null;
302 final IntentState prevParking;
303 boolean transientStateChangeOnly = false;
304
305 // parking state transition
306 switch (state) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800307 case INSTALL_REQ:
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800308 prevParking = states.get(id);
309 if (prevParking == null) {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800310 IntentState existing = states.putIfAbsent(id, INSTALL_REQ);
311 verify(existing == null, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800312 } else {
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800313 verify(PRE_INSTALLED.contains(prevParking),
314 "Illegal state transition attempted from %s to INSTALL_REQ",
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800315 prevParking);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800316 boolean updated = states.replace(id, prevParking, INSTALL_REQ);
317 verify(updated, "Conditional replace %s => %s failed", prevParking, INSTALL_REQ);
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800318 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800319 type = IntentEvent.Type.INSTALL_REQ;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800320 break;
321 case INSTALLED:
322 prevParking = states.replace(id, INSTALLED);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800323 verify(prevParking == INSTALL_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800324 "Illegal state transition attempted from %s to INSTALLED",
325 prevParking);
326 type = IntentEvent.Type.INSTALLED;
327 break;
328 case FAILED:
329 prevParking = states.replace(id, FAILED);
330 type = IntentEvent.Type.FAILED;
331 break;
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800332 case WITHDRAW_REQ:
333 prevParking = states.replace(id, WITHDRAW_REQ);
334 verify(PRE_WITHDRAWN.contains(prevParking),
335 "Illegal state transition attempted from %s to WITHDRAW_REQ",
336 prevParking);
337 type = IntentEvent.Type.WITHDRAW_REQ;
338 break;
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800339 case WITHDRAWN:
340 prevParking = states.replace(id, WITHDRAWN);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800341 verify(prevParking == WITHDRAW_REQ,
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800342 "Illegal state transition attempted from %s to WITHDRAWN",
343 prevParking);
344 type = IntentEvent.Type.WITHDRAWN;
345 break;
346 default:
347 transientStateChangeOnly = true;
348 prevParking = null;
349 break;
Yuta HIGUCHI89a7f472014-11-21 14:50:24 -0800350 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800351 if (!transientStateChangeOnly) {
352 log.debug("Parking State change: {} {}=>{}", id, prevParking, state);
353 }
354 // Update instance local state, which includes non-parking state transition
355 final IntentState prevTransient = transientStates.put(id, state);
356 log.debug("Transient State change: {} {}=>{}", id, prevTransient, state);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800357
alshabiba9819bf2014-11-30 18:15:52 -0800358 if (type != null) {
359 notifyDelegate(new IntentEvent(type, intent));
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800360 }
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800361 } finally {
362 stopTimer(timer);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800363 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800364 }
365
366 @Override
367 public void setInstallableIntents(IntentId intentId, List<Intent> result) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800368 Context timer = startTimer(setInstallableIntentsTimer);
369 try {
370 installable.put(intentId, result);
371 } finally {
372 stopTimer(timer);
373 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800374 }
375
376 @Override
377 public List<Intent> getInstallableIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800378 Context timer = startTimer(getInstallableIntentsTimer);
379 try {
380 return installable.get(intentId);
381 } finally {
382 stopTimer(timer);
383 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800384 }
385
386 @Override
387 public void removeInstalledIntents(IntentId intentId) {
Yuta HIGUCHIe367fb92014-11-24 22:26:58 -0800388 Context timer = startTimer(removeInstalledIntentsTimer);
389 try {
390 installable.remove(intentId);
391 } finally {
392 stopTimer(timer);
393 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800394 }
395
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800396 @Override
397 public List<Operation> batchWrite(BatchWrite batch) {
Sho SHIMIZU2bb988b2015-01-20 13:45:35 -0800398 if (batch.isEmpty()) {
399 return Collections.emptyList();
400 }
401
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800402 // Hazelcast version will never fail for conditional failure now.
403 List<Operation> failed = new ArrayList<>();
404
405 List<Pair<Operation, List<Future<?>>>> futures = new ArrayList<>(batch.operations().size());
alshabiba9819bf2014-11-30 18:15:52 -0800406 List<IntentEvent> events = Lists.newArrayList();
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800407
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800408 batchWriteAsync(batch, failed, futures);
409
410 // verify result
411 verifyAsyncWrites(futures, failed, events);
412
413 notifyDelegate(events);
414
415 return failed;
416 }
417
418 private void batchWriteAsync(BatchWrite batch, List<Operation> failed,
419 List<Pair<Operation, List<Future<?>>>> futures) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800420 for (Operation op : batch.operations()) {
421 switch (op.type()) {
422 case CREATE_INTENT:
423 checkArgument(op.args().size() == 1,
424 "CREATE_INTENT takes 1 argument. %s", op);
425 Intent intent = op.arg(0);
426 futures.add(Pair.of(op,
427 ImmutableList.of(intents.putAsync(intent.id(), intent),
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800428 states.putAsync(intent.id(), INSTALL_REQ))));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800429 break;
430
431 case REMOVE_INTENT:
432 checkArgument(op.args().size() == 1,
433 "REMOVE_INTENT takes 1 argument. %s", op);
434 IntentId intentId = (IntentId) op.arg(0);
435 futures.add(Pair.of(op,
436 ImmutableList.of(intents.removeAsync(intentId),
437 states.removeAsync(intentId),
438 installable.removeAsync(intentId))));
439 break;
440
441 case SET_STATE:
442 checkArgument(op.args().size() == 2,
443 "SET_STATE takes 2 arguments. %s", op);
444 intent = op.arg(0);
445 IntentState newState = op.arg(1);
446 futures.add(Pair.of(op,
447 ImmutableList.of(states.putAsync(intent.id(), newState))));
448 break;
449
450 case SET_INSTALLABLE:
451 checkArgument(op.args().size() == 2,
452 "SET_INSTALLABLE takes 2 arguments. %s", op);
453 intentId = op.arg(0);
454 List<Intent> installableIntents = op.arg(1);
455 futures.add(Pair.of(op,
456 ImmutableList.of(installable.putAsync(intentId, installableIntents))));
457 break;
458
459 case REMOVE_INSTALLED:
460 checkArgument(op.args().size() == 1,
461 "REMOVE_INSTALLED takes 1 argument. %s", op);
462 intentId = op.arg(0);
463 futures.add(Pair.of(op,
464 ImmutableList.of(installable.removeAsync(intentId))));
465 break;
466
467 default:
468 log.warn("Unknown Operation encountered: {}", op);
469 failed.add(op);
470 break;
471 }
472 }
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800473 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800474
Yuta HIGUCHI43772d72014-12-03 13:57:09 -0800475 /**
476 * Checks the async write result Futures and prepare Events to post.
477 *
478 * @param futures async write Futures
479 * @param failed list to output failed batch write operations
480 * @param events list to output events to post as result of writes
481 */
482 private void verifyAsyncWrites(List<Pair<Operation, List<Future<?>>>> futures,
483 List<Operation> failed,
484 List<IntentEvent> events) {
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800485 for (Pair<Operation, List<Future<?>>> future : futures) {
486 final Operation op = future.getLeft();
487 final List<Future<?>> subops = future.getRight();
488
489 switch (op.type()) {
490
491 case CREATE_INTENT:
492 {
493 Intent intent = op.arg(0);
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800494 IntentState newIntentState = INSTALL_REQ;
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800495
496 try {
497 Intent prevIntent = (Intent) subops.get(0).get();
498 IntentState prevIntentState = (IntentState) subops.get(1).get();
499
500 if (prevIntent != null || prevIntentState != null) {
501 log.warn("Overwriting existing Intent: {}@{} with {}@{}",
502 prevIntent, prevIntentState,
503 intent, newIntentState);
504 }
Brian O'Connor7a71d5d2014-12-02 00:12:27 -0800505 events.add(IntentEvent.getEvent(INSTALL_REQ, intent));
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800506 } catch (InterruptedException e) {
507 log.error("Batch write was interrupted while processing {}", op, e);
508 failed.add(op);
509 Thread.currentThread().interrupt();
510 } catch (ExecutionException e) {
511 log.error("Batch write failed processing {}", op, e);
512 failed.add(op);
513 }
514 break;
515 }
516
517 case REMOVE_INTENT:
518 {
519 IntentId intentId = op.arg(0);
520
521 try {
522 Intent prevIntent = (Intent) subops.get(0).get();
523 IntentState prevIntentState = (IntentState) subops.get(1).get();
524 @SuppressWarnings("unchecked")
525 List<Intent> prevInstallable = (List<Intent>) subops.get(2).get();
526
527 if (prevIntent == null) {
528 log.warn("Intent {} was already removed.", intentId);
529 }
530 if (prevIntentState == null) {
531 log.warn("Intent {} state was already removed", intentId);
532 }
Pavlin Radoslavov0d972b92014-12-03 20:07:34 -0800533 if (prevInstallable != null) {
534 log.warn("Intent {} removed installable still found", intentId);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800535 }
536 } catch (InterruptedException e) {
537 log.error("Batch write was interrupted while processing {}", op, e);
538 failed.add(op);
539 Thread.currentThread().interrupt();
540 } catch (ExecutionException e) {
541 log.error("Batch write failed processing {}", op, e);
542 failed.add(op);
543 }
544 break;
545 }
546
547 case SET_STATE:
548 {
549 Intent intent = op.arg(0);
550 IntentId intentId = intent.id();
551 IntentState newState = op.arg(1);
552
553 try {
554 IntentState prevIntentState = (IntentState) subops.get(0).get();
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800555
556 if (PARKING.contains(newState)) {
557 transientStates.remove(intentId);
Yuta HIGUCHI5cd352d2014-12-01 20:16:02 -0800558 events.add(IntentEvent.getEvent(newState, intent));
Yuta HIGUCHIf5682452014-12-01 10:17:15 -0800559 }
alshabiba9819bf2014-11-30 18:15:52 -0800560
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800561 log.trace("{} - {} -> {}", intentId, prevIntentState, newState);
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800562 } catch (InterruptedException e) {
563 log.error("Batch write was interrupted while processing {}", op, e);
564 failed.add(op);
565 Thread.currentThread().interrupt();
566 } catch (ExecutionException e) {
567 log.error("Batch write failed processing {}", op, e);
568 failed.add(op);
569 }
570 break;
571 }
572
573 case SET_INSTALLABLE:
574 {
575 IntentId intentId = op.arg(0);
576 List<Intent> installableIntents = op.arg(1);
577
578 try {
579 @SuppressWarnings("unchecked")
580 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
581
582 if (prevInstallable != null) {
583 log.warn("Overwriting Intent {} installable {} -> {}",
584 intentId, prevInstallable, installableIntents);
585 }
586 } catch (InterruptedException e) {
587 log.error("Batch write was interrupted while processing {}", op, e);
588 failed.add(op);
589 Thread.currentThread().interrupt();
590 } catch (ExecutionException e) {
591 log.error("Batch write failed processing {}", op, e);
592 failed.add(op);
593 }
594 break;
595 }
596
597 case REMOVE_INSTALLED:
598 {
599 IntentId intentId = op.arg(0);
600
601 try {
602 @SuppressWarnings("unchecked")
603 List<Intent> prevInstallable = (List<Intent>) subops.get(0).get();
604
605 if (prevInstallable == null) {
606 log.warn("Intent {} installable was already removed", intentId);
607 }
608 } catch (InterruptedException e) {
609 log.error("Batch write was interrupted while processing {}", op, e);
610 failed.add(op);
611 Thread.currentThread().interrupt();
612 } catch (ExecutionException e) {
613 log.error("Batch write failed processing {}", op, e);
614 failed.add(op);
615 }
616 break;
617 }
618
619 default:
620 log.warn("Unknown Operation encountered: {}", op);
621 if (!failed.contains(op)) {
622 failed.add(op);
623 }
624 break;
625 }
626 }
Yuta HIGUCHIa94c6e82014-11-28 18:49:54 -0800627 }
628
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800629 public final class RemoteIntentsListener extends EntryAdapter<IntentId, Intent> {
630
631 @Override
632 public void entryAdded(EntryEvent<IntentId, Intent> event) {
633 localIntents.put(event.getKey(), event.getValue());
634 }
635
636 @Override
637 public void entryUpdated(EntryEvent<IntentId, Intent> event) {
638 entryAdded(event);
639 }
640 }
641
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800642 public final class RemoteIntentStateListener extends EntryAdapter<IntentId, IntentState> {
643
644 @Override
645 public void onEntryEvent(EntryEvent<IntentId, IntentState> event) {
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800646 final IntentId intentId = event.getKey();
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800647 final Member myself = theInstance.getCluster().getLocalMember();
648 if (!myself.equals(event.getMember())) {
649 // When Intent state was modified by remote node,
650 // clear local transient state.
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800651 IntentState oldState = transientStates.remove(intentId);
652 if (oldState != null) {
653 log.debug("{} state updated remotely, removing transient state {}",
654 intentId, oldState);
655 }
alshabiba9819bf2014-11-30 18:15:52 -0800656
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800657 if (event.getValue() != null) {
658 // notify if this is not entry removed event
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800659
660 final Intent intent = getIntent(intentId);
661 if (intent == null) {
662 log.warn("no Intent found for {} on Event {}", intentId, event);
663 return;
664 }
665 notifyDelegate(IntentEvent.getEvent(event.getValue(), intent));
666 // remove IntentCache
667 localIntents.remove(intentId, intent);
Yuta HIGUCHI88375782014-12-02 14:21:17 -0800668 }
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800669 }
Yuta HIGUCHI9796cc62014-12-03 16:24:13 -0800670
671 // populate manual near cache, to prepare for
672 // transition event to WITHDRAWN
673 getIntent(intentId);
Yuta HIGUCHI4490a732014-11-18 20:20:30 -0800674 }
675 }
676}