Merge "Hooking up GUI server & client via web-socket."
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
index 4abefa7..c54e3f4 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIp.java
@@ -30,6 +30,7 @@
import org.onlab.onos.net.host.HostService;
import org.onlab.onos.net.intent.IntentService;
import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
+import org.onlab.onos.sdnip.bgp.BgpSession;
import org.onlab.onos.sdnip.bgp.BgpSessionManager;
import org.onlab.onos.sdnip.config.SdnIpConfigReader;
import org.slf4j.Logger;
@@ -97,6 +98,11 @@
}
@Override
+ public Collection<BgpSession> getBgpSessions() {
+ // Pure delegation: the collection is whatever bgpSessionManager.getBgpSessions() returns.
+ return bgpSessionManager.getBgpSessions();
+ }
+
+ @Override
public Collection<BgpRouteEntry> getBgpRoutes() {
+ // Pure delegation: the collection is whatever bgpSessionManager.getBgpRoutes() returns.
return bgpSessionManager.getBgpRoutes();
}
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpService.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpService.java
index 936b7e8..8503bee 100644
--- a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpService.java
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/SdnIpService.java
@@ -18,12 +18,20 @@
import java.util.Collection;
import org.onlab.onos.sdnip.bgp.BgpRouteEntry;
+import org.onlab.onos.sdnip.bgp.BgpSession;
/**
* Service interface exported by SDN-IP.
*/
public interface SdnIpService {
/**
+ * Gets the BGP sessions.
+ *
+ * @return the BGP sessions
+ */
+ public Collection<BgpSession> getBgpSessions();
+
+ /**
* Gets the BGP routes.
*
* @return the BGP routes
diff --git a/apps/sdnip/src/main/java/org/onlab/onos/sdnip/cli/BgpNeighborsListCommand.java b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/cli/BgpNeighborsListCommand.java
new file mode 100644
index 0000000..d8fe15a
--- /dev/null
+++ b/apps/sdnip/src/main/java/org/onlab/onos/sdnip/cli/BgpNeighborsListCommand.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.sdnip.cli;
+
+import java.util.Collection;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.karaf.shell.commands.Command;
+import org.apache.karaf.shell.commands.Option;
+import org.onlab.onos.cli.AbstractShellCommand;
+import org.onlab.onos.sdnip.SdnIpService;
+import org.onlab.onos.sdnip.bgp.BgpSession;
+
+/**
+ * Command to show the BGP neighbors.
+ * <p>
+ * By default every BGP session returned by
+ * {@link SdnIpService#getBgpSessions()} is listed. When the
+ * {@code -n}/{@code --neighbor} option is given, only the first session
+ * whose remote BGP ID (in its string form) equals the option value is shown.
+ * Both modes emit JSON instead of text when {@code outputJson()} is true.
+ * </p>
+ */
+@Command(scope = "onos", name = "bgp-neighbors",
+         description = "Lists the BGP neighbors")
+public class BgpNeighborsListCommand extends AbstractShellCommand {
+    /** Optional filter: string form of the remote BGP ID of the neighbor to show. */
+    @Option(name = "-n", aliases = "--neighbor",
+            description = "BGP neighbor to display information about",
+            required = false, multiValued = false)
+    private String bgpNeighbor;
+
+    // Text layout of one neighbor: a headline followed by remote and local details
+    private static final String FORMAT_NEIGHBOR_LINE1 =
+        "BGP neighbor is %s, remote AS %d, local AS %d";
+    private static final String FORMAT_NEIGHBOR_LINE2 =
+        " Remote router ID %s, IP %s, BGP version %d, Hold time %d";
+    private static final String FORMAT_NEIGHBOR_LINE3 =
+        " Local router ID %s, IP %s, BGP version %d, Hold time %d";
+
+    /**
+     * Prints either all BGP neighbors or the single neighbor selected with
+     * the {@code -n}/{@code --neighbor} option.
+     */
+    @Override
+    protected void execute() {
+        SdnIpService service = get(SdnIpService.class);
+        Collection<BgpSession> bgpSessions = service.getBgpSessions();
+
+        if (bgpNeighbor == null) {
+            // No filter given: print all neighbors
+            printNeighbors(bgpSessions);
+            return;
+        }
+
+        // Print a single neighbor (if found)
+        BgpSession foundBgpSession = findNeighbor(bgpSessions, bgpNeighbor);
+        if (foundBgpSession == null) {
+            print("BGP neighbor %s not found", bgpNeighbor);
+        } else if (outputJson()) {
+            // Honor the JSON output mode here as well, consistent with
+            // printNeighbors(); a single neighbor is emitted as one JSON object.
+            print("%s", json(new ObjectMapper(), foundBgpSession));
+        } else {
+            printNeighbor(foundBgpSession);
+        }
+    }
+
+    /**
+     * Finds the BGP session whose remote BGP ID matches the given string.
+     *
+     * @param bgpSessions the BGP sessions to search
+     * @param remoteBgpId the value to compare against the string form of
+     *                    each session's remote BGP ID
+     * @return the first matching BGP session, or null if none matches
+     */
+    private BgpSession findNeighbor(Collection<BgpSession> bgpSessions,
+                                    String remoteBgpId) {
+        for (BgpSession bgpSession : bgpSessions) {
+            if (bgpSession.getRemoteBgpId().toString().equals(remoteBgpId)) {
+                return bgpSession;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Prints all BGP neighbors, as a JSON array when JSON output is
+     * requested and as text otherwise.
+     *
+     * @param bgpSessions the BGP sessions for the neighbors to print
+     */
+    private void printNeighbors(Collection<BgpSession> bgpSessions) {
+        if (outputJson()) {
+            print("%s", json(bgpSessions));
+        } else {
+            for (BgpSession bgpSession : bgpSessions) {
+                printNeighbor(bgpSession);
+            }
+        }
+    }
+
+    /**
+     * Prints a BGP neighbor as three lines of text.
+     *
+     * @param bgpSession the BGP session for the neighbor to print
+     */
+    private void printNeighbor(BgpSession bgpSession) {
+        print(FORMAT_NEIGHBOR_LINE1,
+              bgpSession.getRemoteBgpId().toString(),
+              bgpSession.getRemoteAs(),
+              bgpSession.getLocalAs());
+        print(FORMAT_NEIGHBOR_LINE2,
+              bgpSession.getRemoteBgpId().toString(),
+              bgpSession.getRemoteAddress().toString(),
+              bgpSession.getRemoteBgpVersion(),
+              bgpSession.getRemoteHoldtime());
+        print(FORMAT_NEIGHBOR_LINE3,
+              bgpSession.getLocalBgpId().toString(),
+              bgpSession.getLocalAddress().toString(),
+              bgpSession.getLocalBgpVersion(),
+              bgpSession.getLocalHoldtime());
+    }
+
+    /**
+     * Produces a JSON array of BGP neighbors.
+     *
+     * @param bgpSessions the BGP sessions with the data
+     * @return JSON array with the neighbors
+     */
+    private JsonNode json(Collection<BgpSession> bgpSessions) {
+        ObjectMapper mapper = new ObjectMapper();
+        ArrayNode result = mapper.createArrayNode();
+
+        for (BgpSession bgpSession : bgpSessions) {
+            result.add(json(mapper, bgpSession));
+        }
+        return result;
+    }
+
+    /**
+     * Produces JSON object for a BGP neighbor.
+     *
+     * @param mapper the JSON object mapper to use
+     * @param bgpSession the BGP session with the data
+     * @return JSON object for the neighbor
+     */
+    private ObjectNode json(ObjectMapper mapper, BgpSession bgpSession) {
+        ObjectNode result = mapper.createObjectNode();
+
+        // Remote side of the session
+        result.put("remoteAddress", bgpSession.getRemoteAddress().toString());
+        result.put("remoteBgpVersion", bgpSession.getRemoteBgpVersion());
+        result.put("remoteAs", bgpSession.getRemoteAs());
+        result.put("remoteHoldtime", bgpSession.getRemoteHoldtime());
+        result.put("remoteBgpId", bgpSession.getRemoteBgpId().toString());
+        // Local side of the session
+        result.put("localAddress", bgpSession.getLocalAddress().toString());
+        result.put("localBgpVersion", bgpSession.getLocalBgpVersion());
+        result.put("localAs", bgpSession.getLocalAs());
+        result.put("localHoldtime", bgpSession.getLocalHoldtime());
+        result.put("localBgpId", bgpSession.getLocalBgpId().toString());
+
+        return result;
+    }
+}
diff --git a/apps/sdnip/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/sdnip/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 773ead6..d86f8f4 100644
--- a/apps/sdnip/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/apps/sdnip/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -17,6 +17,9 @@
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
+ <action class="org.onlab.onos.sdnip.cli.BgpNeighborsListCommand"/>
+ </command>
+ <command>
<action class="org.onlab.onos.sdnip.cli.BgpRoutesListCommand"/>
</command>
<command>
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index 33830aa..ac671da 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -61,8 +61,8 @@
// TODO: make these configurable
private static final int MAX_EVENTS = 100;
- private static final int MAX_IDLE_MS = 50;
- private static final int MAX_BATCH_MS = 200;
+ private static final int MAX_IDLE_MS = 5;
+ private static final int MAX_BATCH_MS = 50;
private static final int MAX_THREADS = 8;
// FIXME: Replace with a system-wide timer instance;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1e47a00..6de94b4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -34,10 +34,10 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.serializers.ClusterMessageSerializer;
import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.MessageSubjectSerializer;
+import org.onlab.onos.store.serializers.impl.ClusterMessageSerializer;
+import org.onlab.onos.store.serializers.impl.MessageSubjectSerializer;
import org.onlab.util.KryoNamespace;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 19dc3e3..4665411 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -59,7 +59,7 @@
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.serializers.KryoSerializer;
-import org.onlab.onos.store.serializers.DistributedStoreSerializers;
+import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
import org.onlab.packet.ChassisId;
import org.onlab.util.KryoNamespace;
import org.onlab.util.NewConcurrentHashMap;
@@ -230,9 +230,10 @@
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<DeviceDescription> mergedDesc;
- synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+ synchronized (device) {
event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
- mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId).getDeviceDesc();
+ mergedDesc = device.get(providerId).getDeviceDesc();
}
if (event != null) {
log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
@@ -252,10 +253,10 @@
Timestamped<DeviceDescription> deltaDesc) {
// Collection of DeviceDescriptions for a Device
- Map<ProviderId, DeviceDescriptions> providerDescs
+ Map<ProviderId, DeviceDescriptions> device
= getOrCreateDeviceDescriptionsMap(deviceId);
- synchronized (providerDescs) {
+ synchronized (device) {
// locking per device
if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
@@ -263,7 +264,7 @@
return null;
}
- DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(providerDescs, providerId, deltaDesc);
+ DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
final Device oldDevice = devices.get(deviceId);
final Device newDevice;
@@ -272,7 +273,7 @@
deltaDesc.isNewer(descs.getDeviceDesc())) {
// on new device or valid update
descs.putDeviceDesc(deltaDesc);
- newDevice = composeDevice(deviceId, providerDescs);
+ newDevice = composeDevice(deviceId, device);
} else {
// outdated event, ignored.
return null;
@@ -444,9 +445,10 @@
final List<DeviceEvent> events;
final Timestamped<List<PortDescription>> merged;
- synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+ synchronized (device) {
events = updatePortsInternal(providerId, deviceId, timestampedInput);
- final DeviceDescriptions descs = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId);
+ final DeviceDescriptions descs = device.get(providerId);
List<PortDescription> mergedList =
FluentIterable.from(portDescriptions)
.transform(new Function<PortDescription, PortDescription>() {
@@ -632,9 +634,10 @@
= new Timestamped<>(portDescription, newTimestamp);
final DeviceEvent event;
final Timestamped<PortDescription> mergedDesc;
- synchronized (getOrCreateDeviceDescriptionsMap(deviceId)) {
+ final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
+ synchronized (device) {
event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
- mergedDesc = getOrCreateDeviceDescriptionsMap(deviceId).get(providerId)
+ mergedDesc = device.get(providerId)
.getPortDesc(portDescription.portNumber());
}
if (event != null) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 81f2bfd..f65da3f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -75,9 +75,9 @@
import org.onlab.onos.store.hz.AbstractHazelcastStore;
import org.onlab.onos.store.hz.SMap;
import org.onlab.onos.store.serializers.DecodeTo;
-import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.serializers.StoreSerializer;
+import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
index 30a73e0..5353a91 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/host/impl/GossipHostStore.java
@@ -67,8 +67,8 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.impl.Timestamped;
-import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
index 351581d..e82742a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/GossipLinkStore.java
@@ -55,8 +55,8 @@
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.impl.Timestamped;
-import org.onlab.onos.store.serializers.DistributedStoreSerializers;
import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.impl.DistributedStoreSerializers;
import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
index ecccd2b..d05620d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/resource/impl/DistributedLinkResourceStore.java
@@ -73,7 +73,7 @@
* @param link the target link
* @return free resources
*/
- private Set<ResourceAllocation> readOriginalFreeResources(Link link) {
+ private synchronized Set<ResourceAllocation> readOriginalFreeResources(Link link) {
// TODO read capacity and lambda resources from topology
Set<ResourceAllocation> allocations = new HashSet<>();
for (int i = 1; i <= 100; i++) {
@@ -92,7 +92,7 @@
* {@link org.onlab.onos.net.resource.BandwidthResourceAllocation} object with 0 bandwidth
*
*/
- private BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) {
+ private synchronized BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) {
for (ResourceAllocation res : freeRes) {
if (res.type() == ResourceType.BANDWIDTH) {
return (BandwidthResourceAllocation) res;
@@ -107,7 +107,7 @@
* @param link the target link
* @param allocations the resources to be subtracted
*/
- private void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
+ private synchronized void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
// TODO Use lock or version for updating freeResources.
checkNotNull(link);
Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
@@ -141,7 +141,7 @@
* @param link the target link
* @param allocations the resources to be added
*/
- private void addFreeResources(Link link, LinkResourceAllocations allocations) {
+ private synchronized void addFreeResources(Link link, LinkResourceAllocations allocations) {
// TODO Use lock or version for updating freeResources.
Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
Set<ResourceAllocation> addRes = allocations.getResourceAllocation(link);
@@ -167,7 +167,7 @@
}
@Override
- public Set<ResourceAllocation> getFreeResources(Link link) {
+ public synchronized Set<ResourceAllocation> getFreeResources(Link link) {
checkNotNull(link);
Set<ResourceAllocation> freeRes = freeResources.get(link);
if (freeRes == null) {
@@ -178,7 +178,7 @@
}
@Override
- public void allocateResources(LinkResourceAllocations allocations) {
+ public synchronized void allocateResources(LinkResourceAllocations allocations) {
checkNotNull(allocations);
linkResourceAllocationsMap.put(allocations.intendId(), allocations);
for (Link link : allocations.links()) {
@@ -193,7 +193,7 @@
}
@Override
- public void releaseResources(LinkResourceAllocations allocations) {
+ public synchronized void releaseResources(LinkResourceAllocations allocations) {
checkNotNull(allocations);
linkResourceAllocationsMap.remove(allocations.intendId());
for (Link link : allocations.links()) {
@@ -209,13 +209,13 @@
}
@Override
- public LinkResourceAllocations getAllocations(IntentId intentId) {
+ public synchronized LinkResourceAllocations getAllocations(IntentId intentId) {
+ // Returns the result of linkResourceAllocationsMap.get(intentId) unchanged.
+ // NOTE(review): checkNotNull presumably rejects a null intentId - confirm which helper is imported.
checkNotNull(intentId);
return linkResourceAllocationsMap.get(intentId);
}
@Override
- public Iterable<LinkResourceAllocations> getAllocations(Link link) {
+ public synchronized Iterable<LinkResourceAllocations> getAllocations(Link link) {
checkNotNull(link);
Set<LinkResourceAllocations> result = allocatedResources.get(link);
if (result == null) {
@@ -225,7 +225,7 @@
}
@Override
- public Iterable<LinkResourceAllocations> getAllocations() {
+ public synchronized Iterable<LinkResourceAllocations> getAllocations() {
+ // Wraps linkResourceAllocationsMap.values() in an unmodifiable collection; no copy is made.
+ // NOTE(review): the wrapper is a view, and callers iterate it after this method's lock is
+ // released - confirm the map type tolerates iteration concurrent with updates.
return Collections.unmodifiableCollection(linkResourceAllocationsMap.values());
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/ClusterMessageSerializer.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/ClusterMessageSerializer.java
index 9cc7f8a..d3f31bb 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/ClusterMessageSerializer.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.onos.store.serializers;
+package org.onlab.onos.store.serializers.impl;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
similarity index 92%
rename from core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
index d2de5ad..2de47bd 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/DistributedStoreSerializers.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/DistributedStoreSerializers.java
@@ -13,11 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.onos.store.serializers;
+package org.onlab.onos.store.serializers.impl;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.impl.Timestamped;
import org.onlab.onos.store.impl.WallClockTimestamp;
+import org.onlab.onos.store.serializers.KryoNamespaces;
import org.onlab.util.KryoNamespace;
public final class DistributedStoreSerializers {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MastershipBasedTimestampSerializer.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MastershipBasedTimestampSerializer.java
index 4221d57..83bff3c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MastershipBasedTimestampSerializer.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.onos.store.serializers;
+package org.onlab.onos.store.serializers.impl;
import org.onlab.onos.store.impl.MastershipBasedTimestamp;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MessageSubjectSerializer.java
similarity index 96%
rename from core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MessageSubjectSerializer.java
index e94d302..1173165 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/impl/MessageSubjectSerializer.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onlab.onos.store.serializers;
+package org.onlab.onos.store.serializers.impl;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
index 976fbcf..bb6bfcf 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolClient.java
@@ -85,7 +85,7 @@
return CompletableFuture.completedFuture(null);
}
- public <I> MessageSubject messageType(I input) {
+ private <I> MessageSubject messageType(I input) {
Class<?> clazz = input.getClass();
if (clazz.equals(PollRequest.class)) {
return ClusterMessagingProtocol.COPYCAT_POLL;
@@ -117,7 +117,7 @@
this.request = request;
this.message =
new ClusterMessage(
- null,
+ null, // FIXME fill in proper sender
messageType(request),
ClusterMessagingProtocol.SERIALIZER.encode(request));
this.future = future;
@@ -132,19 +132,20 @@
future.complete(ClusterMessagingProtocol.SERIALIZER.decode(response));
} catch (IOException | InterruptedException | ExecutionException | TimeoutException e) {
- if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
- message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
- log.warn("{} Request to {} failed. Will retry in {} ms",
- message.subject(), remoteNode, RETRY_INTERVAL_MILLIS);
- THREAD_POOL.schedule(
- this,
- RETRY_INTERVAL_MILLIS,
- TimeUnit.MILLISECONDS);
- } else {
+// if (message.subject().equals(ClusterMessagingProtocol.COPYCAT_SYNC) ||
+// message.subject().equals(ClusterMessagingProtocol.COPYCAT_PING)) {
+// log.warn("{} Request to {} failed. Will retry in {} ms",
+// message.subject(), remoteNode, RETRY_INTERVAL_MILLIS);
+// THREAD_POOL.schedule(
+// this,
+// RETRY_INTERVAL_MILLIS,
+// TimeUnit.MILLISECONDS);
+// } else {
log.warn("RPCTask for {} failed.", request, e);
future.completeExceptionally(e);
- }
+// }
} catch (Exception e) {
+ log.warn("RPCTask for {} terribly failed.", request, e);
future.completeExceptionally(e);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
index e2c06c7..a5f207e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocolServer.java
@@ -67,14 +67,36 @@
@Override
public void handle(ClusterMessage message) {
T request = ClusterMessagingProtocol.SERIALIZER.decode(message.payload());
+ if (handler == null) {
+ // there is a slight window of time during state transition,
+ // where handler becomes null
+ for (int i = 0; i < 10; ++i) {
+ if (handler != null) {
+ break;
+ }
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ log.trace("Exception", e);
+ }
+ }
+ if (handler == null) {
+ log.error("There was no handler for registered!");
+ return;
+ }
+ }
if (request.getClass().equals(PingRequest.class)) {
- handler.ping((PingRequest) request).whenComplete(new PostExecutionTask<PingResponse>(message));
+ handler.ping((PingRequest) request)
+ .whenComplete(new PostExecutionTask<PingResponse>(message));
} else if (request.getClass().equals(PollRequest.class)) {
- handler.poll((PollRequest) request).whenComplete(new PostExecutionTask<PollResponse>(message));
+ handler.poll((PollRequest) request)
+ .whenComplete(new PostExecutionTask<PollResponse>(message));
} else if (request.getClass().equals(SyncRequest.class)) {
- handler.sync((SyncRequest) request).whenComplete(new PostExecutionTask<SyncResponse>(message));
+ handler.sync((SyncRequest) request)
+ .whenComplete(new PostExecutionTask<SyncResponse>(message));
} else if (request.getClass().equals(SubmitRequest.class)) {
- handler.submit((SubmitRequest) request).whenComplete(new PostExecutionTask<SubmitResponse>(message));
+ handler.submit((SubmitRequest) request)
+ .whenComplete(new PostExecutionTask<SubmitResponse>(message));
} else {
throw new IllegalStateException("Unknown request type: " + request.getClass().getName());
}
@@ -94,6 +116,7 @@
log.error("Processing for " + message.subject() + " failed.", t);
} else {
try {
+ log.trace("responding to {}", message.subject());
message.respond(ClusterMessagingProtocol.SERIALIZER.encode(response));
} catch (Exception e) {
log.error("Failed to respond to " + response.getClass().getName(), e);
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
index 0060d44..e1da794 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/impl/MastershipBasedTimestampTest.java
@@ -21,7 +21,7 @@
import org.junit.Test;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.onos.store.serializers.impl.MastershipBasedTimestampSerializer;
import org.onlab.util.KryoNamespace;
import com.google.common.testing.EqualsTester;
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkResourceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkResourceStore.java
index 5369375..78cd341 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkResourceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkResourceStore.java
@@ -73,7 +73,7 @@
* @param link the target link
* @return free resources
*/
- private Set<ResourceAllocation> readOriginalFreeResources(Link link) {
+ private synchronized Set<ResourceAllocation> readOriginalFreeResources(Link link) {
// TODO read capacity and lambda resources from topology
Set<ResourceAllocation> allocations = new HashSet<>();
for (int i = 1; i <= 100; i++) {
@@ -92,7 +92,7 @@
* {@link BandwidthResourceAllocation} object with 0 bandwidth
*
*/
- private BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) {
+ private synchronized BandwidthResourceAllocation getBandwidth(Set<ResourceAllocation> freeRes) {
for (ResourceAllocation res : freeRes) {
if (res.type() == ResourceType.BANDWIDTH) {
return (BandwidthResourceAllocation) res;
@@ -107,7 +107,7 @@
* @param link the target link
* @param allocations the resources to be subtracted
*/
- private void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
+ private synchronized void subtractFreeResources(Link link, LinkResourceAllocations allocations) {
// TODO Use lock or version for updating freeResources.
checkNotNull(link);
Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
@@ -141,7 +141,7 @@
* @param link the target link
* @param allocations the resources to be added
*/
- private void addFreeResources(Link link, LinkResourceAllocations allocations) {
+ private synchronized void addFreeResources(Link link, LinkResourceAllocations allocations) {
// TODO Use lock or version for updating freeResources.
Set<ResourceAllocation> freeRes = new HashSet<>(getFreeResources(link));
Set<ResourceAllocation> addRes = allocations.getResourceAllocation(link);
@@ -167,7 +167,7 @@
}
@Override
- public Set<ResourceAllocation> getFreeResources(Link link) {
+ public synchronized Set<ResourceAllocation> getFreeResources(Link link) {
checkNotNull(link);
Set<ResourceAllocation> freeRes = freeResources.get(link);
if (freeRes == null) {
@@ -178,7 +178,7 @@
}
@Override
- public void allocateResources(LinkResourceAllocations allocations) {
+ public synchronized void allocateResources(LinkResourceAllocations allocations) {
checkNotNull(allocations);
linkResourceAllocationsMap.put(allocations.intendId(), allocations);
for (Link link : allocations.links()) {
@@ -193,7 +193,7 @@
}
@Override
- public void releaseResources(LinkResourceAllocations allocations) {
+ public synchronized void releaseResources(LinkResourceAllocations allocations) {
checkNotNull(allocations);
linkResourceAllocationsMap.remove(allocations.intendId());
for (Link link : allocations.links()) {
@@ -209,13 +209,13 @@
}
@Override
- public LinkResourceAllocations getAllocations(IntentId intentId) {
+ public synchronized LinkResourceAllocations getAllocations(IntentId intentId) {
+ // Returns the result of linkResourceAllocationsMap.get(intentId) unchanged.
+ // NOTE(review): checkNotNull presumably rejects a null intentId - confirm which helper is imported.
checkNotNull(intentId);
return linkResourceAllocationsMap.get(intentId);
}
@Override
- public Iterable<LinkResourceAllocations> getAllocations(Link link) {
+ public synchronized Iterable<LinkResourceAllocations> getAllocations(Link link) {
checkNotNull(link);
Set<LinkResourceAllocations> result = allocatedResources.get(link);
if (result == null) {
@@ -225,7 +225,7 @@
}
@Override
- public Iterable<LinkResourceAllocations> getAllocations() {
+ public synchronized Iterable<LinkResourceAllocations> getAllocations() {
+ // Wraps linkResourceAllocationsMap.values() in an unmodifiable collection; no copy is made.
+ // NOTE(review): the wrapper is a view, and callers iterate it after this method's lock is
+ // released - confirm the map type tolerates iteration concurrent with updates.
return Collections.unmodifiableCollection(linkResourceAllocationsMap.values());
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 54b02de..5b7266e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -315,7 +315,11 @@
return;
}
MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
- handler.handle(message);
+ if (handler != null) {
+ handler.handle(message);
+ } else {
+ log.debug("No handler registered for {}", type);
+ }
}
@Override