blob: b9fb8696541c727d9bec834189c36d6a2d511134 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Ray Milkeya9ae0d42017-08-06 22:10:47 -070016package org.onosproject.store.mcast.impl;
alshabib79e52872015-12-07 16:01:01 -080017
Julia Ferguson8d315952017-11-19 13:56:17 +000018
19import com.google.common.base.Objects;
20import com.google.common.collect.Sets;
alshabib79e52872015-12-07 16:01:01 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
alshabib79e52872015-12-07 16:01:01 -080027import org.onlab.util.KryoNamespace;
Julia Ferguson8d315952017-11-19 13:56:17 +000028
alshabib79e52872015-12-07 16:01:01 -080029import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.mcast.McastEvent;
31import org.onosproject.net.mcast.McastRoute;
alshabib79e52872015-12-07 16:01:01 -080032import org.onosproject.net.mcast.McastStore;
33import org.onosproject.net.mcast.McastStoreDelegate;
34import org.onosproject.store.AbstractStore;
Jonathan Hart07eb0412016-02-08 16:42:29 -080035import org.onosproject.store.serializers.KryoNamespaces;
alshabib79e52872015-12-07 16:01:01 -080036import org.onosproject.store.service.ConsistentMap;
Julia Ferguson8d315952017-11-19 13:56:17 +000037import org.onosproject.store.service.MapEvent;
38import org.onosproject.store.service.MapEventListener;
alshabib79e52872015-12-07 16:01:01 -080039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
Julia Ferguson8d315952017-11-19 13:56:17 +000041import org.onosproject.store.service.Versioned;
alshabib79e52872015-12-07 16:01:01 -080042import org.slf4j.Logger;
43
Pier Luigi9930da52018-02-02 16:19:11 +010044import java.util.Collections;
alshabib79e52872015-12-07 16:01:01 -080045import java.util.Map;
Julia Ferguson8d315952017-11-19 13:56:17 +000046import java.util.Optional;
alshabib79e52872015-12-07 16:01:01 -080047import java.util.Set;
Julia Ferguson8d315952017-11-19 13:56:17 +000048import java.util.concurrent.ScheduledExecutorService;
Madan Jampani72282af2016-02-23 14:23:52 -080049import java.util.concurrent.atomic.AtomicReference;
alshabib79e52872015-12-07 16:01:01 -080050
Julia Ferguson8d315952017-11-19 13:56:17 +000051import static com.google.common.base.Preconditions.checkNotNull;
Thomas Vachuskad4955ae2016-08-23 14:56:37 -070052import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
alshabib79e52872015-12-07 16:01:01 -080053import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * A distributed mcast store implementation. Routes are stored consistently
57 * across the cluster.
58 */
59@Component(immediate = true)
60@Service
Julia Ferguson8d315952017-11-19 13:56:17 +000061public class DistributedMcastStore
62 extends AbstractStore<McastEvent, McastStoreDelegate>
63 implements McastStore {
alshabib79e52872015-12-07 16:01:01 -080064
Jon Halldbe4c532016-10-04 11:45:52 -070065 private static final String MCASTRIB = "onos-mcast-rib-table";
alshabib79e52872015-12-07 16:01:01 -080066 private Logger log = getLogger(getClass());
67
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080069 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080070
Julia Ferguson8d315952017-11-19 13:56:17 +000071 private Map<McastRoute, MulticastData> mcastRoutes;
72 private ConsistentMap<McastRoute, MulticastData> mcastRib;
73 private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
74 new McastRouteListener();
75
76 private ScheduledExecutorService executor;
alshabib79e52872015-12-07 16:01:01 -080077
78
79 @Activate
80 public void activate() {
Jonathan Hart51539b82015-10-29 09:53:04 -070081 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080082 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080083 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
alshabib1aa58142016-02-17 15:37:56 -080084 .register(KryoNamespaces.API)
85 .register(
Madan Jampani72282af2016-02-23 14:23:52 -080086 AtomicReference.class,
alshabib1aa58142016-02-17 15:37:56 -080087 MulticastData.class,
88 McastRoute.class,
89 McastRoute.Type.class
90 ).build()))
Julia Ferguson8d315952017-11-19 13:56:17 +000091 .build();
alshabib79e52872015-12-07 16:01:01 -080092
Jonathan Hart51539b82015-10-29 09:53:04 -070093 mcastRoutes = mcastRib.asJavaMap();
Julia Ferguson8d315952017-11-19 13:56:17 +000094 mcastRib.addListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -080095
96 log.info("Started");
97 }
98
99 @Deactivate
100 public void deactivate() {
Julia Ferguson8d315952017-11-19 13:56:17 +0000101 mcastRib.removeListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -0800102 log.info("Stopped");
103 }
104
Julia Ferguson8d315952017-11-19 13:56:17 +0000105
106 private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
107 @Override
108 public void event(MapEvent<McastRoute, MulticastData> event) {
109 final McastRoute route = event.key();
110 final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
111 final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
112
113 switch (event.type()) {
114 case INSERT:
115 checkNotNull(newData);
116
117 if (newData.source() != null) {
118 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
119 mcastRouteInfo(route,
120 newData.sinks(),
121 newData.source())));
122 } else if (!newData.sinks().isEmpty()) {
123 newData.sinks().forEach(sink ->
124 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
125 mcastRouteInfo(route,
126 sink,
127 newData.source())))
128 );
129 } else {
130 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
131 mcastRouteInfo(route)));
132 }
133 break;
134 case UPDATE:
135 checkNotNull(newData);
136 checkNotNull(oldData);
137
138 if (!Objects.equal(oldData.source(), newData.source())) {
139 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
140 mcastRouteInfo(route,
141 newData.sinks(),
142 newData.source())));
143 } else {
144 Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
145 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
146 mcastRouteInfo(route,
147 sink,
148 newData.source())))
149 );
150
151 Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
152 notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
153 mcastRouteInfo(route,
154 sink,
155 newData.source())))
156 );
157 }
158 break;
159 case REMOVE:
Pier Luigi9930da52018-02-02 16:19:11 +0100160 // Verify old data is not null
161 checkNotNull(oldData);
162 // Create a route removed event with just the route
163 // and the source connect point
Julia Ferguson8d315952017-11-19 13:56:17 +0000164 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
Pier Luigi9930da52018-02-02 16:19:11 +0100165 mcastRouteInfo(route,
166 Collections.emptySet(),
167 oldData.source()
168 )));
Julia Ferguson8d315952017-11-19 13:56:17 +0000169 break;
170 default:
171 log.warn("Unknown mcast operation type: {}", event.type());
172 }
173 }
174 }
175
alshabib79e52872015-12-07 16:01:01 -0800176 @Override
177 public void storeRoute(McastRoute route, Type operation) {
178 switch (operation) {
179 case ADD:
Julia Ferguson8d315952017-11-19 13:56:17 +0000180 mcastRoutes.putIfAbsent(route, MulticastData.empty());
alshabib79e52872015-12-07 16:01:01 -0800181 break;
182 case REMOVE:
Julia Ferguson8d315952017-11-19 13:56:17 +0000183 // before remove the route should check that source and sinks are removed?
184 mcastRoutes.remove(route);
alshabib79e52872015-12-07 16:01:01 -0800185 break;
186 default:
187 log.warn("Unknown mcast operation type: {}", operation);
188 }
189 }
190
191 @Override
192 public void storeSource(McastRoute route, ConnectPoint source) {
193 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
194 if (v == null) {
195 return new MulticastData(source);
196 } else {
197 v.setSource(source);
198 }
199 return v;
200 });
alshabib79e52872015-12-07 16:01:01 -0800201 }
202
203 @Override
204 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
205 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
206 switch (operation) {
207 case ADD:
208 if (v == null) {
209 v = MulticastData.empty();
210 }
211 v.appendSink(sink);
212 break;
213 case REMOVE:
214 if (v != null) {
215 v.removeSink(sink);
216 }
217 break;
218 default:
219 log.warn("Unknown mcast operation type: {}", operation);
220 }
221 return v;
222 });
alshabib79e52872015-12-07 16:01:01 -0800223 }
224
225 @Override
226 public ConnectPoint sourceFor(McastRoute route) {
227 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
228 }
229
230 @Override
231 public Set<ConnectPoint> sinksFor(McastRoute route) {
232 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
233 }
234
Jonathan Hart07eb0412016-02-08 16:42:29 -0800235 @Override
236 public Set<McastRoute> getRoutes() {
237 return mcastRoutes.keySet();
238 }
alshabib79e52872015-12-07 16:01:01 -0800239}