blob: bd2742a7b8f620b74332d49a372fb282121f5306 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070040
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070044import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070046import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070047import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070048import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070049import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070050import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070051import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070053import org.onlab.onos.net.flow.DefaultFlowEntry;
54import org.onlab.onos.net.flow.FlowEntry;
55import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070057import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070058import org.onlab.onos.net.flow.FlowRuleBatchEntry;
59import org.onlab.onos.net.flow.FlowRuleBatchEvent;
60import org.onlab.onos.net.flow.FlowRuleBatchOperation;
61import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070062import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070063import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070064import org.onlab.onos.net.flow.FlowRuleEvent.Type;
65import org.onlab.onos.net.flow.FlowRuleStore;
66import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070067import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070068import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
69import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070070import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070071import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import org.onlab.onos.store.flow.ReplicaInfoEvent;
73import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070074import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070075import org.onlab.onos.store.hz.AbstractHazelcastStore;
76import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070077import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070078import org.onlab.onos.store.serializers.DistributedStoreSerializers;
79import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070080import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070081import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070082import org.slf4j.Logger;
83
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070084import com.google.common.cache.Cache;
85import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070086import com.google.common.cache.CacheLoader;
87import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070088import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070089import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070090import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070091import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070092import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070093import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.util.concurrent.ListenableFuture;
95import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070096import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070097
98/**
Madan Jampani38b250d2014-10-17 11:02:38 -070099 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700100 */
alshabib339a3d92014-09-26 17:54:32 -0700101@Component(immediate = true)
102@Service
103public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700104 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700105 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700106
107 private final Logger log = getLogger(getClass());
108
109 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700110 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
111 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700112
alshabib339a3d92014-09-26 17:54:32 -0700113
Madan Jampani38b250d2014-10-17 11:02:38 -0700114 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700115 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700118 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
124 protected DeviceService deviceService;
125
126 private final AtomicInteger localBatchIdGen = new AtomicInteger();
127
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700128 // TODO: make this configurable
129 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700130
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700131 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
132 CacheBuilder.newBuilder()
133 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
134 // TODO Explicitly fail the future if expired?
135 //.removalListener(listener)
136 .build();
137
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700138 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
139
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700140
141 private final ExecutorService futureListeners =
142 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
143
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700144 private final ExecutorService backupExecutors =
145 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
146
147 // TODO make this configurable
148 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700149
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700150 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700151 @Override
152 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700153 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700154 .register(DistributedStoreSerializers.COMMON)
155 .build()
156 .populate(1);
157 }
158 };
159
160 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700161 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700162
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700163 private ReplicaInfoEventListener replicaInfoEventListener;
164
165 @Override
alshabib339a3d92014-09-26 17:54:32 -0700166 @Activate
167 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168
169 super.serializer = SERIALIZER;
170 super.theInstance = storeService.getHazelcastInstance();
171
172 // Cache to create SMap on demand
173 smaps = CacheBuilder.newBuilder()
174 .softValues()
175 .build(new SMapLoader());
176
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700177 final NodeId local = clusterService.getLocalNode().id();
178
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700179 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700180
181 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
182
183 @Override
184 public void handle(ClusterMessage message) {
185 FlowRule rule = SERIALIZER.decode(message.payload());
186 log.info("received get flow entry request for {}", rule);
187 FlowEntry flowEntry = getFlowEntryInternal(rule);
188 try {
189 message.respond(SERIALIZER.encode(flowEntry));
190 } catch (IOException e) {
191 log.error("Failed to respond back", e);
192 }
193 }
194 });
195
Madan Jampanif5fdef02014-10-23 21:58:10 -0700196 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
197
198 @Override
199 public void handle(ClusterMessage message) {
200 DeviceId deviceId = SERIALIZER.decode(message.payload());
201 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
202 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
203 try {
204 message.respond(SERIALIZER.encode(flowEntries));
205 } catch (IOException e) {
206 log.error("Failed to respond to peer's getFlowEntries request", e);
207 }
208 }
209 });
210
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700211 replicaInfoEventListener = new InternalReplicaInfoEventListener();
212
213 replicaInfoManager.addListener(replicaInfoEventListener);
214
alshabib339a3d92014-09-26 17:54:32 -0700215 log.info("Started");
216 }
217
218 @Deactivate
219 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700220 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700221 log.info("Stopped");
222 }
223
224
Madan Jampani117aaae2014-10-23 10:04:05 -0700225 // TODO: This is not a efficient operation on a distributed sharded
226 // flow store. We need to revisit the need for this operation or at least
227 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700228 @Override
tom9b4030d2014-10-06 10:39:03 -0700229 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700230 // implementing in-efficient operation for debugging purpose.
231 int sum = 0;
232 for (Device device : deviceService.getDevices()) {
233 final DeviceId did = device.id();
234 sum += Iterables.size(getFlowEntries(did));
235 }
236 return sum;
tom9b4030d2014-10-06 10:39:03 -0700237 }
238
239 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700240 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700241 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700242
243 if (!replicaInfo.master().isPresent()) {
244 log.warn("No master for {}", rule);
245 // TODO: revisit if this should be returning null.
246 // FIXME: throw a FlowStoreException
247 throw new RuntimeException("No master for " + rule);
248 }
249
Madan Jampani117aaae2014-10-23 10:04:05 -0700250 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
251 return getFlowEntryInternal(rule);
252 }
253
254 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
255 replicaInfo.master().orNull(), rule.deviceId());
256
257 ClusterMessage message = new ClusterMessage(
258 clusterService.getLocalNode().id(),
259 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
260 SERIALIZER.encode(rule));
261
262 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700263 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
264 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
265 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700266 // FIXME: throw a FlowStoreException
267 throw new RuntimeException(e);
268 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700269 }
270
271 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
272 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700273 if (f.equals(rule)) {
274 return f;
275 }
276 }
277 return null;
278 }
279
280 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700281 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700282
283 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700284
285 if (!replicaInfo.master().isPresent()) {
286 log.warn("No master for {}", deviceId);
287 // TODO: revisit if this should be returning empty collection.
288 // FIXME: throw a FlowStoreException
289 throw new RuntimeException("No master for " + deviceId);
290 }
291
Madan Jampanif5fdef02014-10-23 21:58:10 -0700292 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
293 return getFlowEntriesInternal(deviceId);
294 }
295
296 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
297 replicaInfo.master().orNull(), deviceId);
298
299 ClusterMessage message = new ClusterMessage(
300 clusterService.getLocalNode().id(),
301 GET_DEVICE_FLOW_ENTRIES,
302 SERIALIZER.encode(deviceId));
303
304 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700305 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
306 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
307 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700308 // FIXME: throw a FlowStoreException
309 throw new RuntimeException(e);
310 }
311 }
312
313 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700314 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700315 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700316 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700317 }
318 return ImmutableSet.copyOf(rules);
319 }
320
321 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700322 public void storeFlowRule(FlowRule rule) {
323 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
324 }
325
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700326 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700327 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700328 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700329
Madan Jampani117aaae2014-10-23 10:04:05 -0700330 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700331 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700332 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700333
Madan Jampani117aaae2014-10-23 10:04:05 -0700334 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
335
336 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
337
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700338 if (!replicaInfo.master().isPresent()) {
339 log.warn("No master for {}", deviceId);
340 // TODO: revisit if this should be "success" from Future point of view
341 // with every FlowEntry failed
342 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
343 }
344
345 final NodeId local = clusterService.getLocalNode().id();
346 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700347 return storeBatchInternal(operation);
348 }
349
350 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
351 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700352
Madan Jampani38b250d2014-10-17 11:02:38 -0700353 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700354 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700355 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700356 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700357
358 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700359 ListenableFuture<byte[]> responseFuture =
360 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700361 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700362 } catch (IOException e) {
363 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700364 }
365 }
366
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700367 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700368 final List<StoredFlowEntry> toRemove = new ArrayList<>();
369 final List<StoredFlowEntry> toAdd = new ArrayList<>();
370 DeviceId did = null;
371
Madan Jampani117aaae2014-10-23 10:04:05 -0700372 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
373 FlowRule flowRule = batchEntry.getTarget();
374 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700375 if (did == null) {
376 did = flowRule.deviceId();
377 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700378 if (op.equals(FlowRuleOperation.REMOVE)) {
379 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
380 if (entry != null) {
381 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700382 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700384 } else if (op.equals(FlowRuleOperation.ADD)) {
385 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
386 DeviceId deviceId = flowRule.deviceId();
387 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
388 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 toAdd.add(flowEntry);
390 }
391 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700392 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700393 if (toAdd.isEmpty() && toRemove.isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700394 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700395 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700396
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700397 // create remote backup copies
398 final DeviceId deviceId = did;
399 updateBackup(deviceId, toAdd, toRemove);
400
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700401 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
402 final int batchId = localBatchIdGen.incrementAndGet();
403
404 pendingFutures.put(batchId, r);
405 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700406
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700407 return r;
alshabib339a3d92014-09-26 17:54:32 -0700408 }
409
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700410 private void updateBackup(final DeviceId deviceId,
411 final List<StoredFlowEntry> toAdd,
412 final List<? extends FlowRule> list) {
413
414 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
415
416 if (syncBackup) {
417 // wait for backup to complete
418 try {
419 submit.get();
420 } catch (InterruptedException | ExecutionException e) {
421 log.error("Failed to create backups", e);
422 }
423 }
424 }
425
426 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
427 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
428 }
429
alshabib339a3d92014-09-26 17:54:32 -0700430 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700431 public void deleteFlowRule(FlowRule rule) {
432 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700433 }
434
435 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700436 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
437 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700438 final NodeId localId = clusterService.getLocalNode().id();
439 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700440 return addOrUpdateFlowRuleInternal(rule);
441 }
442
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700443 log.error("Tried to update FlowRule {} state,"
444 + " while the Node was not the master.", rule);
445 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700446 }
447
448 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700449 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700450
451 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700452 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700453 if (stored != null) {
454 stored.setBytes(rule.bytes());
455 stored.setLife(rule.life());
456 stored.setPackets(rule.packets());
457 if (stored.state() == FlowEntryState.PENDING_ADD) {
458 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700459 // update backup.
460 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700461 return new FlowRuleEvent(Type.RULE_ADDED, rule);
462 }
alshabib339a3d92014-09-26 17:54:32 -0700463 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
464 }
465
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700466 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700467 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700468 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700469 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700470
alshabib339a3d92014-09-26 17:54:32 -0700471 }
472
473 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700474 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
475 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700476
477 final NodeId localId = clusterService.getLocalNode().id();
478 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700479 // bypass and handle it locally
480 return removeFlowRuleInternal(rule);
481 }
482
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700483 log.error("Tried to remove FlowRule {},"
484 + " while the Node was not the master.", rule);
485 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700486 }
487
488 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700489 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700490 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700491 final boolean removed = flowEntries.remove(deviceId, rule);
492 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
493 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700494 return new FlowRuleEvent(RULE_REMOVED, rule);
495 } else {
496 return null;
497 }
alshabib339a3d92014-09-26 17:54:32 -0700498 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700499
500 @Override
501 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700502 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700503 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700504 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700505 if (future != null) {
506 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700507 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700508 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700509 notifyDelegate(event);
510 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700511
512 private synchronized void loadFromBackup(final DeviceId did) {
513 // should relax synchronized condition
514
515 try {
516 log.info("Loading FlowRules for {} from backups", did);
517 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
518 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
519 : backupFlowTable.entrySet()) {
520
521 // TODO: should we be directly updating internal structure or
522 // should we be triggering event?
523 log.debug("loading {}", e.getValue());
524 for (StoredFlowEntry entry : e.getValue()) {
525 flowEntries.remove(did, entry);
526 flowEntries.put(did, entry);
527 }
528 }
529 } catch (ExecutionException e) {
530 log.error("Failed to load backup flowtable for {}", did, e);
531 }
532 }
533
534 private synchronized void removeFromPrimary(final DeviceId did) {
535 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
536 log.debug("removedFromPrimary {}", removed);
537 }
538
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700539 private final class OnStoreBatch implements ClusterMessageHandler {
540 private final NodeId local;
541
542 private OnStoreBatch(NodeId local) {
543 this.local = local;
544 }
545
546 @Override
547 public void handle(final ClusterMessage message) {
548 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
549 log.info("received batch request {}", operation);
550
551 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
552 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
553 if (!local.equals(replicaInfo.master().orNull())) {
554
555 Set<FlowRule> failures = new HashSet<>(operation.size());
556 for (FlowRuleBatchEntry op : operation.getOperations()) {
557 failures.add(op.getTarget());
558 }
559 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
560 // This node is no longer the master, respond as all failed.
561 // TODO: we might want to wrap response in envelope
562 // to distinguish sw programming failure and hand over
563 // it make sense in the latter case to retry immediately.
564 try {
565 message.respond(SERIALIZER.encode(allFailed));
566 } catch (IOException e) {
567 log.error("Failed to respond back", e);
568 }
569 return;
570 }
571
572 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
573
574 f.addListener(new Runnable() {
575
576 @Override
577 public void run() {
578 CompletedBatchOperation result = Futures.getUnchecked(f);
579 try {
580 message.respond(SERIALIZER.encode(result));
581 } catch (IOException e) {
582 log.error("Failed to respond back", e);
583 }
584 }
585 }, futureListeners);
586 }
587 }
588
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700589 private final class SMapLoader
590 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
591
592 @Override
593 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
594 throws Exception {
595 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
596 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
597 }
598 }
599
600 private final class InternalReplicaInfoEventListener
601 implements ReplicaInfoEventListener {
602
603 @Override
604 public void event(ReplicaInfoEvent event) {
605 final NodeId local = clusterService.getLocalNode().id();
606 final DeviceId did = event.subject();
607 final ReplicaInfo rInfo = event.replicaInfo();
608
609 switch (event.type()) {
610 case MASTER_CHANGED:
611 if (local.equals(rInfo.master().orNull())) {
612 // This node is the new master, populate local structure
613 // from backup
614 loadFromBackup(did);
615 } else {
616 // This node is no longer the master holder,
617 // clean local structure
618 removeFromPrimary(did);
619 // FIXME: probably should stop pending backup activities in
620 // executors to avoid overwriting with old value
621 }
622 break;
623 default:
624 break;
625
626 }
627 }
628 }
629
630 // Task to update FlowEntries in backup HZ store
631 private final class UpdateBackup implements Runnable {
632
633 private final DeviceId deviceId;
634 private final List<StoredFlowEntry> toAdd;
635 private final List<? extends FlowRule> toRemove;
636
637 public UpdateBackup(DeviceId deviceId,
638 List<StoredFlowEntry> toAdd,
639 List<? extends FlowRule> list) {
640 this.deviceId = checkNotNull(deviceId);
641 this.toAdd = checkNotNull(toAdd);
642 this.toRemove = checkNotNull(list);
643 }
644
645 @Override
646 public void run() {
647 try {
648 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
649 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
650 // Following should be rewritten using async APIs
651 for (StoredFlowEntry entry : toAdd) {
652 final FlowId id = entry.id();
653 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
654 List<StoredFlowEntry> list = new ArrayList<>();
655 if (original != null) {
656 list.addAll(original);
657 }
658
659 list.remove(entry);
660 list.add(entry);
661
662 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
663 boolean success;
664 if (original == null) {
665 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
666 } else {
667 success = backupFlowTable.replace(id, original, newValue);
668 }
669 // TODO retry?
670 if (!success) {
671 log.error("Updating backup failed.");
672 }
673 }
674 for (FlowRule entry : toRemove) {
675 final FlowId id = entry.id();
676 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
677 List<StoredFlowEntry> list = new ArrayList<>();
678 if (original != null) {
679 list.addAll(original);
680 }
681
682 list.remove(entry);
683
684 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
685 boolean success;
686 if (original == null) {
687 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
688 } else {
689 success = backupFlowTable.replace(id, original, newValue);
690 }
691 // TODO retry?
692 if (!success) {
693 log.error("Updating backup failed.");
694 }
695 }
696 } catch (ExecutionException e) {
697 log.error("Failed to write to backups", e);
698 }
699
700 }
701 }
alshabib339a3d92014-09-26 17:54:32 -0700702}