# import threading
import concurrent.futures
import glob
import json
import os
import time
import typing
import urllib.parse
import warnings
from collections import defaultdict
from datetime import date as date_type
from datetime import datetime, timedelta

import pandas as pd
import requests
import yaml
from dateutil.relativedelta import relativedelta
from dotenv import load_dotenv
from KRParser import krparser, helper

DEBUG = False

warnings.filterwarnings("ignore")
load_dotenv()

# Read after load_dotenv() so a value from a .env file is picked up as well.
REPORT_TYPE = os.environ.get("REPORT_TYPE")

try:
    os.environ["TZ"] = "Europe/Berlin"  # set new timezone
    time.tzset()
except Exception as e:
    # time.tzset() does not exist on every platform; keep running with the
    # system timezone in that case.
    print(f"This error was encounterted : {e}")


class ReportReader:
    """
    Reads the XLSX file generated by createReport.py and extracts
    necessary information to add additional data to the QM Report.
    """

    def __init__(self) -> None:
        self.qm_report_file = ""
        # sheet name -> dataframe
        self.qm_report_df = defaultdict(dict)
        self.sheet_names = []
        # sheet name -> "<HUB>-<type>" -> list of SLO ids
        self.qm_report_ids = defaultdict(dict)

    def run(self) -> None:
        """
        Entrypoint.
        """
        self.get_qm_report_file_and_read_to_dataframe()
        self.get_slo_ids()

    def get_qm_report_file_and_read_to_dataframe(self):
        """
        Gets XLSX file and reads it into pandas dataframes

        Raises:
            FileNotFoundError: If the working directory contains no XLSX file.
        """
        Helper.console_output("Getting latest QM-Report and ingesting...")
        candidates = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))
        if not candidates:
            raise FileNotFoundError(
                f"No QM report (*.xlsx) found in {os.getcwd()}"
            )
        # NOTE(review): glob order is not guaranteed, so with several
        # workbooks present this is "the first match", not "the latest".
        self.qm_report_file = candidates[0]
        self.sheet_names = self.get_sheet_names()
        for sheet_name in self.sheet_names:
            df = pd.read_excel(self.qm_report_file, sheet_name)
            # Drop the index column pandas writes when index=True was used.
            if "Unnamed: 0" in df.columns:
                df = df.drop("Unnamed: 0", axis=1)
            self.qm_report_df[sheet_name] = df

    def get_sheet_names(self) -> typing.List:
        """
        Gets the sheet names of the XLSX file.

        Returns:
            typing.List: List with sheet names
        """
        # Context manager closes the workbook handle again.
        with pd.ExcelFile(self.qm_report_file) as workbook:
            return list(workbook.sheet_names)

    def get_slo_ids(self) -> None:
        """
        Extracts all the SLO ids and sorts them by hub.
        """
        Helper.console_output("Extracting SLO ids...")
        # The environment names only depend on the first sheet, so they are
        # computed once instead of once per sheet.
        hubs = self._build_environment_names() if self.qm_report_df else []
        for sheet_name, df in self.qm_report_df.items():
            ids_by_hub = self.qm_report_ids[sheet_name]
            for hub in hubs:
                ids_by_hub[hub] = []
            for _, row in df.iterrows():
                # setdefault: a HUB-type that only occurs on a later sheet
                # must not raise a KeyError.
                ids_by_hub.setdefault(
                    f'{row["HUB"]}-{row["type"]}', []
                ).append(row["id"])

    def _build_environment_names(self) -> typing.List:
        """
        Builds the unique "<HUB>-<type>" names found on the first sheet.

        Returns:
            typing.List: Environment names in order of first appearance.
        """
        if not self.qm_report_df:
            return []
        first_sheet = next(iter(self.qm_report_df.values()))
        environment_names = []
        for _, row in first_sheet.iterrows():
            name = f"{row['HUB']}-{row['type']}"
            if name not in environment_names:
                environment_names.append(name)
        return environment_names


