blob: f48bdc357d3e15404bb646ef3e461027ee8a097d [file] [log] [blame]
Jian Liecae4382018-06-28 17:41:12 +09001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.openstacknetworking.impl;
17
18import com.google.common.collect.ImmutableSet;
19import org.apache.felix.scr.annotations.Activate;
20import org.apache.felix.scr.annotations.Component;
21import org.apache.felix.scr.annotations.Deactivate;
22import org.apache.felix.scr.annotations.Reference;
23import org.apache.felix.scr.annotations.ReferenceCardinality;
24import org.apache.felix.scr.annotations.Service;
25import org.onlab.util.KryoNamespace;
26import org.onosproject.core.ApplicationId;
27import org.onosproject.core.CoreService;
28import org.onosproject.openstacknetworking.api.InstancePort;
29import org.onosproject.openstacknetworking.api.InstancePortEvent;
30import org.onosproject.openstacknetworking.api.InstancePortStore;
31import org.onosproject.openstacknetworking.api.InstancePortStoreDelegate;
32import org.onosproject.store.AbstractStore;
33import org.onosproject.store.serializers.KryoNamespaces;
34import org.onosproject.store.service.ConsistentMap;
35import org.onosproject.store.service.MapEvent;
36import org.onosproject.store.service.MapEventListener;
37import org.onosproject.store.service.Serializer;
38import org.onosproject.store.service.StorageService;
39import org.onosproject.store.service.Versioned;
40import org.slf4j.Logger;
41
42import java.util.Set;
43import java.util.concurrent.ExecutorService;
44
45import static com.google.common.base.Preconditions.checkArgument;
46import static java.util.concurrent.Executors.newSingleThreadExecutor;
47import static org.onlab.util.Tools.groupedThreads;
48import static org.onosproject.openstacknetworking.api.Constants.OPENSTACK_NETWORKING_APP_ID;
49import static org.onosproject.openstacknetworking.api.InstancePort.State.ACTIVE;
50import static org.onosproject.openstacknetworking.api.InstancePort.State.INACTIVE;
Jian Liec5c32b2018-07-13 14:28:58 +090051import static org.onosproject.openstacknetworking.api.InstancePort.State.MIGRATED;
Jian Liecae4382018-06-28 17:41:12 +090052import static org.onosproject.openstacknetworking.api.InstancePort.State.MIGRATING;
Jian Li0488c732018-09-14 20:53:07 +090053import static org.onosproject.openstacknetworking.api.InstancePort.State.REMOVE_PENDING;
Jian Liecae4382018-06-28 17:41:12 +090054import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_ENDED;
55import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_MIGRATION_STARTED;
56import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_PORT_DETECTED;
57import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_PORT_UPDATED;
58import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_PORT_VANISHED;
59import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_RESTARTED;
60import static org.onosproject.openstacknetworking.api.InstancePortEvent.Type.OPENSTACK_INSTANCE_TERMINATED;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Manages the inventory of openstack instance port using a {@code ConsistentMap}.
65 */
66@Service
67@Component(immediate = true)
68public class DistributedInstancePortStore
69 extends AbstractStore<InstancePortEvent, InstancePortStoreDelegate>
70 implements InstancePortStore {
71
72 protected final Logger log = getLogger(getClass());
73
74 private static final String ERR_NOT_FOUND = " does not exist";
75 private static final String ERR_DUPLICATE = " already exists";
76
77 private static final KryoNamespace SERIALIZER_INSTANCE_PORT = KryoNamespace.newBuilder()
78 .register(KryoNamespaces.API)
79 .register(InstancePort.class)
80 .register(DefaultInstancePort.class)
81 .register(InstancePort.State.class)
82 .build();
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85 protected CoreService coreService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88 protected StorageService storageService;
89
90 private final ExecutorService eventExecutor = newSingleThreadExecutor(
91 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
92
93 private final MapEventListener<String, InstancePort>
94 instancePortMapListener = new InstancePortMapListener();
95
96 private ConsistentMap<String, InstancePort> instancePortStore;
97
98 @Activate
99 protected void activate() {
100 ApplicationId appId = coreService.registerApplication(OPENSTACK_NETWORKING_APP_ID);
101
102 instancePortStore = storageService.<String, InstancePort>consistentMapBuilder()
103 .withSerializer(Serializer.using(SERIALIZER_INSTANCE_PORT))
104 .withName("openstack-instanceport-store")
105 .withApplicationId(appId)
106 .build();
107 instancePortStore.addListener(instancePortMapListener);
108
109 log.info("Started");
110 }
111
112 @Deactivate
113 protected void deactivate() {
114 instancePortStore.removeListener(instancePortMapListener);
115 eventExecutor.shutdown();
116
117 log.info("Stopped");
118 }
119
120 @Override
121 public void createInstancePort(InstancePort port) {
122 instancePortStore.compute(port.portId(), (id, existing) -> {
123 final String error = port.portId() + ERR_DUPLICATE;
124 checkArgument(existing == null, error);
125 return port;
126 });
127 }
128
129 @Override
130 public void updateInstancePort(InstancePort port) {
131 instancePortStore.compute(port.portId(), (id, existing) -> {
132 final String error = port.portId() + ERR_NOT_FOUND;
133 checkArgument(existing != null, error);
134 return port;
135 });
136 }
137
138 @Override
139 public InstancePort removeInstancePort(String portId) {
140 Versioned<InstancePort> port = instancePortStore.remove(portId);
141 return port == null ? null : port.value();
142 }
143
144 @Override
145 public InstancePort instancePort(String portId) {
146 return instancePortStore.asJavaMap().get(portId);
147 }
148
149 @Override
150 public Set<InstancePort> instancePorts() {
151 return ImmutableSet.copyOf(instancePortStore.asJavaMap().values());
152 }
153
154 @Override
155 public void clear() {
156 instancePortStore.clear();
157 }
158
159 private class InstancePortMapListener implements MapEventListener<String, InstancePort> {
160
161 @Override
162 public void event(MapEvent<String, InstancePort> event) {
163 switch (event.type()) {
164 case INSERT:
165 log.debug("Instance port created");
166 eventExecutor.execute(() ->
167 notifyDelegate(new InstancePortEvent(
168 OPENSTACK_INSTANCE_PORT_DETECTED,
169 event.newValue().value()))
170 );
171 break;
172 case UPDATE:
173 log.debug("Instance port updated");
Jian Liec5c32b2018-07-13 14:28:58 +0900174 eventExecutor.execute(() -> processInstancePortUpdate(event));
Jian Liecae4382018-06-28 17:41:12 +0900175 break;
176 case REMOVE:
177 log.debug("Instance port removed");
178 eventExecutor.execute(() ->
179 notifyDelegate(new InstancePortEvent(
180 OPENSTACK_INSTANCE_PORT_VANISHED,
181 event.oldValue().value()))
182 );
183 break;
184 default:
185 log.error("Unsupported instance port event type");
186 break;
187 }
188 }
189
190 private void processInstancePortUpdate(MapEvent<String, InstancePort> event) {
191 InstancePort.State oldState = event.oldValue().value().state();
192 InstancePort.State newState = event.newValue().value().state();
193
Jian Liee8214a2018-07-21 20:07:28 +0900194 if ((oldState == ACTIVE || oldState == INACTIVE) && newState == MIGRATING) {
Jian Liecae4382018-06-28 17:41:12 +0900195 notifyDelegate(new InstancePortEvent(
196 OPENSTACK_INSTANCE_MIGRATION_STARTED,
197 event.newValue().value()));
Jian Liec5c32b2018-07-13 14:28:58 +0900198 return;
Jian Liecae4382018-06-28 17:41:12 +0900199 }
200
Jian Liec5c32b2018-07-13 14:28:58 +0900201 if (oldState == MIGRATING && newState == MIGRATED) {
Jian Liecae4382018-06-28 17:41:12 +0900202 notifyDelegate(new InstancePortEvent(
203 OPENSTACK_INSTANCE_MIGRATION_ENDED,
204 event.newValue().value()));
Jian Liec5c32b2018-07-13 14:28:58 +0900205 updateInstancePort(event.newValue().value().updateState(ACTIVE));
206 return;
Jian Liecae4382018-06-28 17:41:12 +0900207 }
208
209 if (oldState == ACTIVE && newState == INACTIVE) {
210 notifyDelegate(new InstancePortEvent(
211 OPENSTACK_INSTANCE_TERMINATED,
212 event.newValue().value()));
Jian Liec5c32b2018-07-13 14:28:58 +0900213 return;
Jian Liecae4382018-06-28 17:41:12 +0900214 }
215
216 if (oldState == INACTIVE && newState == ACTIVE) {
217 notifyDelegate(new InstancePortEvent(
218 OPENSTACK_INSTANCE_RESTARTED,
219 event.newValue().value()));
Jian Liec5c32b2018-07-13 14:28:58 +0900220 return;
Jian Liecae4382018-06-28 17:41:12 +0900221 }
Jian Liec5c32b2018-07-13 14:28:58 +0900222
223 // this should be auto-transition
224 if (oldState == MIGRATED && newState == ACTIVE) {
225 return;
226 }
227
Jian Li0488c732018-09-14 20:53:07 +0900228 // we do not trigger instance port update for pending state transition
229 if (newState == REMOVE_PENDING) {
230 return;
231 }
232
Jian Liec5c32b2018-07-13 14:28:58 +0900233 notifyDelegate(new InstancePortEvent(
234 OPENSTACK_INSTANCE_PORT_UPDATED,
235 event.newValue().value()));
Jian Liecae4382018-06-28 17:41:12 +0900236 }
237 }
238}