coco_apm_dashboard_cleaner/test.py

88 lines
2.6 KiB
Python

import time
import typing
import urllib
import requests
import yaml
from decouple import config
import os
import pandas as pd
def build_params(params: typing.Dict) -> str:
    """
    Builds the parameter dictionary to a formatted query string

    Keys and values are percent-encoded with ``urllib.parse.quote`` and the
    resulting ``key=value`` pairs are joined with ``&`` in the dictionary's
    iteration order. No leading ``?`` is added.

    Args:
        params (typing.Dict): Parameters as dictionary as stated on dynatrace
            documentation. Keys/values that are not ``str`` or ``bytes``
            (e.g. an ``int`` page key) are converted with ``str()`` first.

    Returns:
        str: Returns the query string (empty string for an empty dictionary)
    """
    # Imported explicitly: a bare ``import urllib`` does not guarantee that
    # the ``urllib.parse`` submodule has been loaded.
    from urllib.parse import quote

    def _encode(part) -> str:
        # quote() only accepts str/bytes; anything else is stringified first.
        if not isinstance(part, (str, bytes)):
            part = str(part)
        return quote(part)

    return "&".join(
        f"{_encode(key)}={_encode(value)}" for key, value in params.items()
    )
def get_data_from_dynatrace(
throttling_rate: float | int,
token: str,
env_url: str,
params: typing.Dict | str,
route: str,
) -> typing.Dict:
"""
Sends out GET request to dynatrace
Args:
throttling (float | int ): If needed set timeout for throttling
token (str): Token for dynatrace API
env_url (str): Url for the respective environment
params (typing.Dict | str): Parameters as dictionary as stated on dynatrace documentation
route (str): Route for the request
Returns:
typing.Dict: Returns the response as
"""
time.sleep(throttling_rate)
if type(params) is dict:
params_string = f"?{build_params(params)}"
elif type(params) is str:
params_string = f"/{params}"
headers = {"Authorization": f"Api-Token {token}"}
host_response = requests.get(
f"{env_url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
if host_response.status_code == 200:
return host_response.json()
else:
# TODO: proper error handling
print(f"ERROR - {host_response.status_code}")
if __name__ == "__main__":
    # Metric query: dashboard view counts split by dashboard id, sorted
    # ascending, at the resolution and time frame given below.
    selector = "builtin:dashboards.viewCount:splitBy(id):sort(value(auto,ascending))"
    query_params = dict(
        metricSelector=selector,
        resolution="1M",
        fromDate="now-6M",
        toDate="now",
    )

    # Environment definitions are read from env-config.yaml, resolved
    # relative to the current working directory.
    with open(os.path.basename("./env-config.yaml")) as config_file:
        environments = yaml.safe_load(config_file)

    for env_name, entries in environments.items():
        # Assumes each environment maps to a sequence whose element 1 holds
        # "env-url" and element 2 holds "env-token-name" - verify against
        # the actual env-config.yaml.
        token_entry = dict(entries[2])
        url_entry = dict(entries[1])

        print(env_name, " crawling through ...")
        print(env_name, " checking token ...")

        # The token value is looked up by name via decouple's config();
        # environments without a configured token are skipped.
        api_token = config(token_entry.get("env-token-name"), default="")
        if api_token == "":
            continue

        print(env_name, " fetching all dashboards ...")
        base_url = url_entry.get("env-url")
        # The response is currently not used any further.
        get_data_from_dynatrace(0, api_token, base_url, query_params, "metrics/query")