Removing Hazelcast dependency throughout.
Change-Id: I738050fda142418d2956f613035892dac82ef098
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index ba456ca..6585f30 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -71,7 +71,7 @@
<dependency>
<groupId>org.mapdb</groupId>
<artifactId>mapdb</artifactId>
- <version>1.0.7</version>
+ <version>1.0.7</version>
</dependency>
<dependency>
@@ -93,31 +93,21 @@
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <scope>test</scope>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
</dependency>
<!-- for shaded copycat -->
<dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-thirdparty</artifactId>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onlab-thirdparty</artifactId>
</dependency>
</dependencies>
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
index 4d12723..eb80669 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
@@ -1,6 +1,5 @@
package org.onosproject.store.cluster.impl;
-import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
@@ -31,7 +30,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
-import com.hazelcast.util.AddressUtil;
/**
* Implementation of ClusterDefinitionService.
@@ -115,7 +113,7 @@
Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
while (inetAddresses.hasMoreElements()) {
IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
- if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+ if (matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
return ip;
}
}
@@ -169,4 +167,11 @@
return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
+
+ // Indicates whether the specified interface address matches the given prefix.
+ // FIXME: Add a facility to IpPrefix to make this more robust
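+ // For example, a prefix of "192.168.56.*" is reduced to "192.168.56", which
+ // matches "192.168.56.10" but would also match "192.168.567.1", hence the FIXME above.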
+ private static boolean matchInterface(String ip, String ipPrefix) {
+ String s = ipPrefix.replaceAll("\\.\\*", "");
+ return ip.startsWith(s);
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
deleted file mode 100644
index c7be0db..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.Member;
-import com.hazelcast.core.MemberAttributeEvent;
-import com.hazelcast.core.MembershipEvent;
-import com.hazelcast.core.MembershipListener;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.joda.time.DateTime;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterStore;
-import org.onosproject.cluster.ClusterStoreDelegate;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.OptionalCacheLoader;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import static com.google.common.cache.CacheBuilder.newBuilder;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_ACTIVATED;
-import static org.onosproject.cluster.ClusterEvent.Type.INSTANCE_DEACTIVATED;
-import static org.onosproject.cluster.ControllerNode.State;
-
-/**
- * Distributed, Hazelcast-based implementation of the cluster nodes store.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class HazelcastClusterStore
- extends AbstractHazelcastStore<ClusterEvent, ClusterStoreDelegate>
- implements ClusterStore {
-
- private IMap<byte[], byte[]> rawNodes;
- private LoadingCache<NodeId, Optional<DefaultControllerNode>> nodes;
-
- private String listenerId;
- private final MembershipListener listener = new InternalMembershipListener();
- private final Map<NodeId, State> states = new ConcurrentHashMap<>();
- private final Map<NodeId, DateTime> lastUpdatedTimes = Maps.newConcurrentMap();
-
- private String nodesListenerId;
-
- @Override
- @Activate
- public void activate() {
- super.activate();
- listenerId = theInstance.getCluster().addMembershipListener(listener);
-
- rawNodes = theInstance.getMap("nodes");
- OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
- = new OptionalCacheLoader<>(serializer, rawNodes);
- nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
- nodesListenerId = rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
-
- loadClusterNodes();
-
- log.info("Started");
- }
-
- // Loads the initial set of cluster nodes
- private void loadClusterNodes() {
- for (Member member : theInstance.getCluster().getMembers()) {
- addNode(node(member));
- }
- }
-
- @Deactivate
- public void deactivate() {
- rawNodes.removeEntryListener(nodesListenerId);
- theInstance.getCluster().removeMembershipListener(listenerId);
- log.info("Stopped");
- }
-
- @Override
- public ControllerNode getLocalNode() {
- return node(theInstance.getCluster().getLocalMember());
- }
-
- @Override
- public Set<ControllerNode> getNodes() {
- ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
- for (Optional<DefaultControllerNode> optional : nodes.asMap().values()) {
- builder.add(optional.get());
- }
- return builder.build();
- }
-
- @Override
- public ControllerNode getNode(NodeId nodeId) {
- return nodes.getUnchecked(nodeId).orNull();
- }
-
- @Override
- public State getState(NodeId nodeId) {
- State state = states.get(nodeId);
- return state == null ? State.INACTIVE : state;
- }
-
- @Override
- public DateTime getLastUpdated(NodeId nodeId) {
- return lastUpdatedTimes.get(nodeId);
- }
-
- @Override
- public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
- }
-
- @Override
- public void removeNode(NodeId nodeId) {
- synchronized (this) {
- rawNodes.remove(serialize(nodeId));
- nodes.invalidate(nodeId);
- }
- }
-
- // Adds a new node based on the specified member
- private synchronized ControllerNode addNode(DefaultControllerNode node) {
- rawNodes.put(serialize(node.id()), serialize(node));
- nodes.put(node.id(), Optional.of(node));
- updateState(node.id(), State.ACTIVE);
- return node;
- }
-
- // Creates a controller node descriptor from the Hazelcast member.
- private DefaultControllerNode node(Member member) {
- IpAddress ip = memberAddress(member);
- return new DefaultControllerNode(new NodeId(ip.toString()), ip);
- }
-
- private IpAddress memberAddress(Member member) {
- return IpAddress.valueOf(member.getSocketAddress().getAddress());
- }
-
- private void updateState(NodeId nodeId, State newState) {
- updateState(nodeId, newState);
- lastUpdatedTimes.put(nodeId, DateTime.now());
- }
-
- // Interceptor for membership events.
- private class InternalMembershipListener implements MembershipListener {
- @Override
- public void memberAdded(MembershipEvent membershipEvent) {
- log.info("Member {} added", membershipEvent.getMember());
- ControllerNode node = addNode(node(membershipEvent.getMember()));
- notifyDelegate(new ClusterEvent(INSTANCE_ACTIVATED, node));
- }
-
- @Override
- public void memberRemoved(MembershipEvent membershipEvent) {
- log.info("Member {} removed", membershipEvent.getMember());
- NodeId nodeId = new NodeId(memberAddress(membershipEvent.getMember()).toString());
- updateState(nodeId, State.INACTIVE);
- notifyDelegate(new ClusterEvent(INSTANCE_DEACTIVATED, getNode(nodeId)));
- }
-
- @Override
- public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
- log.info("Member {} attribute {} changed to {}",
- memberAttributeEvent.getMember(),
- memberAttributeEvent.getKey(),
- memberAttributeEvent.getValue());
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
deleted file mode 100644
index 5b73246..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ /dev/null
@@ -1,600 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.Maps;
-import com.hazelcast.config.TopicConfig;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.ILock;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.event.ListenerRegistry;
-import org.onosproject.event.EventDeliveryService;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.hz.StoreService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-
-/**
- * Distributed implementation of LeadershipService that is based on Hazelcast.
- * <p>
- * The election is eventually-consistent: if there is Hazelcast partitioning,
- * and the partitioning is healed, there could be a short window of time
- * until the leaders in each partition discover each other. If this happens,
- * the leaders release the leadership and run again for election.
- * </p>
- * <p>
- * The leader election is based on Hazelcast's Global Lock, which is stongly
- * consistent. In addition, each leader periodically advertises events
- * (using a Hazelcast Topic) that it is the elected leader. Those events are
- * used for two purposes: (1) Discover multi-leader collisions (in case of
- * healed Hazelcast partitions), and (2) Inform all listeners who is
- * the current leader (e.g., for informational purpose).
- * </p>
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class HazelcastLeadershipService implements LeadershipService {
- private static final Logger log =
- LoggerFactory.getLogger(HazelcastLeadershipService.class);
-
- private static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .build()
- .populate(1);
- }
- };
-
- private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
- private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000; // 15s
- private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
-
- // indicates there is no term value yet
- private static final long NO_TERM = 0;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StoreService storeService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected EventDeliveryService eventDispatcher;
-
- private ListenerRegistry<LeadershipEvent, LeadershipEventListener>
- listenerRegistry;
- private final Map<String, Topic> topics = Maps.newConcurrentMap();
- private NodeId localNodeId;
-
- private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
- new MessageSubject("hz-leadership-events");
-
- private ExecutorService messageHandlingExecutor;
-
- @Activate
- protected void activate() {
- localNodeId = clusterService.getLocalNode().id();
- listenerRegistry = new ListenerRegistry<>();
- eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
-
- TopicConfig topicConfig = new TopicConfig();
- topicConfig.setGlobalOrderingEnabled(true);
- topicConfig.setName(TOPIC_HZ_ID);
- storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
-
- messageHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/leadership", "message-handler"));
-
- clusterCommunicator.addSubscriber(
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- new InternalLeadershipEventListener(),
- messageHandlingExecutor);
-
- log.info("Hazelcast Leadership Service started");
- }
-
- @Deactivate
- protected void deactivate() {
- eventDispatcher.removeSink(LeadershipEvent.class);
- messageHandlingExecutor.shutdown();
- clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
-
- for (Topic topic : topics.values()) {
- topic.stop();
- }
- topics.clear();
-
- log.info("Hazelcast Leadership Service stopped");
- }
-
- @Override
- public NodeId getLeader(String path) {
- Topic topic = topics.get(path);
- if (topic == null) {
- return null;
- }
- return topic.leader();
- }
-
- @Override
- public Leadership getLeadership(String path) {
- checkArgument(path != null);
- Topic topic = topics.get(path);
- if (topic != null) {
- return new Leadership(topic.topicName(),
- topic.leader(),
- topic.term(),
- 0);
- }
- return null;
- }
-
- @Override
- public Set<String> ownedTopics(NodeId nodeId) {
- checkArgument(nodeId != null);
- return topics.values()
- .stream()
- .filter(topic -> nodeId.equals(topic.leader()))
- .map(topic -> topic.topicName)
- .collect(Collectors.toSet());
- }
-
- @Override
- public CompletableFuture<Leadership> runForLeadership(String path) {
- checkArgument(path != null);
- Topic topic = new Topic(path);
- Topic oldTopic = topics.putIfAbsent(path, topic);
- if (oldTopic == null) {
- topic.start();
- topic.runForLeadership();
- } else {
- oldTopic.runForLeadership();
- }
- return CompletableFuture.completedFuture(getLeadership(path));
- }
-
- @Override
- public CompletableFuture<Void> withdraw(String path) {
- checkArgument(path != null);
- Topic topic = topics.get(path);
- if (topic != null) {
- topics.remove(path, topic);
- topic.stop();
- }
- return CompletableFuture.completedFuture(null);
- }
-
- @Override
- public Map<String, Leadership> getLeaderBoard() {
- Map<String, Leadership> result = new HashMap<>();
-
- for (Topic topic : topics.values()) {
- Leadership leadership = new Leadership(topic.topicName(),
- topic.leader(),
- topic.term(),
- 0);
- result.put(topic.topicName(), leadership);
- }
- return result;
- }
-
- @Override
- public void addListener(LeadershipEventListener listener) {
- listenerRegistry.addListener(listener);
- }
-
- @Override
- public void removeListener(LeadershipEventListener listener) {
- listenerRegistry.removeListener(listener);
- }
-
- /**
- * Class for keeping per-topic information.
- */
- private final class Topic {
- private final String topicName;
- private volatile boolean isShutdown = true;
- private volatile boolean isRunningForLeadership = false;
- private volatile long lastLeadershipUpdateMs = 0;
- private ExecutorService leaderElectionExecutor;
-
- private volatile IAtomicLong term;
- // This is local state, recording the term number for the last time
- // this instance was leader for this topic. The current term could be
- // higher if the mastership has changed any times.
- private long myLastLeaderTerm = NO_TERM;
-
- private volatile NodeId leader;
- private ILock leaderLock;
- private Future<?> getLockFuture;
- private Future<?> periodicProcessingFuture;
-
- /**
- * Constructor.
- *
- * @param topicName the topic name
- */
- private Topic(String topicName) {
- this.topicName = topicName;
- }
-
- /**
- * Gets the topic name.
- *
- * @return the topic name
- */
- private String topicName() {
- return topicName;
- }
-
- /**
- * Gets the leader for the topic.
- *
- * @return the leader for the topic
- */
- private NodeId leader() {
- return leader;
- }
-
- /**
- * Gets the current term for the topic.
- *
- * @return the term for the topic
- */
- private long term() {
- if (term == null) {
- return NO_TERM;
- }
- return term.get();
- }
-
- /**
- * Starts operation.
- */
- private synchronized void start() {
- if (!isShutdown) {
- // already running
- return;
- }
- isShutdown = false;
- String threadPoolName = "election-" + topicName + "-%d";
- leaderElectionExecutor = Executors.newScheduledThreadPool(2,
- groupedThreads("onos/leadership", threadPoolName));
-
- periodicProcessingFuture =
- leaderElectionExecutor.submit(new Runnable() {
- @Override
- public void run() {
- doPeriodicProcessing();
- }
- });
- }
-
- /**
- * Runs for leadership.
- */
- private synchronized void runForLeadership() {
- if (isRunningForLeadership) {
- return; // Nothing to do: already running
- }
- if (isShutdown) {
- start();
- }
- isRunningForLeadership = true;
- String lockHzId = "LeadershipService/" + topicName + "/lock";
- String termHzId = "LeadershipService/" + topicName + "/term";
- leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
- term = storeService.getHazelcastInstance().getAtomicLong(termHzId);
-
- getLockFuture = leaderElectionExecutor.submit(new Runnable() {
- @Override
- public void run() {
- doLeaderElectionThread();
- }
- });
- }
-
- /**
- * Stops leadership election for the topic.
- */
- private synchronized void stop() {
- isShutdown = true;
- isRunningForLeadership = false;
- // getLockFuture.cancel(true);
- // periodicProcessingFuture.cancel(true);
- leaderElectionExecutor.shutdownNow();
- }
-
- /**
- * Received a Leadership Event.
- *
- * @param leadershipEvent the received Leadership Event
- */
- private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
- NodeId eventLeaderId = leadershipEvent.subject().leader();
- if (!leadershipEvent.subject().topic().equals(topicName)) {
- return; // Not our topic: ignore
- }
- if (eventLeaderId.equals(localNodeId)) {
- return; // My own message: ignore
- }
-
- synchronized (this) {
- switch (leadershipEvent.type()) {
- case LEADER_ELECTED:
- // FALLTHROUGH
- case LEADER_REELECTED:
- //
- // Another leader: if we are also a leader, then give up
- // leadership and run for re-election.
- //
- if ((leader != null) && leader.equals(localNodeId)) {
- if (getLockFuture != null) {
- getLockFuture.cancel(true);
- }
- } else {
- // Just update the current leader
- leader = leadershipEvent.subject().leader();
- lastLeadershipUpdateMs = System.currentTimeMillis();
- }
- break;
- case LEADER_BOOTED:
- // Remove the state for the current leader
- if ((leader != null) && eventLeaderId.equals(leader)) {
- leader = null;
- }
- break;
- default:
- break;
- }
- }
- }
-
- private void doPeriodicProcessing() {
-
- while (!isShutdown) {
-
- //
- // Periodic tasks:
- // (a) Advertise ourselves as the leader
- // OR
- // (b) Expire a stale (remote) leader
- //
- synchronized (this) {
- LeadershipEvent leadershipEvent;
- if (leader != null) {
- if (leader.equals(localNodeId)) {
- //
- // Advertise ourselves as the leader
- //
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
- // Dispatch to all instances
-
- clusterCommunicator.broadcastIncludeSelf(
- leadershipEvent,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
- } else {
- //
- // Test if time to expire a stale leader
- //
- long delta = System.currentTimeMillis() -
- lastLeadershipUpdateMs;
- if (delta > LEADERSHIP_REMOTE_TIMEOUT_MS) {
- log.debug("Topic {} leader {} booted due to heartbeat timeout",
- topicName, leader);
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, leader, myLastLeaderTerm, 0));
- // Dispatch only to the local listener(s)
- eventDispatcher.post(leadershipEvent);
- leader = null;
- }
- }
- }
- }
-
- // Sleep before re-advertising
- try {
- Thread.sleep(LEADERSHIP_PERIODIC_INTERVAL_MS);
- } catch (InterruptedException e) {
- log.debug("Leader Election periodic thread interrupted");
- }
- }
- }
-
- /**
- * Performs the leader election by using Hazelcast.
- */
- private void doLeaderElectionThread() {
-
- while (!isShutdown) {
- LeadershipEvent leadershipEvent;
- //
- // Try to acquire the lock and keep it until the instance is
- // shutdown.
- //
- log.debug("Leader Election begin for topic {}",
- topicName);
- try {
- // Block until it becomes the leader
- leaderLock.lockInterruptibly();
- } catch (InterruptedException e) {
- //
- // Thread interrupted. Either shutdown or run for
- // re-election.
- //
- log.debug("Election interrupted for topic {}",
- topicName);
- continue;
- }
-
- try {
- synchronized (this) {
- //
- // This instance is now the leader
- //
- log.info("Leader Elected for topic {}", topicName);
-
- updateTerm();
-
- leader = localNodeId;
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
-
- clusterCommunicator.broadcastIncludeSelf(
- leadershipEvent,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
- }
-
- // Sleep forever until interrupted
- Thread.sleep(Long.MAX_VALUE);
- } catch (InterruptedException e) {
- //
- // Thread interrupted. Either shutdown or run for
- // re-election.
- //
- log.debug("Leader Interrupted for topic {}",
- topicName);
-
- } finally {
- synchronized (this) {
- // If we reach here, we should release the leadership
- log.debug("Leader Lock Released for topic {}", topicName);
- if ((leader != null) &&
- leader.equals(localNodeId)) {
- leader = null;
- }
- leadershipEvent = new LeadershipEvent(
- LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(topicName, localNodeId, myLastLeaderTerm, 0));
-
- clusterCommunicator.broadcastIncludeSelf(
- leadershipEvent,
- LEADERSHIP_EVENT_MESSAGE_SUBJECT,
- SERIALIZER::encode);
-
- if (leaderLock.isLockedByCurrentThread()) {
- leaderLock.unlock();
- }
- }
- }
- }
- isRunningForLeadership = false;
- }
-
- // Globally guarded by the leadership lock for this term
- // Locally guarded by synchronized (this)
- private void updateTerm() {
- long oldTerm = term.get();
- long newTerm = term.incrementAndGet();
- myLastLeaderTerm = newTerm;
- log.debug("Topic {} updated term from {} to {}", topicName,
- oldTerm, newTerm);
- }
- }
-
- private class InternalLeadershipEventListener implements ClusterMessageHandler {
-
- @Override
- public void handle(ClusterMessage message) {
- LeadershipEvent leadershipEvent =
- SERIALIZER.decode(message.payload());
-
- log.trace("Leadership Event: time = {} type = {} event = {}",
- leadershipEvent.time(), leadershipEvent.type(),
- leadershipEvent);
- //
- // If there is no entry for the topic, then create a new one to
- // keep track of the leadership, but don't run for leadership itself.
- //
- String topicName = leadershipEvent.subject().topic();
- Topic topic = topics.get(topicName);
- if (topic == null) {
- topic = new Topic(topicName);
- Topic oldTopic = topics.putIfAbsent(topicName, topic);
- if (oldTopic == null) {
- // encountered new topic, start periodic processing
- topic.start();
- } else {
- topic = oldTopic;
- }
- }
- topic.receivedLeadershipEvent(leadershipEvent);
- eventDispatcher.post(leadershipEvent);
- }
- }
-
- @Override
- public Map<String, List<NodeId>> getCandidates() {
- return null;
- }
-
- @Override
- public List<NodeId> getCandidates(String path) {
- return null;
- }
-
- @Override
- public boolean stepdown(String path) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean makeTopCandidate(String path, NodeId nodeId) {
- throw new UnsupportedOperationException();
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
deleted file mode 100644
index c98dd49..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.core.impl;
-
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.putIfAbsent;
-
-import com.google.common.collect.ImmutableSet;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
-import com.hazelcast.core.IAtomicLong;
-import com.hazelcast.core.MapEvent;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.ApplicationIdStore;
-import org.onosproject.core.DefaultApplicationId;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Simple implementation of the application ID registry using in-memory
- * structures.
- */
-@Component(immediate = false, enabled = false)
-@Service
-public class DistributedApplicationIdStore
- extends AbstractHazelcastStore<AppIdEvent, AppIdStoreDelegate>
- implements ApplicationIdStore {
-
- protected IAtomicLong lastAppId;
- protected SMap<String, DefaultApplicationId> appIdsByName;
-
- protected Map<Short, DefaultApplicationId> appIds = new ConcurrentHashMap<>();
-
- private String listenerId;
-
-
- @Override
- @Activate
- public void activate() {
- super.activate();
-
- this.serializer = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .build();
- }
- };
-
- lastAppId = theInstance.getAtomicLong("applicationId");
-
- appIdsByName = new SMap<>(theInstance.<byte[], byte[]>getMap("appIdsByName"), this.serializer);
- listenerId = appIdsByName.addEntryListener((new RemoteAppIdEventHandler()), true);
-
- primeAppIds();
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- appIdsByName.removeEntryListener(listenerId);
- log.info("Stopped");
- }
-
- @Override
- public Set<ApplicationId> getAppIds() {
- return ImmutableSet.<ApplicationId>copyOf(appIds.values());
- }
-
- @Override
- public ApplicationId getAppId(Short id) {
- ApplicationId appId = appIds.get(id);
- if (appId == null) {
- primeAppIds();
- return appIds.get(id);
- }
- return appId;
- }
-
- @Override
- public ApplicationId getAppId(String name) {
- return appIdsByName.get(name);
- }
-
- private void primeAppIds() {
- for (DefaultApplicationId appId : appIdsByName.values()) {
- appIds.putIfAbsent(appId.id(), appId);
- }
- }
-
- @Override
- public ApplicationId registerApplication(String name) {
- DefaultApplicationId appId = appIdsByName.get(name);
- if (appId == null) {
- int id = (int) lastAppId.getAndIncrement();
- appId = putIfAbsent(appIdsByName, name,
- new DefaultApplicationId(id, name));
- }
- return appId;
- }
-
- private class RemoteAppIdEventHandler implements EntryListener<String, DefaultApplicationId> {
- @Override
- public void entryAdded(EntryEvent<String, DefaultApplicationId> event) {
- DefaultApplicationId appId = event.getValue();
- appIds.put(appId.id(), appId);
- }
-
- @Override
- public void entryRemoved(EntryEvent<String, DefaultApplicationId> event) {
- }
-
- @Override
- public void entryUpdated(EntryEvent<String, DefaultApplicationId> event) {
- entryAdded(event);
- }
-
- @Override
- public void entryEvicted(EntryEvent<String, DefaultApplicationId> event) {
- }
-
- @Override
- public void mapEvicted(MapEvent event) {
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java
deleted file mode 100644
index 260cef1..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedIdBlockStore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.core.impl;
-
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.IAtomicLong;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.core.IdBlock;
-import org.onosproject.core.IdBlockStore;
-import org.onosproject.store.hz.StoreService;
-
-import java.util.Map;
-
-/**
- * Distributed implementation of id block store using Hazelcast.
- */
-@Component(immediate = false, enabled = false)
-@Service
-public class DistributedIdBlockStore implements IdBlockStore {
-
- private static final long DEFAULT_BLOCK_SIZE = 0x100000L;
-
- protected Map<String, IAtomicLong> topicBlocks;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StoreService storeService;
-
- protected HazelcastInstance theInstance;
-
- @Activate
- public void activate() {
- theInstance = storeService.getHazelcastInstance();
- }
-
- @Override
- public IdBlock getIdBlock(String topic) {
- Long blockBase = theInstance.getAtomicLong(topic).getAndAdd(DEFAULT_BLOCK_SIZE);
- return new IdBlock(blockBase, DEFAULT_BLOCK_SIZE);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
deleted file mode 100644
index 8b7be9a..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ /dev/null
@@ -1,875 +0,0 @@
- /*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.flow.impl;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-import com.hazelcast.core.IMap;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.BoundedThreadPool;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.NewConcurrentHashMap;
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.core.CoreService;
-import org.onosproject.core.IdGenerator;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.CompletedBatchOperation;
-import org.onosproject.net.flow.DefaultFlowEntry;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowEntry.FlowEntryState;
-import org.onosproject.net.flow.FlowId;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleBatchEntry;
-import org.onosproject.net.flow.FlowRuleBatchEntry.FlowRuleOperation;
-import org.onosproject.net.flow.FlowRuleBatchEvent;
-import org.onosproject.net.flow.FlowRuleBatchOperation;
-import org.onosproject.net.flow.FlowRuleBatchRequest;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleEvent.Type;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.FlowRuleStore;
-import org.onosproject.net.flow.FlowRuleStoreDelegate;
-import org.onosproject.net.flow.StoredFlowEntry;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.flow.ReplicaInfo;
-import org.onosproject.store.flow.ReplicaInfoEvent;
-import org.onosproject.store.flow.ReplicaInfoEventListener;
-import org.onosproject.store.flow.ReplicaInfoService;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.onosproject.store.serializers.impl.DistributedStoreSerializers;
-import org.osgi.service.component.ComponentContext;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Dictionary;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
-import static org.onosproject.store.flow.impl.FlowStoreMessageSubjects.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of flow rules using a distributed state management protocol.
- */
-@Component(immediate = false, enabled = false)
-@Service
-public class DistributedFlowRuleStore
- extends AbstractHazelcastStore<FlowRuleBatchEvent, FlowRuleStoreDelegate>
- implements FlowRuleStore {
-
- private final Logger log = getLogger(getClass());
-
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
- private static final boolean DEFAULT_BACKUP_ENABLED = true;
- private static final long FLOW_RULE_STORE_TIMEOUT_MILLIS = 5000;
-
- @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
- label = "Number of threads in the message handler pool")
- private int msgHandlerPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
-
- @Property(name = "backupEnabled", boolValue = DEFAULT_BACKUP_ENABLED,
- label = "Indicates whether backups are enabled or not")
- private boolean backupEnabled = DEFAULT_BACKUP_ENABLED;
-
- private InternalFlowTable flowTable = new InternalFlowTable();
-
- /*private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
- flowEntries = new ConcurrentHashMap<>();*/
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ReplicaInfoService replicaInfoManager;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected CoreService coreService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ComponentConfigService configService;
-
- private Map<Long, NodeId> pendingResponses = Maps.newConcurrentMap();
-
- // Cache of SMaps used for backup data. each SMap contain device flow table
- private LoadingCache<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> smaps;
-
- private ExecutorService messageHandlingExecutor;
-
- private final ExecutorService backupExecutors =
- BoundedThreadPool.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
- //Executors.newSingleThreadExecutor(groupedThreads("onos/flow", "async-backups"));
-
- private boolean syncBackup = false;
-
- protected static final StoreSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(DistributedStoreSerializers.STORE_COMMON)
- .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
- .register(FlowRuleEvent.class)
- .register(FlowRuleEvent.Type.class)
- .build();
- }
- };
-
- private ReplicaInfoEventListener replicaInfoEventListener;
-
- private IdGenerator idGenerator;
-
- private NodeId local;
-
- @Activate
- public void activate(ComponentContext context) {
- configService.registerProperties(getClass());
- super.serializer = SERIALIZER;
- super.theInstance = storeService.getHazelcastInstance();
-
- idGenerator = coreService.getIdGenerator(FlowRuleService.FLOW_OP_TOPIC);
-
- local = clusterService.getLocalNode().id();
-
- // Cache to create SMap on demand
- smaps = CacheBuilder.newBuilder()
- .softValues()
- .build(new SMapLoader());
-
- messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
-
- registerMessageHandlers(messageHandlingExecutor);
-
- replicaInfoEventListener = new InternalReplicaInfoEventListener();
-
- replicaInfoManager.addListener(replicaInfoEventListener);
-
- logConfig("Started");
- }
-
- @Deactivate
- public void deactivate(ComponentContext context) {
- configService.unregisterProperties(getClass(), false);
- unregisterMessageHandlers();
- messageHandlingExecutor.shutdownNow();
- replicaInfoManager.removeListener(replicaInfoEventListener);
- log.info("Stopped");
- }
-
- @Modified
- public void modified(ComponentContext context) {
- if (context == null) {
- backupEnabled = DEFAULT_BACKUP_ENABLED;
- logConfig("Default config");
- return;
- }
-
- Dictionary properties = context.getProperties();
- int newPoolSize;
- boolean newBackupEnabled;
- try {
- String s = get(properties, "msgHandlerPoolSize");
- newPoolSize = isNullOrEmpty(s) ? msgHandlerPoolSize : Integer.parseInt(s.trim());
-
- s = get(properties, "backupEnabled");
- newBackupEnabled = isNullOrEmpty(s) ? backupEnabled : Boolean.parseBoolean(s.trim());
-
- } catch (NumberFormatException | ClassCastException e) {
- newPoolSize = MESSAGE_HANDLER_THREAD_POOL_SIZE;
- newBackupEnabled = DEFAULT_BACKUP_ENABLED;
- }
-
- if (newBackupEnabled != backupEnabled) {
- backupEnabled = newBackupEnabled;
- }
- if (newPoolSize != msgHandlerPoolSize) {
- msgHandlerPoolSize = newPoolSize;
- ExecutorService oldMsgHandler = messageHandlingExecutor;
- messageHandlingExecutor = Executors.newFixedThreadPool(
- msgHandlerPoolSize, groupedThreads("onos/store/flow", "message-handlers"));
-
- // replace previously registered handlers.
- registerMessageHandlers(messageHandlingExecutor);
- oldMsgHandler.shutdown();
- }
- logConfig("Reconfigured");
- }
-
- private void registerMessageHandlers(ExecutorService executor) {
-
- clusterCommunicator.addSubscriber(APPLY_BATCH_FLOWS, new OnStoreBatch(local), executor);
-
- clusterCommunicator.addSubscriber(REMOTE_APPLY_COMPLETED, new ClusterMessageHandler() {
- @Override
- public void handle(ClusterMessage message) {
- FlowRuleBatchEvent event = SERIALIZER.decode(message.payload());
- log.trace("received completed notification for {}", event);
- notifyDelegate(event);
- }
- }, executor);
-
- clusterCommunicator.addSubscriber(GET_FLOW_ENTRY, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- FlowRule rule = SERIALIZER.decode(message.payload());
- log.trace("received get flow entry request for {}", rule);
- FlowEntry flowEntry = flowTable.getFlowEntry(rule); //getFlowEntryInternal(rule);
- message.respond(SERIALIZER.encode(flowEntry));
- }
- }, executor);
-
- clusterCommunicator.addSubscriber(GET_DEVICE_FLOW_ENTRIES, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- DeviceId deviceId = SERIALIZER.decode(message.payload());
- log.trace("Received get flow entries request for {} from {}", deviceId, message.sender());
- Set<FlowEntry> flowEntries = flowTable.getFlowEntries(deviceId);
- message.respond(SERIALIZER.encode(flowEntries));
- }
- }, executor);
-
- clusterCommunicator.addSubscriber(REMOVE_FLOW_ENTRY, new ClusterMessageHandler() {
-
- @Override
- public void handle(ClusterMessage message) {
- FlowEntry rule = SERIALIZER.decode(message.payload());
- log.trace("received get flow entry request for {}", rule);
- FlowRuleEvent event = removeFlowRuleInternal(rule);
- message.respond(SERIALIZER.encode(event));
- }
- }, executor);
- }
-
- private void unregisterMessageHandlers() {
- clusterCommunicator.removeSubscriber(REMOVE_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(GET_DEVICE_FLOW_ENTRIES);
- clusterCommunicator.removeSubscriber(GET_FLOW_ENTRY);
- clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
- clusterCommunicator.removeSubscriber(REMOTE_APPLY_COMPLETED);
- }
-
- private void logConfig(String prefix) {
- log.info("{} with msgHandlerPoolSize = {}; backupEnabled = {}",
- prefix, msgHandlerPoolSize, backupEnabled);
- }
-
-
- // This is not a efficient operation on a distributed sharded
- // flow store. We need to revisit the need for this operation or at least
- // make it device specific.
- @Override
- public int getFlowRuleCount() {
- // implementing in-efficient operation for debugging purpose.
- int sum = 0;
- for (Device device : deviceService.getDevices()) {
- final DeviceId did = device.id();
- sum += Iterables.size(getFlowEntries(did));
- }
- return sum;
- }
-
- @Override
- public FlowEntry getFlowEntry(FlowRule rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
-
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntry: No master for {}", rule.deviceId());
- return null;
- }
-
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return flowTable.getFlowEntry(rule);
- }
-
- log.trace("Forwarding getFlowEntry to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), rule.deviceId());
-
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(rule,
- FlowStoreMessageSubjects.GET_FLOW_ENTRY,
- SERIALIZER::encode,
- SERIALIZER::decode,
- replicaInfo.master().get()),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- null);
- }
-
- @Override
- public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
-
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to getFlowEntries: No master for {}", deviceId);
- return Collections.emptyList();
- }
-
- if (replicaInfo.master().get().equals(clusterService.getLocalNode().id())) {
- return flowTable.getFlowEntries(deviceId);
- }
-
- log.trace("Forwarding getFlowEntries to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
-
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(deviceId,
- FlowStoreMessageSubjects.GET_DEVICE_FLOW_ENTRIES,
- SERIALIZER::encode,
- SERIALIZER::decode,
- replicaInfo.master().get()),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptyList());
- }
-
- @Override
- public void storeFlowRule(FlowRule rule) {
- storeBatch(new FlowRuleBatchOperation(
- Collections.singletonList(new FlowRuleBatchEntry(FlowRuleOperation.ADD, rule)),
- rule.deviceId(), idGenerator.getNewId()));
- }
-
- @Override
- public void storeBatch(FlowRuleBatchOperation operation) {
-
-
- if (operation.getOperations().isEmpty()) {
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(),
- operation.deviceId())));
- return;
- }
-
- DeviceId deviceId = operation.deviceId();
-
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-
- if (!replicaInfo.master().isPresent()) {
- log.warn("No master for {} : flows will be marked for removal", deviceId);
-
- updateStoreInternal(operation);
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
- return;
- }
-
- final NodeId local = clusterService.getLocalNode().id();
- if (replicaInfo.master().get().equals(local)) {
- storeBatchInternal(operation);
- return;
- }
-
- log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
-
- if (!clusterCommunicator.unicast(operation,
- APPLY_BATCH_FLOWS, SERIALIZER::encode,
- replicaInfo.master().get())) {
- log.warn("Failed to storeBatch: {} to {}", operation, replicaInfo.master());
-
- Set<FlowRule> allFailures = operation.getOperations().stream()
- .map(op -> op.target())
- .collect(Collectors.toSet());
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, deviceId)));
- return;
- }
- }
-
- private void storeBatchInternal(FlowRuleBatchOperation operation) {
-
- final DeviceId did = operation.deviceId();
- //final Collection<FlowEntry> ft = flowTable.getFlowEntries(did);
- Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
- if (currentOps.isEmpty()) {
- batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), did)));
- return;
- }
- updateBackup(did, currentOps);
-
- notifyDelegate(FlowRuleBatchEvent.requested(new
- FlowRuleBatchRequest(operation.id(),
- currentOps), operation.deviceId()));
-
- }
-
- private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
- return operation.getOperations().stream().map(
- op -> {
- StoredFlowEntry entry;
- switch (op.operator()) {
- case ADD:
- entry = new DefaultFlowEntry(op.target());
- // always add requested FlowRule
- // Note: 2 equal FlowEntry may have different treatment
- flowTable.remove(entry.deviceId(), entry);
- flowTable.add(entry);
-
- return op;
- case REMOVE:
- entry = flowTable.getFlowEntry(op.target());
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- return op;
- }
- break;
- case MODIFY:
- //TODO: figure this out at some point
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
- }
- ).filter(op -> op != null).collect(Collectors.toSet());
- }
-
- private void updateBackup(DeviceId deviceId, final Set<FlowRuleBatchEntry> entries) {
- if (!backupEnabled) {
- return;
- }
-
- Future<?> backup = backupExecutors.submit(new UpdateBackup(deviceId, entries));
-
- if (syncBackup) {
- // wait for backup to complete
- try {
- backup.get();
- } catch (InterruptedException | ExecutionException e) {
- log.error("Failed to create backups", e);
- }
- }
- }
-
- @Override
- public void deleteFlowRule(FlowRule rule) {
- storeBatch(
- new FlowRuleBatchOperation(
- Collections.singletonList(
- new FlowRuleBatchEntry(
- FlowRuleOperation.REMOVE,
- rule)), rule.deviceId(), idGenerator.getNewId()));
- }
-
- @Override
- public FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
- final NodeId localId = clusterService.getLocalNode().id();
- if (localId.equals(replicaInfo.master().orNull())) {
- return addOrUpdateFlowRuleInternal(rule);
- }
-
- log.warn("Tried to update FlowRule {} state,"
- + " while the Node was not the master.", rule);
- return null;
- }
-
- private FlowRuleEvent addOrUpdateFlowRuleInternal(FlowEntry rule) {
- final DeviceId did = rule.deviceId();
-
-
- // check if this new rule is an update to an existing entry
- StoredFlowEntry stored = flowTable.getFlowEntry(rule);
- if (stored != null) {
- stored.setBytes(rule.bytes());
- stored.setLife(rule.life());
- stored.setPackets(rule.packets());
- if (stored.state() == FlowEntryState.PENDING_ADD) {
- stored.setState(FlowEntryState.ADDED);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.ADD, stored);
- updateBackup(did, Sets.newHashSet(entry));
- return new FlowRuleEvent(Type.RULE_ADDED, rule);
- }
- return new FlowRuleEvent(Type.RULE_UPDATED, rule);
- }
-
- // TODO: Confirm if this behavior is correct. See SimpleFlowRuleStore
- // TODO: also update backup if the behavior is correct.
- flowTable.add(rule);
-
-
- return null;
-
- }
-
- @Override
- public FlowRuleEvent removeFlowRule(FlowEntry rule) {
- final DeviceId deviceId = rule.deviceId();
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
-
- final NodeId localId = clusterService.getLocalNode().id();
- if (localId.equals(replicaInfo.master().orNull())) {
- // bypass and handle it locally
- return removeFlowRuleInternal(rule);
- }
-
- if (!replicaInfo.master().isPresent()) {
- log.warn("Failed to removeFlowRule: No master for {}", deviceId);
- // TODO: revisit if this should be null (="no-op") or Exception
- return null;
- }
-
- log.trace("Forwarding removeFlowRule to {}, which is the primary (master) for device {}",
- replicaInfo.master().orNull(), deviceId);
-
- return Futures.get(clusterCommunicator.sendAndReceive(
- rule,
- REMOVE_FLOW_ENTRY,
- SERIALIZER::encode,
- SERIALIZER::decode,
- replicaInfo.master().get()),
- FLOW_RULE_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- RuntimeException.class);
- }
-
- private FlowRuleEvent removeFlowRuleInternal(FlowEntry rule) {
- final DeviceId deviceId = rule.deviceId();
- // This is where one could mark a rule as removed and still keep it in the store.
- final boolean removed = flowTable.remove(deviceId, rule); //flowEntries.remove(deviceId, rule);
- FlowRuleBatchEntry entry =
- new FlowRuleBatchEntry(FlowRuleOperation.REMOVE, rule);
- updateBackup(deviceId, Sets.newHashSet(entry));
- if (removed) {
- return new FlowRuleEvent(RULE_REMOVED, rule);
- } else {
- return null;
- }
-
- }
-
- @Override
- public void batchOperationComplete(FlowRuleBatchEvent event) {
- //FIXME: need a per device pending response
-
- NodeId nodeId = pendingResponses.remove(event.subject().batchId());
- if (nodeId == null) {
- notifyDelegate(event);
- } else {
- // TODO check unicast return value
- clusterCommunicator.unicast(event, REMOTE_APPLY_COMPLETED, SERIALIZER::encode, nodeId);
- //error log: log.warn("Failed to respond to peer for batch operation result");
- }
- }
-
- private void loadFromBackup(final DeviceId did) {
- if (!backupEnabled) {
- return;
- }
- log.info("We are now the master for {}. Will load flow rules from backup", did);
- try {
- log.debug("Loading FlowRules for {} from backups", did);
- SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(did);
- for (Entry<FlowId, ImmutableList<StoredFlowEntry>> e
- : backupFlowTable.entrySet()) {
-
- log.trace("loading {}", e.getValue());
- for (StoredFlowEntry entry : e.getValue()) {
- flowTable.getFlowEntriesById(entry).remove(entry);
- flowTable.getFlowEntriesById(entry).add(entry);
-
-
- }
- }
- } catch (ExecutionException e) {
- log.error("Failed to load backup flowtable for {}", did, e);
- }
- }
-
- private void removeFromPrimary(final DeviceId did) {
- flowTable.clearDevice(did);
- }
-
-
- private final class OnStoreBatch implements ClusterMessageHandler {
- private final NodeId local;
-
- private OnStoreBatch(NodeId local) {
- this.local = local;
- }
-
- @Override
- public void handle(final ClusterMessage message) {
- FlowRuleBatchOperation operation = SERIALIZER.decode(message.payload());
- log.debug("received batch request {}", operation);
-
- final DeviceId deviceId = operation.deviceId();
- ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(deviceId);
- if (!local.equals(replicaInfo.master().orNull())) {
-
- Set<FlowRule> failures = new HashSet<>(operation.size());
- for (FlowRuleBatchEntry op : operation.getOperations()) {
- failures.add(op.target());
- }
- CompletedBatchOperation allFailed = new CompletedBatchOperation(false, failures, deviceId);
- // This node is no longer the master, respond as all failed.
-                // TODO: we might want to wrap the response in an envelope
-                // to distinguish switch programming failures from mastership hand-over;
-                // in the latter case it makes sense to retry immediately.
- message.respond(SERIALIZER.encode(allFailed));
- return;
- }
-
-
- pendingResponses.put(operation.id(), message.sender());
- storeBatchInternal(operation);
-
- }
- }
-
- private final class SMapLoader
- extends CacheLoader<DeviceId, SMap<FlowId, ImmutableList<StoredFlowEntry>>> {
-
- @Override
- public SMap<FlowId, ImmutableList<StoredFlowEntry>> load(DeviceId id)
- throws Exception {
- IMap<byte[], byte[]> map = theInstance.getMap("flowtable_" + id.toString());
- return new SMap<FlowId, ImmutableList<StoredFlowEntry>>(map, SERIALIZER);
- }
- }
-
- private final class InternalReplicaInfoEventListener
- implements ReplicaInfoEventListener {
-
- @Override
- public void event(ReplicaInfoEvent event) {
- final NodeId local = clusterService.getLocalNode().id();
- final DeviceId did = event.subject();
- final ReplicaInfo rInfo = event.replicaInfo();
-
- switch (event.type()) {
- case MASTER_CHANGED:
- if (local.equals(rInfo.master().orNull())) {
- // This node is the new master, populate local structure
- // from backup
- loadFromBackup(did);
- }
- //else {
- // This node is no longer the master holder,
- // clean local structure
- //removeFromPrimary(did);
- // TODO: probably should stop pending backup activities in
- // executors to avoid overwriting with old value
- //}
- break;
- default:
- break;
-
- }
- }
- }
-
- // Task to update FlowEntries in backup HZ store
- private final class UpdateBackup implements Runnable {
-
- private final DeviceId deviceId;
- private final Set<FlowRuleBatchEntry> ops;
-
-
- public UpdateBackup(DeviceId deviceId,
- Set<FlowRuleBatchEntry> ops) {
- this.deviceId = checkNotNull(deviceId);
- this.ops = checkNotNull(ops);
-
- }
-
- @Override
- public void run() {
- try {
-                log.trace("update backup {} {}", deviceId, ops);
- final SMap<FlowId, ImmutableList<StoredFlowEntry>> backupFlowTable = smaps.get(deviceId);
-
-
- ops.stream().forEach(
- op -> {
- final FlowRule entry = op.target();
- final FlowId id = entry.id();
- ImmutableList<StoredFlowEntry> original = backupFlowTable.get(id);
- List<StoredFlowEntry> list = new ArrayList<>();
- if (original != null) {
- list.addAll(original);
- }
- list.remove(op.target());
- if (op.operator() == FlowRuleOperation.ADD) {
- list.add((StoredFlowEntry) entry);
- }
-
- ImmutableList<StoredFlowEntry> newValue = ImmutableList.copyOf(list);
- boolean success;
- if (original == null) {
- success = (backupFlowTable.putIfAbsent(id, newValue) == null);
- } else {
- success = backupFlowTable.replace(id, original, newValue);
- }
- if (!success) {
- log.error("Updating backup failed.");
- }
-
- }
- );
- } catch (ExecutionException e) {
- log.error("Failed to write to backups", e);
- }
-
- }
- }
-
- private class InternalFlowTable {
-
- /*
-            TODO: This needs to be cleaned up, perhaps by using the eventually consistent
-            map once it supports distribution across a sequence of instances.
- */
-
-
- private final ConcurrentMap<DeviceId, ConcurrentMap<FlowId, Set<StoredFlowEntry>>>
- flowEntries = new ConcurrentHashMap<>();
-
-
- private NewConcurrentHashMap<FlowId, Set<StoredFlowEntry>> lazyEmptyFlowTable() {
- return NewConcurrentHashMap.<FlowId, Set<StoredFlowEntry>>ifNeeded();
- }
-
- /**
-         * Returns the flow table for the specified device.
-         *
-         * @param deviceId identifier of the device
-         * @return map representing the flow table of the given device
- */
- private ConcurrentMap<FlowId, Set<StoredFlowEntry>> getFlowTable(DeviceId deviceId) {
- return createIfAbsentUnchecked(flowEntries,
- deviceId, lazyEmptyFlowTable());
- }
-
- private Set<StoredFlowEntry> getFlowEntriesInternal(DeviceId deviceId, FlowId flowId) {
- final ConcurrentMap<FlowId, Set<StoredFlowEntry>> flowTable = getFlowTable(deviceId);
- Set<StoredFlowEntry> r = flowTable.get(flowId);
- if (r == null) {
- final Set<StoredFlowEntry> concurrentlyAdded;
- r = new CopyOnWriteArraySet<>();
- concurrentlyAdded = flowTable.putIfAbsent(flowId, r);
- if (concurrentlyAdded != null) {
- return concurrentlyAdded;
- }
- }
- return r;
- }
-
- private StoredFlowEntry getFlowEntryInternal(FlowRule rule) {
- for (StoredFlowEntry f : getFlowEntriesInternal(rule.deviceId(), rule.id())) {
- if (f.equals(rule)) {
- return f;
- }
- }
- return null;
- }
-
- private Set<FlowEntry> getFlowEntriesInternal(DeviceId deviceId) {
- return getFlowTable(deviceId).values().stream()
- .flatMap((list -> list.stream())).collect(Collectors.toSet());
-
- }
-
-
- public StoredFlowEntry getFlowEntry(FlowRule rule) {
- return getFlowEntryInternal(rule);
- }
-
- public Set<FlowEntry> getFlowEntries(DeviceId deviceId) {
- return getFlowEntriesInternal(deviceId);
- }
-
- public Set<StoredFlowEntry> getFlowEntriesById(FlowEntry entry) {
- return getFlowEntriesInternal(entry.deviceId(), entry.id());
- }
-
- public void add(FlowEntry rule) {
- ((CopyOnWriteArraySet)
- getFlowEntriesInternal(rule.deviceId(), rule.id())).add(rule);
- }
-
- public boolean remove(DeviceId deviceId, FlowEntry rule) {
- return ((CopyOnWriteArraySet)
- getFlowEntriesInternal(deviceId, rule.id())).remove(rule);
- //return flowEntries.remove(deviceId, rule);
- }
-
- public void clearDevice(DeviceId did) {
- flowEntries.remove(did);
- }
- }
-
-
-}
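
For reference, the mutating paths in the flow rule store deleted above are all gated on mastership: each call first checks whether the local node is master for the target device and only then touches the local table, with removeFlowRule additionally forwarding the request to the current master when invoked on a non-master node. The fragment below condenses that idiom using names from the removed code; it is an illustrative excerpt (surrounding class and imports omitted), not new code proposed by this change.

    // Condensed from the removed removeFlowRule().
    ReplicaInfo replicaInfo = replicaInfoManager.getReplicaInfoFor(rule.deviceId());
    NodeId localId = clusterService.getLocalNode().id();
    if (localId.equals(replicaInfo.master().orNull())) {
        return removeFlowRuleInternal(rule);          // master: mutate the local table directly
    }
    if (!replicaInfo.master().isPresent()) {
        return null;                                  // no elected master: treated as a no-op
    }
    // non-master: forward to the master and wait (bounded) for the resulting event
    return Futures.get(clusterCommunicator.sendAndReceive(
            rule, REMOVE_FLOW_ENTRY, SERIALIZER::encode, SERIALIZER::decode,
            replicaInfo.master().get()),
            FLOW_RULE_STORE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, RuntimeException.class);
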
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/AbsentInvalidatingLoadingCache.java b/core/store/dist/src/main/java/org/onosproject/store/hz/AbsentInvalidatingLoadingCache.java
deleted file mode 100644
index 545edda..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/AbsentInvalidatingLoadingCache.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.ForwardingLoadingCache.SimpleForwardingLoadingCache;
-import com.google.common.cache.LoadingCache;
-
-/**
- * Wrapper around LoadingCache to handle negative hit scenario.
- * <p>
- * When the LoadingCache returned Absent,
- * this implementation will invalidate the entry immediately to avoid
- * caching negative hits.
- *
- * @param <K> Cache key type
- * @param <V> Cache value type. (Optional{@literal <V>})
- */
-public class AbsentInvalidatingLoadingCache<K, V> extends
- SimpleForwardingLoadingCache<K, Optional<V>> {
-
- /**
- * Constructor.
- *
- * @param delegate actual {@link LoadingCache} to delegate loading.
- */
- public AbsentInvalidatingLoadingCache(LoadingCache<K, Optional<V>> delegate) {
- super(delegate);
- }
-
- @Override
- public Optional<V> get(K key) throws ExecutionException {
- Optional<V> v = super.get(key);
- if (!v.isPresent()) {
- invalidate(key);
- }
- return v;
- }
-
- @Override
- public Optional<V> getUnchecked(K key) {
- Optional<V> v = super.getUnchecked(key);
- if (!v.isPresent()) {
- invalidate(key);
- }
- return v;
- }
-
- @Override
- public Optional<V> apply(K key) {
- return getUnchecked(key);
- }
-
- @Override
- public Optional<V> getIfPresent(Object key) {
- Optional<V> v = super.getIfPresent(key);
- if (!v.isPresent()) {
- invalidate(key);
- }
- return v;
- }
-
- @Override
- public Optional<V> get(K key, Callable<? extends Optional<V>> valueLoader)
- throws ExecutionException {
-
- Optional<V> v = super.get(key, valueLoader);
- if (!v.isPresent()) {
- invalidate(key);
- }
- return v;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/AbstractHazelcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/hz/AbstractHazelcastStore.java
deleted file mode 100644
index fc2acd3..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/AbstractHazelcastStore.java
+++ /dev/null
@@ -1,259 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.LoadingCache;
-import com.hazelcast.core.EntryAdapter;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.core.MapEvent;
-import com.hazelcast.core.Member;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.event.Event;
-import org.onosproject.store.AbstractStore;
-import org.onosproject.store.StoreDelegate;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.StoreSerializer;
-import org.slf4j.Logger;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Abstraction of a distributed store based on Hazelcast.
- */
-@Component
-public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
- extends AbstractStore<E, D> {
-
- protected final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected StoreService storeService;
-
- protected StoreSerializer serializer;
-
- protected HazelcastInstance theInstance;
-
- @Activate
- public void activate() {
- serializer = new KryoSerializer();
- theInstance = storeService.getHazelcastInstance();
- }
-
- /**
- * Serializes the specified object using the backing store service.
- *
- * @param obj object to be serialized
- * @return serialized object
- */
- protected byte[] serialize(Object obj) {
- return serializer.encode(obj);
- }
-
- /**
- * Deserializes the specified object using the backing store service.
- *
- * @param bytes bytes to be deserialized
- * @param <T> type of object
- * @return deserialized object
- */
- protected <T> T deserialize(byte[] bytes) {
- return serializer.decode(bytes);
- }
-
-
- /**
- * An IMap entry listener, which reflects each remote event to the cache.
- *
- * @param <K> IMap key type after deserialization
- * @param <V> IMap value type after deserialization
- */
- public class RemoteCacheEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
-
- private final Member localMember;
- private LoadingCache<K, Optional<V>> cache;
-
- /**
- * Constructor.
- *
- * @param cache cache to update
- */
- public RemoteCacheEventHandler(LoadingCache<K, Optional<V>> cache) {
- this.localMember = theInstance.getCluster().getLocalMember();
- this.cache = checkNotNull(cache);
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- cache.invalidateAll();
- }
-
- @Override
- public void entryAdded(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V newVal = deserialize(event.getValue());
- Optional<V> newValue = Optional.of(newVal);
- cache.asMap().putIfAbsent(key, newValue);
- onAdd(key, newVal);
- }
-
- @Override
- public void entryUpdated(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V oldVal = deserialize(event.getOldValue());
- Optional<V> oldValue = Optional.fromNullable(oldVal);
- V newVal = deserialize(event.getValue());
- Optional<V> newValue = Optional.of(newVal);
- cache.asMap().replace(key, oldValue, newValue);
- onUpdate(key, oldVal, newVal);
- }
-
- @Override
- public void entryRemoved(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V val = deserialize(event.getOldValue());
- cache.invalidate(key);
- onRemove(key, val);
- }
-
- /**
- * Cache entry addition hook.
- *
- * @param key new key
- * @param newVal new value
- */
- protected void onAdd(K key, V newVal) {
- }
-
- /**
- * Cache entry update hook.
- *
- * @param key new key
- * @param oldValue old value
- * @param newVal new value
- */
- protected void onUpdate(K key, V oldValue, V newVal) {
- }
-
- /**
- * Cache entry remove hook.
- *
- * @param key new key
- * @param val old value
- */
- protected void onRemove(K key, V val) {
- }
- }
-
- /**
- * Distributed object remote event entry listener.
- *
- * @param <K> Entry key type after deserialization
- * @param <V> Entry value type after deserialization
- */
- public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
-
- private final Member localMember;
-
- public RemoteEventHandler() {
- this.localMember = theInstance.getCluster().getLocalMember();
- }
- @Override
- public void entryAdded(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V newVal = deserialize(event.getValue());
- onAdd(key, newVal);
- }
-
- @Override
- public void entryRemoved(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V val = deserialize(event.getValue());
- onRemove(key, val);
- }
-
- @Override
- public void entryUpdated(EntryEvent<byte[], byte[]> event) {
- if (localMember.equals(event.getMember())) {
- // ignore locally triggered event
- return;
- }
- K key = deserialize(event.getKey());
- V oldVal = deserialize(event.getOldValue());
- V newVal = deserialize(event.getValue());
- onUpdate(key, oldVal, newVal);
- }
-
- /**
- * Remote entry addition hook.
- *
- * @param key new key
- * @param newVal new value
- */
- protected void onAdd(K key, V newVal) {
- }
-
- /**
- * Remote entry update hook.
- *
- * @param key new key
- * @param oldValue old value
- * @param newVal new value
- */
- protected void onUpdate(K key, V oldValue, V newVal) {
- }
-
- /**
- * Remote entry remove hook.
- *
- * @param key new key
- * @param val old value
- */
- protected void onRemove(K key, V val) {
- }
- }
-
-}
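
The base class removed above was intended to be subclassed by concrete Hazelcast-backed stores (the DistributedMastershipStore deleted further below is one such subclass). The fragment here is a hypothetical sketch of that pattern, not code from this repository: ExampleEvent and ExampleStoreDelegate are placeholder types, and imports are assumed to follow the removed classes.

    // Hypothetical subclass, for illustration only.
    @Component(immediate = true)
    public class ExampleHazelcastStore
            extends AbstractHazelcastStore<ExampleEvent, ExampleStoreDelegate> {

        private SMap<DeviceId, String> values;

        @Activate
        @Override
        public void activate() {
            super.activate();   // obtains the shared HazelcastInstance and a KryoSerializer
            IMap<byte[], byte[]> raw = theInstance.getMap("example");
            values = new SMap<>(raw, serializer);
            // reflect remote IMap events into local store events via the handler hooks
            raw.addEntryListener(new Handler(), true);
        }

        private class Handler extends RemoteEventHandler<DeviceId, String> {
            @Override
            protected void onAdd(DeviceId key, String newVal) {
                notifyDelegate(new ExampleEvent(key, newVal));
            }
        }
    }
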
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/OptionalCacheLoader.java b/core/store/dist/src/main/java/org/onosproject/store/hz/OptionalCacheLoader.java
deleted file mode 100644
index ca45cf7..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/OptionalCacheLoader.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-import com.google.common.base.Optional;
-import com.google.common.cache.CacheLoader;
-import com.hazelcast.core.IMap;
-
-/**
- * CacheLoader to wrap Map value with Optional,
- * to handle negative hit on underlying IMap.
- *
- * @param <K> IMap key type after deserialization
- * @param <V> IMap value type after deserialization
- */
-public final class OptionalCacheLoader<K, V> extends
- CacheLoader<K, Optional<V>> {
-
- private final StoreSerializer serializer;
- private IMap<byte[], byte[]> rawMap;
-
- /**
- * Constructor.
- *
- * @param serializer to use for serialization
- * @param rawMap underlying IMap
- */
- public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
- this.serializer = checkNotNull(serializer);
- this.rawMap = checkNotNull(rawMap);
- }
-
- @Override
- public Optional<V> load(K key) throws Exception {
- byte[] keyBytes = serializer.encode(key);
- byte[] valBytes = rawMap.get(keyBytes);
- if (valBytes == null) {
- return Optional.absent();
- }
- V dev = serializer.decode(valBytes);
- return Optional.of(dev);
- }
-}
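
The two cache helpers removed above, OptionalCacheLoader and AbsentInvalidatingLoadingCache, were meant to be composed in front of a raw Hazelcast IMap. The fragment below is an assumed usage sketch rather than code taken from this repository; theInstance and serializer are the fields provided by the removed AbstractHazelcastStore, and deviceId is an arbitrary key.

    // Assumed composition of the two removed helpers (illustrative only).
    IMap<byte[], byte[]> rawMap = theInstance.getMap("devices");
    CacheLoader<DeviceId, Optional<Device>> loader =
            new OptionalCacheLoader<>(serializer, rawMap);
    LoadingCache<DeviceId, Optional<Device>> cache =
            new AbsentInvalidatingLoadingCache<DeviceId, Device>(
                    CacheBuilder.newBuilder().build(loader));

    // A miss returns Optional.absent() once, but the wrapper has already
    // invalidated the entry, so the next get() consults the IMap again
    // instead of serving a cached negative hit.
    Optional<Device> device = cache.getUnchecked(deviceId);
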
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/SMap.java b/core/store/dist/src/main/java/org/onosproject/store/hz/SMap.java
deleted file mode 100644
index 021849f..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/SMap.java
+++ /dev/null
@@ -1,743 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.tuple.Pair;
-import org.onosproject.store.serializers.StoreSerializer;
-
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
-import com.hazelcast.core.EntryView;
-import com.hazelcast.core.ExecutionCallback;
-import com.hazelcast.core.IMap;
-import com.hazelcast.core.MapEvent;
-import com.hazelcast.map.EntryProcessor;
-import com.hazelcast.map.MapInterceptor;
-import com.hazelcast.mapreduce.JobTracker;
-import com.hazelcast.mapreduce.aggregation.Aggregation;
-import com.hazelcast.mapreduce.aggregation.Supplier;
-import com.hazelcast.monitor.LocalMapStats;
-import com.hazelcast.query.Predicate;
-
-/**
- * Wrapper around IMap<byte[], byte[]> which serializes/deserializes
- * key and value using StoreSerializer.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class SMap<K, V> implements IMap<K, V> {
-
- private final IMap<byte[], byte[]> m;
- private final StoreSerializer serializer;
-
- /**
- * Creates a SMap instance.
- *
- * @param baseMap base IMap to use
- * @param serializer serializer to use for both key and value
- */
- public SMap(IMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
- this.m = checkNotNull(baseMap);
- this.serializer = checkNotNull(serializer);
- }
-
- @Override
- public int size() {
- return m.size();
- }
-
- @Override
- public boolean isEmpty() {
- return m.isEmpty();
- }
-
- @Override
- public void putAll(Map<? extends K, ? extends V> map) {
- Map<byte[], byte[]> sm = new IdentityHashMap<>(map.size());
- for (java.util.Map.Entry<? extends K, ? extends V> e : map.entrySet()) {
- sm.put(serializeKey(e.getKey()), serializeVal(e.getValue()));
- }
- m.putAll(sm);
- }
-
- @Deprecated
- @Override
- public Object getId() {
- return m.getId();
- }
-
- @Override
- public String getPartitionKey() {
- return m.getPartitionKey();
- }
-
- @Override
- public String getName() {
- return m.getName();
- }
-
- @Override
- public String getServiceName() {
- return m.getServiceName();
- }
-
- @Override
- public void destroy() {
- m.destroy();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return m.containsKey(serializeKey(key));
- }
-
- @Override
- public boolean containsValue(Object value) {
- return m.containsValue(serializeVal(value));
- }
-
- @Override
- public V get(Object key) {
- return deserializeVal(m.get(serializeKey(key)));
- }
-
- @Override
- public V put(K key, V value) {
- return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public V remove(Object key) {
- return deserializeVal(m.remove(serializeKey(key)));
- }
-
- @Override
- public boolean remove(Object key, Object value) {
- return m.remove(serializeKey(key), serializeVal(value));
- }
-
- @Override
- public void delete(Object key) {
- m.delete(serializeKey(key));
- }
-
- @Override
- public void flush() {
- m.flush();
- }
-
- @Override
- public Map<K, V> getAll(Set<K> keys) {
- Set<byte[]> sk = serializeKeySet(keys);
- Map<byte[], byte[]> bm = m.getAll(sk);
- Map<K, V> dsm = new HashMap<>(bm.size());
- for (java.util.Map.Entry<byte[], byte[]> e : bm.entrySet()) {
- dsm.put(deserializeKey(e.getKey()), deserializeVal(e.getValue()));
- }
- return dsm;
- }
-
- @Override
- public void loadAll(boolean replaceExistingValues) {
- m.loadAll(replaceExistingValues);
- }
-
- @Override
- public void loadAll(Set<K> keys, boolean replaceExistingValues) {
- Set<byte[]> sk = serializeKeySet(keys);
- m.loadAll(sk, replaceExistingValues);
- }
-
- @Override
- public void clear() {
- m.clear();
- }
-
- @Override
- public Future<V> getAsync(K key) {
- Future<byte[]> f = m.getAsync(serializeKey(key));
- return Futures.lazyTransform(f, new DeserializeVal());
- }
-
- @Override
- public Future<V> putAsync(K key, V value) {
- Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value));
- return Futures.lazyTransform(f, new DeserializeVal());
- }
-
- @Override
- public Future<V> putAsync(K key, V value, long ttl, TimeUnit timeunit) {
- Future<byte[]> f = m.putAsync(serializeKey(key), serializeVal(value), ttl, timeunit);
- return Futures.lazyTransform(f, new DeserializeVal());
- }
-
- @Override
- public Future<V> removeAsync(K key) {
- Future<byte[]> f = m.removeAsync(serializeKey(key));
- return Futures.lazyTransform(f, new DeserializeVal());
- }
-
- @Override
- public boolean tryRemove(K key, long timeout, TimeUnit timeunit) {
- return m.tryRemove(serializeKey(key), timeout, timeunit);
- }
-
- @Override
- public boolean tryPut(K key, V value, long timeout, TimeUnit timeunit) {
- return m.tryPut(serializeKey(key), serializeVal(value), timeout, timeunit);
- }
-
- @Override
- public V put(K key, V value, long ttl, TimeUnit timeunit) {
- return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
- }
-
- @Override
- public void putTransient(K key, V value, long ttl, TimeUnit timeunit) {
- m.putTransient(serializeKey(key), serializeVal(value), ttl, timeunit);
- }
-
- @Override
- public V putIfAbsent(K key, V value) {
- return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public V putIfAbsent(K key, V value, long ttl, TimeUnit timeunit) {
- return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value), ttl, timeunit));
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
- }
-
- @Override
- public V replace(K key, V value) {
- return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public void set(K key, V value) {
- m.set(serializeKey(key), serializeVal(value));
- }
-
- @Override
- public void set(K key, V value, long ttl, TimeUnit timeunit) {
- m.set(serializeKey(key), serializeVal(value), ttl, timeunit);
- }
-
- @Override
- public void lock(K key) {
- m.lock(serializeKey(key));
- }
-
- @Override
- public void lock(K key, long leaseTime, TimeUnit timeUnit) {
- m.lock(serializeKey(key), leaseTime, timeUnit);
- }
-
- @Override
- public boolean isLocked(K key) {
- return m.isLocked(serializeKey(key));
- }
-
- @Override
- public boolean tryLock(K key) {
- return m.tryLock(serializeKey(key));
- }
-
- @Override
- public boolean tryLock(K key, long time, TimeUnit timeunit)
- throws InterruptedException {
- return m.tryLock(serializeKey(key), time, timeunit);
- }
-
- @Override
- public void unlock(K key) {
- m.unlock(serializeKey(key));
- }
-
- @Override
- public void forceUnlock(K key) {
- m.forceUnlock(serializeKey(key));
- }
-
- @Override
- public String addLocalEntryListener(EntryListener<K, V> listener) {
- return m.addLocalEntryListener(new BaseEntryListener(listener));
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public String addLocalEntryListener(EntryListener<K, V> listener,
- Predicate<K, V> predicate, boolean includeValue) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public String addLocalEntryListener(EntryListener<K, V> listener,
- Predicate<K, V> predicate, K key, boolean includeValue) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public String addInterceptor(MapInterceptor interceptor) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void removeInterceptor(String id) {
- m.removeInterceptor(id);
- }
-
- @Override
- public String addEntryListener(EntryListener<K, V> listener,
- boolean includeValue) {
- return m.addEntryListener(new BaseEntryListener(listener), includeValue);
- }
-
- @Override
- public boolean removeEntryListener(String id) {
- return m.removeEntryListener(id);
- }
-
- @Override
- public String addEntryListener(EntryListener<K, V> listener, K key,
- boolean includeValue) {
- return m.addEntryListener(new BaseEntryListener(listener),
- serializeKey(key), includeValue);
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public String addEntryListener(EntryListener<K, V> listener,
- Predicate<K, V> predicate, boolean includeValue) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public String addEntryListener(EntryListener<K, V> listener,
- Predicate<K, V> predicate, K key, boolean includeValue) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public EntryView<K, V> getEntryView(K key) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean evict(K key) {
- return m.evict(serializeKey(key));
- }
-
- @Override
- public void evictAll() {
- m.evictAll();
- }
-
- @Override
- public Set<K> keySet() {
- return deserializeKeySet(m.keySet());
- }
-
- @Override
- public Collection<V> values() {
- return deserializeVal(m.values());
- }
-
- @Override
- public Set<java.util.Map.Entry<K, V>> entrySet() {
- return deserializeEntrySet(m.entrySet());
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Set<K> keySet(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Set<java.util.Map.Entry<K, V>> entrySet(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Collection<V> values(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Set<K> localKeySet() {
- return deserializeKeySet(m.localKeySet());
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Set<K> localKeySet(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public void addIndex(String attribute, boolean ordered) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public LocalMapStats getLocalMapStats() {
- return m.getLocalMapStats();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Object executeOnKey(K key, EntryProcessor entryProcessor) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Map<K, Object> executeOnKeys(Set<K> keys,
- EntryProcessor entryProcessor) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public void submitToKey(K key, EntryProcessor entryProcessor,
- ExecutionCallback callback) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Future submitToKey(K key, EntryProcessor entryProcessor) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @SuppressWarnings("rawtypes")
- @Override
- public Map<K, Object> executeOnEntries(EntryProcessor entryProcessor,
- Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public <SuppliedValue, Result> Result aggregate(
- Supplier<K, V, SuppliedValue> supplier,
- Aggregation<K, SuppliedValue, Result> aggregation) {
-
- throw new UnsupportedOperationException();
- }
-
- /**
- * {@inheritDoc}
- *
- * @deprecated not implemented yet
- * @throws UnsupportedOperationException not implemented yet
- */
- @Deprecated
- @Override
- public <SuppliedValue, Result> Result aggregate(
- Supplier<K, V, SuppliedValue> supplier,
- Aggregation<K, SuppliedValue, Result> aggregation,
- JobTracker jobTracker) {
-
- throw new UnsupportedOperationException();
- }
-
- private byte[] serializeKey(Object key) {
- return serializer.encode(key);
- }
-
- private K deserializeKey(byte[] key) {
- return serializer.decode(key);
- }
-
- private byte[] serializeVal(Object val) {
- return serializer.encode(val);
- }
-
- private V deserializeVal(byte[] val) {
- if (val == null) {
- return null;
- }
- return serializer.decode(val.clone());
- }
-
- private Set<byte[]> serializeKeySet(Set<K> keys) {
- Set<byte[]> sk = Collections.newSetFromMap(new IdentityHashMap<byte[], Boolean>(keys.size()));
- for (K key : keys) {
- sk.add(serializeKey(key));
- }
- return sk;
- }
-
- private Set<K> deserializeKeySet(Set<byte[]> keys) {
- Set<K> dsk = new HashSet<>(keys.size());
- for (byte[] key : keys) {
- dsk.add(deserializeKey(key));
- }
- return dsk;
- }
-
- private Collection<V> deserializeVal(Collection<byte[]> vals) {
- Collection<V> dsl = new ArrayList<>(vals.size());
- for (byte[] val : vals) {
- dsl.add(deserializeVal(val));
- }
- return dsl;
- }
-
- private Set<java.util.Map.Entry<K, V>> deserializeEntrySet(
- Set<java.util.Map.Entry<byte[], byte[]>> entries) {
-
- Set<java.util.Map.Entry<K, V>> dse = new HashSet<>(entries.size());
- for (java.util.Map.Entry<byte[], byte[]> entry : entries) {
- dse.add(Pair.of(deserializeKey(entry.getKey()),
- deserializeVal(entry.getValue())));
- }
- return dse;
- }
-
- private final class BaseEntryListener
- implements EntryListener<byte[], byte[]> {
-
- private final EntryListener<K, V> listener;
-
- public BaseEntryListener(EntryListener<K, V> listener) {
- this.listener = listener;
- }
-
- @Override
- public void mapEvicted(MapEvent event) {
- listener.mapEvicted(event);
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- listener.mapCleared(event);
- }
-
- @Override
- public void entryUpdated(EntryEvent<byte[], byte[]> event) {
- EntryEvent<K, V> evt = new EntryEvent<K, V>(
- event.getSource(),
- event.getMember(),
- event.getEventType().getType(),
- deserializeKey(event.getKey()),
- deserializeVal(event.getOldValue()),
- deserializeVal(event.getValue()));
-
- listener.entryUpdated(evt);
- }
-
- @Override
- public void entryRemoved(EntryEvent<byte[], byte[]> event) {
- EntryEvent<K, V> evt = new EntryEvent<K, V>(
- event.getSource(),
- event.getMember(),
- event.getEventType().getType(),
- deserializeKey(event.getKey()),
- deserializeVal(event.getOldValue()),
- null);
-
- listener.entryRemoved(evt);
- }
-
- @Override
- public void entryEvicted(EntryEvent<byte[], byte[]> event) {
- EntryEvent<K, V> evt = new EntryEvent<K, V>(
- event.getSource(),
- event.getMember(),
- event.getEventType().getType(),
- deserializeKey(event.getKey()),
- deserializeVal(event.getOldValue()),
- deserializeVal(event.getValue()));
-
- listener.entryEvicted(evt);
- }
-
- @Override
- public void entryAdded(EntryEvent<byte[], byte[]> event) {
- EntryEvent<K, V> evt = new EntryEvent<K, V>(
- event.getSource(),
- event.getMember(),
- event.getEventType().getType(),
- deserializeKey(event.getKey()),
- null,
- deserializeVal(event.getValue()));
-
- listener.entryAdded(evt);
- }
- }
-
- private final class DeserializeVal implements Function<byte[], V> {
- @Override
- public V apply(byte[] input) {
- return deserializeVal(input);
- }
- }
-
-}
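
SMap above (and the SQueue and STxMap wrappers deleted below) all apply the same pattern: a raw Hazelcast structure holding byte[] is presented as a typed collection by encoding and decoding through a StoreSerializer. A brief usage sketch, mirroring the SMapLoader removed earlier in this change; variable names are illustrative and SERIALIZER is the store's StoreSerializer:

    // Illustrative usage of the removed SMap wrapper.
    IMap<byte[], byte[]> raw = theInstance.getMap("flowtable_" + deviceId);
    SMap<FlowId, ImmutableList<StoredFlowEntry>> backup = new SMap<>(raw, SERIALIZER);

    backup.putIfAbsent(flowId, ImmutableList.of(entry));            // serialized on write
    ImmutableList<StoredFlowEntry> entries = backup.get(flowId);    // deserialized on read
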
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/SQueue.java b/core/store/dist/src/main/java/org/onosproject/store/hz/SQueue.java
deleted file mode 100644
index 3a2abb8..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/SQueue.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
-import com.hazelcast.core.IQueue;
-import com.hazelcast.core.ItemEvent;
-import com.hazelcast.core.ItemListener;
-import com.hazelcast.monitor.LocalQueueStats;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Wrapper around IQueue<byte[]> which serializes/deserializes
- * elements using StoreSerializer.
- *
- * @param <T> element type
- */
-public class SQueue<T> implements IQueue<T> {
-
- private final IQueue<byte[]> q;
- private final StoreSerializer serializer;
-
- /**
- * Creates a SQueue instance.
- *
- * @param baseQueue base IQueue to use
-     * @param serializer serializer to use for queue elements
- */
- public SQueue(IQueue<byte[]> baseQueue, StoreSerializer serializer) {
- this.q = checkNotNull(baseQueue);
- this.serializer = checkNotNull(serializer);
- }
-
- private byte[] serialize(Object key) {
- return serializer.encode(key);
- }
-
- private T deserialize(byte[] key) {
- return serializer.decode(key);
- }
-
- @Override
- public boolean add(T t) {
- return q.add(serialize(t));
- }
-
- @Override
- public boolean offer(T t) {
- return q.offer(serialize(t));
- }
-
- @Override
- public void put(T t) throws InterruptedException {
- q.put(serialize(t));
- }
-
- @Override
- public boolean offer(T t, long l, TimeUnit timeUnit) throws InterruptedException {
- return q.offer(serialize(t), l, timeUnit);
- }
-
- @Override
- public T take() throws InterruptedException {
- return deserialize(q.take());
- }
-
- @Override
- public T poll(long l, TimeUnit timeUnit) throws InterruptedException {
- return deserialize(q.poll(l, timeUnit));
- }
-
- @Override
- public int remainingCapacity() {
- return q.remainingCapacity();
- }
-
- @Override
- public boolean remove(Object o) {
- return q.remove(serialize(o));
- }
-
- @Override
- public boolean contains(Object o) {
- return q.contains(serialize(o));
- }
-
- @Deprecated // not implemented yet
- @Override
- public int drainTo(Collection<? super T> collection) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public int drainTo(Collection<? super T> collection, int i) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public T remove() {
- return deserialize(q.remove());
- }
-
- @Override
- public T poll() {
- return deserialize(q.poll());
- }
-
- @Override
- public T element() {
- return deserialize(q.element());
- }
-
- @Override
- public T peek() {
- return deserialize(q.peek());
- }
-
- @Override
- public int size() {
- return q.size();
- }
-
- @Override
- public boolean isEmpty() {
- return q.isEmpty();
- }
-
- @Override
- public Iterator<T> iterator() {
- return FluentIterable.from(q)
- .transform(new DeserializeVal())
- .iterator();
- }
-
- @Deprecated // not implemented yet
- @Override
- public Object[] toArray() {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public <T1> T1[] toArray(T1[] t1s) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public boolean containsAll(Collection<?> collection) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public boolean addAll(Collection<? extends T> collection) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public boolean removeAll(Collection<?> collection) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // not implemented yet
- @Override
- public boolean retainAll(Collection<?> collection) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void clear() {
- q.clear();
- }
-
- @Override
- public LocalQueueStats getLocalQueueStats() {
- return q.getLocalQueueStats();
- }
-
-
- @Override
- public String addItemListener(ItemListener<T> itemListener, boolean withValue) {
- ItemListener<byte[]> il = new ItemListener<byte[]>() {
- @Override
- public void itemAdded(ItemEvent<byte[]> item) {
- itemListener.itemAdded(new ItemEvent<T>(getName(item),
- item.getEventType(),
- deserialize(item.getItem()),
- item.getMember()));
- }
-
- @Override
- public void itemRemoved(ItemEvent<byte[]> item) {
- itemListener.itemRemoved(new ItemEvent<T>(getName(item),
- item.getEventType(),
- deserialize(item.getItem()),
- item.getMember()));
- }
-
- private String getName(ItemEvent<byte[]> item) {
- return (item.getSource() instanceof String) ?
- (String) item.getSource() : item.getSource().toString();
-
- }
- };
- return q.addItemListener(il, withValue);
- }
-
-
- @Override
- public boolean removeItemListener(String registrationId) {
- return q.removeItemListener(registrationId);
- }
-
- @Deprecated
- @Override
- public Object getId() {
- return q.getId();
- }
-
- @Override
- public String getPartitionKey() {
- return q.getPartitionKey();
- }
-
- @Override
- public String getName() {
- return q.getName();
- }
-
- @Override
- public String getServiceName() {
- return q.getServiceName();
- }
-
- @Override
- public void destroy() {
- q.destroy();
- }
-
- private final class DeserializeVal implements Function<byte[], T> {
- @Override
- public T apply(byte[] input) {
- return deserialize(input);
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/STxMap.java b/core/store/dist/src/main/java/org/onosproject/store/hz/STxMap.java
deleted file mode 100644
index 3a92745..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/STxMap.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-import com.hazelcast.core.TransactionalMap;
-import com.hazelcast.query.Predicate;
-
-/**
- * Wrapper around TransactionalMap<byte[], byte[]> which serializes/deserializes
- * keys and values using StoreSerializer.
- *
- * @param <K> key type
- * @param <V> value type
- */
-public class STxMap<K, V> implements TransactionalMap<K, V> {
-
- private final TransactionalMap<byte[], byte[]> m;
- private final StoreSerializer serializer;
-
- /**
- * Creates a STxMap instance.
- *
-     * @param baseMap base TransactionalMap to use
-     * @param serializer serializer to use for both keys and values
- */
- public STxMap(TransactionalMap<byte[], byte[]> baseMap, StoreSerializer serializer) {
- this.m = checkNotNull(baseMap);
- this.serializer = checkNotNull(serializer);
- }
-
- @Override
- public int size() {
- return m.size();
- }
-
- @Override
- public boolean isEmpty() {
- return m.isEmpty();
- }
-
- @Deprecated
- @Override
- public Object getId() {
- return m.getId();
- }
-
- @Override
- public String getPartitionKey() {
- return m.getPartitionKey();
- }
-
- @Override
- public String getName() {
- return m.getName();
- }
-
- @Override
- public String getServiceName() {
- return m.getServiceName();
- }
-
- @Override
- public void destroy() {
- m.destroy();
- }
-
- @Override
- public boolean containsKey(Object key) {
- return m.containsKey(serializeKey(key));
- }
-
- @Override
- public V get(Object key) {
- return deserializeVal(m.get(serializeKey(key)));
- }
-
- @Override
- public V getForUpdate(Object key) {
- return deserializeVal(m.getForUpdate(serializeKey(key)));
- }
-
- @Override
- public V put(K key, V value) {
- return deserializeVal(m.put(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public V remove(Object key) {
- return deserializeVal(m.remove(serializeKey(key)));
- }
-
- @Override
- public boolean remove(Object key, Object value) {
- return m.remove(serializeKey(key), serializeVal(value));
- }
-
- @Override
- public void delete(Object key) {
- m.delete(serializeKey(key));
- }
-
- @Override
- public V put(K key, V value, long ttl, TimeUnit timeunit) {
- return deserializeVal(m.put(serializeKey(key), serializeVal(value), ttl, timeunit));
- }
-
- @Override
- public V putIfAbsent(K key, V value) {
- return deserializeVal(m.putIfAbsent(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- return m.replace(serializeKey(key), serializeVal(oldValue), serializeVal(newValue));
- }
-
- @Override
- public V replace(K key, V value) {
- return deserializeVal(m.replace(serializeKey(key), serializeVal(value)));
- }
-
- @Override
- public void set(K key, V value) {
- m.set(serializeKey(key), serializeVal(value));
- }
-
-
- @Override
- public Set<K> keySet() {
- return deserializeKeySet(m.keySet());
- }
-
- @Override
- public Collection<V> values() {
- return deserializeVals(m.values());
- }
-
- @Deprecated // marking method not implemented
- @SuppressWarnings("rawtypes")
- @Override
- public Set<K> keySet(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- @Deprecated // marking method not implemented
- @SuppressWarnings("rawtypes")
- @Override
- public Collection<V> values(Predicate predicate) {
- throw new UnsupportedOperationException();
- }
-
- private byte[] serializeKey(Object key) {
- return serializer.encode(key);
- }
-
- private K deserializeKey(byte[] key) {
- return serializer.decode(key);
- }
-
- private byte[] serializeVal(Object val) {
- return serializer.encode(val);
- }
-
- private V deserializeVal(byte[] val) {
- if (val == null) {
- return null;
- }
- return serializer.decode(val.clone());
- }
-
- private Set<K> deserializeKeySet(Set<byte[]> keys) {
- Set<K> dsk = new HashSet<>(keys.size());
- for (byte[] key : keys) {
- dsk.add(deserializeKey(key));
- }
- return dsk;
- }
-
- private Collection<V> deserializeVals(Collection<byte[]> vals) {
- Collection<V> dsl = new ArrayList<>(vals.size());
- for (byte[] val : vals) {
- dsl.add(deserializeVal(val));
- }
- return dsl;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
deleted file mode 100644
index 5cb5757..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import com.google.common.io.ByteStreams;
-import com.google.common.io.Files;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.FileSystemXmlConfig;
-import com.hazelcast.core.Hazelcast;
-import com.hazelcast.core.HazelcastInstance;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Auxiliary bootstrap of distributed store.
- */
-@Component(immediate = false, enabled = false)
-@Service
-public class StoreManager implements StoreService {
-
- protected static final String HAZELCAST_XML_FILE = "etc/hazelcast.xml";
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- protected HazelcastInstance instance;
-
- @Activate
- public void activate() {
- try {
- File hazelcastFile = new File(HAZELCAST_XML_FILE);
- if (!hazelcastFile.exists()) {
- createDefaultHazelcastFile(hazelcastFile);
- }
-
- Config config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
-
- instance = Hazelcast.newHazelcastInstance(config);
- log.info("Started");
- } catch (FileNotFoundException e) {
- log.error("Unable to configure Hazelcast", e);
- }
- }
-
- private void createDefaultHazelcastFile(File hazelcastFile) {
- String ip = ClusterDefinitionManager.getSiteLocalAddress();
- String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
- InputStream his = getClass().getResourceAsStream("/hazelcast.xml");
- try {
- String hzCfg = new String(ByteStreams.toByteArray(his), "UTF-8");
- hzCfg = hzCfg.replaceFirst("@NAME", ip);
- hzCfg = hzCfg.replaceFirst("@PREFIX", ipPrefix);
- Files.write(hzCfg.getBytes("UTF-8"), hazelcastFile);
- } catch (IOException e) {
- log.error("Unable to write default hazelcast file", e);
- }
- }
-
- @Deactivate
- public void deactivate() {
- instance.shutdown();
- log.info("Stopped");
- }
-
- @Override
- public HazelcastInstance getHazelcastInstance() {
- return instance;
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreService.java b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreService.java
deleted file mode 100644
index a02acb8..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import com.hazelcast.core.HazelcastInstance;
-
-/**
- * Bootstrap service to get a handle on a shared Hazelcast instance.
- */
-public interface StoreService {
-
- /**
- * Returns the shared Hazelcast instance for use as a distributed store
- * backing.
- *
- * @return shared Hazelcast instance
- */
- HazelcastInstance getHazelcastInstance();
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/hz/package-info.java
deleted file mode 100644
index 7de3b11..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Common abstractions and facilities for implementing a distributed store
- * using Hazelcast.
- */
-package org.onosproject.store.hz;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
deleted file mode 100644
index b2c5ade..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ /dev/null
@@ -1,520 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.mastership.impl;
-
-import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
-import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
-import static org.apache.commons.lang3.concurrent.ConcurrentUtils.putIfAbsent;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipStore;
-import org.onosproject.mastership.MastershipStoreDelegate;
-import org.onosproject.mastership.MastershipTerm;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.SMap;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onlab.util.KryoNamespace;
-
-import com.google.common.base.Objects;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.core.EntryEvent;
-import com.hazelcast.core.EntryListener;
-import com.hazelcast.core.MapEvent;
-
-import static org.onosproject.net.MastershipRole.*;
-
-/**
- * Distributed implementation of the mastership store. The store is
- * responsible for the master selection process.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class DistributedMastershipStore
- extends AbstractHazelcastStore<MastershipEvent, MastershipStoreDelegate>
- implements MastershipStore {
-
- //term number representing that master has never been chosen yet
- private static final Integer NOTHING = 0;
- //initial term/TTL value
- private static final Integer INIT = 1;
-
- //device to node roles
- private static final String NODE_ROLES_MAP_NAME = "nodeRoles";
- protected SMap<DeviceId, RoleValue> roleMap;
- //devices to terms
- private static final String TERMS_MAP_NAME = "terms";
- protected SMap<DeviceId, Integer> terms;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private String listenerId;
-
- @Override
- @Activate
- public void activate() {
- super.activate();
-
- this.serializer = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- .register(new RoleValueSerializer(), RoleValue.class)
- .build();
- }
- };
-
- final Config config = theInstance.getConfig();
-
- MapConfig nodeRolesCfg = config.getMapConfig(NODE_ROLES_MAP_NAME);
- nodeRolesCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - nodeRolesCfg.getBackupCount());
-
- MapConfig termsCfg = config.getMapConfig(TERMS_MAP_NAME);
- termsCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - termsCfg.getBackupCount());
-
- roleMap = new SMap<>(theInstance.<byte[], byte[]>getMap(NODE_ROLES_MAP_NAME), this.serializer);
- listenerId = roleMap.addEntryListener((new RemoteMasterShipEventHandler()), true);
- terms = new SMap<>(theInstance.<byte[], byte[]>getMap(TERMS_MAP_NAME), this.serializer);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- roleMap.removeEntryListener(listenerId);
- log.info("Stopped");
- }
-
- @Override
- public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
- final RoleValue roleInfo = roleMap.get(deviceId);
- if (roleInfo != null) {
- return roleInfo.getRole(nodeId);
- }
- return NONE;
- }
-
- @Override
- public CompletableFuture<MastershipEvent> setMaster(NodeId newMaster, DeviceId deviceId) {
-
- roleMap.lock(deviceId);
- try {
- final RoleValue rv = getRoleValue(deviceId);
- final MastershipRole currentRole = rv.getRole(newMaster);
- switch (currentRole) {
- case MASTER:
- //reinforce mastership
- // RoleInfo integrity check
- boolean modified = rv.reassign(newMaster, STANDBY, NONE);
- if (modified) {
- roleMap.put(deviceId, rv);
- // should never reach here.
- log.warn("{} was in both MASTER and STANDBY for {}", newMaster, deviceId);
- // trigger BACKUPS_CHANGED?
- }
- return CompletableFuture.completedFuture(null);
- case STANDBY:
- case NONE:
- final NodeId currentMaster = rv.get(MASTER);
- if (currentMaster != null) {
- // place current master in STANDBY
- rv.reassign(currentMaster, NONE, STANDBY);
- rv.replace(currentMaster, newMaster, MASTER);
- } else {
- //no master before so just add.
- rv.add(MASTER, newMaster);
- }
- // remove newMaster from STANDBY
- rv.reassign(newMaster, STANDBY, NONE);
- updateTerm(deviceId);
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
- default:
- log.warn("unknown Mastership Role {}", currentRole);
- return CompletableFuture.completedFuture(null);
- }
- } finally {
- roleMap.unlock(deviceId);
- }
- }
-
- @Override
- public NodeId getMaster(DeviceId deviceId) {
- return getNode(MASTER, deviceId);
- }
-
-
- @Override
- public RoleInfo getNodes(DeviceId deviceId) {
- RoleValue rv = roleMap.get(deviceId);
- if (rv != null) {
- return rv.roleInfo();
- } else {
- return new RoleInfo();
- }
- }
-
- @Override
- public Set<DeviceId> getDevices(NodeId nodeId) {
- Set<DeviceId> devices = new HashSet<>();
-
- for (Map.Entry<DeviceId, RoleValue> el : roleMap.entrySet()) {
- if (nodeId.equals(el.getValue().get(MASTER))) {
- devices.add(el.getKey());
- }
- }
-
- return devices;
- }
-
- @Override
- public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
-
- // if no master => become master
- // if there already exists a master:
- // if I was the master return MASTER
- // else put myself in STANDBY and return STANDBY
-
- final NodeId local = clusterService.getLocalNode().id();
- boolean modified = false;
- roleMap.lock(deviceId);
- try {
- final RoleValue rv = getRoleValue(deviceId);
- if (rv.get(MASTER) == null) {
- // there's no master become one
- // move out from STANDBY
- rv.reassign(local, STANDBY, NONE);
- rv.add(MASTER, local);
-
- updateTerm(deviceId);
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(MASTER);
- }
- final MastershipRole currentRole = rv.getRole(local);
- switch (currentRole) {
- case MASTER:
- // RoleInfo integrity check
- modified = rv.reassign(local, STANDBY, NONE);
- if (modified) {
- log.warn("{} was in both MASTER and STANDBY for {}", local, deviceId);
- // should never reach here,
- // but heal if we happened to be there
- roleMap.put(deviceId, rv);
- // trigger BACKUPS_CHANGED?
- }
- return CompletableFuture.completedFuture(currentRole);
- case STANDBY:
- // RoleInfo integrity check
- modified = rv.reassign(local, NONE, STANDBY);
- if (modified) {
- log.warn("{} was in both NONE and STANDBY for {}", local, deviceId);
- // should never reach here,
- // but heal if we happened to be there
- roleMap.put(deviceId, rv);
- // trigger BACKUPS_CHANGED?
- }
- return CompletableFuture.completedFuture(currentRole);
- case NONE:
- rv.reassign(local, NONE, STANDBY);
- roleMap.put(deviceId, rv);
- // TODO: notifyDelegate BACKUPS_CHANGED
- return CompletableFuture.completedFuture(STANDBY);
- default:
- log.warn("unknown Mastership Role {}", currentRole);
- }
- return CompletableFuture.completedFuture(currentRole);
- } finally {
- roleMap.unlock(deviceId);
- }
- }
-
- @Override
- public MastershipTerm getTermFor(DeviceId deviceId) {
- // term information and role must be read atomically
- // acquiring write lock for the device
- roleMap.lock(deviceId);
- try {
- RoleValue rv = getRoleValue(deviceId);
- final Integer term = terms.get(deviceId);
- final NodeId master = rv.get(MASTER);
- if (term == null) {
- return MastershipTerm.of(null, NOTHING);
- }
- return MastershipTerm.of(master, term);
- } finally {
- roleMap.unlock(deviceId);
- }
- }
-
- @Override
- public CompletableFuture<MastershipEvent> setStandby(NodeId nodeId, DeviceId deviceId) {
- // if nodeId was MASTER, rotate STANDBY
- // if nodeId was STANDBY no-op
- // if nodeId was NONE, add to STANDBY
-
- roleMap.lock(deviceId);
- try {
- final RoleValue rv = getRoleValue(deviceId);
- final MastershipRole currentRole = getRole(nodeId, deviceId);
- switch (currentRole) {
- case MASTER:
- NodeId newMaster = reelect(nodeId, deviceId, rv);
- rv.reassign(nodeId, NONE, STANDBY);
- updateTerm(deviceId);
- if (newMaster != null) {
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
- } else {
- // no master candidate
- roleMap.put(deviceId, rv);
- // TBD: Should there be new event type for no MASTER?
- return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
- }
- case STANDBY:
- return CompletableFuture.completedFuture(null);
- case NONE:
- rv.reassign(nodeId, NONE, STANDBY);
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(
- new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
- default:
- log.warn("unknown Mastership Role {}", currentRole);
- }
- return CompletableFuture.completedFuture(null);
- } finally {
- roleMap.unlock(deviceId);
- }
- }
-
- @Override
- public CompletableFuture<MastershipEvent> relinquishRole(NodeId nodeId, DeviceId deviceId) {
- // relinquishRole is basically set to None
-
- // If nodeId was master reelect next and remove nodeId
- // else remove from STANDBY
-
- roleMap.lock(deviceId);
- try {
- final RoleValue rv = getRoleValue(deviceId);
- final MastershipRole currentRole = rv.getRole(nodeId);
- switch (currentRole) {
- case MASTER:
- NodeId newMaster = reelect(nodeId, deviceId, rv);
- if (newMaster != null) {
- updateTerm(deviceId);
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(
- new MastershipEvent(MASTER_CHANGED, deviceId, rv.roleInfo()));
- } else {
- // No master candidate - no more backups, device is likely
- // fully disconnected
- roleMap.put(deviceId, rv);
- // Should there be new event type?
- return CompletableFuture.completedFuture(null);
- }
- case STANDBY:
- //fall through to reinforce relinquishment
- case NONE:
- boolean modified = rv.reassign(nodeId, STANDBY, NONE);
- if (modified) {
- roleMap.put(deviceId, rv);
- return CompletableFuture.completedFuture(
- new MastershipEvent(BACKUPS_CHANGED, deviceId, rv.roleInfo()));
- }
- return CompletableFuture.completedFuture(null);
- default:
- log.warn("unknown Mastership Role {}", currentRole);
- }
- return CompletableFuture.completedFuture(null);
- } finally {
- roleMap.unlock(deviceId);
- }
- }
-
- @Override
- public void relinquishAllRole(NodeId nodeId) {
-
- List<MastershipEvent> events = new ArrayList<>();
- for (Entry<DeviceId, RoleValue> entry : roleMap.entrySet()) {
- final DeviceId deviceId = entry.getKey();
- final RoleValue roleValue = entry.getValue();
-
- if (roleValue.contains(MASTER, nodeId) ||
- roleValue.contains(STANDBY, nodeId)) {
-
- relinquishRole(nodeId, deviceId).whenComplete((event, error) -> {
- if (event != null) {
- events.add(event);
- }
- });
- }
- }
- notifyDelegate(events);
- }
-
- // TODO: Consider moving this to RoleValue method
- //helper to fetch a new master candidate for a given device.
- private NodeId reelect(
- NodeId current, DeviceId deviceId, RoleValue rv) {
-
- //if this were a queue it'd be neater.
- NodeId candidate = null;
- for (NodeId n : rv.nodesOfRole(STANDBY)) {
- if (!current.equals(n)) {
- candidate = n;
- break;
- }
- }
-
- if (candidate == null) {
- log.info("{} giving up and going to NONE for {}", current, deviceId);
- rv.remove(MASTER, current);
- // master did change, but there is no master candidate.
- return null;
- } else {
- log.info("{} trying to pass mastership for {} to {}", current, deviceId, candidate);
- rv.replace(current, candidate, MASTER);
- rv.reassign(candidate, STANDBY, NONE);
- return candidate;
- }
- }
-
- //return the RoleValue structure for a device, or create one
- private RoleValue getRoleValue(DeviceId deviceId) {
- RoleValue value = roleMap.get(deviceId);
- if (value == null) {
- value = new RoleValue();
- RoleValue concurrentlyAdded = roleMap.putIfAbsent(deviceId, value);
- if (concurrentlyAdded != null) {
- return concurrentlyAdded;
- }
- }
- return value;
- }
-
- //get first applicable node out of store-unique structure.
- private NodeId getNode(MastershipRole role, DeviceId deviceId) {
- RoleValue value = roleMap.get(deviceId);
- if (value != null) {
- return value.get(role);
- }
- return null;
- }
-
- //adds or updates term information.
- // must be guarded by roleMap.lock(deviceId)
- private void updateTerm(DeviceId deviceId) {
- Integer term = terms.get(deviceId);
- if (term == null) {
- term = terms.putIfAbsent(deviceId, INIT);
- if (term == null) {
- // initial term set successfully
- return;
- }
- // concurrent initialization detected,
- // fall through to try incrementing
- }
- Integer nextTerm = term + 1;
- boolean success = terms.replace(deviceId, term, nextTerm);
- while (!success) {
- term = terms.get(deviceId);
- if (term == null) {
- // something is very wrong, but write something to avoid
- // infinite loop.
- log.warn("Term info for {} disappeared.", deviceId);
- term = putIfAbsent(terms, deviceId, nextTerm);
- }
- nextTerm = term + 1;
- success = terms.replace(deviceId, term, nextTerm);
- }
- }
-
- private class RemoteMasterShipEventHandler implements EntryListener<DeviceId, RoleValue> {
-
- @Override
- public void entryAdded(EntryEvent<DeviceId, RoleValue> event) {
- entryUpdated(event);
- }
-
- @Override
- public void entryRemoved(EntryEvent<DeviceId, RoleValue> event) {
- }
-
- @Override
- public void entryUpdated(EntryEvent<DeviceId, RoleValue> event) {
- // compare old and current RoleValues. If master is different,
- // emit MASTER_CHANGED. else, emit BACKUPS_CHANGED.
- RoleValue oldValue = event.getOldValue();
- RoleValue newValue = event.getValue();
-
- // There will be no oldValue at the very first instance of an EntryEvent.
- // Technically, the progression is: null event -> null master -> some master;
- // We treat a null master and a null oldValue as the same condition.
- NodeId oldMaster = null;
- if (oldValue != null) {
- oldMaster = oldValue.get(MASTER);
- }
- NodeId newMaster = newValue.get(MASTER);
-
- if (!Objects.equal(oldMaster, newMaster)) {
- notifyDelegate(new MastershipEvent(
- MASTER_CHANGED, event.getKey(), event.getValue().roleInfo()));
- } else {
- notifyDelegate(new MastershipEvent(
- BACKUPS_CHANGED, event.getKey(), event.getValue().roleInfo()));
- }
- }
-
- @Override
- public void entryEvicted(EntryEvent<DeviceId, RoleValue> event) {
- }
-
- @Override
- public void mapEvicted(MapEvent event) {
- }
-
- @Override
- public void mapCleared(MapEvent event) {
- }
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java
index 71e053a..c81ea7f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/RoleValueSerializer.java
@@ -27,7 +27,7 @@
import com.esotericsoftware.kryo.io.Output;
/**
- * Serializer for RoleValues used by {@link DistributedMastershipStore}.
+ * Serializer for RoleValues used by {@link org.onosproject.mastership.MastershipStore}.
*/
public class RoleValueSerializer extends Serializer<RoleValue> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStore.java
deleted file mode 100644
index 00141a2..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStore.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.resource.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.Bandwidth;
-import org.onlab.util.PositionalParameterStringFormatter;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Link;
-import org.onosproject.net.LinkKey;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.net.link.LinkService;
-import org.onosproject.net.resource.BandwidthResource;
-import org.onosproject.net.resource.BandwidthResourceAllocation;
-import org.onosproject.net.resource.LambdaResource;
-import org.onosproject.net.resource.LambdaResourceAllocation;
-import org.onosproject.net.resource.LinkResourceAllocations;
-import org.onosproject.net.resource.LinkResourceEvent;
-import org.onosproject.net.resource.LinkResourceStore;
-import org.onosproject.net.resource.MplsLabel;
-import org.onosproject.net.resource.MplsLabelResourceAllocation;
-import org.onosproject.net.resource.ResourceAllocation;
-import org.onosproject.net.resource.ResourceAllocationException;
-import org.onosproject.net.resource.ResourceType;
-import org.onosproject.store.StoreDelegate;
-import org.onosproject.store.hz.AbstractHazelcastStore;
-import org.onosproject.store.hz.STxMap;
-import org.slf4j.Logger;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-import com.hazelcast.config.Config;
-import com.hazelcast.config.MapConfig;
-import com.hazelcast.core.TransactionalMap;
-import com.hazelcast.transaction.TransactionContext;
-import com.hazelcast.transaction.TransactionException;
-import com.hazelcast.transaction.TransactionOptions;
-import com.hazelcast.transaction.TransactionOptions.TransactionType;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages link resources using Hazelcast.
- */
-@Component(immediate = true, enabled = false)
-@Service
-public class HazelcastLinkResourceStore
- extends AbstractHazelcastStore<LinkResourceEvent, StoreDelegate<LinkResourceEvent>>
- implements LinkResourceStore {
-
-
- private final Logger log = getLogger(getClass());
-
- private static final BandwidthResource DEFAULT_BANDWIDTH = new BandwidthResource(Bandwidth.mbps(1_000));
-
- private static final BandwidthResource EMPTY_BW = new BandwidthResource(Bandwidth.bps(0));
-
- // table to store current allocations
- /** LinkKey -> List<LinkResourceAllocations>. */
- private static final String LINK_RESOURCE_ALLOCATIONS = "LinkResourceAllocations";
-
- /** IntentId -> LinkResourceAllocations. */
- private static final String INTENT_ALLOCATIONS = "IntentAllocations";
-
-
- // TODO make this configurable
- // number of retries to attempt on allocation failure, due to
- // concurrent update
- private static int maxAllocateRetries = 5;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LinkService linkService;
-
- // Link annotation key name to use as bandwidth
- private String bandwidthAnnotation = AnnotationKeys.BANDWIDTH;
- // Link annotation key name to use as max lambda
- private String wavesAnnotation = AnnotationKeys.OPTICAL_WAVES;
-
- // Max MPLS labels: 2^20 – 1
- private int maxMplsLabel = 0xFFFFF;
-
- @Override
- @Activate
- public void activate() {
- super.activate();
-
- final Config config = theInstance.getConfig();
-
- MapConfig linkCfg = config.getMapConfig(LINK_RESOURCE_ALLOCATIONS);
- linkCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - linkCfg.getBackupCount());
-
- MapConfig intentCfg = config.getMapConfig(INTENT_ALLOCATIONS);
- intentCfg.setAsyncBackupCount(MapConfig.MAX_BACKUP_COUNT - intentCfg.getBackupCount());
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- private STxMap<IntentId, LinkResourceAllocations> getIntentAllocs(TransactionContext tx) {
- TransactionalMap<byte[], byte[]> raw = tx.getMap(INTENT_ALLOCATIONS);
- return new STxMap<>(raw, serializer);
- }
-
- private STxMap<LinkKey, List<LinkResourceAllocations>> getLinkAllocs(TransactionContext tx) {
- TransactionalMap<byte[], byte[]> raw = tx.getMap(LINK_RESOURCE_ALLOCATIONS);
- return new STxMap<>(raw, serializer);
- }
-
- private Set<? extends ResourceAllocation> getResourceCapacity(ResourceType type, Link link) {
- if (type == ResourceType.BANDWIDTH) {
- return ImmutableSet.of(getBandwidthResourceCapacity(link));
- }
- if (type == ResourceType.LAMBDA) {
- return getLambdaResourceCapacity(link);
- }
- if (type == ResourceType.MPLS_LABEL) {
- return getMplsResourceCapacity();
- }
- return null;
- }
-
- private Set<LambdaResourceAllocation> getLambdaResourceCapacity(Link link) {
- Set<LambdaResourceAllocation> allocations = new HashSet<>();
- try {
- final int waves = Integer.parseInt(link.annotations().value(wavesAnnotation));
- for (int i = 1; i <= waves; i++) {
- allocations.add(new LambdaResourceAllocation(LambdaResource.valueOf(i)));
- }
- } catch (NumberFormatException e) {
- log.debug("No {} annotation on link %s", wavesAnnotation, link);
- }
- return allocations;
- }
-
- private BandwidthResourceAllocation getBandwidthResourceCapacity(Link link) {
-
- // if Link annotations exist, use them
- // if all fails, use DEFAULT_BANDWIDTH
-
- BandwidthResource bandwidth = null;
- String strBw = link.annotations().value(bandwidthAnnotation);
- if (strBw != null) {
- try {
- bandwidth = new BandwidthResource(Bandwidth.mbps(Double.parseDouble(strBw)));
- } catch (NumberFormatException e) {
- // do nothing
- bandwidth = null;
- }
- }
-
- if (bandwidth == null) {
- // fall back, use fixed default
- bandwidth = DEFAULT_BANDWIDTH;
- }
- return new BandwidthResourceAllocation(bandwidth);
- }
-
- private Set<MplsLabelResourceAllocation> getMplsResourceCapacity() {
- Set<MplsLabelResourceAllocation> allocations = new HashSet<>();
- //Ignoring reserved labels of 0 through 15
- for (int i = 16; i <= maxMplsLabel; i++) {
- allocations.add(new MplsLabelResourceAllocation(MplsLabel
- .valueOf(i)));
-
- }
- return allocations;
- }
-
- private Map<ResourceType, Set<? extends ResourceAllocation>> getResourceCapacity(Link link) {
- Map<ResourceType, Set<? extends ResourceAllocation>> caps = new HashMap<>();
- for (ResourceType type : ResourceType.values()) {
- Set<? extends ResourceAllocation> cap = getResourceCapacity(type, link);
- if (cap != null) {
- caps.put(type, cap);
- }
- }
- return caps;
- }
-
- @Override
- public Set<ResourceAllocation> getFreeResources(Link link) {
- TransactionOptions opt = new TransactionOptions();
- // read-only and will never be committed, thus does not need durability
- opt.setTransactionType(TransactionType.LOCAL);
- TransactionContext tx = theInstance.newTransactionContext(opt);
- tx.beginTransaction();
- try {
- Map<ResourceType, Set<? extends ResourceAllocation>> freeResources = getFreeResourcesEx(tx, link);
- Set<ResourceAllocation> allFree = new HashSet<>();
- for (Set<? extends ResourceAllocation> r : freeResources.values()) {
- allFree.addAll(r);
- }
- return allFree;
- } finally {
- tx.rollbackTransaction();
- }
-
- }
-
- private Map<ResourceType, Set<? extends ResourceAllocation>> getFreeResourcesEx(TransactionContext tx, Link link) {
- // returns capacity - allocated
-
- checkNotNull(link);
- Map<ResourceType, Set<? extends ResourceAllocation>> free = new HashMap<>();
- final Map<ResourceType, Set<? extends ResourceAllocation>> caps = getResourceCapacity(link);
- final Iterable<LinkResourceAllocations> allocations = getAllocations(tx, link);
-
- for (ResourceType type : ResourceType.values()) {
- // there should be class/category of resources
- switch (type) {
- case BANDWIDTH:
- {
- Set<? extends ResourceAllocation> bw = caps.get(ResourceType.BANDWIDTH);
- if (bw == null || bw.isEmpty()) {
- bw = Sets.newHashSet(new BandwidthResourceAllocation(EMPTY_BW));
- }
-
- BandwidthResourceAllocation cap = (BandwidthResourceAllocation) bw.iterator().next();
- double freeBw = cap.bandwidth().toDouble();
-
- // enumerate current allocations, subtracting resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof BandwidthResourceAllocation) {
- BandwidthResourceAllocation bwA = (BandwidthResourceAllocation) a;
- freeBw -= bwA.bandwidth().toDouble();
- }
- }
- }
-
- free.put(type, Sets.newHashSet(
- new BandwidthResourceAllocation(new BandwidthResource(Bandwidth.bps(freeBw)))));
- break;
- }
-
- case LAMBDA:
- {
- Set<? extends ResourceAllocation> lmd = caps.get(type);
- if (lmd == null || lmd.isEmpty()) {
- // nothing left
- break;
- }
- Set<LambdaResourceAllocation> freeL = new HashSet<>();
- for (ResourceAllocation r : lmd) {
- if (r instanceof LambdaResourceAllocation) {
- freeL.add((LambdaResourceAllocation) r);
- }
- }
-
- // enumerate current allocations, removing resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc.getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof LambdaResourceAllocation) {
- freeL.remove(a);
- }
- }
- }
-
- free.put(type, freeL);
- break;
- }
-
- case MPLS_LABEL:
- Set<? extends ResourceAllocation> mpls = caps.get(type);
- if (mpls == null || mpls.isEmpty()) {
- // nothing left
- break;
- }
- Set<MplsLabelResourceAllocation> freeLabel = new HashSet<>();
- for (ResourceAllocation r : mpls) {
- if (r instanceof MplsLabelResourceAllocation) {
- freeLabel.add((MplsLabelResourceAllocation) r);
- }
- }
-
- // enumerate current allocations, removing resources
- for (LinkResourceAllocations alloc : allocations) {
- Set<ResourceAllocation> types = alloc
- .getResourceAllocation(link);
- for (ResourceAllocation a : types) {
- if (a instanceof MplsLabelResourceAllocation) {
- freeLabel.remove(a);
- }
- }
- }
-
- free.put(type, freeLabel);
- break;
-
- default:
- break;
- }
- }
- return free;
- }
-
- @Override
- public void allocateResources(LinkResourceAllocations allocations) {
- checkNotNull(allocations);
-
- for (int i = 0; i < maxAllocateRetries; ++i) {
- TransactionContext tx = theInstance.newTransactionContext();
- tx.beginTransaction();
- try {
-
- STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
- // should this be conditional write?
- intentAllocs.put(allocations.intentId(), allocations);
-
- for (Link link : allocations.links()) {
- allocateLinkResource(tx, link, allocations);
- }
-
- tx.commitTransaction();
- return;
- } catch (TransactionException e) {
- log.debug("Failed to commit allocations for {}. [retry={}]",
- allocations.intentId(), i);
- log.trace(" details {} ", allocations, e);
- continue;
- } catch (Exception e) {
- log.error("Exception thrown, rolling back", e);
- tx.rollbackTransaction();
- throw e;
- }
- }
- }
-
- private void allocateLinkResource(TransactionContext tx, Link link,
- LinkResourceAllocations allocations) {
-
- // requested resources
- Set<ResourceAllocation> reqs = allocations.getResourceAllocation(link);
-
- Map<ResourceType, Set<? extends ResourceAllocation>> available = getFreeResourcesEx(tx, link);
- for (ResourceAllocation req : reqs) {
- Set<? extends ResourceAllocation> avail = available.get(req.type());
- if (req instanceof BandwidthResourceAllocation) {
- // check if allocation should be accepted
- if (avail.isEmpty()) {
- checkState(!avail.isEmpty(),
- "There's no Bandwidth resource on %s?",
- link);
- }
- BandwidthResourceAllocation bw = (BandwidthResourceAllocation) avail.iterator().next();
- double bwLeft = bw.bandwidth().toDouble();
- BandwidthResourceAllocation bwReq = ((BandwidthResourceAllocation) req);
- bwLeft -= bwReq.bandwidth().toDouble();
- if (bwLeft < 0) {
- throw new ResourceAllocationException(
- PositionalParameterStringFormatter.format(
- "Unable to allocate bandwidth for link {} "
- + " requested amount is {} current allocation is {}",
- link,
- bwReq.bandwidth().toDouble(),
- bw));
- }
- } else if (req instanceof LambdaResourceAllocation) {
- LambdaResourceAllocation lambdaAllocation = (LambdaResourceAllocation) req;
- // check if allocation should be accepted
- if (!avail.contains(req)) {
- // requested lambda was not available
- throw new ResourceAllocationException(
- PositionalParameterStringFormatter.format(
- "Unable to allocate lambda for link {} lambda is {}",
- link,
- lambdaAllocation.lambda().toInt()));
- }
- } else if (req instanceof MplsLabelResourceAllocation) {
- MplsLabelResourceAllocation mplsAllocation = (MplsLabelResourceAllocation) req;
- if (!avail.contains(req)) {
- throw new ResourceAllocationException(
- PositionalParameterStringFormatter
- .format("Unable to allocate MPLS label for link "
- + "{} MPLS label is {}",
- link,
- mplsAllocation
- .mplsLabel()
- .toString()));
- }
- }
- }
- // all requests allocatable => add allocation
- final LinkKey linkKey = LinkKey.linkKey(link);
- STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
- List<LinkResourceAllocations> before = linkAllocs.get(linkKey);
- if (before == null) {
- List<LinkResourceAllocations> after = new ArrayList<>();
- after.add(allocations);
- before = linkAllocs.putIfAbsent(linkKey, after);
- if (before != null) {
- // concurrent allocation detected, retry transaction
- throw new TransactionException("Concurrent Allocation, retry");
- }
- } else {
- List<LinkResourceAllocations> after = new ArrayList<>(before.size() + 1);
- after.addAll(before);
- after.add(allocations);
- linkAllocs.replace(linkKey, before, after);
- }
- }
-
- @Override
- public LinkResourceEvent releaseResources(LinkResourceAllocations allocations) {
- checkNotNull(allocations);
-
- final IntentId intentId = allocations.intentId();
- final Collection<Link> links = allocations.links();
-
- boolean success = false;
- do {
- // Note: might want to break it down into smaller tx unit
- // to lower the chance of collisions.
- TransactionContext tx = theInstance.newTransactionContext();
- tx.beginTransaction();
- try {
- STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
- intentAllocs.remove(intentId);
-
- STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
-
- for (Link link : links) {
- final LinkKey linkId = LinkKey.linkKey(link);
-
- List<LinkResourceAllocations> before = linkAllocs.get(linkId);
- if (before == null || before.isEmpty()) {
- // something is wrong, but it is already freed
- log.warn("There was no resource left to release on {}", linkId);
- continue;
- }
- List<LinkResourceAllocations> after = new ArrayList<>(before);
- after.remove(allocations);
- linkAllocs.replace(linkId, before, after);
- }
-
- tx.commitTransaction();
- success = true;
- } catch (TransactionException e) {
- log.debug("Transaction failed, retrying");
- } catch (Exception e) {
- log.error("Exception thrown during releaseResource {}",
- allocations, e);
- tx.rollbackTransaction();
- throw e;
- }
- } while (!success);
-
- // Issue events to force recompilation of intents.
- final List<LinkResourceAllocations> releasedResources =
- ImmutableList.of(allocations);
- return new LinkResourceEvent(
- LinkResourceEvent.Type.ADDITIONAL_RESOURCES_AVAILABLE,
- releasedResources);
- }
-
- @Override
- public LinkResourceAllocations getAllocations(IntentId intentId) {
- checkNotNull(intentId);
- TransactionOptions opt = new TransactionOptions();
- // read-only and will never be committed, thus does not need durability
- opt.setTransactionType(TransactionType.LOCAL);
- TransactionContext tx = theInstance.newTransactionContext(opt);
- tx.beginTransaction();
- try {
- STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
- return intentAllocs.get(intentId);
- } finally {
- tx.rollbackTransaction();
- }
- }
-
- @Override
- public List<LinkResourceAllocations> getAllocations(Link link) {
- checkNotNull(link);
- final LinkKey key = LinkKey.linkKey(link);
-
- TransactionOptions opt = new TransactionOptions();
- // read-only and will never be committed, thus does not need durability
- opt.setTransactionType(TransactionType.LOCAL);
- TransactionContext tx = theInstance.newTransactionContext(opt);
- tx.beginTransaction();
- List<LinkResourceAllocations> res = null;
- try {
- STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
- res = linkAllocs.get(key);
- } finally {
- tx.rollbackTransaction();
- }
-
- if (res == null) {
- // try to add empty list
- TransactionContext tx2 = theInstance.newTransactionContext();
- tx2.beginTransaction();
- try {
- res = getLinkAllocs(tx2).putIfAbsent(key, new ArrayList<>());
- tx2.commitTransaction();
- if (res == null) {
- return Collections.emptyList();
- } else {
- return res;
- }
- } catch (TransactionException e) {
- // concurrently added?
- return getAllocations(link);
- } catch (Exception e) {
- tx.rollbackTransaction();
- }
- }
- return res;
- }
-
- private Iterable<LinkResourceAllocations> getAllocations(TransactionContext tx,
- Link link) {
- checkNotNull(tx);
- checkNotNull(link);
- final LinkKey key = LinkKey.linkKey(link);
-
- STxMap<LinkKey, List<LinkResourceAllocations>> linkAllocs = getLinkAllocs(tx);
- List<LinkResourceAllocations> res = null;
- res = linkAllocs.get(key);
- if (res == null) {
- res = linkAllocs.putIfAbsent(key, new ArrayList<>());
- if (res == null) {
- return Collections.emptyList();
- } else {
- return res;
- }
- }
- return res;
- }
-
- @Override
- public Iterable<LinkResourceAllocations> getAllocations() {
- TransactionContext tx = theInstance.newTransactionContext();
- tx.beginTransaction();
- try {
- STxMap<IntentId, LinkResourceAllocations> intentAllocs = getIntentAllocs(tx);
- return intentAllocs.values();
- } finally {
- tx.rollbackTransaction();
- }
- }
-}
diff --git a/core/store/dist/src/main/resources/hazelcast.xml b/core/store/dist/src/main/resources/hazelcast.xml
deleted file mode 100644
index 2354bb5..0000000
--- a/core/store/dist/src/main/resources/hazelcast.xml
+++ /dev/null
@@ -1,228 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- ~ Copyright 2015 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<!--
- The default Hazelcast configuration. This is used when:
-
- - no hazelcast.xml is present
-
--->
-<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
- xmlns="http://www.hazelcast.com/schema/config"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <group>
- <name>@NAME</name>
- <password>rocks</password>
- </group>
- <management-center enabled="false">http://localhost:8080/mancenter</management-center>
- <properties>
- <property name="hazelcast.max.no.heartbeat.seconds">30</property>
- <property name="hazelcast.merge.first.run.delay.seconds">30</property>
- <property name="hazelcast.merge.next.run.delay.seconds">30</property>
- </properties>
- <network>
- <port auto-increment="true" port-count="100">5701</port>
- <outbound-ports>
- <!--
- Allowed port range when connecting to other nodes.
- 0 or * means use system provided port.
- -->
- <ports>0</ports>
- </outbound-ports>
- <join>
- <multicast enabled="true">
- <multicast-group>224.2.2.3</multicast-group>
- <multicast-port>54327</multicast-port>
- </multicast>
- <tcp-ip enabled="false">
- <interface>127.0.0.1</interface>
- </tcp-ip>
- </join>
- <interfaces enabled="true">
- <interface>@PREFIX</interface>
- </interfaces>
- <ssl enabled="false"/>
- <socket-interceptor enabled="false"/>
- <symmetric-encryption enabled="false">
- <!--
- encryption algorithm such as
- DES/ECB/PKCS5Padding,
- PBEWithMD5AndDES,
- AES/CBC/PKCS5Padding,
- Blowfish,
- DESede
- -->
- <algorithm>PBEWithMD5AndDES</algorithm>
- <!-- salt value to use when generating the secret key -->
- <salt>thesalt</salt>
- <!-- pass phrase to use when generating the secret key -->
- <password>thepass</password>
- <!-- iteration count to use when generating the secret key -->
- <iteration-count>19</iteration-count>
- </symmetric-encryption>
- </network>
- <partition-group enabled="false"/>
- <executor-service name="default">
- <pool-size>16</pool-size>
- <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
- <queue-capacity>0</queue-capacity>
- </executor-service>
- <queue name="default">
- <!--
- Maximum size of the queue. When a JVM's local queue size reaches the maximum,
- all put/offer operations will get blocked until the queue size
- of the JVM goes down below the maximum.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size>0</max-size>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
-
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
-
- <empty-queue-ttl>-1</empty-queue-ttl>
- </queue>
- <map name="default">
- <!--
- Data type that will be used for storing recordMap.
- Possible values:
- BINARY (default): keys and values will be stored as binary data
- OBJECT : values will be stored in their object forms
- OFFHEAP : values will be stored in non-heap region of JVM
- -->
- <in-memory-format>BINARY</in-memory-format>
-
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <!--
- Maximum number of seconds for each entry to stay in the map. Entries that are
- older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
- will get automatically evicted from the map.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <time-to-live-seconds>0</time-to-live-seconds>
- <!--
- Maximum number of seconds for each entry to stay idle in the map. Entries that are
- idle(not touched) for more than <max-idle-seconds> will get
- automatically evicted from the map. Entry is touched if get, put or containsKey is called.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <max-idle-seconds>0</max-idle-seconds>
- <!--
- Valid values are:
- NONE (no eviction),
- LRU (Least Recently Used),
- LFU (Least Frequently Used).
- NONE is the default.
- -->
- <eviction-policy>NONE</eviction-policy>
- <!--
- Maximum size of the map. When max size is reached,
- map is evicted based on the policy defined.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size policy="PER_NODE">0</max-size>
- <!--
- When max. size is reached, specified percentage of
- the map will be evicted. Any integer between 0 and 100.
- If 25 is set for example, 25% of the entries will
- get evicted.
- -->
- <eviction-percentage>25</eviction-percentage>
- <!--
- Minimum time in milliseconds which should pass before checking
- if a partition of this map is evictable or not.
- Default value is 100 millis.
- -->
- <min-eviction-check-millis>100</min-eviction-check-millis>
- <!--
- While recovering from split-brain (network partitioning),
- map entries in the small cluster will merge into the bigger cluster
- based on the policy set here. When an entry merges into the
- cluster, there might be an existing entry with the same key already.
- Values of these entries might be different for that same key.
- Which value should be set for the key? Conflict is resolved by
- the policy set here. Default policy is PutIfAbsentMapMergePolicy
-
- There are built-in merge policies such as
- com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key.
- com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
- com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
- com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
- -->
- <merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
- </map>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <list name="default">
- <backup-count>1</backup-count>
- </list>
-
- <set name="default">
- <backup-count>1</backup-count>
- </set>
-
- <jobtracker name="default">
- <max-thread-size>0</max-thread-size>
- <!-- Queue size 0 means number of partitions * 2 -->
- <queue-size>0</queue-size>
- <retry-count>0</retry-count>
- <chunk-size>1000</chunk-size>
- <communicate-stats>true</communicate-stats>
- <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
- </jobtracker>
-
- <semaphore name="default">
- <initial-permits>0</initial-permits>
- <backup-count>1</backup-count>
- <async-backup-count>0</async-backup-count>
- </semaphore>
-
- <serialization>
- <portable-version>0</portable-version>
- </serialization>
-
- <services enable-defaults="true"/>
-
-</hazelcast>
diff --git a/core/store/dist/src/test/java/org/onosproject/store/hz/TestStoreManager.java b/core/store/dist/src/test/java/org/onosproject/store/hz/TestStoreManager.java
deleted file mode 100644
index 04dfb38..0000000
--- a/core/store/dist/src/test/java/org/onosproject/store/hz/TestStoreManager.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.hz;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.FileNotFoundException;
-import java.util.UUID;
-
-import com.hazelcast.config.Config;
-import com.hazelcast.config.FileSystemXmlConfig;
-import com.hazelcast.core.HazelcastInstance;
-import com.hazelcast.test.TestHazelcastInstanceFactory;
-
-/**
- * Dummy StoreManager to use specified Hazelcast instance.
- */
-public class TestStoreManager extends StoreManager {
-
- private TestHazelcastInstanceFactory factory;
-
- /**
- * Gets the Hazelcast Config for testing.
- *
- * @return Hazelcast Configuration for testing
- */
- public static Config getTestConfig() {
- Config config;
- try {
- config = new FileSystemXmlConfig(HAZELCAST_XML_FILE);
- } catch (FileNotFoundException e) {
- // falling back to default
- config = new Config();
- }
- // avoid accidentally joining other cluster
- config.getGroupConfig().setName(UUID.randomUUID().toString());
- // quickly form single node cluster
- config.getNetworkConfig().getJoin()
- .getTcpIpConfig()
- .setEnabled(true).setConnectionTimeoutSeconds(0);
- config.getNetworkConfig().getJoin()
- .getMulticastConfig()
- .setEnabled(false);
- return config;
- }
-
- /**
- * Creates an instance of dummy Hazelcast instance for testing.
- *
- * @return HazelcastInstance
- */
- public HazelcastInstance initSingleInstance() {
- return initInstances(1)[0];
- }
-
- /**
- * Creates some instances of dummy Hazelcast instances for testing.
- *
- * @param count number of instances to create
- * @return array of HazelcastInstances
- */
- public HazelcastInstance[] initInstances(int count) {
- checkArgument(count > 0, "Cluster size must be > 0");
- factory = new TestHazelcastInstanceFactory(count);
- return factory.newInstances(getTestConfig());
- }
-
- /**
- * Sets the Hazelcast instance to return on #getHazelcastInstance().
- *
- * @param instance Hazelcast instance to return on #getHazelcastInstance()
- */
- public void setHazelcastInstance(HazelcastInstance instance) {
- this.instance = instance;
- }
-
- @Override
- public void activate() {
- // Hazelcast setup removed from original code.
- checkState(this.instance != null, "HazelcastInstance needs to be set");
- }
-
- @Override
- public void deactivate() {
- // Hazelcast instance shutdown removed from original code.
- factory.shutdownAll();
- }
-}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
index 1745687..0b70401 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -15,49 +15,11 @@
*/
package org.onosproject.store.mastership.impl;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.junit.TestTools;
-import org.onlab.packet.IpAddress;
-import org.onosproject.cluster.ClusterServiceAdapter;
-import org.onosproject.cluster.ControllerNode;
-import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipEvent;
-import org.onosproject.mastership.MastershipEvent.Type;
-import org.onosproject.mastership.MastershipStoreDelegate;
-import org.onosproject.mastership.MastershipTerm;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.store.hz.StoreManager;
-import org.onosproject.store.hz.StoreService;
-import org.onosproject.store.hz.TestStoreManager;
-import org.onosproject.store.serializers.KryoSerializer;
-
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.Futures;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.onosproject.net.MastershipRole.MASTER;
-import static org.onosproject.net.MastershipRole.NONE;
-import static org.onosproject.net.MastershipRole.STANDBY;
-
/**
* Test of the Hazelcast-based distributed MastershipStore implementation.
*/
public class DistributedMastershipStoreTest {
-
+/*
private static final DeviceId DID1 = DeviceId.deviceId("of:01");
private static final DeviceId DID2 = DeviceId.deviceId("of:02");
private static final DeviceId DID3 = DeviceId.deviceId("of:03");
@@ -320,5 +282,5 @@
}
}
-
+*/
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStoreTest.java
index 304177d..a5f2fac 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/resource/impl/HazelcastLinkResourceStoreTest.java
@@ -15,50 +15,11 @@
*/
package org.onosproject.store.resource.impl;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.util.Bandwidth;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Annotations;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultLink;
-import org.onosproject.net.Link;
-import org.onosproject.net.intent.IntentId;
-import org.onosproject.net.provider.ProviderId;
-import org.onosproject.net.resource.BandwidthResource;
-import org.onosproject.net.resource.BandwidthResourceAllocation;
-import org.onosproject.net.resource.DefaultLinkResourceAllocations;
-import org.onosproject.net.resource.DefaultLinkResourceRequest;
-import org.onosproject.net.resource.LambdaResource;
-import org.onosproject.net.resource.LambdaResourceAllocation;
-import org.onosproject.net.resource.LinkResourceAllocations;
-import org.onosproject.net.resource.LinkResourceRequest;
-import org.onosproject.net.resource.LinkResourceStore;
-import org.onosproject.net.resource.ResourceAllocation;
-import org.onosproject.net.resource.ResourceAllocationException;
-import org.onosproject.net.resource.ResourceType;
-import org.onosproject.store.hz.StoreService;
-import org.onosproject.store.hz.TestStoreManager;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.onosproject.net.DeviceId.deviceId;
-import static org.onosproject.net.Link.Type.DIRECT;
-import static org.onosproject.net.PortNumber.portNumber;
-
/**
* Test of the simple LinkResourceStore implementation.
*/
public class HazelcastLinkResourceStoreTest {
-
+/*
private LinkResourceStore store;
private HazelcastLinkResourceStore storeImpl;
private Link link1;
@@ -74,7 +35,7 @@
* @param dev2 destination device
* @param port2 destination port
* @return created {@link Link} object
- */
+ * /
private Link newLink(String dev1, int port1, String dev2, int port2) {
Annotations annotations = DefaultAnnotations.builder()
.set(AnnotationKeys.OPTICAL_WAVES, "80")
@@ -112,9 +73,6 @@
storeMgr.deactivate();
}
- /**
- * Tests constructor and activate method.
- */
@Test
public void testConstructorAndActivate() {
final Iterable<LinkResourceAllocations> allAllocations = store.getAllocations();
@@ -130,13 +88,6 @@
assertNotNull(res);
}
- /**
- * Picks up and returns one of bandwidth allocations from a given set.
- *
- * @param resources the set of {@link ResourceAllocation}s
- * @return {@link BandwidthResourceAllocation} object if found, null
- * otherwise
- */
private BandwidthResourceAllocation getBandwidthObj(Set<ResourceAllocation> resources) {
for (ResourceAllocation res : resources) {
if (res.type() == ResourceType.BANDWIDTH) {
@@ -146,12 +97,6 @@
return null;
}
- /**
- * Returns all lambda allocations from a given set.
- *
- * @param resources the set of {@link ResourceAllocation}s
- * @return a set of {@link LambdaResourceAllocation} objects
- */
private Set<LambdaResourceAllocation> getLambdaObjs(Set<ResourceAllocation> resources) {
Set<LambdaResourceAllocation> lambdaResources = new HashSet<>();
for (ResourceAllocation res : resources) {
@@ -162,9 +107,6 @@
return lambdaResources;
}
- /**
- * Tests initial free bandwidth for a link.
- */
@Test
public void testInitialBandwidth() {
final Set<ResourceAllocation> freeRes = store.getFreeResources(link1);
@@ -176,9 +118,6 @@
assertEquals(new BandwidthResource(Bandwidth.mbps(1000.0)), alloc.bandwidth());
}
- /**
- * Tests initial free lambda for a link.
- */
@Test
public void testInitialLambdas() {
final Set<ResourceAllocation> freeRes = store.getFreeResources(link3);
@@ -198,9 +137,6 @@
}
- /**
- * Tests a successful bandwidth allocation.
- */
@Test
public void testSuccessfulBandwidthAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
@@ -219,9 +155,6 @@
store.allocateResources(allocations);
}
- /**
- * Tests a unsuccessful bandwidth allocation.
- */
@Test
public void testUnsuccessfulBandwidthAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
@@ -247,9 +180,6 @@
assertEquals(true, gotException);
}
- /**
- * Tests a successful bandwidth allocation.
- */
@Test
public void testSuccessfulLambdaAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
@@ -268,9 +198,6 @@
store.allocateResources(allocations);
}
- /**
- * Tests a unsuccessful bandwidth allocation.
- */
@Test
public void testUnsuccessfulLambdaAllocation() {
final Link link = newLink("of:1", 1, "of:2", 2);
@@ -296,4 +223,5 @@
}
assertEquals(true, gotException);
}
+ */
}
diff --git a/features/features.xml b/features/features.xml
index bb21cd1..a49fdd0 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -37,7 +37,6 @@
<bundle>mvn:joda-time/joda-time/2.5</bundle>
- <bundle>mvn:com.hazelcast/hazelcast/3.4</bundle>
<bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
<bundle>mvn:io.dropwizard.metrics/metrics-json/3.1.0</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
diff --git a/incubator/store/pom.xml b/incubator/store/pom.xml
index ff569f8..b7439ef 100644
--- a/incubator/store/pom.xml
+++ b/incubator/store/pom.xml
@@ -58,21 +58,15 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.easymock</groupId>
- <artifactId>easymock</artifactId>
- <scope>test</scope>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-api</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
</dependency>
</dependencies>
diff --git a/pom.xml b/pom.xml
index b015bdb..009a5ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -287,18 +287,6 @@
</dependency>
<dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- <version>3.4</version>
- </dependency>
- <dependency>
- <groupId>com.hazelcast</groupId>
- <artifactId>hazelcast</artifactId>
- <version>3.4</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.eclipsesource.minimal-json</groupId>
<artifactId>minimal-json</artifactId>
<version>0.9.1</version>
diff --git a/tools/dev/bin/onos-setup-karaf b/tools/dev/bin/onos-setup-karaf
index da63445..2b77e74 100755
--- a/tools/dev/bin/onos-setup-karaf
+++ b/tools/dev/bin/onos-setup-karaf
@@ -17,7 +17,7 @@
# Validates the specified IP regular expression against existing adapters.
# Excludes local-loopback.
function validateIp {
- ifconfig | awk '{ print $2}' | grep -E -o "([0-9]{1,3}[\.]){3}[0-9]{1,3}" | grep -v "127\.0\.0\.1" | grep $1
+ ifconfig | awk '{ print $2}' | grep -E -o "([0-9]{1,3}[\.]){3}[0-9]{1,3}" | grep $1
}
# Clean the previous Karaf directory if requested and if it exists.
@@ -26,6 +26,7 @@
[ -d $KARAF_ROOT ] && rm -fr $KARAF_ROOT $STAGE/apps $STAGE/config
fi
+ONOS_IP=${ONOS_IP:-127.0.0.1}
IP="${1:-$ONOS_IP}"
# If IP was not given, nor configured attempt to use ONOS_NIC env. variable
@@ -104,11 +105,6 @@
"partitions": { "p1": [ { "ip": "$IP", "id": "$IP", "tcpPort": 9876 }]}}
EOF
-echo "Setting up hazelcast.xml for subnet $SUBNET.*..."
-cp $ONOS_ROOT/tools/package/etc/hazelcast.xml $KARAF_ROOT/etc/hazelcast.xml
-perl -pi.old -e "s/192.168.56/$SUBNET/" $KARAF_ROOT/etc/hazelcast.xml
-perl -pi.old -e "s/ <name>onos</ <name>$IP</" $KARAF_ROOT/etc/hazelcast.xml
-
echo "Staging builtin apps..."
rm -fr $STAGE/apps
onos-stage-apps $STAGE/apps $KARAF_ROOT/system
diff --git a/tools/package/bin/onos-config b/tools/package/bin/onos-config
index 307a628..2265d00 100755
--- a/tools/package/bin/onos-config
+++ b/tools/package/bin/onos-config
@@ -1,14 +1,2 @@
#!/bin/bash
-# -----------------------------------------------------------------------------
-# Configures ONOS to multicast on the specified IP prefix/subnet.
-# -----------------------------------------------------------------------------
-
-[ $# -lt 2 ] && echo "usage: $(basename $0) name ipPrefix" && exit 1
-
-name=$1
-ipPrefix=$2
-
-hzXml=$(dirname $0)/../apache-karaf-*/etc/hazelcast.xml
-
-perl -pi.bak -e "s/^ <interface>[^<]*/ <interface>$ipPrefix/g" $hzXml
-perl -pi -e "s/ <name>[^<]*/ <name>$name/g" $hzXml
+echo "This command has been deprecated as this step is no longer required."
\ No newline at end of file
diff --git a/tools/package/etc/hazelcast.xml b/tools/package/etc/hazelcast.xml
deleted file mode 100644
index a137c4b..0000000
--- a/tools/package/etc/hazelcast.xml
+++ /dev/null
@@ -1,229 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<!--
- ~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
- ~ Copyright 2014 Open Networking Laboratory
- ~
- ~ Licensed under the Apache License, Version 2.0 (the "License");
- ~ you may not use this file except in compliance with the License.
- ~ You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<!--
- The default Hazelcast configuration. This is used when:
-
- - no hazelcast.xml if present
-
--->
-<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.3.xsd"
- xmlns="http://www.hazelcast.com/schema/config"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
- <group>
- <name>onos</name>
- <password>rocks</password>
- </group>
- <management-center enabled="false">http://localhost:8080/mancenter</management-center>
- <properties>
- <property name="hazelcast.max.no.heartbeat.seconds">30</property>
- <property name="hazelcast.merge.first.run.delay.seconds">30</property>
- <property name="hazelcast.merge.next.run.delay.seconds">30</property>
- </properties>
- <network>
- <port auto-increment="true" port-count="100">5701</port>
- <outbound-ports>
- <!--
- Allowed port range when connecting to other nodes.
- 0 or * means use system provided port.
- -->
- <ports>0</ports>
- </outbound-ports>
- <join>
- <multicast enabled="true">
- <multicast-group>224.2.2.3</multicast-group>
- <multicast-port>54327</multicast-port>
- </multicast>
- <tcp-ip enabled="false">
- <interface>127.0.0.1</interface>
- </tcp-ip>
- </join>
- <interfaces enabled="true">
- <interface>192.168.56.*</interface>
- </interfaces>
- <ssl enabled="false"/>
- <socket-interceptor enabled="false"/>
- <symmetric-encryption enabled="false">
- <!--
- encryption algorithm such as
- DES/ECB/PKCS5Padding,
- PBEWithMD5AndDES,
- AES/CBC/PKCS5Padding,
- Blowfish,
- DESede
- -->
- <algorithm>PBEWithMD5AndDES</algorithm>
- <!-- salt value to use when generating the secret key -->
- <salt>thesalt</salt>
- <!-- pass phrase to use when generating the secret key -->
- <password>thepass</password>
- <!-- iteration count to use when generating the secret key -->
- <iteration-count>19</iteration-count>
- </symmetric-encryption>
- </network>
- <partition-group enabled="false"/>
- <executor-service name="default">
- <pool-size>16</pool-size>
- <!--Queue capacity. 0 means Integer.MAX_VALUE.-->
- <queue-capacity>0</queue-capacity>
- </executor-service>
- <queue name="default">
- <!--
- Maximum size of the queue. When a JVM's local queue size reaches the maximum,
- all put/offer operations will get blocked until the queue size
- of the JVM goes down below the maximum.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size>0</max-size>
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
-
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
-
- <empty-queue-ttl>-1</empty-queue-ttl>
- </queue>
- <map name="default">
- <!--
- Data type that will be used for storing recordMap.
- Possible values:
- BINARY (default): keys and values will be stored as binary data
- OBJECT : values will be stored in their object forms
- OFFHEAP : values will be stored in non-heap region of JVM
- -->
- <in-memory-format>BINARY</in-memory-format>
-
- <!--
- Number of backups. If 1 is set as the backup-count for example,
- then all entries of the map will be copied to another JVM for
- fail-safety. 0 means no backup.
- -->
- <backup-count>1</backup-count>
- <!--
- Number of async backups. 0 means no backup.
- -->
- <async-backup-count>0</async-backup-count>
- <!--
- Maximum number of seconds for each entry to stay in the map. Entries that are
- older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
- will get automatically evicted from the map.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <time-to-live-seconds>0</time-to-live-seconds>
- <!--
- Maximum number of seconds for each entry to stay idle in the map. Entries that are
- idle(not touched) for more than <max-idle-seconds> will get
- automatically evicted from the map. Entry is touched if get, put or containsKey is called.
- Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
- -->
- <max-idle-seconds>0</max-idle-seconds>
- <!--
- Valid values are:
- NONE (no eviction),
- LRU (Least Recently Used),
- LFU (Least Frequently Used).
- NONE is the default.
- -->
- <eviction-policy>NONE</eviction-policy>
- <!--
- Maximum size of the map. When max size is reached,
- map is evicted based on the policy defined.
- Any integer between 0 and Integer.MAX_VALUE. 0 means
- Integer.MAX_VALUE. Default is 0.
- -->
- <max-size policy="PER_NODE">0</max-size>
- <!--
- When max. size is reached, specified percentage of
- the map will be evicted. Any integer between 0 and 100.
- If 25 is set for example, 25% of the entries will
- get evicted.
- -->
- <eviction-percentage>25</eviction-percentage>
- <!--
- Minimum time in milliseconds which should pass before checking
- if a partition of this map is evictable or not.
- Default value is 100 millis.
- -->
- <min-eviction-check-millis>100</min-eviction-check-millis>
- <!--
- While recovering from split-brain (network partitioning),
- map entries in the small cluster will merge into the bigger cluster
- based on the policy set here. When an entry merge into the
- cluster, there might an existing entry with the same key already.
- Values of these entries might be different for that same key.
- Which value should be set for the key? Conflict is resolved by
- the policy set here. Default policy is PutIfAbsentMapMergePolicy
-
- There are built-in merge policies such as
- com.hazelcast.map.merge.PassThroughMergePolicy; entry will be added if there is no existing entry for the key.
- com.hazelcast.map.merge.PutIfAbsentMapMergePolicy ; entry will be added if the merging entry doesn't exist in the cluster.
- com.hazelcast.map.merge.HigherHitsMapMergePolicy ; entry with the higher hits wins.
- com.hazelcast.map.merge.LatestUpdateMapMergePolicy ; entry with the latest update wins.
- -->
- <merge-policy>com.hazelcast.map.merge.PutIfAbsentMapMergePolicy</merge-policy>
- </map>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <multimap name="default">
- <backup-count>1</backup-count>
- <value-collection-type>SET</value-collection-type>
- </multimap>
-
- <list name="default">
- <backup-count>1</backup-count>
- </list>
-
- <set name="default">
- <backup-count>1</backup-count>
- </set>
-
- <jobtracker name="default">
- <max-thread-size>0</max-thread-size>
- <!-- Queue size 0 means number of partitions * 2 -->
- <queue-size>0</queue-size>
- <retry-count>0</retry-count>
- <chunk-size>1000</chunk-size>
- <communicate-stats>true</communicate-stats>
- <topology-changed-strategy>CANCEL_RUNNING_OPERATION</topology-changed-strategy>
- </jobtracker>
-
- <semaphore name="default">
- <initial-permits>0</initial-permits>
- <backup-count>1</backup-count>
- <async-backup-count>0</async-backup-count>
- </semaphore>
-
- <serialization>
- <portable-version>0</portable-version>
- </serialization>
-
- <services enable-defaults="true"/>
-
-</hazelcast>
diff --git a/tools/test/bin/onos-config b/tools/test/bin/onos-config
index b494852..1173171 100755
--- a/tools/test/bin/onos-config
+++ b/tools/test/bin/onos-config
@@ -26,11 +26,6 @@
scp -q $CDEF_FILE $remote:$ONOS_INSTALL_DIR/config/cluster.json
ssh $remote "
- sudo perl -pi.bak -e \"s/ <interface>.*</ <interface>${ONOS_NIC:-192.168.56.*}</g\" \
- $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
- sudo perl -pi -e \"s/ <name>onos</ <name>${ONOS_CELL:-onos}</g\" \
- $ONOS_INSTALL_DIR/$KARAF_DIST/etc/hazelcast.xml
-
echo \"onos.ip = \$(sudo ifconfig | grep $ONOS_NIC | cut -d: -f2 | cut -d\\ -f1)\" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties
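The ifconfig pipeline above extracts the address of the interface matching $ONOS_NIC and records it in system.properties. Assuming ONOS_NIC=192.168.56.* and an interface line of the form "inet addr:192.168.56.11  Bcast:...", the appended property would be (values are illustrative):

    # grep keeps the matching "inet addr:" line; cut -d: -f2 yields "192.168.56.11  Bcast",
    # and the final cut on spaces trims it to the bare address.
    echo "onos.ip = 192.168.56.11" >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/system.properties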
@@ -38,10 +33,6 @@
echo "log4j.logger.net.kuujo.copycat= INFO" \
>> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
- # Suppress Hazelcast multicast joiner warning
- echo "log4j.logger.com.hazelcast.cluster.impl.MulticastService= ERROR" \
- >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
-
# Patch the Apache Karaf distribution file to load ONOS boot features
perl -pi.old -e \"s|^(featuresBoot=.*,management)(,webconsole,.*)|\1,$ONOS_BOOT_FEATURES|\" \
$ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.apache.karaf.features.cfg
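The perl substitution keeps everything up to and including ",management", drops the ",webconsole,..." tail, and appends the ONOS boot features. A self-contained sketch, with an assumed stock featuresBoot line and an assumed ONOS_BOOT_FEATURES value:

    #!/usr/bin/env bash
    # Both values below are illustrative, not taken from the patch.
    ONOS_BOOT_FEATURES="onos-api,onos-core,onos-cli"
    line='featuresBoot=config,standard,region,package,kar,ssh,management,webconsole,eventadmin'
    perl -pe "s|^(featuresBoot=.*,management)(,webconsole,.*)|\1,$ONOS_BOOT_FEATURES|" <<< "$line"
    # -> featuresBoot=config,standard,region,package,kar,ssh,management,onos-api,onos-core,onos-cli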