class QmReportWriter:
    """
    Merges the fetched KPI values into the QM report dataframes and writes
    them back to an XLSX file.
    """

    def __init__(
        self,
        report_dfs: typing.Dict[str, pd.DataFrame],
        kpis: typing.Dict,
        filename: str,
    ):
        """
        Args:
            report_dfs (typing.Dict[str, pd.DataFrame]): Sheet name -> dataframe.
            kpis (typing.Dict): sheet -> hub -> SLO id -> list of query dicts.
            filename (str): Target XLSX file (ignored when DEBUG is set).
        """
        self.report_dfs = report_dfs
        self.kpis = kpis
        self.filename = filename

    def run(self):
        """
        Entrypoint.
        """
        Helper.console_output("Starting QM-Report writing process...")
        self._combine_datasets()

    def _combine_datasets(self):
        """
        Writes every KPI value into the matching report row(s) and then
        saves the workbook.
        """
        Helper.console_output("Enriching QM-Report with new KPI")
        for sheet, hubs in self.kpis.items():
            report = self.report_dfs[sheet]
            for hub, slos in hubs.items():
                hub_parts = hub.split("-")
                for slo_id, queries in slos.items():
                    for query in queries:
                        if not query:
                            continue
                        # "None" (string) marks a query without usable data.
                        result = query.get("result", "None")
                        if result == "None" or len(result) == 0:
                            continue
                        values = query["api_result"]
                        mask = (
                            (report["HUB"] == hub_parts[0])
                            & (report["id"] == slo_id)
                            & (report["timeframe"] == query["timeframe"])
                            & (report["type"] == hub_parts[1])
                        )
                        if query["kpi_name"] not in report.columns:
                            report[query["kpi_name"]] = None
                        report.loc[mask, query["kpi_name"]] = values
        self._write_report_to_xlsx()

    def _write_report_to_xlsx(self):
        """
        Writes all sheets to XLSX and adds an autofilter to each sheet.
        In DEBUG mode the output goes to "test.xlsx".
        """
        Helper.console_output("Writing XLSX")
        filename = "test.xlsx" if DEBUG else self.filename
        # Context manager saves/closes the workbook even if a sheet fails.
        with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
            for sheet_name, dataframe in self.report_dfs.items():
                dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
                worksheet = writer.sheets[sheet_name]
                worksheet.autofilter(
                    0, 0, dataframe.shape[0], dataframe.shape[1] - 1
                )


class DynatraceDataGetter:
    """
    Thin client for the Dynatrace API v2, configured via environment.yaml.
    """

    def __init__(self) -> None:
        self.config = {"threads": 10}
        self.environment = self._load_environment()

    def _get_credentials(self, environment: str) -> typing.Tuple[str, str]:
        """
        Returns (base url, API token) for an environment.yaml entry. The
        token is read from the environment variable named in the entry.
        """
        entry = self.environment[environment]
        url = entry[1]["env-url"]
        token = os.environ[entry[2]["env-token-name"]]
        return url, token

    def get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """
        Sends a GET request to `<env-url>/api/v2/<route>`.

        Args:
            params: A dict is sent as query string, a str is appended as
                path segment.
            environment (str): Key in environment.yaml.
            route (str): API route below /api/v2/.

        Returns:
            typing.Dict: Decoded JSON response.

        Raises:
            TypeError: If params is neither dict nor str.
        """
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError(
                f"params must be dict or str, got {type(params).__name__}"
            )

        url, token = self._get_credentials(environment)

        # TODO: use helper.py from keyrequest parser
        headers = {"Authorization": f"Api-Token {token}"}
        # NOTE(review): verify=False disables TLS certificate validation.
        # Kept as-is because the deployment may rely on it; confirm and
        # prefer a CA bundle.
        response = requests.get(
            f"{url}/api/v2/{route}{params_string}",
            headers=headers,
            verify=False,
        )
        return response.json()

    def krparser_get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        """
        Same as get_data_from_dynatrace, but goes through the KRParser
        request helper and passes params on to it unchanged.
        """
        url, token = self._get_credentials(environment)
        dt_url = f"{url}/api/v2/{route}"
        headers = {
            "Content-Type": "application/json",
            "Authorization": "Api-Token " + token,
        }
        response = helper.get_request(url=dt_url, headers=headers, params=params)
        return response.json()

    def _build_params(self, params: typing.Dict) -> str:
        """
        Builds a URL query string; values are percent-encoded.
        """
        return "&".join(
            f"{key}={urllib.parse.quote(str(value))}"
            for key, value in params.items()
        )

    def _load_environment(self) -> typing.Dict:
        """
        Loads ./environment.yaml.
        """
        with open("./environment.yaml", "r") as f:
            return yaml.safe_load(f)


