#!/usr/bin/python3.8
"""
Enriches the QM report (XLSX) with additional KPI columns queried from Dynatrace.

Flow (see KPIGetter.run): read the report, resolve the key requests of every
SLO, build the metric selector queries, fetch them concurrently and write the
results back into the report file.
"""
import concurrent.futures
import os
import glob
import pandas as pd
from collections import defaultdict
import typing
import requests
import urllib.parse
import yaml
from KRParser import krparser, helper
import warnings
from datetime import datetime, timedelta
import time
import json
from dateutil.relativedelta import relativedelta

DEBUG = False

if DEBUG:
    from dotenv import load_dotenv

    load_dotenv()

warnings.filterwarnings("ignore")

# Expected values, judging by the checks below: "day", "week" or "month".
REPORT_TYPE = os.environ.get("REPORT_TYPE")

try:
    os.environ["TZ"] = "Europe/Berlin"  # set new timezone
    time.tzset()
except Exception as e:
    print(f"This error was encounterted : {e}")


class ReportReader:
    """
    Reads the XLSX file generated by createReport.py and extracts
    necessary information to add additional data to the QM Report.
    """

    def __init__(self) -> None:
        self.qm_report_file = ""
        # sheet name -> dataframe
        self.qm_report_df = defaultdict(dict)
        self.sheet_names = []
        # sheet name -> "<HUB>-<type>" -> list of SLO ids
        self.qm_report_ids = defaultdict(dict)

    def run(self) -> None:
        """
        Entrypoint.
        """
        self.get_qm_report_file_and_read_to_dataframe()
        self.get_slo_ids()

    def get_qm_report_file_and_read_to_dataframe(self):
        """
        Gets XLSX file and reads it into pandas dataframes

        Raises:
            FileNotFoundError: If the working directory contains no XLSX file.
        """
        Helper.console_output("Getting latest QM-Report and ingesting...")
        candidates = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))
        if not candidates:
            raise FileNotFoundError(
                f"No QM report (*.xlsx) found in {os.getcwd()}"
            )
        # NOTE(review): glob order is not guaranteed; the first match is used.
        self.qm_report_file = candidates[0]

        for sheet_name in self.get_sheet_names():
            df = pd.read_excel(self.qm_report_file, sheet_name)
            # Drop the index column pandas adds when a sheet was written with
            # an unnamed index.
            if "Unnamed: 0" in df.columns:
                df = df.drop("Unnamed: 0", axis=1)
            if sheet_name != "total":
                df["Date"] = pd.to_datetime(df["Date"], format="%Y-%m-%d")
            if DEBUG:
                Helper.console_output(f"dtypes of {sheet_name}\n{df.dtypes}")
            self.qm_report_df[sheet_name] = df

    def get_sheet_names(self) -> typing.List:
        """
        Gets the sheet names of the XLSX file.

        Returns:
            typing.List: List with sheet names
        """
        return pd.ExcelFile(self.qm_report_file).sheet_names

    def get_slo_ids(self) -> None:
        """
        Extracts all the SLO ids and sorts them by hub.
        """
        Helper.console_output("Extracting SLO ids...")
        if not self.qm_report_df:
            return

        # The environment names do not depend on the sheet being processed,
        # so the workbook is inspected only once.
        hubs = self._build_environment_names()
        for df_sheet_name, sheet_df in self.qm_report_df.items():
            for hub in hubs:
                self.qm_report_ids[df_sheet_name][hub] = []
            for _, row in sheet_df.iterrows():
                # setdefault covers environments that only show up in a later
                # sheet and are therefore missing from `hubs`.
                self.qm_report_ids[df_sheet_name].setdefault(
                    f'{row["HUB"]}-{row["type"]}', []
                ).append(row["id"])

    def _build_environment_names(self) -> typing.List:
        """
        Collects the unique "<HUB>-<type>" names of the first sheet.

        Returns:
            typing.List: Environment names in order of first appearance.
        """
        environment_names = []
        for _, row in self.qm_report_df[self.get_sheet_names()[0]].iterrows():
            name = f"{row['HUB']}-{row['type']}"
            if name not in environment_names:
                environment_names.append(name)
        return environment_names


