Bug Fix - Tunnel subsystem 1. After delete the tunnel and then query the tunnel and call provider. Query will always return null. Null Pointer exception.
2. In Borrow Tunnel, Order relationship is added even if the there is no tunnel to borrow. Note only where tunnel manger do not initiate the tunnel creation through provider.
3. In ConsistancyMap when the key (the set) is empty still the map hold the key with empty list and never deleted. Memory leak.

Change-Id: Iff8ba662f4828324c0588a9dfc494a2b158bcfce
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/tunnel/impl/TunnelManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/tunnel/impl/TunnelManager.java
index 6e2901c..f93fae9 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/tunnel/impl/TunnelManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/tunnel/impl/TunnelManager.java
@@ -93,18 +93,20 @@
     @Override
     public void removeTunnel(TunnelId tunnelId) {
         checkNotNull(tunnelId, TUNNNEL_ID_NULL);
-        store.deleteTunnel(tunnelId);
         Tunnel tunnel = store.queryTunnel(tunnelId);
-        if (tunnel.providerId() != null) {
-            TunnelProvider provider = getProvider(tunnel.providerId());
-            if (provider != null) {
-                provider.releaseTunnel(tunnel);
-            }
-        } else {
-            Set<ProviderId> ids = getProviders();
-            for (ProviderId providerId : ids) {
-                TunnelProvider provider = getProvider(providerId);
-                provider.releaseTunnel(tunnel);
+        if (tunnel != null) {
+            store.deleteTunnel(tunnelId);
+            if (tunnel.providerId() != null) {
+                TunnelProvider provider = getProvider(tunnel.providerId());
+                if (provider != null) {
+                    provider.releaseTunnel(tunnel);
+                }
+            } else {
+                Set<ProviderId> ids = getProviders();
+                for (ProviderId providerId : ids) {
+                    TunnelProvider provider = getProvider(providerId);
+                    provider.releaseTunnel(tunnel);
+                }
             }
         }
     }
@@ -129,23 +131,25 @@
     @Override
     public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst,
                               ProviderId producerName) {
-        store.deleteTunnel(src, dst, producerName);
         Collection<Tunnel> setTunnels = store.queryTunnel(src, dst);
-        for (Tunnel tunnel : setTunnels) {
-            if (producerName != null
-                    && !tunnel.providerId().equals(producerName)) {
-                continue;
-            }
-            if (tunnel.providerId() != null) {
-                TunnelProvider provider = getProvider(tunnel.providerId());
-                if (provider != null) {
-                    provider.releaseTunnel(tunnel);
+        if (!setTunnels.isEmpty()) {
+            store.deleteTunnel(src, dst, producerName);
+            for (Tunnel tunnel : setTunnels) {
+                if (producerName != null
+                        && !tunnel.providerId().equals(producerName)) {
+                    continue;
                 }
-            } else {
-                Set<ProviderId> ids = getProviders();
-                for (ProviderId providerId : ids) {
-                    TunnelProvider provider = getProvider(providerId);
-                    provider.releaseTunnel(tunnel);
+                if (tunnel.providerId() != null) {
+                    TunnelProvider provider = getProvider(tunnel.providerId());
+                    if (provider != null) {
+                        provider.releaseTunnel(tunnel);
+                    }
+                } else {
+                    Set<ProviderId> ids = getProviders();
+                    for (ProviderId providerId : ids) {
+                        TunnelProvider provider = getProvider(providerId);
+                        provider.releaseTunnel(tunnel);
+                    }
                 }
             }
         }
@@ -154,24 +158,26 @@
     @Override
     public void removeTunnels(TunnelEndPoint src, TunnelEndPoint dst, Type type,
                               ProviderId producerName) {
-        store.deleteTunnel(src, dst, type, producerName);
         Collection<Tunnel> setTunnels = store.queryTunnel(src, dst);
-        for (Tunnel tunnel : setTunnels) {
-            if (producerName != null
-                    && !tunnel.providerId().equals(producerName)
-                    || !type.equals(tunnel.type())) {
-                continue;
-            }
-            if (tunnel.providerId() != null) {
-                TunnelProvider provider = getProvider(tunnel.providerId());
-                if (provider != null) {
-                    provider.releaseTunnel(tunnel);
+        if (!setTunnels.isEmpty()) {
+            store.deleteTunnel(src, dst, type, producerName);
+            for (Tunnel tunnel : setTunnels) {
+                if (producerName != null
+                        && !tunnel.providerId().equals(producerName)
+                        || !type.equals(tunnel.type())) {
+                    continue;
                 }
-            } else {
-                Set<ProviderId> ids = getProviders();
-                for (ProviderId providerId : ids) {
-                    TunnelProvider provider = getProvider(providerId);
-                    provider.releaseTunnel(tunnel);
+                if (tunnel.providerId() != null) {
+                    TunnelProvider provider = getProvider(tunnel.providerId());
+                    if (provider != null) {
+                        provider.releaseTunnel(tunnel);
+                    }
+                } else {
+                    Set<ProviderId> ids = getProviders();
+                    for (ProviderId providerId : ids) {
+                        TunnelProvider provider = getProvider(providerId);
+                        provider.releaseTunnel(tunnel);
+                    }
                 }
             }
         }
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
index d796068..bf61e0d 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
@@ -22,9 +22,11 @@
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -56,6 +58,8 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.EventuallyConsistentMapEvent;
+import org.onosproject.store.service.EventuallyConsistentMapListener;
 import org.onosproject.store.service.MultiValuedTimestamp;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.WallClockTimestamp;
@@ -64,6 +68,9 @@
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableSet;
 
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+
 /**
  * Manages inventory of tunnel in distributed data store that uses optimistic
  * replication and gossip based techniques.
@@ -95,17 +102,21 @@
 
     // tunnel identity as map key in the store.
     private EventuallyConsistentMap<TunnelId, Tunnel> tunnelIdAsKeyStore;
-    // tunnel name as map key in the store.
-    private EventuallyConsistentMap<TunnelName, Set<TunnelId>> tunnelNameAsKeyStore;
-    // maintains all the tunnels between source and destination.
-    private EventuallyConsistentMap<TunnelKey, Set<TunnelId>> srcAndDstKeyStore;
-    // maintains all the tunnels by tunnel type.
-    private EventuallyConsistentMap<Tunnel.Type, Set<TunnelId>> typeKeyStore;
     // maintains records that app subscribes tunnel.
     private EventuallyConsistentMap<ApplicationId, Set<TunnelSubscription>> orderRelationship;
 
+    // tunnel name as map key.
+    private final Map<TunnelName, Set<TunnelId>> tunnelNameAsKeyMap = Maps.newConcurrentMap();
+    // maintains all the tunnels between source and destination.
+    private final Map<TunnelKey, Set<TunnelId>> srcAndDstKeyMap = Maps.newConcurrentMap();
+    // maintains all the tunnels by tunnel type.
+    private final Map<Tunnel.Type, Set<TunnelId>> typeKeyMap = Maps.newConcurrentMap();
+
     private IdGenerator idGenerator;
 
+    private EventuallyConsistentMapListener<TunnelId, Tunnel> tunnelUpdateListener =
+            new InternalTunnelChangeEventListener();
+
     @Activate
     public void activate() {
         KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
@@ -116,33 +127,23 @@
                 .<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
                 .withName("all_tunnel").withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
-        tunnelNameAsKeyStore = storageService
-                .<TunnelName, Set<TunnelId>>eventuallyConsistentMapBuilder()
-                .withName("tunnel_name_tunnel").withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
-        srcAndDstKeyStore = storageService
-                .<TunnelKey, Set<TunnelId>>eventuallyConsistentMapBuilder()
-                .withName("src_dst_tunnel").withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
-        typeKeyStore = storageService
-                .<Tunnel.Type, Set<TunnelId>>eventuallyConsistentMapBuilder()
-                .withName("type_tunnel").withSerializer(serializer)
-                .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
         orderRelationship = storageService
                 .<ApplicationId, Set<TunnelSubscription>>eventuallyConsistentMapBuilder()
                 .withName("type_tunnel").withSerializer(serializer)
                 .withTimestampProvider((k, v) -> new WallClockTimestamp()).build();
         idGenerator = coreService.getIdGenerator(tunnelOpTopic);
+        tunnelIdAsKeyStore.addListener(tunnelUpdateListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        tunnelIdAsKeyStore.removeListener(tunnelUpdateListener);
         orderRelationship.destroy();
         tunnelIdAsKeyStore.destroy();
-        srcAndDstKeyStore.destroy();
-        typeKeyStore.destroy();
-        tunnelNameAsKeyStore.destroy();
+        srcAndDstKeyMap.clear();
+        typeKeyMap.clear();
+        tunnelNameAsKeyMap.clear();
         log.info("Stopped");
     }
 
@@ -189,28 +190,8 @@
                                             tunnel.tunnelName(),
                                             tunnel.path(),
                                             tunnel.annotations());
-            TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst());
             tunnelIdAsKeyStore.put(tunnelId, newT);
-            Set<TunnelId> tunnelnameSet = tunnelNameAsKeyStore.get(tunnel
-                    .tunnelName());
-            if (tunnelnameSet == null) {
-                tunnelnameSet = new HashSet<TunnelId>();
-            }
-            tunnelnameSet.add(tunnelId);
-            tunnelNameAsKeyStore.put(tunnel
-                    .tunnelName(), tunnelnameSet);
-            Set<TunnelId> srcAndDstKeySet = srcAndDstKeyStore.get(key);
-            if (srcAndDstKeySet == null) {
-                srcAndDstKeySet = new HashSet<TunnelId>();
-            }
-            srcAndDstKeySet.add(tunnelId);
-            srcAndDstKeyStore.put(key, srcAndDstKeySet);
-            Set<TunnelId> typeKeySet = typeKeyStore.get(tunnel.type());
-            if (typeKeySet == null) {
-                typeKeySet = new HashSet<TunnelId>();
-            }
-            typeKeySet.add(tunnelId);
-            typeKeyStore.put(tunnel.type(), typeKeySet);
+
             TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_ADDED,
                                                 tunnel);
             notifyDelegate(event);
@@ -224,11 +205,9 @@
         if (deletedTunnel == null) {
             return;
         }
-        tunnelNameAsKeyStore.get(deletedTunnel.tunnelName()).remove(tunnelId);
+
         tunnelIdAsKeyStore.remove(tunnelId);
-        TunnelKey key = new TunnelKey(deletedTunnel.src(), deletedTunnel.dst());
-        srcAndDstKeyStore.get(key).remove(tunnelId);
-        typeKeyStore.get(deletedTunnel.type()).remove(tunnelId);
+
         TunnelEvent event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
                                             deletedTunnel);
         notifyDelegate(event);
@@ -238,7 +217,7 @@
     public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst,
                              ProviderId producerName) {
         TunnelKey key = TunnelKey.tunnelKey(src, dst);
-        Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+        Set<TunnelId> idSet = srcAndDstKeyMap.get(key);
         if (idSet == null) {
             return;
         }
@@ -251,11 +230,7 @@
             if (producerName == null || (producerName != null
                     && producerName.equals(deletedTunnel.providerId()))) {
                 tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
-                tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
-                        .remove(deletedTunnel.tunnelId());
-                srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
-                typeKeyStore.get(deletedTunnel.type())
-                        .remove(deletedTunnel.tunnelId());
+
                 event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
                                         deletedTunnel);
                 ls.add(event);
@@ -271,7 +246,7 @@
     public void deleteTunnel(TunnelEndPoint src, TunnelEndPoint dst, Type type,
                              ProviderId producerName) {
         TunnelKey key = TunnelKey.tunnelKey(src, dst);
-        Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+        Set<TunnelId> idSet = srcAndDstKeyMap.get(key);
         if (idSet == null) {
             return;
         }
@@ -284,11 +259,7 @@
             if (type.equals(deletedTunnel.type()) && (producerName == null || (producerName != null
                     && producerName.equals(deletedTunnel.providerId())))) {
                 tunnelIdAsKeyStore.remove(deletedTunnel.tunnelId());
-                tunnelNameAsKeyStore.get(deletedTunnel.tunnelName())
-                        .remove(deletedTunnel.tunnelId());
-                srcAndDstKeyStore.get(key).remove(deletedTunnel.tunnelId());
-                typeKeyStore.get(deletedTunnel.type())
-                        .remove(deletedTunnel.tunnelId());
+
                 event = new TunnelEvent(TunnelEvent.Type.TUNNEL_REMOVED,
                                         deletedTunnel);
                 ls.add(event);
@@ -335,7 +306,7 @@
         }
         orderRelationship.put(appId, orderSet);
         TunnelKey key = TunnelKey.tunnelKey(src, dst);
-        Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+        Set<TunnelId> idSet = srcAndDstKeyMap.get(key);
         if (idSet == null || idSet.size() == 0) {
             return Collections.emptySet();
         }
@@ -365,7 +336,7 @@
         }
         orderRelationship.put(appId, orderSet);
         TunnelKey key = TunnelKey.tunnelKey(src, dst);
-        Set<TunnelId> idSet = srcAndDstKeyStore.get(key);
+        Set<TunnelId> idSet = srcAndDstKeyMap.get(key);
         if (idSet == null || idSet.size() == 0) {
             return Collections.emptySet();
         }
@@ -391,11 +362,8 @@
         TunnelSubscription order = new TunnelSubscription(appId, null, null, null, null, tunnelName,
                                 annotations);
         boolean isExist = orderSet.contains(order);
-        if (!isExist) {
-            orderSet.add(order);
-        }
-        orderRelationship.put(appId, orderSet);
-        Set<TunnelId> idSet = tunnelNameAsKeyStore.get(tunnelName);
+
+        Set<TunnelId> idSet = tunnelNameAsKeyMap.get(tunnelName);
         if (idSet == null || idSet.size() == 0) {
             return Collections.emptySet();
         }
@@ -406,6 +374,12 @@
                 tunnelSet.add(result);
             }
         }
+
+        if (!tunnelSet.isEmpty() && !isExist) {
+            orderSet.add(order);
+            orderRelationship.put(appId, orderSet);
+        }
+
         return tunnelSet;
     }
 
@@ -466,7 +440,7 @@
     @Override
     public Collection<Tunnel> queryTunnel(Type type) {
         Collection<Tunnel> result = new HashSet<Tunnel>();
-        Set<TunnelId> tunnelIds = typeKeyStore.get(type);
+        Set<TunnelId> tunnelIds = typeKeyMap.get(type);
         if (tunnelIds == null) {
             return Collections.emptySet();
         }
@@ -481,7 +455,7 @@
     public Collection<Tunnel> queryTunnel(TunnelEndPoint src, TunnelEndPoint dst) {
         Collection<Tunnel> result = new HashSet<Tunnel>();
         TunnelKey key = TunnelKey.tunnelKey(src, dst);
-        Set<TunnelId> tunnelIds = srcAndDstKeyStore.get(key);
+        Set<TunnelId> tunnelIds = srcAndDstKeyMap.get(key);
         if (tunnelIds == null) {
             return Collections.emptySet();
         }
@@ -550,4 +524,65 @@
                     .add("dst", dst).toString();
         }
     }
+
+    /**
+     * Eventually consistent map listener for tunnel change event which updated the local map based on event.
+     */
+    private class InternalTunnelChangeEventListener
+            implements EventuallyConsistentMapListener<TunnelId, Tunnel> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<TunnelId, Tunnel> event) {
+            TunnelId tunnelId = event.key();
+            Tunnel tunnel = event.value();
+
+            if (event.type() == PUT) {
+
+                // Update tunnel name map
+                Set<TunnelId> tunnelNameSet = tunnelNameAsKeyMap.get(tunnel
+                        .tunnelName());
+                if (tunnelNameSet == null) {
+                    tunnelNameSet = new HashSet<TunnelId>();
+                }
+                tunnelNameSet.add(tunnelId);
+                tunnelNameAsKeyMap.put(tunnel.tunnelName(), tunnelNameSet);
+
+                // Update tunnel source and destination map
+                TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst());
+                Set<TunnelId> srcAndDstKeySet = srcAndDstKeyMap.get(key);
+                if (srcAndDstKeySet == null) {
+                    srcAndDstKeySet = new HashSet<TunnelId>();
+                }
+                srcAndDstKeySet.add(tunnelId);
+                srcAndDstKeyMap.put(key, srcAndDstKeySet);
+
+                // Update tunnel type map
+                Set<TunnelId> typeKeySet = typeKeyMap.get(tunnel.type());
+                if (typeKeySet == null) {
+                    typeKeySet = new HashSet<TunnelId>();
+                }
+                typeKeySet.add(tunnelId);
+                typeKeyMap.put(tunnel.type(), typeKeySet);
+            } else if (event.type() == REMOVE) {
+
+                // Update tunnel name map
+                tunnelNameAsKeyMap.get(tunnel.tunnelName()).remove(tunnelId);
+                if (tunnelNameAsKeyMap.get(tunnel.tunnelName()).isEmpty()) {
+                    tunnelNameAsKeyMap.remove(tunnel.tunnelName());
+                }
+
+                // Update tunnel source and destination map
+                TunnelKey key = TunnelKey.tunnelKey(tunnel.src(), tunnel.dst());
+                srcAndDstKeyMap.get(key).remove(tunnelId);
+                if (srcAndDstKeyMap.get(key).isEmpty()) {
+                    srcAndDstKeyMap.remove(key);
+                }
+
+                // Update tunnel type map
+                typeKeyMap.get(tunnel.type()).remove(tunnelId);
+                if (typeKeyMap.get(tunnel.type()).isEmpty()) {
+                    typeKeyMap.remove(tunnel.type());
+                }
+            }
+        }
+    }
 }