# qm_report/kpi_extension.py


import concurrent.futures
import glob
import os
import typing
import urllib.parse
import warnings
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd
import requests
import yaml
from dotenv import load_dotenv
from KRParser import krparser

warnings.filterwarnings("ignore")
load_dotenv()


class ReportReader:
    """
    Reads the XLSX file generated by createReport.py and extracts
    necessary information to add additional data to the QM Report.
    """

    def __init__(self) -> None:
        self.qm_report_file = ""
        self.qm_report_df = defaultdict(dict)
        self.sheet_names = []
        self.qm_report_ids = defaultdict(dict)

    def run(self) -> None:
        """
        Entrypoint.
        """
        self.get_qm_report_file_and_read_to_dataframe()
        self.get_slo_ids()

    def get_qm_report_file_and_read_to_dataframe(self) -> None:
        """
        Gets the first XLSX file in the working directory and reads each
        sheet into a pandas dataframe.
        """
        self.qm_report_file = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))[0]
        for sheet_name in self.get_sheet_names():
            df = pd.read_excel(self.qm_report_file, sheet_name)
            # Drop the index column pandas wrote when the report was created.
            if "Unnamed: 0" in df.columns:
                df = df.drop("Unnamed: 0", axis=1)
            self.qm_report_df[sheet_name] = df

    def get_sheet_names(self) -> typing.List:
        """
        Gets the sheet names of the XLSX file.

        Returns:
            typing.List: List with sheet names
        """
        return pd.ExcelFile(self.qm_report_file).sheet_names

    def get_slo_ids(self) -> None:
        """
        Extracts all the SLO ids and sorts them by hub.
        """
        for df_sheet_name in self.qm_report_df.keys():
            for hub in self.qm_report_df[df_sheet_name]["HUB"].unique():
                self.qm_report_ids[df_sheet_name][hub] = []
            for _, row in self.qm_report_df[df_sheet_name].iterrows():
                self.qm_report_ids[df_sheet_name][row["HUB"]].append(row["id"])


class DynatraceDataGetter:
    def __init__(self) -> None:
        self.config = {"threads": 3}
        self.environment = self._load_environment()

    def run(self, data: typing.Dict) -> None:
        # TODO: stub, not implemented yet.
        env_doc = self.environment

    def get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        # A dict is encoded as a query string, a plain string as a path segment.
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError("params must be a dict of query parameters or an id string")
        url = self.environment[environment][1]["env-url"]
        token = os.environ[self.environment[environment][2]["env-token-name"]]
        # TODO: use helper.py from keyrequest parser
        headers = {"Authorization": f"Api-Token {token}"}
        response = requests.get(
            f"{url}/api/v2/{route}{params_string}",
            headers=headers,
            verify=False,
        )
        return response.json()
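
    # For example, get_data_from_dynatrace("some-slo-id", "euprod", "slo") issues
    # GET {env-url}/api/v2/slo/some-slo-id; the id here is a hypothetical
    # placeholder.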

    def test_get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> str:
        """Builds the request URL without sending it (dry run for debugging)."""
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError("params must be a dict of query parameters or an id string")
        url = self.environment[environment][1]["env-url"]
        token = os.environ[self.environment[environment][2]["env-token-name"]]
        return f"{url}/api/v2/{route}{params_string}"

    def _build_params(self, params: typing.Dict) -> str:
        # Values are cast to str because timestamps arrive as ints.
        return "&".join(
            f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
        )
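
    # Example: _build_params({"resolution": "7d", "from": 1650000000000})
    # returns "resolution=7d&from=1650000000000".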

    def _load_environment(self) -> typing.Dict:
        with open("./environment.yaml", "r") as f:
            return yaml.safe_load(f)
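
    # The lookups used elsewhere (environment[hub][1]["env-url"] and
    # environment[hub][2]["env-token-name"]) imply that each hub maps to a list
    # of single-key mappings. A minimal sketch of the assumed environment.yaml
    # (URL and token name are hypothetical):
    #
    #   euprod:
    #     - env-name: euprod
    #     - env-url: https://example.live.dynatrace.com
    #     - env-token-name: EUPROD_API_TOKEN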


class KPIGetter:
    def __init__(self) -> None:
        self.data_getter = DynatraceDataGetter()
        self.extracted_key_requests = defaultdict(dict)
        self.metric_expressions = defaultdict(dict)  # sheet -> hub -> slo id

    def transform_key_requests(self) -> None:
        for hub in self.extracted_key_requests.keys():
            for slo in self.extracted_key_requests[hub].keys():
                entry = self.extracted_key_requests[hub][slo]
                if len(entry["services"]) > 0:
                    entry["services_transformed"] = Helper.transform_and_format_list(
                        entry["services"]
                    )
                else:
                    # TODO: proper logging
                    print(f"SERVICE: {hub} - {slo} is empty")
                if len(entry["requests"]) > 0:
                    entry["requests_transformed"] = Helper.transform_and_format_list(
                        entry["requests"]
                    )
                else:
                    # TODO: proper logging
                    print(f"REQUEST: {hub} - {slo} is empty")

    def get_kpi_data(self, dfs: typing.Dict) -> None:
        """Builds one metric expression per KPI for every SLO row in every sheet."""
        for sheet in dfs.keys():
            self.metric_expressions[sheet] = defaultdict(dict)
            for hub in dfs[sheet]["HUB"].unique():
                self.metric_expressions[sheet][hub] = defaultdict(dict)
            for _, row in dfs[sheet].iterrows():
                self.metric_expressions[sheet][row["HUB"]][row["id"]] = []
                expressions = self.metric_expressions[sheet][row["HUB"]][row["id"]]
                key_requests = self.extracted_key_requests[row["HUB"]][row["id"]]
                from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
                    row["timeframe"]
                )
                timeframe = self._get_timeframe_for_kpi_data(
                    from_timestamp_ms, to_timestamp_ms
                )
                if "services_transformed" in key_requests:
                    metric_kpi1 = self._build_kpi_metric_for_query(
                        "kpi1", timeframe, key_requests["services_transformed"]
                    )
                    expressions.append(
                        self._template_metric_expression(
                            "kpi1",
                            metric_kpi1,
                            self._calculate_timeshift(from_timestamp_ms, timeframe),
                            self._calculate_timeshift(to_timestamp_ms, timeframe),
                            timeframe,
                        )
                    )
                if "requests_transformed" in key_requests:
                    metric_kpi2 = self._build_kpi_metric_for_query(
                        "kpi2", timeframe, key_requests["requests_transformed"]
                    )
                    expressions.append(
                        self._template_metric_expression(
                            "kpi2",
                            metric_kpi2,
                            self._calculate_timeshift(from_timestamp_ms, timeframe),
                            self._calculate_timeshift(to_timestamp_ms, timeframe),
                            timeframe,
                        )
                    )
                if (
                    "requests_transformed" in key_requests
                    and "services_transformed" in key_requests
                ):
                    metric_count = self._build_kpi_metric_for_query(
                        "count",
                        timeframe,
                        key_requests["services_transformed"],
                        key_requests["requests_transformed"],
                    )
                    expressions.append(
                        self._template_metric_expression(
                            "count",
                            metric_count,
                            from_timestamp_ms,
                            to_timestamp_ms,
                            timeframe,
                        )
                    )
                    metric_error_count = self._build_kpi_metric_for_query(
                        "error_count",
                        timeframe,
                        key_requests["services_transformed"],
                        key_requests["requests_transformed"],
                    )
                    expressions.append(
                        self._template_metric_expression(
                            "error_count",
                            metric_error_count,
                            from_timestamp_ms,
                            to_timestamp_ms,
                            timeframe,
                        )
                    )

    def _dispatch_to_dynatrace(self) -> None:
        # TODO: dispatch in parallel with a
        # concurrent.futures.ThreadPoolExecutor(self.data_getter.config["threads"]).
        for sheet in self.metric_expressions.keys():
            for hub in self.metric_expressions[sheet].keys():
                for slo in self.metric_expressions[sheet][hub].keys():
                    for query in self.metric_expressions[sheet][hub][slo]:
                        if query["metric"] is not False:
                            params = {
                                "metricSelector": query["metric"],
                                "resolution": query["resolution"],
                                "from": query["from_date"],
                                "to": query["to_date"],
                            }
                            result = self.data_getter.test_get_data_from_dynatrace(
                                params, hub, "metrics/query"
                            )
                            if hub == "euprod":
                                print(result)

    def _template_metric_expression(
        self,
        kpi_name: str,
        metric_expression: str,
        from_timestamp_ms: int,
        to_timestamp_ms: int,
        resolution: str,
    ) -> typing.Dict:
        return {
            "kpi_name": kpi_name,
            "metric": metric_expression,
            "from_date": from_timestamp_ms,
            "to_date": to_timestamp_ms,
            "resolution": resolution,
        }

    def _calculate_timeshift(self, timestamp_ms: int, resolution: str) -> int:
        result = ""
        if resolution == "7d":
            # Shift back by exactly one week, in milliseconds.
            result = timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
        if resolution == "1w":
            # TODO: get previous week
            pass
        if resolution == "1M":
            # TODO: get previous month
            pass
        return result
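
    # Example: _calculate_timeshift(1_700_000_000_000, "7d") returns
    # 1_699_395_200_000, i.e. the input minus 604_800_000 ms (one week).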

    def _get_timeframe_for_kpi_data(
        self, from_timestamp: int, to_timestamp: int
    ) -> str:
        days = Helper.get_days(from_timestamp, to_timestamp)
        timeframe = ""
        if days == 1:
            timeframe = "7d"
        if days == 7:
            timeframe = "1w"
        if 28 <= days < 32:
            timeframe = "1M"
        return timeframe
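
    # Note: a one-day report window maps to a "7d" shift, presumably so a daily
    # value is compared against the same weekday one week earlier.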

    def _build_kpi_metric_for_query(
        self,
        kpi_type: str,
        timeframe: str,
        service: typing.Optional[str] = None,
        request: typing.Optional[str] = None,
    ) -> typing.Union[str, bool]:
        # TODO: use a match statement once Python 3.10+ can be assumed.
        if kpi_type == "kpi1":
            kpi = f'100-(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe}))'
        elif kpi_type == "kpi2":
            kpi = f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.equals({service})"))))):value:rate(h):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.equals({service})"))))):value:rate(h):fold(avg)))'
        elif kpi_type == "count":
            kpi = f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
        elif kpi_type == "error_count":
            kpi = f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
        else:
            kpi = False
        return kpi
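
    # For illustration: with service='~"checkout-service~"' (a hypothetical name
    # as produced by Helper.transform_and_format_list) and timeframe="7d", the
    # "kpi1" template renders a week-over-week ratio of key-request counts; ~" is
    # Dynatrace's escaped quote inside entitySelector strings.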

    def _extract_key_requests(
        self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
    ) -> None:
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES,
            config={
                "threads": 10,
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )
        krs = krp.parse(slo_ids_df)
        self.extracted_key_requests[env] = {}
        for kr in krs:
            entry = {"requests": [], "services": []}
            self.extracted_key_requests[env][kr.metadata["sloId"]] = entry
            for key_request in kr.keyRequests:
                entry["requests"].append(key_request["displayName"])
                for service in key_request["services"]:
                    # Collect each service name only once per SLO.
                    if service["displayName"] not in entry["services"]:
                        entry["services"].append(service["displayName"])
        self.transform_key_requests()

    def get_slos(self, slo_ids: list, hub: str) -> None:
        slo_responses = []
        for slo_id in slo_ids:
            response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # Drop the nested errorBudgetBurnRate object before flattening
            # the responses into a dataframe.
            del response["errorBudgetBurnRate"]
            slo_responses.append(response)
        df = pd.DataFrame(slo_responses)
        self._extract_key_requests(
            df,
            hub,
            self.data_getter.environment[hub][1]["env-url"],
            os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
        )


class Helper:
    @staticmethod
    def transform_and_format_list(data: list) -> str:
        # Wrap each entry in Dynatrace's escaped quotes (~") and join with commas.
        return ", ".join(f'~"{s}~"' for s in data)
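
    # Example: transform_and_format_list(["svc-a", "svc-b"]) returns
    # '~"svc-a~", ~"svc-b~"' (the service names are hypothetical).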

    @staticmethod
    def extract_timestamps(timeframe: str) -> typing.Tuple[int, int]:
        """Splits a '<from> to <to>' millisecond-timestamp string into two ints."""
        from_ts, to_ts = timeframe.split(" to ")
        return int(from_ts), int(to_ts)
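
    # Example: extract_timestamps("1650000000000 to 1650604800000") returns
    # (1650000000000, 1650604800000).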

    @staticmethod
    def get_days(from_timestamp: int, to_timestamp: int) -> int:
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        return (to_date - from_date).days
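
    # Example: get_days(0, 604_800_000) returns 7 (both timestamps are in ms).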

    @staticmethod
    def previous_day_range(date):
        start_date = date - timedelta(days=1)
        end_date = date - timedelta(days=1)
        return start_date, end_date

    @staticmethod
    def previous_week_range(date):
        # Monday through Sunday of the week before `date`.
        start_date = date + timedelta(-date.weekday(), weeks=-1)
        end_date = date + timedelta(-date.weekday() - 1)
        return start_date, end_date

    @staticmethod
    def previous_month_range(date):
        end_date = date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        return start_date, end_date
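
    # Example: previous_month_range(datetime(2024, 3, 15)) returns
    # (datetime(2024, 2, 1), datetime(2024, 2, 29)).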


def main():
    """
    Entrypoint.
    """
    kpi_getter = KPIGetter()
    report_reader = ReportReader()
    report_reader.run()
    # Get SLO IDs from the first sheet, then build and dispatch the metric
    # expression queries.
    first_sheet = next(iter(report_reader.qm_report_ids))
    for hub in report_reader.qm_report_ids[first_sheet].keys():
        kpi_getter.get_slos(report_reader.qm_report_ids[first_sheet][hub], hub)
    kpi_getter.get_kpi_data(report_reader.qm_report_df)
    kpi_getter._dispatch_to_dynatrace()


if __name__ == "__main__":
    main()