class QmReportWriter:
    """
    Merges the fetched KPI values into the report dataframes and writes them
    back to disk.
    """

    def __init__(
        self,
        report_dfs: typing.Dict[str, pd.DataFrame],
        kpis: typing.Dict,
        filename: str,
    ):
        """
        Args:
            report_dfs (typing.Dict[str, pd.DataFrame]): Sheet name -> dataframe.
            kpis (typing.Dict): sheet -> hub -> SLO id -> list of query dicts.
            filename (str): Target XLSX file.
        """
        self.report_dfs = report_dfs
        self.kpis = kpis
        self.filename = filename

    def run(self):
        """
        Entrypoint.
        """
        Helper.console_output("Starting QM-Report writing process...")
        self._combine_datasets()

    def _combine_datasets(self):
        """
        Adds one column per KPI name to every sheet and fills in the values.

        Several queries may target the same cell (e.g. "count" is split into
        chunks of requests); their values are summed up.
        """
        Helper.console_output("Enriching QM-Report with new KPI")
        for sheet, sheet_kpis in self.kpis.items():
            report = self.report_dfs[sheet]
            for hub, hub_kpis in sheet_kpis.items():
                hub_parts = hub.split("-")
                for slo_id, queries in hub_kpis.items():
                    for query in queries:
                        if len(query) == 0:
                            continue
                        # Queries without a usable response carry the
                        # sentinel "None" (or no result at all).
                        query_result = query.get("result", "None")
                        if query_result == "None" or len(query_result) == 0:
                            continue

                        values = query["api_result"]
                        kpi_name = query["kpi_name"]
                        mask = (
                            (report["HUB"] == hub_parts[0])
                            & (report["id"] == slo_id)
                            & (report["timeframe"] == query["timeframe"])
                            & (report["type"] == hub_parts[1])
                        )
                        if kpi_name not in report.columns:
                            report[kpi_name] = None

                        existing = report.loc[mask, kpi_name]
                        try:
                            # pd.isna also catches NaN, which pandas may
                            # substitute for None once the column holds numbers.
                            if existing.empty or pd.isna(existing.iloc[0]):
                                report.loc[mask, kpi_name] = values
                            else:
                                report.loc[mask, kpi_name] = (
                                    existing.iloc[0] + values
                                )
                        except Exception as e:
                            # Best effort: one bad cell must not abort the report.
                            print(f"_combine_dataset EXCEPTION: {e}")

        self._write_report_to_xlsx()
        if DEBUG:
            self._write_to_csv()

    def _write_report_to_xlsx(self):
        """
        Writes all sheets to the report file ("test.xlsx" in debug mode).
        """
        Helper.console_output("Writing XLSX")
        filename = "test.xlsx" if DEBUG else self.filename

        # The context manager closes the writer even if a sheet fails.
        with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
            workbook = writer.book
            for sheet_name, dataframe in self.report_dfs.items():
                dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
                worksheet = writer.sheets[sheet_name]
                worksheet.autofilter(
                    0, 0, dataframe.shape[0], dataframe.shape[1] - 1
                )
                # format date
                if sheet_name != "total":
                    fmt = workbook.add_format({"num_format": "yyyy-mm-dd"})
                    for row, date_time in enumerate(dataframe["Date"], start=1):
                        worksheet.write_datetime(row, 0, date_time, fmt)

    def _write_to_csv(self):
        """
        Writes every sheet to "test_<sheet>.csv" (debug helper).
        """
        Helper.console_output("Writing CSV")
        for sheet_name, dataframe in self.report_dfs.items():
            dataframe.to_csv(f"test_{sheet_name}.csv", index=False)


