working version but without delete

master
SLW\ARNAUA 2023-07-05 16:08:56 +02:00
parent 96da23953e
commit 7d60b71141
6 changed files with 61 additions and 334 deletions

View File

@ -1,40 +0,0 @@
import logging
from typing import Dict
import os
import sys
file_dir = os.path.dirname(__file__)
sys.path.append(file_dir)
from more_utils.httpClient import HttpClient
import pagination
class Dynatrace:
def __init__(
self,
base_url: str,
token: str,
log: logging.Logger = None,
proxies: Dict = None,
too_many_requests_strategy=None,
retries: int = 0,
retry_delay_ms: int = 0,
):
self.__http_client = HttpClient(
base_url, token, log, proxies, too_many_requests_strategy, retries, retry_delay_ms
)
def returnPageination(self,path,params,list_item):
page = pagination.Pagionation(self.__http_client,path,params,list_item=list_item)
return page
def returnSingle(self,path):
response = self.__http_client.make_request(path)
json_response = response.json()
return json_response

71
main.py
View File

@ -1,13 +1,16 @@
import os import os
import dynatraceAPI
import logging import logging
from decouple import config from decouple import config
import yaml import yaml
import pandas as pd
from dynatrace import Dynatrace from dynatrace import Dynatrace
import logging import logging
def format_block(string, max):
string_length = len(string)
string = (f'{string}{" "*(max-string_length)}')
return string
def calculateDifference(dashboards, viewCounts): def calculateDifference(dashboards, viewCounts):
ids = [] ids = []
for stub in getattr(dashboards, "_PaginatedList__elements"): for stub in getattr(dashboards, "_PaginatedList__elements"):
@ -19,9 +22,14 @@ def calculateDifference(dashboards, viewCounts):
viewIds.append(getattr(metric, "dimension_map")["id"]) viewIds.append(getattr(metric, "dimension_map")["id"])
obsolete = [] obsolete = []
for value in ids: # for value in ids:
if value not in viewIds: # if value not in viewIds:
obsolete.append(value) # obsolete.append(value)
for stub in getattr(dashboards, "_PaginatedList__elements"):
if getattr(stub, "id") not in viewIds:
obsolete.append(stub)
return obsolete return obsolete
@ -29,12 +37,14 @@ def getDashboardsWithViewCount(DT_CLIENT, METRIC_SELECTOR, RESOLUTION,
FROM_DATE, TO_DATE): FROM_DATE, TO_DATE):
metrics = DT_CLIENT.metrics.query(METRIC_SELECTOR, RESOLUTION, FROM_DATE, metrics = DT_CLIENT.metrics.query(METRIC_SELECTOR, RESOLUTION, FROM_DATE,
TO_DATE) TO_DATE)
return metrics count = getattr(metrics, "_PaginatedList__total_count")
return count, metrics
def getDashboards(DT_CLIENT): def getDashboards(DT_CLIENT):
dashboards = DT_CLIENT.dashboards.list(owner=None, tags=None) dashboards = DT_CLIENT.dashboards.list(owner=None, tags=None)
return dashboards n_dashboards = getattr(dashboards, "_PaginatedList__total_count")
return n_dashboards, dashboards
if __name__ == "__main__": if __name__ == "__main__":
@ -45,7 +55,7 @@ if __name__ == "__main__":
environment = yaml.safe_load(env_cfg) environment = yaml.safe_load(env_cfg)
for item, doc in environment.items(): for item, doc in environment.items():
logging.info("%s checking token...", str(item)) logging.debug("%s checking token...", str(item))
if config(dict(doc[2]).get("env-token-name"), default='') != "": if config(dict(doc[2]).get("env-token-name"), default='') != "":
DT_URL = dict(doc[1]).get("env-url") DT_URL = dict(doc[1]).get("env-url")
@ -55,20 +65,43 @@ if __name__ == "__main__":
FROM_DATE= dict(doc[7]).get("fromDate") FROM_DATE= dict(doc[7]).get("fromDate")
TO_DATE= dict(doc[8]).get("toDate") TO_DATE= dict(doc[8]).get("toDate")
logging.info("%s init Dynatrace client...", str(item)) logging.debug("%s init Dynatrace client...", str(item))
DT_CLIENT = Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"), DT_CLIENT = Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"),
None, None, 0, 10*1000) None, None, 0, 10*1000)
logging.info("%s get all dashboards...", str(item)) logging.debug("%s get all dashboards...", str(item))
dashboards = getDashboards(DT_CLIENT) n_dashboards, dashboards = getDashboards(DT_CLIENT)
logging.info("%s %s total dashboards", str(item), str(n_dashboards))
logging.info("%s get viewcount of dashboards older than 6 months..." logging.debug("%s get dashboards with viewCount, resolution %s ...",
, str(item)) str(item), RESOLUTION)
viewCounts = getDashboardsWithViewCount(DT_CLIENT, METRIC_SELECTOR, count, viewCounts = getDashboardsWithViewCount(DT_CLIENT,
RESOLUTION, FROM_DATE, METRIC_SELECTOR,
TO_DATE) RESOLUTION,
FROM_DATE,
TO_DATE)
logging.info("%s %s dashboards with viewCount and older than 6 "
"Months", str(item), str(count))
logging.info("%s calculate difference...", str(item)) logging.debug("%s store ids of obsolete dashboards...", str(item))
result = calculateDifference(dashboards, viewCounts) results = calculateDifference(dashboards, viewCounts)
logging.info("%s %s dashboards with 0 viewCount!",
str(item), n_dashboards - count)
for result in results:
id = getattr(result, "id")
name = getattr(result, "name")
owner = getattr(result, "owner")
filename = os.path.join(".\log", item+"-log.txt")
os.makedirs(os.path.dirname(filename), exist_ok=True)
logging.info("%s calculated!", str(item)) with open(filename, "a", encoding="utf-8") as f:
f.write(f"{'id: ' + id + ' '} " +
f"{'name: ' + name + ' '} " +
f"{'owner: ' + owner}" +
"\n")
# logging.info(f"{'id: %s '}"
# f"{'name: %s'}"
# f"{'owner: %s'}",
# format_block(id, 50),
# format_block(name, 70), owner)
print("finished")

