Introducing concept of a physical or logical region to facilitate
support of geographically distributed cluster and to lay ground
for multiple/filtered topology layouts.

Added implementation of manager and store; unit-tests included.

Change-Id: Ia01673a0b711b8785c0ea68768552c2f61d7ea6d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/region/impl/DistributedRegionStore.java b/core/store/dist/src/main/java/org/onosproject/store/region/impl/DistributedRegionStore.java
new file mode 100644
index 0000000..6243dac
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/region/impl/DistributedRegionStore.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.region.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Identifier;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.region.DefaultRegion;
+import org.onosproject.net.region.Region;
+import org.onosproject.net.region.RegionEvent;
+import org.onosproject.net.region.RegionId;
+import org.onosproject.net.region.RegionStore;
+import org.onosproject.net.region.RegionStoreDelegate;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsNotFound;
+import static org.onosproject.net.region.RegionEvent.Type.REGION_MEMBERSHIP_CHANGED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Consistent store implementation for tracking region definitions and device
+ * region affiliation.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedRegionStore
+        extends AbstractStore<RegionEvent, RegionStoreDelegate>
+        implements RegionStore {
+
+    private static final String NO_REGION = "Region does not exist";
+    private static final String DUPLICATE_REGION = "Region already exists";
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ConsistentMap<RegionId, Region> regionsRepo;
+    private Map<RegionId, Region> regionsById;
+
+    private ConsistentMap<RegionId, Set<DeviceId>> membershipRepo;
+    private Map<RegionId, Set<DeviceId>> regionDevices;
+
+    private Map<DeviceId, Region> regionsByDevice = new HashMap<>();
+
+    private final MapEventListener<RegionId, Region> listener =
+            new InternalRegionListener();
+    private final MapEventListener<RegionId, Set<DeviceId>> membershipListener =
+            new InternalMembershipListener();
+
+    @Activate
+    protected void activate() {
+        Serializer serializer =
+                Serializer.using(Arrays.asList(KryoNamespaces.API),
+                                 Identifier.class,
+                                 RegionId.class,
+                                 Region.class,
+                                 DefaultRegion.class,
+                                 Region.Type.class);
+
+        regionsRepo = storageService.<RegionId, Region>consistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("onos-regions")
+                .withRelaxedReadConsistency()
+                .build();
+        regionsRepo.addListener(listener);
+        regionsById = regionsRepo.asJavaMap();
+
+        membershipRepo = storageService.<RegionId, Set<DeviceId>>consistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("onos-region-devices")
+                .withRelaxedReadConsistency()
+                .build();
+        membershipRepo.addListener(membershipListener);
+        regionDevices = membershipRepo.asJavaMap();
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        regionsRepo.removeListener(listener);
+        membershipRepo.removeListener(membershipListener);
+        regionsByDevice.clear();
+        log.info("Stopped");
+    }
+
+    @Override
+    public Set<Region> getRegions() {
+        return ImmutableSet.copyOf(regionsById.values());
+    }
+
+    @Override
+    public Region getRegion(RegionId regionId) {
+        return nullIsNotFound(regionsById.get(regionId), NO_REGION);
+    }
+
+    @Override
+    public Region getRegionForDevice(DeviceId deviceId) {
+        return regionsByDevice.get(deviceId);
+    }
+
+    @Override
+    public Set<DeviceId> getRegionDevices(RegionId regionId) {
+        Set<DeviceId> deviceIds = regionDevices.get(regionId);
+        return deviceIds != null ? ImmutableSet.copyOf(deviceIds) : ImmutableSet.of();
+    }
+
+    @Override
+    public Region createRegion(RegionId regionId, String name, Region.Type type,
+                               List<Set<NodeId>> masterNodeIds) {
+        return regionsRepo.compute(regionId, (id, region) -> {
+            checkArgument(region == null, DUPLICATE_REGION);
+            return new DefaultRegion(regionId, name, type, masterNodeIds);
+        }).value();
+    }
+
+    @Override
+    public Region updateRegion(RegionId regionId, String name, Region.Type type,
+                               List<Set<NodeId>> masterNodeIds) {
+        return regionsRepo.compute(regionId, (id, region) -> {
+            nullIsNotFound(region, NO_REGION);
+            return new DefaultRegion(regionId, name, type, masterNodeIds);
+        }).value();
+    }
+
+    @Override
+    public void removeRegion(RegionId regionId) {
+        membershipRepo.remove(regionId);
+        regionsRepo.remove(regionId);
+    }
+
+    @Override
+    public void addDevices(RegionId regionId, Collection<DeviceId> deviceIds) {
+        membershipRepo.compute(regionId, (id, existingDevices) -> {
+            if (existingDevices == null) {
+                return ImmutableSet.copyOf(deviceIds);
+            } else if (!existingDevices.containsAll(deviceIds)) {
+                return ImmutableSet.<DeviceId>builder()
+                        .addAll(existingDevices)
+                        .addAll(deviceIds)
+                        .build();
+            } else {
+                return existingDevices;
+            }
+        });
+
+        Region region = regionsById.get(regionId);
+        deviceIds.forEach(deviceId -> regionsByDevice.put(deviceId, region));
+    }
+
+    @Override
+    public void removeDevices(RegionId regionId, Collection<DeviceId> deviceIds) {
+        membershipRepo.compute(regionId, (id, existingDevices) -> {
+            if (existingDevices == null || existingDevices.isEmpty()) {
+                return ImmutableSet.of();
+            } else {
+                return ImmutableSet.<DeviceId>builder()
+                        .addAll(Sets.difference(existingDevices,
+                                                ImmutableSet.copyOf(deviceIds)))
+                        .build();
+            }
+        });
+
+        deviceIds.forEach(deviceId -> regionsByDevice.remove(deviceId));
+    }
+
+    /**
+     * Listener class to map listener events to the region inventory events.
+     */
+    private class InternalRegionListener implements MapEventListener<RegionId, Region> {
+        @Override
+        public void event(MapEvent<RegionId, Region> event) {
+            Region region = null;
+            RegionEvent.Type type = null;
+            switch (event.type()) {
+                case INSERT:
+                    type = RegionEvent.Type.REGION_ADDED;
+                    region = checkNotNull(event.newValue().value());
+                    break;
+                case UPDATE:
+                    type = RegionEvent.Type.REGION_UPDATED;
+                    region = checkNotNull(event.newValue().value());
+                    break;
+                case REMOVE:
+                    type = RegionEvent.Type.REGION_REMOVED;
+                    region = checkNotNull(event.oldValue().value());
+                    break;
+                default:
+                    log.error("Unsupported event type: " + event.type());
+            }
+            notifyDelegate(new RegionEvent(type, region));
+        }
+    }
+
+    /**
+     * Listener class to map listener events to the region membership events.
+     */
+    private class InternalMembershipListener implements MapEventListener<RegionId, Set<DeviceId>> {
+        @Override
+        public void event(MapEvent<RegionId, Set<DeviceId>> event) {
+            if (event.type() != MapEvent.Type.REMOVE) {
+                notifyDelegate(new RegionEvent(REGION_MEMBERSHIP_CHANGED,
+                                               regionsById.get(event.key()),
+                                               event.newValue().value()));
+            }
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/region/impl/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/region/impl/package-info.java
new file mode 100644
index 0000000..32d8c90
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/region/impl/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A distributed store implementation for tracking region definitions
+ *  consistently across the cluster.
+ */
+package org.onosproject.store.region.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onosproject/store/region/impl/DistributedRegionStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/region/impl/DistributedRegionStoreTest.java
new file mode 100644
index 0000000..df3f048
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/region/impl/DistributedRegionStoreTest.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.region.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.util.ItemNotFoundException;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.region.Region;
+import org.onosproject.net.region.RegionEvent;
+import org.onosproject.net.region.RegionId;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+import static org.onosproject.net.region.Region.Type.*;
+import static org.onosproject.net.region.RegionEvent.Type.*;
+
+/**
+ * Test of the distributed region store implementation.
+ */
+public class DistributedRegionStoreTest {
+
+    private static final RegionId RID1 = RegionId.regionId("r1");
+    private static final RegionId RID2 = RegionId.regionId("r2");
+
+    private static final DeviceId DID1 = DeviceId.deviceId("foo:d1");
+    private static final DeviceId DID2 = DeviceId.deviceId("foo:d2");
+    private static final DeviceId DID3 = DeviceId.deviceId("foo:d3");
+
+    private static final NodeId NID1 = NodeId.nodeId("n1");
+
+    private static final List<Set<NodeId>> MASTERS = ImmutableList.of(ImmutableSet.of(NID1));
+
+    private TestStore store;
+    private RegionEvent event;
+
+    /**
+     * Sets up the device key store and the storage service test harness.
+     */
+    @Before
+    public void setUp() {
+        store = new TestStore();
+        store.storageService = new TestStorageService();
+        store.setDelegate(e -> this.event = e);
+        store.activate();
+    }
+
+    /**
+     * Tears down the device key store.
+     */
+    @After
+    public void tearDown() {
+        store.deactivate();
+    }
+
+    @Test
+    public void basics() {
+        Region r1 = store.createRegion(RID1, "R1", METRO, MASTERS);
+        assertEquals("incorrect id", RID1, r1.id());
+        assertEquals("incorrect event", REGION_ADDED, event.type());
+
+        Region r2 = store.createRegion(RID2, "R2", CAMPUS, MASTERS);
+        assertEquals("incorrect id", RID2, r2.id());
+        assertEquals("incorrect type", CAMPUS, r2.type());
+        assertEquals("incorrect event", REGION_ADDED, event.type());
+
+        r2 = store.updateRegion(RID2, "R2", COUNTRY, MASTERS);
+        assertEquals("incorrect type", COUNTRY, r2.type());
+        assertEquals("incorrect event", REGION_UPDATED, event.type());
+
+        Set<Region> regions = store.getRegions();
+        assertEquals("incorrect size", 2, regions.size());
+        assertTrue("missing r1", regions.contains(r1));
+        assertTrue("missing r2", regions.contains(r2));
+
+        r1 = store.getRegion(RID1);
+        assertEquals("incorrect id", RID1, r1.id());
+
+        store.removeRegion(RID1);
+        regions = store.getRegions();
+        assertEquals("incorrect size", 1, regions.size());
+        assertTrue("missing r2", regions.contains(r2));
+        assertEquals("incorrect event", REGION_REMOVED, event.type());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void duplicateCreate() {
+        store.createRegion(RID1, "R1", METRO, MASTERS);
+        store.createRegion(RID1, "R2", CAMPUS, MASTERS);
+    }
+
+    @Test(expected = ItemNotFoundException.class)
+    public void missingUpdate() {
+        store.updateRegion(RID1, "R1", METRO, MASTERS);
+    }
+
+    @Test
+    public void membership() {
+        Region r = store.createRegion(RID1, "R1", METRO, MASTERS);
+        assertTrue("no devices expected", store.getRegionDevices(RID1).isEmpty());
+        assertNull("no region expected", store.getRegionForDevice(DID1));
+
+        store.addDevices(RID1, ImmutableSet.of(DID1, DID2));
+        Set<DeviceId> deviceIds = store.getRegionDevices(RID1);
+        assertEquals("incorrect device count", 2, deviceIds.size());
+        assertTrue("missing d1", deviceIds.contains(DID1));
+        assertTrue("missing d2", deviceIds.contains(DID2));
+        assertEquals("wrong region", r, store.getRegionForDevice(DID1));
+
+        store.addDevices(RID1, ImmutableSet.of(DID3));
+        deviceIds = store.getRegionDevices(RID1);
+        assertEquals("incorrect device count", 3, deviceIds.size());
+        assertTrue("missing d3", deviceIds.contains(DID3));
+
+        store.addDevices(RID1, ImmutableSet.of(DID3, DID1));
+        deviceIds = store.getRegionDevices(RID1);
+        assertEquals("incorrect device count", 3, deviceIds.size());
+
+        store.removeDevices(RID1, ImmutableSet.of(DID2, DID3));
+        deviceIds = store.getRegionDevices(RID1);
+        assertEquals("incorrect device count", 1, deviceIds.size());
+        assertTrue("missing d1", deviceIds.contains(DID1));
+
+        store.removeDevices(RID1, ImmutableSet.of(DID1, DID3));
+        assertTrue("no devices expected", store.getRegionDevices(RID1).isEmpty());
+
+        store.removeDevices(RID1, ImmutableSet.of(DID2));
+        assertTrue("no devices expected", store.getRegionDevices(RID1).isEmpty());
+    }
+
+    class TestStore extends DistributedRegionStore {
+    }
+}
\ No newline at end of file