class DynatraceDataGetter:
    """
    Thin client for the Dynatrace API v2, configured via ./environment.yaml.
    """

    def __init__(self) -> None:
        self.config = {"threads": 10}
        self.environment = self._load_environment()

    def get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """
        Sends a GET request to "<env-url>/api/v2/<route>".

        Args:
            params (dict | str): A dict is sent as query string, a string is
                appended as path segment.
            environment (str): Key into the loaded environment document.
            route (str): API route below /api/v2/.

        Returns:
            typing.Dict: Decoded JSON response.

        Raises:
            TypeError: If params is neither a dict nor a str.
        """
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError(
                f"params must be a dict or a str, got {type(params).__name__}"
            )

        url = self.environment[environment][1]["env-url"]
        token = os.environ[self.environment[environment][2]["env-token-name"]]

        # TODO: use helper.py from keyrequest parser
        headers = {"Authorization": f"Api-Token {token}"}
        # NOTE(review): verify=False disables TLS certificate validation.
        response = requests.get(
            f"{url}/api/v2/{route}{params_string}",
            headers=headers,
            verify=False,
        )
        return response.json()

    def krparser_get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """
        Same as get_data_from_dynatrace, but sends the request through the
        KRParser helper and passes params on unchanged.

        Args:
            params: Query parameters handed to helper.get_request.
            environment (str): Key into the loaded environment document.
            route (str): API route below /api/v2/.

        Returns:
            typing.Dict: Decoded JSON response.
        """
        url = self.environment[environment][1]["env-url"]
        token = os.environ[self.environment[environment][2]["env-token-name"]]

        dt_url = f"{url}/api/v2/{route}"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Api-Token " + token,
        }
        response = helper.get_request(url=dt_url, headers=headers, params=params)
        return response.json()

    def _build_params(self, params: typing.Dict) -> str:
        """
        Builds a URL query string ("k1=v1&k2=v2") with percent-encoded values.
        """
        query_string = "&".join(
            f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
        )
        return query_string

    def _load_environment(self) -> typing.Dict:
        """
        Loads ./environment.yaml.
        """
        with open("./environment.yaml", "r") as f:
            env_doc = yaml.safe_load(f)
        return env_doc