class KPIGetter:
    """
    Builds the KPI metric queries for every SLO in the QM report, fetches
    the data from Dynatrace and hands the results to the QmReportWriter.
    """

    def __init__(self) -> None:
        self.data_getter = DynatraceDataGetter()
        # hub -> SLO id -> {"requests", "services", "..._transformed"}
        self.extracted_key_requests = defaultdict(dict)
        self.metric_expressions = defaultdict(dict)  # sheet -> hub -> sloid

    def run(self):
        """
        Entrypoint for the KPI extension.
        """
        if DEBUG:
            Helper.console_output("Script running debug mode")
            Helper.cleanup_debug_files()

        report_reader = ReportReader()
        report_reader.run()

        # Get SLO IDs from first sheet and build metric expression queries.
        first_sheet_ids = next(iter(report_reader.qm_report_ids.values()), {})
        for hub, slo_ids in first_sheet_ids.items():
            self.get_slos(slo_ids, hub)

        self.get_kpi_data(report_reader.qm_report_df)

        write_report = QmReportWriter(
            report_reader.qm_report_df,
            self.metric_expressions,
            report_reader.qm_report_file,
        )
        write_report.run()

        # DEBUG
        if DEBUG:
            with open("metricexpressions.json", "w") as f:
                f.write(json.dumps(self.metric_expressions, indent=4))

    def _transform_key_requests(self):
        """
        Transforms the responses from the key request parser into a joined string.
        """
        for hub, slos in self.extracted_key_requests.items():
            for slo, entry in slos.items():
                if len(entry["services"]) > 0:
                    entry["services_transformed"] = (
                        Helper.transform_and_format_list(entry["services"])
                    )
                elif DEBUG:
                    print(f"SERVICE: {hub} - {slo} is empty")

                if len(entry["requests"]):
                    entry["requests_transformed"] = (
                        Helper.transform_and_format_list(entry["requests"])
                    )
                elif DEBUG:
                    print(f"REQUEST: {hub} - {slo} is empty")

    def _build_environment_names(self, df: pd.DataFrame) -> typing.List:
        """
        Creates new environment list from given QM report dataframe.

        Args:
            df (pd.DataFrame): Converted QM report xlsx into an dataframe

        Returns:
            typing.List: List with unique environment names.
        """
        environment_names = []
        for _, row in df.iterrows():
            name = f'{row["HUB"]}-{row["type"]}'
            if name not in environment_names:
                environment_names.append(name)
        return environment_names

    def _get_time_scope(self, sheet_name: str) -> str:
        """
        Placeholder; currently does nothing and returns None for any sheet.
        """
        if sheet_name == "hourly":
            pass
        elif sheet_name == "daily":
            pass

    def get_kpi_data(self, dfs: typing.Dict[str, pd.DataFrame]):
        """
        Creates queries for dynatrace and adds them into a list for further processing.

        Args:
            dfs (typing.Dict[str, pd.DataFrame]): Dictionary with the QM
                report dataframes (sheet name -> dataframe) from the
                ReportReader class.
        """
        # Depends only on REPORT_TYPE, so it is the same for every row.
        timeframe = self._get_timeframe_for_kpi_data()

        for sheet in dfs:
            self.metric_expressions[sheet] = defaultdict(dict)
            for hub in self._build_environment_names(dfs[sheet]):
                self.metric_expressions[sheet][hub] = defaultdict(dict)

            for _, row in dfs[sheet].iterrows():
                hub = f'{row["HUB"]}-{row["type"]}'
                slo_id = row["id"]
                queries = self.metric_expressions[sheet][hub].setdefault(
                    slo_id, []
                )

                from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
                    row["timeframe"]
                )
                # get timestamps shifted
                shifted_range = self._calculate_timeshift(
                    from_timestamp_ms, to_timestamp_ms, timeframe
                )

                key_requests = self.extracted_key_requests[hub].get(slo_id)
                if key_requests is None:
                    # No key requests were resolved for this SLO.
                    continue

                queries.extend(
                    self._build_queries_for_slo(
                        key_requests,
                        row["timeframe"],
                        timeframe,
                        (from_timestamp_ms, to_timestamp_ms),
                        shifted_range,
                    )
                )

        self._dispatch_to_dynatrace()

    def _build_queries_for_slo(
        self,
        key_requests: typing.Dict,
        report_timeframe: str,
        timeframe: str,
        time_range: typing.Tuple[int, int],
        shifted_range: typing.Tuple[int, int],
    ) -> typing.List[typing.Dict]:
        """
        Builds the kpi_1, kpi_2, count and error_count query dicts for one
        report row, depending on which key request data is available.
        """
        from_ts, to_ts = time_range
        from_ts_shifted, to_ts_shifted = shifted_range
        services = key_requests.get("services_transformed")
        request_names = key_requests.get("requests_transformed")
        queries = []

        if services is not None:
            # 1M gets deprecated
            if timeframe == "1M":
                # KPI 1 :timeshift(in days)
                # timeshift(-1M) will be deprecated
                kpi1_timeshift = f"{Helper.get_days(from_ts, to_ts)}d"
            else:
                kpi1_timeshift = timeframe
            queries.append(
                self._template_metric_expression(
                    "kpi_1",
                    self._build_kpi_metric_for_query(
                        "kpi1", kpi1_timeshift, services
                    ),
                    from_ts,
                    to_ts,
                    timeframe,
                    report_timeframe,
                )
            )
            # KPI 2 is queried over the time-shifted (extended) range.
            queries.append(
                self._template_metric_expression(
                    "kpi_2",
                    self._build_kpi_metric_for_query("kpi2", timeframe, services),
                    from_ts_shifted,
                    to_ts_shifted,
                    timeframe,
                    report_timeframe,
                )
            )

        if services is not None and request_names is not None:
            # One data point spanning the whole report timeframe.
            resolution = f"{Helper.get_days(from_ts, to_ts)}d"
            for kpi_name in ("count", "error_count"):
                queries.append(
                    self._template_metric_expression(
                        kpi_name,
                        self._build_kpi_metric_for_query(
                            kpi_name, timeframe, services, request_names
                        ),
                        from_ts,
                        to_ts,
                        resolution,
                        report_timeframe,
                    )
                )

        return queries

    def _dispatch_to_dynatrace(self):
        """
        Dispatches all queries to Dynatrace.
        """
        Helper.console_output("Fetching data from Dynatrace...")
        with concurrent.futures.ThreadPoolExecutor(
            self.data_getter.config["threads"]
        ) as executor:
            for hubs in self.metric_expressions.values():
                for hub, slos in hubs.items():
                    for queries in slos.values():
                        for query in queries:
                            if query["metric"] is False:
                                # Unknown KPI type, nothing to fetch.
                                continue
                            params = {
                                "metricSelector": query["metric"],
                                "from": query["from_date"],
                                "to": query["to_date"],
                            }
                            if "resolution" in query:
                                params["resolution"] = query["resolution"]
                            # The future is resolved in _process_dynatrace_data.
                            query["data"] = executor.submit(
                                self.data_getter.krparser_get_data_from_dynatrace,
                                params,
                                hub,
                                "metrics/query",
                            )

        self._process_dynatrace_data()

    def _process_dynatrace_data(self):
        """
        Processes the responses from Dynatrace and adds them to a dictionary.

        Every query gets a "result" entry; it is the string "None" when the
        query was never dispatched or the response carries no data points.
        """
        for hubs in self.metric_expressions.values():
            for slos in hubs.values():
                for queries in slos.values():
                    for query in queries:
                        # pop: futures must not stay in the dict (they are
                        # not JSON serializable) and skipped queries have
                        # no "data" entry at all.
                        future = query.pop("data", None)
                        if future is None:
                            query["result"] = "None"
                            continue

                        response = future.result()
                        # Error responses carry no "result" list.
                        series = response.get("result") or []
                        if series and series[0].get("data"):
                            query["result"] = series
                            query["api_result"] = self._extract_result_from_api(
                                series, query["kpi_name"]
                            )
                        else:
                            query["result"] = "None"

    def _extract_result_from_api(
        self, result: typing.List, result_type: str
    ) -> typing.Union[int, float, str]:
        """
        Extracts the KPI value from the "result" list of a metrics query.

        Args:
            result (typing.List): "result" list of the API response.
            result_type (str): KPI name; "kpi_2" is averaged over all series.

        Returns:
            typing.Union[int, float, str]: For "kpi_2" the mean of the first
            value of every data series, otherwise the first value of the
            first series.
        """
        if result_type == "kpi_2":
            first_values = [data["values"][0] for data in result[0]["data"]]
            return sum(first_values) / len(first_values)
        return result[0]["data"][0]["values"][0]

    def _template_metric_expression(
        self,
        kpi_name: str,
        metric_expression: str,
        from_timestamp_ms: int,
        to_timestamp_ms: int,
        resolution: str,
        timeframe: str,
    ) -> typing.Dict:
        """
        Template for used for Dynatrace KPI query creation.

        Args:
            kpi_name (str): KPI name which will be displayed in the QM report
            metric_expression (str): The metric selector which will be used to fetch data from Dynatrace
            from_timestamp_ms (int): From timestamp in milliseconds
            to_timestamp_ms (int): To timestamp in milliseconds
            resolution (str): Resolution used for fetching data from Dynatrace
            timeframe (str): Timeframe from the original QM report

        Returns:
            typing.Dict: Returns a dictionary with all the necessary information for futher processing.
        """
        return {
            "kpi_name": kpi_name,
            "metric": metric_expression,
            "from_date": from_timestamp_ms,
            "to_date": to_timestamp_ms,
            "resolution": resolution,
            "timeframe": timeframe,
        }

    def _calculate_timeshift(
        self, from_timestamp_ms: int, to_timestamp_ms: int, resolution: str
    ) -> typing.Tuple[int, int]:
        """
        Calculates the time shift for KPI 2.

        Args:
            from_timestamp_ms (int): From timestamp in milliseconds.
            to_timestamp_ms (int): To timestamp in milliseconds.
            resolution (str): The resolution used in the Dynatrace query.

        Returns:
            typing.Tuple[int, int]: Returns timestamps in milliseconds

        Raises:
            ValueError: If the resolution is not "7d", "1w" or "1M"
                (e.g. because REPORT_TYPE is not set).
        """
        if resolution in ("7d", "1w"):
            week_ms = 60 * 60 * 24 * 7 * 1000
            return from_timestamp_ms - week_ms, to_timestamp_ms

        if resolution == "1M":
            from_date, _ = Helper.previous_month_range(
                datetime.fromtimestamp(from_timestamp_ms / 1000), 1
            )
            from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
            return from_ts, to_timestamp_ms

        raise ValueError(
            f"Unsupported resolution {resolution!r}; "
            "REPORT_TYPE must be one of 'day', 'week' or 'month'."
        )

    def _get_timeframe_for_kpi_data(self) -> typing.Optional[str]:
        """
        Maps REPORT_TYPE to the timeframe used in the queries.

        Returns:
            typing.Optional[str]: "7d", "1w" or "1M"; None for any other
            REPORT_TYPE value.
        """
        return {"day": "7d", "week": "1w", "month": "1M"}.get(REPORT_TYPE)

    def _build_kpi_metric_for_query(
        self, kpi_type: str, timeframe: str, service: str = None, request: str = None
    ) -> typing.Union[str, bool]:
        """
        Returns formatted query string

        Args:
            kpi_type (str): KPI option.
            timeframe (str): Timeframe as string.
            service (str, optional): String with services from the KRParser. Defaults to None.
            request (str, optional): String with requests from the KRParser. Defaults to None.

        Returns:
            typing.Union[str, bool]: Returns formatted string for quering Dynatrace. If option not available, it returns False.
        """
        if kpi_type == "kpi1":
            return f'100*(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):lastReal:splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):lastReal:splitBy():timeshift(-{timeframe}))'

        if kpi_type == "kpi2":
            # Unit letter of the timeframe ("7d" -> "d"); taking the last
            # character also works for multi-digit timeframes.
            rate_unit = timeframe[-1]
            return f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({rate_unit}):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({rate_unit}):fold(avg)))'

        if kpi_type == "count":
            return f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'

        if kpi_type == "error_count":
            return f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'

        return False

    def _extract_key_requests(
        self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
    ):
        """
        Extracts key requests using the KRParser.

        Args:
            slo_ids_df (pd.DataFrame): Dataframe containing SLO Ids.
            env (str): The environment used for quering.
            DTURL (str): Dynatrace URL.
            DTTOKEN (str): Dynatrace token.
        """
        Helper.console_output("Extracting Key Requests...")
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES,
            config={
                "threads": self.data_getter.config["threads"],
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )

        krs = krp.parse(slo_ids_df)

        self.extracted_key_requests[env] = {}

        for kr in krs:
            entry = {"requests": [], "services": []}
            self.extracted_key_requests[env][kr.metadata["sloId"]] = entry
            for key_request in kr.keyRequests:
                entry["requests"].append(key_request["displayName"])
                for service in key_request["services"]:
                    # Services are collected without duplicates.
                    if service["displayName"] not in entry["services"]:
                        entry["services"].append(service["displayName"])

        self._transform_key_requests()

    def get_slos(self, slo_ids: list, hub: str):
        """
        Ingests a list of SLO Ids and prepares a pandas dataframe for KRParser ingestion.

        Args:
            slo_ids (list): List of SLO Ids.
            hub (str): The hub/environment.
        """
        slo_responses = []
        for slo_id in slo_ids:
            response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # pop with default: the field is dropped when present, but a
            # response without it must not abort the run.
            response.pop("errorBudgetBurnRate", None)
            slo_responses.append(response)

        df = pd.DataFrame(slo_responses)

        self._extract_key_requests(
            df,
            hub,
            self.data_getter.environment[hub][1]["env-url"],
            os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
        )


