blob: 2205b0bf1d93fec7301d33fddf8778f6f990df21 [file] [log] [blame]
Jordan Halterman281dbf32018-06-15 17:46:28 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.flow.impl;
17
18import java.util.Map;
19import java.util.concurrent.atomic.AtomicReference;
20import java.util.function.Function;
21
22import com.google.common.collect.Maps;
23import org.onosproject.net.flow.DefaultFlowEntry;
24import org.onosproject.net.flow.FlowEntry;
25import org.onosproject.net.flow.FlowId;
26import org.onosproject.net.flow.FlowRule;
27import org.onosproject.net.flow.StoredFlowEntry;
28import org.onosproject.store.LogicalTimestamp;
29import org.slf4j.Logger;
30import org.slf4j.LoggerFactory;
31
32/**
33 * Container for a bucket of flows assigned to a specific device.
34 * <p>
35 * The bucket is mutable. When changes are made to the bucket, the term and timestamp in which the change
36 * occurred is recorded for ordering changes.
37 */
38public class FlowBucket {
39 private static final Logger LOGGER = LoggerFactory.getLogger(FlowBucket.class);
40 private final BucketId bucketId;
41 private volatile long term;
42 private volatile LogicalTimestamp timestamp = new LogicalTimestamp(0);
43 private final Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> flowBucket = Maps.newConcurrentMap();
44
45 FlowBucket(BucketId bucketId) {
46 this.bucketId = bucketId;
47 }
48
49 /**
50 * Returns the flow bucket identifier.
51 *
52 * @return the flow bucket identifier
53 */
54 public BucketId bucketId() {
55 return bucketId;
56 }
57
58 /**
59 * Returns the flow bucket term.
60 *
61 * @return the flow bucket term
62 */
63 public long term() {
64 return term;
65 }
66
67 /**
68 * Returns the flow bucket timestamp.
69 *
70 * @return the flow bucket timestamp
71 */
72 public LogicalTimestamp timestamp() {
73 return timestamp;
74 }
75
76 /**
77 * Returns the digest for the bucket.
78 *
79 * @return the digest for the bucket
80 */
81 public FlowBucketDigest getDigest() {
82 return new FlowBucketDigest(bucketId().bucket(), term(), timestamp());
83 }
84
85 /**
86 * Returns the flow entries in the bucket.
87 *
88 * @return the flow entries in the bucket
89 */
90 public Map<FlowId, Map<StoredFlowEntry, StoredFlowEntry>> getFlowBucket() {
91 return flowBucket;
92 }
93
94 /**
95 * Returns the flow entries for the given flow.
96 *
97 * @param flowId the flow identifier
98 * @return the flows for the given flow ID
99 */
100 public Map<StoredFlowEntry, StoredFlowEntry> getFlowEntries(FlowId flowId) {
101 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(flowId);
102 return flowEntries != null ? flowEntries : flowBucket.computeIfAbsent(flowId, id -> Maps.newConcurrentMap());
103 }
104
105 /**
106 * Counts the flows in the bucket.
107 *
108 * @return the number of flows in the bucket
109 */
110 public int count() {
111 return flowBucket.values()
112 .stream()
113 .mapToInt(entry -> entry.values().size())
114 .sum();
115 }
116
117 /**
118 * Records an update to the bucket.
119 */
120 private void recordUpdate(long term, LogicalTimestamp timestamp) {
121 this.term = term;
122 this.timestamp = timestamp;
123 }
124
125 /**
126 * Adds the given flow rule to the bucket.
127 *
128 * @param rule the rule to add
129 * @param term the term in which the change occurred
130 * @param clock the logical clock
131 */
132 public void add(FlowEntry rule, long term, LogicalClock clock) {
133 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
134 if (flowEntries == null) {
135 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
136 }
137 flowEntries.put((StoredFlowEntry) rule, (StoredFlowEntry) rule);
138 recordUpdate(term, clock.getTimestamp());
139 }
140
141 /**
142 * Updates the given flow rule in the bucket.
143 *
144 * @param rule the rule to update
145 * @param term the term in which the change occurred
146 * @param clock the logical clock
147 */
148 public void update(FlowEntry rule, long term, LogicalClock clock) {
149 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
150 if (flowEntries == null) {
151 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
152 }
153 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
154 if (rule instanceof DefaultFlowEntry) {
155 DefaultFlowEntry updated = (DefaultFlowEntry) rule;
156 if (stored instanceof DefaultFlowEntry) {
157 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
158 if (updated.created() >= storedEntry.created()) {
159 recordUpdate(term, clock.getTimestamp());
160 return updated;
161 } else {
162 LOGGER.debug("Trying to update more recent flow entry {} (stored: {})", updated, stored);
163 return stored;
164 }
165 }
166 }
167 return stored;
168 });
169 }
170
171 /**
172 * Applies the given update function to the rule.
173 *
174 * @param rule the rule to update
175 * @param function the update function to apply
176 * @param term the term in which the change occurred
177 * @param clock the logical clock
178 * @param <T> the result type
179 * @return the update result or {@code null} if the rule was not updated
180 */
181 public <T> T update(FlowRule rule, Function<StoredFlowEntry, T> function, long term, LogicalClock clock) {
182 Map<StoredFlowEntry, StoredFlowEntry> flowEntries = flowBucket.get(rule.id());
183 if (flowEntries == null) {
184 flowEntries = flowBucket.computeIfAbsent(rule.id(), id -> Maps.newConcurrentMap());
185 }
186
187 AtomicReference<T> resultRef = new AtomicReference<>();
188 flowEntries.computeIfPresent(new DefaultFlowEntry(rule), (k, stored) -> {
189 if (stored != null) {
190 T result = function.apply(stored);
191 if (result != null) {
192 recordUpdate(term, clock.getTimestamp());
193 resultRef.set(result);
194 }
195 }
196 return stored;
197 });
198 return resultRef.get();
199 }
200
201 /**
202 * Removes the given flow rule from the bucket.
203 *
204 * @param rule the rule to remove
205 * @param term the term in which the change occurred
206 * @param clock the logical clock
207 * @return the removed flow entry
208 */
209 public FlowEntry remove(FlowEntry rule, long term, LogicalClock clock) {
210 final AtomicReference<FlowEntry> removedRule = new AtomicReference<>();
211 flowBucket.computeIfPresent(rule.id(), (flowId, flowEntries) -> {
212 flowEntries.computeIfPresent((StoredFlowEntry) rule, (k, stored) -> {
213 if (rule instanceof DefaultFlowEntry) {
214 DefaultFlowEntry toRemove = (DefaultFlowEntry) rule;
215 if (stored instanceof DefaultFlowEntry) {
216 DefaultFlowEntry storedEntry = (DefaultFlowEntry) stored;
217 if (toRemove.created() < storedEntry.created()) {
218 LOGGER.debug("Trying to remove more recent flow entry {} (stored: {})", toRemove, stored);
219 // the key is not updated, removedRule remains null
220 return stored;
221 }
222 }
223 }
224 removedRule.set(stored);
225 return null;
226 });
227 return flowEntries.isEmpty() ? null : flowEntries;
228 });
229
230 if (removedRule.get() != null) {
231 recordUpdate(term, clock.getTimestamp());
232 return removedRule.get();
233 } else {
234 return null;
235 }
236 }
237
238 /**
239 * Clears the bucket.
240 */
241 public void clear() {
242 term = 0;
243 timestamp = new LogicalTimestamp(0);
244 flowBucket.clear();
245 }
246}