class KPIGetter:
    """
    Builds the KPI queries for every report row, runs them against Dynatrace
    and hands the results to the QmReportWriter.
    """

    def __init__(self) -> None:
        self.data_getter = DynatraceDataGetter()
        # hub -> SLO id -> {"requests", "services", "*_transformed"}
        self.extracted_key_requests = defaultdict(dict)
        self.metric_expressions = defaultdict(dict)  # sheet -> hub -> sloid

    def run(self):
        """
        Entrypoint for the KPI extension.
        """
        if DEBUG:
            Helper.console_output("Script running debug mode")
            Helper.cleanup_debug_files()

        report_reader = ReportReader()
        report_reader.run()

        # Get SLO IDs from first sheet and build metric expression queries.
        first_sheet = next(iter(report_reader.qm_report_ids), None)
        if first_sheet is not None:
            for hub, slo_ids in report_reader.qm_report_ids[first_sheet].items():
                self.get_slos(slo_ids, hub)

        self.get_kpi_data(report_reader.qm_report_df)

        write_report = QmReportWriter(
            report_reader.qm_report_df,
            self.metric_expressions,
            report_reader.qm_report_file,
        )
        write_report.run()

        # DEBUG
        if DEBUG:
            with open("metricexpressions.json", "w") as f:
                f.write(json.dumps(self.metric_expressions, indent=4))

    def _transform_key_requests(self):
        """
        Transforms the responses from the key request parser into a joined string.
        """
        # (source key, label for non-empty debug output, label for empty output)
        kinds = (
            ("services", "SERVICES", "SERVICE"),
            ("requests", "REQUESTS", "REQUEST"),
        )
        for hub, slos in self.extracted_key_requests.items():
            for slo, entry in slos.items():
                for key, label, empty_label in kinds:
                    items = entry[key]
                    if len(items) > 0:
                        entry[f"{key}_transformed"] = (
                            Helper.transform_and_format_list(items)
                        )
                        # DEBUG
                        if DEBUG and len(items) > 50:
                            Helper.console_output(
                                f"{label}: {hub} - {slo} - {len(items)}"
                            )
                    elif DEBUG:
                        print(f"{empty_label}: {hub} - {slo} is empty")

    def _build_environment_names(self, df: pd.DataFrame) -> typing.List:
        """
        Creates new environment list from given QM report dataframe.

        Args:
            df (pd.DataFrame): Converted QM report xlsx into an dataframe

        Returns:
            typing.List: List with unique environment names.
        """
        environment_names = []
        for _, row in df.iterrows():
            name = f'{row["HUB"]}-{row["type"]}'
            if name not in environment_names:
                environment_names.append(name)
        return environment_names

    def _get_time_scope(self, sheet_name: str) -> str:
        """
        Placeholder; not implemented yet and currently always returns None.
        """
        if sheet_name == "hourly":
            pass
        elif sheet_name == "daily":
            pass

    def get_kpi_data(self, dfs: typing.Dict[str, pd.DataFrame]):
        """
        Creates queries for dynatrace and adds them into a list for further processing.

        Args:
            dfs (typing.Dict[str, pd.DataFrame]): The dictionary with the QM
                reports (sheet name -> dataframe) from the ReportReader class.
        """
        for sheet in dfs.keys():
            self.metric_expressions[sheet] = defaultdict(dict)
            for hub in self._build_environment_names(dfs[sheet]):
                self.metric_expressions[sheet][hub] = defaultdict(dict)

            for _, row in dfs[sheet].iterrows():
                env = f'{row["HUB"]}-{row["type"]}'
                slo_id = row["id"]
                if slo_id not in self.metric_expressions[sheet][env]:
                    self.metric_expressions[sheet][env][slo_id] = []
                queries = self.metric_expressions[sheet][env][slo_id]

                from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
                    row["timeframe"]
                )
                timeframe = self._get_timeframe_for_kpi_data()

                # The shifted window is only needed by the currently disabled
                # KPI 2 query. The call is kept because it rejects unsupported
                # report types before any query is built.
                self._calculate_timeshift(
                    from_timestamp_ms, to_timestamp_ms, timeframe
                )

                if slo_id not in self.extracted_key_requests[env]:
                    continue
                key_requests = self.extracted_key_requests[env][slo_id]
                # Every query below filters on the services of the SLO.
                if "services_transformed" not in key_requests:
                    continue
                services = key_requests["services_transformed"]
                days_resolution = (
                    f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d"
                )

                # KPI 1: timeshift(-1M) will be deprecated, so monthly reports
                # shift by the length of the timeframe in days instead.
                kpi1_timeshift = days_resolution if timeframe == "1M" else timeframe
                queries.append(
                    self._template_metric_expression(
                        "kpi_1",
                        self._build_kpi_metric_for_query(
                            "kpi1", kpi1_timeshift, services
                        ),
                        from_timestamp_ms,
                        to_timestamp_ms,
                        # resolution 1d in daily
                        self._get_resolution_for_kpi_data(),
                        row["timeframe"],
                    )
                )

                # Daily reports always get the shifted count; weekly and
                # monthly reports only outside of the "total" sheet.
                if REPORT_TYPE == "day" or (
                    REPORT_TYPE in ("month", "week") and sheet != "total"
                ):
                    queries.append(
                        self._template_metric_expression(
                            "count-7d",
                            self._build_kpi_metric_for_query(
                                "count_shifted", timeframe, services
                            ),
                            from_timestamp_ms,
                            to_timestamp_ms,
                            days_resolution,
                            row["timeframe"],
                        )
                    )

                if "requests_transformed" not in key_requests:
                    continue
                # The requests are queried in chunks (see Helper.split_list);
                # the writer adds the partial results up again.
                for requests_part in Helper.split_list(
                    key_requests["requests_transformed"].split(",")
                ):
                    requests_joined = ",".join(requests_part)
                    for kpi_name in ("count", "error_count"):
                        queries.append(
                            self._template_metric_expression(
                                kpi_name,
                                self._build_kpi_metric_for_query(
                                    kpi_name, timeframe, services, requests_joined
                                ),
                                from_timestamp_ms,
                                to_timestamp_ms,
                                days_resolution,
                                row["timeframe"],
                            )
                        )

        self._dispatch_to_dynatrace()

    def _dispatch_to_dynatrace(self):
        """
        Dispatches all queries to Dynatrace.

        The future of every submitted query is stored under its "data" key.
        """
        Helper.console_output("Fetching data from Dynatrace...")
        with concurrent.futures.ThreadPoolExecutor(
            self.data_getter.config["threads"]
        ) as executor:
            for sheet_queries in self.metric_expressions.values():
                for hub, hub_queries in sheet_queries.items():
                    for slo_queries in hub_queries.values():
                        for query in slo_queries:
                            # False marks an unknown KPI type.
                            if query["metric"] is False:
                                continue
                            params = {
                                "metricSelector": query["metric"],
                                "from": query["from_date"],
                                "to": query["to_date"],
                            }
                            if "resolution" in query:
                                params["resolution"] = query["resolution"]
                            query["data"] = executor.submit(
                                self.data_getter.krparser_get_data_from_dynatrace,
                                params,
                                hub,
                                "metrics/query",
                            )

        self._process_dynatrace_data()

    def _process_dynatrace_data(self):
        """
        Processes the responses from Dynatrace and adds them to a dictionary.

        Every query gets a "result" entry: the raw result list, or the string
        "None" if nothing usable came back. Queries with data additionally get
        "api_result". The "data" future is removed.
        """
        for sheet_queries in self.metric_expressions.values():
            for hub, hub_queries in sheet_queries.items():
                for slo, slo_queries in hub_queries.items():
                    for query in slo_queries:
                        future = query.pop("data", None)
                        if future is None:
                            # Never dispatched (no valid metric selector).
                            query["result"] = "None"
                            continue

                        result = future.result()
                        # Error payloads carry no (or an empty) "result" list.
                        series = (
                            result.get("result")
                            if isinstance(result, dict)
                            else None
                        )
                        if series and len(series[0].get("data", [])) > 0:
                            query["result"] = series
                            query["api_result"] = self._extract_result_from_api(
                                series, query["kpi_name"]
                            )
                            continue

                        if DEBUG:
                            Helper.console_output(
                                f"Nothing received for: {hub} - {slo} - {result} - {query['kpi_name']}"
                            )
                            with open("./failed_requests.txt", "a") as f:
                                f.write(
                                    f"Nothing received for: {hub} - {slo}\n{json.dumps(result, indent=4)}\n{query['kpi_name']}\n{'-'*80}\n"
                                )
                        query["result"] = "None"

    def _extract_result_from_api(
        self, result: typing.List, result_type: str
    ) -> typing.Union[int, float, str]:
        """
        Reduces the "result" list of a metrics query to a single value.

        Args:
            result (typing.List): "result" list of the API response; its first
                element must contain at least one data series.
            result_type (str): KPI name the result belongs to.

        Returns:
            typing.Union[int, float, str]: The extracted value, or the string
            "None" if "kpi_1"/"count-7d" delivered no usable value.
        """
        if result_type == "kpi_2":
            result_values = [data["values"][0] for data in result[0]["data"]]
            return sum(result_values) / len(result_values)

        values = result[0]["data"][0]["values"]
        if result_type not in ("kpi_1", "count-7d"):
            return values[0]

        if len(values) == 0:
            if DEBUG:
                Helper.console_output(
                    f"Extraction No Result: {result_type}\n{result}"
                )
            return "None"
        if len(values) != 2:
            return values[0]

        # Two data points: prefer the second one and fall back to the first
        # if the second is missing.
        for candidate in (values[1], values[0]):
            if candidate is not None and candidate != "None":
                return candidate
        return "None"

    def _template_metric_expression(
        self,
        kpi_name: str,
        metric_expression: str,
        from_timestamp_ms: int,
        to_timestamp_ms: int,
        resolution: str,
        timeframe: str,
    ) -> typing.Dict:
        """
        Template for used for Dynatrace KPI query creation.

        Args:
            kpi_name (str): KPI name which will be displayed in the QM report
            metric_expression (str): The metric selector which will be used to fetch data from Dynatrace
            from_timestamp_ms (int): From timestamp in milliseconds
            to_timestamp_ms (int): To timestamp in milliseconds
            resolution (str): Resolution used for fetching data from Dynatrace
            timeframe (str): Timeframe from the original QM report

        Returns:
            typing.Dict: Returns a dictionary with all the necessary information for futher processing.
        """
        element = {
            "kpi_name": kpi_name,
            "metric": metric_expression,
            "from_date": from_timestamp_ms,
            "to_date": to_timestamp_ms,
            "resolution": resolution,
            "timeframe": timeframe,
        }
        return element

    def _calculate_timeshift(
        self, from_timestamp_ms: int, to_timestamp_ms: int, resolution: str
    ) -> typing.Tuple[int, int]:
        """
        Calculates the time shift for KPI 2.

        Args:
            from_timestamp_ms (int): From timestamp in milliseconds.
            to_timestamp_ms (int): To timestamp in milliseconds.
            resolution (str): The resolution used in the Dynatrace query.

        Returns:
            typing.Tuple[int, int]: Returns timestamps in milliseconds

        Raises:
            ValueError: If resolution is not one of "7d", "1w" or "1M".
        """
        if resolution in ("7d", "1w"):
            # Both move the start back by seven days.
            from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
            return from_ts, to_timestamp_ms

        if resolution == "1M":
            from_date, _ = Helper.previous_month_range(
                datetime.fromtimestamp(from_timestamp_ms / 1000), 1
            )
            from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
            return from_ts, to_timestamp_ms

        raise ValueError(
            f"Unsupported resolution for time shift: {resolution!r}"
        )

    def _get_timeframe_for_kpi_data(self) -> str:
        """
        Maps REPORT_TYPE to the time shift of the KPI queries.

        Returns:
            str: "7d", "1w" or "1M"; None for any other REPORT_TYPE.
        """
        return {"day": "7d", "week": "1w", "month": "1M"}.get(REPORT_TYPE)

    def _get_resolution_for_kpi_data(self) -> str:
        """
        Maps REPORT_TYPE to the resolution of the KPI 1 query.

        Returns:
            str: "1d", "1w" or "1M"; None for any other REPORT_TYPE.
        """
        return {"day": "1d", "week": "1w", "month": "1M"}.get(REPORT_TYPE)

    def _build_kpi_metric_for_query(
        self, kpi_type: str, timeframe: str, service: str = None, request: str = None
    ) -> typing.Union[str, bool]:
        """
        Returns formatted query string

        Args:
            kpi_type (str): KPI option.
            timeframe (str): Timeframe as string.
            service (str, optional): String with services from the KRParser. Defaults to None.
            request (str, optional): String with requests from the KRParser. Defaults to None.

        Returns:
            typing.Union[str, bool]: Returns formatted string for quering Dynatrace. If option not available, it returns False.
        """
        if kpi_type == "kpi1":
            return f'100*(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe}))'

        if kpi_type == "kpi2":
            # Second character of the timeframe, e.g. "d" for "7d".
            rate_unit = timeframe[1]
            return f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({rate_unit}):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({rate_unit}):fold(avg)))'

        if kpi_type == "count":
            return f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'

        if kpi_type == "error_count":
            return f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'

        if kpi_type == "count_shifted":
            return f'builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):splitBy():timeshift(-{timeframe})'

        return False

    def _extract_key_requests(
        self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
    ):
        """
        Extracts key requests using the KRParser.

        Args:
            slo_ids_df (pd.DataFrame): Dataframe containing SLO Ids.
            env (str): The environment used for quering.
            DTURL (str): Dynatrace URL.
            DTTOKEN (str): Dynatrace token.
        """
        Helper.console_output("Extracting Key Requests...")
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES,
            config={
                "threads": 4,
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )
        krs = krp.parse(slo_ids_df)

        self.extracted_key_requests[env] = {}
        for kr in krs:
            entry = {"requests": [], "services": []}
            self.extracted_key_requests[env][kr.metadata["sloId"]] = entry
            for key_request in kr.keyRequests:
                entry["requests"].append(key_request["displayName"])
                # Services are collected without duplicates, in order of
                # first appearance.
                for service in key_request["services"]:
                    if service["displayName"] not in entry["services"]:
                        entry["services"].append(service["displayName"])

        self._transform_key_requests()

    def get_slos(self, slo_ids: list, hub: str):
        """
        Ingests a list of SLO Ids and prepares a pandas dataframe for KRParser ingestion.

        Args:
            slo_ids (list): List of SLO Ids.
            hub (str): The hub/environment.
        """
        slo_responses = []
        for slo_id in slo_ids:
            response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # The field is dropped before the dataframe is built; it may be
            # absent, e.g. in an error response.
            response.pop("errorBudgetBurnRate", None)
            slo_responses.append(response)

        df = pd.DataFrame(slo_responses)
        self._extract_key_requests(
            df,
            hub,
            self.data_getter.environment[hub][1]["env-url"],
            os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
        )


