blob: 3da5d2569cf10acec8075a11e459f8bcc6159e1e [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -07005import static org.onlab.onos.store.flow.impl.FlowStoreMessageSubjects.*;
alshabib339a3d92014-09-26 17:54:32 -07006
Madan Jampani38b250d2014-10-17 11:02:38 -07007import java.io.IOException;
alshabib339a3d92014-09-26 17:54:32 -07008import java.util.Collection;
9import java.util.Collections;
Madan Jampani38b250d2014-10-17 11:02:38 -070010import java.util.concurrent.TimeUnit;
11import java.util.concurrent.TimeoutException;
alshabib339a3d92014-09-26 17:54:32 -070012
13import org.apache.felix.scr.annotations.Activate;
14import org.apache.felix.scr.annotations.Component;
15import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070016import org.apache.felix.scr.annotations.Reference;
17import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070018import org.apache.felix.scr.annotations.Service;
19import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070020import org.onlab.onos.cluster.ClusterService;
alshabib339a3d92014-09-26 17:54:32 -070021import org.onlab.onos.net.DeviceId;
alshabib1c319ff2014-10-04 20:29:09 -070022import org.onlab.onos.net.flow.DefaultFlowEntry;
23import org.onlab.onos.net.flow.FlowEntry;
24import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070025import org.onlab.onos.net.flow.FlowRule;
alshabib339a3d92014-09-26 17:54:32 -070026import org.onlab.onos.net.flow.FlowRuleEvent;
27import org.onlab.onos.net.flow.FlowRuleEvent.Type;
28import org.onlab.onos.net.flow.FlowRuleStore;
29import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070030import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070031import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070032import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
33import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070034import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
Madan Jampani38b250d2014-10-17 11:02:38 -070035import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
36import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070037import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070038import org.onlab.onos.store.serializers.DistributedStoreSerializers;
39import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070040import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070041import org.slf4j.Logger;
42
43import com.google.common.collect.ArrayListMultimap;
44import com.google.common.collect.ImmutableSet;
45import com.google.common.collect.Multimap;
46
47/**
Madan Jampani38b250d2014-10-17 11:02:38 -070048 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070049 */
alshabib339a3d92014-09-26 17:54:32 -070050@Component(immediate = true)
51@Service
52public class DistributedFlowRuleStore
alshabib1c319ff2014-10-04 20:29:09 -070053 extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
54 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070055
56 private final Logger log = getLogger(getClass());
57
58 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070059 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
60 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070061
alshabib92c65ad2014-10-08 21:56:05 -070062 private final Multimap<Short, FlowRule> flowEntriesById =
63 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070064
Madan Jampani38b250d2014-10-17 11:02:38 -070065 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070066 private ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070067
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 private ClusterCommunicationService clusterCommunicator;
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 private ClusterService clusterService;
73
74 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
75 @Override
76 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070077 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -070078 .register(DistributedStoreSerializers.COMMON)
79 .build()
80 .populate(1);
81 }
82 };
83
84 // TODO: make this configurable
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070085 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
Madan Jampani38b250d2014-10-17 11:02:38 -070086
alshabib339a3d92014-09-26 17:54:32 -070087 @Activate
88 public void activate() {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -070089 clusterCommunicator.addSubscriber(STORE_FLOW_RULE, new ClusterMessageHandler() {
90
91 @Override
92 public void handle(ClusterMessage message) {
93 FlowRule rule = SERIALIZER.decode(message.payload());
94 log.info("received add request for {}", rule);
95 storeFlowEntryInternal(rule);
96 // FIXME what to respond.
97 try {
98 // FIXME: #respond() not working. responded message is
99 // handled by this sender node and never goes back.
100 message.respond(SERIALIZER.encode("ACK"));
101 } catch (IOException e) {
102 log.error("Failed to respond back", e);
103 }
104 }
105 });
106
107 clusterCommunicator.addSubscriber(DELETE_FLOW_RULE, new ClusterMessageHandler() {
108
109 @Override
110 public void handle(ClusterMessage message) {
111 FlowRule rule = SERIALIZER.decode(message.payload());
112 log.info("received delete request for {}", rule);
113 deleteFlowRuleInternal(rule);
114 // FIXME what to respond.
115 try {
116 message.respond(SERIALIZER.encode("ACK"));
117 } catch (IOException e) {
118 log.error("Failed to respond back", e);
119 }
120
121 }
122 });
alshabib339a3d92014-09-26 17:54:32 -0700123 log.info("Started");
124 }
125
126 @Deactivate
127 public void deactivate() {
128 log.info("Stopped");
129 }
130
131
132 @Override
tom9b4030d2014-10-06 10:39:03 -0700133 public int getFlowRuleCount() {
134 return flowEntries.size();
135 }
136
137 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700138 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700139 return getFlowEntryInternal(rule);
140 }
141
142 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
143 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700144 if (f.equals(rule)) {
145 return f;
146 }
147 }
148 return null;
149 }
150
151 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700152 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700153 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700154 if (rules == null) {
155 return Collections.emptyList();
156 }
157 return ImmutableSet.copyOf(rules);
158 }
159
160 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700161 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700162 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700163 if (rules == null) {
164 return Collections.emptyList();
165 }
166 return ImmutableSet.copyOf(rules);
167 }
168
169 @Override
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700170 public boolean storeFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700171 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700172 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700173 return storeFlowEntryInternal(rule);
alshabib339a3d92014-09-26 17:54:32 -0700174 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700175
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700176 log.warn("Not my flow forwarding to {}", replicaInfo.master().orNull());
177
Madan Jampani38b250d2014-10-17 11:02:38 -0700178 ClusterMessage message = new ClusterMessage(
179 clusterService.getLocalNode().id(),
180 FlowStoreMessageSubjects.STORE_FLOW_RULE,
181 SERIALIZER.encode(rule));
182
183 try {
184 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
185 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
186 } catch (IOException | TimeoutException e) {
187 // FIXME: throw a FlowStoreException
188 throw new RuntimeException(e);
189 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700190 return false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700191 }
192
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700193 private synchronized boolean storeFlowEntryInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700194 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700195 DeviceId deviceId = flowRule.deviceId();
196 // write to local copy.
197 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
198 flowEntries.put(deviceId, flowEntry);
199 flowEntriesById.put(flowRule.appId(), flowEntry);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700200 notifyDelegate(new FlowRuleEvent(Type.RULE_ADD_REQUESTED, flowRule));
201 return true;
Madan Jampani38b250d2014-10-17 11:02:38 -0700202 }
203 // write to backup.
204 // TODO: write to a hazelcast map.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700205 return false;
alshabib339a3d92014-09-26 17:54:32 -0700206 }
207
208 @Override
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700209 public synchronized boolean deleteFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700210 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700211 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700212 return deleteFlowRuleInternal(rule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700213 }
214
215 ClusterMessage message = new ClusterMessage(
216 clusterService.getLocalNode().id(),
217 FlowStoreMessageSubjects.DELETE_FLOW_RULE,
218 SERIALIZER.encode(rule));
219
220 try {
221 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
222 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
223 } catch (IOException | TimeoutException e) {
224 // FIXME: throw a FlowStoreException
225 throw new RuntimeException(e);
226 }
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700227 return false;
Madan Jampani38b250d2014-10-17 11:02:38 -0700228 }
229
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700230 private synchronized boolean deleteFlowRuleInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700231 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
alshabib1c319ff2014-10-04 20:29:09 -0700232 if (entry == null) {
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700233 return false;
alshabib339a3d92014-09-26 17:54:32 -0700234 }
alshabib1c319ff2014-10-04 20:29:09 -0700235 entry.setState(FlowEntryState.PENDING_REMOVE);
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700236
Madan Jampani38b250d2014-10-17 11:02:38 -0700237 // TODO: also update backup.
Yuta HIGUCHIf3d51bd2014-10-21 01:05:33 -0700238
239 notifyDelegate(new FlowRuleEvent(Type.RULE_REMOVE_REQUESTED, flowRule));
240
241 return true;
alshabib339a3d92014-09-26 17:54:32 -0700242 }
243
244 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700245 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
246 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700247 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700248 return addOrUpdateFlowRuleInternal(rule);
249 }
250
251 ClusterMessage message = new ClusterMessage(
252 clusterService.getLocalNode().id(),
253 FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
254 SERIALIZER.encode(rule));
255
256 try {
257 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
258 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
259 } catch (IOException | TimeoutException e) {
260 // FIXME: throw a FlowStoreException
261 throw new RuntimeException(e);
262 }
263 }
264
265 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700266 DeviceId did = rule.deviceId();
267
268 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700269 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700270 if (stored != null) {
271 stored.setBytes(rule.bytes());
272 stored.setLife(rule.life());
273 stored.setPackets(rule.packets());
274 if (stored.state() == FlowEntryState.PENDING_ADD) {
275 stored.setState(FlowEntryState.ADDED);
276 return new FlowRuleEvent(Type.RULE_ADDED, rule);
277 }
alshabib339a3d92014-09-26 17:54:32 -0700278 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
279 }
280
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700281 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
282 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700283 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700284
285 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700286 }
287
288 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700289 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
290 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700291 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700292 // bypass and handle it locally
293 return removeFlowRuleInternal(rule);
294 }
295
296 ClusterMessage message = new ClusterMessage(
297 clusterService.getLocalNode().id(),
298 FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
299 SERIALIZER.encode(rule));
300
301 try {
302 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
303 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
304 } catch (IOException | TimeoutException e) {
305 // FIXME: throw a FlowStoreException
306 throw new RuntimeException(e);
307 }
308 }
309
310 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700311 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700312 if (flowEntries.remove(rule.deviceId(), rule)) {
313 return new FlowRuleEvent(RULE_REMOVED, rule);
314 } else {
315 return null;
316 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700317 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700318 }
alshabib339a3d92014-09-26 17:54:32 -0700319}