import requests import sys import yaml import datetime import time import pandas as pd import argparse import warnings import typing import logging from collections import defaultdict from decorators import timer warnings.filterwarnings("ignore") # DEPRECATED: We work with UTC data and server ist set to UTC. Leaving for reference. # os.environ['TZ'] = 'Europe/Berlin' # set new timezone # time.tzset() # global script configurations SLO_PARAMETER_FILE: str = "./slo_parameter_dl.yaml" URL: str = "https://d99gwrp86d.execute-api.eu-central-1.amazonaws.com/enriched-slos-daily/{}?secret={}&identifier={}&hub={}&fromTimestamp={}&toTimestamp={}" # TODO: Need to create a new key as thisone is leaked. Also, needs to be added to environment on the executing machine. API_KEY: str = "SD4tEMoG6zbM8mwoSfCVeNMsJbK8XcYF" IDENTIFIERS = ["ALL_SLOS_DAILY"] # "ALL_SLOS_WEEKLY", # add on ALL_SLOs. need to move query time window for 5 mins. DELAY_IN_SECONDS_ALL_SLOS = 300 DELAY_IN_SECONDS_ALL_SLOS_DAILY = 86400 # setup logging - proper logging does not work on jenkins machines # logging.basicConfig(filename=f'./logs/logs-{datetime.date.today()}.log', level=logging.DEBUG, # format='%(asctime)s %(levelname)s %(name)s %(message)s') # logger = logging.getLogger(__name__) def slo_parameter_builder(slos: list, from_date: datetime.datetime, to_date: datetime.datetime) -> typing.Dict: """ Builds a dictionary for requests to endpoint """ result = {} from_date_ts = int(time.mktime(from_date.timetuple())) to_date_ts = int(time.mktime(to_date.timetuple())) for identifier in IDENTIFIERS: slo_result = [] for slo in slos: hubs = slo[0].split(",") for hub in hubs: tmp = {} tmp["identifier"] = identifier tmp["slo"] = slo[1] tmp["year_start"] = slo[2] tmp["hub"] = hub tmp["from_date"] = from_date_ts tmp["to_date"] = to_date_ts slo_result.append(tmp) result[identifier] = slo_result return result def get_slos(urls_request: typing.Dict) -> typing.Dict: """ request SLOs from Datalake endpoint and process. """ responses = defaultdict(list) for hub in urls_request.keys(): for url in urls_request[hub]: response = requests.get(url, verify=False) if response.status_code == 200: responses[hub].append(response.json()) else: responses[hub].append(False) return responses # def build_request_links(type: str, slo_parameters: list, time_df: pd.DataFrame = None, year_to_date: bool = False) -> typing.Dict: # """ # DEPRECATED # ========== # Builds request links for quering data from datalake endpoint on AWS. # Kept for reference for now. # Attributes # type: str # ALL_SLOS or ALL_SLOS_DAILY # slo_parameters: list # list with parameters built in slo_parameter_builder(). # time_df: pd.Dataframe # Dataframe generated in get_hourly_slice_dataframe(). # year_to_date: bool # By default False. If set to True it uses year_start from slo_parameters as start date. # """ # urls_request = {} # [] # if type == "ALL_SLOS_DAILY": # # TODO: we might need to move the from date by +24 hours # for slo_parameter in slo_parameters: # if slo_parameter["hub"] not in urls_request: # urls_request[slo_parameter["hub"]] = [] # if not year_to_date: # for index, row in time_df.iterrows(): # # delay = 0 # # if index == len(time_df) - 1: # # delay = DELAY_IN_SECONDS_ALL_SLOS_DAILY # # TODO: add delay on starttime # urls_request[slo_parameter["hub"]].append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"], # slo_parameter["hub"], row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY)) # else: # urls_request.append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"], # slo_parameter["hub"], slo_parameter["year_start"], slo_parameter["to_date"])) # if type == "ALL_SLOS": # # remove when more hubs are available. currently there is only euprod available on the datalake # for slo_parameter in slo_parameters: # # if slo_parameter["hub"] == "euprod": # urls_request[slo_parameter["hub"]] = [] # for _, row in time_df.iterrows(): # urls_request[slo_parameter["hub"]].append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"], # slo_parameter["hub"], row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS)) # return urls_request def build_request_links(type: str, slo_parameters: list, time_df: pd.DataFrame = None, year_to_date: bool = False) -> typing.Dict: """ Builds request links for quering data from datalake endpoint on AWS. Attributes type: str ALL_SLOS or ALL_SLOS_DAILY slo_parameters: list list with parameters built in slo_parameter_builder(). time_df: pd.Dataframe Dataframe generated in get_hourly_slice_dataframe(). year_to_date: bool By default False. If set to True it uses year_start from slo_parameters as start date. """ urls_request = defaultdict(list) if type == "ALL_SLOS_DAILY": for slo_parameter in slo_parameters: if len(urls_request[slo_parameter["hub"]]) < len(time_df): for _, row in time_df.iterrows(): urls_request[slo_parameter["hub"]].append(URL.format("no-filter", API_KEY, slo_parameter["identifier"], slo_parameter["hub"], row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY)) return urls_request else: return "nothing else supported yet" def previous_day_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]: start_date = date - datetime.timedelta(days=1) end_date = date - datetime.timedelta(days=1) return start_date, end_date def previous_week_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]: start_date = date + datetime.timedelta(-date.weekday(), weeks=-1) end_date = date + datetime.timedelta(-date.weekday() - 1) return start_date, end_date def previous_month_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]: end_date = date.replace(day=1) - datetime.timedelta(days=1) start_date = end_date.replace(day=1) return start_date, end_date def load_yml(path: str) -> typing.Dict: """ Loads yaml file and returns a dictionary Attributes path: str path to file """ with open(path) as file: data = yaml.safe_load(file) return data def load_slo_parameter() -> typing.List: """ Creates a list with SLO configs from the slo_parameter_dl.yaml """ slo_data = load_yml(SLO_PARAMETER_FILE) # used for check if config is complete mandatory_fields = ['hub', 'selector_var', 'yearstart'] # result list. will be returned result = [] for slo_header, slo_config in slo_data.items(): tmp = {} # check if header is broken if len(slo_data[slo_header]) == 4: # logger.error(f"SLO Configuration {slo_header} is broken") print(f"SLO Configuration {slo_header} is broken") continue for config in slo_config: tmp.update(config) if sorted(list(tmp.keys())) == mandatory_fields: year_start = datetime.datetime.strptime( tmp['yearstart'], "%Y-%m-%d") year_start = datetime.datetime( year_start.year, year_start.month, year_start.day) year_start = int(time.mktime(year_start.timetuple())) result.append([tmp["hub"], tmp["selector_var"], year_start, slo_header]) else: # logger.error(f"SLO Configuration {slo_header} is broken") print(f"SLO Configuration {slo_header} is broken") return result def init_argparse() -> argparse.ArgumentParser: """ Argument parser """ parser = argparse.ArgumentParser( usage="%(prog)s [--fromDate] [toDate] or [preSelect]", description="gather SLO in daily slices for given Timeframe" ) parser.add_argument( "-f", "--fromDate", help="YYYY-mm-dd e.g. 2022-01-01" ) parser.add_argument( "-t", "--toDate", help="YYYY-mm-dd e.g. 2022-01-31" ) parser.add_argument( "-p", "--preSelect", help="day | week | month - gathers the data for the last full day, week or month" ) parser.add_argument( "-s", "--slices", help="h | d | t | y - !! h - hourly not supported yet !! writes the slices hourly, daily, total or year to date into ecxel. given in any order" ) return parser def check_inputs(args: argparse.ArgumentParser) -> typing.Tuple[datetime.datetime, datetime.datetime]: """ This function is the single point of true for arguments. If new arguments are added they need to be added in here. Attributes args: argsparse.ArgumentParser Arguments from argument parser """ if args.preSelect and (args.fromDate or args.toDate): print("--preSelect must not be used in conjuntion with --fromDate and/or --toDate") sys.exit() elif args.fromDate and not args.toDate: print("--fromDate only in conjunction with --toDate") sys.exit() elif args.toDate and not args.fromDate: print("--toDate only in conjunction with --fromDate") sys.exit() elif args.toDate and args.fromDate and not args.preSelect: try: fromDate = datetime.datetime.strptime(args.fromDate, "%Y-%m-%d") toDate = datetime.datetime.strptime(args.toDate, "%Y-%m-%d") except Exception as e: # logger.error(str(e)) print(f"ERROR: {str(e)}") sys.exit() if toDate < fromDate: print("--toDate can't be older than --fromDate") if toDate.date() > datetime.date.today() or fromDate.date() > datetime.date.today(): print("--toDate or --fromDate can't be in the future") sys.exit() elif args.preSelect and not args.fromDate and not args.toDate: date = datetime.date.today() if args.preSelect == "week": fromDate, toDate = previous_week_range(date) elif args.preSelect == "month": fromDate, toDate = previous_month_range(date) elif args.preSelect == "day": fromDate, toDate = previous_day_range(date) else: print("--preSelect must be week or month") sys.exit() else: print("Invalid arguments, please use --help") sys.exit() if args.slices == None: print("-s or --slices must not be null and needs at least one letter of h d t or y, lower- or uppercase.") sys.exit() elif sum([1 if one_inp in str.lower(args.slices) else 0 for one_inp in ['h', 'd', 't', 'y']]) == 0: print("-s or --slices must has at least one letter of h d t or y, lower- or uppercase.") sys.exit() return fromDate, toDate def get_daily_slice_dataframe(from_date: datetime.datetime, to_date: datetime.datetime) -> pd.DataFrame: """ Builds pandas dataframe with days Attributes from_date: datetime to_date: datetime """ tmp_start = from_date result_df = pd.DataFrame() # Add first day tmp_end = tmp_start + datetime.timedelta(hours=24) start_ts = time.mktime(tmp_start.timetuple()) end_ts = time.mktime(tmp_end.timetuple()) row = {"Date": tmp_start, "startTime": start_ts, "endTime": end_ts} result_df = result_df.append(row, ignore_index=True) while tmp_start < to_date: tmp_start = tmp_start + datetime.timedelta(hours=24) tmp_end = tmp_start + datetime.timedelta(hours=24) start_ts = time.mktime(tmp_start.timetuple()) end_ts = time.mktime(tmp_end.timetuple()) row = {"Date": tmp_start, "startTime": start_ts, "endTime": end_ts} result_df = result_df.append(row, ignore_index=True) return result_df def get_hourly_slice_dataframe(from_date: datetime.datetime, to_date: datetime.datetime) -> pd.DataFrame: """ Build pandas dataframe with hours Attributes from_date: datetime to_date: datetime """ tmp_start = datetime.datetime( from_date.year, from_date.month, from_date.day) final_end = datetime.datetime.combine( to_date, datetime.datetime.max.time()) result_df = pd.DataFrame() tmp_end = tmp_start + datetime.timedelta(hours=1) start_ts = time.mktime(tmp_start.timetuple()) end_ts = time.mktime(tmp_end.timetuple()) row = {"Date": tmp_start, "startTime": start_ts, "endTime": end_ts} result_df = result_df.append(row, ignore_index=True) while tmp_start < final_end: tmp_start = tmp_start + datetime.timedelta(hours=1) tmp_end = tmp_start + datetime.timedelta(hours=1) start_ts = time.mktime(tmp_start.timetuple()) end_ts = time.mktime(tmp_end.timetuple()) row = {"Date": tmp_start, "startTime": start_ts, "endTime": end_ts} result_df = result_df.append(row, ignore_index=True) return result_df def process_data(data_from_datalake: typing.Dict, slo_parameters: typing.Dict) -> pd.DataFrame: """ Processes data from the datalake and puts everything into a pandas dataframe. Function includes a small integrity check of the SLOs. Does not throw errors but logs them. Attributes data_from_datalake: dict Data from the response of the datalake on AWS. slo_parameters: dict Pass in for example slo_parameters["ALL_SLOS_DAILY"] """ null_data_count = 0 # create a dict with list of slos for each hub slos = defaultdict(list) for slo in slo_parameters: slos[slo["hub"]].append(slo["slo"]) tmp = [] # check if all slos are in the report def compare_lists(l1, l2, hub): l1 = l1.sort() l2 = l2.sort() if (l1 == l2): # logger.info(f"{hub} - Complete") print(f"{hub} - Complete") else: # logger.warn(f"{hub} - Incomplete") print(f"{hub} - Incomplete") for hub in data_from_datalake.keys(): # only used for checking if all slos are in the report assigned_slos = [] for x in data_from_datalake[hub]: for data in x: if data: if data["name"] in slos[hub]: timestamp = int(data["timeframe"].split(" to ")[0]) data["Date"] = datetime.datetime.fromtimestamp( timestamp/1000).date() data["HUB"] = hub tmp.append(data) # only used for checking if all slos are in the report assigned_slos.append(data["name"]) else: null_data_count += 1 # only used for checking if all slos are in the report compare_lists(assigned_slos, slos[hub], hub) df = pd.DataFrame(tmp) df[["description", "Touchpoint"] ] = df["description"].str.split("_", expand=True) df = df[['Date', 'HUB', 'id', 'enabled', 'name', 'description', 'Touchpoint', 'evaluatedPercentage', 'errorBudget', 'status', 'error', 'target', 'warning', 'evaluationType', 'timeframe', 'metricExpression', 'filter']] # logger.info(f"Null Data Count: {null_data_count}") print(f"Null Data Count: {null_data_count}") return df # TODO: Make process_data more efficient. # def process_data(data_from_datalake: typing.Dict, time_df: pd.DataFrame) -> pd.DataFrame: # """ # Processes data from the datalake and puts everything into a pandas dataframe. # """ # null_data_count = 0 # tmp = [dict(data[0], **{'Date': time_df.iloc[i % len(time_df)]['Date'], 'HUB': hub}) # for hub in data_from_datalake.keys() # for i, data in enumerate(data_from_datalake[hub]) # if data] # df = pd.DataFrame(tmp) # df[["description", "Touchpoint"] # ] = df["description"].str.split("_", expand=True) # df = df[['Date', 'HUB', 'id', 'enabled', 'name', 'description', 'Touchpoint', 'evaluatedPercentage', 'errorBudget', # 'status', 'error', 'target', 'warning', 'evaluationType', 'timeframe', 'metricExpression', 'filter']] # print(f"Null Data Count: {null_data_count}") # logger.info(f"Null Data Count: {null_data_count}") # return df def write_to_excel(from_date: datetime.datetime, args: argparse.ArgumentParser, daily_all: pd.DataFrame = None, ytd_all: pd.DataFrame = None) -> None: """ Writes data from pandas dataframe to excel format. Attributes from_date: datetime Used for building the calendar week in the filename for the weekly report. args: argparse.ArgumentParser Arguments from the argument parser. daily_all: pd.DataFrame Pandas DataFrame containing the daily data. ytd_all: pd.DataFrame Pandas DataFrame containing the year to date data. """ touchpoints = ["Vehicle", "Mobile"] if args.preSelect == "day": filename = f"./QM_Report_Datalake_{datetime.date.today() - datetime.timedelta(days=1)}.xlsx" else: filename = f"./QM_Report_Datalake_{from_date.isocalendar()[1]}.xlsx" writer = pd.ExcelWriter(filename, engine='xlsxwriter') if not daily_all.empty and "d" in str.lower(args.slices): daily_all = daily_all[daily_all["Touchpoint"].isin(touchpoints)] daily_all.to_excel(writer, sheet_name='daily', index=False) if not ytd_all.empty and "y" in str.lower(args.slices): ytd_all = ytd_all[ytd_all["Touchpoint"].isin(touchpoints)] ytd_all.to_excel(writer, sheet_name="ytd", index=False) writer.save() writer.close() @timer def main() -> None: parser = init_argparse() args = parser.parse_args() from_date, to_date = check_inputs(args) # logger.info( # f"slices: {args.slices} | fromDate: {from_date} | toDate: {to_date}") print( f"slices: {args.slices} | fromDate: {from_date} | toDate: {to_date}") # build slice dataframes TODO: only build whats needed days = get_daily_slice_dataframe(from_date, to_date) # hours = get_hourly_slice_dataframe(from_date, to_date) slo_parameters = slo_parameter_builder( load_slo_parameter(), from_date, to_date) daily_all = pd.DataFrame() ytd_all = pd.DataFrame() if "d" in str.lower(args.slices): urls_request = build_request_links( "ALL_SLOS_DAILY", slo_parameters["ALL_SLOS_DAILY"], time_df=days) data_from_datalake = get_slos(urls_request) daily_all = process_data( data_from_datalake, slo_parameters["ALL_SLOS_DAILY"]) if "h" in str.lower(args.slices): print("hourly not supported yet") if "y" in str.lower(args.slices): print("ytd not supported yet") if "t" in str.lower(args.slices): print("total not supported yet") write_to_excel(from_date, args, daily_all, ytd_all) if __name__ == "__main__": main()