blob: 7a917a74eeb3d6e36b4ab1549955d24b396737ba [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Madan Jampani117aaae2014-10-23 10:04:05 -070039import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070040
41import org.apache.felix.scr.annotations.Activate;
42import org.apache.felix.scr.annotations.Component;
43import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070044import org.apache.felix.scr.annotations.Reference;
45import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070046import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070047import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070048import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070049import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070050import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070051import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070052import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070053import org.onlab.onos.net.flow.DefaultFlowEntry;
54import org.onlab.onos.net.flow.FlowEntry;
55import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070056import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070057import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070058import org.onlab.onos.net.flow.FlowRuleBatchEntry;
59import org.onlab.onos.net.flow.FlowRuleBatchEvent;
60import org.onlab.onos.net.flow.FlowRuleBatchOperation;
61import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070062import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070063import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070064import org.onlab.onos.net.flow.FlowRuleEvent.Type;
65import org.onlab.onos.net.flow.FlowRuleStore;
66import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070067import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070068import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
69import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070070import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070071import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070072import org.onlab.onos.store.flow.ReplicaInfoEvent;
73import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070074import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070075import org.onlab.onos.store.hz.AbstractHazelcastStore;
76import org.onlab.onos.store.hz.SMap;
Madan Jampani38b250d2014-10-17 11:02:38 -070077import org.onlab.onos.store.serializers.DistributedStoreSerializers;
78import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070079import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070080import org.slf4j.Logger;
81
Madan Jampani24f9efb2014-10-24 18:56:23 -070082import com.google.common.base.Function;
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070083import com.google.common.cache.Cache;
84import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070085import com.google.common.cache.CacheLoader;
86import com.google.common.cache.LoadingCache;
alshabib339a3d92014-09-26 17:54:32 -070087import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070088import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070089import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070090import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070091import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070092import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070093import com.google.common.util.concurrent.ListenableFuture;
94import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070095import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -070096
97/**
Madan Jampani38b250d2014-10-17 11:02:38 -070098 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070099 */
alshabib339a3d92014-09-26 17:54:32 -0700100@Component(immediate = true)
101@Service
102public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700103 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700104 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700105
106 private final Logger log = getLogger(getClass());
107
108 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700109 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
110 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700111
alshabib339a3d92014-09-26 17:54:32 -0700112
Madan Jampani38b250d2014-10-17 11:02:38 -0700113 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700114 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700117 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700120 protected ClusterService clusterService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
123 protected DeviceService deviceService;
124
125 private final AtomicInteger localBatchIdGen = new AtomicInteger();
126
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700127 // TODO: make this configurable
128 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700129
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700130 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
131 CacheBuilder.newBuilder()
132 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
133 // TODO Explicitly fail the future if expired?
134 //.removalListener(listener)
135 .build();
136
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700137 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
138
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700139
140 private final ExecutorService futureListeners =
141 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
142
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700143 private final ExecutorService backupExecutors =
144 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
145
146 // TODO make this configurable
147 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700148
149 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
150 @Override
151 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700152 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700153 .register(DistributedStoreSerializers.COMMON)
154 .build()
155 .populate(1);
156 }
157 };
158
159 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700160 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700161
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700162 private ReplicaInfoEventListener replicaInfoEventListener;
163
164 @Override
alshabib339a3d92014-09-26 17:54:32 -0700165 @Activate
166 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700167
168 super.serializer = SERIALIZER;
169 super.theInstance = storeService.getHazelcastInstance();
170
171 // Cache to create SMap on demand
172 smaps = CacheBuilder.newBuilder()
173 .softValues()
174 .build(new SMapLoader());
175
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700176 final NodeId local = clusterService.getLocalNode().id();
177
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700178 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new ClusterMessageHandler() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700179
180 @Override
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700181 public void handle(final ClusterMessage message) {
182 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
183 log.info("received batch request {}", operation);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700184
185 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
186 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
187 if (!local.equals(replicaInfo.master().orNull())) {
188
189 Set<FlowRule> failures = new HashSet<>(operation.size());
190 for (FlowRuleBatchEntry op : operation.getOperations()) {
191 failures.add(op.getTarget());
192 }
193 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
194 // This node is no longer the master, respond as all failed.
195 // TODO: we might want to wrap response in envelope
196 // to distinguish sw programming failure and hand over
197 // it make sense in the latter case to retry immediately.
198 try {
199 message.respond(SERIALIZER.encode(allFailed));
200 } catch (IOException e) {
201 log.error("Failed to respond back", e);
202 }
203 return;
204 }
205
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700206 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700207
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700208 f.addListener(new Runnable() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700209
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700210 @Override
211 public void run() {
212 CompletedBatchOperation result = Futures.getUnchecked(f);
213 try {
214 message.respond(SERIALIZER.encode(result));
215 } catch (IOException e) {
216 log.error("Failed to respond back", e);
217 }
218 }
219 }, futureListeners);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700220 }
221 });
Madan Jampani117aaae2014-10-23 10:04:05 -0700222
223 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
224
225 @Override
226 public void handle(ClusterMessage message) {
227 FlowRule rule = SERIALIZER.decode(message.payload());
228 log.info("received get flow entry request for {}", rule);
229 FlowEntry flowEntry = getFlowEntryInternal(rule);
230 try {
231 message.respond(SERIALIZER.encode(flowEntry));
232 } catch (IOException e) {
233 log.error("Failed to respond back", e);
234 }
235 }
236 });
237
Madan Jampanif5fdef02014-10-23 21:58:10 -0700238 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
239
240 @Override
241 public void handle(ClusterMessage message) {
242 DeviceId deviceId = SERIALIZER.decode(message.payload());
243 log.info("Received get flow entries request for {} from {}", deviceId, message.sender());
244 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
245 try {
246 message.respond(SERIALIZER.encode(flowEntries));
247 } catch (IOException e) {
248 log.error("Failed to respond to peer's getFlowEntries request", e);
249 }
250 }
251 });
252
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700253 replicaInfoEventListener = new InternalReplicaInfoEventListener();
254
255 replicaInfoManager.addListener(replicaInfoEventListener);
256
alshabib339a3d92014-09-26 17:54:32 -0700257 log.info("Started");
258 }
259
260 @Deactivate
261 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700262 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700263 log.info("Stopped");
264 }
265
266
Madan Jampani117aaae2014-10-23 10:04:05 -0700267 // TODO: This is not a efficient operation on a distributed sharded
268 // flow store. We need to revisit the need for this operation or at least
269 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700270 @Override
tom9b4030d2014-10-06 10:39:03 -0700271 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700272 // implementing in-efficient operation for debugging purpose.
273 int sum = 0;
274 for (Device device : deviceService.getDevices()) {
275 final DeviceId did = device.id();
276 sum += Iterables.size(getFlowEntries(did));
277 }
278 return sum;
tom9b4030d2014-10-06 10:39:03 -0700279 }
280
281 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700282 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700283 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700284
285 if (!replicaInfo.master().isPresent()) {
286 log.warn("No master for {}", rule);
287 // TODO: revisit if this should be returning null.
288 // FIXME: throw a FlowStoreException
289 throw new RuntimeException("No master for " + rule);
290 }
291
Madan Jampani117aaae2014-10-23 10:04:05 -0700292 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
293 return getFlowEntryInternal(rule);
294 }
295
296 log.info("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
297 replicaInfo.master().orNull(), rule.deviceId());
298
299 ClusterMessage message = new ClusterMessage(
300 clusterService.getLocalNode().id(),
301 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
302 SERIALIZER.encode(rule));
303
304 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700305 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
306 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
307 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700308 // FIXME: throw a FlowStoreException
309 throw new RuntimeException(e);
310 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700311 }
312
313 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
314 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700315 if (f.equals(rule)) {
316 return f;
317 }
318 }
319 return null;
320 }
321
322 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700323 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700324
325 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700326
327 if (!replicaInfo.master().isPresent()) {
328 log.warn("No master for {}", deviceId);
329 // TODO: revisit if this should be returning empty collection.
330 // FIXME: throw a FlowStoreException
331 throw new RuntimeException("No master for " + deviceId);
332 }
333
Madan Jampanif5fdef02014-10-23 21:58:10 -0700334 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
335 return getFlowEntriesInternal(deviceId);
336 }
337
338 log.info("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
339 replicaInfo.master().orNull(), deviceId);
340
341 ClusterMessage message = new ClusterMessage(
342 clusterService.getLocalNode().id(),
343 GET_DEVICE_FLOW_ENTRIES,
344 SERIALIZER.encode(deviceId));
345
346 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700347 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
348 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
349 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700350 // FIXME: throw a FlowStoreException
351 throw new RuntimeException(e);
352 }
353 }
354
355 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700356 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700357 if (rules == null) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700358 return Collections.emptySet();
alshabib339a3d92014-09-26 17:54:32 -0700359 }
360 return ImmutableSet.copyOf(rules);
361 }
362
363 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700364 public void storeFlowRule(FlowRule rule) {
365 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
366 }
367
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700368 // FIXME document that all of the FlowEntries must be about same device
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700369 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700370 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700371
Madan Jampani117aaae2014-10-23 10:04:05 -0700372 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700373 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700374 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700375
Madan Jampani117aaae2014-10-23 10:04:05 -0700376 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
377
378 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
379
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700380 if (!replicaInfo.master().isPresent()) {
381 log.warn("No master for {}", deviceId);
382 // TODO: revisit if this should be "success" from Future point of view
383 // with every FlowEntry failed
384 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
385 }
386
387 final NodeId local = clusterService.getLocalNode().id();
388 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700389 return storeBatchInternal(operation);
390 }
391
392 log.info("Forwarding storeBatch to {}, which is the primary (master) for device {}",
393 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700394
Madan Jampani38b250d2014-10-17 11:02:38 -0700395 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700396 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700397 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700398 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700399
400 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700401 ListenableFuture<byte[]> responseFuture =
402 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
403 return Futures.transform(responseFuture, new Function<byte[], CompletedBatchOperation>() {
404 @Override
405 public CompletedBatchOperation apply(byte[] input) {
406 return SERIALIZER.decode(input);
407 }
408 });
409 } catch (IOException e) {
410 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700411 }
412 }
413
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700414 private ListenableFuture<CompletedBatchOperation> storeBatchInternal(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700415 final List<StoredFlowEntry> toRemove = new ArrayList<>();
416 final List<StoredFlowEntry> toAdd = new ArrayList<>();
417 DeviceId did = null;
418
Madan Jampani117aaae2014-10-23 10:04:05 -0700419 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
420 FlowRule flowRule = batchEntry.getTarget();
421 FlowRuleOperation op = batchEntry.getOperator();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700422 if (did == null) {
423 did = flowRule.deviceId();
424 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700425 if (op.equals(FlowRuleOperation.REMOVE)) {
426 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
427 if (entry != null) {
428 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700429 toRemove.add(entry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700430 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700431 } else if (op.equals(FlowRuleOperation.ADD)) {
432 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
433 DeviceId deviceId = flowRule.deviceId();
434 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
435 flowEntries.put(deviceId, flowEntry);
Madan Jampani117aaae2014-10-23 10:04:05 -0700436 toAdd.add(flowEntry);
437 }
438 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700439 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700440 if (toAdd.isEmpty() && toRemove.isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700441 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
Madan Jampani117aaae2014-10-23 10:04:05 -0700442 }
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700443
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700444 // create remote backup copies
445 final DeviceId deviceId = did;
446 updateBackup(deviceId, toAdd, toRemove);
447
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700448 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
449 final int batchId = localBatchIdGen.incrementAndGet();
450
451 pendingFutures.put(batchId, r);
452 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700453
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700454 return r;
alshabib339a3d92014-09-26 17:54:32 -0700455 }
456
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700457 private void updateBackup(final DeviceId deviceId,
458 final List<StoredFlowEntry> toAdd,
459 final List<? extends FlowRule> list) {
460
461 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
462
463 if (syncBackup) {
464 // wait for backup to complete
465 try {
466 submit.get();
467 } catch (InterruptedException | ExecutionException e) {
468 log.error("Failed to create backups", e);
469 }
470 }
471 }
472
473 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
474 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
475 }
476
alshabib339a3d92014-09-26 17:54:32 -0700477 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700478 public void deleteFlowRule(FlowRule rule) {
479 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700480 }
481
482 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700483 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
484 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700485 final NodeId localId = clusterService.getLocalNode().id();
486 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700487 return addOrUpdateFlowRuleInternal(rule);
488 }
489
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700490 log.error("Tried to update FlowRule {} state,"
491 + " while the Node was not the master.", rule);
492 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700493 }
494
495 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700496 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700497
498 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700499 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700500 if (stored != null) {
501 stored.setBytes(rule.bytes());
502 stored.setLife(rule.life());
503 stored.setPackets(rule.packets());
504 if (stored.state() == FlowEntryState.PENDING_ADD) {
505 stored.setState(FlowEntryState.ADDED);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700506 // update backup.
507 updateBackup(did, Arrays.asList(stored));
alshabib1c319ff2014-10-04 20:29:09 -0700508 return new FlowRuleEvent(Type.RULE_ADDED, rule);
509 }
alshabib339a3d92014-09-26 17:54:32 -0700510 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
511 }
512
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700513 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700514 // TODO: also update backup.
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700515 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700516 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700517
alshabib339a3d92014-09-26 17:54:32 -0700518 }
519
520 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700521 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
522 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700523
524 final NodeId localId = clusterService.getLocalNode().id();
525 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700526 // bypass and handle it locally
527 return removeFlowRuleInternal(rule);
528 }
529
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700530 log.error("Tried to remove FlowRule {},"
531 + " while the Node was not the master.", rule);
532 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700533 }
534
535 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700536 final DeviceId deviceId = rule.deviceId();
alshabib1c319ff2014-10-04 20:29:09 -0700537 // This is where one could mark a rule as removed and still keep it in the store.
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700538 final boolean removed = flowEntries.remove(deviceId, rule);
539 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
540 if (removed) {
alshabib339a3d92014-09-26 17:54:32 -0700541 return new FlowRuleEvent(RULE_REMOVED, rule);
542 } else {
543 return null;
544 }
alshabib339a3d92014-09-26 17:54:32 -0700545 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700546
547 @Override
548 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700549 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700550 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700551 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700552 if (future != null) {
553 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700554 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700555 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700556 notifyDelegate(event);
557 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700558
559 private synchronized void loadFromBackup(final DeviceId did) {
560 // should relax synchronized condition
561
562 try {
563 log.info("Loading FlowRules for {} from backups", did);
564 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
565 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
566 : backupFlowTable.entrySet()) {
567
568 // TODO: should we be directly updating internal structure or
569 // should we be triggering event?
570 log.debug("loading {}", e.getValue());
571 for (StoredFlowEntry entry : e.getValue()) {
572 flowEntries.remove(did, entry);
573 flowEntries.put(did, entry);
574 }
575 }
576 } catch (ExecutionException e) {
577 log.error("Failed to load backup flowtable for {}", did, e);
578 }
579 }
580
581 private synchronized void removeFromPrimary(final DeviceId did) {
582 Collection<StoredFlowEntry> removed = flowEntries.removeAll(did);
583 log.debug("removedFromPrimary {}", removed);
584 }
585
586 private final class SMapLoader
587 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
588
589 @Override
590 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
591 throws Exception {
592 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
593 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
594 }
595 }
596
597 private final class InternalReplicaInfoEventListener
598 implements ReplicaInfoEventListener {
599
600 @Override
601 public void event(ReplicaInfoEvent event) {
602 final NodeId local = clusterService.getLocalNode().id();
603 final DeviceId did = event.subject();
604 final ReplicaInfo rInfo = event.replicaInfo();
605
606 switch (event.type()) {
607 case MASTER_CHANGED:
608 if (local.equals(rInfo.master().orNull())) {
609 // This node is the new master, populate local structure
610 // from backup
611 loadFromBackup(did);
612 } else {
613 // This node is no longer the master holder,
614 // clean local structure
615 removeFromPrimary(did);
616 // FIXME: probably should stop pending backup activities in
617 // executors to avoid overwriting with old value
618 }
619 break;
620 default:
621 break;
622
623 }
624 }
625 }
626
627 // Task to update FlowEntries in backup HZ store
628 private final class UpdateBackup implements Runnable {
629
630 private final DeviceId deviceId;
631 private final List<StoredFlowEntry> toAdd;
632 private final List<? extends FlowRule> toRemove;
633
634 public UpdateBackup(DeviceId deviceId,
635 List<StoredFlowEntry> toAdd,
636 List<? extends FlowRule> list) {
637 this.deviceId = checkNotNull(deviceId);
638 this.toAdd = checkNotNull(toAdd);
639 this.toRemove = checkNotNull(list);
640 }
641
642 @Override
643 public void run() {
644 try {
645 log.debug("update backup {} +{} -{}", deviceId, toAdd, toRemove);
646 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
647 // Following should be rewritten using async APIs
648 for (StoredFlowEntry entry : toAdd) {
649 final FlowId id = entry.id();
650 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
651 List<StoredFlowEntry> list = new ArrayList<>();
652 if (original != null) {
653 list.addAll(original);
654 }
655
656 list.remove(entry);
657 list.add(entry);
658
659 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
660 boolean success;
661 if (original == null) {
662 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
663 } else {
664 success = backupFlowTable.replace(id, original, newValue);
665 }
666 // TODO retry?
667 if (!success) {
668 log.error("Updating backup failed.");
669 }
670 }
671 for (FlowRule entry : toRemove) {
672 final FlowId id = entry.id();
673 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
674 List<StoredFlowEntry> list = new ArrayList<>();
675 if (original != null) {
676 list.addAll(original);
677 }
678
679 list.remove(entry);
680
681 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
682 boolean success;
683 if (original == null) {
684 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
685 } else {
686 success = backupFlowTable.replace(id, original, newValue);
687 }
688 // TODO retry?
689 if (!success) {
690 log.error("Updating backup failed.");
691 }
692 }
693 } catch (ExecutionException e) {
694 log.error("Failed to write to backups", e);
695 }
696
697 }
698 }
alshabib339a3d92014-09-26 17:54:32 -0700699}