blob: ca3b29c3a7a7a062f155ec3ed9e069daff251b79 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070040
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070044import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070046import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070047import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070048import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070049import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070050import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070051import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070053import org.onlab.onos.net.flow.DefaultFlowEntry;
54import org.onlab.onos.net.flow.FlowEntry;
55import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070057import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070058import org.onlab.onos.net.flow.FlowRuleBatchEntry;
59import org.onlab.onos.net.flow.FlowRuleBatchEvent;
60import org.onlab.onos.net.flow.FlowRuleBatchOperation;
61import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070062import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070063import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070064import org.onlab.onos.net.flow.FlowRuleEvent.Type;
65import org.onlab.onos.net.flow.FlowRuleStore;
66import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070067import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070068import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
69import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070070import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070071import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import org.onlab.onos.store.flow.ReplicaInfoEvent;
73import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070074import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070075import org.onlab.onos.store.hz.AbstractHazelcastStore;
76import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070077import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070078import org.onlab.onos.store.serializers.DistributedStoreSerializers;
79import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070080import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070081import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070082import org.slf4j.Logger;
83
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070084import com.google.common.cache.Cache;
85import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070086import com.google.common.cache.CacheLoader;
87import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070088import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070089import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070090import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070091import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070092import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070093import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.util.concurrent.ListenableFuture;
95import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070096import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070097
/**
 * Manages inventory of flow rules using a distributed state management protocol.
 */
