blob: 6b1eff568258badd4086dee0d8817103e198bd48 [file] [log] [blame]
/*
 * Copyright 2015-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Ray Milkeya9ae0d42017-08-06 22:10:47 -070016package org.onosproject.store.mcast.impl;
alshabib79e52872015-12-07 16:01:01 -080017
Julia Ferguson75ef1052017-11-19 13:56:17 +000018
19import com.google.common.base.Objects;
20import com.google.common.collect.Sets;
alshabib79e52872015-12-07 16:01:01 -080021import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
alshabib79e52872015-12-07 16:01:01 -080027import org.onlab.util.KryoNamespace;
Julia Ferguson75ef1052017-11-19 13:56:17 +000028
alshabib79e52872015-12-07 16:01:01 -080029import org.onosproject.net.ConnectPoint;
30import org.onosproject.net.mcast.McastEvent;
31import org.onosproject.net.mcast.McastRoute;
alshabib79e52872015-12-07 16:01:01 -080032import org.onosproject.net.mcast.McastStore;
33import org.onosproject.net.mcast.McastStoreDelegate;
34import org.onosproject.store.AbstractStore;
Jonathan Hart07eb0412016-02-08 16:42:29 -080035import org.onosproject.store.serializers.KryoNamespaces;
alshabib79e52872015-12-07 16:01:01 -080036import org.onosproject.store.service.ConsistentMap;
Julia Ferguson75ef1052017-11-19 13:56:17 +000037import org.onosproject.store.service.MapEvent;
38import org.onosproject.store.service.MapEventListener;
alshabib79e52872015-12-07 16:01:01 -080039import org.onosproject.store.service.Serializer;
40import org.onosproject.store.service.StorageService;
Julia Ferguson75ef1052017-11-19 13:56:17 +000041import org.onosproject.store.service.Versioned;
alshabib79e52872015-12-07 16:01:01 -080042import org.slf4j.Logger;
43
alshabib79e52872015-12-07 16:01:01 -080044import java.util.Map;
Julia Ferguson75ef1052017-11-19 13:56:17 +000045import java.util.Optional;
alshabib79e52872015-12-07 16:01:01 -080046import java.util.Set;
Julia Ferguson75ef1052017-11-19 13:56:17 +000047import java.util.concurrent.ScheduledExecutorService;
Madan Jampani72282af2016-02-23 14:23:52 -080048import java.util.concurrent.atomic.AtomicReference;
alshabib79e52872015-12-07 16:01:01 -080049
Julia Ferguson75ef1052017-11-19 13:56:17 +000050import static com.google.common.base.Preconditions.checkNotNull;
Thomas Vachuskad4955ae2016-08-23 14:56:37 -070051import static org.onosproject.net.mcast.McastRouteInfo.mcastRouteInfo;
alshabib79e52872015-12-07 16:01:01 -080052import static org.slf4j.LoggerFactory.getLogger;
53
54/**
55 * A distributed mcast store implementation. Routes are stored consistently
56 * across the cluster.
57 */
58@Component(immediate = true)
59@Service
Julia Ferguson75ef1052017-11-19 13:56:17 +000060public class DistributedMcastStore
61 extends AbstractStore<McastEvent, McastStoreDelegate>
62 implements McastStore {
alshabib79e52872015-12-07 16:01:01 -080063
Jon Halldbe4c532016-10-04 11:45:52 -070064 private static final String MCASTRIB = "onos-mcast-rib-table";
alshabib79e52872015-12-07 16:01:01 -080065 private Logger log = getLogger(getClass());
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jonathan Hart07eb0412016-02-08 16:42:29 -080068 protected StorageService storageService;
alshabib79e52872015-12-07 16:01:01 -080069
Julia Ferguson75ef1052017-11-19 13:56:17 +000070 private Map<McastRoute, MulticastData> mcastRoutes;
71 private ConsistentMap<McastRoute, MulticastData> mcastRib;
72 private MapEventListener<McastRoute, MulticastData> mcastRouteListener =
73 new McastRouteListener();
74
75 private ScheduledExecutorService executor;
alshabib79e52872015-12-07 16:01:01 -080076
77
78 @Activate
79 public void activate() {
Jonathan Hart51539b82015-10-29 09:53:04 -070080 mcastRib = storageService.<McastRoute, MulticastData>consistentMapBuilder()
alshabib79e52872015-12-07 16:01:01 -080081 .withName(MCASTRIB)
Jonathan Hart07eb0412016-02-08 16:42:29 -080082 .withSerializer(Serializer.using(KryoNamespace.newBuilder()
alshabib1aa58142016-02-17 15:37:56 -080083 .register(KryoNamespaces.API)
84 .register(
Madan Jampani72282af2016-02-23 14:23:52 -080085 AtomicReference.class,
alshabib1aa58142016-02-17 15:37:56 -080086 MulticastData.class,
87 McastRoute.class,
88 McastRoute.Type.class
89 ).build()))
Julia Ferguson75ef1052017-11-19 13:56:17 +000090 .build();
alshabib79e52872015-12-07 16:01:01 -080091
Jonathan Hart51539b82015-10-29 09:53:04 -070092 mcastRoutes = mcastRib.asJavaMap();
Julia Ferguson75ef1052017-11-19 13:56:17 +000093 mcastRib.addListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -080094
95 log.info("Started");
96 }
97
98 @Deactivate
99 public void deactivate() {
Julia Ferguson75ef1052017-11-19 13:56:17 +0000100 mcastRib.removeListener(mcastRouteListener);
alshabib79e52872015-12-07 16:01:01 -0800101 log.info("Stopped");
102 }
103
Julia Ferguson75ef1052017-11-19 13:56:17 +0000104
105 private class McastRouteListener implements MapEventListener<McastRoute, MulticastData> {
106 @Override
107 public void event(MapEvent<McastRoute, MulticastData> event) {
108 final McastRoute route = event.key();
109 final MulticastData newData = Optional.ofNullable(event.newValue()).map(Versioned::value).orElse(null);
110 final MulticastData oldData = Optional.ofNullable(event.oldValue()).map(Versioned::value).orElse(null);
111
112 switch (event.type()) {
113 case INSERT:
114 checkNotNull(newData);
115
116 if (newData.source() != null) {
117 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
118 mcastRouteInfo(route,
119 newData.sinks(),
120 newData.source())));
121 } else if (!newData.sinks().isEmpty()) {
122 newData.sinks().forEach(sink ->
123 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
124 mcastRouteInfo(route,
125 sink,
126 newData.source())))
127 );
128 } else {
129 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_ADDED,
130 mcastRouteInfo(route)));
131 }
132 break;
133 case UPDATE:
134 checkNotNull(newData);
135 checkNotNull(oldData);
136
137 if (!Objects.equal(oldData.source(), newData.source())) {
138 notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
139 mcastRouteInfo(route,
140 newData.sinks(),
141 newData.source())));
142 } else {
143 Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
144 notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
145 mcastRouteInfo(route,
146 sink,
147 newData.source())))
148 );
149
150 Sets.difference(oldData.sinks(), newData.sinks()).forEach(sink ->
151 notifyDelegate(new McastEvent(McastEvent.Type.SINK_REMOVED,
152 mcastRouteInfo(route,
153 sink,
154 newData.source())))
155 );
156 }
157 break;
158 case REMOVE:
159 notifyDelegate(new McastEvent(McastEvent.Type.ROUTE_REMOVED,
160 mcastRouteInfo(route)));
161 break;
162 default:
163 log.warn("Unknown mcast operation type: {}", event.type());
164 }
165 }
166 }
167
alshabib79e52872015-12-07 16:01:01 -0800168 @Override
169 public void storeRoute(McastRoute route, Type operation) {
170 switch (operation) {
171 case ADD:
Julia Ferguson75ef1052017-11-19 13:56:17 +0000172 mcastRoutes.putIfAbsent(route, MulticastData.empty());
alshabib79e52872015-12-07 16:01:01 -0800173 break;
174 case REMOVE:
Julia Ferguson75ef1052017-11-19 13:56:17 +0000175 // before remove the route should check that source and sinks are removed?
176 mcastRoutes.remove(route);
alshabib79e52872015-12-07 16:01:01 -0800177 break;
178 default:
179 log.warn("Unknown mcast operation type: {}", operation);
180 }
181 }
182
183 @Override
184 public void storeSource(McastRoute route, ConnectPoint source) {
185 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
186 if (v == null) {
187 return new MulticastData(source);
188 } else {
189 v.setSource(source);
190 }
191 return v;
192 });
alshabib79e52872015-12-07 16:01:01 -0800193 }
194
195 @Override
196 public void storeSink(McastRoute route, ConnectPoint sink, Type operation) {
197 MulticastData data = mcastRoutes.compute(route, (k, v) -> {
198 switch (operation) {
199 case ADD:
200 if (v == null) {
201 v = MulticastData.empty();
202 }
203 v.appendSink(sink);
204 break;
205 case REMOVE:
206 if (v != null) {
207 v.removeSink(sink);
208 }
209 break;
210 default:
211 log.warn("Unknown mcast operation type: {}", operation);
212 }
213 return v;
214 });
alshabib79e52872015-12-07 16:01:01 -0800215 }
216
217 @Override
218 public ConnectPoint sourceFor(McastRoute route) {
219 return mcastRoutes.getOrDefault(route, MulticastData.empty()).source();
220 }
221
222 @Override
223 public Set<ConnectPoint> sinksFor(McastRoute route) {
224 return mcastRoutes.getOrDefault(route, MulticastData.empty()).sinks();
225 }
226
Jonathan Hart07eb0412016-02-08 16:42:29 -0800227 @Override
228 public Set<McastRoute> getRoutes() {
229 return mcastRoutes.keySet();
230 }
alshabib79e52872015-12-07 16:01:01 -0800231}