import threading
import concurrent.futures
import os
import glob
import pandas as pd
from collections import defaultdict
import typing
import requests
import urllib.parse
from dotenv import load_dotenv
import yaml
from KRParser import krparser
import warnings
# FIX: Helper.previous_*_range use timedelta; with only
# "from datetime import datetime" those calls raised AttributeError.
from datetime import datetime, timedelta

warnings.filterwarnings("ignore")
load_dotenv()


class ReportReader:
    """
    Reads the XLSX file generated by createReport.py and extracts necessary
    information to add additional data to the QM Report.
    """

    def __init__(self) -> None:
        self.qm_report_file = ""                 # path of the first *.xlsx found in CWD
        self.qm_report_df = defaultdict(dict)    # sheet name -> pandas DataFrame
        self.sheet_names = []
        self.qm_report_ids = defaultdict(dict)   # sheet name -> hub -> [slo ids]

    def run(self) -> None:
        """
        Entrypoint.
        """
        self.get_qm_report_file_and_read_to_dataframe()
        self.get_slo_ids()

    def get_qm_report_file_and_read_to_dataframe(self) -> None:
        """
        Gets XLSX file and reads every sheet into a pandas dataframe.

        Raises:
            FileNotFoundError: when no *.xlsx file exists in the working directory.
        """
        xlsx_files = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))
        # FIX: a bare [0] raised an opaque IndexError when no report is present.
        if not xlsx_files:
            raise FileNotFoundError(
                "No *.xlsx QM report found in the current working directory"
            )
        self.qm_report_file = xlsx_files[0]
        for sheet_name in self.get_sheet_names():
            df = pd.read_excel(self.qm_report_file, sheet_name)
            # Drop the leftover index column written by createReport.py.
            if "Unnamed: 0" in df.columns:
                df = df.drop("Unnamed: 0", axis=1)
            self.qm_report_df[sheet_name] = df

    def get_sheet_names(self) -> typing.List:
        """
        Gets the sheet names of the XLSX file.

        Returns:
            typing.List: List with sheet names
        """
        return pd.ExcelFile(self.qm_report_file).sheet_names

    def get_slo_ids(self) -> None:
        """
        Extracts all the SLO ids and sorts them by hub.
        """
        for sheet_name, df in self.qm_report_df.items():
            # Initialize one id bucket per hub, then fill them row by row.
            for hub in df["HUB"].unique():
                self.qm_report_ids[sheet_name][hub] = []
            for _, row in df.iterrows():
                self.qm_report_ids[sheet_name][row["HUB"]].append(row["id"])


class DynatraceDataGetter:
    """Thin wrapper around the Dynatrace REST API v2 (one hub per environment key)."""

    def __init__(self) -> None:
        self.config = {"threads": 3}
        self.environment = self._load_environment()

    def run(self, data: typing.Dict):
        # NOTE(review): stub — currently does nothing with `data`.
        env_doc = self.environment

    def get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """
        Performs a GET against <env-url>/api/v2/<route>.

        Args:
            params: dict -> encoded as a query string; str -> appended as a path segment.
            environment: key into the environment.yaml document (hub name).
            route: API route, e.g. "slo" or "metrics/query".

        Returns:
            typing.Dict: decoded JSON response body.

        Raises:
            TypeError: when params is neither dict nor str.
        """
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            # FIX: params_string was left unbound here, causing a NameError below.
            raise TypeError(f"params must be dict or str, got {type(params).__name__}")
        url = self.environment[environment][1]["env-url"]
        token = os.environ[self.environment[environment][2]["env-token-name"]]
        # TODO: use helper.py from keyrequest parser
        headers = {"Authorization": f"Api-Token {token}"}
        # SECURITY NOTE(review): verify=False disables TLS certificate validation.
        response = requests.get(
            f"{url}/api/v2/{route}{params_string}",
            headers=headers,
            verify=False,
        )
        return response.json()

    def test_get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """Dry-run variant: builds and returns the request URL instead of calling it."""
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError(f"params must be dict or str, got {type(params).__name__}")
        url = self.environment[environment][1]["env-url"]
        # Token lookup kept for parity with the real call (fails fast on missing env var).
        token = os.environ[self.environment[environment][2]["env-token-name"]]
        return f"{url}/api/v2/{route}{params_string}"

    def _build_params(self, params: typing.Dict) -> str:
        """URL-encodes a dict into a query string (no leading '?')."""
        # FIX: quote() requires str/bytes; the metric queries pass int timestamps.
        return "&".join(
            f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
        )

    def _load_environment(self) -> typing.Dict:
        """Loads hub definitions (urls, token names) from ./environment.yaml."""
        with open("./environment.yaml", "r") as f:
            return yaml.safe_load(f)