class Helper:
    """
    Stateless helper functions (formatting, date arithmetic, debug output).
    """

    @staticmethod
    def transform_and_format_list(data: list) -> str:
        """
        Joins a list to a string.

        Every element is wrapped in ~"...~" and the elements are joined
        with ", ". Each element is quoted as a whole, so a name containing
        ", " stays one entry.

        Args:
            data (list): List with data for joining.

        Returns:
            str: Joined string.
        """
        return ", ".join(f'~"{item}~"' for item in data)

    @staticmethod
    def extract_timestamps(timestamp: str) -> typing.Tuple[int, int]:
        """
        Extracts the timestamps from the "timeframe" column in the QM report.

        Args:
            timestamp (str): "timeframe" column value.

        Returns:
            typing.Tuple[int, int]: Returns processed "timeframe" value as integers.
        """
        ts = timestamp.split(" to ")
        return int(ts[0]), int(ts[1])

    @staticmethod
    def get_days(from_timestamp: int, to_timestamp: int) -> int:
        """
        Calculates days between two timestamps.

        Args:
            from_timestamp (int): From timestamp in milliseconds.
            to_timestamp (int): To timestamp in milliseconds.

        Returns:
            int: Returns the days between two timestamps.
        """
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        return (to_date - from_date).days

    @staticmethod
    def previous_day_range(date):
        """
        Returns the day before `date` as both start and end date.
        """
        previous_day = date - timedelta(days=1)
        return previous_day, previous_day

    @staticmethod
    def previous_week_range(
        date: datetime, weeks: int
    ) -> typing.Tuple[datetime, datetime]:
        """
        Gets a week range relative to the given date.

        Args:
            date (datetime): Reference date (must provide weekday()).
            weeks (int): Week offset; negative values go back.

        Returns:
            typing.Tuple[datetime, datetime]: Returns start and end date.
            Start is the Monday of the reference week shifted by `weeks`,
            end is one week after start.
        """
        monday = date - timedelta(days=date.weekday())
        start_date = monday + timedelta(weeks=weeks)
        end_date = monday + timedelta(weeks=weeks + 1)
        return start_date, end_date

    @staticmethod
    def previous_month_range(date, shift: int):
        """
        Shifts `date` back by `shift` months and returns first and last day
        of the month before the shifted date.
        """
        shifted_date = date - relativedelta(months=shift)
        end_date = shifted_date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        return start_date, end_date

    @staticmethod
    def get_previous_month_days(timestamp_ms: int):
        """
        Returns the number of days of the month before the given timestamp
        (milliseconds).
        """
        date = datetime.fromtimestamp(timestamp_ms / 1000).date()
        end_date = date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        days = Helper.get_days(
            Helper.convert_datetime_to_timestamp(start_date) * 1000,
            Helper.convert_datetime_to_timestamp(end_date) * 1000,
        )
        # get_days counts the difference; +1 includes the last day.
        return days + 1

    @staticmethod
    def convert_datetime_to_timestamp(
        date: typing.Union[date_type, datetime], option: str = None
    ) -> int:
        """
        Converts datetime object to timestamp. Returns by default timestamp in seconds.

        Only the date part is used (time is set to midnight).

        Args:
            date (date | datetime): Date or datetime object to convert.
            option (str, optional): If set to "ms", returns timestamp as milliseconds. Defaults to None.

        Returns:
            int: Timestamp in seconds, or milliseconds for option "ms".
        """
        # NOTE(review): midnight is measured against a naive 1970-01-01
        # epoch, i.e. no local UTC offset is applied, while get_days() uses
        # datetime.fromtimestamp() (local time). Confirm this is intended.
        date_datetime = datetime.combine(date, datetime.min.time())
        epoch = datetime(1970, 1, 1)
        date_timestamp = (date_datetime - epoch).total_seconds()
        if option == "ms":
            date_timestamp = date_timestamp * 1000
        return int(date_timestamp)

    @staticmethod
    def console_output(text: str, indent=False):
        """
        Helper function for uniform console output, when debugging is enabled.

        Args:
            text (str): Text to print.
            indent (bool, optional): Print indented and without separator
                line. Defaults to False.
        """
        if not DEBUG:
            return
        if indent:
            print(f"{' '*10}{text}")
        else:
            print(text)
            print("-" * 80)

    @staticmethod
    def cleanup_debug_files():
        """
        Cleans up files created in debugging mode.
        """
        Helper.console_output("Cleaning up debug files")
        files = ["./metricexpressions.json", "./slo_results.txt", "./test.xlsx"]
        for file in files:
            if os.path.exists(file):
                os.remove(file)
                Helper.console_output(f"{file.replace('./', '')} removed.", indent=True)
            else:
                Helper.console_output(
                    f"{file.replace('./', '')} not found. Nothing removed.", indent=True
                )
        Helper.console_output("=" * 80)


def main():
    """
    Entrypoint.
    """
    kpi_getter = KPIGetter()
    kpi_getter.run()


if __name__ == "__main__":
    main()