qm_report/kpi_extension.py

#!/usr/bin/python3.8
# import threading
import concurrent.futures
import os
import glob
import pandas as pd
from collections import defaultdict
import typing
import requests
import urllib.parse
import yaml
from KRParser import krparser, helper
import warnings
from datetime import datetime, timedelta
import time
import json
from dateutil.relativedelta import relativedelta
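# DEBUG enables .env loading, verbose console output and debug artifacts.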
DEBUG = False
if DEBUG:
from dotenv import load_dotenv
load_dotenv()
warnings.filterwarnings("ignore")
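# Expected values: "day", "week" or "month"; drives timeframe and resolution selection.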
REPORT_TYPE = os.environ.get("REPORT_TYPE")
try:
os.environ["TZ"] = "Europe/Berlin" # set new timezone
time.tzset()
except Exception as e:
print(f"This error was encounterted : {e}")
class ReportReader:
"""
Reads the XLSX file generated by createReport.py and extracts
necessary information to add additional data to the QM Report.
"""
def __init__(self) -> None:
self.qm_report_file = ""
self.qm_report_df = defaultdict(dict)
self.sheet_names = []
self.qm_report_ids = defaultdict(dict)
def run(self) -> None:
"""
Entrypoint.
"""
self.get_qm_report_file_and_read_to_dataframe()
self.get_slo_ids()
def get_qm_report_file_and_read_to_dataframe(self):
"""
        Gets the XLSX file and reads each sheet into a pandas dataframe.
"""
Helper.console_output("Getting latest QM-Report and ingesting...")
self.qm_report_file = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))[0]
sheet_names = self.get_sheet_names()
for sheet_name in sheet_names:
df = pd.read_excel(self.qm_report_file, sheet_name)
columns = df.columns
if "Unnamed: 0" in columns:
df = df.drop("Unnamed: 0", axis=1)
if sheet_name != "total":
df["Date"] = pd.to_datetime(df["Date"], format="%Y-%m-%d")
if DEBUG:
Helper.console_output(f"dtypes of {sheet_name}\n{df.dtypes}")
self.qm_report_df[sheet_name] = df
def get_sheet_names(self) -> typing.List:
"""
Gets the sheet names of the XLSX file.
Returns:
typing.List: List with sheet names
"""
return pd.ExcelFile(self.qm_report_file).sheet_names
def get_slo_ids(self) -> None:
"""
Extracts all the SLO ids and sorts them by hub.
"""
Helper.console_output("Extracting SLO ids...")
for df_sheet_name in self.qm_report_df.keys():
hubs = self._build_environment_names()
for hub in hubs:
self.qm_report_ids[df_sheet_name][hub] = []
for _, row in self.qm_report_df[df_sheet_name].iterrows():
self.qm_report_ids[df_sheet_name][f'{row["HUB"]}-{row["type"]}'].append(
row["id"]
)
def _build_environment_names(self) -> typing.List:
environment_names = []
for _, row in self.qm_report_df[self.get_sheet_names()[0]].iterrows():
name = f"{row['HUB']}-{row['type']}"
if name not in environment_names:
environment_names.append(name)
return environment_names
class QmReportWriter:
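    """
    Merges the fetched KPI values into the report dataframes and writes the
    enriched QM report back to XLSX.
    """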
def __init__(self, report_dfs: pd.DataFrame, kpis: typing.Dict, filename: str):
self.report_dfs = report_dfs
self.kpis = kpis
self.filename = filename
def run(self):
Helper.console_output("Starting QM-Report writing process...")
self._combine_datasets()
def _combine_datasets(self):
Helper.console_output("Enriching QM-Report with new KPI")
for sheet in self.kpis.keys():
for hub in self.kpis[sheet].keys():
for slo_id in self.kpis[sheet][hub].keys():
for query in self.kpis[sheet][hub][slo_id]:
                        if len(query) > 0:
                            # Only merge queries that returned usable data.
                            if query.get("result", "None") != "None" and len(query["result"]) > 0:
values = query["api_result"]
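                                # hub is "<HUB>-<type>"; match the report row on
                                # HUB, SLO id, timeframe and environment type.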
mask = (
(self.report_dfs[sheet]["HUB"] == hub.split("-")[0])
& (self.report_dfs[sheet]["id"] == slo_id)
& (
self.report_dfs[sheet]["timeframe"]
== query["timeframe"]
)
& (
self.report_dfs[sheet]["type"]
== hub.split("-")[1]
)
)
if (
query["kpi_name"]
not in self.report_dfs[sheet].columns
):
self.report_dfs[sheet][query["kpi_name"]] = None
existing_value = self.report_dfs[sheet].loc[
mask, query["kpi_name"]
]
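                                # count/error_count are fetched in chunks, so new
                                # values are added onto any existing cell value.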
try:
if not existing_value.empty:
existing_value = existing_value.iloc[0]
if existing_value is not None:
self.report_dfs[sheet].loc[
mask, query["kpi_name"]
] = (existing_value + values)
else:
self.report_dfs[sheet].loc[
mask, query["kpi_name"]
] = values
else:
self.report_dfs[sheet].loc[
mask, query["kpi_name"]
] = values
except Exception as e:
print(f"_combine_dataset EXCEPTION: {e}")
# self.report_dfs[sheet].loc[
# mask, query["kpi_name"]
# ] = values
self._write_report_to_xlsx()
if DEBUG:
self._write_to_csv()
def _write_report_to_xlsx(self):
Helper.console_output("Writing XLSX")
if DEBUG:
filename = "test.xlsx"
else:
filename = self.filename
writer = pd.ExcelWriter(filename, engine="xlsxwriter")
workbook = writer.book
for sheet_name, dataframe in self.report_dfs.items():
dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
worksheet = writer.sheets[sheet_name]
worksheet.autofilter(0, 0, dataframe.shape[0], dataframe.shape[1] - 1)
# format date
if sheet_name != "total":
fmt = workbook.add_format({"num_format": "yyyy-mm-dd"})
for row, date_time in enumerate(dataframe["Date"], start=1):
worksheet.write_datetime(row, 0, date_time, fmt)
writer.close()
def _write_to_csv(self):
Helper.console_output("Writing CSV")
for sheet_name, dataframe in self.report_dfs.items():
dataframe.to_csv(f"test_{sheet_name}.csv", index=False)
class DynatraceDataGetter:
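    """
    Thin wrapper around the Dynatrace API v2 for fetching SLO definitions and
    metric data.
    """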
def __init__(self) -> None:
self.config = {"threads": 10}
self.environment = self._load_environment()
# def run(self, data: typing.Dict):
# env_doc = self.environment
def get_data_from_dynatrace(
self, params, environment: str, route: str
) -> typing.Dict:
        # A dict becomes a query string ("?key=value&..."); a plain string is
        # appended as a path segment ("/<id>").
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError(f"Unsupported params type: {type(params)!r}")
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
# TODO: use helper.py from keyrequest parser
headers = {"Authorization": f"Api-Token {token}"}
response = requests.get(
f"{url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
return response.json()
def krparser_get_data_from_dynatrace(
self, params, environment: str, route: str
) -> typing.Dict:
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
dt_url = f"{url}/api/v2/{route}"
headers = {
"Content-Type": "application/json",
"Authorization": "Api-Token " + token,
}
response = helper.get_request(url=dt_url, headers=headers, params=params)
return response.json()
def _build_params(self, params: typing.Dict) -> str:
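        # e.g. {"from": 1, "metricSelector": "a:b"} -> "from=1&metricSelector=a%3Ab"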
query_string = "&".join(
f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
)
return query_string
def _load_environment(self) -> typing.Dict:
with open("./environment.yaml", "r") as f:
env_doc = yaml.safe_load(f)
return env_doc
class KPIGetter:
def __init__(self) -> None:
self.data_getter = DynatraceDataGetter()
self.extracted_key_requests = defaultdict(dict)
self.metric_expressions = defaultdict(dict) # sheet -> hub -> sloid
def run(self):
"""
Entrypoint for the KPI extension.
"""
if DEBUG:
Helper.console_output("Script running debug mode")
Helper.cleanup_debug_files()
report_reader = ReportReader()
report_reader.run()
# Get SLO IDs from first sheet and build metric expression queries.
for i, sheet in enumerate(report_reader.qm_report_ids.keys()):
if i == 0:
for hub in report_reader.qm_report_ids[sheet].keys():
self.get_slos(report_reader.qm_report_ids[sheet][hub], hub)
self.get_kpi_data(report_reader.qm_report_df)
write_report = QmReportWriter(
report_reader.qm_report_df,
self.metric_expressions,
report_reader.qm_report_file,
)
write_report.run()
# DEBUG
if DEBUG:
with open("metricexpressions.json", "w") as f:
f.write(json.dumps(self.metric_expressions, indent=4))
def _transform_key_requests(self):
"""
Transforms the responses from the key request parser into a joined string.
"""
for hub in self.extracted_key_requests.keys():
for slo in self.extracted_key_requests[hub].keys():
if len(self.extracted_key_requests[hub][slo]["services"]) > 0:
services = Helper.transform_and_format_list(
self.extracted_key_requests[hub][slo]["services"]
)
self.extracted_key_requests[hub][slo][
"services_transformed"
] = services
# DEBUG
if DEBUG:
if len(self.extracted_key_requests[hub][slo]["services"]) > 50:
Helper.console_output(
f'SERVICES: {hub} - {slo} - {len(self.extracted_key_requests[hub][slo]["services"])}'
)
# DEBUG
else:
if DEBUG:
print(f"SERVICE: {hub} - {slo} is empty")
if len(self.extracted_key_requests[hub][slo]["requests"]):
requests = Helper.transform_and_format_list(
self.extracted_key_requests[hub][slo]["requests"]
)
self.extracted_key_requests[hub][slo][
"requests_transformed"
] = requests
# DEBUG
if DEBUG:
if len(self.extracted_key_requests[hub][slo]["requests"]) > 50:
Helper.console_output(
f'REQUESTS: {hub} - {slo} - {len(self.extracted_key_requests[hub][slo]["requests"])}'
)
# DEBUG
else:
if DEBUG:
print(f"REQUEST: {hub} - {slo} is empty")
def _build_environment_names(self, df: pd.DataFrame) -> typing.List:
"""
Creates new environment list from given QM report dataframe.
Args:
            df (pd.DataFrame): A QM report sheet converted into a dataframe
Returns:
typing.List: List with unique environment names.
"""
environment_names = []
for _, row in df.iterrows():
name = f'{row["HUB"]}-{row["type"]}'
if name not in environment_names:
environment_names.append(name)
return environment_names
    def _get_time_scope(self, sheet_name: str) -> str:
        # NOTE: unimplemented stub; currently unused.
        if sheet_name == "hourly":
            pass
        elif sheet_name == "daily":
            pass
    def get_kpi_data(self, dfs: typing.Dict):
        """
        Creates queries for Dynatrace and adds them to a list for further processing.
        Args:
            dfs (typing.Dict): Dictionary of QM report dataframes from the ReportReader class.
        """
for sheet in dfs.keys():
self.metric_expressions[sheet] = defaultdict(dict)
hubs = self._build_environment_names(dfs[sheet])
for hub in hubs:
self.metric_expressions[sheet][hub] = defaultdict(dict)
for _, row in dfs[sheet].iterrows():
if (
row["id"]
not in self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}']
):
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
] = []
from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
row["timeframe"]
)
# timeframe = self._get_timeframe_for_kpi_data(
# from_timestamp_ms, to_timestamp_ms
# )
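                # The timeframe is now derived from REPORT_TYPE rather than
                # from the row timestamps (see the commented-out call above).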
timeframe = self._get_timeframe_for_kpi_data()
# get timestamps shifted
(
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
) = self._calculate_timeshift(
from_timestamp_ms, to_timestamp_ms, timeframe
)
if (
row["id"]
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}']
):
if (
"services_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
):
# 1M gets deprecated
if timeframe == "1M":
# KPI 1 :timeshift(in days)
# timeshift(-1M) will be deprecated
kpi1_timeshift = f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d"
metric_kpi1 = self._build_kpi_metric_for_query(
"kpi1",
kpi1_timeshift,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
)
else:
metric_kpi1 = self._build_kpi_metric_for_query(
"kpi1",
timeframe,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
)
# resolution 1d in daily
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
].append(
{
"kpi_name": "kpi_1",
"metric": metric_kpi1,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
# "resolution": timeframe,
"resolution": self._get_resolution_for_kpi_data(),
"timeframe": row["timeframe"],
}
)
# metric_kpi2 = self._build_kpi_metric_for_query(
# "kpi2",
# timeframe,
# self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
# row["id"]
# ]["services_transformed"],
# )
# self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
# row["id"]
# ].append(
# self._template_metric_expression(
# "kpi_2",
# metric_kpi2,
# from_timestamp_ms_shifted,
# to_timestamp_ms_shifted,
# timeframe,
# row["timeframe"],
# )
# )
                        if (
                            REPORT_TYPE == "day"
                            or (REPORT_TYPE == "month" and sheet != "total")
                            or (REPORT_TYPE == "week" and sheet != "total")
                        ):
# if REPORT_TYPE == "day":
metric_count_shifted = self._build_kpi_metric_for_query(
"count_shifted",
timeframe,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
)
self.metric_expressions[sheet][
f'{row["HUB"]}-{row["type"]}'
][row["id"]].append(
{
"kpi_name": "count-7d",
"metric": metric_count_shifted,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d",
"timeframe": row["timeframe"],
}
)
if (
"requests_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
and "services_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
):
                        # Key requests are queried in chunks (50 per query).
                        split_requests = Helper.split_list(
                            self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
                                row["id"]
                            ]["requests_transformed"].split(",")
                        )
                        for requests_part in split_requests:
metric_count = self._build_kpi_metric_for_query(
"count",
timeframe,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
",".join(requests_part),
)
self.metric_expressions[sheet][
f'{row["HUB"]}-{row["type"]}'
][row["id"]].append(
{
"kpi_name": "count",
"metric": metric_count,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d",
"timeframe": row["timeframe"],
}
)
metric_error_count = self._build_kpi_metric_for_query(
"error_count",
timeframe,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
",".join(requests_part),
)
self.metric_expressions[sheet][
f'{row["HUB"]}-{row["type"]}'
][row["id"]].append(
{
"kpi_name": "error_count",
"metric": metric_error_count,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d",
"timeframe": row["timeframe"],
}
)
self._dispatch_to_dynatrace()
def _dispatch_to_dynatrace(self):
"""
Dispatches all queries to Dynatrace.
"""
Helper.console_output("Fetching data from Dynatrace...")
with concurrent.futures.ThreadPoolExecutor(
self.data_getter.config["threads"]
) as executor:
for sheet in self.metric_expressions.keys():
for hub in self.metric_expressions[sheet].keys():
for slo in self.metric_expressions[sheet][hub].keys():
for index, query in enumerate(
self.metric_expressions[sheet][hub][slo]
):
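                            # Submit each metric query to the thread pool; the
                            # Future is stored under "data" and resolved in
                            # _process_dynatrace_data().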
if query["metric"] is not False:
params = {
"metricSelector": query["metric"],
# "resolution": query["resolution"],
"from": query["from_date"],
"to": query["to_date"],
}
if "resolution" in query:
params["resolution"] = query["resolution"]
future = executor.submit(
self.data_getter.krparser_get_data_from_dynatrace,
params,
hub,
"metrics/query",
)
self.metric_expressions[sheet][hub][slo][index][
"data"
] = future
self._process_dynatrace_data()
def _process_dynatrace_data(self):
"""
Processes the responses from Dynatrace and adds them to a dictionary.
"""
for sheet in self.metric_expressions.keys():
for hub in self.metric_expressions[sheet].keys():
for slo in self.metric_expressions[sheet][hub].keys():
for index, query in enumerate(
self.metric_expressions[sheet][hub][slo]
):
future = query["data"]
result = future.result()
if len(result["result"][0]["data"]) > 0:
self.metric_expressions[sheet][hub][slo][index][
"result"
] = result["result"]
self.metric_expressions[sheet][hub][slo][index][
"api_result"
] = self._extract_result_from_api(
result["result"],
self.metric_expressions[sheet][hub][slo][index][
"kpi_name"
],
)
else:
if DEBUG:
Helper.console_output(
f"Nothing received for: {hub} - {slo} - {result} - {self.metric_expressions[sheet][hub][slo][index]['kpi_name']}"
)
with open("./failed_requests.txt", "a") as f:
f.write(
f"Nothing received for: {hub} - {slo}\n{json.dumps(result, indent=4)}\n{self.metric_expressions[sheet][hub][slo][index]['kpi_name']}\n{'-'*80}\n"
)
self.metric_expressions[sheet][hub][slo][index][
"result"
] = "None"
del query["data"]
# if DEBUG:
# with open("./slo_results.txt", "a") as f:
# f.write(f"\n{sheet} -> {hub} -> {slo}:\n")
# f.write(json.dumps(result, indent=4))
# f.write("\n")
# f.write("-" * 80)
def _extract_result_from_api(
self, result: typing.Dict, result_type: str
) -> typing.Union[int, float, str]:
if result_type == "kpi_2":
result_values = []
for data in result[0]["data"]:
result_values.append(data["values"][0])
return sum(result_values) / len(result_values)
# elif result_type == "count-7d":
# result_values = []
# option 2
# if any(elem is None for elem in result[0]["data"][0]["values"]):
# for value in result[0]["data"][0]["values"]:
# if value is not None:
# return value
# option 2 end
elif result_type == "kpi_1" or result_type == "count-7d":
# 2nd value + none check
if len(result[0]["data"][0]["values"]) > 0:
if len(result[0]["data"][0]["values"]) == 2:
if (
result[0]["data"][0]["values"][1] != "None"
or result[0]["data"][0]["values"][1] != None
):
return result[0]["data"][0]["values"][1]
elif (
result[0]["data"][0]["values"][0] != "None"
or result[0]["data"][0]["values"][0] != None
):
return result[0]["data"][0]["values"][1]
else:
return "None"
else:
return result[0]["data"][0]["values"][0]
else:
if DEBUG:
Helper.console_output(
f"Extraction No Result: {result_type}\n{result}"
)
return "None"
# if len(result[0]["data"][0]["values"]) > 0:
# result_values = []
# for value in result[0]["data"][0]["values"]:
# if value == None:
# result_values.append(0)
# else:
# result_values.append(value)
# return sum(result_values) / len(result_values)
# else:
# return result[0]["data"][0]["values"][0]
# elif result_type == "count":
# # DEBUG
# Helper.console_output(result[0]["data"])
else:
return result[0]["data"][0]["values"][0]
def _template_metric_expression(
self,
kpi_name: str,
metric_expression: str,
from_timestamp_ms: int,
to_timestamp_ms: int,
resolution: str,
timeframe: str,
) -> typing.Dict:
"""
        Template used for Dynatrace KPI query creation.
Args:
kpi_name (str): KPI name which will be displayed in the QM report
metric_expression (str): The metric selector which will be used to fetch data from Dynatrace
from_timestamp_ms (int): From timestamp in milliseconds
to_timestamp_ms (int): To timestamp in milliseconds
resolution (str): Resolution used for fetching data from Dynatrace
timeframe (str): Timeframe from the original QM report
Returns:
            typing.Dict: Returns a dictionary with all the necessary information for further processing.
"""
element = {
"kpi_name": kpi_name,
"metric": metric_expression,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": resolution,
"timeframe": timeframe,
}
return element
def _calculate_timeshift(
self, from_timestamp_ms: int, to_timestamp_ms: int, resolution: str
) -> typing.Tuple[int, int]:
"""
Calculates the time shift for KPI 2.
Args:
from_timestamp_ms (int): From timestamp in milliseconds.
to_timestamp_ms (int): To timestamp in milliseconds.
resolution (str): The resolution used in the Dynatrace query.
Returns:
typing.Tuple[int, int]: Returns timestamps in milliseconds
"""
if resolution == "7d":
from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
to_ts = to_timestamp_ms
return from_ts, to_ts
if resolution == "1w":
# from_date, end_date = Helper.previous_week_range(
# datetime.fromtimestamp(to_timestamp_ms / 1000), -2
# )
# from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
# to_ts = Helper.convert_datetime_to_timestamp(end_date, "ms")
from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
to_ts = to_timestamp_ms
return from_ts, to_ts
if resolution == "1M":
from_date, _ = Helper.previous_month_range(
datetime.fromtimestamp(from_timestamp_ms / 1000), 1
)
from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
# to_ts = Helper.convert_datetime_to_timestamp(to_timestamp_ms, "ms")
to_ts = to_timestamp_ms
return from_ts, to_ts
    def _get_timeframe_for_kpi_data(self) -> typing.Optional[str]:
        """Returns the timeframe for KPI data based on REPORT_TYPE."""
if REPORT_TYPE == "day":
return "7d"
if REPORT_TYPE == "week":
return "1w"
if REPORT_TYPE == "month":
return "1M"
    def _get_resolution_for_kpi_data(self) -> typing.Optional[str]:
        """Returns the query resolution based on REPORT_TYPE."""
if REPORT_TYPE == "day":
return "1d"
if REPORT_TYPE == "week":
return "1w"
if REPORT_TYPE == "month":
return "1M"
def _build_kpi_metric_for_query(
self, kpi_type: str, timeframe: str, service: str = None, request: str = None
) -> typing.Union[str, bool]:
# if switches are available (python3.10?) use switches
"""
Returns formatted query string
Args:
kpi_type (str): KPI option.
timeframe (str): Timeframe as string.
service (str, optional): String with services from the KRParser. Defaults to None.
request (str, optional): String with requests from the KRParser. Defaults to None.
Returns:
            typing.Union[str, bool]: Returns a formatted string for querying Dynatrace. If the option is not available, it returns False.
"""
if kpi_type == "kpi1":
return f'100*(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe}))'
elif kpi_type == "kpi2":
            # Use the unit character of the timeframe, e.g. "7d" -> "d".
            timeframe_unit = timeframe[1]
            return f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({timeframe_unit}):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({timeframe_unit}):fold(avg)))'
elif kpi_type == "count":
return f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'
elif kpi_type == "error_count":
return f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'
elif kpi_type == "count_shifted":
return f'builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe})'
else:
return False
def _extract_key_requests(
self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
):
"""
Extracts key requests using the KRParser.
Args:
slo_ids_df (pd.DataFrame): Dataframe containing SLO Ids.
            env (str): The environment used for querying.
DTURL (str): Dynatrace URL.
DTTOKEN (str): Dynatrace token.
"""
Helper.console_output("Extracting Key Requests...")
krp = krparser.KRParser(
name=env,
options=krparser.KROption.RESOLVESERVICES,
config={
"threads": 4,
"serviceLookupParams": {"fields": "tags,fromRelationships"},
"extendResultObjects": {"env": env},
},
DTAPIURL=DTURL,
DTAPIToken=DTTOKEN,
)
krs = krp.parse(slo_ids_df)
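        # Collect unique service and key-request display names per SLO id.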
self.extracted_key_requests[env] = {}
for kr in krs:
self.extracted_key_requests[env][kr.metadata["sloId"]] = {}
self.extracted_key_requests[env][kr.metadata["sloId"]]["requests"] = []
self.extracted_key_requests[env][kr.metadata["sloId"]]["services"] = []
for key_request in kr.keyRequests:
self.extracted_key_requests[env][kr.metadata["sloId"]][
"requests"
].append(key_request["displayName"])
for service in key_request["services"]:
if (
service["displayName"]
not in self.extracted_key_requests[env][kr.metadata["sloId"]][
"services"
]
):
self.extracted_key_requests[env][kr.metadata["sloId"]][
"services"
].append(service["displayName"])
self._transform_key_requests()
def get_slos(self, slo_ids: list, hub: str):
"""
Ingests a list of SLO Ids and prepares a pandas dataframe for KRParser ingestion.
Args:
slo_ids (list): List of SLO Ids.
hub (str): The hub/environment.
"""
slo_responses = []
for slo_id in slo_ids:
response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # The burn rate is not needed downstream; drop it if present.
            response.pop("errorBudgetBurnRate", None)
slo_responses.append(response)
df = pd.DataFrame(slo_responses)
self._extract_key_requests(
df,
hub,
self.data_getter.environment[hub][1]["env-url"],
os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
)
class Helper:
@staticmethod
    def transform_and_format_list(data: list) -> str:
        """
        Joins a list into a single Dynatrace-escaped string.
        Args:
            data (list): List with data for joining.
        Returns:
            str: Joined string, e.g. ["a", "b"] -> '~"a~", ~"b~"'.
        """
        return ", ".join(f'~"{s}~"' for s in data)
@staticmethod
def extract_timestamps(timestamp: str) -> typing.Tuple[int, int]:
"""
Extracts the timestamps from the "timeframe" column in the QM report.
Args:
timestamp (str): "timeframe" column value.
Returns:
typing.Tuple[int, int]: Returns processed "timeframe" value as integers.
"""
ts = timestamp.split(" to ")
return int(ts[0]), int(ts[1])
@staticmethod
def get_days(from_timestamp: int, to_timestamp: int) -> int:
"""
Calculates days between two timestamps.
Args:
from_timestamp (int): From timestamp in milliseconds.
to_timestamp (int): To timestamp in milliseconds.
Returns:
int: Returns the days between two timestamps.
"""
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        duration = to_date - from_date
        return duration.days
@staticmethod
def previous_day_range(date):
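        """
        Gets the previous day; start and end are the same date.
        """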
start_date = date - timedelta(days=1)
end_date = date - timedelta(days=1)
return start_date, end_date
@staticmethod
    def previous_week_range(
        date: datetime, weeks: int
    ) -> typing.Tuple[datetime.date, datetime.date]:
        """
        Gets a week range (Monday to Monday) relative to the given date.
        Args:
            date (datetime): Reference date.
            weeks (int): Number of weeks to shift; negative values go back in time.
        Returns:
            typing.Tuple[datetime.date, datetime.date]: Returns start and end date.
        """
start_date = date + timedelta(-date.weekday(), weeks=weeks) # -1
# end_date = date + timedelta(-date.weekday() - 1)
end_date = date + timedelta(-date.weekday(), weeks=weeks + 1)
return start_date, end_date
@staticmethod
def previous_month_range(date, shift: int):
shifted_date = date - relativedelta(months=shift)
end_date = shifted_date.replace(day=1) - timedelta(days=1)
start_date = end_date.replace(day=1)
return start_date, end_date
@staticmethod
def get_previous_month_days(timestamp_ms: int):
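        """
        Returns the number of days in the month preceding the given timestamp.
        """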
date = datetime.fromtimestamp(timestamp_ms / 1000).date()
end_date = date.replace(day=1) - timedelta(days=1)
start_date = end_date.replace(day=1)
days = Helper.get_days(
Helper.convert_datetime_to_timestamp(start_date) * 1000,
Helper.convert_datetime_to_timestamp(end_date) * 1000,
)
return days + 1
@staticmethod
def convert_datetime_to_timestamp(date: datetime.date, option: str = None) -> int:
"""
Converts datetime object to timestamp.
Returns by default timestamp in seconds.
Args:
date (datetime.date): Datetime object to convert.
option (str, optional): If set to "ms", returns timestamp as milliseconds. Defaults to None.
Returns:
            int: Timestamp in seconds, or in milliseconds if option is "ms".
"""
date_datetime = datetime.combine(date, datetime.min.time())
epoch = datetime(1970, 1, 1)
date_timestamp = (date_datetime - epoch).total_seconds()
if option == "ms":
date_timestamp = date_timestamp * 1000
return int(date_timestamp)
@staticmethod
def split_list(lst: list, size: int = 50) -> list:
"""
Splits a list
Args:
lst (list): The list to split
size (int, optional): Size of each list. Defaults to 50.
Returns:
list: Returns [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11]]
"""
result = []
for i in range(0, len(lst), size):
result.append(lst[i : i + size])
return result
@staticmethod
def console_output(text: str, indent=False):
"""
        Helper function for uniform console output when debugging is enabled.
        Args:
            text (str): Text to print.
            indent (bool, optional): Indent the output by ten spaces. Defaults to False.
"""
if DEBUG:
if indent:
print(f"{' '*10}{text}")
else:
print(text)
print("-" * 80)
@staticmethod
def cleanup_debug_files():
"""
Cleans up files created in debugging mode.
"""
Helper.console_output("Cleaning up debug files")
files = [
"./metricexpressions.json",
"./slo_results.txt",
"./test.xlsx",
"./test_total.csv",
"./test_daily.csv",
"./failed_requests.txt",
]
for file in files:
if os.path.exists(file):
os.remove(file)
Helper.console_output(f"{file.replace('./', '')} removed.", indent=True)
else:
Helper.console_output(
f"{file.replace('./', '')} not found. Nothing removed.", indent=True
)
Helper.console_output("=" * 80)
def main():
"""
Entrypoint.
"""
kpi_getter = KPIGetter()
kpi_getter.run()
if __name__ == "__main__":
try:
main()
except Exception as e:
print(f"main EXCEPTION: {e}")