blob: 18d5f1d1f530ca27ce9b03b4fe69aeaa395e507d [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070029import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070030import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070031import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070032import java.util.concurrent.ExecutorService;
33import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070034import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070035import java.util.concurrent.TimeUnit;
36import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070037import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070038import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070039
40import org.apache.felix.scr.annotations.Activate;
41import org.apache.felix.scr.annotations.Component;
42import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070043import org.apache.felix.scr.annotations.Reference;
44import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070045import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070046import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070047import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070048import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070049import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070050import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070051import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070052import org.onlab.onos.net.flow.DefaultFlowEntry;
53import org.onlab.onos.net.flow.FlowEntry;
54import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070055import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070056import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070057import org.onlab.onos.net.flow.FlowRuleBatchEntry;
58import org.onlab.onos.net.flow.FlowRuleBatchEvent;
59import org.onlab.onos.net.flow.FlowRuleBatchOperation;
60import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070061import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070062import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070063import org.onlab.onos.net.flow.FlowRuleEvent.Type;
64import org.onlab.onos.net.flow.FlowRuleStore;
65import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070066import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070067import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
68import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070069import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070070import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070071import org.onlab.onos.store.flow.ReplicaInfoEvent;
72import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070073import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070074import org.onlab.onos.store.hz.AbstractHazelcastStore;
75import org.onlab.onos.store.hz.SMap;
Madan Jampani38b250d2014-10-17 11:02:38 -070076import org.onlab.onos.store.serializers.DistributedStoreSerializers;
77import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070078import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070079import org.slf4j.Logger;
80
Madan Jampani24f9efb2014-10-24 18:56:23 -070081import com.google.common.base.Function;
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070082import com.google.common.cache.Cache;
83import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070084import com.google.common.cache.CacheLoader;
85import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070086import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070087import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070088import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070089import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070090import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070091import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070092import com.google.common.util.concurrent.ListenableFuture;
93import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070094import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070095
96/**
Madan Jampani38b250d2014-10-17 11:02:38 -070097 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070098 */
alshabib339a3d92014-09-26 17:54:32 -070099@Component(immediate = true)
100@Service
101public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700102 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700103 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700104
105 private final Logger log = getLogger(getClass());
106
107 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700108 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
109 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700110
alshabib339a3d92014-09-26 17:54:32 -0700111
Madan Jampani38b250d2014-10-17 11:02:38 -0700112 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700113 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700116 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700119 protected ClusterService clusterService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
122 protected DeviceService deviceService;
123
124 private final AtomicInteger localBatchIdGen = new AtomicInteger();
125
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700126 // TODO: make this configurable
127 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700128
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700129 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
130 CacheBuilder.newBuilder()
131 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
132 // TODO Explicitly fail the future if expired?
133 //.removalListener(listener)
134 .build();
135
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700136 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
137
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700138
139 private final ExecutorService futureListeners =
140 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
141
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700142 private final ExecutorService backupExecutors =
143 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
144
145 // TODO make this configurable
146 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700147
148 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
149 @Override
150 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700151 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700152 .register(DistributedStoreSerializers.COMMON)
153 .build()
154 .populate(1);
155 }
156 };
157
158 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700159 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700160
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700161 private ReplicaInfoEventListener replicaInfoEventListener;
162
163 @Override
alshabib339a3d92014-09-26 17:54:32 -0700164 @Activate
165 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700166
167 super.serializer = SERIALIZER;
168 super.theInstance = storeService.getHazelcastInstance();
169
170 // Cache to create SMap on demand
171 smaps = CacheBuilder.newBuilder()
172 .softValues()
173 .build(new SMapLoader());
174
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700175 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700176
177 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700178 public void handle(final ClusterMessage message) {
179 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
180 log.info("received batch request {}", operation);
181 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700182
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700183 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700184
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700185 @Override
186 public void run() {
187 CompletedBatchOperation result = Futures.getUnchecked(f);
188 try {
189 message.respond(SERIALIZER.encode(result));
190 } catch (IOException e) {
191 log.error("Failed to respond back", e);
192 }
193 }
194 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700195 }
196 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700197
198 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
199
200 @Override
201 public void handle(ClusterMessage message) {
202 FlowRule rule = SERIALIZER.decode(message.payload());
203 log.info("received get flow entry request for {}", rule);
204 FlowEntry flowEntry = getFlowEntryInternal(rule);
205 try {
206 message.respond(SERIALIZER.encode(flowEntry));
207 } catch (IOException e) {
208 log.error("Failed to respond back", e);
209 }
210 }
211 });
212
Madan Jampanif5fdef02014-10-23 21:58:10 -0700213 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
214
215 @Override
216 public void handle(ClusterMessage message) {
217 DeviceId deviceId = SERIALIZER.decode(message.payload());
218 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
219 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
220 try {
221 message.respond(SERIALIZER.encode(flowEntries));
222 } catch (IOException e) {
223 log.error("Failed to respond to peer's getFlowEntries request", e);
224 }
225 }
226 });
227
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700228 replicaInfoEventListener = new InternalReplicaInfoEventListener();
229
230 replicaInfoManager.addListener(replicaInfoEventListener);
231
alshabib339a3d92014-09-26 17:54:32 -0700232 log.info("Started");
233 }
234
235 @Deactivate
236 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700237 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700238 log.info("Stopped");
239 }
240
241
Madan Jampani117aaae2014-10-23 10:04:05 -0700242 // TODO: This is not a efficient operation on a distributed sharded
243 // flow store. We need to revisit the need for this operation or at least
244 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700245 @Override
tom9b4030d2014-10-06 10:39:03 -0700246 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700247 // implementing in-efficient operation for debugging purpose.
248 int sum = 0;
249 for (Device device : deviceService.getDevices()) {
250 final DeviceId did = device.id();
251 sum += Iterables.size(getFlowEntries(did));
252 }
253 return sum;
tom9b4030d2014-10-06 10:39:03 -0700254 }
255
256 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700257 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700258 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
259 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
260 return getFlowEntryInternal(rule);
261 }
262
263 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
264 replicaInfo.master().orNull(), rule.deviceId());
265
266 ClusterMessage message = new ClusterMessage(
267 clusterService.getLocalNode().id(),
268 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
269 SERIALIZER.encode(rule));
270
271 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700272 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
273 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
274 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700275 // FIXME: throw a FlowStoreException
276 throw new RuntimeException(e);
277 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700278 }
279
280 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
281 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700282 if (f.equals(rule)) {
283 return f;
284 }
285 }
286 return null;
287 }
288
289 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700290 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700291
292 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
293 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
294 return getFlowEntriesInternal(deviceId);
295 }
296
297 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
298 replicaInfo.master().orNull(), deviceId);
299
300 ClusterMessage message = new ClusterMessage(
301 clusterService.getLocalNode().id(),
302 GET_DEVICE_FLOW_ENTRIES,
303 SERIALIZER.encode(deviceId));
304
305 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700306 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
307 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
308 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700309 // FIXME: throw a FlowStoreException
310 throw new RuntimeException(e);
311 }
312 }
313
314 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700315 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700316 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700317 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700318 }
319 return ImmutableSet.copyOf(rules);
320 }
321
322 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700323 public void storeFlowRule(FlowRule rule) {
324 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
325 }
326
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700327 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700328 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700329 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700330
Madan Jampani117aaae2014-10-23 10:04:05 -0700331 if (operation.getOperations().isEmpty()) {
332 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700333 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700334
Madan Jampani117aaae2014-10-23 10:04:05 -0700335 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
336
337 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
338
339 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
340 return storeBatchInternal(operation);
341 }
342
343 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
344 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700345
Madan Jampani38b250d2014-10-17 11:02:38 -0700346 ClusterMessage message = new ClusterMessage(
347 clusterService.getLocalNode().id(),
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700348 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700349 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700350
351 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700352 ListenableFuture<byte[]> responseFuture =
353 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
354 return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
355 @Override
356 public CompletedBatchOperation apply(byte[] input) {
357 return SERIALIZER.decode(input);
358 }
359 });
360 } catch (IOException e) {
361 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700362 }
363 }
364
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700365 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700366 final List<StoredFlowEntry> toRemove = new ArrayList<>();
367 final List<StoredFlowEntry> toAdd = new ArrayList<>();
368 DeviceId did = null;
369
370
Madan Jampani117aaae2014-10-23 10:04:05 -0700371 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
372 FlowRule flowRule = batchEntry.getTarget();
373 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700374 if (did == null) {
375 did = flowRule.deviceId();
376 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700377 if (op.equals(FlowRuleOperation.REMOVE)) {
378 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
379 if (entry != null) {
380 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700381 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700382 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 } else if (op.equals(FlowRuleOperation.ADD)) {
384 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
385 DeviceId deviceId = flowRule.deviceId();
386 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
387 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700388 toAdd.add(flowEntry);
389 }
390 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700391 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700392 if (toAdd.isEmpty() && toRemove.isEmpty()) {
393 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowEntry>emptySet()));
394 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700395
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700396 // create remote backup copies
397 final DeviceId deviceId = did;
398 updateBackup(deviceId, toAdd, toRemove);
399
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700400 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
401 final int batchId = localBatchIdGen.incrementAndGet();
402
403 pendingFutures.put(batchId, r);
404 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700405
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700406 return r;
alshabib339a3d92014-09-26 17:54:32 -0700407 }
408
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700409 private void updateBackup(final DeviceId deviceId,
410 final List<StoredFlowEntry> toAdd,
411 final List<? extends FlowRule> list) {
412
413 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
414
415 if (syncBackup) {
416 // wait for backup to complete
417 try {
418 submit.get();
419 } catch (InterruptedException | ExecutionException e) {
420 log.error("Failed to create backups", e);
421 }
422 }
423 }
424
425 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
426 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
427 }
428
alshabib339a3d92014-09-26 17:54:32 -0700429 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700430 public void deleteFlowRule(FlowRule rule) {
431 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700432 }
433
434 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700435 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
436 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700437 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700438 return addOrUpdateFlowRuleInternal(rule);
439 }
440
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700441 log.error("Tried to update FlowRule {} state,"
442 + " while the Node was not the master.", rule);
443 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700444 }
445
446 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700447 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700448
449 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700450 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700451 if (stored != null) {
452 stored.setBytes(rule.bytes());
453 stored.setLife(rule.life());
454 stored.setPackets(rule.packets());
455 if (stored.state() == FlowEntryState.PENDING_ADD) {
456 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700457 // update backup.
458 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700459 return new FlowRuleEvent(Type.RULE_ADDED, rule);
460 }
alshabib339a3d92014-09-26 17:54:32 -0700461 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
462 }
463
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700464 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700465 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700466 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700467 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700468
alshabib339a3d92014-09-26 17:54:32 -0700469 }
470
471 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700472 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
473 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700474 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700475 // bypass and handle it locally
476 return removeFlowRuleInternal(rule);
477 }
478
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700479 log.error("Tried to remove FlowRule {},"
480 + " while the Node was not the master.", rule);
481 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700482 }
483
484 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700485 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700486 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700487 final boolean removed = flowEntries.remove(deviceId, rule);
488 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
489 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700490 return new FlowRuleEvent(RULE_REMOVED, rule);
491 } else {
492 return null;
493 }
alshabib339a3d92014-09-26 17:54:32 -0700494 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700495
496 @Override
497 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700498 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700499 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700500 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700501 if (future != null) {
502 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700503 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700504 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700505 notifyDelegate(event);
506 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700507
508 private synchronized void loadFromBackup(final DeviceId did) {
509 // should relax synchronized condition
510
511 try {
512 log.info("Loading FlowRules for {} from backups", did);
513 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
514 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
515 : backupFlowTable.entrySet()) {
516
517 // TODO: should we be directly updating internal structure or
518 // should we be triggering event?
519 log.debug("loading {}", e.getValue());
520 for (StoredFlowEntry entry : e.getValue()) {
521 flowEntries.remove(did, entry);
522 flowEntries.put(did, entry);
523 }
524 }
525 } catch (ExecutionException e) {
526 log.error("Failed to load backup flowtable for {}", did, e);
527 }
528 }
529
530 private synchronized void removeFromPrimary(final DeviceId did) {
531 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
532 log.debug("removedFromPrimary {}", removed);
533 }
534
535 private final class SMapLoader
536 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
537
538 @Override
539 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
540 throws Exception {
541 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
542 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
543 }
544 }
545
546 private final class InternalReplicaInfoEventListener
547 implements ReplicaInfoEventListener {
548
549 @Override
550 public void event(ReplicaInfoEvent event) {
551 final NodeId local = clusterService.getLocalNode().id();
552 final DeviceId did = event.subject();
553 final ReplicaInfo rInfo = event.replicaInfo();
554
555 switch (event.type()) {
556 case MASTER_CHANGED:
557 if (local.equals(rInfo.master().orNull())) {
558 // This node is the new master, populate local structure
559 // from backup
560 loadFromBackup(did);
561 } else {
562 // This node is no longer the master holder,
563 // clean local structure
564 removeFromPrimary(did);
565 // FIXME: probably should stop pending backup activities in
566 // executors to avoid overwriting with old value
567 }
568 break;
569 default:
570 break;
571
572 }
573 }
574 }
575
576 // Task to update FlowEntries in backup HZ store
577 private final class UpdateBackup implements Runnable {
578
579 private final DeviceId deviceId;
580 private final List<StoredFlowEntry> toAdd;
581 private final List<? extends FlowRule> toRemove;
582
583 public UpdateBackup(DeviceId deviceId,
584 List<StoredFlowEntry> toAdd,
585 List<? extends FlowRule> list) {
586 this.deviceId = checkNotNull(deviceId);
587 this.toAdd = checkNotNull(toAdd);
588 this.toRemove = checkNotNull(list);
589 }
590
591 @Override
592 public void run() {
593 try {
594 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
595 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
596 // Following should be rewritten using async APIs
597 for (StoredFlowEntry entry : toAdd) {
598 final FlowId id = entry.id();
599 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
600 List<StoredFlowEntry> list = new ArrayList<>();
601 if (original != null) {
602 list.addAll(original);
603 }
604
605 list.remove(entry);
606 list.add(entry);
607
608 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
609 boolean success;
610 if (original == null) {
611 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
612 } else {
613 success = backupFlowTable.replace(id, original, newValue);
614 }
615 // TODO retry?
616 if (!success) {
617 log.error("Updating backup failed.");
618 }
619 }
620 for (FlowRule entry : toRemove) {
621 final FlowId id = entry.id();
622 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
623 List<StoredFlowEntry> list = new ArrayList<>();
624 if (original != null) {
625 list.addAll(original);
626 }
627
628 list.remove(entry);
629
630 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
631 boolean success;
632 if (original == null) {
633 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
634 } else {
635 success = backupFlowTable.replace(id, original, newValue);
636 }
637 // TODO retry?
638 if (!success) {
639 log.error("Updating backup failed.");
640 }
641 }
642 } catch (ExecutionException e) {
643 log.error("Failed to write to backups", e);
644 }
645
646 }
647 }
alshabib339a3d92014-09-26 17:54:32 -0700648}