View File

View File

@ -1,116 +0,0 @@
import logging
from typing import Dict, Optional, Any
import time
import requests
import urllib3
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
TOO_MANY_REQUESTS_WAIT = "wait"
##Not sure where/why this is here
class DynatraceRetry(Retry):
def get_backoff_time(self):
return self.backoff_factor
class HttpClient:
def __init__(
self,
base_url: str,
token: str,
log: logging.Logger = None,
proxies: Dict = None,
too_many_requests_strategy=None,
retries: int = 0,
retry_delay_ms: int = 0,
#mc_jsession_id: Optional[str] = None,
#mc_b925d32c: Optional[str] = None,
#mc_sso_csrf_cookie: Optional[str] = None,
):
while base_url.endswith("/"):
base_url = base_url[:-1]
self.base_url = base_url
if proxies is None:
proxies = {}
self.proxies = proxies
self.auth_header = {"Authorization": f"Api-Token {token}"}
self.log = log
if self.log is None:
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.WARNING)
st = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(thread)d - %(filename)s:%(lineno)d - %(message)s")
st.setFormatter(fmt)
self.log.addHandler(st)
self.too_many_requests_strategy = too_many_requests_strategy
retry_delay_s = retry_delay_ms / 1000
try:
self.retries = Retry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
allowed_methods=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
except TypeError: # Older version of urllib3?
self.retries = Retry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
method_whitelist=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
# This is for internal dynatrace usage
#self.mc_jsession_id = mc_jsession_id
#self.mc_b925d32c = mc_b925d32c
#self.mc_sso_csrf_cookie = mc_sso_csrf_cookie
def make_request(
self, path: str, params: Optional[Any] = None, headers: Optional[Dict] = None, method="GET", data=None, files=None, query_params=None
) -> requests.Response:
url = f"{self.base_url}{path}"
body = None
if method in ["POST", "PUT"]:
body = params
params = query_params
if headers is None:
headers = {}
if files is None and "content-type" not in [key.lower() for key in headers.keys()]:
headers.update({"content-type": "application/json"})
headers.update(self.auth_header)
cookies = None
#if self.mc_b925d32c and self.mc_sso_csrf_cookie and self.mc_jsession_id:
# headers.update({"Cookie": f"JSESSIONID={self.mc_jsession_id}; ssoCSRFCookie={self.mc_sso_csrf_cookie}; b925d32c={self.mc_b925d32c}"})
# cookies = {"JSESSIONID": self.mc_jsession_id, "ssoCSRFCookie": self.mc_sso_csrf_cookie, "b925d32c": self.mc_b925d32c}
s = requests.Session()
s.mount("https://", HTTPAdapter(max_retries=self.retries))
self.log.debug(f"Making {method} request to '{url}' with params {params} and body: {body}")
r = s.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies, data=data, cookies=cookies, files=files)
self.log.debug(f"Received response '{r}'")
while r.status_code == 429 and self.too_many_requests_strategy == TOO_MANY_REQUESTS_WAIT:
sleep_amount = int(r.headers.get("retry-after", 5))
self.log.warning(f"Sleeping for {sleep_amount}s because we have received an HTTP 429")
time.sleep(sleep_amount)
r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies)
if r.status_code >= 400:
raise Exception(f"Error making request to {url}: {r}. Response: {r.text}")
return r

