# qm_report/kpi_extension.py

import concurrent.futures
import os
import glob
import pandas as pd
from collections import defaultdict
import typing
import requests
import urllib.parse
from dotenv import load_dotenv
import yaml
from KRParser import krparser, helper
import warnings
from datetime import datetime, timedelta
import time
import json
from dateutil.relativedelta import relativedelta
DEBUG = False
warnings.filterwarnings("ignore")
load_dotenv()
REPORT_TYPE = os.environ.get("REPORT_TYPE")
try:
    # time.tzset() is only available on Unix; elsewhere the system timezone is kept.
    os.environ["TZ"] = "Europe/Berlin"  # set new timezone
    time.tzset()
except Exception as e:
    print(f"This error was encountered: {e}")
class ReportReader:
"""
Reads the XLSX file generated by createReport.py and extracts
necessary information to add additional data to the QM Report.
"""
def __init__(self) -> None:
self.qm_report_file = ""
self.qm_report_df = defaultdict(dict)
self.sheet_names = []
self.qm_report_ids = defaultdict(dict)
def run(self) -> None:
"""
Entrypoint.
"""
self.get_qm_report_file_and_read_to_dataframe()
self.get_slo_ids()
def get_qm_report_file_and_read_to_dataframe(self):
"""
Gets XLSX file and reads it into pandas dataframes
"""
Helper.console_output("Getting latest QM-Report and ingesting...")
        matches = glob.glob(os.path.join(os.getcwd(), "*.xlsx"))
        if not matches:
            raise FileNotFoundError("No XLSX report found in the current directory.")
        self.qm_report_file = matches[0]
sheet_names = self.get_sheet_names()
for sheet_name in sheet_names:
df = pd.read_excel(self.qm_report_file, sheet_name)
columns = df.columns
if "Unnamed: 0" in columns:
df = df.drop("Unnamed: 0", axis=1)
self.qm_report_df[sheet_name] = df
def get_sheet_names(self) -> typing.List:
"""
Gets the sheet names of the XLSX file.
Returns:
typing.List: List with sheet names
"""
return pd.ExcelFile(self.qm_report_file).sheet_names
def get_slo_ids(self) -> None:
"""
Extracts all the SLO ids and sorts them by hub.
"""
Helper.console_output("Extracting SLO ids...")
for df_sheet_name in self.qm_report_df.keys():
hubs = self._build_environment_names()
for hub in hubs:
self.qm_report_ids[df_sheet_name][hub] = []
for _, row in self.qm_report_df[df_sheet_name].iterrows():
self.qm_report_ids[df_sheet_name][f'{row["HUB"]}-{row["type"]}'].append(
row["id"]
)
    def _build_environment_names(self) -> typing.List:
        """
        Builds unique "<HUB>-<type>" environment names from the first sheet.
        """
        environment_names = []
for _, row in self.qm_report_df[self.get_sheet_names()[0]].iterrows():
name = f"{row['HUB']}-{row['type']}"
if name not in environment_names:
environment_names.append(name)
return environment_names
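    # Illustrative example (hypothetical values): a row with HUB "HUB1" and
    # type "prod" yields the environment name "HUB1-prod", the key used for
    # grouping SLO ids per hub below.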
class QmReportWriter:
    """
    Merges the fetched KPI values into the report dataframes and writes the
    enriched QM report back to XLSX.
    """
    def __init__(self, report_dfs: typing.Dict, kpis: typing.Dict, filename: str):
        self.report_dfs = report_dfs
        self.kpis = kpis
        self.filename = filename
def run(self):
Helper.console_output("Starting QM-Report writing process...")
self._combine_datasets()
def _combine_datasets(self):
Helper.console_output("Enriching QM-Report with new KPI")
for sheet in self.kpis.keys():
for hub in self.kpis[sheet].keys():
for slo_id in self.kpis[sheet][hub].keys():
for query in self.kpis[sheet][hub][slo_id]:
                    if len(query) > 0:
                        # Skip queries that never produced a usable result.
                        if (
                            query.get("result", "None") != "None"
                            and len(query["result"]) > 0
                        ):
                            values = query["api_result"]
mask = (
(self.report_dfs[sheet]["HUB"] == hub.split("-")[0])
& (self.report_dfs[sheet]["id"] == slo_id)
& (
self.report_dfs[sheet]["timeframe"]
== query["timeframe"]
)
& (
self.report_dfs[sheet]["type"]
== hub.split("-")[1]
)
)
if (
query["kpi_name"]
not in self.report_dfs[sheet].columns
):
self.report_dfs[sheet][query["kpi_name"]] = None
self.report_dfs[sheet].loc[
mask, query["kpi_name"]
] = values
self._write_report_to_xlsx()
    def _write_report_to_xlsx(self):
        """
        Writes every enriched sheet to the XLSX file with an autofilter header.
        """
        Helper.console_output("Writing XLSX")
        if DEBUG:
            filename = "test.xlsx"
        else:
            filename = self.filename
        writer = pd.ExcelWriter(filename, engine="xlsxwriter")
for sheet_name, dataframe in self.report_dfs.items():
dataframe.to_excel(writer, sheet_name=sheet_name, index=False)
worksheet = writer.sheets[sheet_name]
worksheet.autofilter(0, 0, dataframe.shape[0], dataframe.shape[1] - 1)
writer.close()
class DynatraceDataGetter:
    """
    Thin wrapper around the Dynatrace API v2 for the configured environments.
    """
    def __init__(self) -> None:
        self.config = {"threads": 10}
        self.environment = self._load_environment()
# def run(self, data: typing.Dict):
# env_doc = self.environment
    def get_data_from_dynatrace(
        self, params, environment: str, route: str
    ) -> typing.Dict:
        if isinstance(params, dict):
            params_string = f"?{self._build_params(params)}"
        elif isinstance(params, str):
            params_string = f"/{params}"
        else:
            raise TypeError(f"Unsupported params type: {type(params)}")
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
# TODO: use helper.py from keyrequest parser
headers = {"Authorization": f"Api-Token {token}"}
response = requests.get(
f"{url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
return response.json()
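    # Usage sketch (hypothetical environment name): assuming "HUB1-prod" is a
    # key in environment.yaml,
    #   getter.get_data_from_dynatrace({"pageSize": 100}, "HUB1-prod", "slo")
    # issues GET <env-url>/api/v2/slo?pageSize=100 with the Api-Token header,
    # while passing a string id queries /api/v2/slo/<id> instead.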
def krparser_get_data_from_dynatrace(
self, params, environment: str, route: str
) -> typing.Dict:
url = self.environment[environment][1]["env-url"]
token = os.environ[self.environment[environment][2]["env-token-name"]]
dt_url = f"{url}/api/v2/{route}"
headers = {
"Content-Type": "application/json",
"Authorization": "Api-Token " + token,
}
response = helper.get_request(url=dt_url, headers=headers, params=params)
return response.json()
def _build_params(self, params: typing.Dict) -> str:
query_string = "&".join(
f"{key}={urllib.parse.quote(str(value))}" for key, value in params.items()
)
return query_string
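    # Illustrative example (hypothetical values):
    # _build_params({"from": 0, "uid": "a b"}) returns "from=0&uid=a%20b";
    # every value is percent-encoded via urllib.parse.quote.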
def _load_environment(self) -> typing.Dict:
with open("./environment.yaml", "r") as f:
env_doc = yaml.safe_load(f)
return env_doc
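    # environment.yaml is assumed to map each "<HUB>-<type>" name to a list of
    # one-key dicts, e.g. (hypothetical values):
    #
    #   HUB1-prod:
    #     - env-name: HUB1-prod
    #     - env-url: https://example.live.dynatrace.com
    #     - env-token-name: DT_TOKEN_HUB1_PROD
    #
    # which is why callers index [1]["env-url"] and [2]["env-token-name"].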
class KPIGetter:
    """
    Builds Dynatrace metric expressions for the additional KPIs, fetches the
    results concurrently and hands them to the QmReportWriter.
    """
    def __init__(self) -> None:
        self.data_getter = DynatraceDataGetter()
        self.extracted_key_requests = defaultdict(dict)
        self.metric_expressions = defaultdict(dict)  # sheet -> hub -> sloid
def run(self):
"""
Entrypoint for the KPI extension.
"""
if DEBUG:
Helper.console_output("Script running debug mode")
Helper.cleanup_debug_files()
report_reader = ReportReader()
report_reader.run()
        # Get SLO IDs from the first sheet and build metric expression queries.
        first_sheet = next(iter(report_reader.qm_report_ids))
        for hub in report_reader.qm_report_ids[first_sheet].keys():
            self.get_slos(report_reader.qm_report_ids[first_sheet][hub], hub)
self.get_kpi_data(report_reader.qm_report_df)
write_report = QmReportWriter(
report_reader.qm_report_df,
self.metric_expressions,
report_reader.qm_report_file,
)
write_report.run()
# DEBUG
if DEBUG:
with open("metricexpressions.json", "w") as f:
f.write(json.dumps(self.metric_expressions, indent=4))
    def _transform_key_requests(self):
        """
        Transforms the responses from the key request parser into joined strings.
        """
        for hub in self.extracted_key_requests.keys():
            for slo in self.extracted_key_requests[hub].keys():
                entry = self.extracted_key_requests[hub][slo]
                if len(entry["services"]) > 0:
                    entry["services_transformed"] = Helper.transform_and_format_list(
                        entry["services"]
                    )
                elif DEBUG:
                    print(f"SERVICE: {hub} - {slo} is empty")
                if len(entry["requests"]) > 0:
                    entry["requests_transformed"] = Helper.transform_and_format_list(
                        entry["requests"]
                    )
                elif DEBUG:
                    print(f"REQUEST: {hub} - {slo} is empty")
    def _build_environment_names(self, df: pd.DataFrame) -> typing.List:
        """
        Creates a new environment list from the given QM report dataframe.
        Args:
            df (pd.DataFrame): QM report xlsx converted into a dataframe.
        Returns:
            typing.List: List with unique environment names.
        """
environment_names = []
for _, row in df.iterrows():
name = f'{row["HUB"]}-{row["type"]}'
if name not in environment_names:
environment_names.append(name)
return environment_names
    def get_kpi_data(self, dfs: typing.Dict):
        """
        Creates queries for Dynatrace and adds them into a list for further processing.
        Args:
            dfs (typing.Dict): Dictionary with the QM report dataframes from the ReportReader class.
        """
for sheet in dfs.keys():
self.metric_expressions[sheet] = defaultdict(dict)
hubs = self._build_environment_names(dfs[sheet])
for hub in hubs:
self.metric_expressions[sheet][hub] = defaultdict(dict)
for _, row in dfs[sheet].iterrows():
if (
row["id"]
not in self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}']
):
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
] = []
from_timestamp_ms, to_timestamp_ms = Helper.extract_timestamps(
row["timeframe"]
)
# timeframe = self._get_timeframe_for_kpi_data(
# from_timestamp_ms, to_timestamp_ms
# )
timeframe = self._get_timeframe_for_kpi_data()
# get timestamps shifted
(
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
) = self._calculate_timeshift(
from_timestamp_ms, to_timestamp_ms, timeframe
)
if (
row["id"]
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}']
):
if (
"services_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
):
                        if timeframe == "1M":
                            # timeshift(-1M) is being deprecated in Dynatrace,
                            # so KPI 1 uses a timeshift in days instead.
kpi1_timeshift = f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d"
metric_kpi1 = self._build_kpi_metric_for_query(
"kpi1",
kpi1_timeshift,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
)
else:
metric_kpi1 = self._build_kpi_metric_for_query(
"kpi1",
timeframe,
self.extracted_key_requests[
f'{row["HUB"]}-{row["type"]}'
][row["id"]]["services_transformed"],
)
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
].append(
# self._template_metric_expression(
# "kpi_1",
# metric_kpi1,
# from_timestamp_ms,
# to_timestamp_ms,
# timeframe,
# row["timeframe"],
# )
{
"kpi_name": "kpi_1",
"metric": metric_kpi1,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": timeframe,
"timeframe": row["timeframe"],
}
)
metric_kpi2 = self._build_kpi_metric_for_query(
"kpi2",
timeframe,
self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]["services_transformed"],
)
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
].append(
self._template_metric_expression(
"kpi_2",
metric_kpi2,
from_timestamp_ms_shifted,
to_timestamp_ms_shifted,
timeframe,
row["timeframe"],
)
)
if (
"requests_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
and "services_transformed"
in self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]
):
metric_count = self._build_kpi_metric_for_query(
"count",
timeframe,
self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]["services_transformed"],
self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]["requests_transformed"],
)
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
].append(
                        {
                            "kpi_name": "count",
                            "metric": metric_count,
                            "from_date": from_timestamp_ms,
                            "to_date": to_timestamp_ms,
                            # A day-sized resolution yields a single bucket
                            # covering the whole report window.
                            "resolution": f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d",
                            "timeframe": row["timeframe"],
                        }
)
metric_error_count = self._build_kpi_metric_for_query(
"error_count",
timeframe,
self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]["services_transformed"],
self.extracted_key_requests[f'{row["HUB"]}-{row["type"]}'][
row["id"]
]["requests_transformed"],
)
self.metric_expressions[sheet][f'{row["HUB"]}-{row["type"]}'][
row["id"]
].append(
{
"kpi_name": "error_count",
"metric": metric_error_count,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": f"{Helper.get_days(from_timestamp_ms, to_timestamp_ms)}d",
"timeframe": row["timeframe"],
}
)
self._dispatch_to_dynatrace()
def _dispatch_to_dynatrace(self):
"""
Dispatches all queries to Dynatrace.
"""
Helper.console_output("Fetching data from Dynatrace...")
with concurrent.futures.ThreadPoolExecutor(
self.data_getter.config["threads"]
) as executor:
for sheet in self.metric_expressions.keys():
for hub in self.metric_expressions[sheet].keys():
for slo in self.metric_expressions[sheet][hub].keys():
for index, query in enumerate(
self.metric_expressions[sheet][hub][slo]
):
if query["metric"] is not False:
params = {
"metricSelector": query["metric"],
# "resolution": query["resolution"],
"from": query["from_date"],
"to": query["to_date"],
}
if "resolution" in query:
params["resolution"] = query["resolution"]
future = executor.submit(
self.data_getter.krparser_get_data_from_dynatrace,
params,
hub,
"metrics/query",
)
                                query["data"] = future
self._process_dynatrace_data()
    def _process_dynatrace_data(self):
        """
        Processes the responses from Dynatrace and adds them to a dictionary.
        """
        for sheet in self.metric_expressions.keys():
            for hub in self.metric_expressions[sheet].keys():
                for slo in self.metric_expressions[sheet][hub].keys():
                    for query in self.metric_expressions[sheet][hub][slo]:
                        future = query.get("data")
                        if future is None:
                            # No request was dispatched for this query
                            # (e.g. because the metric selector was False).
                            query["result"] = "None"
                            continue
                        result = future.result()
                        data = result.get("result", [])
                        if data and len(data[0]["data"]) > 0:
                            query["result"] = data
                            query["api_result"] = self._extract_result_from_api(
                                data, query["kpi_name"]
                            )
                        else:
                            query["result"] = "None"
                        del query["data"]
                        # if DEBUG:
                        #     with open("./slo_results.txt", "a") as f:
                        #         f.write(f"\n{sheet} -> {hub} -> {slo}:\n")
                        #         f.write(json.dumps(result, indent=4))
                        #         f.write("\n")
                        #         f.write("-" * 80)
    def _extract_result_from_api(
        self, result: typing.Dict, result_type: str
    ) -> typing.Union[int, float, str]:
        """
        Extracts a single value from a Dynatrace metric response.
        For "kpi_2" the first value of every data series is averaged;
        for all other KPIs the first value of the first series is returned.
        """
        if result_type == "kpi_2":
            result_values = []
            for data in result[0]["data"]:
                result_values.append(data["values"][0])
            return sum(result_values) / len(result_values)
        return result[0]["data"][0]["values"][0]
def _template_metric_expression(
self,
kpi_name: str,
metric_expression: str,
from_timestamp_ms: int,
to_timestamp_ms: int,
resolution: str,
timeframe: str,
) -> typing.Dict:
"""
Template for used for Dynatrace KPI query creation.
Args:
kpi_name (str): KPI name which will be displayed in the QM report
metric_expression (str): The metric selector which will be used to fetch data from Dynatrace
from_timestamp_ms (int): From timestamp in milliseconds
to_timestamp_ms (int): To timestamp in milliseconds
resolution (str): Resolution used for fetching data from Dynatrace
timeframe (str): Timeframe from the original QM report
Returns:
typing.Dict: Returns a dictionary with all the necessary information for futher processing.
"""
element = {
"kpi_name": kpi_name,
"metric": metric_expression,
"from_date": from_timestamp_ms,
"to_date": to_timestamp_ms,
"resolution": resolution,
"timeframe": timeframe,
}
return element
def _calculate_timeshift(
self, from_timestamp_ms: int, to_timestamp_ms: int, resolution: str
) -> typing.Tuple[int, int]:
"""
Calculates the time shift for KPI 2.
Args:
from_timestamp_ms (int): From timestamp in milliseconds.
to_timestamp_ms (int): To timestamp in milliseconds.
resolution (str): The resolution used in the Dynatrace query.
Returns:
typing.Tuple[int, int]: Returns timestamps in milliseconds
"""
if resolution == "7d":
from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
to_ts = to_timestamp_ms
return from_ts, to_ts
if resolution == "1w":
# from_date, end_date = Helper.previous_week_range(
# datetime.fromtimestamp(to_timestamp_ms / 1000), -2
# )
# from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
# to_ts = Helper.convert_datetime_to_timestamp(end_date, "ms")
from_ts = from_timestamp_ms - ((60 * 60 * 24 * 7) * 1000)
to_ts = to_timestamp_ms
return from_ts, to_ts
if resolution == "1M":
from_date, _ = Helper.previous_month_range(
datetime.fromtimestamp(from_timestamp_ms / 1000), 1
)
from_ts = Helper.convert_datetime_to_timestamp(from_date, "ms")
# to_ts = Helper.convert_datetime_to_timestamp(to_timestamp_ms, "ms")
to_ts = to_timestamp_ms
return from_ts, to_ts
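    # Worked example (illustrative): for resolution "1w" and a report window of
    # 2023-01-09 to 2023-01-16, the start is moved back by
    # 60 * 60 * 24 * 7 * 1000 = 604800000 ms, so KPI 2 is queried over
    # 2023-01-02 to 2023-01-16 and can compare the current week against the
    # previous one.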
# def _get_timeframe_for_kpi_data(
# self, from_timestamp: int, to_timestamp: int
# ) -> typing.Union[str, bool]:
# """
# Returns the timeframe for KPI data
# Args:
# from_timestamp (int): From timestamp in milliseconds
# to_timestamp (int): To timestamp in milliseconds
# Returns:
# typing.Union[str, bool]: Returns the timeframe as string. If option not valid, it returns False.
# """
# days = Helper.get_days(from_timestamp, to_timestamp)
# if days == 1:
# return "7d"
# elif days == 7:
# return "1w"
# elif days >= 28 and days < 32:
# return "1M"
# else:
# return False
    def _get_timeframe_for_kpi_data(self) -> str:
        """
        Maps the REPORT_TYPE environment variable to a Dynatrace resolution.
        """
        if REPORT_TYPE == "day":
            return "7d"
        if REPORT_TYPE == "week":
            return "1w"
        if REPORT_TYPE == "month":
            return "1M"
        raise ValueError(f"Unsupported REPORT_TYPE: {REPORT_TYPE}")
def _build_kpi_metric_for_query(
self, kpi_type: str, timeframe: str, service: str = None, request: str = None
) -> typing.Union[str, bool]:
        # if structural pattern matching is available (Python >= 3.10), this could use match statements
"""
Returns formatted query string
Args:
kpi_type (str): KPI option.
timeframe (str): Timeframe as string.
service (str, optional): String with services from the KRParser. Defaults to None.
request (str, optional): String with requests from the KRParser. Defaults to None.
Returns:
            typing.Union[str, bool]: Returns formatted string for querying Dynatrace. If option not available, it returns False.
"""
if kpi_type == "kpi1":
return f'100*(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):lastReal:splitBy()/builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in({service}))"))))):lastReal:splitBy():timeshift(-{timeframe}))'
elif kpi_type == "kpi2":
timeframe_split = [letter for letter in timeframe]
return f'100*((builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({timeframe_split[1]}):lastReal())/(builtin:service.requestCount.server:filter(and(or(in("dt.entity.service",entitySelector("type(service),entityName.in({service})"))))):value:rate({timeframe_split[1]}):fold(avg)))'
elif kpi_type == "count":
return f'(builtin:service.keyRequest.count.total:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'
elif kpi_type == "error_count":
return f'(builtin:service.keyRequest.errors.fivexx.count:filter(and(or(in("dt.entity.service_method",entitySelector("type(service_method), fromRelationship.isServiceMethodOfService( type(~"SERVICE~"),entityName.in( {service} ) ) ,entityName.in( {request} )"))))):lastReal:splitBy())'
else:
return False
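    # Illustrative rendering (hypothetical service list): for kpi_type="kpi2"
    # and timeframe="1w" the selector divides the latest weekly request rate by
    # the average weekly rate over the queried window:
    #   100*((builtin:service.requestCount.server:filter(...):value:rate(w):lastReal())
    #       /(builtin:service.requestCount.server:filter(...):value:rate(w):fold(avg)))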
def _extract_key_requests(
self, slo_ids_df: pd.DataFrame, env: str, DTURL: str, DTTOKEN: str
):
"""
Extracts key requests using the KRParser.
Args:
slo_ids_df (pd.DataFrame): Dataframe containing SLO Ids.
            env (str): The environment used for querying.
DTURL (str): Dynatrace URL.
DTTOKEN (str): Dynatrace token.
"""
Helper.console_output("Extracting Key Requests...")
krp = krparser.KRParser(
name=env,
options=krparser.KROption.RESOLVESERVICES,
config={
"threads": 10,
"serviceLookupParams": {"fields": "tags,fromRelationships"},
"extendResultObjects": {"env": env},
},
DTAPIURL=DTURL,
DTAPIToken=DTTOKEN,
)
        krs = krp.parse(slo_ids_df)
        self.extracted_key_requests[env] = {}
        for kr in krs:
            # One entry per SLO id, collecting key request and service names.
            slo_entry = {"requests": [], "services": []}
            self.extracted_key_requests[env][kr.metadata["sloId"]] = slo_entry
            for key_request in kr.keyRequests:
                slo_entry["requests"].append(key_request["displayName"])
                for service in key_request["services"]:
                    if service["displayName"] not in slo_entry["services"]:
                        slo_entry["services"].append(service["displayName"])
        self._transform_key_requests()
def get_slos(self, slo_ids: list, hub: str):
"""
Ingests a list of SLO Ids and prepares a pandas dataframe for KRParser ingestion.
Args:
slo_ids (list): List of SLO Ids.
hub (str): The hub/environment.
"""
        slo_responses = []
        for slo_id in slo_ids:
            response = self.data_getter.get_data_from_dynatrace(slo_id, hub, "slo")
            # Drop the burn rate field before KRParser ingestion; ignore if absent.
            response.pop("errorBudgetBurnRate", None)
            slo_responses.append(response)
df = pd.DataFrame(slo_responses)
self._extract_key_requests(
df,
hub,
self.data_getter.environment[hub][1]["env-url"],
os.environ[self.data_getter.environment[hub][2]["env-token-name"]],
)
class Helper:
    @staticmethod
    def transform_and_format_list(data: list) -> str:
        """
        Joins a list into a single Dynatrace-escaped string.
        Args:
            data (list): List with data for joining.
        Returns:
            str: Joined string with every item wrapped in ~"...~".
        """
        return ", ".join(f'~"{s}~"' for s in data)
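    # Illustrative example (hypothetical names):
    # transform_and_format_list(["svc-a", "svc-b"]) returns '~"svc-a~", ~"svc-b~"',
    # the escaped-quote list form used inside Dynatrace entitySelector strings.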
@staticmethod
def extract_timestamps(timestamp: str) -> typing.Tuple[int, int]:
"""
Extracts the timestamps from the "timeframe" column in the QM report.
Args:
timestamp (str): "timeframe" column value.
Returns:
typing.Tuple[int, int]: Returns processed "timeframe" value as integers.
"""
ts = timestamp.split(" to ")
return int(ts[0]), int(ts[1])
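    # Illustrative example: extract_timestamps("1672531200000 to 1672617600000")
    # returns (1672531200000, 1672617600000).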
@staticmethod
def get_days(from_timestamp: int, to_timestamp: int) -> int:
"""
Calculates days between two timestamps.
Args:
from_timestamp (int): From timestamp in milliseconds.
to_timestamp (int): To timestamp in milliseconds.
Returns:
int: Returns the days between two timestamps.
"""
        from_date = datetime.fromtimestamp(from_timestamp / 1000)
        to_date = datetime.fromtimestamp(to_timestamp / 1000)
        duration = to_date - from_date
return duration.days
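    # Illustrative example: get_days(1672531200000, 1673136000000) spans
    # 2023-01-01 to 2023-01-08 (UTC) and returns 7.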
    @staticmethod
    def previous_day_range(date):
        """
        Returns the previous day as both start and end of the range.
        """
        start_date = date - timedelta(days=1)
        end_date = date - timedelta(days=1)
        return start_date, end_date
    @staticmethod
    def previous_week_range(
        date: datetime, weeks: int
    ) -> typing.Tuple[datetime.date, datetime.date]:
        """
        Gets the Monday-aligned week range relative to the given date.
        Args:
            date (datetime): Reference date.
            weeks (int): Number of weeks to shift (negative values go back).
        Returns:
            typing.Tuple[datetime.date, datetime.date]: Returns start and end date.
        """
        start_date = date + timedelta(-date.weekday(), weeks=weeks)
        end_date = date + timedelta(-date.weekday(), weeks=weeks + 1)
        return start_date, end_date
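    # Illustrative example: for date = Wednesday 2023-01-11 and weeks = -1,
    # date.weekday() == 2, so the function returns
    # (Monday 2023-01-02, Monday 2023-01-09).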
    @staticmethod
    def previous_month_range(date, shift: int):
        """
        Shifts the date back by `shift` months and returns the first and last
        day of the month preceding the shifted date.
        """
        shifted_date = date - relativedelta(months=shift)
        end_date = shifted_date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
        return start_date, end_date
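    # Illustrative example: previous_month_range(datetime(2023, 3, 15), 1)
    # shifts to 2023-02-15 and returns (2023-01-01, 2023-01-31).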
    @staticmethod
    def get_previous_month_days(timestamp_ms: int):
        """
        Returns the number of days in the month preceding the given timestamp.
        """
        date = datetime.fromtimestamp(timestamp_ms / 1000).date()
        end_date = date.replace(day=1) - timedelta(days=1)
        start_date = end_date.replace(day=1)
days = Helper.get_days(
Helper.convert_datetime_to_timestamp(start_date) * 1000,
Helper.convert_datetime_to_timestamp(end_date) * 1000,
)
return days + 1
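    # Illustrative example: for any timestamp in March 2023 the previous month
    # is February 2023, so the function returns 28.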
@staticmethod
def convert_datetime_to_timestamp(date: datetime.date, option: str = None) -> int:
"""
Converts datetime object to timestamp.
Returns by default timestamp in seconds.
Args:
date (datetime.date): Datetime object to convert.
option (str, optional): If set to "ms", returns timestamp as milliseconds. Defaults to None.
        Returns:
            int: Timestamp in seconds, or milliseconds if option == "ms".
"""
date_datetime = datetime.combine(date, datetime.min.time())
epoch = datetime(1970, 1, 1)
date_timestamp = (date_datetime - epoch).total_seconds()
if option == "ms":
date_timestamp = date_timestamp * 1000
return int(date_timestamp)
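    # Illustrative example: convert_datetime_to_timestamp(datetime(2023, 1, 1).date())
    # returns 1672531200; with option="ms" it returns 1672531200000.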
@staticmethod
def console_output(text: str, indent=False):
"""
Helper function for uniform console output, when debugging is enabled.
Args:
text (str): _description_
indent (bool, optional): _description_. Defaults to False.
"""
if DEBUG:
if indent:
print(f"{' '*10}{text}")
else:
print(text)
print("-" * 80)
@staticmethod
def cleanup_debug_files():
"""
Cleans up files created in debugging mode.
"""
Helper.console_output("Cleaning up debug files")
files = ["./metricexpressions.json", "./slo_results.txt", "./test.xlsx"]
for file in files:
if os.path.exists(file):
os.remove(file)
Helper.console_output(f"{file.replace('./', '')} removed.", indent=True)
else:
Helper.console_output(
f"{file.replace('./', '')} not found. Nothing removed.", indent=True
)
Helper.console_output("=" * 80)
def main():
"""
Entrypoint.
"""
kpi_getter = KPIGetter()
kpi_getter.run()
if __name__ == "__main__":
main()