blob: d371fdf49b9a6af3d5d715b2bf10ab171c5065e5 [file] [log] [blame]
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Ray Milkeya9ae0d42017-08-06 22:10:47 -070016package org.onosproject.store.mcast.impl;
alshabib79e52872015-12-07 16:01:01 -080017
Julia Ferguson8d315952017-11-19 13:56:17 +000018
19import com.google.common.base.Objects;
20import com.google.common.collect.Sets;
alshabib79e52872015-12-07 16:01:01 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
alshabib79e52872015-12-07 16:01:01 -080027import org.onlab.util.KryoNamespace;
Julia Ferguson8d315952017-11-19 13:56:17 +000028
alshabib79e52872015-12-07 16:01:01 -080029import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.mcast.McastEvent;
31import org.onosproject.net.mcast.McastRoute;
alshabib79e52872015-12-07 16:01:01 -080032import org.onosproject.net.mcast.McastStore;
33import org.onosproject.net.mcast.McastStoreDelegate;
34import org.onosproject.store.AbstractStore;
Jonathan Hart07eb0412016-02-08 16:42:29 -080035import org.onosproject.store.serializers.KryoNamespaces;
alshabib79e52872015-12-07 16:01:01 -080036import org.onosproject.store.service.ConsistentMap;
Julia Ferguson8d315952017-11-19 13:56:17 +000037import org.onosproject.store.service.MapEvent;
38import org.onosproject.store.service.MapEventListener;
alshabib79e52872015-12-07 16:01:01 -080039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
Julia Ferguson8d315952017-11-19 13:56:17 +000041import org.onosproject.store.service.Versioned;
alshabib79e52872015-12-07 16:01:01 -080042import org.slf4j.Logger;
43
alshabib79e52872015-12-07 16:01:01 -080044import java.util.Map;
Julia Ferguson8d315952017-11-19 13:56:17 +000045import java.util.Optional;
alshabib79e52872015-12-07 16:01:01 -080046import java.util.Set;
Julia Ferguson8d315952017-11-19 13:56:17 +000047import java.util.concurrent.ScheduledExecutorService;
Madan Jampani72282af2016-02-23 14:23:52 -080048import java.util.concurrent.atomic.AtomicReference;
alshabib79e52872015-12-07 16:01:01 -080049
Julia Ferguson8d315952017-11-19 13:56:17 +000050import static com.google.common.base.Preconditions.checkNotNull;
Thomas Vachuskad4955ae2016-08-23 14:56:37 -070051import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
alshabib79e52872015-12-07 16:01:01 -080052import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * A distributed mcast store implementation. Routes are stored consistently
56 * across the cluster.
57 */
58@Component(immediate = true)
59@Service
Julia Ferguson8d315952017-11-19 13:56:17 +000060public class DistributedMcastStore
61 extends AbstractStore<McastEvent, McastStoreDelegate>
62 implements McastStore {
alshabib79e52872015-12-07 16:01:01 -080063
Jon Halldbe4c532016-10-04 11:45:52 -070064 private static final String MCASTRIB = "onos-mcast-rib-table";
alshabib79e52872015-12-07 16:01:01 -080065 private Logger log = getLogger(getClass());
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080068 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080069
Julia Ferguson8d315952017-11-19 13:56:17 +000070 private Map<McastRoute, MulticastData> mcastRoutes;
71 private ConsistentMap<McastRoute, MulticastData> mcastRib;
72 private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
73 new McastRouteListener();
74
75 private ScheduledExecutorService executor;
alshabib79e52872015-12-07 16:01:01 -080076
77
78 @Activate
79 public void activate() {
Jonathan Hart51539b82015-10-29 09:53:04 -070080 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080081 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080082 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
alshabib1aa58142016-02-17 15:37:56 -080083 .register(KryoNamespaces.API)
84 .register(
Madan Jampani72282af2016-02-23 14:23:52 -080085 AtomicReference.class,
alshabib1aa58142016-02-17 15:37:56 -080086 MulticastData.class,
87 McastRoute.class,
88 McastRoute.Type.class
89 ).build()))
Julia Ferguson8d315952017-11-19 13:56:17 +000090 .build();
alshabib79e52872015-12-07 16:01:01 -080091
Jonathan Hart51539b82015-10-29 09:53:04 -070092 mcastRoutes = mcastRib.asJavaMap();
Julia Ferguson8d315952017-11-19 13:56:17 +000093 mcastRib.addListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -080094
95 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
Julia Ferguson8d315952017-11-19 13:56:17 +0000100 mcastRib.removeListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -0800101 log.info("Stopped");
102 }
103
Julia Ferguson8d315952017-11-19 13:56:17 +0000104
105 private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
106 @Override
107 public void event(MapEvent<McastRoute, MulticastData> event) {
108 final McastRoute route = event.key();
109 final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
110 final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
111
112 switch (event.type()) {
113 case INSERT:
114 checkNotNull(newData);
115
116 if (newData.source() != null) {
117 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
118 mcastRouteInfo(route,
119 newData.sinks(),
120 newData.source())));
121 } else if (!newData.sinks().isEmpty()) {
122 newData.sinks().forEach(sink ->
123 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
124 mcastRouteInfo(route,
125 sink,
126 newData.source())))
127 );
128 } else {
129 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
130 mcastRouteInfo(route)));
131 }
132 break;
133 case UPDATE:
134 checkNotNull(newData);
135 checkNotNull(oldData);
136
Pier Luigi57d41792018-02-26 12:31:38 +0100137 // They are not equal
Julia Ferguson8d315952017-11-19 13:56:17 +0000138 if (!Objects.equal(oldData.source(), newData.source())) {
Pier Luigi57d41792018-02-26 12:31:38 +0100139 // Both not null, it is an update event
140 if (oldData.source() != null && newData.source() != null) {
141 // Broadcast old and new data
142 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_UPDATED,
143 mcastRouteInfo(route,
144 newData.sinks(),
145 newData.source()),
146 mcastRouteInfo(route,
147 oldData.sinks(),
148 oldData.source())));
149 } else if (oldData.source() == null && newData.source() != null) {
150 // It is a source added event, broadcast new data
151 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
152 mcastRouteInfo(route,
153 newData.sinks(),
154 newData.source())));
155 } else {
156 // Scenario not managed for now
157 log.warn("Unhandled scenario {} - new {} - old {}", event.type());
158 }
Julia Ferguson8d315952017-11-19 13:56:17 +0000159 } else {
160 Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
161 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
162 mcastRouteInfo(route,
163 sink,
164 newData.source())))
165 );
166
167 Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
168 notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
169 mcastRouteInfo(route,
170 sink,
171 newData.source())))
172 );
173 }
174 break;
175 case REMOVE:
Pier Luigi9930da52018-02-02 16:19:11 +0100176 // Verify old data is not null
177 checkNotNull(oldData);
178 // Create a route removed event with just the route
179 // and the source connect point
Julia Ferguson8d315952017-11-19 13:56:17 +0000180 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
Pier Luigi9930da52018-02-02 16:19:11 +0100181 mcastRouteInfo(route,
Pier Luigi05514fd2018-02-28 17:24:03 +0100182 oldData.sinks(),
Pier Luigi9930da52018-02-02 16:19:11 +0100183 oldData.source()
184 )));
Julia Ferguson8d315952017-11-19 13:56:17 +0000185 break;
186 default:
187 log.warn("Unknown mcast operation type: {}", event.type());
188 }
189 }
190 }
191
alshabib79e52872015-12-07 16:01:01 -0800192 @Override
193 public void storeRoute(McastRoute route, Type operation) {
194 switch (operation) {
195 case ADD:
Julia Ferguson8d315952017-11-19 13:56:17 +0000196 mcastRoutes.putIfAbsent(route, MulticastData.empty());
alshabib79e52872015-12-07 16:01:01 -0800197 break;
198 case REMOVE:
Julia Ferguson8d315952017-11-19 13:56:17 +0000199 // before remove the route should check that source and sinks are removed?
200 mcastRoutes.remove(route);
alshabib79e52872015-12-07 16:01:01 -0800201 break;
202 default:
203 log.warn("Unknown mcast operation type: {}", operation);
204 }
205 }
206
207 @Override
208 public void storeSource(McastRoute route, ConnectPoint source) {
209 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
210 if (v == null) {
211 return new MulticastData(source);
212 } else {
213 v.setSource(source);
214 }
215 return v;
216 });
alshabib79e52872015-12-07 16:01:01 -0800217 }
218
219 @Override
220 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
221 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
222 switch (operation) {
223 case ADD:
224 if (v == null) {
225 v = MulticastData.empty();
226 }
227 v.appendSink(sink);
228 break;
229 case REMOVE:
230 if (v != null) {
231 v.removeSink(sink);
232 }
233 break;
234 default:
235 log.warn("Unknown mcast operation type: {}", operation);
236 }
237 return v;
238 });
alshabib79e52872015-12-07 16:01:01 -0800239 }
240
241 @Override
242 public ConnectPoint sourceFor(McastRoute route) {
243 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
244 }
245
246 @Override
247 public Set<ConnectPoint> sinksFor(McastRoute route) {
248 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
249 }
250
Jonathan Hart07eb0412016-02-08 16:42:29 -0800251 @Override
252 public Set<McastRoute> getRoutes() {
253 return mcastRoutes.keySet();
254 }
alshabib79e52872015-12-07 16:01:01 -0800255}