blob: e5b2ed607444e6e16e30cd606abb2083073120fd [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
5
Madan Jampani38b250d2014-10-17 11:02:38 -07006import java.io.IOException;
alshabib339a3d92014-09-26 17:54:32 -07007import java.util.Collection;
8import java.util.Collections;
Madan Jampani38b250d2014-10-17 11:02:38 -07009import java.util.concurrent.TimeUnit;
10import java.util.concurrent.TimeoutException;
alshabib339a3d92014-09-26 17:54:32 -070011
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070015import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070017import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070019import org.onlab.onos.cluster.ClusterService;
alshabib339a3d92014-09-26 17:54:32 -070020import org.onlab.onos.net.DeviceId;
alshabib1c319ff2014-10-04 20:29:09 -070021import org.onlab.onos.net.flow.DefaultFlowEntry;
22import org.onlab.onos.net.flow.FlowEntry;
23import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070024import org.onlab.onos.net.flow.FlowRule;
alshabib339a3d92014-09-26 17:54:32 -070025import org.onlab.onos.net.flow.FlowRuleEvent;
26import org.onlab.onos.net.flow.FlowRuleEvent.Type;
27import org.onlab.onos.net.flow.FlowRuleStore;
28import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070029import org.onlab.onos.net.flow.StoredFlowEntry;
alshabib339a3d92014-09-26 17:54:32 -070030import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070031import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
32import org.onlab.onos.store.cluster.messaging.ClusterMessage;
33import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
34import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070035import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070036import org.onlab.onos.store.serializers.DistributedStoreSerializers;
37import org.onlab.onos.store.serializers.KryoSerializer;
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070038import org.onlab.util.KryoNamespace;
alshabib339a3d92014-09-26 17:54:32 -070039import org.slf4j.Logger;
40
41import com.google.common.collect.ArrayListMultimap;
42import com.google.common.collect.ImmutableSet;
43import com.google.common.collect.Multimap;
44
45/**
Madan Jampani38b250d2014-10-17 11:02:38 -070046 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070047 */
alshabib339a3d92014-09-26 17:54:32 -070048@Component(immediate = true)
49@Service
50public class DistributedFlowRuleStore
alshabib1c319ff2014-10-04 20:29:09 -070051 extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
52 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070053
54 private final Logger log = getLogger(getClass());
55
56 // store entries as a pile of rules, no info about device tables
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -070057 private final Multimap<DeviceId, StoredFlowEntry> flowEntries =
58 ArrayListMultimap.<DeviceId, StoredFlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070059
alshabib92c65ad2014-10-08 21:56:05 -070060 private final Multimap<Short, FlowRule> flowEntriesById =
61 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070062
Madan Jampani38b250d2014-10-17 11:02:38 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070064 private ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070065
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 private ClusterCommunicationService clusterCommunicator;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 private ClusterService clusterService;
71
72 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
73 @Override
74 protected void setupKryoPool() {
Yuta HIGUCHI8d143d22014-10-19 23:15:09 -070075 serializerPool = KryoNamespace.newBuilder()
Madan Jampani38b250d2014-10-17 11:02:38 -070076 .register(DistributedStoreSerializers.COMMON)
77 .build()
78 .populate(1);
79 }
80 };
81
82 // TODO: make this configurable
83 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
84
alshabib339a3d92014-09-26 17:54:32 -070085 @Activate
86 public void activate() {
87 log.info("Started");
88 }
89
90 @Deactivate
91 public void deactivate() {
92 log.info("Stopped");
93 }
94
95
96 @Override
tom9b4030d2014-10-06 10:39:03 -070097 public int getFlowRuleCount() {
98 return flowEntries.size();
99 }
100
101 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700102 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700103 return getFlowEntryInternal(rule);
104 }
105
106 private synchronized StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
107 for (StoredFlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700108 if (f.equals(rule)) {
109 return f;
110 }
111 }
112 return null;
113 }
114
115 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700116 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700117 Collection<? extends FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700118 if (rules == null) {
119 return Collections.emptyList();
120 }
121 return ImmutableSet.copyOf(rules);
122 }
123
124 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700125 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700126 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700127 if (rules == null) {
128 return Collections.emptyList();
129 }
130 return ImmutableSet.copyOf(rules);
131 }
132
133 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700134 public void storeFlowRule(FlowRule rule) {
135 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700136 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700137 storeFlowEntryInternal(rule);
138 return;
alshabib339a3d92014-09-26 17:54:32 -0700139 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700140
141 ClusterMessage message = new ClusterMessage(
142 clusterService.getLocalNode().id(),
143 FlowStoreMessageSubjects.STORE_FLOW_RULE,
144 SERIALIZER.encode(rule));
145
146 try {
147 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
148 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
149 } catch (IOException | TimeoutException e) {
150 // FIXME: throw a FlowStoreException
151 throw new RuntimeException(e);
152 }
153 }
154
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700155 private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700156 StoredFlowEntry flowEntry = new DefaultFlowEntry(flowRule);
Madan Jampani38b250d2014-10-17 11:02:38 -0700157 DeviceId deviceId = flowRule.deviceId();
158 // write to local copy.
159 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
160 flowEntries.put(deviceId, flowEntry);
161 flowEntriesById.put(flowRule.appId(), flowEntry);
162 }
163 // write to backup.
164 // TODO: write to a hazelcast map.
alshabib339a3d92014-09-26 17:54:32 -0700165 }
166
167 @Override
168 public synchronized void deleteFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700169 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700170 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700171 deleteFlowRuleInternal(rule);
172 return;
173 }
174
175 ClusterMessage message = new ClusterMessage(
176 clusterService.getLocalNode().id(),
177 FlowStoreMessageSubjects.DELETE_FLOW_RULE,
178 SERIALIZER.encode(rule));
179
180 try {
181 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
182 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
183 } catch (IOException | TimeoutException e) {
184 // FIXME: throw a FlowStoreException
185 throw new RuntimeException(e);
186 }
187 }
188
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700189 private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700190 StoredFlowEntry entry = getFlowEntryInternal(flowRule);
alshabib1c319ff2014-10-04 20:29:09 -0700191 if (entry == null) {
192 return;
alshabib339a3d92014-09-26 17:54:32 -0700193 }
alshabib1c319ff2014-10-04 20:29:09 -0700194 entry.setState(FlowEntryState.PENDING_REMOVE);
Madan Jampani38b250d2014-10-17 11:02:38 -0700195 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700196 }
197
198 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700199 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
200 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700201 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700202 return addOrUpdateFlowRuleInternal(rule);
203 }
204
205 ClusterMessage message = new ClusterMessage(
206 clusterService.getLocalNode().id(),
207 FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
208 SERIALIZER.encode(rule));
209
210 try {
211 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
212 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
213 } catch (IOException | TimeoutException e) {
214 // FIXME: throw a FlowStoreException
215 throw new RuntimeException(e);
216 }
217 }
218
219 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700220 DeviceId did = rule.deviceId();
221
222 // check if this new rule is an update to an existing entry
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700223 StoredFlowEntry stored = getFlowEntryInternal(rule);
alshabib1c319ff2014-10-04 20:29:09 -0700224 if (stored != null) {
225 stored.setBytes(rule.bytes());
226 stored.setLife(rule.life());
227 stored.setPackets(rule.packets());
228 if (stored.state() == FlowEntryState.PENDING_ADD) {
229 stored.setState(FlowEntryState.ADDED);
230 return new FlowRuleEvent(Type.RULE_ADDED, rule);
231 }
alshabib339a3d92014-09-26 17:54:32 -0700232 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
233 }
234
Yuta HIGUCHIf6f50a62014-10-19 15:58:49 -0700235 // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
236 flowEntries.put(did, new DefaultFlowEntry(rule));
alshabib1c319ff2014-10-04 20:29:09 -0700237 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700238
239 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700240 }
241
242 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700243 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
244 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700245 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700246 // bypass and handle it locally
247 return removeFlowRuleInternal(rule);
248 }
249
250 ClusterMessage message = new ClusterMessage(
251 clusterService.getLocalNode().id(),
252 FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
253 SERIALIZER.encode(rule));
254
255 try {
256 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
257 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
258 } catch (IOException | TimeoutException e) {
259 // FIXME: throw a FlowStoreException
260 throw new RuntimeException(e);
261 }
262 }
263
264 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700265 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700266 if (flowEntries.remove(rule.deviceId(), rule)) {
267 return new FlowRuleEvent(RULE_REMOVED, rule);
268 } else {
269 return null;
270 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700271 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700272 }
alshabib339a3d92014-09-26 17:54:32 -0700273}