class KPIGetter:
    """Builds Dynatrace metric expressions for the report's SLOs and dispatches them."""

    def __init__(self) -> None:
        self.data_getter = DynatraceDataGetter()
        self.extracted_key_requests = defaultdict(dict)  # hub -> sloId -> {requests, services, ...}
        self.metric_expressions = defaultdict(dict)  # sheet -> hub -> sloid

    def transform_key_requests(self) -> None:
        """
        Adds Dynatrace-quoted variants ("services_transformed"/"requests_transformed")
        of the raw name lists collected by _extract_key_requests.

        FIX: the original annotation claimed ``-> str`` although nothing is returned.
        """
        for hub in self.extracted_key_requests.keys():
            for slo, extracted in self.extracted_key_requests[hub].items():
                if len(extracted["services"]) > 0:
                    extracted["services_transformed"] = Helper.transform_and_format_list(
                        extracted["services"]
                    )
                else:
                    print(f"SERVICE: {hub} - {slo} is empty")
                # FIX: renamed the local (was `requests`, shadowing the requests module).
                if len(extracted["requests"]):
                    extracted["requests_transformed"] = Helper.transform_and_format_list(
                        extracted["requests"]
                    )
                else:
                    # TODO: proper logging
                    print(f"REQUEST: {hub} - {slo} is empty")

    def get_kpi_data(self, dfs: typing.Dict) -> None:
        """
        Builds the metric expression queries for each SLO row of every sheet.

        Args:
            dfs: mapping sheet name -> DataFrame (ReportReader.qm_report_df).
                 FIX: the annotation said ``ReportReader`` but main() passes the dict.
        """
        for sheet in dfs.keys():
            self.metric_expressions[sheet] = defaultdict(dict)
            for hub in dfs[sheet]["HUB"].unique():
                self.metric_expressions[sheet][hub] = defaultdict(dict)
            for _, row in dfs[sheet].iterrows():
                hub = row["HUB"]
                slo_id = row["id"]
                self.metric_expressions[sheet][hub][slo_id] = []
                from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
                    row["timeframe"]
                )
                timeframe = self._get_timeframe_for_kpi_data(
                    from_timestamp_ms, to_timestamp_ms
                )
                # NOTE(review): assumes every hub of every sheet was fed through
                # get_slos() first; missing entries raise KeyError — TODO confirm.
                extracted = self.extracted_key_requests[hub][slo_id]
                if "services_transformed" in extracted:
                    metric_kpi1 = self._build_kpi_metric_for_query(
                        "kpi1", timeframe, extracted["services_transformed"]
                    )
                    # kpi1/kpi2 are queried over the previous period, hence the timeshift.
                    self.metric_expressions[sheet][hub][slo_id].append(
                        self._template_metric_expression(
                            # FIX: was the typo "kp_1"; now consistent with "kpi2".
                            "kpi1",
                            metric_kpi1,
                            self._calculate_timeshift(from_timestamp_ms, timeframe),
                            self._calculate_timeshift(to_timestamp_ms, timeframe),
                            timeframe,
                        )
                    )
                if "requests_transformed" in extracted:
                    metric_kpi2 = self._build_kpi_metric_for_query(
                        "kpi2", timeframe, extracted["requests_transformed"]
                    )
                    self.metric_expressions[sheet][hub][slo_id].append(
                        self._template_metric_expression(
                            "kpi2",
                            metric_kpi2,
                            self._calculate_timeshift(from_timestamp_ms, timeframe),
                            self._calculate_timeshift(to_timestamp_ms, timeframe),
                            timeframe,
                        )
                    )
                # count/error_count need both the service and the request filter.
                if (
                    "requests_transformed" in extracted
                    and "services_transformed" in extracted
                ):
                    metric_count = self._build_kpi_metric_for_query(
                        "count",
                        timeframe,
                        extracted["services_transformed"],
                        extracted["requests_transformed"],
                    )
                    self.metric_expressions[sheet][hub][slo_id].append(
                        self._template_metric_expression(
                            "count",
                            metric_count,
                            from_timestamp_ms,
                            to_timestamp_ms,
                            timeframe,
                        )
                    )
                    metric_error_count = self._build_kpi_metric_for_query(
                        "error_count",
                        timeframe,
                        extracted["services_transformed"],
                        extracted["requests_transformed"],
                    )
                    self.metric_expressions[sheet][hub][slo_id].append(
                        self._template_metric_expression(
                            "error_count",
                            metric_error_count,
                            from_timestamp_ms,
                            to_timestamp_ms,
                            timeframe,
                        )
                    )

    def _dispatch_to_dynatrace(self) -> None:
        """
        Runs every prepared metric expression against the metrics API.
        Currently uses the dry-run URL builder and only prints euprod results.

        TODO: fan out via concurrent.futures.ThreadPoolExecutor(
              self.data_getter.config["threads"]) and the real
              get_data_from_dynatrace once validated.
        """
        for sheet in self.metric_expressions.keys():
            for hub in self.metric_expressions[sheet].keys():
                for slo in self.metric_expressions[sheet][hub].keys():
                    for query in self.metric_expressions[sheet][hub][slo]:
                        # False marks an unknown kpi type from _build_kpi_metric_for_query.
                        if query["metric"] is not False:
                            params = {
                                "metricSelector": query["metric"],
                                "resolution": query["resolution"],
                                "from": query["from_date"],
                                "to": query["to_date"],
                            }
                            result = self.data_getter.test_get_data_from_dynatrace(
                                params, hub, "metrics/query"
                            )
                            if hub == "euprod":
                                print(result)

    def _template_metric_expression(
        self,
        kpi_name: str,
        metric_expression: str,
        from_timestamp_ms: int,
        to_timestamp_ms: int,
        resolution: str,
    ) -> typing.Dict:
        """Packs one metric query into the dict shape _dispatch_to_dynatrace expects."""
        return {
            "kpi_name": kpi_name,
            "metric": metric_expression,
            "from_date": from_timestamp_ms,
            "to_date": to_timestamp_ms,
            "resolution": resolution,
        }

    def _calculate_timeshift(
        self, timestamp_ms: int, resolution: str
    ) -> typing.Union[int, str]:
        """
        Shifts a ms timestamp back by one resolution period.

        Returns:
            int: shifted timestamp for "7d"; "" for not-yet-implemented resolutions.
            FIX: annotation was ``-> int`` although "" is returned for 1w/1M.
        """
        result = ""
        if resolution == "7d":
            result = timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
        if resolution == "1w":
            # TODO: get previous week
            pass
        if resolution == "1M":
            # TODO: get previous month
            pass
        return result

    def _get_timeframe_for_kpi_data(
        self, from_timestamp: int, to_timestamp: int
    ) -> str:
        """Maps a ms timestamp span to a Dynatrace resolution string ("" if unmatched)."""
        days = Helper.get_days(from_timestamp, to_timestamp)
        timeframe = ""
        # NOTE(review): a 1-day span mapping to "7d" looks intentional for the
        # week-over-week kpi1 query, but verify against the report generator.
        if days == 1:
            timeframe = "7d"
        if days == 7:
            timeframe = "1w"
        if days >= 28 and days < 32:
            timeframe = "1M"
        return timeframe

    def _build_kpi_metric_for_query(
        self, kpi_type: str, timeframe: str, service: str = None, request: str = None
    ) -> typing.Union[str, bool]:
        """
        Builds the Dynatrace metric selector for one KPI type.

        Args:
            kpi_type: one of "kpi1", "kpi2", "count", "error_count".
            timeframe: resolution string used for the kpi1 timeshift.
            service: pre-quoted service name list (services_transformed).
            request: pre-quoted key request name list (requests_transformed).

        Returns:
            str: the metric selector, or False for an unknown kpi_type.
        """
        # if switches are available (python3.10?) use switches
        kpi = ""
        if kpi_type == "kpi1":
            kpi = f'100-(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe}))'
        elif kpi_type == "kpi2":
            kpi = f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.equals({service})"))))):value:rate(h):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.equals({service})"))))):value:rate(h):fold(avg)))'
        elif kpi_type == "count":
            kpi = f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
        elif kpi_type == "error_count":
            kpi = f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):splitBy())'
        else:
            kpi = False
        return kpi

    def _extract_key_requests(
        self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
    ):
        """
        Resolves key requests and their services for the given SLOs via KRParser,
        then triggers the quoting transformation.
        """
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES,
            config={
                "threads": 10,
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )
        krs = krp.parse(slo_ids_df)
        self.extracted_key_requests[env] = {}
        for kr in krs:
            slo_id = kr.metadata["sloId"]
            entry = {"requests": [], "services": []}
            self.extracted_key_requests[env][slo_id] = entry
            for key_request in kr.keyRequests:
                entry["requests"].append(key_request["displayName"])
                for service in key_request["services"]:
                    # Deduplicate service names across key requests.
                    if service["displayName"] not in entry["services"]:
                        entry["services"].append(service["displayName"])
        self.transform_key_requests()

    def get_slos(self, slo_ids: list, hub: str) -> None:
        """Fetches each SLO definition for a hub and feeds them to key request extraction."""
        slo_responses = []
        for slo_id in slo_ids:
            response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # FIX: `del` raised KeyError when the field was absent from the response.
            response.pop("errorBudgetBurnRate", None)
            slo_responses.append(response)
        df = pd.DataFrame(slo_responses)
        self._extract_key_requests(
            df,
            hub,
            self.data_getter.environment[hub][1]["env-url"],
            os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
        )


