blob: f7718c114cf704042fdb5ac699f068ae944846f7 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -080039import java.util.concurrent.locks.ReentrantReadWriteLock;
Madan Jampani117aaae2014-10-23 10:04:05 -070040import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070041
42import org.apache.felix.scr.annotations.Activate;
43import org.apache.felix.scr.annotations.Component;
44import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070045import org.apache.felix.scr.annotations.Reference;
46import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070047import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070048import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070049import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070050import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070051import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070052import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070053import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070054import org.onlab.onos.net.flow.DefaultFlowEntry;
55import org.onlab.onos.net.flow.FlowEntry;
56import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070057import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070058import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070059import org.onlab.onos.net.flow.FlowRuleBatchEntry;
60import org.onlab.onos.net.flow.FlowRuleBatchEvent;
61import org.onlab.onos.net.flow.FlowRuleBatchOperation;
62import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070063import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070064import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070065import org.onlab.onos.net.flow.FlowRuleEvent.Type;
66import org.onlab.onos.net.flow.FlowRuleStore;
67import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070068import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070069import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
70import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070071import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070072import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070073import org.onlab.onos.store.flow.ReplicaInfoEvent;
74import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070075import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070076import org.onlab.onos.store.hz.AbstractHazelcastStore;
77import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070078import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070079import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070080import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080081import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070082import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070083import org.slf4j.Logger;
84
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070085import com.google.common.cache.Cache;
86import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070087import com.google.common.cache.CacheLoader;
88import com.google.common.cache.LoadingCache;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080089import com.google.common.cache.RemovalListener;
90import com.google.common.cache.RemovalNotification;
alshabib339a3d92014-09-26 17:54:32 -070091import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070092import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070093import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070095import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070096import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070097import com.google.common.util.concurrent.ListenableFuture;
98import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070099import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -0700100
101/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700102 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700103 */
alshabib339a3d92014-09-26 17:54:32 -0700104@Component(immediate = true)
105@Service
106public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700107 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700108 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700109
110 private final Logger log = getLogger(getClass());
111
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800112 // primary data:
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800113 // read/write needs to be locked
114 private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
alshabib339a3d92014-09-26 17:54:32 -0700115 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800116 private final Multimap<DeviceId, StoredFlowEntry> flowEntries
117 = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700118
Madan Jampani38b250d2014-10-17 11:02:38 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700120 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700123 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected DeviceService deviceService;
130
131 private final AtomicInteger localBatchIdGen = new AtomicInteger();
132
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700133 // TODO: make this configurable
134 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700136 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
137 CacheBuilder.newBuilder()
138 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800139 .removalListener(new TimeoutFuture())
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700140 .build();
141
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800142 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700143 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
144
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700145
146 private final ExecutorService futureListeners =
147 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
148
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700149 private final ExecutorService backupExecutors =
150 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
151
152 // TODO make this configurable
153 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700154
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700156 @Override
157 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700158 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -0700159 .register(DistributedStoreSerializers.COMMON)
160 .build()
161 .populate(1);
162 }
163 };
164
165 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700166 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700167
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168 private ReplicaInfoEventListener replicaInfoEventListener;
169
170 @Override
alshabib339a3d92014-09-26 17:54:32 -0700171 @Activate
172 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700173
174 super.serializer = SERIALIZER;
175 super.theInstance = storeService.getHazelcastInstance();
176
177 // Cache to create SMap on demand
178 smaps = CacheBuilder.newBuilder()
179 .softValues()
180 .build(new SMapLoader());
181
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700182 final NodeId local = clusterService.getLocalNode().id();
183
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700184 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700185
186 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
187
188 @Override
189 public void handle(ClusterMessage message) {
190 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800191 log.trace("received get flow entry request for {}", rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700192 FlowEntry flowEntry = getFlowEntryInternal(rule);
193 try {
194 message.respond(SERIALIZER.encode(flowEntry));
195 } catch (IOException e) {
196 log.error("Failed to respond back", e);
197 }
198 }
199 });
200
Madan Jampanif5fdef02014-10-23 21:58:10 -0700201 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
202
203 @Override
204 public void handle(ClusterMessage message) {
205 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800206 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700207 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
208 try {
209 message.respond(SERIALIZER.encode(flowEntries));
210 } catch (IOException e) {
211 log.error("Failed to respond to peer's getFlowEntries request", e);
212 }
213 }
214 });
215
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700216 replicaInfoEventListener = new InternalReplicaInfoEventListener();
217
218 replicaInfoManager.addListener(replicaInfoEventListener);
219
alshabib339a3d92014-09-26 17:54:32 -0700220 log.info("Started");
221 }
222
223 @Deactivate
224 public void deactivate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700225 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700226 log.info("Stopped");
227 }
228
229
Madan Jampani117aaae2014-10-23 10:04:05 -0700230 // TODO: This is not a efficient operation on a distributed sharded
231 // flow store. We need to revisit the need for this operation or at least
232 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700233 @Override
tom9b4030d2014-10-06 10:39:03 -0700234 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700235 // implementing in-efficient operation for debugging purpose.
236 int sum = 0;
237 for (Device device : deviceService.getDevices()) {
238 final DeviceId did = device.id();
239 sum += Iterables.size(getFlowEntries(did));
240 }
241 return sum;
tom9b4030d2014-10-06 10:39:03 -0700242 }
243
244 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800245 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700246 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700247
248 if (!replicaInfo.master().isPresent()) {
249 log.warn("No master for {}", rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800250 // TODO: should we try returning from backup?
251 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700252 }
253
Madan Jampani117aaae2014-10-23 10:04:05 -0700254 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
255 return getFlowEntryInternal(rule);
256 }
257
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800258 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700259 replicaInfo.master().orNull(), rule.deviceId());
260
261 ClusterMessage message = new ClusterMessage(
262 clusterService.getLocalNode().id(),
263 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
264 SERIALIZER.encode(rule));
265
266 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700267 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
268 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
269 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700270 // FIXME: throw a FlowStoreException
271 throw new RuntimeException(e);
272 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700273 }
274
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800275 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
276 flowEntriesLock.readLock().lock();
277 try {
278 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
279 if (f.equals(rule)) {
280 return f;
281 }
alshabib339a3d92014-09-26 17:54:32 -0700282 }
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800283 } finally {
284 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700285 }
286 return null;
287 }
288
289 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800290 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700291
292 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700293
294 if (!replicaInfo.master().isPresent()) {
295 log.warn("No master for {}", deviceId);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800296 // TODO: should we try returning from backup?
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700297 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700298 }
299
Madan Jampanif5fdef02014-10-23 21:58:10 -0700300 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
301 return getFlowEntriesInternal(deviceId);
302 }
303
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800304 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Madan Jampanif5fdef02014-10-23 21:58:10 -0700305 replicaInfo.master().orNull(), deviceId);
306
307 ClusterMessage message = new ClusterMessage(
308 clusterService.getLocalNode().id(),
309 GET_DEVICE_FLOW_ENTRIES,
310 SERIALIZER.encode(deviceId));
311
312 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700313 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
314 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
315 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700316 // FIXME: throw a FlowStoreException
317 throw new RuntimeException(e);
318 }
319 }
320
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800321 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
322 flowEntriesLock.readLock().lock();
323 try {
324 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
325 if (rules == null) {
326 return Collections.emptySet();
327 }
328 return ImmutableSet.copyOf(rules);
329 } finally {
330 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700331 }
alshabib339a3d92014-09-26 17:54:32 -0700332 }
333
334 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700335 public void storeFlowRule(FlowRule rule) {
336 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
337 }
338
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700339 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700340 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700341
Madan Jampani117aaae2014-10-23 10:04:05 -0700342 if (operation.getOperations().isEmpty()) {
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700343 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700344 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700345
Madan Jampani117aaae2014-10-23 10:04:05 -0700346 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
347
348 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
349
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700350 if (!replicaInfo.master().isPresent()) {
351 log.warn("No master for {}", deviceId);
352 // TODO: revisit if this should be "success" from Future point of view
353 // with every FlowEntry failed
354 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
355 }
356
357 final NodeId local = clusterService.getLocalNode().id();
358 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700359 return storeBatchInternal(operation);
360 }
361
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800362 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700363 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700364
Madan Jampani38b250d2014-10-17 11:02:38 -0700365 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700366 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700367 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700368 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700369
370 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700371 ListenableFuture<byte[]> responseFuture =
372 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700373 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700374 } catch (IOException e) {
375 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700376 }
377 }
378
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800379 private ListenableFuture<CompletedBatchOperation>
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800380 storeBatchInternal(FlowRuleBatchOperation operation) {
381
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700382 final List<StoredFlowEntry> toRemove = new ArrayList<>();
383 final List<StoredFlowEntry> toAdd = new ArrayList<>();
384 DeviceId did = null;
385
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700386
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800387 flowEntriesLock.writeLock().lock();
388 try {
389 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
390 FlowRule flowRule = batchEntry.getTarget();
391 FlowRuleOperation op = batchEntry.getOperator();
392 if (did == null) {
393 did = flowRule.deviceId();
394 }
395 if (op.equals(FlowRuleOperation.REMOVE)) {
396 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
397 if (entry != null) {
398 entry.setState(FlowEntryState.PENDING_REMOVE);
399 toRemove.add(entry);
400 }
401 } else if (op.equals(FlowRuleOperation.ADD)) {
402 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
403 DeviceId deviceId = flowRule.deviceId();
404 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
405 flowEntries.put(deviceId, flowEntry);
406 toAdd.add(flowEntry);
407 }
408 }
409 }
410 if (toAdd.isEmpty() && toRemove.isEmpty()) {
411 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
412 }
413
414 // create remote backup copies
415 updateBackup(did, toAdd, toRemove);
416 } finally {
417 flowEntriesLock.writeLock().unlock();
418 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700419
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700420 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
421 final int batchId = localBatchIdGen.incrementAndGet();
422
423 pendingFutures.put(batchId, r);
424 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700425
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700426 return r;
alshabib339a3d92014-09-26 17:54:32 -0700427 }
428
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700429 private void updateBackup(final DeviceId deviceId,
430 final List<StoredFlowEntry> toAdd,
431 final List<? extends FlowRule> list) {
432
433 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
434
435 if (syncBackup) {
436 // wait for backup to complete
437 try {
438 submit.get();
439 } catch (InterruptedException | ExecutionException e) {
440 log.error("Failed to create backups", e);
441 }
442 }
443 }
444
445 private void updateBackup(DeviceId deviceId, List<StoredFlowEntry> toAdd) {
446 updateBackup(deviceId, toAdd, Collections.<FlowEntry>emptyList());
447 }
448
alshabib339a3d92014-09-26 17:54:32 -0700449 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700450 public void deleteFlowRule(FlowRule rule) {
451 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700452 }
453
454 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700455 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
456 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700457 final NodeId localId = clusterService.getLocalNode().id();
458 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700459 return addOrUpdateFlowRuleInternal(rule);
460 }
461
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800462 log.warn("Tried to update FlowRule {} state,"
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700463 + " while the Node was not the master.", rule);
464 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700465 }
466
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800467 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700468 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700469
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800470 flowEntriesLock.writeLock().lock();
471 try {
472 // check if this new rule is an update to an existing entry
473 StoredFlowEntry stored = getFlowEntryInternal(rule);
474 if (stored != null) {
475 stored.setBytes(rule.bytes());
476 stored.setLife(rule.life());
477 stored.setPackets(rule.packets());
478 if (stored.state() == FlowEntryState.PENDING_ADD) {
479 stored.setState(FlowEntryState.ADDED);
480 // update backup.
481 updateBackup(did, Arrays.asList(stored));
482 return new FlowRuleEvent(Type.RULE_ADDED, rule);
483 }
484 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
alshabib1c319ff2014-10-04 20:29:09 -0700485 }
alshabib339a3d92014-09-26 17:54:32 -0700486
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800487 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
488 // TODO: also update backup.
489 flowEntries.put(did, new DefaultFlowEntry(rule));
490 } finally {
491 flowEntriesLock.writeLock().unlock();
492 }
alshabib1c319ff2014-10-04 20:29:09 -0700493 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700494
alshabib339a3d92014-09-26 17:54:32 -0700495 }
496
497 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700498 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
499 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700500
501 final NodeId localId = clusterService.getLocalNode().id();
502 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700503 // bypass and handle it locally
504 return removeFlowRuleInternal(rule);
505 }
506
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800507 log.warn("Tried to remove FlowRule {},"
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700508 + " while the Node was not the master.", rule);
509 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700510 }
511
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800512 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700513 final DeviceId deviceId = rule.deviceId();
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800514 flowEntriesLock.writeLock().lock();
515 try {
516 // This is where one could mark a rule as removed and still keep it in the store.
517 final boolean removed = flowEntries.remove(deviceId, rule);
518 updateBackup(deviceId, Collections.<StoredFlowEntry>emptyList(), Arrays.asList(rule));
519 if (removed) {
520 return new FlowRuleEvent(RULE_REMOVED, rule);
521 } else {
522 return null;
523 }
524 } finally {
525 flowEntriesLock.writeLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700526 }
alshabib339a3d92014-09-26 17:54:32 -0700527 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700528
529 @Override
530 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700531 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700532 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700533 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700534 if (future != null) {
535 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700536 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700537 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700538 notifyDelegate(event);
539 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700540
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800541 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700542
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800543 flowEntriesLock.writeLock().lock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700544 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800545 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700546 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
547 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
548 : backupFlowTable.entrySet()) {
549
550 // TODO: should we be directly updating internal structure or
551 // should we be triggering event?
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800552 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700553 for (StoredFlowEntry entry : e.getValue()) {
554 flowEntries.remove(did, entry);
555 flowEntries.put(did, entry);
556 }
557 }
558 } catch (ExecutionException e) {
559 log.error("Failed to load backup flowtable for {}", did, e);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800560 } finally {
561 flowEntriesLock.writeLock().unlock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700562 }
563 }
564
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800565 private void removeFromPrimary(final DeviceId did) {
566 Collection<StoredFlowEntry> removed = null;
567 flowEntriesLock.writeLock().lock();
568 try {
569 removed = flowEntries.removeAll(did);
570 } finally {
571 flowEntriesLock.writeLock().unlock();
572 }
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800573 log.trace("removedFromPrimary {}", removed);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700574 }
575
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800576 private static final class TimeoutFuture
577 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
578 @Override
579 public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
580 // wrapping in ExecutionException to support Future.get
581 notification.getValue()
582 .setException(new ExecutionException("Timed out",
583 new TimeoutException()));
584 }
585 }
586
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700587 private final class OnStoreBatch implements ClusterMessageHandler {
588 private final NodeId local;
589
590 private OnStoreBatch(NodeId local) {
591 this.local = local;
592 }
593
594 @Override
595 public void handle(final ClusterMessage message) {
596 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800597 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700598
599 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
600 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
601 if (!local.equals(replicaInfo.master().orNull())) {
602
603 Set<FlowRule> failures = new HashSet<>(operation.size());
604 for (FlowRuleBatchEntry op : operation.getOperations()) {
605 failures.add(op.getTarget());
606 }
607 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
608 // This node is no longer the master, respond as all failed.
609 // TODO: we might want to wrap response in envelope
610 // to distinguish sw programming failure and hand over
611 // it make sense in the latter case to retry immediately.
612 try {
613 message.respond(SERIALIZER.encode(allFailed));
614 } catch (IOException e) {
615 log.error("Failed to respond back", e);
616 }
617 return;
618 }
619
620 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
621
622 f.addListener(new Runnable() {
623
624 @Override
625 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800626 CompletedBatchOperation result;
627 try {
628 result = f.get();
629 } catch (InterruptedException | ExecutionException e) {
630 log.error("Batch operation failed", e);
631 // create everything failed response
632 Set<FlowRule> failures = new HashSet<>(operation.size());
633 for (FlowRuleBatchEntry op : operation.getOperations()) {
634 failures.add(op.getTarget());
635 }
636 result = new CompletedBatchOperation(false, failures);
637 }
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700638 try {
639 message.respond(SERIALIZER.encode(result));
640 } catch (IOException e) {
641 log.error("Failed to respond back", e);
642 }
643 }
644 }, futureListeners);
645 }
646 }
647
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700648 private final class SMapLoader
649 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
650
651 @Override
652 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
653 throws Exception {
654 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
655 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
656 }
657 }
658
659 private final class InternalReplicaInfoEventListener
660 implements ReplicaInfoEventListener {
661
662 @Override
663 public void event(ReplicaInfoEvent event) {
664 final NodeId local = clusterService.getLocalNode().id();
665 final DeviceId did = event.subject();
666 final ReplicaInfo rInfo = event.replicaInfo();
667
668 switch (event.type()) {
669 case MASTER_CHANGED:
670 if (local.equals(rInfo.master().orNull())) {
671 // This node is the new master, populate local structure
672 // from backup
673 loadFromBackup(did);
674 } else {
675 // This node is no longer the master holder,
676 // clean local structure
677 removeFromPrimary(did);
678 // FIXME: probably should stop pending backup activities in
679 // executors to avoid overwriting with old value
680 }
681 break;
682 default:
683 break;
684
685 }
686 }
687 }
688
// Task that applies flow entry additions and removals to the backup Hazelcast (HZ) store.
690 private final class UpdateBackup implements Runnable {
691
692 private final DeviceId deviceId;
693 private final List<StoredFlowEntry> toAdd;
694 private final List<? extends FlowRule> toRemove;
695
696 public UpdateBackup(DeviceId deviceId,
697 List<StoredFlowEntry> toAdd,
698 List<? extends FlowRule> list) {
699 this.deviceId = checkNotNull(deviceId);
700 this.toAdd = checkNotNull(toAdd);
701 this.toRemove = checkNotNull(list);
702 }
703
704 @Override
705 public void run() {
706 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800707 log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700708 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
709 // Following should be rewritten using async APIs
710 for (StoredFlowEntry entry : toAdd) {
711 final FlowId id = entry.id();
712 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
713 List<StoredFlowEntry> list = new ArrayList<>();
714 if (original != null) {
715 list.addAll(original);
716 }
717
718 list.remove(entry);
719 list.add(entry);
720
721 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
722 boolean success;
723 if (original == null) {
724 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
725 } else {
726 success = backupFlowTable.replace(id, original, newValue);
727 }
728 // TODO retry?
729 if (!success) {
730 log.error("Updating backup failed.");
731 }
732 }
733 for (FlowRule entry : toRemove) {
734 final FlowId id = entry.id();
735 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
736 List<StoredFlowEntry> list = new ArrayList<>();
737 if (original != null) {
738 list.addAll(original);
739 }
740
741 list.remove(entry);
742
743 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
744 boolean success;
745 if (original == null) {
746 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
747 } else {
748 success = backupFlowTable.replace(id, original, newValue);
749 }
750 // TODO retry?
751 if (!success) {
752 log.error("Updating backup failed.");
753 }
754 }
755 } catch (ExecutionException e) {
756 log.error("Failed to write to backups", e);
757 }
758
759 }
760 }
alshabib339a3d92014-09-26 17:54:32 -0700761}