blob: 81265546568d421f59909116eeffc1a30d696f49 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
alshabib339a3d92014-09-26 17:54:32 -070016package org.onlab.onos.store.flow.impl;
17
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070018import static com.google.common.base.Preconditions.checkNotNull;
alshabib339a3d92014-09-26 17:54:32 -070019import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
20import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070021import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070022import static org.onlab.util.Tools.namedThreads;
alshabib339a3d92014-09-26 17:54:32 -070023
Madan Jampani38b250d2014-10-17 11:02:38 -070024import java.io.IOException;
Madan Jampani117aaae2014-10-23 10:04:05 -070025import java.util.ArrayList;
26import java.util.Arrays;
alshabib339a3d92014-09-26 17:54:32 -070027import java.util.Collection;
28import java.util.Collections;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -070029import java.util.HashSet;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070030import java.util.Map.Entry;
Madan Jampanif5fdef02014-10-23 21:58:10 -070031import java.util.Set;
Madan Jampani24f9efb2014-10-24 18:56:23 -070032import java.util.concurrent.ExecutionException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070033import java.util.concurrent.ExecutorService;
34import java.util.concurrent.Executors;
Madan Jampani117aaae2014-10-23 10:04:05 -070035import java.util.concurrent.Future;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import java.util.concurrent.TimeUnit;
37import java.util.concurrent.TimeoutException;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070038import java.util.concurrent.atomic.AtomicInteger;
Yuta HIGUCHI885868f2014-11-13 19:12:07 -080039import java.util.concurrent.locks.ReentrantReadWriteLock;
Madan Jampani117aaae2014-10-23 10:04:05 -070040import java.util.List;
alshabib339a3d92014-09-26 17:54:32 -070041
42import org.apache.felix.scr.annotations.Activate;
43import org.apache.felix.scr.annotations.Component;
44import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070045import org.apache.felix.scr.annotations.Reference;
46import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070047import org.apache.felix.scr.annotations.Service;
Madan Jampani38b250d2014-10-17 11:02:38 -070048import org.onlab.onos.cluster.ClusterService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070049import org.onlab.onos.cluster.NodeId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070050import org.onlab.onos.net.Device;
alshabib339a3d92014-09-26 17:54:32 -070051import org.onlab.onos.net.DeviceId;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070052import org.onlab.onos.net.device.DeviceService;
Madan Jampani117aaae2014-10-23 10:04:05 -070053import org.onlab.onos.net.flow.CompletedBatchOperation;
alshabib1c319ff2014-10-04 20:29:09 -070054import org.onlab.onos.net.flow.DefaultFlowEntry;
55import org.onlab.onos.net.flow.FlowEntry;
56import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070057import org.onlab.onos.net.flow.FlowId;
alshabib339a3d92014-09-26 17:54:32 -070058import org.onlab.onos.net.flow.FlowRule;
Madan Jampani117aaae2014-10-23 10:04:05 -070059import org.onlab.onos.net.flow.FlowRuleBatchEntry;
60import org.onlab.onos.net.flow.FlowRuleBatchEvent;
61import org.onlab.onos.net.flow.FlowRuleBatchOperation;
62import org.onlab.onos.net.flow.FlowRuleBatchRequest;
alshabib339a3d92014-09-26 17:54:32 -070063import org.onlab.onos.net.flow.FlowRuleEvent;
Madan Jampani117aaae2014-10-23 10:04:05 -070064import org.onlab.onos.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
alshabib339a3d92014-09-26 17:54:32 -070065import org.onlab.onos.net.flow.FlowRuleEvent.Type;
66import org.onlab.onos.net.flow.FlowRuleStore;
67import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070068import org.onlab.onos.net.flow.StoredFlowEntry;
Madan Jampani38b250d2014-10-17 11:02:38 -070069import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
70import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070071import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070072import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070073import org.onlab.onos.store.flow.ReplicaInfoEvent;
74import org.onlab.onos.store.flow.ReplicaInfoEventListener;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070075import org.onlab.onos.store.flow.ReplicaInfoService;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070076import org.onlab.onos.store.hz.AbstractHazelcastStore;
77import org.onlab.onos.store.hz.SMap;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070078import org.onlab.onos.store.serializers.DecodeTo;
Madan Jampani38b250d2014-10-17 11:02:38 -070079import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHIea150152014-10-28 22:55:14 -070080import org.onlab.onos.store.serializers.StoreSerializer;
Yuta HIGUCHI60a190b2014-11-07 16:24:47 -080081import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070082import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070083import org.slf4j.Logger;
84
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -070085import com.google.common.cache.Cache;
86import com.google.common.cache.CacheBuilder;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070087import com.google.common.cache.CacheLoader;
88import com.google.common.cache.LoadingCache;
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -080089import com.google.common.cache.RemovalListener;
90import com.google.common.cache.RemovalNotification;
alshabib339a3d92014-09-26 17:54:32 -070091import com.google.common.collect.ArrayListMultimap;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070092import com.google.common.collect.ImmutableList;
alshabib339a3d92014-09-26 17:54:32 -070093import com.google.common.collect.ImmutableSet;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070094import com.google.common.collect.Iterables;
alshabib339a3d92014-09-26 17:54:32 -070095import com.google.common.collect.Multimap;
Madan Jampani117aaae2014-10-23 10:04:05 -070096import com.google.common.util.concurrent.Futures;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -070097import com.google.common.util.concurrent.ListenableFuture;
98import com.google.common.util.concurrent.SettableFuture;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -070099import com.hazelcast.core.IMap;
alshabib339a3d92014-09-26 17:54:32 -0700100
101/**
Madan Jampani38b250d2014-10-17 11:02:38 -0700102 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -0700103 */
alshabib339a3d92014-09-26 17:54:32 -0700104@Component(immediate = true)
105@Service
106public class DistributedFlowRuleStore
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700107 extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
alshabib1c319ff2014-10-04 20:29:09 -0700108 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -0700109
110 private final Logger log = getLogger(getClass());
111
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800112 // primary data:
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800113 // read/write needs to be locked
114 private final ReentrantReadWriteLock flowEntriesLock = new ReentrantReadWriteLock();
alshabib339a3d92014-09-26 17:54:32 -0700115 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800116 private final Multimap<DeviceId, StoredFlowEntry> flowEntries
117 = ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -0700118
Madan Jampani38b250d2014-10-17 11:02:38 -0700119 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700120 protected ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -0700121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700123 protected ClusterCommunicationService clusterCommunicator;
Madan Jampani38b250d2014-10-17 11:02:38 -0700124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700126 protected ClusterService clusterService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
129 protected DeviceService deviceService;
130
131 private final AtomicInteger localBatchIdGen = new AtomicInteger();
132
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700133 // TODO: make this configurable
134 private int pendingFutureTimeoutMinutes = 5;
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700135
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700136 private Cache<Integer, SettableFuture<CompletedBatchOperation>> pendingFutures =
137 CacheBuilder.newBuilder()
138 .expireAfterWrite(pendingFutureTimeoutMinutes, TimeUnit.MINUTES)
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800139 .removalListener(new TimeoutFuture())
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700140 .build();
141
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800142 // Cache of SMaps used for backup data. each SMap contain device flow table
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700143 private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
144
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700145
146 private final ExecutorService futureListeners =
147 Executors.newCachedThreadPool(namedThreads("flowstore-peer-responders"));
148
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700149 private final ExecutorService backupExecutors =
150 Executors.newSingleThreadExecutor(namedThreads("async-backups"));
151
152 // TODO make this configurable
153 private boolean syncBackup = false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700154
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700155 protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
Madan Jampani38b250d2014-10-17 11:02:38 -0700156 @Override
157 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -0700158 serializerPool = KryoNamespace.newBuilder()
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800159 .register(DistributedStoreSerializers.STORE_COMMON)
160 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
161 .build();
Madan Jampani38b250d2014-10-17 11:02:38 -0700162 }
163 };
164
165 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700166 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -0700167
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700168 private ReplicaInfoEventListener replicaInfoEventListener;
169
170 @Override
alshabib339a3d92014-09-26 17:54:32 -0700171 @Activate
172 public void activate() {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700173
174 super.serializer = SERIALIZER;
175 super.theInstance = storeService.getHazelcastInstance();
176
177 // Cache to create SMap on demand
178 smaps = CacheBuilder.newBuilder()
179 .softValues()
180 .build(new SMapLoader());
181
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700182 final NodeId local = clusterService.getLocalNode().id();
183
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700184 clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local));
Madan Jampani117aaae2014-10-23 10:04:05 -0700185
186 clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
187
188 @Override
189 public void handle(ClusterMessage message) {
190 FlowRule rule = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800191 log.trace("received get flow entry request for {}", rule);
Madan Jampani117aaae2014-10-23 10:04:05 -0700192 FlowEntry flowEntry = getFlowEntryInternal(rule);
193 try {
194 message.respond(SERIALIZER.encode(flowEntry));
195 } catch (IOException e) {
196 log.error("Failed to respond back", e);
197 }
198 }
199 });
200
Madan Jampanif5fdef02014-10-23 21:58:10 -0700201 clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
202
203 @Override
204 public void handle(ClusterMessage message) {
205 DeviceId deviceId = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800206 log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
Madan Jampanif5fdef02014-10-23 21:58:10 -0700207 Set<FlowEntry> flowEntries = getFlowEntriesInternal(deviceId);
208 try {
209 message.respond(SERIALIZER.encode(flowEntries));
210 } catch (IOException e) {
211 log.error("Failed to respond to peer's getFlowEntries request", e);
212 }
213 }
214 });
215
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800216 clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
217
218 @Override
219 public void handle(ClusterMessage message) {
220 FlowEntry rule = SERIALIZER.decode(message.payload());
221 log.trace("received get flow entry request for {}", rule);
222 FlowRuleEvent event = removeFlowRuleInternal(rule);
223 try {
224 message.respond(SERIALIZER.encode(event));
225 } catch (IOException e) {
226 log.error("Failed to respond back", e);
227 }
228 }
229 });
230
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700231 replicaInfoEventListener = new InternalReplicaInfoEventListener();
232
233 replicaInfoManager.addListener(replicaInfoEventListener);
234
alshabib339a3d92014-09-26 17:54:32 -0700235 log.info("Started");
236 }
237
238 @Deactivate
239 public void deactivate() {
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800240 clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
241 clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
242 clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
243 clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700244 replicaInfoManager.removeListener(replicaInfoEventListener);
alshabib339a3d92014-09-26 17:54:32 -0700245 log.info("Stopped");
246 }
247
248
Madan Jampani117aaae2014-10-23 10:04:05 -0700249 // TODO: This is not a efficient operation on a distributed sharded
250 // flow store. We need to revisit the need for this operation or at least
251 // make it device specific.
alshabib339a3d92014-09-26 17:54:32 -0700252 @Override
tom9b4030d2014-10-06 10:39:03 -0700253 public int getFlowRuleCount() {
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700254 // implementing in-efficient operation for debugging purpose.
255 int sum = 0;
256 for (Device device : deviceService.getDevices()) {
257 final DeviceId did = device.id();
258 sum += Iterables.size(getFlowEntries(did));
259 }
260 return sum;
tom9b4030d2014-10-06 10:39:03 -0700261 }
262
263 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800264 public FlowEntry getFlowEntry(FlowRule rule) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700265 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700266
267 if (!replicaInfo.master().isPresent()) {
268 log.warn("No master for {}", rule);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800269 // TODO: should we try returning from backup?
270 return null;
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700271 }
272
Madan Jampani117aaae2014-10-23 10:04:05 -0700273 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
274 return getFlowEntryInternal(rule);
275 }
276
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800277 log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700278 replicaInfo.master().orNull(), rule.deviceId());
279
280 ClusterMessage message = new ClusterMessage(
281 clusterService.getLocalNode().id(),
282 FlowStoreMessageSubjects.GET_FLOW_ENTRY,
283 SERIALIZER.encode(rule));
284
285 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700286 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
287 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
288 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700289 // FIXME: throw a FlowStoreException
290 throw new RuntimeException(e);
291 }
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700292 }
293
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800294 private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
295 flowEntriesLock.readLock().lock();
296 try {
297 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
298 if (f.equals(rule)) {
299 return f;
300 }
alshabib339a3d92014-09-26 17:54:32 -0700301 }
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800302 } finally {
303 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700304 }
305 return null;
306 }
307
308 @Override
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800309 public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700310
311 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700312
313 if (!replicaInfo.master().isPresent()) {
314 log.warn("No master for {}", deviceId);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800315 // TODO: should we try returning from backup?
Yuta HIGUCHI2c1d8472014-10-31 14:13:38 -0700316 return Collections.emptyList();
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700317 }
318
Madan Jampanif5fdef02014-10-23 21:58:10 -0700319 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
320 return getFlowEntriesInternal(deviceId);
321 }
322
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800323 log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
Madan Jampanif5fdef02014-10-23 21:58:10 -0700324 replicaInfo.master().orNull(), deviceId);
325
326 ClusterMessage message = new ClusterMessage(
327 clusterService.getLocalNode().id(),
328 GET_DEVICE_FLOW_ENTRIES,
329 SERIALIZER.encode(deviceId));
330
331 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700332 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
333 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
334 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
Madan Jampanif5fdef02014-10-23 21:58:10 -0700335 // FIXME: throw a FlowStoreException
336 throw new RuntimeException(e);
337 }
338 }
339
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800340 private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
341 flowEntriesLock.readLock().lock();
342 try {
343 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
344 if (rules == null) {
345 return Collections.emptySet();
346 }
347 return ImmutableSet.copyOf(rules);
348 } finally {
349 flowEntriesLock.readLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700350 }
alshabib339a3d92014-09-26 17:54:32 -0700351 }
352
353 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700354 public void storeFlowRule(FlowRule rule) {
355 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule))));
356 }
357
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700358 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700359 public Future<CompletedBatchOperation> storeBatch(FlowRuleBatchOperation operation) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700360
Madan Jampani117aaae2014-10-23 10:04:05 -0700361 if (operation.getOperations().isEmpty()) {
Brian O'Connor427a1762014-11-19 18:40:32 -0800362 return Futures.immediateFuture(new CompletedBatchOperation(true,
363 Collections.<FlowRule>emptySet()));
alshabib339a3d92014-09-26 17:54:32 -0700364 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700365
Madan Jampani117aaae2014-10-23 10:04:05 -0700366 DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
367
368 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
369
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700370 if (!replicaInfo.master().isPresent()) {
371 log.warn("No master for {}", deviceId);
372 // TODO: revisit if this should be "success" from Future point of view
373 // with every FlowEntry failed
374 return Futures.immediateFailedFuture(new IOException("No master to forward to"));
375 }
376
377 final NodeId local = clusterService.getLocalNode().id();
378 if (replicaInfo.master().get().equals(local)) {
Madan Jampani117aaae2014-10-23 10:04:05 -0700379 return storeBatchInternal(operation);
380 }
381
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800382 log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
Madan Jampani117aaae2014-10-23 10:04:05 -0700383 replicaInfo.master().orNull(), deviceId);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700384
Madan Jampani38b250d2014-10-17 11:02:38 -0700385 ClusterMessage message = new ClusterMessage(
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700386 local,
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700387 APPLY_BATCH_FLOWS,
Madan Jampani117aaae2014-10-23 10:04:05 -0700388 SERIALIZER.encode(operation));
Madan Jampani38b250d2014-10-17 11:02:38 -0700389
390 try {
Madan Jampani24f9efb2014-10-24 18:56:23 -0700391 ListenableFuture<byte[]> responseFuture =
392 clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700393 return Futures.transform(responseFuture, new DecodeTo<CompletedBatchOperation>(SERIALIZER));
Madan Jampani24f9efb2014-10-24 18:56:23 -0700394 } catch (IOException e) {
395 return Futures.immediateFailedFuture(e);
Madan Jampani38b250d2014-10-17 11:02:38 -0700396 }
397 }
398
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800399 private ListenableFuture<CompletedBatchOperation>
Yuta HIGUCHIe9b2b002014-11-04 12:25:47 -0800400 storeBatchInternal(FlowRuleBatchOperation operation) {
401
Brian O'Connor427a1762014-11-19 18:40:32 -0800402 final List<FlowRuleBatchEntry> toRemove = new ArrayList<>();
403 final List<FlowRuleBatchEntry> toAdd = new ArrayList<>();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700404 DeviceId did = null;
405
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700406
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800407 flowEntriesLock.writeLock().lock();
408 try {
409 for (FlowRuleBatchEntry batchEntry : operation.getOperations()) {
410 FlowRule flowRule = batchEntry.getTarget();
411 FlowRuleOperation op = batchEntry.getOperator();
412 if (did == null) {
413 did = flowRule.deviceId();
414 }
415 if (op.equals(FlowRuleOperation.REMOVE)) {
416 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
417 if (entry != null) {
418 entry.setState(FlowEntryState.PENDING_REMOVE);
Brian O'Connor427a1762014-11-19 18:40:32 -0800419 toRemove.add(batchEntry);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800420 }
421 } else if (op.equals(FlowRuleOperation.ADD)) {
422 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
423 DeviceId deviceId = flowRule.deviceId();
424 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
425 flowEntries.put(deviceId, flowEntry);
Brian O'Connor427a1762014-11-19 18:40:32 -0800426 toAdd.add(batchEntry);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800427 }
428 }
429 }
430 if (toAdd.isEmpty() && toRemove.isEmpty()) {
431 return Futures.immediateFuture(new CompletedBatchOperation(true, Collections.<FlowRule>emptySet()));
432 }
433
434 // create remote backup copies
435 updateBackup(did, toAdd, toRemove);
436 } finally {
437 flowEntriesLock.writeLock().unlock();
438 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700439
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700440 SettableFuture<CompletedBatchOperation> r = SettableFuture.create();
441 final int batchId = localBatchIdGen.incrementAndGet();
442
443 pendingFutures.put(batchId, r);
444 notifyDelegate(FlowRuleBatchEvent.requested(new FlowRuleBatchRequest(batchId, toAdd, toRemove)));
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700445
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700446 return r;
alshabib339a3d92014-09-26 17:54:32 -0700447 }
448
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700449 private void updateBackup(final DeviceId deviceId,
Brian O'Connor427a1762014-11-19 18:40:32 -0800450 final List<FlowRuleBatchEntry> toAdd,
451 final List<FlowRuleBatchEntry> list) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700452
453 Future<?> submit = backupExecutors.submit(new UpdateBackup(deviceId, toAdd, list));
454
455 if (syncBackup) {
456 // wait for backup to complete
457 try {
458 submit.get();
459 } catch (InterruptedException | ExecutionException e) {
460 log.error("Failed to create backups", e);
461 }
462 }
463 }
464
Brian O'Connor427a1762014-11-19 18:40:32 -0800465 private void updateBackup(DeviceId deviceId, List<FlowRuleBatchEntry> toAdd) {
466
467 updateBackup(deviceId, toAdd, Collections.<FlowRuleBatchEntry>emptyList());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700468 }
469
alshabib339a3d92014-09-26 17:54:32 -0700470 @Override
Madan Jampani117aaae2014-10-23 10:04:05 -0700471 public void deleteFlowRule(FlowRule rule) {
472 storeBatch(new FlowRuleBatchOperation(Arrays.asList(new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule))));
alshabib339a3d92014-09-26 17:54:32 -0700473 }
474
475 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700476 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
477 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700478 final NodeId localId = clusterService.getLocalNode().id();
479 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700480 return addOrUpdateFlowRuleInternal(rule);
481 }
482
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800483 log.warn("Tried to update FlowRule {} state,"
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700484 + " while the Node was not the master.", rule);
485 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700486 }
487
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800488 private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700489 final DeviceId did = rule.deviceId();
alshabib339a3d92014-09-26 17:54:32 -0700490
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800491 flowEntriesLock.writeLock().lock();
492 try {
493 // check if this new rule is an update to an existing entry
494 StoredFlowEntry stored = getFlowEntryInternal(rule);
495 if (stored != null) {
496 stored.setBytes(rule.bytes());
497 stored.setLife(rule.life());
498 stored.setPackets(rule.packets());
499 if (stored.state() == FlowEntryState.PENDING_ADD) {
500 stored.setState(FlowEntryState.ADDED);
Brian O'Connor427a1762014-11-19 18:40:32 -0800501 FlowRuleBatchEntry entry =
502 new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
503 updateBackup(did, Arrays.asList(entry));
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800504 return new FlowRuleEvent(Type.RULE_ADDED, rule);
505 }
506 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
alshabib1c319ff2014-10-04 20:29:09 -0700507 }
alshabib339a3d92014-09-26 17:54:32 -0700508
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800509 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
510 // TODO: also update backup.
511 flowEntries.put(did, new DefaultFlowEntry(rule));
512 } finally {
513 flowEntriesLock.writeLock().unlock();
514 }
alshabib1c319ff2014-10-04 20:29:09 -0700515 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700516
alshabib339a3d92014-09-26 17:54:32 -0700517 }
518
519 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700520 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
521 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI4b524442014-10-28 22:23:57 -0700522
523 final NodeId localId = clusterService.getLocalNode().id();
524 if (localId.equals(replicaInfo.master().orNull())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700525 // bypass and handle it locally
526 return removeFlowRuleInternal(rule);
527 }
528
Yuta HIGUCHIdfa45c12014-11-24 18:38:53 -0800529 log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
530 replicaInfo.master().orNull(), rule.deviceId());
531
532 ClusterMessage message = new ClusterMessage(
533 clusterService.getLocalNode().id(),
534 REMOVE_FLOW_ENTRY,
535 SERIALIZER.encode(rule));
536
537 try {
538 Future<byte[]> responseFuture = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
539 return SERIALIZER.decode(responseFuture.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
540 } catch (IOException | TimeoutException | ExecutionException | InterruptedException e) {
541 // FIXME: throw a FlowStoreException
542 throw new RuntimeException(e);
543 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700544 }
545
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800546 private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700547 final DeviceId deviceId = rule.deviceId();
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800548 flowEntriesLock.writeLock().lock();
549 try {
550 // This is where one could mark a rule as removed and still keep it in the store.
551 final boolean removed = flowEntries.remove(deviceId, rule);
Brian O'Connor427a1762014-11-19 18:40:32 -0800552 FlowRuleBatchEntry entry =
553 new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
554 updateBackup(deviceId, Collections.<FlowRuleBatchEntry>emptyList(), Arrays.asList(entry));
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800555 if (removed) {
556 return new FlowRuleEvent(RULE_REMOVED, rule);
557 } else {
558 return null;
559 }
560 } finally {
561 flowEntriesLock.writeLock().unlock();
alshabib339a3d92014-09-26 17:54:32 -0700562 }
alshabib339a3d92014-09-26 17:54:32 -0700563 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700564
565 @Override
566 public void batchOperationComplete(FlowRuleBatchEvent event) {
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700567 final Integer batchId = event.subject().batchId();
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700568 SettableFuture<CompletedBatchOperation> future
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700569 = pendingFutures.getIfPresent(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700570 if (future != null) {
571 future.set(event.result());
Yuta HIGUCHIbf89c742014-10-27 15:10:02 -0700572 pendingFutures.invalidate(batchId);
Yuta HIGUCHI9def0472014-10-23 15:51:10 -0700573 }
Madan Jampani117aaae2014-10-23 10:04:05 -0700574 notifyDelegate(event);
575 }
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700576
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800577 private void loadFromBackup(final DeviceId did) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700578
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800579 flowEntriesLock.writeLock().lock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700580 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800581 log.debug("Loading FlowRules for {} from backups", did);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700582 SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
583 for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
584 : backupFlowTable.entrySet()) {
585
586 // TODO: should we be directly updating internal structure or
587 // should we be triggering event?
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800588 log.trace("loading {}", e.getValue());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700589 for (StoredFlowEntry entry : e.getValue()) {
590 flowEntries.remove(did, entry);
591 flowEntries.put(did, entry);
592 }
593 }
594 } catch (ExecutionException e) {
595 log.error("Failed to load backup flowtable for {}", did, e);
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800596 } finally {
597 flowEntriesLock.writeLock().unlock();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700598 }
599 }
600
Yuta HIGUCHI885868f2014-11-13 19:12:07 -0800601 private void removeFromPrimary(final DeviceId did) {
602 Collection<StoredFlowEntry> removed = null;
603 flowEntriesLock.writeLock().lock();
604 try {
605 removed = flowEntries.removeAll(did);
606 } finally {
607 flowEntriesLock.writeLock().unlock();
608 }
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800609 log.trace("removedFromPrimary {}", removed);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700610 }
611
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800612 private static final class TimeoutFuture
613 implements RemovalListener<Integer, SettableFuture<CompletedBatchOperation>> {
614 @Override
615 public void onRemoval(RemovalNotification<Integer, SettableFuture<CompletedBatchOperation>> notification) {
616 // wrapping in ExecutionException to support Future.get
617 notification.getValue()
618 .setException(new ExecutionException("Timed out",
619 new TimeoutException()));
620 }
621 }
622
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700623 private final class OnStoreBatch implements ClusterMessageHandler {
624 private final NodeId local;
625
626 private OnStoreBatch(NodeId local) {
627 this.local = local;
628 }
629
630 @Override
631 public void handle(final ClusterMessage message) {
632 FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800633 log.debug("received batch request {}", operation);
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700634
635 final DeviceId deviceId = operation.getOperations().get(0).getTarget().deviceId();
636 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
637 if (!local.equals(replicaInfo.master().orNull())) {
638
639 Set<FlowRule> failures = new HashSet<>(operation.size());
640 for (FlowRuleBatchEntry op : operation.getOperations()) {
641 failures.add(op.getTarget());
642 }
643 CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures);
644 // This node is no longer the master, respond as all failed.
645 // TODO: we might want to wrap response in envelope
646 // to distinguish sw programming failure and hand over
647 // it make sense in the latter case to retry immediately.
648 try {
649 message.respond(SERIALIZER.encode(allFailed));
650 } catch (IOException e) {
651 log.error("Failed to respond back", e);
652 }
653 return;
654 }
655
656 final ListenableFuture<CompletedBatchOperation> f = storeBatchInternal(operation);
657
658 f.addListener(new Runnable() {
659
660 @Override
661 public void run() {
Yuta HIGUCHIf1ccee82014-11-11 20:39:58 -0800662 CompletedBatchOperation result;
663 try {
664 result = f.get();
665 } catch (InterruptedException | ExecutionException e) {
666 log.error("Batch operation failed", e);
667 // create everything failed response
668 Set<FlowRule> failures = new HashSet<>(operation.size());
669 for (FlowRuleBatchEntry op : operation.getOperations()) {
670 failures.add(op.getTarget());
671 }
672 result = new CompletedBatchOperation(false, failures);
673 }
Yuta HIGUCHIea150152014-10-28 22:55:14 -0700674 try {
675 message.respond(SERIALIZER.encode(result));
676 } catch (IOException e) {
677 log.error("Failed to respond back", e);
678 }
679 }
680 }, futureListeners);
681 }
682 }
683
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700684 private final class SMapLoader
685 extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
686
687 @Override
688 public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
689 throws Exception {
690 IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
691 return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
692 }
693 }
694
695 private final class InternalReplicaInfoEventListener
696 implements ReplicaInfoEventListener {
697
698 @Override
699 public void event(ReplicaInfoEvent event) {
700 final NodeId local = clusterService.getLocalNode().id();
701 final DeviceId did = event.subject();
702 final ReplicaInfo rInfo = event.replicaInfo();
703
704 switch (event.type()) {
705 case MASTER_CHANGED:
706 if (local.equals(rInfo.master().orNull())) {
707 // This node is the new master, populate local structure
708 // from backup
709 loadFromBackup(did);
710 } else {
711 // This node is no longer the master holder,
712 // clean local structure
713 removeFromPrimary(did);
714 // FIXME: probably should stop pending backup activities in
715 // executors to avoid overwriting with old value
716 }
717 break;
718 default:
719 break;
720
721 }
722 }
723 }
724
725 // Task to update FlowEntries in backup HZ store
Brian O'Connor427a1762014-11-19 18:40:32 -0800726 // TODO: Should be refactored to contain only one list and not
727 // toAdd and toRemove
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700728 private final class UpdateBackup implements Runnable {
729
730 private final DeviceId deviceId;
Brian O'Connor427a1762014-11-19 18:40:32 -0800731 private final List<FlowRuleBatchEntry> toAdd;
732 private final List<FlowRuleBatchEntry> toRemove;
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700733
734 public UpdateBackup(DeviceId deviceId,
Brian O'Connor427a1762014-11-19 18:40:32 -0800735 List<FlowRuleBatchEntry> toAdd,
736 List<FlowRuleBatchEntry> list) {
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700737 this.deviceId = checkNotNull(deviceId);
738 this.toAdd = checkNotNull(toAdd);
739 this.toRemove = checkNotNull(list);
740 }
741
742 @Override
743 public void run() {
744 try {
Yuta HIGUCHIfaf9e1c2014-11-20 00:31:29 -0800745 log.trace("update backup {} +{} -{}", deviceId, toAdd, toRemove);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700746 final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
747 // Following should be rewritten using async APIs
Brian O'Connor427a1762014-11-19 18:40:32 -0800748 for (FlowRuleBatchEntry bEntry : toAdd) {
749 final FlowRule entry = bEntry.getTarget();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700750 final FlowId id = entry.id();
751 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
752 List<StoredFlowEntry> list = new ArrayList<>();
753 if (original != null) {
754 list.addAll(original);
755 }
756
Brian O'Connor427a1762014-11-19 18:40:32 -0800757 list.remove(bEntry.getTarget());
758 list.add((StoredFlowEntry) entry);
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700759
760 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
761 boolean success;
762 if (original == null) {
763 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
764 } else {
765 success = backupFlowTable.replace(id, original, newValue);
766 }
767 // TODO retry?
768 if (!success) {
769 log.error("Updating backup failed.");
770 }
771 }
Brian O'Connor427a1762014-11-19 18:40:32 -0800772 for (FlowRuleBatchEntry bEntry : toRemove) {
773 final FlowRule entry = bEntry.getTarget();
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700774 final FlowId id = entry.id();
775 ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
776 List<StoredFlowEntry> list = new ArrayList<>();
777 if (original != null) {
778 list.addAll(original);
779 }
780
Brian O'Connor427a1762014-11-19 18:40:32 -0800781 list.remove(bEntry.getTarget());
Yuta HIGUCHI92891d12014-10-27 20:04:38 -0700782
783 ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
784 boolean success;
785 if (original == null) {
786 success = (backupFlowTable.putIfAbsent(id, newValue) == null);
787 } else {
788 success = backupFlowTable.replace(id, original, newValue);
789 }
790 // TODO retry?
791 if (!success) {
792 log.error("Updating backup failed.");
793 }
794 }
795 } catch (ExecutionException e) {
796 log.error("Failed to write to backups", e);
797 }
798
799 }
800 }
alshabib339a3d92014-09-26 17:54:32 -0700801}