blob: 4daeaaac771a5ea42fe99f283a395533b7b974bb [file] [log] [blame]
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.drivers.p4runtime.mirror;
18
19import com.google.common.annotations.Beta;
Carmelo Casconee44592f2018-09-12 02:24:47 -070020import com.google.common.collect.Maps;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080021import org.onlab.util.KryoNamespace;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070022import org.onlab.util.SharedExecutors;
ghj0504520ed7340c2018-10-26 13:06:35 -070023import org.onosproject.net.Annotations;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080024import org.onosproject.net.DeviceId;
25import org.onosproject.net.pi.runtime.PiEntity;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080026import org.onosproject.net.pi.runtime.PiEntityType;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080027import org.onosproject.net.pi.runtime.PiHandle;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070028import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
29import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
30import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080031import org.onosproject.p4runtime.api.P4RuntimeWriteClient;
32import org.onosproject.store.serializers.KryoNamespaces;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080033import org.onosproject.store.service.EventuallyConsistentMap;
34import org.onosproject.store.service.StorageService;
35import org.onosproject.store.service.WallClockTimestamp;
Carmelo Casconea46f5542018-12-12 23:41:01 -080036import org.osgi.service.component.annotations.Activate;
37import org.osgi.service.component.annotations.Deactivate;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080040import org.slf4j.Logger;
41
42import java.util.Collection;
43import java.util.Map;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070044import java.util.Set;
45import java.util.concurrent.atomic.AtomicInteger;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080046import java.util.stream.Collectors;
47
48import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070049import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080050import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * Abstract implementation of a distributed P4Runtime mirror, backed by an
54 * {@link EventuallyConsistentMap}.
55 *
56 * @param <H> handle class
57 * @param <E> entry class
58 */
59@Beta
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080060public abstract class AbstractDistributedP4RuntimeMirror
61 <H extends PiHandle, E extends PiEntity>
62 implements P4RuntimeMirror<H, E> {
63
64 private final Logger log = getLogger(getClass());
65
Ray Milkeyd84f89b2018-08-17 14:54:17 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080067 protected StorageService storageService;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080068
Ray Milkeydb57f1c2018-10-09 10:39:29 -070069 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080070 protected PiPipeconfWatchdogService pipeconfWatchdogService;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070071
Carmelo Cascone4c289b72019-01-22 15:30:45 -080072 private EventuallyConsistentMap<PiHandle, TimedEntry<E>> mirrorMap;
73 private EventuallyConsistentMap<PiHandle, Annotations> annotationsMap;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080074
Carmelo Cascone4c289b72019-01-22 15:30:45 -080075 private final PiEntityType entityType;
ghj0504520ed7340c2018-10-26 13:06:35 -070076
Carmelo Casconec7639fb2018-09-10 01:59:49 -070077 private final PiPipeconfWatchdogListener pipeconfListener =
78 new InternalPipeconfWatchdogListener();
79
Carmelo Cascone4c289b72019-01-22 15:30:45 -080080 AbstractDistributedP4RuntimeMirror(PiEntityType entityType) {
81 this.entityType = entityType;
82 }
83
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080084 @Activate
85 public void activate() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -080086 final String mapName = "onos-p4runtime-mirror-"
87 + entityType.name().toLowerCase();
88 final KryoNamespace serializer = KryoNamespace.newBuilder()
89 .register(KryoNamespaces.API)
90 .register(TimedEntry.class)
91 .build();
92
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080093 mirrorMap = storageService
Carmelo Cascone4c289b72019-01-22 15:30:45 -080094 .<PiHandle, TimedEntry<E>>eventuallyConsistentMapBuilder()
95 .withName(mapName)
96 .withSerializer(serializer)
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080097 .withTimestampProvider((k, v) -> new WallClockTimestamp())
98 .build();
ghj0504520ed7340c2018-10-26 13:06:35 -070099
100 annotationsMap = storageService
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800101 .<PiHandle, Annotations>eventuallyConsistentMapBuilder()
102 .withName(mapName + "-annotations")
103 .withSerializer(serializer)
ghj0504520ed7340c2018-10-26 13:06:35 -0700104 .withTimestampProvider((k, v) -> new WallClockTimestamp())
105 .build();
106
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700107 pipeconfWatchdogService.addListener(pipeconfListener);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800108 log.info("Started");
109 }
110
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800111 @Deactivate
112 public void deactivate() {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700113 pipeconfWatchdogService.removeListener(pipeconfListener);
Frank Wang222262f2017-12-18 20:17:36 +0800114 mirrorMap.destroy();
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800115 mirrorMap = null;
116 log.info("Stopped");
117 }
118
119 @Override
120 public Collection<TimedEntry<E>> getAll(DeviceId deviceId) {
121 checkNotNull(deviceId);
122 return mirrorMap.entrySet().stream()
123 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
124 .map(Map.Entry::getValue)
125 .collect(Collectors.toList());
126 }
127
128 @Override
129 public TimedEntry<E> get(H handle) {
130 checkNotNull(handle);
131 return mirrorMap.get(handle);
132 }
133
134 @Override
135 public void put(H handle, E entry) {
136 checkNotNull(handle);
137 checkNotNull(entry);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700138 final PiPipeconfWatchdogService.PipelineStatus status =
139 pipeconfWatchdogService.getStatus(handle.deviceId());
140 if (!status.equals(READY)) {
141 log.info("Ignoring device mirror update because pipeline " +
142 "status of {} is {}: {}",
143 handle.deviceId(), status, entry);
144 return;
145 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800146 final long now = new WallClockTimestamp().unixTimestamp();
147 final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
148 mirrorMap.put(handle, timedEntry);
149 }
150
151 @Override
152 public void remove(H handle) {
153 checkNotNull(handle);
154 mirrorMap.remove(handle);
ghj0504520ed7340c2018-10-26 13:06:35 -0700155 annotationsMap.remove(handle);
156 }
157
158 @Override
159 public void putAnnotations(H handle, Annotations annotations) {
160 checkNotNull(handle);
161 checkNotNull(annotations);
162 annotationsMap.put(handle, annotations);
163 }
164
165 @Override
166 public Annotations annotations(H handle) {
167 checkNotNull(handle);
168 return annotationsMap.get(handle);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800169 }
170
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700171 @Override
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800172 @SuppressWarnings("unchecked")
173 public void sync(DeviceId deviceId, Collection<E> entities) {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700174 checkNotNull(deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800175 final Map<PiHandle, E> deviceState = entities.stream()
176 .collect(Collectors.toMap(e -> e.handle(deviceId), e -> e));
177 final Map<PiHandle, E> localState = deviceHandleMap(deviceId);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700178
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700179 final AtomicInteger removeCount = new AtomicInteger(0);
180 final AtomicInteger updateCount = new AtomicInteger(0);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700181 final AtomicInteger addCount = new AtomicInteger(0);
182 // Add missing entries.
183 deviceState.keySet().stream()
184 .filter(deviceHandle -> !localState.containsKey(deviceHandle))
185 .forEach(deviceHandle -> {
186 final E entryToAdd = deviceState.get(deviceHandle);
187 log.debug("Adding mirror entry for {}: {}",
188 deviceId, entryToAdd);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800189 put((H) deviceHandle, entryToAdd);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700190 addCount.incrementAndGet();
191 });
192 // Update or remove local entries.
193 localState.keySet().forEach(localHandle -> {
194 final E localEntry = localState.get(localHandle);
195 final E deviceEntry = deviceState.get(localHandle);
196 if (deviceEntry == null) {
197 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800198 remove((H) localHandle);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700199 removeCount.incrementAndGet();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700200 } else if (!deviceEntry.equals(localEntry)) {
201 log.debug("Updating mirror entry for {}: {}-->{}",
202 deviceId, localEntry, deviceEntry);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800203 put((H) localHandle, deviceEntry);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700204 updateCount.incrementAndGet();
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700205 }
206 });
Carmelo Casconee44592f2018-09-12 02:24:47 -0700207 if (removeCount.get() + updateCount.get() + addCount.get() > 0) {
208 log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
209 deviceId, removeCount, updateCount, addCount);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700210 }
211 }
212
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800213 private Set<PiHandle> getHandlesForDevice(DeviceId deviceId) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700214 return mirrorMap.keySet().stream()
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700215 .filter(h -> h.deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700216 .collect(Collectors.toSet());
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700217 }
218
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800219 private Map<PiHandle, E> deviceHandleMap(DeviceId deviceId) {
220 final Map<PiHandle, E> deviceMap = Maps.newHashMap();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700221 mirrorMap.entrySet().stream()
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700222 .filter(e -> e.getKey().deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700223 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
224 return deviceMap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700225 }
226
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800227
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700228 private void removeAll(DeviceId deviceId) {
229 checkNotNull(deviceId);
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800230 @SuppressWarnings("unchecked")
231 Collection<H> handles = (Collection<H>) getHandlesForDevice(deviceId);
ghj0504520ed7340c2018-10-26 13:06:35 -0700232 handles.forEach(this::remove);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700233 }
234
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800235 @Override
236 @SuppressWarnings("unchecked")
Carmelo Cascone61469462019-03-05 23:59:11 -0800237 public void applyWriteRequest(P4RuntimeWriteClient.WriteRequest request) {
238 request.pendingUpdates().stream()
239 .filter(r -> r.entityType().equals(this.entityType))
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800240 .forEach(r -> {
241 switch (r.updateType()) {
242 case INSERT:
243 case MODIFY:
244 put((H) r.handle(), (E) r.entity());
245 break;
246 case DELETE:
247 remove((H) r.handle());
248 break;
249 default:
250 log.error("Unknown update type {}", r.updateType());
251 }
252 });
253 }
254
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700255 public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
256 @Override
257 public void event(PiPipeconfWatchdogEvent event) {
258 log.debug("Flushing mirror for {}, pipeline status is {}",
259 event.subject(), event.type());
260 SharedExecutors.getPoolThreadExecutor().execute(
261 () -> removeAll(event.subject()));
262 }
263
264 @Override
265 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
266 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
267 }
268 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800269}