# qm_report/kpi_extension.py

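"""
Extends the QM report produced by createReport.py: reads the report XLSX,
resolves each SLO's key requests via KRParser, queries the Dynatrace metrics
API for additional KPI values, merges them into the report DataFrames, and
writes the combined workbook back to disk.
"""
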
import concurrent.futures
import os
import glob
import pandas as pd
from collections import defaultdict
import typing
import requests
import urllib.parse
from dotenv import load_dotenv
import yaml
from KRParser import krparser, helper
import warnings
from datetime import date, datetime, timedelta
import json  # used only by the DEBUG dumps below
warnings.filterwarnings("ignore")
load_dotenv()


class ReportReader:
"""
Reads the XLSX file generated by createReport.py and extracts
necessary information to add additional data to the QM Report.
"""
def __init__(self) -> None:
self.qm_report_file = ""
self.qm_report_df = defaultdict(dict)
self.sheet_names = []
self.qm_report_ids = defaultdict(dict)
def run(self) -> None:
"""
Entrypoint.
"""
self.get_qm_report_file_and_read_to_dataframe()
self.get_slo_ids()
def get_qm_report_file_and_read_to_dataframe(self):
"""
Gets XLSX file and reads it into pandas dataframes
"""
        xlsx_files = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))
        if not xlsx_files:
            raise FileNotFoundError(
                "no XLSX report found in the current working directory"
            )
        self.qm_report_file = xlsx_files[0]
sheet_names = self.get_sheet_names()
for sheet_name in sheet_names:
df = pd.read_excel(self.qm_report_file, sheet_name)
columns = df.columns
if "Unnamed: 0" in columns:
df = df.drop("Unnamed: 0", axis=1)
self.qm_report_df[sheet_name] = df
    def get_sheet_names(self) -> typing.List[str]:
        """
        Gets the sheet names of the XLSX file.

        Returns:
            typing.List[str]: List with sheet names
        """
        return pd.ExcelFile(self.qm_report_file).sheet_names
def get_slo_ids(self) -> None:
"""
Extracts all the SLO ids and sorts them by hub.
"""
for df_sheet_name in self.qm_report_df.keys():
hubs = self.qm_report_df[df_sheet_name]["HUB"].unique()
for hub in hubs:
self.qm_report_ids[df_sheet_name][hub] = []
for _, row in self.qm_report_df[df_sheet_name].iterrows():
self.qm_report_ids[df_sheet_name][row["HUB"]].append(row["id"])


class QmReportWriter:
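    """
    Merges the fetched KPI query results into the report DataFrames and
    writes the combined report to a new XLSX workbook.
    """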
    def __init__(self, report_dfs: typing.Dict[str, pd.DataFrame], kpis: typing.Dict):
self.report_dfs = report_dfs
self.kpis = kpis
def _combine_datasets(self):
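        """
        Copies each query's result values into the matching report rows
        (matched on HUB, id, and timeframe), creating the KPI column on first
        use. Queries whose "result" is the "None" sentinel (set in
        KPIGetter._process_dynatrace_data) or empty are skipped. Ends by
        writing the workbook via write_report_to_xlsx().
        """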
for sheet in self.kpis.keys():
for hub in self.kpis[sheet].keys():
for slo_id in self.kpis[sheet][hub].keys():
for query in self.kpis[sheet][hub][slo_id]:
if len(query) > 0:
if (
query["result"] != "None"
and len(query["result"]) > 0
and len(query["result"][0]["data"]) > 0
and len(query["result"][0]["data"][0]["values"][0]) > 0
):
values = query["result"][0]["data"][0]["values"][0]
mask = (
(self.report_dfs[sheet]["HUB"] == hub)
& (self.report_dfs[sheet]["id"] == slo_id)
& (
self.report_dfs[sheet]["timeframe"]
== query["timeframe"]
)
)
if (
query["kpi_name"]
not in self.report_dfs[sheet].columns
):
self.report_dfs[sheet][query["kpi_name"]] = None
self.report_dfs[sheet].loc[
mask, query["kpi_name"]
] = values
self.write_report_to_xlsx()
def write_report_to_xlsx(self):
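        """
        Writes one sheet per report DataFrame to test.xlsx and adds an
        autofilter spanning each sheet's data range.
        """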
        writer = pd.ExcelWriter("test.xlsx", engine="xlsxwriter")
for sheet_name, dataframe in self.report_dfs.items():
dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
worksheet = writer.sheets[sheet_name]
worksheet.autofilter(0, 0, dataframe.shape[0], dataframe.shape[1] - 1)
writer.close()


class DynatraceDataGetter:
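    """
    Small Dynatrace API v2 client: resolves the base URL from
    environment.yaml and the API token from the process environment, then
    issues GET requests either directly or through the KRParser helper.
    """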
def __init__(self) -> None:
self.config = {"threads": 3}
self.environment = self._load_environment()
    def run(self, data: typing.Dict):
        # TODO: unused stub; KPIGetter calls the request helpers below directly.
        pass
def get_data_from_dynatrace(
self, params, environment: str, route: str
) -> typing.Dict:
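        """
        GETs /api/v2/<route> on the given environment. A dict `params` is
        encoded as a query string; a str `params` is appended as a path
        segment (e.g. an SLO id for the "slo" route). Note that TLS
        verification is disabled.
        """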
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError("params must be a dict of query parameters or an id string")
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
# TODO: use helper.py from keyrequest parser
headers = {"Authorization": f"Api-Token {token}"}
response = requests.get(
f"{url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
return response.json()
def krparser_get_data_from_dynatrace(
self, params, environment: str, route: str
) -> typing.Dict:
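        """
        Variant of get_data_from_dynatrace that delegates the HTTP call to
        KRParser's helper.get_request.
        """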
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
dt_url = f"{url}/api/v2/{route}"
headers = {
"Content-Type": "application/json",
"Authorization": "Api-Token " + token,
}
response = helper.get_request(url=dt_url, headers=headers, params=params)
return response.json()
def _build_params(self, params: typing.Dict) -> str:
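        """
        Percent-encodes each value and joins the pairs into a
        "key=value&key=value" query string.
        """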
query_string = "&".join(
f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
)
return query_string
def _load_environment(self) -> typing.Dict:
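        """
        Loads environment.yaml from the working directory.

        The lookups in this class (self.environment[env][1]["env-url"],
        self.environment[env][2]["env-token-name"]) imply a per-environment
        list layout roughly like the sketch below; this is inferred from
        usage, not verified against the real file:

            euprod:
              - ...                    # index 0, unused here
              - env-url: https://example.live.dynatrace.com
              - env-token-name: EUPROD_API_TOKEN
        """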
with open("./environment.yaml", "r") as f:
env_doc = yaml.safe_load(f)
return env_doc


class KPIGetter:
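    """
    Resolves SLO key requests via KRParser, builds Dynatrace metric-selector
    queries per report row, dispatches them on a thread pool, and stores the
    results in self.metric_expressions (sheet -> hub -> SLO id -> queries).
    """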
def __init__(self) -> None:
self.data_getter = DynatraceDataGetter()
self.extracted_key_requests = defaultdict(dict)
self.metric_expressions = defaultdict(dict) # sheet -> hub -> sloid
    def transform_key_requests(self) -> None:
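        """
        Derives "services_transformed"/"requests_transformed" for each SLO:
        the raw display-name lists rendered into the ~"name~", ~"name~" form
        expected inside entitySelector entityName.in(...) clauses.
        """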
for hub in self.extracted_key_requests.keys():
for slo in self.extracted_key_requests[hub].keys():
if len(self.extracted_key_requests[hub][slo]["services"]) > 0:
services = Helper.transform_and_format_list(
self.extracted_key_requests[hub][slo]["services"]
)
self.extracted_key_requests[hub][slo][
"services_transformed"
] = services
else:
print(f"SERVICE: {hub} - {slo} is empty")
if len(self.extracted_key_requests[hub][slo]["requests"]):
requests = Helper.transform_and_format_list(
self.extracted_key_requests[hub][slo]["requests"]
)
self.extracted_key_requests[hub][slo][
"requests_transformed"
] = requests
else:
# TODO: proper logging
print(f"REQUEST: {hub} - {slo} is empty")
    def get_kpi_data(self, dfs: typing.Dict[str, pd.DataFrame]):
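        """
        Builds the metric expressions and query windows for each report row:
        kpi_1/kpi_2 (timeshifted comparison queries) when the SLO's services
        were resolved, plus count/error_count when its key requests were
        resolved as well.
        """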
for sheet in dfs.keys():
self.metric_expressions[sheet] = defaultdict(dict)
hubs = dfs[sheet]["HUB"].unique()
for hub in hubs:
self.metric_expressions[sheet][hub] = defaultdict(dict)
for _, row in dfs[sheet].iterrows():
self.metric_expressions[sheet][row["HUB"]][row["id"]] = []
# TODO: another iteration
from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
row["timeframe"]
)
timeframe = self._get_timeframe_for_kpi_data(
from_timestamp_ms, to_timestamp_ms
)
(
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
) = self._calculate_timeshift(
from_timestamp_ms, to_timestamp_ms, timeframe
)
if row["id"] in self.extracted_key_requests[row["HUB"]]:
if (
"services_transformed"
in self.extracted_key_requests[row["HUB"]][row["id"]]
):
metric_kpi1 = self._build_kpi_metric_for_query(
"kpi1",
timeframe,
self.extracted_key_requests[row["HUB"]][row["id"]][
"services_transformed"
],
)
self.metric_expressions[sheet][row["HUB"]][row["id"]].append(
self._template_metric_expression(
"kpi_1",
metric_kpi1,
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
timeframe,
row["timeframe"],
)
)
metric_kpi2 = self._build_kpi_metric_for_query(
"kpi2",
timeframe,
self.extracted_key_requests[row["HUB"]][row["id"]][
"services_transformed"
],
)
self.metric_expressions[sheet][row["HUB"]][row["id"]].append(
self._template_metric_expression(
"kpi_2",
metric_kpi2,
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
timeframe,
)
)
if (
"requests_transformed"
in self.extracted_key_requests[row["HUB"]][row["id"]]
and "services_transformed"
in self.extracted_key_requests[row["HUB"]][row["id"]]
):
metric_count = self._build_kpi_metric_for_query(
"count",
timeframe,
self.extracted_key_requests[row["HUB"]][row["id"]][
"services_transformed"
],
self.extracted_key_requests[row["HUB"]][row["id"]][
"requests_transformed"
],
)
self.metric_expressions[sheet][row["HUB"]][row["id"]].append(
{
"kpi_name": "count",
"metric": metric_count,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": timeframe,
"timeframe": row["timeframe"],
}
)
metric_error_count = self._build_kpi_metric_for_query(
"error_count",
timeframe,
self.extracted_key_requests[row["HUB"]][row["id"]][
"services_transformed"
],
self.extracted_key_requests[row["HUB"]][row["id"]][
"requests_transformed"
],
)
self.metric_expressions[sheet][row["HUB"]][row["id"]].append(
{
"kpi_name": "error_count",
"metric": metric_error_count,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": timeframe,
"timeframe": row["timeframe"],
}
)
def _dispatch_to_dynatrace(self):
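        """
        Submits every query that has a valid metric selector to the Dynatrace
        metrics API on a thread pool, stashes the Future on the query dict,
        then collects the results via _process_dynatrace_data.
        """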
with concurrent.futures.ThreadPoolExecutor(
self.data_getter.config["threads"]
) as executor:
for sheet in self.metric_expressions.keys():
for hub in self.metric_expressions[sheet].keys():
for slo in self.metric_expressions[sheet][hub].keys():
for index, query in enumerate(
self.metric_expressions[sheet][hub][slo]
):
if query["metric"] is not False:
                                params = {
                                    "metricSelector": query["metric"],
                                    "from": query["from_date"],
                                    "to": query["to_date"],
                                }
                                if "resolution" in query:
                                    params["resolution"] = query["resolution"]
future = executor.submit(
self.data_getter.krparser_get_data_from_dynatrace,
params,
hub,
"metrics/query",
)
self.metric_expressions[sheet][hub][slo][index][
"data"
] = future
self._process_dynatrace_data()
def _process_dynatrace_data(self):
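        """
        Blocks on each submitted Future and stores the payload's "result"
        list on the query, or the string sentinel "None" when the first
        series contains no data points. Also appends a debug dump of every
        response to ./slo_results.txt.
        """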
for sheet in self.metric_expressions.keys():
for hub in self.metric_expressions[sheet].keys():
for slo in self.metric_expressions[sheet][hub].keys():
for index, query in enumerate(
self.metric_expressions[sheet][hub][slo]
):
future = query["data"]
result = future.result()
if len(result["result"][0]["data"]) > 0:
self.metric_expressions[sheet][hub][slo][index][
"result"
] = result["result"]
else:
self.metric_expressions[sheet][hub][slo][index][
"result"
] = "None"
                        # Drop the Future before serializing: it is not JSON-
                        # serializable and is no longer needed once "result" is set.
                        del query["data"]
# TODO: DEBUG remove
with open("./slo_results.txt", "a") as f:
f.write(f"\n{sheet} -> {hub} -> {slo}:\n")
f.write(json.dumps(result, indent=4))
f.write("\n")
f.write("-" * 80)
def _template_metric_expression(
self,
kpi_name: str,
metric_expression: str,
from_timestamp_ms: int,
to_timestamp_ms: int,
resolution: str,
timeframe: str,
) -> typing.Dict:
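        """
        Packs a metric expression and its query window into the dict shape
        consumed by _dispatch_to_dynatrace.
        """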
element = {
"kpi_name": kpi_name,
"metric": metric_expression,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": resolution,
"timeframe": timeframe,
}
return element
def _calculate_timeshift(
self, from_timestamp_ms: int, to_timestamp_ms: int, resolution: str
    ) -> typing.Tuple[int, int]:
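        """
        Shifts the query window one comparison period into the past: exactly
        7 days for "7d", the calendar week two weeks back for "1w" (via
        Helper.previous_week_range); "1M" is still a TODO and returns the
        window unshifted.
        """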
from_ts, to_ts = "", ""
if resolution == "7d":
from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
to_ts = to_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
if resolution == "1w":
from_date, end_date = Helper.previous_week_range(
datetime.fromtimestamp(to_timestamp_ms / 1000), -2
)
from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
to_ts = Helper.convert_datetime_to_timestamp(end_date, "ms")
if resolution == "1M":
# TODO: not done yet
from_ts = from_timestamp_ms
to_ts = to_timestamp_ms
return from_ts, to_ts
def _get_timeframe_for_kpi_data(
self, from_timestamp: int, to_timestamp: int
) -> str:
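        """
        Maps the report window length to a Dynatrace resolution/timeshift
        string: 1 day -> "7d", 7 days -> "1w", 28-31 days -> "1M"; anything
        else yields "".
        """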
days = Helper.get_days(from_timestamp, to_timestamp)
timeframe = ""
if days == 1:
timeframe = "7d"
if days == 7:
timeframe = "1w"
if days >= 28 and days < 32:
timeframe = "1M"
return timeframe
    def _build_kpi_metric_for_query(
        self, kpi_type: str, timeframe: str,
        service: typing.Optional[str] = None, request: typing.Optional[str] = None,
    ) -> typing.Union[str, bool]:
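        """
        Returns the metric selector for the given KPI type, or False for an
        unknown type:
          kpi1        - 100 minus the ratio of the services' key-request
                        count to the same count timeshifted one timeframe back
          kpi2        - latest hourly request rate as a percentage of the
                        window's average hourly rate
          count       - key-request count for the given services/requests
          error_count - 5xx error count for the given services/requests
        """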
        # TODO: make nicer, e.g. with match/case once Python 3.10+ is available
kpi = ""
if kpi_type == "kpi1":
kpi = f'100-(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe}))'
elif kpi_type == "kpi2":
kpi = f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate(h):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate(h):fold(avg)))'
elif kpi_type == "count":
kpi = f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
elif kpi_type == "error_count":
kpi = f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
else:
kpi = False
return kpi
def _extract_key_requests(
self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
):
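        """
        Runs KRParser over the SLO DataFrame for one environment and
        collects, per SLO id, the key-request display names plus the
        deduplicated display names of the services they belong to, then
        triggers transform_key_requests().
        """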
krp = krparser.KRParser(
name=env,
options=krparser.KROption.RESOLVESERVICES,
config={
"threads": 10,
"serviceLookupParams": {"fields": "tags,fromRelationships"},
"extendResultObjects": {"env": env},
},
DTAPIURL=DTURL,
DTAPIToken=DTTOKEN,
)
krs = krp.parse(slo_ids_df)
self.extracted_key_requests[env] = {}
for kr in krs:
self.extracted_key_requests[env][kr.metadata["sloId"]] = {}
self.extracted_key_requests[env][kr.metadata["sloId"]]["requests"] = []
self.extracted_key_requests[env][kr.metadata["sloId"]]["services"] = []
for key_request in kr.keyRequests:
self.extracted_key_requests[env][kr.metadata["sloId"]][
"requests"
].append(key_request["displayName"])
for service in key_request["services"]:
if (
service["displayName"]
not in self.extracted_key_requests[env][kr.metadata["sloId"]][
"services"
]
):
self.extracted_key_requests[env][kr.metadata["sloId"]][
"services"
].append(service["displayName"])
self.transform_key_requests()
def get_slos(self, slo_ids: list, hub: str):
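        """
        Fetches each SLO definition from the given hub and hands the combined
        DataFrame to _extract_key_requests for key-request resolution.
        """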
slo_responses = []
for slo_id in slo_ids:
response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
del response["errorBudgetBurnRate"]
slo_responses.append(response)
df = pd.DataFrame(slo_responses)
self._extract_key_requests(
df,
hub,
self.data_getter.environment[hub][1]["env-url"],
os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
)


class Helper:
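    """
    Static helpers for Dynatrace selector formatting and date/timestamp
    arithmetic.
    """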
@staticmethod
def transform_and_format_list(data: list) -> str:
joined_string = ", ".join(data)
string = ", ".join([f'~"{s}~"' for s in joined_string.split(", ")])
return string
@staticmethod
    def extract_timestamps(timeframe: str) -> typing.Tuple[int, int]:
        # Expects the report's timeframe format "<from_ms> to <to_ms>".
        from_ts, to_ts = timeframe.split(" to ")
        return int(from_ts), int(to_ts)
@staticmethod
def get_days(from_timestamp: int, to_timestamp: int) -> int:
from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        duration = to_date - from_date
return duration.days
@staticmethod
def previous_day_range(date):
start_date = date - timedelta(days=1)
end_date = date - timedelta(days=1)
return start_date, end_date
@staticmethod
def previous_week_range(date, weeks: int):
        # Monday of the target week through Monday of the following week.
        start_date = date + timedelta(-date.weekday(), weeks=weeks)
        end_date = date + timedelta(-date.weekday(), weeks=weeks + 1)
return start_date, end_date
@staticmethod
def previous_month_range(date):
        end_date = date.replace(day=1) - timedelta(days=1)
start_date = end_date.replace(day=1)
return start_date, end_date
@staticmethod
    def convert_datetime_to_timestamp(day: date, option: typing.Optional[str] = None) -> int:
        date_datetime = datetime.combine(day, datetime.min.time())
epoch = datetime(1970, 1, 1)
date_timestamp = (date_datetime - epoch).total_seconds()
if option == "ms":
date_timestamp = date_timestamp * 1000
return int(date_timestamp)


def main():
"""
Entrypoint.
"""
kpi_getter = KPIGetter()
report_reader = ReportReader()
report_reader.run()
    # Get SLO IDs from the first sheet only and build the metric expression
    # queries; key requests are resolved once per hub.
    first_sheet = next(iter(report_reader.qm_report_ids))
    for hub in report_reader.qm_report_ids[first_sheet].keys():
        kpi_getter.get_slos(report_reader.qm_report_ids[first_sheet][hub], hub)
kpi_getter.get_kpi_data(report_reader.qm_report_df)
kpi_getter._dispatch_to_dynatrace()
write_report = QmReportWriter(
report_reader.qm_report_df, kpi_getter.metric_expressions
)
write_report._combine_datasets()
    # DEBUG: dump the built metric expressions; assumes a sheet named "daily".
    with open("metricexpressions.json", "w") as f:
        f.write(json.dumps(kpi_getter.metric_expressions["daily"], indent=4))
if __name__ == "__main__":
main()