blob: fc982bea41b44c11bc2df5b0b6a6e866984deaa1 [file] [log] [blame]
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.drivers.p4runtime.mirror;
18
19import com.google.common.annotations.Beta;
Carmelo Casconee44592f2018-09-12 02:24:47 -070020import com.google.common.collect.Maps;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.onlab.util.KryoNamespace;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070027import org.onlab.util.SharedExecutors;
ghj0504520ed7340c2018-10-26 13:06:35 -070028import org.onosproject.net.Annotations;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080029import org.onosproject.net.DeviceId;
30import org.onosproject.net.pi.runtime.PiEntity;
31import org.onosproject.net.pi.runtime.PiHandle;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070032import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
33import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
34import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080035import org.onosproject.store.service.EventuallyConsistentMap;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.WallClockTimestamp;
38import org.slf4j.Logger;
39
40import java.util.Collection;
41import java.util.Map;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070042import java.util.Set;
43import java.util.concurrent.atomic.AtomicInteger;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080044import java.util.stream.Collectors;
45
46import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070047import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080048import static org.slf4j.LoggerFactory.getLogger;
49
50/**
51 * Abstract implementation of a distributed P4Runtime mirror, backed by an
52 * {@link EventuallyConsistentMap}.
53 *
54 * @param <H> handle class
55 * @param <E> entry class
56 */
57@Beta
58@Component(immediate = true)
59public abstract class AbstractDistributedP4RuntimeMirror
60 <H extends PiHandle, E extends PiEntity>
61 implements P4RuntimeMirror<H, E> {
62
63 private final Logger log = getLogger(getClass());
64
65 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
66 private StorageService storageService;
67
Carmelo Casconec7639fb2018-09-10 01:59:49 -070068 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 private PiPipeconfWatchdogService pipeconfWatchdogService;
70
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080071 private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
72
ghj0504520ed7340c2018-10-26 13:06:35 -070073 private EventuallyConsistentMap<H, Annotations> annotationsMap;
74
Carmelo Casconec7639fb2018-09-10 01:59:49 -070075 private final PiPipeconfWatchdogListener pipeconfListener =
76 new InternalPipeconfWatchdogListener();
77
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080078 @Activate
79 public void activate() {
80 mirrorMap = storageService
81 .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
82 .withName(mapName())
83 .withSerializer(storeSerializer())
84 .withTimestampProvider((k, v) -> new WallClockTimestamp())
85 .build();
ghj0504520ed7340c2018-10-26 13:06:35 -070086
87 annotationsMap = storageService
88 .<H, Annotations>eventuallyConsistentMapBuilder()
89 .withName(mapName() + "-annotations")
90 .withSerializer(storeSerializer())
91 .withTimestampProvider((k, v) -> new WallClockTimestamp())
92 .build();
93
Carmelo Casconec7639fb2018-09-10 01:59:49 -070094 pipeconfWatchdogService.addListener(pipeconfListener);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080095 log.info("Started");
96 }
97
98 abstract String mapName();
99
100 abstract KryoNamespace storeSerializer();
101
102 @Deactivate
103 public void deactivate() {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700104 pipeconfWatchdogService.removeListener(pipeconfListener);
Frank Wang222262f2017-12-18 20:17:36 +0800105 mirrorMap.destroy();
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800106 mirrorMap = null;
107 log.info("Stopped");
108 }
109
110 @Override
111 public Collection<TimedEntry<E>> getAll(DeviceId deviceId) {
112 checkNotNull(deviceId);
113 return mirrorMap.entrySet().stream()
114 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
115 .map(Map.Entry::getValue)
116 .collect(Collectors.toList());
117 }
118
119 @Override
120 public TimedEntry<E> get(H handle) {
121 checkNotNull(handle);
122 return mirrorMap.get(handle);
123 }
124
125 @Override
126 public void put(H handle, E entry) {
127 checkNotNull(handle);
128 checkNotNull(entry);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700129 final PiPipeconfWatchdogService.PipelineStatus status =
130 pipeconfWatchdogService.getStatus(handle.deviceId());
131 if (!status.equals(READY)) {
132 log.info("Ignoring device mirror update because pipeline " +
133 "status of {} is {}: {}",
134 handle.deviceId(), status, entry);
135 return;
136 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800137 final long now = new WallClockTimestamp().unixTimestamp();
138 final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
139 mirrorMap.put(handle, timedEntry);
140 }
141
142 @Override
143 public void remove(H handle) {
144 checkNotNull(handle);
145 mirrorMap.remove(handle);
ghj0504520ed7340c2018-10-26 13:06:35 -0700146 annotationsMap.remove(handle);
147 }
148
149 @Override
150 public void putAnnotations(H handle, Annotations annotations) {
151 checkNotNull(handle);
152 checkNotNull(annotations);
153 annotationsMap.put(handle, annotations);
154 }
155
156 @Override
157 public Annotations annotations(H handle) {
158 checkNotNull(handle);
159 return annotationsMap.get(handle);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800160 }
161
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700162 @Override
163 public void sync(DeviceId deviceId, Map<H, E> deviceState) {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700164 checkNotNull(deviceId);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700165 final Map<H, E> localState = getMirrorMapForDevice(deviceId);
166
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700167 final AtomicInteger removeCount = new AtomicInteger(0);
168 final AtomicInteger updateCount = new AtomicInteger(0);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700169 final AtomicInteger addCount = new AtomicInteger(0);
170 // Add missing entries.
171 deviceState.keySet().stream()
172 .filter(deviceHandle -> !localState.containsKey(deviceHandle))
173 .forEach(deviceHandle -> {
174 final E entryToAdd = deviceState.get(deviceHandle);
175 log.debug("Adding mirror entry for {}: {}",
176 deviceId, entryToAdd);
177 put(deviceHandle, entryToAdd);
178 addCount.incrementAndGet();
179 });
180 // Update or remove local entries.
181 localState.keySet().forEach(localHandle -> {
182 final E localEntry = localState.get(localHandle);
183 final E deviceEntry = deviceState.get(localHandle);
184 if (deviceEntry == null) {
185 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
186 remove(localHandle);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700187 removeCount.incrementAndGet();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700188 } else if (!deviceEntry.equals(localEntry)) {
189 log.debug("Updating mirror entry for {}: {}-->{}",
190 deviceId, localEntry, deviceEntry);
191 put(localHandle, deviceEntry);
192 updateCount.incrementAndGet();
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700193 }
194 });
Carmelo Casconee44592f2018-09-12 02:24:47 -0700195 if (removeCount.get() + updateCount.get() + addCount.get() > 0) {
196 log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
197 deviceId, removeCount, updateCount, addCount);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700198 }
199 }
200
Carmelo Casconee44592f2018-09-12 02:24:47 -0700201 private Set<H> getHandlesForDevice(DeviceId deviceId) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700202 return mirrorMap.keySet().stream()
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700203 .filter(h -> h.deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700204 .collect(Collectors.toSet());
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700205 }
206
Carmelo Casconee44592f2018-09-12 02:24:47 -0700207 private Map<H, E> getMirrorMapForDevice(DeviceId deviceId) {
208 final Map<H, E> deviceMap = Maps.newHashMap();
209 mirrorMap.entrySet().stream()
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700210 .filter(e -> e.getKey().deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700211 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
212 return deviceMap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700213 }
214
215 private void removeAll(DeviceId deviceId) {
216 checkNotNull(deviceId);
217 Collection<H> handles = getHandlesForDevice(deviceId);
ghj0504520ed7340c2018-10-26 13:06:35 -0700218 handles.forEach(this::remove);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700219 }
220
221 public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
222 @Override
223 public void event(PiPipeconfWatchdogEvent event) {
224 log.debug("Flushing mirror for {}, pipeline status is {}",
225 event.subject(), event.type());
226 SharedExecutors.getPoolThreadExecutor().execute(
227 () -> removeAll(event.subject()));
228 }
229
230 @Override
231 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
232 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
233 }
234 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800235}