blob: f45a4e50d1b5159da5146520a5332fca14c7c874 [file] [log] [blame]
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.drivers.p4runtime.mirror;
18
19import com.google.common.annotations.Beta;
Carmelo Casconee44592f2018-09-12 02:24:47 -070020import com.google.common.collect.Maps;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080021import org.onlab.util.KryoNamespace;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070022import org.onlab.util.SharedExecutors;
ghj0504520ed7340c2018-10-26 13:06:35 -070023import org.onosproject.net.Annotations;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080024import org.onosproject.net.DeviceId;
25import org.onosproject.net.pi.runtime.PiEntity;
26import org.onosproject.net.pi.runtime.PiHandle;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070027import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
28import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
29import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080030import org.onosproject.store.service.EventuallyConsistentMap;
31import org.onosproject.store.service.StorageService;
32import org.onosproject.store.service.WallClockTimestamp;
Carmelo Casconea46f5542018-12-12 23:41:01 -080033import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Deactivate;
35import org.osgi.service.component.annotations.Reference;
36import org.osgi.service.component.annotations.ReferenceCardinality;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080037import org.slf4j.Logger;
38
39import java.util.Collection;
40import java.util.Map;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070041import java.util.Set;
42import java.util.concurrent.atomic.AtomicInteger;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080043import java.util.stream.Collectors;
44
45import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070046import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080047import static org.slf4j.LoggerFactory.getLogger;
48
49/**
50 * Abstract implementation of a distributed P4Runtime mirror, backed by an
51 * {@link EventuallyConsistentMap}.
52 *
53 * @param <H> handle class
54 * @param <E> entry class
55 */
56@Beta
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080057public abstract class AbstractDistributedP4RuntimeMirror
58 <H extends PiHandle, E extends PiEntity>
59 implements P4RuntimeMirror<H, E> {
60
61 private final Logger log = getLogger(getClass());
62
Ray Milkeyd84f89b2018-08-17 14:54:17 -070063 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080064 protected StorageService storageService;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080065
Ray Milkeydb57f1c2018-10-09 10:39:29 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconea46f5542018-12-12 23:41:01 -080067 protected PiPipeconfWatchdogService pipeconfWatchdogService;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070068
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080069 private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
70
ghj0504520ed7340c2018-10-26 13:06:35 -070071 private EventuallyConsistentMap<H, Annotations> annotationsMap;
72
Carmelo Casconec7639fb2018-09-10 01:59:49 -070073 private final PiPipeconfWatchdogListener pipeconfListener =
74 new InternalPipeconfWatchdogListener();
75
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080076 @Activate
77 public void activate() {
78 mirrorMap = storageService
79 .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
80 .withName(mapName())
81 .withSerializer(storeSerializer())
82 .withTimestampProvider((k, v) -> new WallClockTimestamp())
83 .build();
ghj0504520ed7340c2018-10-26 13:06:35 -070084
85 annotationsMap = storageService
86 .<H, Annotations>eventuallyConsistentMapBuilder()
87 .withName(mapName() + "-annotations")
88 .withSerializer(storeSerializer())
89 .withTimestampProvider((k, v) -> new WallClockTimestamp())
90 .build();
91
Carmelo Casconec7639fb2018-09-10 01:59:49 -070092 pipeconfWatchdogService.addListener(pipeconfListener);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080093 log.info("Started");
94 }
95
96 abstract String mapName();
97
98 abstract KryoNamespace storeSerializer();
99
100 @Deactivate
101 public void deactivate() {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700102 pipeconfWatchdogService.removeListener(pipeconfListener);
Frank Wang222262f2017-12-18 20:17:36 +0800103 mirrorMap.destroy();
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800104 mirrorMap = null;
105 log.info("Stopped");
106 }
107
108 @Override
109 public Collection<TimedEntry<E>> getAll(DeviceId deviceId) {
110 checkNotNull(deviceId);
111 return mirrorMap.entrySet().stream()
112 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
113 .map(Map.Entry::getValue)
114 .collect(Collectors.toList());
115 }
116
117 @Override
118 public TimedEntry<E> get(H handle) {
119 checkNotNull(handle);
120 return mirrorMap.get(handle);
121 }
122
123 @Override
124 public void put(H handle, E entry) {
125 checkNotNull(handle);
126 checkNotNull(entry);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700127 final PiPipeconfWatchdogService.PipelineStatus status =
128 pipeconfWatchdogService.getStatus(handle.deviceId());
129 if (!status.equals(READY)) {
130 log.info("Ignoring device mirror update because pipeline " +
131 "status of {} is {}: {}",
132 handle.deviceId(), status, entry);
133 return;
134 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800135 final long now = new WallClockTimestamp().unixTimestamp();
136 final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
137 mirrorMap.put(handle, timedEntry);
138 }
139
140 @Override
141 public void remove(H handle) {
142 checkNotNull(handle);
143 mirrorMap.remove(handle);
ghj0504520ed7340c2018-10-26 13:06:35 -0700144 annotationsMap.remove(handle);
145 }
146
147 @Override
148 public void putAnnotations(H handle, Annotations annotations) {
149 checkNotNull(handle);
150 checkNotNull(annotations);
151 annotationsMap.put(handle, annotations);
152 }
153
154 @Override
155 public Annotations annotations(H handle) {
156 checkNotNull(handle);
157 return annotationsMap.get(handle);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800158 }
159
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700160 @Override
161 public void sync(DeviceId deviceId, Map<H, E> deviceState) {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700162 checkNotNull(deviceId);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700163 final Map<H, E> localState = getMirrorMapForDevice(deviceId);
164
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700165 final AtomicInteger removeCount = new AtomicInteger(0);
166 final AtomicInteger updateCount = new AtomicInteger(0);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700167 final AtomicInteger addCount = new AtomicInteger(0);
168 // Add missing entries.
169 deviceState.keySet().stream()
170 .filter(deviceHandle -> !localState.containsKey(deviceHandle))
171 .forEach(deviceHandle -> {
172 final E entryToAdd = deviceState.get(deviceHandle);
173 log.debug("Adding mirror entry for {}: {}",
174 deviceId, entryToAdd);
175 put(deviceHandle, entryToAdd);
176 addCount.incrementAndGet();
177 });
178 // Update or remove local entries.
179 localState.keySet().forEach(localHandle -> {
180 final E localEntry = localState.get(localHandle);
181 final E deviceEntry = deviceState.get(localHandle);
182 if (deviceEntry == null) {
183 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
184 remove(localHandle);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700185 removeCount.incrementAndGet();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700186 } else if (!deviceEntry.equals(localEntry)) {
187 log.debug("Updating mirror entry for {}: {}-->{}",
188 deviceId, localEntry, deviceEntry);
189 put(localHandle, deviceEntry);
190 updateCount.incrementAndGet();
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700191 }
192 });
Carmelo Casconee44592f2018-09-12 02:24:47 -0700193 if (removeCount.get() + updateCount.get() + addCount.get() > 0) {
194 log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
195 deviceId, removeCount, updateCount, addCount);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700196 }
197 }
198
Carmelo Casconee44592f2018-09-12 02:24:47 -0700199 private Set<H> getHandlesForDevice(DeviceId deviceId) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700200 return mirrorMap.keySet().stream()
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700201 .filter(h -> h.deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700202 .collect(Collectors.toSet());
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700203 }
204
Carmelo Casconee44592f2018-09-12 02:24:47 -0700205 private Map<H, E> getMirrorMapForDevice(DeviceId deviceId) {
206 final Map<H, E> deviceMap = Maps.newHashMap();
207 mirrorMap.entrySet().stream()
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700208 .filter(e -> e.getKey().deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700209 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
210 return deviceMap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700211 }
212
213 private void removeAll(DeviceId deviceId) {
214 checkNotNull(deviceId);
215 Collection<H> handles = getHandlesForDevice(deviceId);
ghj0504520ed7340c2018-10-26 13:06:35 -0700216 handles.forEach(this::remove);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700217 }
218
219 public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
220 @Override
221 public void event(PiPipeconfWatchdogEvent event) {
222 log.debug("Flushing mirror for {}, pipeline status is {}",
223 event.subject(), event.type());
224 SharedExecutors.getPoolThreadExecutor().execute(
225 () -> removeAll(event.subject()));
226 }
227
228 @Override
229 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
230 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
231 }
232 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800233}