blob: 69b6743333cfadaaca15a224e06f1332c6fa2a06 [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
Yuta HIGUCHI92891d12014-10-27 20:04:38 -07003import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -07004import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
5import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07006import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -07007import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -07008
Madan Jampani38b250d2014-10-17 11:02:38 -07009import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070010import java.util.ArrayList;
11import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070012import java.util.Collection;
13import java.util.Collections;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070014import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070015import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070016import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070017import java.util.concurrent.ExecutorService;
18import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070019import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070020import java.util.concurrent.TimeUnit;
21import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070023import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070024
25import org.apache.felix.scr.annotations.Activate;
26import org.apache.felix.scr.annotations.Component;
27import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070028import org.apache.felix.scr.annotations.Reference;
29import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070030import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070031import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070032import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070034import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070035import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070036import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070037import org.onlab.onos.net.flow.DefaultFlowEntry;
38import org.onlab.onos.net.flow.FlowEntry;
39import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070040import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070041import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070042import org.onlab.onos.net.flow.FlowRuleBatchEntry;
43import org.onlab.onos.net.flow.FlowRuleBatchEvent;
44import org.onlab.onos.net.flow.FlowRuleBatchOperation;
45import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070046import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070047import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070048import org.onlab.onos.net.flow.FlowRuleEvent.Type;
49import org.onlab.onos.net.flow.FlowRuleStore;
50import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070051import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070052import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
53import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070054import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070055import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.store.flow.ReplicaInfoEvent;
57import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070058import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070059import org.onlab.onos.store.hz.AbstractHazelcastStore;
60import org.onlab.onos.store.hz.SMap;
Madan Jampani38b250d2014-10-17 11:02:38 -070061import org.onlab.onos.store.serializers.DistributedStoreSerializers;
62import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070063import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070064import org.slf4j.Logger;
65
Madan Jampani24f9efb2014-10-24 18:56:23 -070066import com.google.common.base.Function;
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070067import com.google.common.cache.Cache;
68import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070069import com.google.common.cache.CacheLoader;
70import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070071import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070073import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070074import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070075import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070076import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070077import com.google.common.util.concurrent.ListenableFuture;
78import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070079import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070080
81/**
Madan Jampani38b250d2014-10-17 11:02:38 -070082 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070083 */
alshabib339a3d92014-09-26 17:54:32 -070084@Component(immediate = true)
85@Service
86public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070087 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -070088 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070089
90 private final Logger log = getLogger(getClass());
91
92 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070093 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
94 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070095
alshabib339a3d92014-09-26 17:54:32 -070096
Madan Jampani38b250d2014-10-17 11:02:38 -070097 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070098 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070099
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700101 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700104 protected ClusterService clusterService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected DeviceService deviceService;
108
109 private final AtomicInteger localBatchIdGen = new AtomicInteger();
110
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700111 // TODO: make this configurable
112 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700113
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700114 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
115 CacheBuilder.newBuilder()
116 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
117 // TODO Explicitly fail the future if expired?
118 //.removalListener(listener)
119 .build();
120
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700121 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
122
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700123
124 private final ExecutorService futureListeners =
125 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
126
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700127 private final ExecutorService backupExecutors =
128 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
129
130 // TODO make this configurable
131 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700132
133 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
134 @Override
135 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700136 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700137 .register(DistributedStoreSerializers.COMMON)
138 .build()
139 .populate(1);
140 }
141 };
142
143 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700144 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700145
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700146 private ReplicaInfoEventListener replicaInfoEventListener;
147
148 @Override
alshabib339a3d92014-09-26 17:54:32 -0700149 @Activate
150 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700151
152 super.serializer = SERIALIZER;
153 super.theInstance = storeService.getHazelcastInstance();
154
155 // Cache to create SMap on demand
156 smaps = CacheBuilder.newBuilder()
157 .softValues()
158 .build(new SMapLoader());
159
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700160 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700161
162 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700163 public void handle(final ClusterMessage message) {
164 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
165 log.info("received batch request {}", operation);
166 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700167
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700168 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700169
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700170 @Override
171 public void run() {
172 CompletedBatchOperation result = Futures.getUnchecked(f);
173 try {
174 message.respond(SERIALIZER.encode(result));
175 } catch (IOException e) {
176 log.error("Failed to respond back", e);
177 }
178 }
179 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700180 }
181 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700182
183 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
184
185 @Override
186 public void handle(ClusterMessage message) {
187 FlowRule rule = SERIALIZER.decode(message.payload());
188 log.info("received get flow entry request for {}", rule);
189 FlowEntry flowEntry = getFlowEntryInternal(rule);
190 try {
191 message.respond(SERIALIZER.encode(flowEntry));
192 } catch (IOException e) {
193 log.error("Failed to respond back", e);
194 }
195 }
196 });
197
Madan Jampanif5fdef02014-10-23 21:58:10 -0700198 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
199
200 @Override
201 public void handle(ClusterMessage message) {
202 DeviceId deviceId = SERIALIZER.decode(message.payload());
203 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
204 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
205 try {
206 message.respond(SERIALIZER.encode(flowEntries));
207 } catch (IOException e) {
208 log.error("Failed to respond to peer's getFlowEntries request", e);
209 }
210 }
211 });
212
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700213 replicaInfoEventListener = new InternalReplicaInfoEventListener();
214
215 replicaInfoManager.addListener(replicaInfoEventListener);
216
alshabib339a3d92014-09-26 17:54:32 -0700217 log.info("Started");
218 }
219
220 @Deactivate
221 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700222 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700223 log.info("Stopped");
224 }
225
226
Madan Jampani117aaae2014-10-23 10:04:05 -0700227 // TODO: This is not a efficient operation on a distributed sharded
228 // flow store. We need to revisit the need for this operation or at least
229 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700230 @Override
tom9b4030d2014-10-06 10:39:03 -0700231 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700232 // implementing in-efficient operation for debugging purpose.
233 int sum = 0;
234 for (Device device : deviceService.getDevices()) {
235 final DeviceId did = device.id();
236 sum += Iterables.size(getFlowEntries(did));
237 }
238 return sum;
tom9b4030d2014-10-06 10:39:03 -0700239 }
240
241 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700242 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700243 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
244 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
245 return getFlowEntryInternal(rule);
246 }
247
248 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
249 replicaInfo.master().orNull(), rule.deviceId());
250
251 ClusterMessage message = new ClusterMessage(
252 clusterService.getLocalNode().id(),
253 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
254 SERIALIZER.encode(rule));
255
256 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700257 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
258 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
259 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700260 // FIXME: throw a FlowStoreException
261 throw new RuntimeException(e);
262 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700263 }
264
265 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
266 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700267 if (f.equals(rule)) {
268 return f;
269 }
270 }
271 return null;
272 }
273
274 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700275 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700276
277 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
278 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
279 return getFlowEntriesInternal(deviceId);
280 }
281
282 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
283 replicaInfo.master().orNull(), deviceId);
284
285 ClusterMessage message = new ClusterMessage(
286 clusterService.getLocalNode().id(),
287 GET_DEVICE_FLOW_ENTRIES,
288 SERIALIZER.encode(deviceId));
289
290 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700291 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
292 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
293 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700294 // FIXME: throw a FlowStoreException
295 throw new RuntimeException(e);
296 }
297 }
298
299 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700300 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700301 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700302 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700303 }
304 return ImmutableSet.copyOf(rules);
305 }
306
307 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700308 public void storeFlowRule(FlowRule rule) {
309 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
310 }
311
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700312 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700313 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700314 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700315
Madan Jampani117aaae2014-10-23 10:04:05 -0700316 if (operation.getOperations().isEmpty()) {
317 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700318 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700319
Madan Jampani117aaae2014-10-23 10:04:05 -0700320 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
321
322 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
323
324 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
325 return storeBatchInternal(operation);
326 }
327
328 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
329 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700330
Madan Jampani38b250d2014-10-17 11:02:38 -0700331 ClusterMessage message = new ClusterMessage(
332 clusterService.getLocalNode().id(),
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700333 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700334 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700335
336 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700337 ListenableFuture<byte[]> responseFuture =
338 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
339 return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
340 @Override
341 public CompletedBatchOperation apply(byte[] input) {
342 return SERIALIZER.decode(input);
343 }
344 });
345 } catch (IOException e) {
346 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700347 }
348 }
349
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700350 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700351 final List<StoredFlowEntry> toRemove = new ArrayList<>();
352 final List<StoredFlowEntry> toAdd = new ArrayList<>();
353 DeviceId did = null;
354
355
Madan Jampani117aaae2014-10-23 10:04:05 -0700356 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
357 FlowRule flowRule = batchEntry.getTarget();
358 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700359 if (did == null) {
360 did = flowRule.deviceId();
361 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700362 if (op.equals(FlowRuleOperation.REMOVE)) {
363 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
364 if (entry != null) {
365 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700366 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700367 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700368 } else if (op.equals(FlowRuleOperation.ADD)) {
369 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
370 DeviceId deviceId = flowRule.deviceId();
371 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
372 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700373 toAdd.add(flowEntry);
374 }
375 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700376 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700377 if (toAdd.isEmpty() && toRemove.isEmpty()) {
378 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
379 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700380
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700381 // create remote backup copies
382 final DeviceId deviceId = did;
383 updateBackup(deviceId, toAdd, toRemove);
384
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700385 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
386 final int batchId = localBatchIdGen.incrementAndGet();
387
388 pendingFutures.put(batchId, r);
389 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700390
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700391 return r;
alshabib339a3d92014-09-26 17:54:32 -0700392 }
393
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700394 private void updateBackup(final DeviceId deviceId,
395 final List<StoredFlowEntry> toAdd,
396 final List<? extends FlowRule> list) {
397
398 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
399
400 if (syncBackup) {
401 // wait for backup to complete
402 try {
403 submit.get();
404 } catch (InterruptedException | ExecutionException e) {
405 log.error("Failed to create backups", e);
406 }
407 }
408 }
409
410 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
411 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
412 }
413
alshabib339a3d92014-09-26 17:54:32 -0700414 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700415 public void deleteFlowRule(FlowRule rule) {
416 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700417 }
418
419 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700420 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
421 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700422 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700423 return addOrUpdateFlowRuleInternal(rule);
424 }
425
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700426 log.error("Tried to update FlowRule {} state,"
427 + " while the Node was not the master.", rule);
428 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700429 }
430
431 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700432 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700433
434 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700435 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700436 if (stored != null) {
437 stored.setBytes(rule.bytes());
438 stored.setLife(rule.life());
439 stored.setPackets(rule.packets());
440 if (stored.state() == FlowEntryState.PENDING_ADD) {
441 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700442 // update backup.
443 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700444 return new FlowRuleEvent(Type.RULE_ADDED, rule);
445 }
alshabib339a3d92014-09-26 17:54:32 -0700446 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
447 }
448
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700449 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700450 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700451 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700452 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700453
alshabib339a3d92014-09-26 17:54:32 -0700454 }
455
456 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700457 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
458 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700459 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700460 // bypass and handle it locally
461 return removeFlowRuleInternal(rule);
462 }
463
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700464 log.error("Tried to remove FlowRule {},"
465 + " while the Node was not the master.", rule);
466 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700467 }
468
469 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700470 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700471 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700472 final boolean removed = flowEntries.remove(deviceId, rule);
473 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
474 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700475 return new FlowRuleEvent(RULE_REMOVED, rule);
476 } else {
477 return null;
478 }
alshabib339a3d92014-09-26 17:54:32 -0700479 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700480
481 @Override
482 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700483 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700484 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700485 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700486 if (future != null) {
487 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700488 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700489 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700490 notifyDelegate(event);
491 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700492
493 private synchronized void loadFromBackup(final DeviceId did) {
494 // should relax synchronized condition
495
496 try {
497 log.info("Loading FlowRules for {} from backups", did);
498 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
499 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
500 : backupFlowTable.entrySet()) {
501
502 // TODO: should we be directly updating internal structure or
503 // should we be triggering event?
504 log.debug("loading {}", e.getValue());
505 for (StoredFlowEntry entry : e.getValue()) {
506 flowEntries.remove(did, entry);
507 flowEntries.put(did, entry);
508 }
509 }
510 } catch (ExecutionException e) {
511 log.error("Failed to load backup flowtable for {}", did, e);
512 }
513 }
514
515 private synchronized void removeFromPrimary(final DeviceId did) {
516 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
517 log.debug("removedFromPrimary {}", removed);
518 }
519
520 private final class SMapLoader
521 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
522
523 @Override
524 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
525 throws Exception {
526 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
527 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
528 }
529 }
530
531 private final class InternalReplicaInfoEventListener
532 implements ReplicaInfoEventListener {
533
534 @Override
535 public void event(ReplicaInfoEvent event) {
536 final NodeId local = clusterService.getLocalNode().id();
537 final DeviceId did = event.subject();
538 final ReplicaInfo rInfo = event.replicaInfo();
539
540 switch (event.type()) {
541 case MASTER_CHANGED:
542 if (local.equals(rInfo.master().orNull())) {
543 // This node is the new master, populate local structure
544 // from backup
545 loadFromBackup(did);
546 } else {
547 // This node is no longer the master holder,
548 // clean local structure
549 removeFromPrimary(did);
550 // FIXME: probably should stop pending backup activities in
551 // executors to avoid overwriting with old value
552 }
553 break;
554 default:
555 break;
556
557 }
558 }
559 }
560
561 // Task to update FlowEntries in backup HZ store
562 private final class UpdateBackup implements Runnable {
563
564 private final DeviceId deviceId;
565 private final List<StoredFlowEntry> toAdd;
566 private final List<? extends FlowRule> toRemove;
567
568 public UpdateBackup(DeviceId deviceId,
569 List<StoredFlowEntry> toAdd,
570 List<? extends FlowRule> list) {
571 this.deviceId = checkNotNull(deviceId);
572 this.toAdd = checkNotNull(toAdd);
573 this.toRemove = checkNotNull(list);
574 }
575
576 @Override
577 public void run() {
578 try {
579 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
580 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
581 // Following should be rewritten using async APIs
582 for (StoredFlowEntry entry : toAdd) {
583 final FlowId id = entry.id();
584 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
585 List<StoredFlowEntry> list = new ArrayList<>();
586 if (original != null) {
587 list.addAll(original);
588 }
589
590 list.remove(entry);
591 list.add(entry);
592
593 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
594 boolean success;
595 if (original == null) {
596 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
597 } else {
598 success = backupFlowTable.replace(id, original, newValue);
599 }
600 // TODO retry?
601 if (!success) {
602 log.error("Updating backup failed.");
603 }
604 }
605 for (FlowRule entry : toRemove) {
606 final FlowId id = entry.id();
607 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
608 List<StoredFlowEntry> list = new ArrayList<>();
609 if (original != null) {
610 list.addAll(original);
611 }
612
613 list.remove(entry);
614
615 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
616 boolean success;
617 if (original == null) {
618 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
619 } else {
620 success = backupFlowTable.replace(id, original, newValue);
621 }
622 // TODO retry?
623 if (!success) {
624 log.error("Updating backup failed.");
625 }
626 }
627 } catch (ExecutionException e) {
628 log.error("Failed to write to backups", e);
629 }
630
631 }
632 }
alshabib339a3d92014-09-26 17:54:32 -0700633}