class Helper:
    """Stateless utility functions shared across the script."""

    @staticmethod
    def transform_and_format_list(data: list) -> str:
        """Formats names into a Dynatrace-quoted list: ["a", "b"] -> '~"a~", ~"b~"'."""
        # NOTE(review): the join/split round-trip also splits any element that
        # itself contains ", " — kept as-is to preserve existing behavior.
        joined_string = ", ".join(data)
        return ", ".join([f'~"{s}~"' for s in joined_string.split(", ")])

    @staticmethod
    def extract_timestamps(timestamp: str) -> typing.Tuple[int, int]:
        """Splits '<from_ms> to <to_ms>' into two ints."""
        ts = timestamp.split(" to ")
        return int(ts[0]), int(ts[1])

    @staticmethod
    def get_days(from_timestamp: int, to_timestamp: int) -> int:
        """Returns the whole number of days between two ms timestamps."""
        # FIX: no longer rebinds the to_timestamp parameter.
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        return (to_date - from_date).days

    @staticmethod
    def previous_day_range(date):
        """Returns (start, end) of the day before `date` (both the same day)."""
        # FIX: datetime.timedelta raised AttributeError (datetime is the class here).
        start_date = date - timedelta(days=1)
        end_date = date - timedelta(days=1)
        return start_date, end_date

    @staticmethod
    def previous_week_range(date):
        """Returns (Monday, Sunday) of the week before `date`."""
        start_date = date + timedelta(-date.weekday(), weeks=-1)
        end_date = date + timedelta(-date.weekday() - 1)
        return start_date, end_date

    @staticmethod
    def previous_month_range(date):
        """Returns (first day, last day) of the month before `date`."""
        end_date = date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        return start_date, end_date


def main():
    """
    Entrypoint.
    """
    kpi_getter = KPIGetter()
    report_reader = ReportReader()
    report_reader.run()

    # Get SLO IDs from first sheet and build metric expression queries.
    for i, sheet in enumerate(report_reader.qm_report_ids.keys()):
        if i == 0:
            for hub in report_reader.qm_report_ids[sheet].keys():
                kpi_getter.get_slos(report_reader.qm_report_ids[sheet][hub], hub)
            kpi_getter.get_kpi_data(report_reader.qm_report_df)
            print(kpi_getter._dispatch_to_dynatrace())


if __name__ == "__main__":
    main()