alshabib339a3d92014-09-26 17:54:32 -0700101@Component(immediate = true)
102@Service
103public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700104 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700105 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700106
107 private final Logger log = getLogger(getClass());
108
109 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700110 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
111 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700112
alshabib339a3d92014-09-26 17:54:32 -0700113
Madan Jampani38b250d2014-10-17 11:02:38 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700115 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700118 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected DeviceService deviceService;
125
126 private final AtomicInteger localBatchIdGen = new AtomicInteger();
127
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700128 // TODO: make this configurable
129 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700130
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700131 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
132 CacheBuilder.newBuilder()
133 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
134 // TODO Explicitly fail the future if expired?
135 //.removalListener(listener)
136 .build();
137
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700138 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
139
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700140
141 private final ExecutorService futureListeners =
142 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
143
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700144 private final ExecutorService backupExecutors =
145 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
146
147 // TODO make this configurable
148 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700149
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700150 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700151 @Override
152 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700153 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700154 .register(DistributedStoreSerializers.COMMON)
155 .build()
156 .populate(1);
157 }
158 };
159
160 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700161 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700162
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700163 private ReplicaInfoEventListener replicaInfoEventListener;
164
165 @Override
alshabib339a3d92014-09-26 17:54:32 -0700166 @Activate
167 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168
169 super.serializer = SERIALIZER;
170 super.theInstance = storeService.getHazelcastInstance();
171
172 // Cache to create SMap on demand
173 smaps = CacheBuilder.newBuilder()
174 .softValues()
175 .build(new SMapLoader());
176
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700177 final NodeId local = clusterService.getLocalNode().id();
178
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700179 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700180
181 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
182
183 @Override
184 public void handle(ClusterMessage message) {
185 FlowRule rule = SERIALIZER.decode(message.payload());
186 log.info("received get flow entry request for {}", rule);
187 FlowEntry flowEntry = getFlowEntryInternal(rule);
188 try {
189 message.respond(SERIALIZER.encode(flowEntry));
190 } catch (IOException e) {
191 log.error("Failed to respond back", e);
192 }
193 }
194 });
195
Madan Jampanif5fdef02014-10-23 21:58:10 -0700196 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
197
198 @Override
199 public void handle(ClusterMessage message) {
200 DeviceId deviceId = SERIALIZER.decode(message.payload());
201 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
202 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
203 try {
204 message.respond(SERIALIZER.encode(flowEntries));
205 } catch (IOException e) {
206 log.error("Failed to respond to peer's getFlowEntries request", e);
207 }
208 }
209 });
210
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700211 replicaInfoEventListener = new InternalReplicaInfoEventListener();
212
213 replicaInfoManager.addListener(replicaInfoEventListener);
214
alshabib339a3d92014-09-26 17:54:32 -0700215 log.info("Started");
216 }
217
218 @Deactivate
219 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700220 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700221 log.info("Stopped");
222 }
223
224
Madan Jampani117aaae2014-10-23 10:04:05 -0700225 // TODO: This is not a efficient operation on a distributed sharded
226 // flow store. We need to revisit the need for this operation or at least
227 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700228 @Override
tom9b4030d2014-10-06 10:39:03 -0700229 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700230 // implementing in-efficient operation for debugging purpose.
231 int sum = 0;
232 for (Device device : deviceService.getDevices()) {
233 final DeviceId did = device.id();
234 sum += Iterables.size(getFlowEntries(did));
235 }
236 return sum;
tom9b4030d2014-10-06 10:39:03 -0700237 }
238
239 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700240 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700241 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700242
243 if (!replicaInfo.master().isPresent()) {
244 log.warn("No master for {}", rule);
245 // TODO: revisit if this should be returning null.
246 // FIXME: throw a FlowStoreException
247 throw new RuntimeException("No master for " + rule);
248 }
249
Madan Jampani117aaae2014-10-23 10:04:05 -0700250 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
251 return getFlowEntryInternal(rule);
252 }
253
254 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
255 replicaInfo.master().orNull(), rule.deviceId());
256
257 ClusterMessage message = new ClusterMessage(
258 clusterService.getLocalNode().id(),
259 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
260 SERIALIZER.encode(rule));
261
262 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700263 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
264 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
265 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700266 // FIXME: throw a FlowStoreException
267 throw new RuntimeException(e);
268 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700269 }
270
271 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
272 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700273 if (f.equals(rule)) {
274 return f;
275 }
276 }
277 return null;
278 }
279
280 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700281 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700282
283 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700284
285 if (!replicaInfo.master().isPresent()) {
286 log.warn("No master for {}", deviceId);
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700287 // TODO: revisit if this should be returning empty collection or throwing exception.
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700288 // FIXME: throw a FlowStoreException
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700289 //throw new RuntimeException("No master for " + deviceId);
290 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700291 }
292
Madan Jampanif5fdef02014-10-23 21:58:10 -0700293 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
294 return getFlowEntriesInternal(deviceId);
295 }
296
297 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
298 replicaInfo.master().orNull(), deviceId);
299
300 ClusterMessage message = new ClusterMessage(
301 clusterService.getLocalNode().id(),
302 GET_DEVICE_FLOW_ENTRIES,
303 SERIALIZER.encode(deviceId));
304
305 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700306 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
307 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
308 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700309 // FIXME: throw a FlowStoreException
310 throw new RuntimeException(e);
311 }
312 }
313
314 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700315 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700316 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700317 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700318 }
319 return ImmutableSet.copyOf(rules);
320 }
321
322 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700323 public void storeFlowRule(FlowRule rule) {
324 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
325 }
326
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700327 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700328 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700329 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700330
Madan Jampani117aaae2014-10-23 10:04:05 -0700331 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700332 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700333 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700334
Madan Jampani117aaae2014-10-23 10:04:05 -0700335 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
336
337 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
338
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700339 if (!replicaInfo.master().isPresent()) {
340 log.warn("No master for {}", deviceId);
341 // TODO: revisit if this should be "success" from Future point of view
342 // with every FlowEntry failed
343 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
344 }
345
346 final NodeId local = clusterService.getLocalNode().id();
347 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700348 return storeBatchInternal(operation);
349 }
350
351 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
352 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700353
Madan Jampani38b250d2014-10-17 11:02:38 -0700354 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700355 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700356 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700357 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700358
359 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700360 ListenableFuture<byte[]> responseFuture =
361 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700362 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700363 } catch (IOException e) {
364 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700365 }
366 }
367
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700368 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700369 final List<StoredFlowEntry> toRemove = new ArrayList<>();
370 final List<StoredFlowEntry> toAdd = new ArrayList<>();
371 DeviceId did = null;
372
Madan Jampani117aaae2014-10-23 10:04:05 -0700373 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
374 FlowRule flowRule = batchEntry.getTarget();
375 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700376 if (did == null) {
377 did = flowRule.deviceId();
378 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700379 if (op.equals(FlowRuleOperation.REMOVE)) {
380 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
381 if (entry != null) {
382 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700383 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700385 } else if (op.equals(FlowRuleOperation.ADD)) {
386 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
387 DeviceId deviceId = flowRule.deviceId();
388 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
389 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700390 toAdd.add(flowEntry);
391 }
392 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700393 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700394 if (toAdd.isEmpty() && toRemove.isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700395 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700396 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700397
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700398 // create remote backup copies
399 final DeviceId deviceId = did;
400 updateBackup(deviceId, toAdd, toRemove);
401
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700402 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
403 final int batchId = localBatchIdGen.incrementAndGet();
404
405 pendingFutures.put(batchId, r);
406 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700407
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700408 return r;
alshabib339a3d92014-09-26 17:54:32 -0700409 }
410
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700411 private void updateBackup(final DeviceId deviceId,
412 final List<StoredFlowEntry> toAdd,
413 final List<? extends FlowRule> list) {
414
415 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
416
417 if (syncBackup) {
418 // wait for backup to complete
419 try {
420 submit.get();
421 } catch (InterruptedException | ExecutionException e) {
422 log.error("Failed to create backups", e);
423 }
424 }
425 }
426
427 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
428 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
429 }
430
alshabib339a3d92014-09-26 17:54:32 -0700431 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700432 public void deleteFlowRule(FlowRule rule) {
433 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700434 }
435
436 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700437 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
438 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700439 final NodeId localId = clusterService.getLocalNode().id();
440 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700441 return addOrUpdateFlowRuleInternal(rule);
442 }
443
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700444 log.error("Tried to update FlowRule {} state,"
445 + " while the Node was not the master.", rule);
446 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700447 }
448
449 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700450 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700451
452 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700453 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700454 if (stored != null) {
455 stored.setBytes(rule.bytes());
456 stored.setLife(rule.life());
457 stored.setPackets(rule.packets());
458 if (stored.state() == FlowEntryState.PENDING_ADD) {
459 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700460 // update backup.
461 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700462 return new FlowRuleEvent(Type.RULE_ADDED, rule);
463 }
alshabib339a3d92014-09-26 17:54:32 -0700464 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
465 }
466
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700467 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700468 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700469 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700470 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700471
alshabib339a3d92014-09-26 17:54:32 -0700472 }
473
474 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700475 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
476 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700477
478 final NodeId localId = clusterService.getLocalNode().id();
479 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700480 // bypass and handle it locally
481 return removeFlowRuleInternal(rule);
482 }
483
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700484 log.error("Tried to remove FlowRule {},"
485 + " while the Node was not the master.", rule);
486 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700487 }
488
489 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700490 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700491 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700492 final boolean removed = flowEntries.remove(deviceId, rule);
493 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
494 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700495 return new FlowRuleEvent(RULE_REMOVED, rule);
496 } else {
497 return null;
498 }
alshabib339a3d92014-09-26 17:54:32 -0700499 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700500
501 @Override
502 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700503 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700504 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700505 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700506 if (future != null) {
507 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700508 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700509 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700510 notifyDelegate(event);
511 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700512
513 private synchronized void loadFromBackup(final DeviceId did) {
514 // should relax synchronized condition
515
516 try {
517 log.info("Loading FlowRules for {} from backups", did);
518 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
519 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
520 : backupFlowTable.entrySet()) {
521
522 // TODO: should we be directly updating internal structure or
523 // should we be triggering event?
524 log.debug("loading {}", e.getValue());
525 for (StoredFlowEntry entry : e.getValue()) {
526 flowEntries.remove(did, entry);
527 flowEntries.put(did, entry);
528 }
529 }
530 } catch (ExecutionException e) {
531 log.error("Failed to load backup flowtable for {}", did, e);
532 }
533 }
534
535 private synchronized void removeFromPrimary(final DeviceId did) {
536 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
537 log.debug("removedFromPrimary {}", removed);
538 }
539
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700540 private final class OnStoreBatch implements ClusterMessageHandler {
541 private final NodeId local;
542
543 private OnStoreBatch(NodeId local) {
544 this.local = local;
545 }
546
547 @Override
548 public void handle(final ClusterMessage message) {
549 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
550 log.info("received batch request {}", operation);
551
552 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
553 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
554 if (!local.equals(replicaInfo.master().orNull())) {
555
556 Set<FlowRule> failures = new HashSet<>(operation.size());
557 for (FlowRuleBatchEntry op : operation.getOperations()) {
558 failures.add(op.getTarget());
559 }
560 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
561 // This node is no longer the master, respond as all failed.
562 // TODO: we might want to wrap response in envelope
563 // to distinguish sw programming failure and hand over
564 // it make sense in the latter case to retry immediately.
565 try {
566 message.respond(SERIALIZER.encode(allFailed));
567 } catch (IOException e) {
568 log.error("Failed to respond back", e);
569 }
570 return;
571 }
572
573 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
574
575 f.addListener(new Runnable() {
576
577 @Override
578 public void run() {
579 CompletedBatchOperation result = Futures.getUnchecked(f);
580 try {
581 message.respond(SERIALIZER.encode(result));
582 } catch (IOException e) {
583 log.error("Failed to respond back", e);
584 }
585 }
586 }, futureListeners);
587 }
588 }
589
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700590 private final class SMapLoader
591 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
592
593 @Override
594 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
595 throws Exception {
596 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
597 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
598 }
599 }
600
601 private final class InternalReplicaInfoEventListener
602 implements ReplicaInfoEventListener {
603
604 @Override
605 public void event(ReplicaInfoEvent event) {
606 final NodeId local = clusterService.getLocalNode().id();
607 final DeviceId did = event.subject();
608 final ReplicaInfo rInfo = event.replicaInfo();
609
610 switch (event.type()) {
611 case MASTER_CHANGED:
612 if (local.equals(rInfo.master().orNull())) {
613 // This node is the new master, populate local structure
614 // from backup
615 loadFromBackup(did);
616 } else {
617 // This node is no longer the master holder,
618 // clean local structure
619 removeFromPrimary(did);
620 // FIXME: probably should stop pending backup activities in
621 // executors to avoid overwriting with old value
622 }
623 break;
624 default:
625 break;
626
627 }
628 }
629 }
630
631 // Task to update FlowEntries in backup HZ store
632 private final class UpdateBackup implements Runnable {
633
634 private final DeviceId deviceId;
635 private final List<StoredFlowEntry> toAdd;
636 private final List<? extends FlowRule> toRemove;
637
638 public UpdateBackup(DeviceId deviceId,
639 List<StoredFlowEntry> toAdd,
640 List<? extends FlowRule> list) {
641 this.deviceId = checkNotNull(deviceId);
642 this.toAdd = checkNotNull(toAdd);
643 this.toRemove = checkNotNull(list);
644 }
645
646 @Override
647 public void run() {
648 try {
649 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
650 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
651 // Following should be rewritten using async APIs
652 for (StoredFlowEntry entry : toAdd) {
653 final FlowId id = entry.id();
654 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
655 List<StoredFlowEntry> list = new ArrayList<>();
656 if (original != null) {
657 list.addAll(original);
658 }
659
660 list.remove(entry);
661 list.add(entry);
662
663 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
664 boolean success;
665 if (original == null) {
666 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
667 } else {
668 success = backupFlowTable.replace(id, original, newValue);
669 }
670 // TODO retry?
671 if (!success) {
672 log.error("Updating backup failed.");
673 }
674 }
675 for (FlowRule entry : toRemove) {
676 final FlowId id = entry.id();
677 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
678 List<StoredFlowEntry> list = new ArrayList<>();
679 if (original != null) {
680 list.addAll(original);
681 }
682
683 list.remove(entry);
684
685 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
686 boolean success;
687 if (original == null) {
688 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
689 } else {
690 success = backupFlowTable.replace(id, original, newValue);
691 }
692 // TODO retry?
693 if (!success) {
694 log.error("Updating backup failed.");
695 }
696 }
697 } catch (ExecutionException e) {
698 log.error("Failed to write to backups", e);
699 }
700
701 }
702 }
alshabib339a3d92014-09-26 17:54:32 -0700703}