Adding route subsystem code to support EVPN

Change-Id: I5d9715af8e2a9474892267c97cd4763f12be4816
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/EvpnRouteTable.java b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/EvpnRouteTable.java
new file mode 100755
index 0000000..9fd8798
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/routing/impl/EvpnRouteTable.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.routing.impl;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.incubator.net.routing.EvpnInternalRouteEvent;
+import org.onosproject.incubator.net.routing.EvpnPrefix;
+import org.onosproject.incubator.net.routing.EvpnRoute;
+import org.onosproject.incubator.net.routing.EvpnRouteSet;
+import org.onosproject.incubator.net.routing.EvpnRouteStoreDelegate;
+import org.onosproject.incubator.net.routing.EvpnTable;
+import org.onosproject.incubator.net.routing.Label;
+import org.onosproject.incubator.net.routing.Route;
+import org.onosproject.incubator.net.routing.RouteDistinguisher;
+import org.onosproject.incubator.net.routing.RouteTableId;
+import org.onosproject.incubator.net.routing.VpnRouteTarget;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Default implementation of a route table based on a consistent map.
+ */
+public class EvpnRouteTable implements EvpnTable {
+
+    private final RouteTableId id;
+    private final ConsistentMap<EvpnPrefix, Set<EvpnRoute>> routes;
+    private final EvpnRouteStoreDelegate delegate;
+    private final ExecutorService executor;
+    private final RouteTableListener listener = new RouteTableListener();
+
+    private final Consumer<DistributedPrimitive.Status> statusChangeListener;
+
+    /**
+     * Creates a new route table.
+     *
+     * @param id             route table ID
+     * @param delegate       route store delegate to notify of events
+     * @param storageService storage service
+     * @param executor       executor service
+     */
+    public EvpnRouteTable(RouteTableId id, EvpnRouteStoreDelegate delegate,
+                          StorageService storageService, ExecutorService executor) {
+        this.delegate = checkNotNull(delegate);
+        this.id = checkNotNull(id);
+        this.routes = buildRouteMap(checkNotNull(storageService));
+        this.executor = checkNotNull(executor);
+
+        statusChangeListener = status -> {
+            if (status.equals(DistributedPrimitive.Status.ACTIVE)) {
+                executor.execute(this::notifyExistingRoutes);
+            }
+        };
+        routes.addStatusChangeListener(statusChangeListener);
+
+        notifyExistingRoutes();
+
+        routes.addListener(listener);
+    }
+
+    private void notifyExistingRoutes() {
+        routes.entrySet().stream()
+                .map(e -> new EvpnInternalRouteEvent(
+                        EvpnInternalRouteEvent.Type.ROUTE_ADDED,
+                        new EvpnRouteSet(id, e.getKey(), e.getValue().value())))
+                .forEach(delegate::notify);
+    }
+
+    private ConsistentMap<EvpnPrefix, Set<EvpnRoute>> buildRouteMap(StorageService
+                                                                            storageService) {
+        KryoNamespace routeTableSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(KryoNamespaces.MISC)
+                .register(Route.class)
+                .register(Route.Source.class)
+                .register(EvpnRoute.class)
+                .register(EvpnPrefix.class)
+                .register(RouteDistinguisher.class)
+                .register(MacAddress.class)
+                .register(IpPrefix.class)
+                .register(EvpnRoute.Source.class)
+                .register(IpAddress.class)
+                .register(VpnRouteTarget.class)
+                .register(Label.class)
+                .register(RouteTableId.class)
+                .build();
+        return storageService.<EvpnPrefix, Set<EvpnRoute>>consistentMapBuilder()
+                .withName("onos-evpn-routes-" + id.name())
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(routeTableSerializer))
+                .build();
+    }
+
+    @Override
+    public RouteTableId id() {
+        return id;
+    }
+
+    @Override
+    public void shutdown() {
+        routes.removeStatusChangeListener(statusChangeListener);
+        routes.removeListener(listener);
+    }
+
+    @Override
+    public void destroy() {
+        shutdown();
+        routes.destroy();
+    }
+
+    @Override
+    public void update(EvpnRoute route) {
+        routes.compute(route.evpnPrefix(), (prefix, set) -> {
+            if (set == null) {
+                set = new HashSet<>();
+            }
+            set.add(route);
+            return set;
+        });
+    }
+
+    @Override
+    public void remove(EvpnRoute route) {
+        routes.compute(route.evpnPrefix(), (prefix, set) -> {
+            if (set != null) {
+                set.remove(route);
+                if (set.isEmpty()) {
+                    return null;
+                }
+                return set;
+            }
+            return null;
+        });
+    }
+
+    @Override
+    public Collection<EvpnRouteSet> getRoutes() {
+        return routes.entrySet().stream()
+                .map(e -> new EvpnRouteSet(id, e.getKey(), e.getValue().value()))
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public EvpnRouteSet getRoutes(EvpnPrefix prefix) {
+        Versioned<Set<EvpnRoute>> routeSet = routes.get(prefix);
+
+        if (routeSet != null) {
+            return new EvpnRouteSet(id, prefix, routeSet.value());
+        }
+        return null;
+    }
+
+    @Override
+    public Collection<EvpnRoute> getRoutesForNextHop(IpAddress nextHop) {
+        // TODO index
+        return routes.values().stream()
+                .flatMap(v -> v.value().stream())
+                .filter(r -> r.nextHop().equals(nextHop))
+                .collect(Collectors.toSet());
+    }
+
+    private class RouteTableListener
+            implements MapEventListener<EvpnPrefix, Set<EvpnRoute>> {
+
+        private EvpnInternalRouteEvent createRouteEvent(
+                EvpnInternalRouteEvent.Type type, MapEvent<EvpnPrefix, Set<EvpnRoute>>
+                event) {
+            Set<EvpnRoute> currentRoutes =
+                    (event.newValue() == null) ? Collections.emptySet() : event.newValue().value();
+            return new EvpnInternalRouteEvent(type, new EvpnRouteSet(id, event
+                    .key(), currentRoutes));
+        }
+
+        @Override
+        public void event(MapEvent<EvpnPrefix, Set<EvpnRoute>> event) {
+            EvpnInternalRouteEvent ire = null;
+            switch (event.type()) {
+                case INSERT:
+                    ire = createRouteEvent(EvpnInternalRouteEvent.Type.ROUTE_ADDED, event);
+                    break;
+                case UPDATE:
+                    if (event.newValue().value().size() > event.oldValue().value().size()) {
+                        ire = createRouteEvent(EvpnInternalRouteEvent.Type.ROUTE_ADDED, event);
+                    } else {
+                        ire = createRouteEvent(EvpnInternalRouteEvent.Type.ROUTE_REMOVED, event);
+                    }
+                    break;
+                case REMOVE:
+                    ire = createRouteEvent(EvpnInternalRouteEvent.Type.ROUTE_REMOVED, event);
+                    break;
+                default:
+                    break;
+            }
+            if (ire != null) {
+                delegate.notify(ire);
+            }
+        }
+    }
+
+}
+