View File

@ -1,68 +0,0 @@
from more_utils.httpClient import HttpClient
class Pagionation():
def __init__(self, http_client, target_url, target_params=None, headers=None, list_item="result"):
#self.__target_class = target_class
self.__http_client: HttpClient = http_client
self.__target_url = target_url
self.__target_params = target_params
self.__headers = headers
self.__list_item = list_item
self._has_next_page = True
self.__total_count = None
self.__page_size = None
self.elements = self._get_next_page()
def __iter__(self):# -> Iterator[T]:
for element in self.__elements:
yield element
while self._has_next_page:
new_elements = self._get_next_page()
for element in new_elements:
yield element
def __len__(self):
return self.__total_count or len(self.__elements)
def _get_next_page(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
data = []
if json_response.get("nextPageKey", None):
self._has_next_page = True
self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
else:
self._has_next_page = False
if self.__list_item in json_response:
elements = json_response[self.__list_item]
self.__total_count = json_response.get("totalCount") or len(elements)
while self._has_next_page == True:
self.__target_url = self.__target_url.split("?")[0]
elements += self._get_response()
return elements
def _get_response(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
data = []
if json_response.get("nextPageKey", None):
self._has_next_page = True
self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
else:
self._has_next_page = False
if self.__list_item in json_response:
elements = json_response[self.__list_item]
self.__total_count = json_response.get("totalCount") or len(elements)
return elements

92
test.py
View File

@ -1,88 +1,6 @@
import time
import typing
import urllib
import requests
import yaml
from decouple import config
import os
import pandas as pd
def build_params(params: typing.Dict) -> str: format_block("SCHSHSHSHSHSHSHSCHSHSHSHSHSHSH")
""" format_block("SCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSH")
Builds the parameter dictionary to a formatted string format_block("SCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSH")
Args: format_block("SCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSH")
params (typing.Dict): Parameters as dictionary as stated on dynatrace documentation format_block("SCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSHSCHSHSHSHSHSHSH")
Returns:
str: Returns the query string
"""
query_string = "&".join(
f"{key}={urllib.parse.quote(value)}" for key, value in params.items()
)
return query_string
def get_data_from_dynatrace(
throttling_rate: float | int,
token: str,
env_url: str,
params: typing.Dict | str,
route: str,
) -> typing.Dict:
"""
Sends out GET request to dynatrace
Args:
throttling (float | int ): If needed set timeout for throttling
token (str): Token for dynatrace API
env_url (str): Url for the respective environment
params (typing.Dict | str): Parameters as dictionary as stated on dynatrace documentation
route (str): Route for the request
Returns:
typing.Dict: Returns the response as
"""
time.sleep(throttling_rate)
if type(params) is dict:
params_string = f"?{build_params(params)}"
elif type(params) is str:
params_string = f"/{params}"
headers = {"Authorization": f"Api-Token {token}"}
host_response = requests.get(
f"{env_url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
if host_response.status_code == 200:
return host_response.json()
else:
# TODO: proper error handling
print(f"ERROR - {host_response.status_code}")
if __name__ == "__main__":
metricSelector = "builtin:dashboards.viewCount:splitBy(id):sort(value(auto,ascending))"
resolution = "1M"
fromDate = "now-6M"
toDate = "now"
my_params = {
# "nextPageKey": 300,
"metricSelector": f"{metricSelector}",
"resolution": f"{resolution}",
"fromDate" : f"{fromDate}",
"toDate" : f"{toDate}"
}
with open(os.path.basename("./env-config.yaml")) as env_cfg:
env_config = yaml.safe_load(env_cfg)
for item, doc in env_config.items():
token = dict(doc[2])
url = dict(doc[1])
print(item, " crawling through ...")
print(item, " checking token ...")
if config(token.get("env-token-name"), default='') != "":
print(item, " fetching all dashboards ...")
DTTOKEN = config(token.get("env-token-name"), default='')
DTURL = url.get("env-url")
get_data_from_dynatrace(0, DTTOKEN, DTURL, my_params, "metrics/query")