blob: 7e412439a5becf7edcbf42d9a6f28275e428221e [file] [log] [blame]
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +09001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2017-present Open Networking Foundation
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +09003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.ofagent.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.util.KryoNamespace;
26import org.onosproject.core.ApplicationId;
27import org.onosproject.core.CoreService;
28import org.onosproject.incubator.net.virtual.NetworkId;
Jovana Vuletac884b692017-11-28 16:52:35 +010029import org.onosproject.incubator.net.virtual.TenantId;
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +090030import org.onosproject.ofagent.api.OFAgent;
31import org.onosproject.ofagent.api.OFAgentEvent;
32import org.onosproject.ofagent.api.OFAgentEvent.Type;
33import org.onosproject.ofagent.api.OFAgentStore;
34import org.onosproject.ofagent.api.OFAgentStoreDelegate;
35import org.onosproject.ofagent.api.OFController;
36import org.onosproject.store.AbstractStore;
37import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.ConsistentMap;
39import org.onosproject.store.service.MapEvent;
40import org.onosproject.store.service.MapEventListener;
41import org.onosproject.store.service.Serializer;
42import org.onosproject.store.service.StorageService;
43import org.onosproject.store.service.Versioned;
44import org.slf4j.Logger;
45
46import java.util.Set;
47import java.util.concurrent.ExecutorService;
48import java.util.stream.Collectors;
49
50import static com.google.common.base.Preconditions.checkArgument;
51import static java.util.concurrent.Executors.newSingleThreadExecutor;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.ofagent.api.OFAgent.State.STARTED;
54import static org.onosproject.ofagent.api.OFAgentEvent.Type.*;
55import static org.onosproject.ofagent.api.OFAgentService.APPLICATION_NAME;
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Implementation of the {@link OFAgentStore} with consistent map.
60 */
61@Service
62@Component(immediate = true)
63public class DistributedOFAgentStore extends AbstractStore<OFAgentEvent, OFAgentStoreDelegate>
64 implements OFAgentStore {
65
66 private final Logger log = getLogger(getClass());
67
68 private static final String ERR_NOT_FOUND = " does not exist";
69 private static final String ERR_DUPLICATE = " already exists";
70
71 private static final KryoNamespace SERIALIZER_OFAGENT = KryoNamespace.newBuilder()
72 .register(KryoNamespaces.API)
73 .register(OFAgent.class)
74 .register(OFAgent.State.class)
75 .register(NetworkId.class)
Jovana Vuletac884b692017-11-28 16:52:35 +010076 .register(TenantId.class)
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +090077 .register(DefaultOFAgent.class)
78 .register(OFController.class)
79 .register(DefaultOFController.class)
80 .build();
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
83 protected CoreService coreService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86 protected StorageService storageService;
87
88 private final ExecutorService eventExecutor = newSingleThreadExecutor(
89 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
90 private final MapEventListener<NetworkId, OFAgent> ofAgentMapListener = new OFAgentMapListener();
91
92 private ConsistentMap<NetworkId, OFAgent> ofAgentStore;
93
94 @Activate
95 protected void activate() {
96 ApplicationId appId = coreService.registerApplication(APPLICATION_NAME);
97 ofAgentStore = storageService.<NetworkId, OFAgent>consistentMapBuilder()
98 .withSerializer(Serializer.using(SERIALIZER_OFAGENT))
99 .withName("ofagentstore")
100 .withApplicationId(appId)
101 .build();
102 ofAgentStore.addListener(ofAgentMapListener);
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 ofAgentStore.removeListener(ofAgentMapListener);
110 eventExecutor.shutdown();
111
112 log.info("Stopped");
113 }
114
115 @Override
116 public void createOfAgent(OFAgent ofAgent) {
117 ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
118 final String error = ofAgent.networkId() + ERR_DUPLICATE;
119 checkArgument(existing == null, error);
120 return ofAgent;
121 });
122 }
123
124 @Override
125 public void updateOfAgent(OFAgent ofAgent) {
126 ofAgentStore.compute(ofAgent.networkId(), (id, existing) -> {
127 final String error = ofAgent.networkId() + ERR_NOT_FOUND;
128 checkArgument(existing != null, error);
129 return ofAgent;
130 });
131 }
132
133 @Override
134 public OFAgent removeOfAgent(NetworkId networkId) {
135 Versioned<OFAgent> ofAgent = ofAgentStore.remove(networkId);
136 return ofAgent == null ? null : ofAgent.value();
137 }
138
139 @Override
140 public OFAgent ofAgent(NetworkId networkId) {
141 Versioned<OFAgent> ofAgent = ofAgentStore.get(networkId);
142 return ofAgent == null ? null : ofAgent.value();
143 }
144
145 @Override
146 public Set<OFAgent> ofAgents() {
147 Set<OFAgent> ofAgents = ofAgentStore.values().stream()
148 .map(Versioned::value)
149 .collect(Collectors.toSet());
150 return ImmutableSet.copyOf(ofAgents);
151 }
152
153 private class OFAgentMapListener implements MapEventListener<NetworkId, OFAgent> {
154
155 @Override
156 public void event(MapEvent<NetworkId, OFAgent> event) {
157 switch (event.type()) {
158 case INSERT:
159 eventExecutor.execute(() -> {
Jovana Vuletac884b692017-11-28 16:52:35 +0100160 log.debug(OFAgent.TRACER_LOG_TENANT_ID_PREFIX + event.newValue().value().tenantId()
161 + " OFAgent for network {} created", event.key());
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +0900162 notifyDelegate(new OFAgentEvent(
163 Type.OFAGENT_CREATED,
164 event.newValue().value()));
165 });
166 break;
167 case UPDATE:
168 eventExecutor.execute(() -> {
Jovana Vuletac884b692017-11-28 16:52:35 +0100169 log.debug(OFAgent.TRACER_LOG_TENANT_ID_PREFIX + event.newValue().value().tenantId()
170 + " OFAgent for network {} updated", event.key());
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +0900171 processUpdated(event.oldValue().value(), event.newValue().value());
172 });
173 break;
174 case REMOVE:
175 eventExecutor.execute(() -> {
Jovana Vuletac884b692017-11-28 16:52:35 +0100176 log.debug(OFAgent.TRACER_LOG_TENANT_ID_PREFIX + event.oldValue().value().tenantId()
177 + " OFAgent for network {} removed", event.key());
Hyunsun Moonf4ba44f2017-03-14 03:25:52 +0900178 notifyDelegate(new OFAgentEvent(
179 Type.OFAGENT_REMOVED,
180 event.oldValue().value()));
181 });
182 break;
183 default:
184 break;
185 }
186 }
187
188 private void processUpdated(OFAgent oldValue, OFAgent newValue) {
189 if (!oldValue.controllers().equals(newValue.controllers())) {
190 oldValue.controllers().stream()
191 .filter(controller -> !newValue.controllers().contains(controller))
192 .forEach(controller -> notifyDelegate(new OFAgentEvent(
193 OFAGENT_CONTROLLER_REMOVED,
194 newValue,
195 controller)
196 ));
197
198 newValue.controllers().stream()
199 .filter(controller -> !oldValue.controllers().contains(controller))
200 .forEach(controller -> notifyDelegate(new OFAgentEvent(
201 OFAGENT_CONTROLLER_ADDED,
202 newValue,
203 controller
204 )));
205 }
206
207 if (oldValue.state() != newValue.state()) {
208 Type eventType = newValue.state() == STARTED ? OFAGENT_STARTED : OFAGENT_STOPPED;
209 notifyDelegate(new OFAgentEvent(eventType, newValue));
210 }
211 }
212 }
213}