blob: b98e7eca1e7183c1fe4f8336cd10469c6e917ad2 [file] [log] [blame]
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -08001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.drivers.p4runtime.mirror;
18
19import com.google.common.annotations.Beta;
Carmelo Casconee44592f2018-09-12 02:24:47 -070020import com.google.common.collect.Maps;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070021import org.osgi.service.component.annotations.Activate;
22import org.osgi.service.component.annotations.Component;
23import org.osgi.service.component.annotations.Deactivate;
24import org.osgi.service.component.annotations.Reference;
25import org.osgi.service.component.annotations.ReferenceCardinality;
Ray Milkeydb57f1c2018-10-09 10:39:29 -070026
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080027import org.onlab.util.KryoNamespace;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070028import org.onlab.util.SharedExecutors;
ghj0504520ed7340c2018-10-26 13:06:35 -070029import org.onosproject.net.Annotations;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080030import org.onosproject.net.DeviceId;
31import org.onosproject.net.pi.runtime.PiEntity;
32import org.onosproject.net.pi.runtime.PiHandle;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070033import org.onosproject.net.pi.service.PiPipeconfWatchdogEvent;
34import org.onosproject.net.pi.service.PiPipeconfWatchdogListener;
35import org.onosproject.net.pi.service.PiPipeconfWatchdogService;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080036import org.onosproject.store.service.EventuallyConsistentMap;
37import org.onosproject.store.service.StorageService;
38import org.onosproject.store.service.WallClockTimestamp;
39import org.slf4j.Logger;
40
41import java.util.Collection;
42import java.util.Map;
Carmelo Cascone50d195f2018-09-11 13:26:38 -070043import java.util.Set;
44import java.util.concurrent.atomic.AtomicInteger;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080045import java.util.stream.Collectors;
46
47import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconec7639fb2018-09-10 01:59:49 -070048import static org.onosproject.net.pi.service.PiPipeconfWatchdogService.PipelineStatus.READY;
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080049import static org.slf4j.LoggerFactory.getLogger;
50
51/**
52 * Abstract implementation of a distributed P4Runtime mirror, backed by an
53 * {@link EventuallyConsistentMap}.
54 *
55 * @param <H> handle class
56 * @param <E> entry class
57 */
58@Beta
59@Component(immediate = true)
60public abstract class AbstractDistributedP4RuntimeMirror
61 <H extends PiHandle, E extends PiEntity>
62 implements P4RuntimeMirror<H, E> {
63
64 private final Logger log = getLogger(getClass());
65
Ray Milkeyd84f89b2018-08-17 14:54:17 -070066 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080067 private StorageService storageService;
68
Ray Milkeydb57f1c2018-10-09 10:39:29 -070069 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Carmelo Casconec7639fb2018-09-10 01:59:49 -070070 private PiPipeconfWatchdogService pipeconfWatchdogService;
71
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080072 private EventuallyConsistentMap<H, TimedEntry<E>> mirrorMap;
73
ghj0504520ed7340c2018-10-26 13:06:35 -070074 private EventuallyConsistentMap<H, Annotations> annotationsMap;
75
Carmelo Casconec7639fb2018-09-10 01:59:49 -070076 private final PiPipeconfWatchdogListener pipeconfListener =
77 new InternalPipeconfWatchdogListener();
78
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080079 @Activate
80 public void activate() {
81 mirrorMap = storageService
82 .<H, TimedEntry<E>>eventuallyConsistentMapBuilder()
83 .withName(mapName())
84 .withSerializer(storeSerializer())
85 .withTimestampProvider((k, v) -> new WallClockTimestamp())
86 .build();
ghj0504520ed7340c2018-10-26 13:06:35 -070087
88 annotationsMap = storageService
89 .<H, Annotations>eventuallyConsistentMapBuilder()
90 .withName(mapName() + "-annotations")
91 .withSerializer(storeSerializer())
92 .withTimestampProvider((k, v) -> new WallClockTimestamp())
93 .build();
94
Carmelo Casconec7639fb2018-09-10 01:59:49 -070095 pipeconfWatchdogService.addListener(pipeconfListener);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -080096 log.info("Started");
97 }
98
99 abstract String mapName();
100
101 abstract KryoNamespace storeSerializer();
102
103 @Deactivate
104 public void deactivate() {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700105 pipeconfWatchdogService.removeListener(pipeconfListener);
Frank Wang222262f2017-12-18 20:17:36 +0800106 mirrorMap.destroy();
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800107 mirrorMap = null;
108 log.info("Stopped");
109 }
110
111 @Override
112 public Collection<TimedEntry<E>> getAll(DeviceId deviceId) {
113 checkNotNull(deviceId);
114 return mirrorMap.entrySet().stream()
115 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
116 .map(Map.Entry::getValue)
117 .collect(Collectors.toList());
118 }
119
120 @Override
121 public TimedEntry<E> get(H handle) {
122 checkNotNull(handle);
123 return mirrorMap.get(handle);
124 }
125
126 @Override
127 public void put(H handle, E entry) {
128 checkNotNull(handle);
129 checkNotNull(entry);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700130 final PiPipeconfWatchdogService.PipelineStatus status =
131 pipeconfWatchdogService.getStatus(handle.deviceId());
132 if (!status.equals(READY)) {
133 log.info("Ignoring device mirror update because pipeline " +
134 "status of {} is {}: {}",
135 handle.deviceId(), status, entry);
136 return;
137 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800138 final long now = new WallClockTimestamp().unixTimestamp();
139 final TimedEntry<E> timedEntry = new TimedEntry<>(now, entry);
140 mirrorMap.put(handle, timedEntry);
141 }
142
143 @Override
144 public void remove(H handle) {
145 checkNotNull(handle);
146 mirrorMap.remove(handle);
ghj0504520ed7340c2018-10-26 13:06:35 -0700147 annotationsMap.remove(handle);
148 }
149
150 @Override
151 public void putAnnotations(H handle, Annotations annotations) {
152 checkNotNull(handle);
153 checkNotNull(annotations);
154 annotationsMap.put(handle, annotations);
155 }
156
157 @Override
158 public Annotations annotations(H handle) {
159 checkNotNull(handle);
160 return annotationsMap.get(handle);
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800161 }
162
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700163 @Override
164 public void sync(DeviceId deviceId, Map<H, E> deviceState) {
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700165 checkNotNull(deviceId);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700166 final Map<H, E> localState = getMirrorMapForDevice(deviceId);
167
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700168 final AtomicInteger removeCount = new AtomicInteger(0);
169 final AtomicInteger updateCount = new AtomicInteger(0);
Carmelo Casconee44592f2018-09-12 02:24:47 -0700170 final AtomicInteger addCount = new AtomicInteger(0);
171 // Add missing entries.
172 deviceState.keySet().stream()
173 .filter(deviceHandle -> !localState.containsKey(deviceHandle))
174 .forEach(deviceHandle -> {
175 final E entryToAdd = deviceState.get(deviceHandle);
176 log.debug("Adding mirror entry for {}: {}",
177 deviceId, entryToAdd);
178 put(deviceHandle, entryToAdd);
179 addCount.incrementAndGet();
180 });
181 // Update or remove local entries.
182 localState.keySet().forEach(localHandle -> {
183 final E localEntry = localState.get(localHandle);
184 final E deviceEntry = deviceState.get(localHandle);
185 if (deviceEntry == null) {
186 log.debug("Removing mirror entry for {}: {}", deviceId, localEntry);
187 remove(localHandle);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700188 removeCount.incrementAndGet();
Carmelo Casconee44592f2018-09-12 02:24:47 -0700189 } else if (!deviceEntry.equals(localEntry)) {
190 log.debug("Updating mirror entry for {}: {}-->{}",
191 deviceId, localEntry, deviceEntry);
192 put(localHandle, deviceEntry);
193 updateCount.incrementAndGet();
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700194 }
195 });
Carmelo Casconee44592f2018-09-12 02:24:47 -0700196 if (removeCount.get() + updateCount.get() + addCount.get() > 0) {
197 log.info("Synchronized mirror entries for {}: {} removed, {} updated, {} added",
198 deviceId, removeCount, updateCount, addCount);
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700199 }
200 }
201
Carmelo Casconee44592f2018-09-12 02:24:47 -0700202 private Set<H> getHandlesForDevice(DeviceId deviceId) {
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700203 return mirrorMap.keySet().stream()
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700204 .filter(h -> h.deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700205 .collect(Collectors.toSet());
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700206 }
207
Carmelo Casconee44592f2018-09-12 02:24:47 -0700208 private Map<H, E> getMirrorMapForDevice(DeviceId deviceId) {
209 final Map<H, E> deviceMap = Maps.newHashMap();
210 mirrorMap.entrySet().stream()
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700211 .filter(e -> e.getKey().deviceId().equals(deviceId))
Carmelo Casconee44592f2018-09-12 02:24:47 -0700212 .forEach(e -> deviceMap.put(e.getKey(), e.getValue().entry()));
213 return deviceMap;
Carmelo Cascone50d195f2018-09-11 13:26:38 -0700214 }
215
216 private void removeAll(DeviceId deviceId) {
217 checkNotNull(deviceId);
218 Collection<H> handles = getHandlesForDevice(deviceId);
ghj0504520ed7340c2018-10-26 13:06:35 -0700219 handles.forEach(this::remove);
Carmelo Casconec7639fb2018-09-10 01:59:49 -0700220 }
221
222 public class InternalPipeconfWatchdogListener implements PiPipeconfWatchdogListener {
223 @Override
224 public void event(PiPipeconfWatchdogEvent event) {
225 log.debug("Flushing mirror for {}, pipeline status is {}",
226 event.subject(), event.type());
227 SharedExecutors.getPoolThreadExecutor().execute(
228 () -> removeAll(event.subject()));
229 }
230
231 @Override
232 public boolean isRelevant(PiPipeconfWatchdogEvent event) {
233 return event.type().equals(PiPipeconfWatchdogEvent.Type.PIPELINE_UNKNOWN);
234 }
235 }
Carmelo Cascone6a0b5a32017-11-20 23:08:32 -0800236}