master
SLW\ARNAUA 2023-07-04 13:03:42 +02:00
commit 273372db7a
8 changed files with 581 additions and 0 deletions

151
.gitignore vendored Normal file
View File

@ -0,0 +1,151 @@
.vscode
.idea
# Byte-compiled / optimized / DLL files
__pycache__/
more_utils/__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### Terraform stuff
**/.terraform/*
crash.log
*.tfvars
#excel reports
*.xlsx
*.csv
# for dev
slo_parameter.yaml
metricexpressions.json
*.bak
*.json
failed_requests.txt
# other
*.txt

40
dynatraceAPI.py Normal file
View File

@ -0,0 +1,40 @@
import logging
from typing import Dict
import os
import sys
file_dir = os.path.dirname(__file__)
sys.path.append(file_dir)
from more_utils.httpClient import HttpClient
import pagination
class Dynatrace:
    """Minimal facade over the Dynatrace REST API.

    Owns a single configured ``HttpClient`` and exposes two access styles:
    paginated collection endpoints and single-object endpoints.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        log: logging.Logger = None,
        proxies: Dict = None,
        too_many_requests_strategy=None,
        retries: int = 0,
        retry_delay_ms: int = 0,
    ):
        # All request plumbing (auth header, retries, 429 handling) lives in HttpClient.
        self.__http_client = HttpClient(
            base_url,
            token,
            log,
            proxies,
            too_many_requests_strategy,
            retries,
            retry_delay_ms,
        )

    def returnPageination(self, path, params, list_item):
        """Return a Pagionation object that collects every page of ``path``."""
        return pagination.Pagionation(
            self.__http_client, path, params, list_item=list_item
        )

    def returnSingle(self, path):
        """Perform one GET against ``path`` and return the decoded JSON body."""
        return self.__http_client.make_request(path).json()

43
env-config.yaml Normal file
View File

@ -0,0 +1,43 @@
---
euprod-coco:
- name: "euprod"
- env-url: "https://xxu26128.live.dynatrace.com"
- env-token-name: "EUPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
- type: "coco"
euprod-gcdm:
- name: "euprod"
- env-url: "https://moh22956.live.dynatrace.com"
- env-token-name: "EUPRODSAAS_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
- type: "gcdm"
eupreprod-coco:
- name: "eupreprod"
- env-url: "https://qqk70169.live.dynatrace.com"
- env-token-name: "EUPREPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
- type: "coco"
naprod-coco:
- name: "naprod"
- env-url: "https://wgv50241.live.dynatrace.com"
- env-token-name: "NAPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
- type: "coco"
napreprod-coco:
- name: "napreprod"
- env-url: "https://onb44935.live.dynatrace.com"
- env-token-name: "NAPREPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
- type: "coco"
cnprod-coco:
- name: "cnprod"
- env-url: "https://dyna-synth-cn.bmwgroup.com.cn/e/b921f1b9-c00e-4031-b9d1-f5a0d530757b"
- env-token-name: "CNPROD_TOKEN_VAR"
- jenkins: "https://jaws-china.bmwgroup.net/opmaas/"
- type: "coco"
cnpreprod-coco:
- name: "cnpreprod"
- env-url: "https://dynatracemgd-tsp.bmwgroup.net/e/ab88c03b-b7fc-45f0-9115-9e9ecc0ced35"
- env-token-name: "CNPREPROD_TOKEN_VAR"
- jenkins: "https://jaws-china.bmwgroup.net/opmaas/"
- type: "coco"

75
main.py Normal file
View File

@ -0,0 +1,75 @@
import os
import dynatraceAPI
import logging
from decouple import config
import yaml
import pandas as pd
from dynatrace import Dynatrace
from dynatrace import TOO_MANY_REQUESTS_WAIT
from dynatrace.environment_v2.tokens_api import SCOPE_METRICS_READ, SCOPE_METRICS_INGEST
def getD(DTURL, DTTOKEN, metricSelector, resolution, fromDate, toDate):
    """Query metrics via the official `dynatrace` client and dump them to metrics.txt.

    Args:
        DTURL: Dynatrace environment base URL.
        DTTOKEN: API token for that environment.
        metricSelector: Dynatrace metric selector expression.
        resolution: Datapoint resolution (e.g. "1M").
        fromDate / toDate: Timeframe expressions (e.g. "now-6M", "now").
    """
    # Create a Dynatrace client (third-party `dynatrace` package).
    dt = Dynatrace(DTURL, DTTOKEN)
    # Open the file ONCE and write every datapoint; the original re-opened it
    # in "w" mode per metric, so each iteration erased all previous ones and
    # only the last metric survived.
    with open('metrics.txt', 'w') as f:
        # NOTE(review): arguments are passed positionally to metrics.query —
        # confirm the parameter order against the installed client version.
        for metric in dt.metrics.query(metricSelector, resolution, fromDate, toDate):
            f.write(f"{metric}\n")
def getDashboardsWithViewCount(DTAPIToken, DTENV, metricSelector, resolution,
                               fromDate, toDate):
    """Fetch dashboard view-count datapoints and return them as a DataFrame.

    Args:
        DTAPIToken: API token for the environment.
        DTENV: Environment base URL.
        metricSelector: Dynatrace metric selector expression.
        resolution: Datapoint resolution (e.g. "1M").
        fromDate / toDate: Timeframe expressions (e.g. "now-6M", "now").

    Returns:
        pandas.DataFrame built from the paginated "data" elements.
    """
    # The original passed logging.Logger("ERROR"), which creates a logger
    # NAMED "ERROR" at level NOTSET — not an error-level logger.
    log = logging.getLogger(__name__)
    log.setLevel(logging.ERROR)
    dtclient = dynatraceAPI.Dynatrace(DTENV, DTAPIToken, log, None, None, 0,
                                      2 * 1000)
    params = {
        "metricSelector": metricSelector,
        "resolution": resolution,
        # The metrics v2 endpoint expects "from"/"to"; the previous
        # "fromDate"/"toDate" keys were silently ignored and the API fell
        # back to its default timeframe.
        "from": fromDate,
        "to": toDate,
    }
    api_url_report = "/api/v2/metrics/query"
    pages = dtclient.returnPageination(api_url_report, params, "data")
    return pd.DataFrame(pages.elements)
def getDashboards(DTAPIToken, DTENV):
    """Fetch all dashboard definitions and return them as a DataFrame.

    Args:
        DTAPIToken: API token for the environment.
        DTENV: Environment base URL.

    Returns:
        pandas.DataFrame built from the paginated "dashboards" elements.
    """
    # The original passed logging.Logger("ERROR"), which creates a logger
    # NAMED "ERROR" at level NOTSET — not an error-level logger.
    log = logging.getLogger(__name__)
    log.setLevel(logging.ERROR)
    dtclient = dynatraceAPI.Dynatrace(DTENV, DTAPIToken, log, None, None, 0,
                                      2 * 1000)
    my_params = {
        "owner": "",   # empty string: do not filter by owner
        "tags": [],    # empty list: do not filter by tags
    }
    api_url_report = "/api/config/v1/dashboards"
    pages = dtclient.returnPageination(api_url_report, my_params, "dashboards")
    return pd.DataFrame(pages.elements)
if __name__ == "__main__":
    # Dashboard view counts over the last six months, one datapoint per month.
    metricSelector = "builtin:dashboards.viewCount:splitBy(id):sort(value(auto,ascending))"
    resolution = "1M"
    fromDate = "now-6M"
    toDate = "now"

    # Resolve the config file next to this script. os.path.basename() only
    # stripped the "./" prefix, so the open() broke whenever the script was
    # run from any other working directory.
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "env-config.yaml")
    with open(config_path) as env_cfg:
        env_config = yaml.safe_load(env_cfg)

    for item, doc in env_config.items():
        # Each environment entry is a list of single-key dicts:
        # doc[1] -> {"env-url": ...}, doc[2] -> {"env-token-name": ...}
        token = dict(doc[2])
        url = dict(doc[1])
        print(item, " crawling through ...")
        print(item, " checking token ...")
        # Look the token up once instead of twice.
        DTTOKEN = config(token.get("env-token-name"), default='')
        if DTTOKEN != "":
            print(item, " fetching all dashboards ...")
            DTURL = url.get("env-url")
            getDashboards(DTTOKEN, DTURL)
            getDashboardsWithViewCount(DTTOKEN, DTURL, metricSelector,
                                       resolution, fromDate, toDate)
            getD(DTURL, DTTOKEN, metricSelector, resolution, fromDate, toDate)

0
more_utils/__init__.py Normal file
View File

116
more_utils/httpClient.py Normal file
View File

@ -0,0 +1,116 @@
import logging
from typing import Dict, Optional, Any
import time
import requests
import urllib3
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
TOO_MANY_REQUESTS_WAIT = "wait"
# NOTE(review): DynatraceRetry below is not referenced anywhere in this file —
# HttpClient builds plain urllib3 Retry objects directly. Confirm it is unused
# elsewhere before removing.
class DynatraceRetry(Retry):
    """urllib3 Retry variant with a constant back-off.

    urllib3's default ``get_backoff_time`` grows with the attempt count;
    this override always returns ``backoff_factor``, i.e. a fixed delay
    between retries.
    """

    def get_backoff_time(self):
        # Fixed delay, independent of how many attempts have been made.
        return self.backoff_factor
class HttpClient:
    """Small requests-based HTTP client for the Dynatrace REST API.

    Adds the Api-Token auth header, optional proxies, urllib3-level retries,
    and an optional "wait" strategy that sleeps and retries on HTTP 429.
    """

    def __init__(
        self,
        base_url: str,
        token: str,
        log: logging.Logger = None,
        proxies: Dict = None,
        too_many_requests_strategy=None,
        retries: int = 0,
        retry_delay_ms: int = 0,
    ):
        """Configure base URL, auth header, logging and retry policy.

        Args:
            base_url: Environment base URL; trailing slashes are stripped.
            token: Dynatrace API token (sent as "Api-Token ...").
            log: Logger to use; a stream-handler logger is built when None.
            proxies: requests-style proxies dict (default: none).
            too_many_requests_strategy: TOO_MANY_REQUESTS_WAIT to sleep+retry
                on HTTP 429, anything else to fail fast.
            retries: Total urllib3 retry count.
            retry_delay_ms: Constant back-off between retries, milliseconds.
        """
        # Normalise the base URL so path concatenation never yields "//".
        while base_url.endswith("/"):
            base_url = base_url[:-1]
        self.base_url = base_url

        if proxies is None:
            proxies = {}
        self.proxies = proxies

        self.auth_header = {"Authorization": f"Api-Token {token}"}

        # Fall back to a module logger with a stream handler when none given.
        self.log = log
        if self.log is None:
            self.log = logging.getLogger(__name__)
            self.log.setLevel(logging.WARNING)
            st = logging.StreamHandler()
            fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(thread)d - %(filename)s:%(lineno)d - %(message)s")
            st.setFormatter(fmt)
            self.log.addHandler(st)

        self.too_many_requests_strategy = too_many_requests_strategy

        retry_delay_s = retry_delay_ms / 1000
        # NOTE(review): retrying 400/401/403/404 is unusual — those statuses
        # are not transient; confirm the list is intentional before narrowing.
        try:
            self.retries = Retry(
                total=retries,
                backoff_factor=retry_delay_s,
                status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
                allowed_methods=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
                raise_on_status=False,
            )
        except TypeError:
            # urllib3 < 1.26 used `method_whitelist` instead of `allowed_methods`.
            self.retries = Retry(
                total=retries,
                backoff_factor=retry_delay_s,
                status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
                method_whitelist=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
                raise_on_status=False,
            )

    def make_request(
        self, path: str, params: Optional[Any] = None, headers: Optional[Dict] = None, method="GET", data=None, files=None, query_params=None
    ) -> requests.Response:
        """Send one request to ``base_url + path`` and return the response.

        For POST/PUT, `params` becomes the JSON body and `query_params` the
        query string. Raises Exception on any final status >= 400.
        """
        url = f"{self.base_url}{path}"
        body = None
        if method in ["POST", "PUT"]:
            body = params
            params = query_params
        if headers is None:
            headers = {}
        # Default the content type unless the caller set one or is uploading files.
        if files is None and "content-type" not in [key.lower() for key in headers.keys()]:
            headers.update({"content-type": "application/json"})
        headers.update(self.auth_header)
        cookies = None

        s = requests.Session()
        s.mount("https://", HTTPAdapter(max_retries=self.retries))
        self.log.debug(f"Making {method} request to '{url}' with params {params} and body: {body}")
        # NOTE(review): verify=False disables TLS certificate validation on
        # every request — presumably for internal endpoints; confirm.
        r = s.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies, data=data, cookies=cookies, files=files)
        self.log.debug(f"Received response '{r}'")

        while r.status_code == 429 and self.too_many_requests_strategy == TOO_MANY_REQUESTS_WAIT:
            # Retry-After may be absent or an HTTP-date; fall back to 5s
            # instead of crashing on int().
            try:
                sleep_amount = int(r.headers.get("retry-after", 5))
            except ValueError:
                sleep_amount = 5
            self.log.warning(f"Sleeping for {sleep_amount}s because we have received an HTTP 429")
            time.sleep(sleep_amount)
            # Re-issue through the same session with the SAME arguments; the
            # original used bare requests.request and dropped data/cookies/
            # files as well as the retry adapter.
            r = s.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies, data=data, cookies=cookies, files=files)

        if r.status_code >= 400:
            raise Exception(f"Error making request to {url}: {r}. Response: {r.text}")
        return r

68
pagination.py Normal file
View File

@ -0,0 +1,68 @@
from more_utils.httpClient import HttpClient
class Pagionation():
    """Eagerly collects every page of a paginated Dynatrace API response.

    On construction it requests `target_url` and follows "nextPageKey"
    references until exhausted; all items found under `list_item` across
    all pages are accumulated in `self.elements`.
    """

    def __init__(self, http_client, target_url, target_params=None, headers=None, list_item="result"):
        # http_client is expected to be a more_utils.httpClient.HttpClient
        # (anything with a compatible make_request works).
        self.__http_client = http_client
        self.__target_url = target_url
        self.__target_params = target_params
        self.__headers = headers
        self.__list_item = list_item
        self._has_next_page = True
        self.__total_count = None
        self.__page_size = None  # kept for compatibility; never populated
        self.elements = self._get_next_page()

    def __iter__(self):
        # All pages were already fetched in __init__. The original iterated a
        # non-existent self.__elements attribute (the attribute is the public
        # self.elements), raising AttributeError on every iteration attempt.
        return iter(self.elements)

    def __len__(self):
        # Same __elements/elements fix as __iter__.
        return self.__total_count or len(self.elements)

    def _get_next_page(self):
        """Fetch the first page, then follow nextPageKey until exhausted."""
        elements = self._get_response()
        while self._has_next_page:
            # Subsequent pages take only nextPageKey; strip any query string.
            self.__target_url = self.__target_url.split("?")[0]
            elements += self._get_response()
        return elements

    def _get_response(self):
        """Fetch one page; update pagination state and return its items."""
        response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
        json_response = response.json()
        if json_response.get("nextPageKey", None):
            self._has_next_page = True
            self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
        else:
            self._has_next_page = False
        # A missing list_item previously left `elements` unbound (NameError);
        # treat it as an empty page instead.
        elements = json_response.get(self.__list_item, [])
        # Only trust totalCount from pages that carry it, so a later page
        # without it does not clobber the real total (the original overwrote
        # it on every page).
        total = json_response.get("totalCount")
        if total is not None:
            self.__total_count = total
        elif self.__total_count is None:
            self.__total_count = len(elements)
        return elements

88
test.py Normal file
View File

@ -0,0 +1,88 @@
import time
import typing
import urllib
import requests
import yaml
from decouple import config
import os
import pandas as pd
def build_params(params: typing.Dict) -> str:
    """
    Builds the parameter dictionary into a formatted query string.

    Args:
        params (typing.Dict): Parameters as a dictionary, as stated in the
            Dynatrace documentation. Values are coerced to str before
            percent-encoding, so non-string values (e.g. int page keys)
            are accepted.

    Returns:
        str: The "&"-joined, percent-encoded query string (no leading "?").
    """
    # urllib.parse.quote() raises TypeError on non-str input (the commented
    # "nextPageKey": 300 example in __main__ would have crashed), so coerce
    # each value to str first.
    return "&".join(
        f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
    )
def get_data_from_dynatrace(
    throttling_rate: float | int,
    token: str,
    env_url: str,
    params: typing.Dict | str,
    route: str,
) -> typing.Dict:
    """
    Sends out a GET request to Dynatrace.

    Args:
        throttling_rate (float | int): Seconds to sleep before the request
            (simple client-side throttling; 0 disables it)
        token (str): Token for the Dynatrace API
        env_url (str): URL of the respective environment
        params (typing.Dict | str): Query parameters as a dict, or a literal
            path suffix appended after "/"
        route (str): Route for the request (appended after /api/v2/)

    Returns:
        typing.Dict: The decoded JSON response on HTTP 200; otherwise None
        (the error is only printed for now).

    Raises:
        TypeError: If `params` is neither a dict nor a str.
    """
    time.sleep(throttling_rate)
    if isinstance(params, dict):
        params_string = f"?{build_params(params)}"
    elif isinstance(params, str):
        params_string = f"/{params}"
    else:
        # Previously any other type left params_string unbound -> NameError.
        raise TypeError(f"params must be dict or str, got {type(params).__name__}")
    headers = {"Authorization": f"Api-Token {token}"}
    # NOTE(review): verify=False disables TLS certificate validation —
    # presumably for internal endpoints; confirm this is intended.
    host_response = requests.get(
        f"{env_url}/api/v2/{route}{params_string}",
        headers=headers,
        verify=False,
    )
    if host_response.status_code == 200:
        return host_response.json()
    else:
        # TODO: proper error handling (raise or return a structured error)
        print(f"ERROR - {host_response.status_code}")
if __name__ == "__main__":
    # Dashboard view counts over the last six months, one datapoint per month.
    metricSelector = "builtin:dashboards.viewCount:splitBy(id):sort(value(auto,ascending))"
    resolution = "1M"
    fromDate = "now-6M"
    toDate = "now"
    my_params = {
        # "nextPageKey": 300,
        "metricSelector": metricSelector,
        "resolution": resolution,
        # The metrics v2 endpoint expects "from"/"to"; the previous
        # "fromDate"/"toDate" keys were silently ignored and the API fell
        # back to its default timeframe.
        "from": fromDate,
        "to": toDate,
    }
    # Resolve the config file next to this script. os.path.basename() only
    # stripped the "./" prefix, so the open() broke whenever the script was
    # run from any other working directory.
    config_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "env-config.yaml")
    with open(config_path) as env_cfg:
        env_config = yaml.safe_load(env_cfg)
    for item, doc in env_config.items():
        # Each environment entry is a list of single-key dicts:
        # doc[1] -> {"env-url": ...}, doc[2] -> {"env-token-name": ...}
        token = dict(doc[2])
        url = dict(doc[1])
        print(item, " crawling through ...")
        print(item, " checking token ...")
        # Look the token up once instead of twice.
        DTTOKEN = config(token.get("env-token-name"), default='')
        if DTTOKEN != "":
            print(item, " fetching all dashboards ...")
            DTURL = url.get("env-url")
            get_data_from_dynatrace(0, DTTOKEN, DTURL, my_params, "metrics/query")