[ONOS-6556] Distributed Implementation of PiPipeconfService
Change-Id: I7196ce6eee333e732c0cd5015d4d8d32ee069e27
diff --git a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java
index a010ddc..81491d0 100644
--- a/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java
+++ b/apps/dhcprelay/src/test/java/org/onosproject/dhcprelay/store/DistributedDhcpRelayStoreTest.java
@@ -134,7 +134,7 @@
assertEquals(record.ip6Status(), removedRecord.ip6Status());
assertEquals(record.directlyConnected(), removedRecord.directlyConnected());
event = recordComplete.join();
- assertEquals(null, event.subject());
+ assertEquals(record, event.subject());
assertEquals(DhcpRelayStoreEvent.Type.REMOVED, event.type());
recordInStore = store.getDhcpRecord(HOST_ID).orElse(null);
assertNull(recordInStore);
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java
new file mode 100644
index 0000000..540967c
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfDeviceMappingEvent.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Entity that represents pipeconf to device binding events.
+ */
+@Beta
+public class PiPipeconfDeviceMappingEvent extends AbstractEvent<PiPipeconfDeviceMappingEvent.Type, DeviceId> {
+
+ /**
+ * Type of pipeconf to device mapping event.
+ */
+ public enum Type {
+
+ /**
+ * Individual mapping pipeconf to device added.
+ */
+ CREATED,
+
+ /**
+ * Individual mapping pipeconf to device removed.
+ */
+ REMOVED,
+ }
+
+ /**
+ * Creates an event due to one Pipeconf being mapped to a device.
+ *
+ * @param type event type
+ * @param deviceId the deviceId for which the pipeconf was bound or updated.
+ */
+ public PiPipeconfDeviceMappingEvent(PiPipeconfDeviceMappingEvent.Type type, DeviceId deviceId) {
+ super(type, deviceId);
+ }
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java
new file mode 100644
index 0000000..76438b2
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStore.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.store.Store;
+
+import java.util.Set;
+
+/**
+ * Manages the mapping of Pipeconfs that are deployed to devices; not intended for direct use.
+ */
+@Beta
+public interface PiPipeconfMappingStore extends Store<PiPipeconfDeviceMappingEvent, PiPipeconfMappingStoreDelegate> {
+
+ /**
+ * Retrieves the id of the pipeconf deployed on a given device.
+ *
+ * @param deviceId device identifier
+ * @return PiPipeconfId
+ */
+ PiPipeconfId getPipeconfId(DeviceId deviceId);
+
+ /**
+ * Retrieves the set of devices on which the given pipeconf is applied.
+ *
+ * @param pipeconfId pipeconf identifier
+ * @return the set of devices that have that pipeconf applied.
+ */
+ Set<DeviceId> getDevices(PiPipeconfId pipeconfId);
+
+ /**
+ * Stores or updates a binding between a device and the pipeconf deployed on it.
+ *
+ * @param deviceId deviceId
+ * @param pipeconfId pipeconfId
+ */
+
+ void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId);
+
+ /**
+ * Removes device to pipeconf binding.
+ *
+ * @param deviceId deviceId
+ */
+
+ void removeBinding(DeviceId deviceId);
+
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java
new file mode 100644
index 0000000..c2fbf17
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfMappingStoreDelegate.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.onosproject.net.pi.runtime;
+
+import com.google.common.annotations.Beta;
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Pipeconf store delegate abstraction.
+ */
+@Beta
+public interface PiPipeconfMappingStoreDelegate extends StoreDelegate<PiPipeconfDeviceMappingEvent> {
+}
diff --git a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
index c43be6d..79bc624 100644
--- a/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
+++ b/core/api/src/main/java/org/onosproject/net/pi/runtime/PiPipeconfService.java
@@ -89,4 +89,5 @@
* @return an optional pipeconf identifier
*/
Optional<PiPipeconfId> ofDevice(DeviceId deviceId);
+
}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
index b28356d..4a8dc47 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestEventuallyConsistentMap.java
@@ -103,7 +103,7 @@
if (result != null) {
EventuallyConsistentMapEvent<K, V> removeEvent =
new EventuallyConsistentMapEvent<>(mapName, REMOVE,
- key, map.get(key));
+ key, result);
notifyListeners(removeEvent);
}
return result;
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
index 9072f3f..a0bf93e 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfManager.java
@@ -44,6 +44,7 @@
import org.onosproject.net.pi.model.PiPipeconf;
import org.onosproject.net.pi.model.PiPipeconfId;
import org.onosproject.net.pi.runtime.PiPipeconfConfig;
+import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
import org.onosproject.net.pi.runtime.PiPipeconfService;
import org.slf4j.Logger;
@@ -83,10 +84,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DriverAdminService driverAdminService;
- //TODO move to replicated map
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PiPipeconfMappingStore pipeconfMappingStore;
+
+ // Registered pipeconf are replicated through the app subsystem and registered on app activated events.
protected ConcurrentHashMap<PiPipeconfId, PiPipeconf> piPipeconfs = new ConcurrentHashMap<>();
- //TODO move to replicated map
- protected ConcurrentHashMap<DeviceId, PiPipeconfId> devicesToPipeconf = new ConcurrentHashMap<>();
protected ExecutorService executor =
Executors.newFixedThreadPool(5, groupedThreads("onos/pipipeconfservice",
@@ -120,7 +122,6 @@
cfgService.removeListener(cfgListener);
cfgService.unregisterConfigFactory(factory);
piPipeconfs.clear();
- devicesToPipeconf.clear();
cfgService = null;
driverAdminService = null;
driverService = null;
@@ -139,11 +140,12 @@
@Override
public void remove(PiPipeconfId pipeconfId) throws IllegalStateException {
- //TODO move to the distributed mechanism
//TODO add mechanism to remove from device.
if (!piPipeconfs.containsKey(pipeconfId)) {
throw new IllegalStateException(format("Pipeconf %s is not registered", pipeconfId));
}
+ // TODO remove the binding from the distributed Store when the lifecycle of a pipeconf is defined.
+ // pipeconfMappingStore.removeBindings(pipeconfId);
piPipeconfs.remove(pipeconfId);
}
@@ -212,7 +214,7 @@
// Completable future is needed for when this method will also apply the pipeline to the device.
// FIXME (maybe): the pipeline is currently applied by the general device provider. But we store here
// the association between device and pipeconf.
- devicesToPipeconf.put(deviceId, pipeconfId);
+ pipeconfMappingStore.createOrUpdateBinding(deviceId, pipeconfId);
operationResult.complete(true);
}
});
@@ -221,9 +223,10 @@
@Override
public Optional<PiPipeconfId> ofDevice(DeviceId deviceId) {
- return Optional.ofNullable(devicesToPipeconf.get(deviceId));
+ return Optional.ofNullable(pipeconfMappingStore.getPipeconfId(deviceId));
}
+
private class PiPipeconfDriverProviderInternal implements DriverProvider {
Driver driver;
@@ -245,7 +248,7 @@
if (id.id().equals("")) {
log.warn("Not adding empty pipeconfId for device {}", deviceId);
} else {
- devicesToPipeconf.put(deviceId, pipeconfConfig.piPipeconfId());
+ pipeconfMappingStore.createOrUpdateBinding(deviceId, id);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
new file mode 100644
index 0000000..d07429a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStore.java
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.pi.impl;
+
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.net.pi.runtime.PiPipeconfDeviceMappingEvent;
+import org.onosproject.net.pi.runtime.PiPipeconfMappingStore;
+import org.onosproject.net.pi.runtime.PiPipeconfMappingStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
+import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages information of pipeconf to device binding using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedDevicePipeconfMappingStore
+ extends AbstractStore<PiPipeconfDeviceMappingEvent, PiPipeconfMappingStoreDelegate>
+ implements PiPipeconfMappingStore {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ protected EventuallyConsistentMap<DeviceId, PiPipeconfId> deviceToPipeconf;
+
+ protected final EventuallyConsistentMapListener<DeviceId, PiPipeconfId> pipeconfListener =
+ new InternalPiPipeconfListener();
+
+ protected ConcurrentMap<PiPipeconfId, Set<DeviceId>> pipeconfToDevices = new ConcurrentHashMap<>();
+
+ @Activate
+ public void activate() {
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MultiValuedTimestamp.class);
+ deviceToPipeconf = storageService.<DeviceId, PiPipeconfId>eventuallyConsistentMapBuilder()
+ .withName("onos-pipeconf-table")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
+ deviceToPipeconf.addListener(pipeconfListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ deviceToPipeconf.removeListener(pipeconfListener);
+ deviceToPipeconf = null;
+ pipeconfToDevices = null;
+ log.info("Stopped");
+ }
+
+ @Override
+ public PiPipeconfId getPipeconfId(DeviceId deviceId) {
+ return deviceToPipeconf.get(deviceId);
+ }
+
+ @Override
+ public Set<DeviceId> getDevices(PiPipeconfId pipeconfId) {
+ return pipeconfToDevices.get(pipeconfId);
+ }
+
+ @Override
+ public void createOrUpdateBinding(DeviceId deviceId, PiPipeconfId pipeconfId) {
+ deviceToPipeconf.put(deviceId, pipeconfId);
+ }
+
+ @Override
+ public void removeBinding(DeviceId deviceId) {
+ deviceToPipeconf.remove(deviceId);
+ }
+
+ private class InternalPiPipeconfListener implements EventuallyConsistentMapListener<DeviceId, PiPipeconfId> {
+
+ @Override
+ public void event(EventuallyConsistentMapEvent<DeviceId, PiPipeconfId> mapEvent) {
+ final PiPipeconfDeviceMappingEvent.Type type;
+ final DeviceId deviceId = mapEvent.key();
+ final PiPipeconfId pipeconfId = mapEvent.value();
+ switch (mapEvent.type()) {
+ case PUT:
+ type = PiPipeconfDeviceMappingEvent.Type.CREATED;
+ pipeconfToDevices.compute(pipeconfId, (pipeconf, devices) -> {
+ if (devices == null) {
+ devices = Sets.newConcurrentHashSet();
+ }
+ devices.add(deviceId);
+ return devices;
+ });
+ break;
+ case REMOVE:
+ type = PiPipeconfDeviceMappingEvent.Type.REMOVED;
+ pipeconfToDevices.computeIfPresent(pipeconfId, (pipeconf, devices) -> {
+ devices.remove(deviceId);
+ return devices;
+ });
+ break;
+ default:
+ throw new IllegalArgumentException("Wrong event type " + mapEvent.type());
+ }
+ notifyDelegate(new PiPipeconfDeviceMappingEvent(type, deviceId));
+ }
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java
new file mode 100644
index 0000000..d336455
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/pi/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation of distributed Pipeconf to device mapping store.
+ */
+package org.onosproject.store.pi.impl;
diff --git a/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
new file mode 100644
index 0000000..83b997d
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/pi/impl/DistributedDevicePipeconfMappingStoreTest.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.pi.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.pi.model.PiPipeconfId;
+import org.onosproject.store.service.TestStorageService;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test class for the Distributed Device to Pipeconf store.
+ */
+public class DistributedDevicePipeconfMappingStoreTest {
+
+ private DistributedDevicePipeconfMappingStore store;
+ private static final DeviceId DEVICE_ID = DeviceId.deviceId("foo:bar");
+ private static final PiPipeconfId PIPECONF_ID = new PiPipeconfId("foo-pipeconf");
+
+ /**
+ * Sets up the device key store and the storage service test harness.
+ */
+ @Before
+ public void setUp() {
+ store = new DistributedDevicePipeconfMappingStore();
+ store.storageService = new TestStorageService();
+ store.setDelegate(event -> {
+ });
+ store.activate();
+ }
+
+ /**
+ * Test for activate.
+ */
+ @Test
+ public void activate() {
+ assertNotNull(store.storageService);
+ assertTrue("Store must have delegate", store.hasDelegate());
+ assertTrue("No value should be in the map", store.deviceToPipeconf.isEmpty());
+ assertTrue("No value should be in the map", store.pipeconfToDevices.isEmpty());
+ }
+
+ /**
+ * Test for deactivate.
+ */
+ @Test
+ public void deactivate() {
+ store.deactivate();
+ assertNull("Should be null", store.deviceToPipeconf);
+ assertNull("Should be null", store.pipeconfToDevices);
+ }
+
+ /**
+ * Test for saving the binding in eventually consistent map and in reverse map.
+ */
+ @Test
+ public void createOrUpdatePipeconfToDeviceBinding() {
+ store.createOrUpdateBinding(DEVICE_ID, PIPECONF_ID);
+ assertTrue("Value should be in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
+ assertTrue("Value should be in the map", store.deviceToPipeconf.containsValue(PIPECONF_ID));
+ assertTrue("Value should be in the map", store.pipeconfToDevices.containsKey(PIPECONF_ID));
+ assertTrue("Value should be in the map", store.pipeconfToDevices.containsValue(ImmutableSet.of(DEVICE_ID)));
+ }
+
+ /**
+ * Test for getting the deviceId to pipeconfId binding.
+ */
+ @Test
+ public void getPipeconfIdDevice() throws Exception {
+ clear();
+ createOrUpdatePipeconfToDeviceBinding();
+ assertEquals("Wrong PipeconfId", store.getPipeconfId(DEVICE_ID), PIPECONF_ID);
+ }
+
+ /**
+ * Test for getting the pipeconfId to devices binding.
+ */
+ @Test
+ public void getDevices() {
+ clear();
+ createOrUpdatePipeconfToDeviceBinding();
+ assertEquals("Wrong set of DeviceIds", store.getDevices(PIPECONF_ID), ImmutableSet.of(DEVICE_ID));
+
+ }
+
+ /**
+ * Test for clearing binding for a given device.
+ */
+ @Test
+ public void clearDeviceToPipeconfBinding() throws Exception {
+ clear();
+ createOrUpdatePipeconfToDeviceBinding();
+ store.removeBinding(DEVICE_ID);
+ assertFalse("Unexpected DeviceId in the map", store.deviceToPipeconf.containsKey(DEVICE_ID));
+ assertTrue("No value should be in the map", store.pipeconfToDevices.get(PIPECONF_ID).isEmpty());
+ }
+
+ /**
+ * Clears the store and revers map.
+ */
+ private void clear() {
+ store.pipeconfToDevices.clear();
+ store.deviceToPipeconf.clear();
+ }
+
+}
\ No newline at end of file