Refactor: move telemetry config from componentCfg to configs.xml
1. Support exporting metrics to multiple targets
2. Add a set of properties to kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import xml definitions
6. Add unit tests for telemetry cfg, xml cfg loader and dist store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
new file mode 100644
index 0000000..486cea5
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract bootstrapper for loading and registering telemetry configurations
+ * that are independent from the default telemetry configurations.
+ *
+ * <p>On activation the XML resource at the configured path is parsed into a
+ * {@link TelemetryConfigProvider} and registered with the admin service; on
+ * deactivation the provider is unregistered again, if one was registered.
+ */
+public abstract class AbstractTelemetryConfigLoader {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Only set once the provider has been successfully registered.
+    private TelemetryConfigProvider provider;
+    private final String path;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected TelemetryConfigAdminService configAdminService;
+
+    /**
+     * Creates a new loader for resource with the specified path.
+     *
+     * @param path configurations definition XML resource path
+     */
+    protected AbstractTelemetryConfigLoader(String path) {
+        this.path = path;
+    }
+
+    /**
+     * Loads the XML definitions and registers the resulting provider.
+     * Failures are logged and leave the loader without a provider.
+     */
+    @Activate
+    protected void activate() {
+        // try-with-resources closes the resource stream on every path; a null
+        // stream (missing resource) is skipped by the construct and is left for
+        // the XML loader to reject.
+        try (java.io.InputStream stream = getClass().getResourceAsStream(path)) {
+            TelemetryConfigProvider loaded =
+                    new XmlTelemetryConfigLoader().loadTelemetryConfigs(stream);
+            configAdminService.registerProvider(loaded);
+            // Remember the provider only after registration succeeded so that
+            // deactivate() never unregisters something that was not registered.
+            provider = loaded;
+        } catch (Exception e) {
+            log.error("Unable to load {} telemetry configuration definitions", path, e);
+        }
+        log.info("Started");
+    }
+
+    /**
+     * Unregisters the provider registered during activation, if any.
+     */
+    @Deactivate
+    protected void deactivate() {
+        if (provider != null) {
+            configAdminService.unregisterProvider(provider);
+            provider = null;
+        }
+        log.info("Stopped");
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
index 2b2d7b6..9ee84f8 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -24,6 +24,9 @@
import static com.google.common.base.Preconditions.checkArgument;
+/**
+ * Implementation of StatsFlowRule.
+ */
public final class DefaultStatsFlowRule implements StatsFlowRule {
private final IpPrefix srcIpPrefix;
private final IpPrefix dstIpPrefix;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
new file mode 100644
index 0000000..368b36b
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.ImmutableMap.copyOf;
+
+/**
+ * Implementation of TelemetryConfig.
+ *
+ * <p>Instances are immutable: the parent list and the property map are
+ * defensively copied at construction time. Equality and hash code are based
+ * on the configuration name only.
+ */
+public final class DefaultTelemetryConfig implements TelemetryConfig {
+    private final String name;
+    private final ConfigType type;
+    private final List<TelemetryConfig> parents;
+
+    private final String manufacturer;
+    private final String swVersion;
+    private final boolean enabled;
+
+    private final Map<String, String> properties;
+
+    /**
+     * Creates a configuration with the specified name.
+     *
+     * @param name          configuration name
+     * @param type          configuration type
+     * @param parents       optional parent configurations
+     * @param manufacturer  off-platform application manufacturer
+     * @param swVersion     off-platform application software version
+     * @param enabled       service enable flag
+     * @param properties    properties for telemetry configuration
+     */
+    public DefaultTelemetryConfig(String name, ConfigType type,
+                                  List<TelemetryConfig> parents,
+                                  String manufacturer, String swVersion,
+                                  boolean enabled, Map<String, String> properties) {
+        this.name = checkNotNull(name, "Name cannot be null");
+        this.type = checkNotNull(type, "type cannot be null");
+        this.parents = parents == null ? ImmutableList.of() : ImmutableList.copyOf(parents);
+        this.manufacturer = checkNotNull(manufacturer, "Manufacturer cannot be null");
+        this.swVersion = checkNotNull(swVersion, "SW version cannot be null");
+        this.properties = copyOf(checkNotNull(properties, "Properties cannot be null"));
+        this.enabled = enabled;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public ConfigType type() {
+        return type;
+    }
+
+    @Override
+    public List<TelemetryConfig> parents() {
+        // The constructor always stores a non-null immutable list.
+        return parents;
+    }
+
+    @Override
+    public String manufacturer() {
+        return manufacturer;
+    }
+
+    @Override
+    public String swVersion() {
+        return swVersion;
+    }
+
+    @Override
+    public boolean enabled() {
+        return enabled;
+    }
+
+    @Override
+    public Map<String, String> properties() {
+        // The constructor always stores a non-null immutable map.
+        return properties;
+    }
+
+    /**
+     * Looks the property up in this configuration first and then, breadth
+     * first, in its parents.
+     *
+     * @param name property name
+     * @return first value found, or null if no configuration defines it
+     */
+    @Override
+    public String getProperty(String name) {
+        Queue<TelemetryConfig> queue = new LinkedList<>();
+        queue.add(this);
+        while (!queue.isEmpty()) {
+            TelemetryConfig config = queue.remove();
+            String property = config.properties().get(name);
+            if (property != null) {
+                return property;
+            } else if (config.parents() != null) {
+                queue.addAll(config.parents());
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Merges this configuration with the given one. Name, type, manufacturer,
+     * software version and enabled flag are taken from this configuration.
+     * Properties are the union of both, with the other configuration winning
+     * on key conflicts. Parents are the union of both parent lists keyed by
+     * name; parents present on both sides are merged recursively.
+     *
+     * @param other configuration to merge into this one
+     * @return new merged configuration
+     */
+    @Override
+    public TelemetryConfig merge(TelemetryConfig other) {
+        // Start from the other side's properties, then add only those of ours
+        // that it does not define (the builder rejects duplicate keys).
+        ImmutableMap.Builder<String, String> mergedProperties = ImmutableMap.builder();
+        mergedProperties.putAll(other.properties());
+        properties.entrySet().stream()
+                .filter(e -> !other.properties().containsKey(e.getKey()))
+                .forEach(mergedProperties::put);
+
+        // Begin with our own parents so none of them is lost, then fold the
+        // other side's parents in by name. Each name ends up exactly once,
+        // independent of the order in which the two lists are given.
+        List<TelemetryConfig> mergedParents = new ArrayList<>(parents);
+        List<TelemetryConfig> otherParents = other.parents();
+        if (otherParents != null) {
+            for (TelemetryConfig otherParent : otherParents) {
+                int index = indexOfName(mergedParents, otherParent.name());
+                if (index < 0) {
+                    mergedParents.add(otherParent);
+                } else {
+                    mergedParents.set(index, mergedParents.get(index).merge(otherParent));
+                }
+            }
+        }
+
+        return new DefaultTelemetryConfig(name, type, mergedParents,
+                manufacturer, swVersion, enabled, mergedProperties.build());
+    }
+
+    /**
+     * Returns the position of the configuration with the given name.
+     *
+     * @param configs configurations to search
+     * @param name    configuration name to look for
+     * @return index of the first match, or -1 if there is none
+     */
+    private static int indexOfName(List<TelemetryConfig> configs, String name) {
+        for (int i = 0; i < configs.size(); i++) {
+            if (configs.get(i).name().equals(name)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public Set<String> keys() {
+        return properties.keySet();
+    }
+
+    @Override
+    public String value(String key) {
+        return properties.get(key);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("name", name)
+                .add("type", type)
+                .add("parents", parents)
+                .add("manufacturer", manufacturer)
+                .add("swVersion", swVersion)
+                .add("enabled", enabled)
+                .add("properties", properties)
+                .toString();
+    }
+
+    @Override
+    public boolean equals(Object configToBeCompared) {
+        if (this == configToBeCompared) {
+            return true;
+        }
+
+        if (configToBeCompared == null || getClass() != configToBeCompared.getClass()) {
+            return false;
+        }
+
+        // Identity of a configuration is its name only.
+        DefaultTelemetryConfig telemetryConfig =
+                (DefaultTelemetryConfig) configToBeCompared;
+        return name.equals(telemetryConfig.name());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(name);
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
new file mode 100644
index 0000000..5fb1e70
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Default telemetry configuration provider implementation, keeping the
+ * provided configurations in a concurrent map keyed by configuration name.
+ */
+public class DefaultTelemetryConfigProvider implements TelemetryConfigProvider {
+
+    protected final Map<String, TelemetryConfig> configs = Maps.newConcurrentMap();
+
+    @Override
+    public Set<TelemetryConfig> getTelemetryConfigs() {
+        // Hand out an immutable snapshot rather than a live view of the map.
+        return ImmutableSet.copyOf(configs.values());
+    }
+
+    /**
+     * Registers the given configuration with this provider. When no
+     * configuration is stored under the same name, the given one is stored as
+     * is; otherwise the stored configuration is merged with the given one and
+     * the merge result replaces it.
+     *
+     * @param config telemetry configuration to be provided
+     * @return registered configuration
+     */
+    public TelemetryConfig addConfig(TelemetryConfig config) {
+        return configs.merge(config.name(), config,
+                (existing, incoming) -> existing.merge(incoming));
+    }
+
+    /**
+     * Drops the configuration stored under the name of the given one.
+     *
+     * @param config telemetry configuration
+     */
+    public void removeConfig(TelemetryConfig config) {
+        String key = config.name();
+        configs.remove(key);
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("configs", configs)
+                .toString();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
new file mode 100644
index 0000000..b1d6133
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_ADDED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_DELETED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Telemetry configuration inventory backed by a {@code ConsistentMap}; map
+ * changes are relayed to the store delegate as telemetry config events.
+ */
+@Component(immediate = true, service = TelemetryConfigStore.class)
+public class DistributedTelemetryConfigStore
+        extends AbstractStore<TelemetryConfigEvent, TelemetryConfigStoreDelegate>
+        implements TelemetryConfigStore {
+
+    protected final Logger log = getLogger(getClass());
+
+    private static final String ERR_NOT_FOUND = " does not exist";
+    private static final String ERR_DUPLICATE = " already exists";
+
+    // Registration order is kept as is; it determines the serializer type ids.
+    private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(TelemetryConfigProvider.class)
+                    .register(DefaultTelemetryConfigProvider.class)
+                    .register(TelemetryConfig.class)
+                    .register(ConfigType.class)
+                    .register(DefaultTelemetryConfig.class)
+                    .build();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+    private final MapEventListener<String, TelemetryConfig>
+            telemetryConfigMapListener = new TelemetryConfigMapListener();
+
+    private ConsistentMap<String, TelemetryConfig> telemetryConfigStore;
+
+    /**
+     * Builds the backing map under the telemetry application id and starts
+     * listening for its changes.
+     */
+    @Activate
+    protected void activate() {
+        ApplicationId appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+        Serializer serializer = Serializer.using(SERIALIZER_TELEMETRY_CONFIG);
+
+        telemetryConfigStore = storageService.<String, TelemetryConfig>consistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("telemetry-config-store")
+                .withApplicationId(appId)
+                .build();
+        telemetryConfigStore.addListener(telemetryConfigMapListener);
+
+        log.info("Started");
+    }
+
+    /**
+     * Stops listening for map changes and shuts the event executor down.
+     */
+    @Deactivate
+    protected void deactivate() {
+        telemetryConfigStore.removeListener(telemetryConfigMapListener);
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public void createTelemetryConfig(TelemetryConfig config) {
+        // Rejects the write when an entry with the same name is already stored.
+        telemetryConfigStore.compute(config.name(), (key, current) -> {
+            checkArgument(current == null, config.name() + ERR_DUPLICATE);
+            return config;
+        });
+    }
+
+    @Override
+    public void updateTelemetryConfig(TelemetryConfig config) {
+        // Rejects the write when there is no entry to replace.
+        telemetryConfigStore.compute(config.name(), (key, current) -> {
+            checkArgument(current != null, config.name() + ERR_NOT_FOUND);
+            return config;
+        });
+    }
+
+    @Override
+    public TelemetryConfig removeTelemetryConfig(String name) {
+        Versioned<TelemetryConfig> removed = telemetryConfigStore.remove(name);
+        if (removed == null) {
+            return null;
+        }
+        return removed.value();
+    }
+
+    @Override
+    public TelemetryConfig telemetryConfig(String name) {
+        return telemetryConfigStore.asJavaMap().get(name);
+    }
+
+    @Override
+    public Set<TelemetryConfig> telemetryConfigs() {
+        return ImmutableSet.copyOf(telemetryConfigStore.asJavaMap().values());
+    }
+
+    @Override
+    public Set<TelemetryConfig> telemetryConfigsByType(ConfigType type) {
+        Set<TelemetryConfig> matching = telemetryConfigStore.asJavaMap().values()
+                .stream()
+                .filter(config -> config.type() == type)
+                .collect(Collectors.toSet());
+        return ImmutableSet.copyOf(matching);
+    }
+
+    @Override
+    public void clear() {
+        telemetryConfigStore.clear();
+    }
+
+    /**
+     * Translates map events into telemetry config events and hands them to
+     * the delegate on the event executor.
+     */
+    private class TelemetryConfigMapListener
+            implements MapEventListener<String, TelemetryConfig> {
+
+        @Override
+        public void event(MapEvent<String, TelemetryConfig> event) {
+            switch (event.type()) {
+                case INSERT:
+                    eventExecutor.execute(() -> {
+                        log.debug("Telemetry config created");
+                        notifyDelegate(new TelemetryConfigEvent(
+                                CONFIG_ADDED, event.newValue().value()));
+                    });
+                    break;
+                case UPDATE:
+                    eventExecutor.execute(() -> {
+                        log.debug("Telemetry config updated");
+                        notifyDelegate(new TelemetryConfigEvent(
+                                CONFIG_UPDATED, event.newValue().value()));
+                    });
+                    break;
+                case REMOVE:
+                    // A removal carries the deleted entry as its old value.
+                    eventExecutor.execute(() -> {
+                        log.debug("Telemetry config removed");
+                        notifyDelegate(new TelemetryConfigEvent(
+                                CONFIG_DELETED, event.oldValue().value()));
+                    });
+                    break;
+                default:
+                    log.error("Unsupported telemetry config event type");
+                    break;
+            }
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
new file mode 100644
index 0000000..f7e4693
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for gRPC telemetry configurations.
+ */
+@Component(immediate = true)
+public class GrpcConfigsLoader extends AbstractTelemetryConfigLoader {
+
+    private static final String GRPC_CONFIGS_PATH = "grpc-configs.xml";
+
+    /**
+     * Creates a loader bound to the gRPC configuration definitions resource.
+     */
+    public GrpcConfigsLoader() {
+        super(GRPC_CONFIGS_PATH);
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
deleted file mode 100644
index 70b1d70..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_USE_PLAINTEXT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_MAX_INBOUND_MSG_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_USE_PLAINTEXT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * gRPC server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = GrpcTelemetryConfigService.class,
- property = {
- PROP_GRPC_ENABLE_SERVICE + ":Boolean=" + GRPC_ENABLE_SERVICE_DEFAULT,
- PROP_GRPC_SERVER_ADDRESS + "=" + GRPC_SERVER_ADDRESS_DEFAULT,
- PROP_GRPC_SERVER_PORT + ":Integer=" + GRPC_SERVER_PORT_DEFAULT,
- PROP_GRPC_USE_PLAINTEXT + ":Boolean=" + GRPC_USE_PLAINTEXT_DEFAULT,
- PROP_GRPC_MAX_INBOUND_MSG_SIZE + ":Integer=" + GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT
- }
-)
-public class GrpcTelemetryConfigManager implements GrpcTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected GrpcTelemetryAdminService grpcTelemetryAdminService;
-
- /** Default IP address to establish initial connection to gRPC server. */
- protected String address = GRPC_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to gRPC server. */
- protected Integer port = GRPC_SERVER_PORT_DEFAULT;
-
- /** UsePlaintext flag value used for connecting to gRPC server. */
- protected Boolean usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
-
- /** Maximum inbound message size used for communicating with gRPC server. */
- protected Integer maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = GRPC_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- grpcTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- grpcTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(grpcTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultGrpcTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withUsePlaintext(usePlaintext)
- .withMaxInboundMsgSize(maxInboundMsgSize)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = get(properties, PROP_GRPC_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : GRPC_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. gRPC server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_GRPC_SERVER_PORT);
- if (portConfigured == null) {
- port = GRPC_SERVER_PORT_DEFAULT;
- log.info("gRPC server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. gRPC server port is {}", port);
- }
-
- Boolean usePlaintextConfigured =
- getBooleanProperty(properties, PROP_GRPC_USE_PLAINTEXT);
- if (usePlaintextConfigured == null) {
- usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
- log.info("gRPC server use plaintext flag is NOT " +
- "configured, default value is {}", usePlaintext);
- } else {
- usePlaintext = usePlaintextConfigured;
- log.info("Configured. gRPC server use plaintext flag is {}", usePlaintext);
- }
-
- Integer maxInboundMsgSizeConfigured =
- getIntegerProperty(properties, PROP_GRPC_MAX_INBOUND_MSG_SIZE);
- if (maxInboundMsgSizeConfigured == null) {
- maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
- log.info("gRPC server max inbound message size is NOT " +
- "configured, default value is {}", maxInboundMsgSize);
- } else {
- maxInboundMsgSize = maxInboundMsgSizeConfigured;
- log.info("Configured. gRPC server max inbound message size is {}", maxInboundMsgSize);
- }
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_GRPC_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = GRPC_ENABLE_SERVICE_DEFAULT;
- log.info("gRPC service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. gRPC service enable flag is {}", enableService);
- }
- }
-
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index 391b1d8..f487504 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,12 +15,13 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -29,6 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
+
/**
* gRPC telemetry manager.
*/
@@ -40,7 +47,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private ManagedChannel channel = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
@Activate
protected void activate() {
@@ -60,43 +70,45 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (channel != null) {
- log.info("gRPC producer has already been started");
- return;
- }
+ public void start() {
+ // open one channel per enabled gRPC config; the config named GRPC_SCHEME is skipped
+ telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
+ GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
- GrpcTelemetryConfig grpcConfig = (GrpcTelemetryConfig) config;
- channel = ManagedChannelBuilder
- .forAddress(grpcConfig.address(), grpcConfig.port())
- .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
- .usePlaintext(grpcConfig.usePlaintext())
- .build();
+ if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(grpcConfig.address(), grpcConfig.port())
+ .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+ .usePlaintext(grpcConfig.usePlaintext())
+ .build();
+
+ channels.add(channel);
+ }
+ });
log.info("gRPC producer has Started");
}
@Override
public void stop() {
- if (channel != null) {
- channel.shutdown();
- channel = null;
- }
-
+ channels.forEach(ManagedChannel::shutdown);
+ // forget the shut-down channels; otherwise isRunning() stays true after stop()
+ // and every restart() piles new channels on top of the dead ones
+ channels.clear();
log.info("gRPC producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public Object publish(Object record) {
// TODO: need to find a way to invoke gRPC endpoint using channel
- if (channel == null) {
+ if (channels.isEmpty()) {
log.debug("gRPC telemetry service has not been enabled!");
}
@@ -105,6 +117,6 @@
@Override
public boolean isRunning() {
- return channel != null;
+ return !channels.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
new file mode 100644
index 0000000..215e1f6
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for InfluxDB telemetry configurations; passes {@code influxdb-configs.xml} to the base loader.
+ */
+@Component(immediate = true)
+public class InfluxDbConfigsLoader extends AbstractTelemetryConfigLoader {
+ public InfluxDbConfigsLoader() {
+ super("influxdb-configs.xml");
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
deleted file mode 100644
index d9107ca..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * InfluxDB server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = InfluxDbTelemetryConfigService.class,
- property = {
- PROP_INFLUXDB_ENABLE_SERVICE + ":Boolean=" + PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT,
- PROP_INFLUXDB_SERVER_ADDRESS + "=" + PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT,
- PROP_INFLUXDB_SERVER_PORT + ":Integer=" + PROP_INFLUXDB_SERVER_PORT_DEFAULT,
- PROP_INFLUXDB_USERNAME + "=" + PROP_INFLUXDB_USERNAME_DEFAULT,
- PROP_INFLUXDB_PASSWORD + "=" + PROP_INFLUXDB_PASSWORD_DEFAULT,
- PROP_INFLUXDB_DATABASE + "=" + PROP_INFLUXDB_DATABASE_DEFAULT,
- PROP_INFLUXDB_MEASUREMENT + "=" + PROP_INFLUXDB_MEASUREMENT_DEFAULT,
- PROP_INFLUXDB_ENABLE_BATCH + ":Boolean=" + PROP_INFLUXDB_ENABLE_BATCH_DEFAULT
- }
-)
-public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected InfluxDbTelemetryAdminService influxDbTelemetryAdminService;
-
- /** Default IP address to establish initial connection to InfluxDB server. */
- protected String address = PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to InfluxDB server. */
- protected Integer port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-
- /** Username used for authenticating against InfluxDB server. */
- protected String username = PROP_INFLUXDB_USERNAME_DEFAULT;
-
- /** Password used for authenticating against InfluxDB server. */
- protected String password = PROP_INFLUXDB_PASSWORD_DEFAULT;
-
- /** Database of InfluxDB server. */
- protected String database = PROP_INFLUXDB_DATABASE_DEFAULT;
-
- /** Measurement of InfluxDB server. */
- protected String measurement = PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-
- /** Flag value of enabling batch mode of InfluxDB server. */
- protected Boolean enableBatch = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- influxDbTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- influxDbTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(influxDbTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultInfluxDbTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withUsername(username)
- .withPassword(password)
- .withDatabase(database)
- .withMeasurement(measurement)
- .withEnableBatch(enableBatch)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_INFLUXDB_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. InfluxDB server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_INFLUXDB_SERVER_PORT);
- if (portConfigured == null) {
- port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
- log.info("InfluxDB server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. InfluxDB server port is {}", port);
- }
-
- String usernameStr = Tools.get(properties, PROP_INFLUXDB_USERNAME);
- username = usernameStr != null ? usernameStr : PROP_INFLUXDB_USERNAME_DEFAULT;
- log.info("Configured. InfluxDB server username is {}", username);
-
- String passwordStr = Tools.get(properties, PROP_INFLUXDB_PASSWORD);
- password = passwordStr != null ? passwordStr : PROP_INFLUXDB_PASSWORD_DEFAULT;
- log.info("Configured. InfluxDB server password is {}", password);
-
- String databaseStr = Tools.get(properties, PROP_INFLUXDB_DATABASE);
- database = databaseStr != null ? databaseStr : PROP_INFLUXDB_DATABASE_DEFAULT;
- log.info("Configured. InfluxDB server database is {}", database);
-
- String measurementStr = Tools.get(properties, PROP_INFLUXDB_MEASUREMENT);
- measurement = measurementStr != null ? measurementStr : PROP_INFLUXDB_MEASUREMENT_DEFAULT;
- log.info("Configured. InfluxDB server measurement is {}", measurement);
-
- Boolean enableBatchConfigured = getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_BATCH);
- if (enableBatchConfigured == null) {
- enableBatch = PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
- log.info("InfluxDB server enable batch flag is " +
- "NOT configured, default value is {}", enableBatch);
- } else {
- enableBatch = enableBatchConfigured;
- log.info("Configured. InfluxDB server enable batch is {}", enableBatch);
- }
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
- log.info("InfluxDB service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. InfluxDB service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 40528d4..796da19 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -24,6 +24,7 @@
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -34,8 +35,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
+
/**
* InfluxDB telemetry manager.
*/
@@ -70,10 +76,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
private static final String INFLUX_PROTOCOL = "http";
- private InfluxDB producer = null;
- private String database = null;
- private String measurement = null;
+ private Map<String, InfluxDB> producers = new ConcurrentHashMap<>(); // keyed by telemetry config name
@Activate
protected void activate() {
@@ -93,51 +100,53 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("InfluxDB producer has already been started");
- return;
- }
+ public void start() {
- InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
+ // connect to every enabled InfluxDB config (the one named INFLUXDB_SCHEME is skipped)
+ telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
- StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(INFLUX_PROTOCOL);
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append("//");
- influxDbServerBuilder.append(influxDbConfig.address());
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append(influxDbConfig.port());
+ if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
+ StringBuilder influxDbServerBuilder = new StringBuilder();
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append("//");
+ influxDbServerBuilder.append(influxDbConfig.address());
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append(influxDbConfig.port());
- producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
- influxDbConfig.username(), influxDbConfig.password());
- database = influxDbConfig.database();
- measurement = influxDbConfig.measurement();
+ InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+ influxDbConfig.username(), influxDbConfig.password());
+ producers.put(c.name(), producer);
+
+ createDB(producer, influxDbConfig.database());
+ }
+ });
log.info("InfluxDB producer has Started");
-
- createDB();
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (producers != null) {
+ producers.values().forEach(InfluxDB::close);
+ // forget the closed clients; otherwise isRunning() stays true after stop()
+ // and publish() keeps writing to producers that were already closed
+ producers.clear();
}
log.info("InfluxDB producer has stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("InfluxDB telemetry service has not been enabled!");
return;
}
@@ -149,53 +154,61 @@
log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
- BatchPoints batchPoints = BatchPoints.database(database).build();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
- for (FlowInfo flowInfo: record.flowInfos()) {
- Point.Builder pointBuilder = Point
- .measurement((measurement == null) ? record.measurement() : measurement)
- .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
- .tag(DEVICE_ID, flowInfo.deviceId().toString())
- .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
- .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
- .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
- .tag(SRC_IP, flowInfo.srcIp().toString())
- .tag(DST_IP, flowInfo.dstIp().toString())
- .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
- .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
- .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
- .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
- .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
- .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
- .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
- .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
- .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
- .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
- .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+ String database = influxDbConfig.database();
+ String measurement = influxDbConfig.measurement();
- if (flowInfo.vlanId() != null) {
- pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ BatchPoints batchPoints = BatchPoints.database(database).build();
+
+ for (FlowInfo flowInfo: record.flowInfos()) {
+ Point.Builder pointBuilder = Point
+ .measurement((measurement == null) ? record.measurement() : measurement)
+ .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+ .tag(DEVICE_ID, flowInfo.deviceId().toString())
+ .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+ .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+ .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+ .tag(SRC_IP, flowInfo.srcIp().toString())
+ .tag(DST_IP, flowInfo.dstIp().toString())
+ .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+ .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+ .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+ .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+ .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+ .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+ .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+ .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+ .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+ .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+ .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+
+ if (flowInfo.vlanId() != null) {
+ pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ }
+
+ if (flowInfo.srcPort() != null) {
+ pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
+ }
+
+ if (flowInfo.dstPort() != null) {
+ pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
+ }
+
+ batchPoints.point(pointBuilder.build());
}
-
- if (flowInfo.srcPort() != null) {
- pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
- }
-
- if (flowInfo.dstPort() != null) {
- pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
- }
-
- batchPoints.point(pointBuilder.build());
- }
- producer.write(batchPoints);
+ v.write(batchPoints);
+ });
}
@Override
public boolean isRunning() {
- return producer != null;
+ return producers != null && !producers.isEmpty(); // null-safe, like stop() and publish()
}
- private void createDB() {
+ private void createDB(InfluxDB producer, String database) {
if (producer.databaseExists(database)) {
log.debug("Database {} is already created", database);
} else {
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
new file mode 100644
index 0000000..1eca176
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for Kafka telemetry configurations; passes {@code kafka-configs.xml} to the base loader.
+ */
+@Component(immediate = true)
+public class KafkaConfigsLoader extends AbstractTelemetryConfigLoader {
+ public KafkaConfigsLoader() {
+ super("kafka-configs.xml");
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
deleted file mode 100644
index 6621e0b..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Kafka server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = KafkaTelemetryConfigService.class,
- property = {
- PROP_KAFKA_ADDRESS + "=" + PROP_KAFKA_ADDRESS_DEFAULT,
- PROP_KAFKA_PORT + ":Integer=" + PROP_KAFKA_PORT_DEFAULT,
- PROP_KAFKA_RETRIES + ":Integer=" + PROP_KAFKA_RETRIES_DEFAULT,
- PROP_KAFKA_REQUIRED_ACKS + "=" + PROP_KAFKA_REQUIRED_ACKS_DEFAULT,
- PROP_KAFKA_BATCH_SIZE + ":Integer=" + PROP_KAFKA_BATCH_SIZE_DEFAULT,
- PROP_KAFKA_LINGER_MS + ":Integer=" + PROP_KAFKA_LINGER_MS_DEFAULT,
- PROP_KAFKA_MEMORY_BUFFER + ":Integer=" + PROP_KAFKA_MEMORY_BUFFER_DEFAULT,
- PROP_KAFKA_KEY_SERIALIZER + "=" + PROP_KAFKA_KEY_SERIALIZER_DEFAULT,
- PROP_KAFKA_VALUE_SERIALIZER + "=" + PROP_KAFKA_VALUE_SERIALIZER_DEFAULT,
- PROP_KAFKA_ENABLE_SERVICE + ":Boolean=" + PROP_KAFKA_ENABLE_SERVICE_DEFAULT
- }
-)
-public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
-
- /** Default IP address to establish initial connection to Kafka server. */
- protected String address = PROP_KAFKA_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to Kafka server. */
- protected Integer port = PROP_KAFKA_PORT_DEFAULT;
-
- /** Number of times the producer can retry to send after first failure. */
- protected int retries = PROP_KAFKA_RETRIES_DEFAULT;
-
- /** Producer will get an acknowledgement after the leader has replicated the data. */
- protected String requiredAcks = PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-
- /** The largest record batch size allowed by Kafka. */
- protected Integer batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
-
- /** The producer groups together any records that arrive between request transmissions into a single batch. */
- protected Integer lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
-
- /** The total memory used for log cleaner I/O buffers across all cleaner threads. */
- protected Integer memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-
- /** Serializer class for key that implements the Serializer interface. */
- protected String keySerializer = PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-
- /** Serializer class for value that implements the Serializer interface. */
- protected String valueSerializer = PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- kafkaTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- kafkaTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultKafkaTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withRetries(retries)
- .withRequiredAcks(requiredAcks)
- .withBatchSize(batchSize)
- .withLingerMs(lingerMs)
- .withMemoryBuffer(memoryBuffer)
- .withKeySerializer(keySerializer)
- .withValueSerializer(valueSerializer)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_KAFKA_ADDRESS);
- address = addressStr != null ? addressStr : PROP_KAFKA_ADDRESS_DEFAULT;
- log.info("Configured. Kafka server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_PORT);
- if (portConfigured == null) {
- port = PROP_KAFKA_PORT_DEFAULT;
- log.info("Kafka server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. Kafka server port is {}", port);
- }
-
- Integer retriesConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_RETRIES);
- if (retriesConfigured == null) {
- retries = PROP_KAFKA_RETRIES_DEFAULT;
- log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
- } else {
- retries = retriesConfigured;
- log.info("Configured. Kafka number of retries is {}", retries);
- }
-
- String requiredAcksStr = Tools.get(properties, PROP_KAFKA_REQUIRED_ACKS);
- requiredAcks = requiredAcksStr != null ? requiredAcksStr : PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
- log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
-
- Integer batchSizeConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_BATCH_SIZE);
- if (batchSizeConfigured == null) {
- batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
- log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
- } else {
- batchSize = batchSizeConfigured;
- log.info("Configured. Kafka batch size is {}", batchSize);
- }
-
- Integer lingerMsConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_LINGER_MS);
- if (lingerMsConfigured == null) {
- lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
- log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
- } else {
- lingerMs = lingerMsConfigured;
- log.info("Configured. Kafka lingerMs is {}", lingerMs);
- }
-
- Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_MEMORY_BUFFER);
- if (memoryBufferConfigured == null) {
- memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
- log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
- } else {
- memoryBuffer = memoryBufferConfigured;
- log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
- }
-
- String keySerializerStr = Tools.get(properties, PROP_KAFKA_KEY_SERIALIZER);
- keySerializer = keySerializerStr != null ? keySerializerStr : PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
- log.info("Configured, Kafka key serializer is {}", keySerializer);
-
- String valueSerializerStr = Tools.get(properties, PROP_KAFKA_VALUE_SERIALIZER);
- valueSerializer = valueSerializerStr != null ? valueSerializerStr : PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
- log.info("Configured, Kafka value serializer is {}", valueSerializer);
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_KAFKA_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
- log.info("Kafka service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. Kafka service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e895e46..85f36e3 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,12 +15,17 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -31,9 +36,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
+import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+
/**
* Kafka telemetry manager.
*/
@@ -42,6 +54,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
+
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String RETRIES = "retries";
private static final String ACKS = "acks";
@@ -54,7 +68,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private Producer<String, byte[]> producer = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -74,64 +91,83 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("Kafka producer has already been started");
- return;
- }
+ public void start() {
+ telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
- KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
+ if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
+ String kafkaServer = kafkaConfig.address() + ":" + kafkaConfig.port();
- StringBuilder kafkaServerBuilder = new StringBuilder();
- kafkaServerBuilder.append(kafkaConfig.address());
- kafkaServerBuilder.append(":");
- kafkaServerBuilder.append(kafkaConfig.port());
+ // Configure Kafka server properties
+ Properties prop = new Properties();
+ prop.put(BOOTSTRAP_SERVERS, kafkaServer);
+ prop.put(RETRIES, kafkaConfig.retries());
+ prop.put(ACKS, kafkaConfig.requiredAcks());
+ prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+ prop.put(LINGER_MS, kafkaConfig.lingerMs());
+ prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+ prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+ prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
- // Configure Kafka server properties
- Properties prop = new Properties();
- prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
- prop.put(RETRIES, kafkaConfig.retries());
- prop.put(ACKS, kafkaConfig.requiredAcks());
- prop.put(BATCH_SIZE, kafkaConfig.batchSize());
- prop.put(LINGER_MS, kafkaConfig.lingerMs());
- prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
- prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
- prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+ Producer<String, byte[]> previous = producers.put(c.name(), new KafkaProducer<>(prop));
+ if (previous != null) {
+ previous.close(); // a repeated start() must not leak the replaced producer
+ }
+ }
+ });
- producer = new KafkaProducer<>(prop);
log.info("Kafka producer has Started");
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (!producers.isEmpty()) {
+ producers.values().forEach(Producer::close);
}
+ producers.clear();
+
log.info("Kafka producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
- public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+ public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("Kafka telemetry service has not been enabled!");
return null;
}
log.debug("Send telemetry record to kafka server...");
- return producer.send(record);
+ Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig = config == null ? null : fromTelemetryConfig(config);
+ if (kafkaConfig == null) { // the config lookup or conversion yielded nothing for this producer
+ log.warn("Kafka telemetry config {} is not available, skip publishing", k);
+ return;
+ }
+ try {
+ Class<?> codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.getDeclaredConstructor().newInstance();
+ ByteBuffer buffer = codec.encode(flowInfos);
+ futureSet.add(v.send(new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array())));
+ } catch (ReflectiveOperationException e) { // codec class missing or not instantiable
+ log.warn("Failed to send telemetry record", e);
+ }
+ });
+ return futureSet;
}
@Override
public boolean isRunning() {
- return producer != null;
+ return !producers.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 783dbd6..ce325cd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -17,8 +17,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -26,8 +24,10 @@
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
-import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -36,14 +36,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getPropertyValueAsBoolean;
/**
* Openstack telemetry manager.
@@ -53,63 +49,57 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final String ENABLE_SERVICE = "enableService";
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
+ protected TelemetryConfigService telemetryConfigService;
- private List<TelemetryService> telemetryServices = Lists.newArrayList();
+ private List<TelemetryAdminService> telemetryServices = Lists.newArrayList();
+ private InternalTelemetryConfigListener
+ configListener = new InternalTelemetryConfigListener();
@Activate
protected void activate() {
+ telemetryConfigService.addListener(configListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ telemetryConfigService.removeListener(configListener);
+
log.info("Stopped");
}
@Override
- public void addTelemetryService(TelemetryService telemetryService) {
+ public void addTelemetryService(TelemetryAdminService telemetryService) {
telemetryServices.add(telemetryService);
}
@Override
- public void removeTelemetryService(TelemetryService telemetryService) {
+ public void removeTelemetryService(TelemetryAdminService telemetryService) {
telemetryServices.remove(telemetryService);
}
@Override
public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
- if (service instanceof GrpcTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- GrpcTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof GrpcTelemetryManager) {
invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
- if (service instanceof InfluxDbTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- InfluxDbTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof InfluxDbTelemetryManager) {
invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
}
- if (service instanceof PrometheusTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- PrometheusTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof PrometheusTelemetryManager) {
invokePrometheusPublisher((PrometheusTelemetryService) service, flowInfos);
}
- if (service instanceof KafkaTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- KafkaTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof KafkaTelemetryManager) {
invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
}
- if (service instanceof RestTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- RestTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof RestTelemetryManager) {
invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
@@ -118,31 +108,55 @@
}
@Override
- public Set<TelemetryService> telemetryServices() {
+ public Set<TelemetryAdminService> telemetryServices() {
return ImmutableSet.copyOf(telemetryServices);
}
- private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
+ @Override
+ public TelemetryAdminService telemetryService(String type) {
+ return telemetryServices.stream()
+ .filter(s -> s.type().name().equalsIgnoreCase(type))
+ .findFirst()
+ .orElse(null);
+ }
+
+ private void invokeGrpcPublisher(GrpcTelemetryService service,
+ Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokeInfluxDbPublisher(InfluxDbTelemetryService service,
+ Set<FlowInfo> flowInfos) {
DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
= new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
service.publish(influxRecord);
}
- private void invokePrometheusPublisher(PrometheusTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokePrometheusPublisher(PrometheusTelemetryService service,
+ Set<FlowInfo> flowInfos) {
service.publish(flowInfos);
}
- private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
- TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
- ByteBuffer buffer = codec.encode(flowInfos);
- service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
+ private void invokeKafkaPublisher(KafkaTelemetryService service,
+ Set<FlowInfo> flowInfos) {
+ service.publish(flowInfos);
}
- private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokeRestPublisher(RestTelemetryService service,
+ Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
+
+ private class InternalTelemetryConfigListener implements TelemetryConfigListener {
+ @Override
+ public void event(TelemetryConfigEvent event) {
+ // Any config event restarts the registered service whose type name matches the config type
+ String typeName = event.subject().type().name();
+ TelemetryAdminService target = telemetryService(typeName);
+ if (target == null) {
+ return;
+ }
+ target.restart();
+ }
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
index 325bce2..cc754f5 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
@@ -23,65 +23,7 @@
private OsgiPropertyConstants() {
}
- // REST telemetry
-
- static final String PROP_REST_ENABLE_SERVICE = "enableService";
- static final boolean PROP_REST_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_REST_SERVER_ADDRESS = "address";
- static final String PROP_REST_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_REST_SERVER_PORT = "port";
- static final int PROP_REST_SERVER_PORT_DEFAULT = 80;
-
- static final String PROP_REST_ENDPOINT = "endpoint";
- static final String PROP_REST_ENDPOINT_DEFAULT = "telemetry";
-
- static final String PROP_REST_METHOD = "method";
- static final String PROP_REST_METHOD_DEFAULT = "POST";
-
- static final String PROP_REST_REQUEST_MEDIA_TYPE = "requestMediaType";
- static final String PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT = "application/json";
-
- static final String PROP_REST_RESPONSE_MEDIA_TYPE = "responseMediaType";
- static final String PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT = "application/json";
-
- // Kafka telemetry
-
- static final String PROP_KAFKA_ENABLE_SERVICE = "enableService";
- static final boolean PROP_KAFKA_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_KAFKA_ADDRESS = "address";
- static final String PROP_KAFKA_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_KAFKA_PORT = "port";
- static final int PROP_KAFKA_PORT_DEFAULT = 9092;
-
- static final String PROP_KAFKA_RETRIES = "retries";
- static final int PROP_KAFKA_RETRIES_DEFAULT = 0;
-
- static final String PROP_KAFKA_REQUIRED_ACKS = "requiredAcks";
- static final String PROP_KAFKA_REQUIRED_ACKS_DEFAULT = "all";
-
- static final String PROP_KAFKA_BATCH_SIZE = "batchSize";
- static final int PROP_KAFKA_BATCH_SIZE_DEFAULT = 16384;
-
- static final String PROP_KAFKA_LINGER_MS = "lingerMs";
- static final int PROP_KAFKA_LINGER_MS_DEFAULT = 1;
-
- static final String PROP_KAFKA_MEMORY_BUFFER = "memoryBuffer";
- static final int PROP_KAFKA_MEMORY_BUFFER_DEFAULT = 33554432;
-
- static final String PROP_KAFKA_KEY_SERIALIZER = "keySerializer";
- static final String PROP_KAFKA_KEY_SERIALIZER_DEFAULT =
- "org.apache.kafka.common.serialization.StringSerializer";
-
- static final String PROP_KAFKA_VALUE_SERIALIZER = "valueSerializer";
- static final String PROP_KAFKA_VALUE_SERIALIZER_DEFAULT =
- "org.apache.kafka.common.serialization.ByteArraySerializer";
-
// Stats flow rule manager
-
static final String PROP_REVERSE_PATH_STATS = "reversePathStats";
static final boolean PROP_REVERSE_PATH_STATS_DEFAULT = false;
@@ -96,56 +38,4 @@
static final String PROP_MONITOR_UNDERLAY = "monitorUnderlay";
static final boolean PROP_MONITOR_UNDERLAY_DEFAULT = true;
-
- // Influx DB Telemetry config manager
-
- static final String PROP_INFLUXDB_ENABLE_SERVICE = "enableService";
- static final boolean PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_INFLUXDB_SERVER_ADDRESS = "address";
- static final String PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_INFLUXDB_SERVER_PORT = "port";
- static final int PROP_INFLUXDB_SERVER_PORT_DEFAULT = 8086;
-
- static final String PROP_INFLUXDB_USERNAME = "username";
- static final String PROP_INFLUXDB_USERNAME_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_PASSWORD = "password";
- static final String PROP_INFLUXDB_PASSWORD_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_DATABASE = "database";
- static final String PROP_INFLUXDB_DATABASE_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_MEASUREMENT = "measurement";
- static final String PROP_INFLUXDB_MEASUREMENT_DEFAULT = "sonaflow";
-
- static final String PROP_INFLUXDB_ENABLE_BATCH = "enableBatch";
- static final boolean PROP_INFLUXDB_ENABLE_BATCH_DEFAULT = true;
-
- // GRPC Telemetry config manager
- static final String PROP_GRPC_ENABLE_SERVICE = "enableService";
- static final boolean GRPC_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_GRPC_SERVER_ADDRESS = "address";
- static final String GRPC_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_GRPC_SERVER_PORT = "port";
- static final int GRPC_SERVER_PORT_DEFAULT = 50051;
-
- static final String PROP_GRPC_USE_PLAINTEXT = "usePlaintext";
- static final boolean GRPC_USE_PLAINTEXT_DEFAULT = true;
-
- static final String PROP_GRPC_MAX_INBOUND_MSG_SIZE = "maxInboundMsgSize";
- static final int GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT = 4194304; //4 * 1024 * 1024;
-
- // Prometheus Telemetry config manager
- static final String PROP_PROMETHEUS_ENABLE_SERVICE = "enableService";
- static final boolean PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT = true;
-
- static final String PROP_PROMETHEUS_EXPORTER_ADDRESS = "address";
- public static final String PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_PROMETHEUS_EXPORTER_PORT = "port";
- public static final int PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT = 9555;
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
new file mode 100644
index 0000000..a7b740d
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for prometheus telemetry configurations.
+ */
+@Component(immediate = true)
+public class PrometheusConfigsLoader extends AbstractTelemetryConfigLoader {
+ public PrometheusConfigsLoader() {
+ super("prometheus-configs.xml");
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
deleted file mode 100644
index dcc7023..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Prometheus exporter configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = PrometheusTelemetryConfigService.class,
- property = {
- PROP_PROMETHEUS_ENABLE_SERVICE + ":Boolean=" + PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT,
- PROP_PROMETHEUS_EXPORTER_ADDRESS + "=" + PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT,
- PROP_PROMETHEUS_EXPORTER_PORT + ":Integer=" + PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT
- }
-)
-public class PrometheusTelemetryConfigManager implements PrometheusTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected PrometheusTelemetryAdminService prometheusTelemetryAdminService;
-
- /** Default IP address of prometheus exporter. */
- protected String address = PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-
- /** Default port number of prometheus exporter. */
- protected Integer port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
- if (enableService) {
- prometheusTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
- if (enableService) {
- prometheusTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(prometheusTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultPrometheusTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_PROMETHEUS_EXPORTER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
- log.info("Configured. Prometheus exporter address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_PROMETHEUS_EXPORTER_PORT);
- if (portConfigured == null) {
- port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
- log.info("Prometheus exporter port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. Prometheus exporter port is {}", port);
- }
-
- Boolean enableServiceConfigured = getBooleanProperty(properties, PROP_PROMETHEUS_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
- log.info("Prometheus service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. Prometheus service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
index 69423ca..7111fc0 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
@@ -15,13 +15,19 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
+import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
import org.onlab.packet.TpPort;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -30,15 +36,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.prometheus.client.Counter;
-import io.prometheus.client.exporter.MetricsServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
import java.util.Arrays;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
+
/**
* Prometheus telemetry manager.
*/
@@ -47,7 +51,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private Server prometheusExporter;
+ private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
private static final String FLOW_TYPE = "flowType";
private static final String DEVICE_ID = "deviceId";
@@ -134,6 +138,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
@Activate
protected void activate() {
openstackTelemetryService.addTelemetryService(this);
@@ -148,38 +155,51 @@
}
@Override
- public void start(TelemetryConfig config) {
+ public void start() {
log.info("Prometheus exporter starts.");
- PrometheusTelemetryConfig prometheusConfig = (PrometheusTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
+ PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
- try {
- prometheusExporter = new Server(prometheusConfig.port());
- ServletContextHandler context = new ServletContextHandler();
- context.setContextPath("/");
- prometheusExporter.setHandler(context);
- context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
- log.info("Prometeus server start (Server port:{})", prometheusConfig.port());
- prometheusExporter.start();
- } catch (Exception ex) {
- log.warn("Exception: {}", ex.toString());
- }
+ if (prometheusConfig != null &&
+ !c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
+ try {
+ // TODO: offer an authentication option for the exporter endpoint
+ Server prometheusExporter = new Server(prometheusConfig.port());
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ prometheusExporter.setHandler(context);
+ context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+ log.info("Prometheus server start");
+
+ prometheusExporter.start();
+ prometheusExporters.add(prometheusExporter);
+ } catch (Exception ex) {
+ log.warn("Failed to start prometheus server", ex);
+ }
+ }
+ });
}
@Override
public void stop() {
- try {
- prometheusExporter.stop();
- } catch (Exception ex) {
- log.warn("Exception: {}", ex.toString());
- }
+ prometheusExporters.forEach(pe -> {
+ try {
+ pe.stop();
+ } catch (Exception e) {
+ log.warn("Failed to stop prometheus server", e);
+ }
+ });
+ // Forget the stopped servers so isRunning() turns false and restart() does not pile up stale ones
+ prometheusExporters.clear();
log.info("Prometheus exporter has stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
@@ -247,6 +267,6 @@
@Override
public boolean isRunning() {
- return prometheusExporter.isRunning();
+ return !prometheusExporters.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
new file mode 100644
index 0000000..ffe6cbc
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * Loader for REST telemetry configurations; passes {@code rest-configs.xml} to the base loader.
+ */
+@Component(immediate = true)
+public class RestConfigsLoader extends AbstractTelemetryConfigLoader {
+ public RestConfigsLoader() { // the only REST-specific part is the resource name
+ super("rest-configs.xml");
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
deleted file mode 100644
index 16b62cb..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * REST server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = RestTelemetryConfigService.class,
- property = {
- PROP_REST_ENABLE_SERVICE + ":Boolean=" + PROP_REST_ENABLE_SERVICE_DEFAULT,
- PROP_REST_SERVER_ADDRESS + "=" + PROP_REST_SERVER_ADDRESS_DEFAULT,
- PROP_REST_SERVER_PORT + ":Integer=" + PROP_REST_SERVER_PORT_DEFAULT,
- PROP_REST_ENDPOINT + "=" + PROP_REST_ENDPOINT_DEFAULT,
- PROP_REST_METHOD + "=" + PROP_REST_METHOD_DEFAULT,
- PROP_REST_REQUEST_MEDIA_TYPE + "=" + PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT,
- PROP_REST_RESPONSE_MEDIA_TYPE + "=" + PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT
- }
-)
-public class RestTelemetryConfigManager implements RestTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected RestTelemetryAdminService restTelemetryAdminService;
-
- /** Default IP address to establish initial connection to REST server. */
- protected String address = PROP_REST_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to REST server. */
- protected Integer port = PROP_REST_SERVER_PORT_DEFAULT;
-
- /** Endpoint of REST server. */
- protected String endpoint = PROP_REST_ENDPOINT_DEFAULT;
-
- /** HTTP method of REST server. */
- protected String method = PROP_REST_METHOD_DEFAULT;
-
- /** Request media type of REST server. */
- protected String requestMediaType = PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-
- /** Response media type of REST server. */
- protected String responseMediaType = PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- restTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- restTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(restTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultRestTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withEndpoint(endpoint)
- .withMethod(method)
- .withRequestMediaType(requestMediaType)
- .withResponseMediaType(responseMediaType)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_REST_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_REST_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. REST server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_REST_SERVER_PORT);
- if (portConfigured == null) {
- port = PROP_REST_SERVER_PORT_DEFAULT;
- log.info("REST server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. REST server port is {}", port);
- }
-
- String endpointStr = Tools.get(properties, PROP_REST_ENDPOINT);
- endpoint = endpointStr != null ? endpointStr : PROP_REST_ENDPOINT_DEFAULT;
- log.info("Configured. REST server endpoint is {}", endpoint);
-
- String methodStr = Tools.get(properties, PROP_REST_METHOD);
- method = methodStr != null ? methodStr : PROP_REST_METHOD_DEFAULT;
- log.info("Configured. REST server default HTTP method is {}", method);
-
- String requestMediaTypeStr = Tools.get(properties, PROP_REST_REQUEST_MEDIA_TYPE);
- requestMediaType = requestMediaTypeStr != null ?
- requestMediaTypeStr : PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
- log.info("Configured. REST server request media type is {}", requestMediaType);
-
- String responseMediaTypeStr = Tools.get(properties, PROP_REST_RESPONSE_MEDIA_TYPE);
- responseMediaType = responseMediaTypeStr != null ?
- responseMediaTypeStr : PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
- log.info("Configured. REST server response media type is {}", responseMediaType);
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_REST_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
- log.info("REST service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. REST service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 9b4aaf9..48cef34 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -15,8 +15,11 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -32,6 +35,12 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
/**
* REST telemetry manager.
@@ -48,8 +57,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private WebTarget target = null;
- private RestTelemetryConfig restConfig = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, WebTarget> targets = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -69,85 +80,71 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (target != null) {
- log.info("REST producer has already been started");
- return;
- }
+ public void start() {
- restConfig = (RestTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(REST).forEach(c -> { // one pass over every REST config
+ RestTelemetryConfig restConfig = fromTelemetryConfig(c);
- StringBuilder restServerBuilder = new StringBuilder();
- restServerBuilder.append(PROTOCOL);
- restServerBuilder.append(":");
- restServerBuilder.append("//");
- restServerBuilder.append(restConfig.address());
- restServerBuilder.append(":");
- restServerBuilder.append(restConfig.port());
- restServerBuilder.append("/");
+ if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
+ StringBuilder restServerBuilder = new StringBuilder();
+ restServerBuilder.append(PROTOCOL);
+ restServerBuilder.append(":");
+ restServerBuilder.append("//");
+ restServerBuilder.append(restConfig.address());
+ restServerBuilder.append(":");
+ restServerBuilder.append(restConfig.port());
+ restServerBuilder.append("/");
- Client client = ClientBuilder.newBuilder().build();
+ Client client = ClientBuilder.newBuilder().build(); // NOTE(review): one Client per config; stop() closes none
- target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+ WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+
+ targets.put(c.name(), target); // keyed by config name; publish() re-reads the config by it
+ } // configs failing the check above get no target
+ });
log.info("REST producer has Started");
}
@Override
public void stop() {
- if (target != null) {
- target = null;
- }
-
+ targets.clear(); // nulling the lambda parameter never touched the map; isRunning() stayed true
log.info("REST producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() { // stop() followed by start(); takes no config argument
stop();
- start(config);
+ start();
}
@Override
- public Response publish(String endpoint, String method, String record) {
- // TODO: need to find a way to invoke REST endpoint using target
- return null;
- }
+ public Set<Response> publish(String record) { // sends record to every registered target
- @Override
- public Response publish(String method, String record) {
- switch (method) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
- }
+ Set<Response> responses = Sets.newConcurrentHashSet();
- @Override
- public Response publish(String record) {
+ targets.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k); // looked up afresh on each publish
+ RestTelemetryConfig restConfig = fromTelemetryConfig(config); // NOTE(review): start() null-checks this, here it is not
- if (target == null) {
- log.debug("REST telemetry service has not been enabled!");
- return null;
- }
+ switch (restConfig.method()) {
+ case POST_METHOD:
+ responses.add(v.request(restConfig.requestMediaType())
+ .post(Entity.json(record)));
+ break;
+ case GET_METHOD:
+ responses.add(v.request(restConfig.requestMediaType()).get()); // record is not sent on GET
+ break;
+ default: // any other method adds no response
+ break;
+ }
+ });
- switch (restConfig.method()) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
+ return responses; // empty when there are no targets
}
@Override
public boolean isRunning() {
- return target != null;
+ return !targets.isEmpty(); // running means at least one target is registered
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
new file mode 100644
index 0000000..3ae2504
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsNotFound;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+
+/**
+ * Provides implementation of administering and interfacing telemetry configs.
+ * It also provides telemetry config events for the various exporters or connectors.
+ */
+@Component(
+ immediate = true,
+ service = {
+ TelemetryConfigService.class, TelemetryConfigAdminService.class
+ }
+)
+public class TelemetryConfigManager
+ extends ListenerRegistry<TelemetryConfigEvent, TelemetryConfigListener>
+ implements TelemetryConfigService, TelemetryConfigAdminService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String MSG_TELEMETRY_CONFIG = "Telemetry config %s %s";
+ private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
+ private static final String MSG_REMOVED = "removed";
+
+ private static final String ERR_NULL_CONFIG = "Telemetry config cannot be null";
+ private static final String NO_CONFIG = "Telemetry config not found";
+
+ private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(TelemetryConfigProvider.class)
+ .register(DefaultTelemetryConfigProvider.class)
+ .register(TelemetryConfig.class)
+ .register(TelemetryConfig.ConfigType.class)
+ .register(DefaultTelemetryConfig.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigStore telemetryConfigStore;
+
+ private final TelemetryConfigStoreDelegate
+ delegate = new InternalTelemetryConfigStoreDelegate();
+
+ private ConsistentMap<String, TelemetryConfigProvider> providerMap;
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ telemetryConfigStore.setDelegate(delegate); // the delegate hands store events to process()
+ leadershipService.runForLeadership(appId.name()); // isLeader() queries this same topic
+
+ providerMap = storageService.<String, TelemetryConfigProvider>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_TELEMETRY_CONFIG))
+ .withName("openstack-telemetry-config-provider")
+ .withApplicationId(appId)
+ .build();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ telemetryConfigStore.unsetDelegate(delegate);
+ leadershipService.withdraw(appId.name());
+ providerMap.clear(); // NOTE(review): presumably wipes the cluster-wide map; confirm intended
+
+ log.info("Stopped");
+ }
+
+    @Override
+    public Set<TelemetryConfigProvider> getProviders() {
+        // Immutable snapshot of the providers currently held in providerMap;
+        // later changes to the map are not reflected in the returned set.
+        return ImmutableSet.copyOf(providerMap.asJavaMap().values());
+    }
+
+ @Override
+ public void registerProvider(TelemetryConfigProvider provider) {
+ if (isLeader()) { // on any other node the call does nothing
+ StringBuilder nameBuilder = new StringBuilder();
+ provider.getTelemetryConfigs().forEach(config -> {
+ nameBuilder.append(config.name()); // names are concatenated without a separator
+ telemetryConfigStore.createTelemetryConfig(config);
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_CREATED));
+ });
+ providerMap.put(nameBuilder.toString(), provider); // key = all config names run together
+ }
+ }
+
+ @Override
+ public void unregisterProvider(TelemetryConfigProvider provider) {
+ if (isLeader()) { // on any other node the call does nothing
+ StringBuilder nameBuilder = new StringBuilder();
+ provider.getTelemetryConfigs().forEach(config -> {
+ nameBuilder.append(config.name()); // must mirror the key built in registerProvider
+ telemetryConfigStore.removeTelemetryConfig(config.name());
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_REMOVED));
+ });
+ providerMap.remove(nameBuilder.toString());
+ }
+ }
+
+ @Override
+ public void updateTelemetryConfig(TelemetryConfig config) {
+ checkNotNull(config, ERR_NULL_CONFIG); // rejects null before the store is touched
+
+ telemetryConfigStore.updateTelemetryConfig(config); // not leader-gated, unlike registerProvider
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_UPDATED));
+ }
+
+ @Override
+ public TelemetryConfig getConfig(String name) {
+ return nullIsNotFound(telemetryConfigStore.telemetryConfig(name), NO_CONFIG); // presumably throws if absent
+ }
+
+ @Override
+ public Set<TelemetryConfig> getConfigsByType(ConfigType type) {
+ return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigsByType(type)); // immutable copy
+ }
+
+ @Override
+ public Set<TelemetryConfig> getConfigs() {
+ return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigs()); // immutable copy
+ }
+
+ private boolean isLeader() { // true when this node leads the topic named after the app
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+    private class InternalTelemetryConfigStoreDelegate implements TelemetryConfigStoreDelegate {
+        @Override
+        public void notify(TelemetryConfigEvent event) {
+            if (event == null) {
+                return;
+            }
+            log.trace("send telemetry config event {}", event);
+            process(event); // non-null store events go to the enclosing registry
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
new file mode 100644
index 0000000..39a5df0
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.UNKNOWN;
+
+/**
+ * Utility capable of reading telemetry configuration XML resources and producing
+ * a telemetry config as a result.
+ * <p>
+ * The telemetry configurations stream structure is as follows:
+ * </p>
+ * <pre>
+ * <configs>
+ * <config name="..." [manufacturer="..." swVersion="..."]>
+ * [<property name="key">value</property>]
+ * ...
+ * </config>
+ * ...
+ * </configs>
+ * </pre>
+ */
+public class XmlTelemetryConfigLoader {
+
+ private static final String CONFIGS = "configs";
+ private static final String CONFIG = "config";
+
+ private static final String PROPERTY = "property";
+
+ private static final String TRUE = "true";
+
+ private static final String NAME = "[@name]";
+ private static final String TYPE = "[@type]";
+ private static final String EXTENDS = "[@extends]";
+ private static final String MFG = "[@manufacturer]";
+ private static final String SW = "[@swVersion]";
+ private static final String ENABLED = "[@enabled]";
+
+ private Map<String, TelemetryConfig> configs = Maps.newHashMap();
+
+ /**
+ * Creates a new config loader. It takes no arguments and starts with an
+ * empty cache of parsed configs.
+ */
+ public XmlTelemetryConfigLoader() {
+ }
+
+ /**
+ * Loads the specified telemetry configs resource as an XML stream and parses
+ * it to produce a ready-to-register config provider.
+ *
+ * @param configsStream stream containing the telemetry configs definition
+ * @return telemetry configuration provider
+ * @throws IOException if issues are encountered reading the stream
+ * or parsing the telemetry config definition within
+ */
+ public DefaultTelemetryConfigProvider
+ loadTelemetryConfigs(InputStream configsStream) throws IOException {
+ try {
+ XMLConfiguration cfg = new XMLConfiguration();
+ cfg.setRootElementName(CONFIGS);
+ cfg.setAttributeSplittingDisabled(true);
+
+ cfg.load(configsStream); // NOTE(review): the stream is not closed in this method
+ return loadTelemetryConfigs(cfg);
+ } catch (ConfigurationException e) {
+ throw new IOException("Unable to load telemetry configs", e); // parser error kept as cause
+ }
+ }
+
+ /**
+ * Loads a telemetry config provider from the supplied hierarchical configuration.
+ *
+ * @param telemetryConfig hierarchical configuration containing the configs definition
+ * @return telemetry configuration provider
+ */
+ public DefaultTelemetryConfigProvider
+ loadTelemetryConfigs(HierarchicalConfiguration telemetryConfig) {
+ DefaultTelemetryConfigProvider provider = new DefaultTelemetryConfigProvider();
+ for (HierarchicalConfiguration cfg : telemetryConfig.configurationsAt(CONFIG)) {
+ DefaultTelemetryConfig config = loadTelemetryConfig(cfg);
+ configs.put(config.name(), config); // lets later entries resolve this one as a parent
+ provider.addConfig(config);
+ }
+ configs.clear(); // the name cache only lives for one load
+ return provider;
+ }
+
+ /**
+ * Loads a telemetry configuration from the supplied hierarchical configuration.
+ *
+ * @param telemetryCfg hierarchical configuration containing the telemetry config definition
+ * @return telemetry configuration
+ */
+ public DefaultTelemetryConfig loadTelemetryConfig(HierarchicalConfiguration telemetryCfg) {
+ String name = telemetryCfg.getString(NAME);
+ String parentsString = telemetryCfg.getString(EXTENDS, "");
+ List<TelemetryConfig> parents = Lists.newArrayList();
+
+ if (!"".equals(parentsString)) {
+ List<String> parentsNames;
+ if (parentsString.contains(",")) {
+ parentsNames = Arrays.asList(
+ parentsString.replace(" ", "").split(",")); // spaces are stripped before splitting
+ } else {
+ parentsNames = Lists.newArrayList(parentsString); // single name: used as written
+ }
+ parents = parentsNames.stream().map(parent -> (parent != null) ?
+ configs.get(parent) : null).collect(Collectors.toList()); // unknown names become null entries
+ }
+
+ String typeStr = telemetryCfg.getString(TYPE, getParentAttribute(parents, TYPE));
+ String manufacturer = telemetryCfg.getString(MFG, getParentAttribute(parents, MFG));
+ String swVersion = telemetryCfg.getString(SW, getParentAttribute(parents, SW));
+
+ // note that the enabled property is not inherited from the parent
+ String enabledStr = telemetryCfg.getString(ENABLED);
+
+ boolean enabled = enabledStr != null && enabledStr.equalsIgnoreCase(TRUE);
+
+ TelemetryConfig.ConfigType type = type(typeStr);
+
+ if (type == null) { // NOTE(review): type() as written never returns null
+ return null;
+ }
+
+ return new DefaultTelemetryConfig(name, type, parents, manufacturer,
+ swVersion, enabled, parseProperties(parents, telemetryCfg));
+ }
+
+    // Maps a type attribute to its ConfigType, ignoring case; unrecognized values are UNKNOWN.
+    private TelemetryConfig.ConfigType type(String typeStr) {
+        switch (typeStr.toUpperCase(java.util.Locale.ROOT)) { // ROOT: no default-locale casing rules
+            case "GRPC":
+                return GRPC;
+            case "KAFKA":
+                return KAFKA;
+            case "REST":
+                return REST;
+            case "INFLUXDB":
+                return INFLUXDB;
+            case "PROMETHEUS":
+                return PROMETHEUS;
+            default: // includes the literal "UNKNOWN"
+                return UNKNOWN;
+        }
+    }
+
+ // Returns the specified attribute from the first parent, or "" when there is no parent
+ private String getParentAttribute(List<TelemetryConfig> parents, String attribute) {
+ if (!parents.isEmpty()) {
+ TelemetryConfig parent = parents.get(0); // only the first parent is consulted
+ switch (attribute) {
+ case TYPE:
+ return parent.type().name().toLowerCase(); // type() upper-cases this again
+ case MFG:
+ return parent.manufacturer();
+ case SW:
+ return parent.swVersion();
+ default:
+ throw new IllegalArgumentException("Unsupported attribute");
+ }
+ }
+ return "";
+ }
+
+    // Parses the properties section. Own properties are layered over those of the
+    // first parent, so a key defined in both places takes this config's value.
+    private Map<String, String> parseProperties(List<TelemetryConfig> parents,
+                                                HierarchicalConfiguration config) {
+        // ImmutableMap.Builder fails at build() when a key is put twice, which made
+        // every override of an inherited property an error; stage in a mutable map.
+        Map<String, String> properties = Maps.newLinkedHashMap();
+
+        // note that, we only allow the inheritance from single source
+        if (!parents.isEmpty()) {
+            properties.putAll(parents.get(0).properties());
+        }
+
+        for (HierarchicalConfiguration b : config.configurationsAt(PROPERTY)) {
+            properties.put(b.getString(NAME), (String) b.getRootNode().getValue());
+        }
+        // copyOf still rejects null keys and values
+        return ImmutableMap.copyOf(properties);
+    }
+}