blob: 9c67f6983c3c6dcc157afd4b6fbe400400ac8ace [file] [log] [blame]
alshabib339a3d92014-09-26 17:54:32 -07001package org.onlab.onos.store.flow.impl;
2
alshabib339a3d92014-09-26 17:54:32 -07003import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
4import static org.slf4j.LoggerFactory.getLogger;
5
Madan Jampani38b250d2014-10-17 11:02:38 -07006import java.io.IOException;
alshabib339a3d92014-09-26 17:54:32 -07007import java.util.Collection;
8import java.util.Collections;
Madan Jampani38b250d2014-10-17 11:02:38 -07009import java.util.concurrent.TimeUnit;
10import java.util.concurrent.TimeoutException;
alshabib339a3d92014-09-26 17:54:32 -070011
12import org.apache.felix.scr.annotations.Activate;
13import org.apache.felix.scr.annotations.Component;
14import org.apache.felix.scr.annotations.Deactivate;
Madan Jampani38b250d2014-10-17 11:02:38 -070015import org.apache.felix.scr.annotations.Reference;
16import org.apache.felix.scr.annotations.ReferenceCardinality;
alshabib339a3d92014-09-26 17:54:32 -070017import org.apache.felix.scr.annotations.Service;
18import org.onlab.onos.ApplicationId;
Madan Jampani38b250d2014-10-17 11:02:38 -070019import org.onlab.onos.cluster.ClusterService;
alshabib339a3d92014-09-26 17:54:32 -070020import org.onlab.onos.net.DeviceId;
alshabib1c319ff2014-10-04 20:29:09 -070021import org.onlab.onos.net.flow.DefaultFlowEntry;
22import org.onlab.onos.net.flow.FlowEntry;
23import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
alshabib339a3d92014-09-26 17:54:32 -070024import org.onlab.onos.net.flow.FlowRule;
alshabib339a3d92014-09-26 17:54:32 -070025import org.onlab.onos.net.flow.FlowRuleEvent;
26import org.onlab.onos.net.flow.FlowRuleEvent.Type;
27import org.onlab.onos.net.flow.FlowRuleStore;
28import org.onlab.onos.net.flow.FlowRuleStoreDelegate;
29import org.onlab.onos.store.AbstractStore;
Madan Jampani38b250d2014-10-17 11:02:38 -070030import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
31import org.onlab.onos.store.cluster.messaging.ClusterMessage;
32import org.onlab.onos.store.cluster.messaging.ClusterMessageResponse;
33import org.onlab.onos.store.flow.ReplicaInfo;
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070034import org.onlab.onos.store.flow.ReplicaInfoService;
Madan Jampani38b250d2014-10-17 11:02:38 -070035import org.onlab.onos.store.serializers.DistributedStoreSerializers;
36import org.onlab.onos.store.serializers.KryoSerializer;
37import org.onlab.util.KryoPool;
alshabib339a3d92014-09-26 17:54:32 -070038import org.slf4j.Logger;
39
40import com.google.common.collect.ArrayListMultimap;
41import com.google.common.collect.ImmutableSet;
42import com.google.common.collect.Multimap;
43
44/**
Madan Jampani38b250d2014-10-17 11:02:38 -070045 * Manages inventory of flow rules using a distributed state management protocol.
alshabib339a3d92014-09-26 17:54:32 -070046 */
alshabib339a3d92014-09-26 17:54:32 -070047@Component(immediate = true)
48@Service
49public class DistributedFlowRuleStore
alshabib1c319ff2014-10-04 20:29:09 -070050 extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
51 implements FlowRuleStore {
alshabib339a3d92014-09-26 17:54:32 -070052
53 private final Logger log = getLogger(getClass());
54
55 // store entries as a pile of rules, no info about device tables
alshabib1c319ff2014-10-04 20:29:09 -070056 private final Multimap<DeviceId, FlowEntry> flowEntries =
57 ArrayListMultimap.<DeviceId, FlowEntry>create();
alshabib339a3d92014-09-26 17:54:32 -070058
alshabib92c65ad2014-10-08 21:56:05 -070059 private final Multimap<Short, FlowRule> flowEntriesById =
60 ArrayListMultimap.<Short, FlowRule>create();
alshabib339a3d92014-09-26 17:54:32 -070061
Madan Jampani38b250d2014-10-17 11:02:38 -070062 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Yuta HIGUCHIfe280eb2014-10-17 12:10:43 -070063 private ReplicaInfoService replicaInfoManager;
Madan Jampani38b250d2014-10-17 11:02:38 -070064
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 private ClusterCommunicationService clusterCommunicator;
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 private ClusterService clusterService;
70
71 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
72 @Override
73 protected void setupKryoPool() {
74 serializerPool = KryoPool.newBuilder()
75 .register(DistributedStoreSerializers.COMMON)
76 .build()
77 .populate(1);
78 }
79 };
80
81 // TODO: make this configurable
82 private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 1000;
83
alshabib339a3d92014-09-26 17:54:32 -070084 @Activate
85 public void activate() {
86 log.info("Started");
87 }
88
89 @Deactivate
90 public void deactivate() {
91 log.info("Stopped");
92 }
93
94
95 @Override
tom9b4030d2014-10-06 10:39:03 -070096 public int getFlowRuleCount() {
97 return flowEntries.size();
98 }
99
100 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700101 public synchronized FlowEntry getFlowEntry(FlowRule rule) {
102 for (FlowEntry f : flowEntries.get(rule.deviceId())) {
alshabib339a3d92014-09-26 17:54:32 -0700103 if (f.equals(rule)) {
104 return f;
105 }
106 }
107 return null;
108 }
109
110 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700111 public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
112 Collection<FlowEntry> rules = flowEntries.get(deviceId);
alshabib339a3d92014-09-26 17:54:32 -0700113 if (rules == null) {
114 return Collections.emptyList();
115 }
116 return ImmutableSet.copyOf(rules);
117 }
118
119 @Override
alshabib1c319ff2014-10-04 20:29:09 -0700120 public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
alshabib92c65ad2014-10-08 21:56:05 -0700121 Collection<FlowRule> rules = flowEntriesById.get(appId.id());
alshabib339a3d92014-09-26 17:54:32 -0700122 if (rules == null) {
123 return Collections.emptyList();
124 }
125 return ImmutableSet.copyOf(rules);
126 }
127
128 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700129 public void storeFlowRule(FlowRule rule) {
130 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700131 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700132 storeFlowEntryInternal(rule);
133 return;
alshabib339a3d92014-09-26 17:54:32 -0700134 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700135
136 ClusterMessage message = new ClusterMessage(
137 clusterService.getLocalNode().id(),
138 FlowStoreMessageSubjects.STORE_FLOW_RULE,
139 SERIALIZER.encode(rule));
140
141 try {
142 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
143 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
144 } catch (IOException | TimeoutException e) {
145 // FIXME: throw a FlowStoreException
146 throw new RuntimeException(e);
147 }
148 }
149
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700150 private synchronized void storeFlowEntryInternal(FlowRule flowRule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700151 FlowEntry flowEntry = new DefaultFlowEntry(flowRule);
152 DeviceId deviceId = flowRule.deviceId();
153 // write to local copy.
154 if (!flowEntries.containsEntry(deviceId, flowEntry)) {
155 flowEntries.put(deviceId, flowEntry);
156 flowEntriesById.put(flowRule.appId(), flowEntry);
157 }
158 // write to backup.
159 // TODO: write to a hazelcast map.
alshabib339a3d92014-09-26 17:54:32 -0700160 }
161
162 @Override
163 public synchronized void deleteFlowRule(FlowRule rule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700164 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700165 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700166 deleteFlowRuleInternal(rule);
167 return;
168 }
169
170 ClusterMessage message = new ClusterMessage(
171 clusterService.getLocalNode().id(),
172 FlowStoreMessageSubjects.DELETE_FLOW_RULE,
173 SERIALIZER.encode(rule));
174
175 try {
176 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
177 response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
178 } catch (IOException | TimeoutException e) {
179 // FIXME: throw a FlowStoreException
180 throw new RuntimeException(e);
181 }
182 }
183
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700184 private synchronized void deleteFlowRuleInternal(FlowRule flowRule) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700185 FlowEntry entry = getFlowEntry(flowRule);
alshabib1c319ff2014-10-04 20:29:09 -0700186 if (entry == null) {
187 return;
alshabib339a3d92014-09-26 17:54:32 -0700188 }
alshabib1c319ff2014-10-04 20:29:09 -0700189 entry.setState(FlowEntryState.PENDING_REMOVE);
Madan Jampani38b250d2014-10-17 11:02:38 -0700190 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700191 }
192
193 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700194 public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
195 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700196 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700197 return addOrUpdateFlowRuleInternal(rule);
198 }
199
200 ClusterMessage message = new ClusterMessage(
201 clusterService.getLocalNode().id(),
202 FlowStoreMessageSubjects.ADD_OR_UPDATE_FLOW_RULE,
203 SERIALIZER.encode(rule));
204
205 try {
206 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
207 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
208 } catch (IOException | TimeoutException e) {
209 // FIXME: throw a FlowStoreException
210 throw new RuntimeException(e);
211 }
212 }
213
214 private synchronized FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
alshabib339a3d92014-09-26 17:54:32 -0700215 DeviceId did = rule.deviceId();
216
217 // check if this new rule is an update to an existing entry
alshabib1c319ff2014-10-04 20:29:09 -0700218 FlowEntry stored = getFlowEntry(rule);
219 if (stored != null) {
220 stored.setBytes(rule.bytes());
221 stored.setLife(rule.life());
222 stored.setPackets(rule.packets());
223 if (stored.state() == FlowEntryState.PENDING_ADD) {
224 stored.setState(FlowEntryState.ADDED);
225 return new FlowRuleEvent(Type.RULE_ADDED, rule);
226 }
alshabib339a3d92014-09-26 17:54:32 -0700227 return new FlowRuleEvent(Type.RULE_UPDATED, rule);
228 }
229
230 flowEntries.put(did, rule);
alshabib1c319ff2014-10-04 20:29:09 -0700231 return null;
Madan Jampani38b250d2014-10-17 11:02:38 -0700232
233 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700234 }
235
236 @Override
Madan Jampani38b250d2014-10-17 11:02:38 -0700237 public FlowRuleEvent removeFlowRule(FlowEntry rule) {
238 ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
Yuta HIGUCHI0858b352014-10-17 11:56:06 -0700239 if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
Madan Jampani38b250d2014-10-17 11:02:38 -0700240 // bypass and handle it locally
241 return removeFlowRuleInternal(rule);
242 }
243
244 ClusterMessage message = new ClusterMessage(
245 clusterService.getLocalNode().id(),
246 FlowStoreMessageSubjects.REMOVE_FLOW_RULE,
247 SERIALIZER.encode(rule));
248
249 try {
250 ClusterMessageResponse response = clusterCommunicator.sendAndReceive(message, replicaInfo.master().get());
251 return SERIALIZER.decode(response.get(FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS));
252 } catch (IOException | TimeoutException e) {
253 // FIXME: throw a FlowStoreException
254 throw new RuntimeException(e);
255 }
256 }
257
258 private synchronized FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
alshabib1c319ff2014-10-04 20:29:09 -0700259 // This is where one could mark a rule as removed and still keep it in the store.
alshabib339a3d92014-09-26 17:54:32 -0700260 if (flowEntries.remove(rule.deviceId(), rule)) {
261 return new FlowRuleEvent(RULE_REMOVED, rule);
262 } else {
263 return null;
264 }
Madan Jampani38b250d2014-10-17 11:02:38 -0700265 // TODO: also update backup.
alshabib339a3d92014-09-26 17:54:32 -0700266 }
alshabib339a3d92014-09-26 17:54:32 -0700267}