class Helper:
    """
    Collection of stateless helper functions.
    """

    @staticmethod
    def transform_and_format_list(data: list) -> str:
        """
        Joins a list to a string.

        Every element is wrapped in ~"...~" and the elements are separated by
        ", ". Elements that contain ", " themselves are split at that point.

        Args:
            data (list): List with data for joining.

        Returns:
            str: Joined string.
        """
        joined_string = ", ".join(data)
        return ", ".join(f'~"{s}~"' for s in joined_string.split(", "))

    @staticmethod
    def extract_timestamps(timestamp: str) -> typing.Tuple[int, int]:
        """
        Extracts the timestamps from the "timeframe" column in the QM report.

        Args:
            timestamp (str): "timeframe" column value ("<from> to <to>").

        Returns:
            typing.Tuple[int, int]: Returns processed "timeframe" value as integers.
        """
        ts = timestamp.split(" to ")
        return int(ts[0]), int(ts[1])

    @staticmethod
    def get_days(from_timestamp: int, to_timestamp: int) -> int:
        """
        Calculates days between two timestamps.

        Args:
            from_timestamp (int): From timestamp in milliseconds.
            to_timestamp (int): To timestamp in milliseconds.

        Returns:
            int: Returns the days between two timestamps.
        """
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        return (to_date - from_date).days

    @staticmethod
    def previous_day_range(date):
        """
        Returns the previous day as both start and end date.
        """
        previous_day = date - timedelta(days=1)
        return previous_day, previous_day

    @staticmethod
    def previous_week_range(
        date: datetime, weeks: int
    ) -> typing.Tuple[datetime, datetime]:
        """
        Gets a week range relative to the week of the given date.

        Args:
            date (datetime): Reference date.
            weeks (int): Weeks to shift by (negative values go back).

        Returns:
            typing.Tuple[datetime, datetime]: Monday of the shifted week and
            the Monday after it.
        """
        start_date = date + timedelta(-date.weekday(), weeks=weeks)
        end_date = date + timedelta(-date.weekday(), weeks=weeks + 1)
        return start_date, end_date

    @staticmethod
    def previous_month_range(date, shift: int):
        """
        Returns first and last day of the month before (date - shift months).
        """
        shifted_date = date - relativedelta(months=shift)
        end_date = shifted_date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        return start_date, end_date

    @staticmethod
    def get_previous_month_days(timestamp_ms: int):
        """
        Returns the number of days of the month before the given timestamp.
        """
        date = datetime.fromtimestamp(timestamp_ms / 1000).date()
        end_date = date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        days = Helper.get_days(
            Helper.convert_datetime_to_timestamp(start_date) * 1000,
            Helper.convert_datetime_to_timestamp(end_date) * 1000,
        )
        return days + 1

    @staticmethod
    def convert_datetime_to_timestamp(date: datetime.date, option: str = None) -> int:
        """
        Converts datetime object to timestamp. Returns by default timestamp in seconds.

        Only the date part is used; the time is set to midnight.

        Args:
            date (datetime.date): Date or datetime object to convert.
            option (str, optional): If set to "ms", returns timestamp as milliseconds. Defaults to None.

        Returns:
            int: Timestamp relative to 1970-01-01 00:00.
        """
        date_datetime = datetime.combine(date, datetime.min.time())
        epoch = datetime(1970, 1, 1)
        date_timestamp = (date_datetime - epoch).total_seconds()
        if option == "ms":
            date_timestamp = date_timestamp * 1000
        return int(date_timestamp)

    @staticmethod
    def split_list(lst: list, size: int = 50) -> list:
        """
        Splits a list

        Args:
            lst (list): The list to split
            size (int, optional): Size of each list. Defaults to 50.

        Returns:
            list: Returns [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11]]
        """
        return [lst[i : i + size] for i in range(0, len(lst), size)]

    @staticmethod
    def console_output(text: str, indent=False):
        """
        Helper function for uniform console output, when debugging is enabled.

        Args:
            text (str): Text to print.
            indent (bool, optional): Prefix the text with ten spaces. Defaults to False.
        """
        if DEBUG:
            if indent:
                print(f"{' '*10}{text}")
            else:
                print(text)
            print("-" * 80)

    @staticmethod
    def cleanup_debug_files():
        """
        Cleans up files created in debugging mode.
        """
        Helper.console_output("Cleaning up debug files")
        files = [
            "./metricexpressions.json",
            "./slo_results.txt",
            "./test.xlsx",
            "./test_total.csv",
            "./test_daily.csv",
            "./failed_requests.txt",
        ]
        for file in files:
            if os.path.exists(file):
                os.remove(file)
                Helper.console_output(f"{file.replace('./', '')} removed.", indent=True)
            else:
                Helper.console_output(
                    f"{file.replace('./', '')} not found. Nothing removed.", indent=True
                )
        Helper.console_output("=" * 80)


def main():
    """
    Entrypoint.
    """
    kpi_getter = KPIGetter()
    kpi_getter.run()


if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        print(f"main EXCEPTION: {e}")