blob: 34d470c8fc74f8bc263c5aa92f5a1ffeebdc367a [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Map;
19import java.util.concurrent.atomic.AtomicReference;
20import java.util.function.Function;
Jordan Halterman158feb92018-06-30 23:44:30 -070021import java.util.stream.Collectors;
Jordan Halterman281dbf32018-06-15 17:46:28 -070022
23import com.google.common.collect.Maps;
24import org.onosproject.net.flow.DefaultFlowEntry;
25import org.onosproject.net.flow.FlowEntry;
26import org.onosproject.net.flow.FlowId;
27import org.onosproject.net.flow.FlowRule;
28import org.onosproject.net.flow.StoredFlowEntry;
29import org.onosproject.store.LogicalTimestamp;
30import org.slf4j.Logger;
31import org.slf4j.LoggerFactory;
32
33/**
34 * Container for a bucket of flows assigned to a specific device.
35 * <p>
36 * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
37 * occurred is recorded for ordering changes.
38 */
39public class FlowBucket {
40 private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
41 private final BucketId bucketId;
42 private volatile long term;
Jordan Halterman158feb92018-06-30 23:44:30 -070043 private volatile LogicalTimestamp timestamp;
44 private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket;
Jordan Halterman281dbf32018-06-15 17:46:28 -070045
46 FlowBucket(BucketId bucketId) {
Jordan Halterman158feb92018-06-30 23:44:30 -070047 this(bucketId, 0, new LogicalTimestamp(0), Maps.newConcurrentMap());
48 }
49
50 private FlowBucket(
51 BucketId bucketId,
52 long term,
53 LogicalTimestamp timestamp,
54 Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket) {
Jordan Halterman281dbf32018-06-15 17:46:28 -070055 this.bucketId = bucketId;
Jordan Halterman158feb92018-06-30 23:44:30 -070056 this.term = term;
57 this.timestamp = timestamp;
58 this.flowBucket = flowBucket;
Jordan Halterman281dbf32018-06-15 17:46:28 -070059 }
60
61 /**
62 * Returns the flow bucket identifier.
63 *
64 * @return the flow bucket identifier
65 */
66 public BucketId bucketId() {
67 return bucketId;
68 }
69
70 /**
71 * Returns the flow bucket term.
72 *
73 * @return the flow bucket term
74 */
75 public long term() {
76 return term;
77 }
78
79 /**
80 * Returns the flow bucket timestamp.
81 *
82 * @return the flow bucket timestamp
83 */
84 public LogicalTimestamp timestamp() {
85 return timestamp;
86 }
87
88 /**
89 * Returns the digest for the bucket.
90 *
91 * @return the digest for the bucket
92 */
93 public FlowBucketDigest getDigest() {
94 return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
95 }
96
97 /**
98 * Returns the flow entries in the bucket.
99 *
100 * @return the flow entries in the bucket
101 */
102 public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
103 return flowBucket;
104 }
105
106 /**
107 * Returns the flow entries for the given flow.
108 *
109 * @param flowId the flow identifier
110 * @return the flows for the given flow ID
111 */
112 public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
113 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
114 return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
115 }
116
117 /**
118 * Counts the flows in the bucket.
119 *
120 * @return the number of flows in the bucket
121 */
122 public int count() {
123 return flowBucket.values()
124 .stream()
125 .mapToInt(entry -> entry.values().size())
126 .sum();
127 }
128
129 /**
Jordan Halterman158feb92018-06-30 23:44:30 -0700130 * Returns a new copy of the flow bucket.
131 *
132 * @return a new copy of the flow bucket
133 */
134 FlowBucket copy() {
135 return new FlowBucket(
136 bucketId,
137 term,
138 timestamp,
139 flowBucket.entrySet()
140 .stream()
141 .map(e -> Maps.immutableEntry(e.getKey(), Maps.newHashMap(e.getValue())))
142 .collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue())));
143 }
144
145 /**
Jordan Halterman281dbf32018-06-15 17:46:28 -0700146 * Records an update to the bucket.
147 */
148 private void recordUpdate(long term, LogicalTimestamp timestamp) {
149 this.term = term;
150 this.timestamp = timestamp;
151 }
152
153 /**
154 * Adds the given flow rule to the bucket.
155 *
156 * @param rule the rule to add
157 * @param term the term in which the change occurred
158 * @param clock the logical clock
159 */
160 public void add(FlowEntry rule, long term, LogicalClock clock) {
161 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
162 if (flowEntries == null) {
163 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
164 }
165 flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
166 recordUpdate(term, clock.getTimestamp());
167 }
168
169 /**
170 * Updates the given flow rule in the bucket.
171 *
172 * @param rule the rule to update
173 * @param term the term in which the change occurred
174 * @param clock the logical clock
175 */
176 public void update(FlowEntry rule, long term, LogicalClock clock) {
177 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
178 if (flowEntries == null) {
179 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
180 }
181 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
182 if (rule instanceof DefaultFlowEntry) {
183 DefaultFlowEntry updated = (DefaultFlowEntry) rule;
184 if (stored instanceof DefaultFlowEntry) {
185 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
186 if (updated.created() >= storedEntry.created()) {
187 recordUpdate(term, clock.getTimestamp());
188 return updated;
189 } else {
190 LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
191 return stored;
192 }
193 }
194 }
195 return stored;
196 });
197 }
198
199 /**
200 * Applies the given update function to the rule.
201 *
202 * @param rule the rule to update
203 * @param function the update function to apply
204 * @param term the term in which the change occurred
205 * @param clock the logical clock
206 * @param <T> the result type
207 * @return the update result or {@code null} if the rule was not updated
208 */
209 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
210 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
211 if (flowEntries == null) {
212 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
213 }
214
215 AtomicReference<T> resultRef = new AtomicReference<>();
216 flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
217 if (stored != null) {
218 T result = function.apply(stored);
219 if (result != null) {
220 recordUpdate(term, clock.getTimestamp());
221 resultRef.set(result);
222 }
223 }
224 return stored;
225 });
226 return resultRef.get();
227 }
228
229 /**
230 * Removes the given flow rule from the bucket.
231 *
232 * @param rule the rule to remove
233 * @param term the term in which the change occurred
234 * @param clock the logical clock
235 * @return the removed flow entry
236 */
237 public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
238 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
239 flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
240 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
241 if (rule instanceof DefaultFlowEntry) {
242 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
243 if (stored instanceof DefaultFlowEntry) {
244 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
245 if (toRemove.created() < storedEntry.created()) {
246 LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
247 // the key is not updated, removedRule remains null
248 return stored;
249 }
250 }
251 }
252 removedRule.set(stored);
253 return null;
254 });
255 return flowEntries.isEmpty() ? null : flowEntries;
256 });
257
258 if (removedRule.get() != null) {
259 recordUpdate(term, clock.getTimestamp());
260 return removedRule.get();
261 } else {
262 return null;
263 }
264 }
265
266 /**
267 * Clears the bucket.
268 */
269 public void clear() {
270 term = 0;
271 timestamp = new LogicalTimestamp(0);
272 flowBucket.clear();
273 }
274}