blob: 4c8545868867f404d9ef1d0362d154385bc73b9d [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Ray Milkeya9ae0d42017-08-06 22:10:47 -070016package org.onosproject.store.mcast.impl;
alshabib79e52872015-12-07 16:01:01 -080017
Julia Ferguson8d315952017-11-19 13:56:17 +000018
19import com.google.common.base.Objects;
20import com.google.common.collect.Sets;
alshabib79e52872015-12-07 16:01:01 -080021import org.onlab.util.KryoNamespace;
22import org.onosproject.net.ConnectPoint;
23import org.onosproject.net.mcast.McastEvent;
24import org.onosproject.net.mcast.McastRoute;
alshabib79e52872015-12-07 16:01:01 -080025import org.onosproject.net.mcast.McastStore;
26import org.onosproject.net.mcast.McastStoreDelegate;
27import org.onosproject.store.AbstractStore;
Jonathan Hart07eb0412016-02-08 16:42:29 -080028import org.onosproject.store.serializers.KryoNamespaces;
alshabib79e52872015-12-07 16:01:01 -080029import org.onosproject.store.service.ConsistentMap;
Julia Ferguson8d315952017-11-19 13:56:17 +000030import org.onosproject.store.service.MapEvent;
31import org.onosproject.store.service.MapEventListener;
alshabib79e52872015-12-07 16:01:01 -080032import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.StorageService;
Julia Ferguson8d315952017-11-19 13:56:17 +000034import org.onosproject.store.service.Versioned;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070035import org.osgi.service.component.annotations.Activate;
36import org.osgi.service.component.annotations.Component;
37import org.osgi.service.component.annotations.Deactivate;
38import org.osgi.service.component.annotations.Reference;
39import org.osgi.service.component.annotations.ReferenceCardinality;
alshabib79e52872015-12-07 16:01:01 -080040import org.slf4j.Logger;
41
alshabib79e52872015-12-07 16:01:01 -080042import java.util.Map;
Julia Ferguson8d315952017-11-19 13:56:17 +000043import java.util.Optional;
alshabib79e52872015-12-07 16:01:01 -080044import java.util.Set;
Julia Ferguson8d315952017-11-19 13:56:17 +000045import java.util.concurrent.ScheduledExecutorService;
Madan Jampani72282af2016-02-23 14:23:52 -080046import java.util.concurrent.atomic.AtomicReference;
alshabib79e52872015-12-07 16:01:01 -080047
Julia Ferguson8d315952017-11-19 13:56:17 +000048import static com.google.common.base.Preconditions.checkNotNull;
Thomas Vachuskad4955ae2016-08-23 14:56:37 -070049import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
alshabib79e52872015-12-07 16:01:01 -080050import static org.slf4j.LoggerFactory.getLogger;
51
52/**
53 * A distributed mcast store implementation. Routes are stored consistently
54 * across the cluster.
55 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070056@Component(immediate = true, service = McastStore.class)
Julia Ferguson8d315952017-11-19 13:56:17 +000057public class DistributedMcastStore
58 extends AbstractStore<McastEvent, McastStoreDelegate>
59 implements McastStore {
alshabib79e52872015-12-07 16:01:01 -080060
Jon Halldbe4c532016-10-04 11:45:52 -070061 private static final String MCASTRIB = "onos-mcast-rib-table";
alshabib79e52872015-12-07 16:01:01 -080062 private Logger log = getLogger(getClass());
63
Ray Milkeyd84f89b2018-08-17 14:54:17 -070064 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080065 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080066
Julia Ferguson8d315952017-11-19 13:56:17 +000067 private Map<McastRoute, MulticastData> mcastRoutes;
68 private ConsistentMap<McastRoute, MulticastData> mcastRib;
69 private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
70 new McastRouteListener();
71
72 private ScheduledExecutorService executor;
alshabib79e52872015-12-07 16:01:01 -080073
74
75 @Activate
76 public void activate() {
Jonathan Hart51539b82015-10-29 09:53:04 -070077 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080078 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080079 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
alshabib1aa58142016-02-17 15:37:56 -080080 .register(KryoNamespaces.API)
81 .register(
Madan Jampani72282af2016-02-23 14:23:52 -080082 AtomicReference.class,
alshabib1aa58142016-02-17 15:37:56 -080083 MulticastData.class,
84 McastRoute.class,
85 McastRoute.Type.class
86 ).build()))
Julia Ferguson8d315952017-11-19 13:56:17 +000087 .build();
alshabib79e52872015-12-07 16:01:01 -080088
Jonathan Hart51539b82015-10-29 09:53:04 -070089 mcastRoutes = mcastRib.asJavaMap();
Julia Ferguson8d315952017-11-19 13:56:17 +000090 mcastRib.addListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -080091
92 log.info("Started");
93 }
94
95 @Deactivate
96 public void deactivate() {
Julia Ferguson8d315952017-11-19 13:56:17 +000097 mcastRib.removeListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -080098 log.info("Stopped");
99 }
100
Julia Ferguson8d315952017-11-19 13:56:17 +0000101
102 private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
103 @Override
104 public void event(MapEvent<McastRoute, MulticastData> event) {
105 final McastRoute route = event.key();
106 final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
107 final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
108
109 switch (event.type()) {
110 case INSERT:
111 checkNotNull(newData);
112
113 if (newData.source() != null) {
114 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
115 mcastRouteInfo(route,
116 newData.sinks(),
117 newData.source())));
118 } else if (!newData.sinks().isEmpty()) {
119 newData.sinks().forEach(sink ->
120 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
121 mcastRouteInfo(route,
122 sink,
123 newData.source())))
124 );
125 } else {
126 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
127 mcastRouteInfo(route)));
128 }
129 break;
130 case UPDATE:
131 checkNotNull(newData);
132 checkNotNull(oldData);
133
Pier Luigi57d41792018-02-26 12:31:38 +0100134 // They are not equal
Julia Ferguson8d315952017-11-19 13:56:17 +0000135 if (!Objects.equal(oldData.source(), newData.source())) {
Pier Luigi57d41792018-02-26 12:31:38 +0100136 // Both not null, it is an update event
137 if (oldData.source() != null && newData.source() != null) {
138 // Broadcast old and new data
139 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_UPDATED,
140 mcastRouteInfo(route,
141 newData.sinks(),
142 newData.source()),
143 mcastRouteInfo(route,
144 oldData.sinks(),
145 oldData.source())));
146 } else if (oldData.source() == null && newData.source() != null) {
147 // It is a source added event, broadcast new data
148 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
149 mcastRouteInfo(route,
150 newData.sinks(),
151 newData.source())));
152 } else {
153 // Scenario not managed for now
154 log.warn("Unhandled scenario {} - new {} - old {}", event.type());
155 }
Julia Ferguson8d315952017-11-19 13:56:17 +0000156 } else {
157 Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
158 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
159 mcastRouteInfo(route,
160 sink,
161 newData.source())))
162 );
163
164 Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
165 notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
166 mcastRouteInfo(route,
167 sink,
168 newData.source())))
169 );
170 }
171 break;
172 case REMOVE:
Pier Luigi9930da52018-02-02 16:19:11 +0100173 // Verify old data is not null
174 checkNotNull(oldData);
175 // Create a route removed event with just the route
176 // and the source connect point
Julia Ferguson8d315952017-11-19 13:56:17 +0000177 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
Pier Luigi9930da52018-02-02 16:19:11 +0100178 mcastRouteInfo(route,
Pier Luigi05514fd2018-02-28 17:24:03 +0100179 oldData.sinks(),
Pier Luigi9930da52018-02-02 16:19:11 +0100180 oldData.source()
181 )));
Julia Ferguson8d315952017-11-19 13:56:17 +0000182 break;
183 default:
184 log.warn("Unknown mcast operation type: {}", event.type());
185 }
186 }
187 }
188
alshabib79e52872015-12-07 16:01:01 -0800189 @Override
190 public void storeRoute(McastRoute route, Type operation) {
191 switch (operation) {
192 case ADD:
Julia Ferguson8d315952017-11-19 13:56:17 +0000193 mcastRoutes.putIfAbsent(route, MulticastData.empty());
alshabib79e52872015-12-07 16:01:01 -0800194 break;
195 case REMOVE:
Julia Ferguson8d315952017-11-19 13:56:17 +0000196 // before remove the route should check that source and sinks are removed?
197 mcastRoutes.remove(route);
alshabib79e52872015-12-07 16:01:01 -0800198 break;
199 default:
200 log.warn("Unknown mcast operation type: {}", operation);
201 }
202 }
203
204 @Override
205 public void storeSource(McastRoute route, ConnectPoint source) {
206 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
207 if (v == null) {
208 return new MulticastData(source);
209 } else {
210 v.setSource(source);
211 }
212 return v;
213 });
alshabib79e52872015-12-07 16:01:01 -0800214 }
215
216 @Override
217 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
218 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
219 switch (operation) {
220 case ADD:
221 if (v == null) {
222 v = MulticastData.empty();
223 }
224 v.appendSink(sink);
225 break;
226 case REMOVE:
227 if (v != null) {
228 v.removeSink(sink);
229 }
230 break;
231 default:
232 log.warn("Unknown mcast operation type: {}", operation);
233 }
234 return v;
235 });
alshabib79e52872015-12-07 16:01:01 -0800236 }
237
238 @Override
239 public ConnectPoint sourceFor(McastRoute route) {
240 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
241 }
242
243 @Override
244 public Set<ConnectPoint> sinksFor(McastRoute route) {
245 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
246 }
247
Jonathan Hart07eb0412016-02-08 16:42:29 -0800248 @Override
249 public Set<McastRoute> getRoutes() {
250 return mcastRoutes.keySet();
251 }
alshabib79e52872015-12-07 16:01:01 -0800252}