# qm_report/create_datalake_report.py

import requests
import sys
import yaml
import datetime
import time
import pandas as pd
import argparse
import warnings
import typing
import logging
from collections import defaultdict
from decorators import timer
warnings.filterwarnings("ignore")
# DEPRECATED: We work with UTC data and the server is set to UTC. Leaving for reference.
# os.environ['TZ'] = 'Europe/Berlin' # set new timezone
# time.tzset()
# global script configurations
SLO_PARAMETER_FILE: str = "./slo_parameter_dl.yaml"
URL: str = "https://d99gwrp86d.execute-api.eu-central-1.amazonaws.com/enriched-slos-daily/{}?secret={}&identifier={}&hub={}&fromTimestamp={}&toTimestamp={}"
# TODO: Need to create a new key as this one is leaked. It also needs to be added to the environment on the executing machine.
API_KEY: str = "SD4tEMoG6zbM8mwoSfCVeNMsJbK8XcYF"
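# A possible follow-up for the TODO above (sketch only, kept commented out): once a fresh
# key has been provisioned on the executing machine it could be read from the environment
# instead of being hardcoded. "DATALAKE_API_KEY" is a placeholder name, not an agreed one.
# import os
# API_KEY: str = os.environ["DATALAKE_API_KEY"]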
IDENTIFIERS = ["ALL_SLOS_DAILY"] # "ALL_SLOS_WEEKLY",
# For the ALL_SLOS identifier the query time window has to be shifted by 5 minutes.
DELAY_IN_SECONDS_ALL_SLOS = 300
DELAY_IN_SECONDS_ALL_SLOS_DAILY = 86400
# set up logging - proper logging does not work on the Jenkins machines
# logging.basicConfig(filename=f'./logs/logs-{datetime.date.today()}.log', level=logging.DEBUG,
#                     format='%(asctime)s %(levelname)s %(name)s %(message)s')
# logger = logging.getLogger(__name__)

def slo_parameter_builder(slos: list, from_date: datetime.datetime, to_date: datetime.datetime) -> typing.Dict:
    """
    Builds a dictionary for requests to endpoint
    """
    result = {}
    from_date_ts = int(time.mktime(from_date.timetuple()))
    to_date_ts = int(time.mktime(to_date.timetuple()))
    for identifier in IDENTIFIERS:
        slo_result = []
        for slo in slos:
            hubs = slo[0].split(",")
            for hub in hubs:
                tmp = {}
                tmp["identifier"] = identifier
                tmp["slo"] = slo[1]
                tmp["year_start"] = slo[2]
                tmp["hub"] = hub
                tmp["from_date"] = from_date_ts
                tmp["to_date"] = to_date_ts
                slo_result.append(tmp)
        result[identifier] = slo_result
    return result

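# Illustrative shape of the dictionary built above (values are made up):
# {
#     "ALL_SLOS_DAILY": [
#         {"identifier": "ALL_SLOS_DAILY", "slo": "some-slo-selector", "year_start": 1640995200,
#          "hub": "euprod", "from_date": 1646611200, "to_date": 1647302400},
#         ...
#     ]
# }
# One entry is produced per identifier/hub combination, because comma-separated hub
# strings from the configuration are split into individual hubs.
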
def get_slos(urls_request: typing.Dict) -> typing.Dict:
    """
    Requests SLOs from the Datalake endpoint and collects the responses per hub.
    """
    responses = defaultdict(list)
    for hub in urls_request.keys():
        for url in urls_request[hub]:
            response = requests.get(url, verify=False)
            if response.status_code == 200:
                responses[hub].append(response.json())
            else:
                responses[hub].append(False)
    return responses

# def build_request_links(type: str, slo_parameters: list, time_df: pd.DataFrame = None, year_to_date: bool = False) -> typing.Dict:
#     """
#     DEPRECATED
#     ==========
#     Builds request links for querying data from datalake endpoint on AWS.
#     Kept for reference for now.
#     Attributes
#     type: str
#         ALL_SLOS or ALL_SLOS_DAILY
#     slo_parameters: list
#         list with parameters built in slo_parameter_builder().
#     time_df: pd.DataFrame
#         Dataframe generated in get_hourly_slice_dataframe().
#     year_to_date: bool
#         By default False. If set to True it uses year_start from slo_parameters as start date.
#     """
#     urls_request = {}  # []
#     if type == "ALL_SLOS_DAILY":
#         # TODO: we might need to move the from date by +24 hours
#         for slo_parameter in slo_parameters:
#             if slo_parameter["hub"] not in urls_request:
#                 urls_request[slo_parameter["hub"]] = []
#             if not year_to_date:
#                 for index, row in time_df.iterrows():
#                     # delay = 0
#                     # if index == len(time_df) - 1:
#                     #     delay = DELAY_IN_SECONDS_ALL_SLOS_DAILY
#                     # TODO: add delay on starttime
#                     urls_request[slo_parameter["hub"]].append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"],
#                                                                          slo_parameter["hub"], row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY))
#             else:
#                 urls_request.append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"],
#                                                slo_parameter["hub"], slo_parameter["year_start"], slo_parameter["to_date"]))
#     if type == "ALL_SLOS":
#         # remove when more hubs are available. currently there is only euprod available on the datalake
#         for slo_parameter in slo_parameters:
#             # if slo_parameter["hub"] == "euprod":
#             urls_request[slo_parameter["hub"]] = []
#             for _, row in time_df.iterrows():
#                 urls_request[slo_parameter["hub"]].append(URL.format(slo_parameter["slo"], API_KEY, slo_parameter["identifier"],
#                                                                      slo_parameter["hub"], row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS))
#     return urls_request

def build_request_links(type: str, slo_parameters: list, time_df: pd.DataFrame = None, year_to_date: bool = False) -> typing.Dict:
    """
    Builds request links for querying data from the datalake endpoint on AWS.
    Attributes
    type: str
        ALL_SLOS_DAILY (the only type currently supported)
    slo_parameters: list
        list with parameters built in slo_parameter_builder().
    time_df: pd.DataFrame
        DataFrame generated in get_daily_slice_dataframe().
    year_to_date: bool
        By default False. If set to True it uses year_start from slo_parameters as start date.
    """
    urls_request = defaultdict(list)
    if type == "ALL_SLOS_DAILY":
        for slo_parameter in slo_parameters:
            if len(urls_request[slo_parameter["hub"]]) < len(time_df):
                for _, row in time_df.iterrows():
                    urls_request[slo_parameter["hub"]].append(URL.format("no-filter", API_KEY, slo_parameter["identifier"], slo_parameter["hub"],
                                                                         row["startTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY, row["endTime"]+DELAY_IN_SECONDS_ALL_SLOS_DAILY))
        return urls_request
    else:
        raise ValueError("nothing else supported yet")

def previous_day_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]:
    start_date = date - datetime.timedelta(days=1)
    end_date = date - datetime.timedelta(days=1)
    return start_date, end_date


def previous_week_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]:
    start_date = date + datetime.timedelta(-date.weekday(), weeks=-1)
    end_date = date + datetime.timedelta(-date.weekday() - 1)
    return start_date, end_date


def previous_month_range(date: datetime.datetime) -> typing.Tuple[datetime.datetime, datetime.datetime]:
    end_date = date.replace(day=1) - datetime.timedelta(days=1)
    start_date = end_date.replace(day=1)
    return start_date, end_date

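# Worked example for the range helpers, assuming the script runs on Tuesday 2022-03-15:
#   previous_day_range   -> (2022-03-14, 2022-03-14)
#   previous_week_range  -> (2022-03-07, 2022-03-13)  # previous Monday through Sunday
#   previous_month_range -> (2022-02-01, 2022-02-28)  # first to last day of February
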
def load_yml(path: str) -> typing.Dict:
    """
    Loads yaml file and returns a dictionary
    Attributes
    path: str
        path to file
    """
    with open(path) as file:
        data = yaml.safe_load(file)
    return data

def load_slo_parameter() -> typing.List:
    """
    Creates a list with SLO configs from the slo_parameter_dl.yaml
    """
    slo_data = load_yml(SLO_PARAMETER_FILE)
    # used to check if a config is complete
    mandatory_fields = ['hub', 'selector_var', 'yearstart']
    # result list. will be returned
    result = []
    for slo_header, slo_config in slo_data.items():
        tmp = {}
        # check if the header is broken
        if len(slo_data[slo_header]) == 4:
            # logger.error(f"SLO Configuration {slo_header} is broken")
            print(f"SLO Configuration {slo_header} is broken")
            continue
        for config in slo_config:
            tmp.update(config)
        if sorted(list(tmp.keys())) == mandatory_fields:
            year_start = datetime.datetime.strptime(
                tmp['yearstart'], "%Y-%m-%d")
            year_start = datetime.datetime(
                year_start.year, year_start.month, year_start.day)
            year_start = int(time.mktime(year_start.timetuple()))
            result.append([tmp["hub"], tmp["selector_var"],
                           year_start, slo_header])
        else:
            # logger.error(f"SLO Configuration {slo_header} is broken")
            print(f"SLO Configuration {slo_header} is broken")
    return result

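# The parsing above implies one YAML block per SLO: a header mapped to a list of
# single-key mappings. A hypothetical entry could look like this (the real
# slo_parameter_dl.yaml may differ):
#
#   Some_SLO_Vehicle:
#     - hub: "euprod,usprod"
#     - selector_var: "some-slo-selector"
#     - yearstart: "2022-01-01"
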
def init_argparse() -> argparse.ArgumentParser:
    """
    Argument parser
    """
    parser = argparse.ArgumentParser(
        usage="%(prog)s [--fromDate --toDate | --preSelect] --slices",
        description="gather SLOs in daily slices for a given timeframe"
    )
    parser.add_argument(
        "-f", "--fromDate",
        help="YYYY-mm-dd e.g. 2022-01-01"
    )
    parser.add_argument(
        "-t", "--toDate",
        help="YYYY-mm-dd e.g. 2022-01-31"
    )
    parser.add_argument(
        "-p", "--preSelect",
        help="day | week | month - gathers the data for the last full day, week or month"
    )
    parser.add_argument(
        "-s", "--slices",
        help="h | d | t | y - !! h - hourly not supported yet !! writes the hourly, daily, total or year-to-date slices into Excel. Letters can be given in any order"
    )
    return parser

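# Example invocations (module name assumed to be create_datalake_report.py):
#   python create_datalake_report.py --preSelect day --slices d
#   python create_datalake_report.py --fromDate 2022-01-01 --toDate 2022-01-31 --slices d
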
def check_inputs(args: argparse.Namespace) -> typing.Tuple[datetime.datetime, datetime.datetime]:
    """
    This function is the single point of truth for arguments.
    If new arguments are added they need to be added in here.
    Attributes
    args: argparse.Namespace
        Arguments from argument parser
    """
    if args.preSelect and (args.fromDate or args.toDate):
        print("--preSelect must not be used in conjunction with --fromDate and/or --toDate")
        sys.exit()
    elif args.fromDate and not args.toDate:
        print("--fromDate only in conjunction with --toDate")
        sys.exit()
    elif args.toDate and not args.fromDate:
        print("--toDate only in conjunction with --fromDate")
        sys.exit()
    elif args.toDate and args.fromDate and not args.preSelect:
        try:
            fromDate = datetime.datetime.strptime(args.fromDate, "%Y-%m-%d")
            toDate = datetime.datetime.strptime(args.toDate, "%Y-%m-%d")
        except Exception as e:
            # logger.error(str(e))
            print(f"ERROR: {str(e)}")
            sys.exit()
        if toDate < fromDate:
            print("--toDate can't be older than --fromDate")
            sys.exit()
        if toDate.date() > datetime.date.today() or fromDate.date() > datetime.date.today():
            print("--toDate or --fromDate can't be in the future")
            sys.exit()
    elif args.preSelect and not args.fromDate and not args.toDate:
        date = datetime.date.today()
        if args.preSelect == "week":
            fromDate, toDate = previous_week_range(date)
        elif args.preSelect == "month":
            fromDate, toDate = previous_month_range(date)
        elif args.preSelect == "day":
            fromDate, toDate = previous_day_range(date)
        else:
            print("--preSelect must be day, week or month")
            sys.exit()
    else:
        print("Invalid arguments, please use --help")
        sys.exit()
    if args.slices is None:
        print("-s or --slices must not be null and needs at least one letter of h, d, t or y, lower- or uppercase.")
        sys.exit()
    elif sum([1 if one_inp in str.lower(args.slices) else 0 for one_inp in ['h', 'd', 't', 'y']]) == 0:
        print("-s or --slices must have at least one letter of h, d, t or y, lower- or uppercase.")
        sys.exit()
    return fromDate, toDate

def get_daily_slice_dataframe(from_date: datetime.datetime, to_date: datetime.datetime) -> pd.DataFrame:
    """
    Builds pandas dataframe with days
    Attributes
    from_date: datetime
    to_date: datetime
    """
    tmp_start = from_date
    rows = []
    # Add first day
    tmp_end = tmp_start + datetime.timedelta(hours=24)
    start_ts = time.mktime(tmp_start.timetuple())
    end_ts = time.mktime(tmp_end.timetuple())
    rows.append({"Date": tmp_start, "startTime": start_ts, "endTime": end_ts})
    while tmp_start < to_date:
        tmp_start = tmp_start + datetime.timedelta(hours=24)
        tmp_end = tmp_start + datetime.timedelta(hours=24)
        start_ts = time.mktime(tmp_start.timetuple())
        end_ts = time.mktime(tmp_end.timetuple())
        rows.append({"Date": tmp_start, "startTime": start_ts, "endTime": end_ts})
    # collect rows in a list and build the frame once (DataFrame.append is deprecated)
    return pd.DataFrame(rows)

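# Illustrative result for from_date=2022-01-01 and to_date=2022-01-02 (epoch values assume
# a machine clock set to UTC, as noted at the top of the file):
#          Date     startTime       endTime
#    2022-01-01  1640995200.0  1641081600.0
#    2022-01-02  1641081600.0  1641168000.0
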
def get_hourly_slice_dataframe(from_date: datetime.datetime, to_date: datetime.datetime) -> pd.DataFrame:
    """
    Build pandas dataframe with hours
    Attributes
    from_date: datetime
    to_date: datetime
    """
    tmp_start = datetime.datetime(
        from_date.year, from_date.month, from_date.day)
    final_end = datetime.datetime.combine(
        to_date, datetime.datetime.max.time())
    rows = []
    tmp_end = tmp_start + datetime.timedelta(hours=1)
    start_ts = time.mktime(tmp_start.timetuple())
    end_ts = time.mktime(tmp_end.timetuple())
    rows.append({"Date": tmp_start, "startTime": start_ts, "endTime": end_ts})
    while tmp_start < final_end:
        tmp_start = tmp_start + datetime.timedelta(hours=1)
        tmp_end = tmp_start + datetime.timedelta(hours=1)
        start_ts = time.mktime(tmp_start.timetuple())
        end_ts = time.mktime(tmp_end.timetuple())
        rows.append({"Date": tmp_start, "startTime": start_ts, "endTime": end_ts})
    # collect rows in a list and build the frame once (DataFrame.append is deprecated)
    return pd.DataFrame(rows)

def process_data(data_from_datalake: typing.Dict, slo_parameters: typing.Dict) -> pd.DataFrame:
    """
    Processes data from the datalake and puts everything into a pandas dataframe.
    Function includes a small integrity check of the SLOs. Does not throw errors but logs them.
    Attributes
    data_from_datalake: dict
        Data from the response of the datalake on AWS.
    slo_parameters: dict
        Pass in for example slo_parameters["ALL_SLOS_DAILY"]
    """
    null_data_count = 0
    # create a dict with a list of slos for each hub
    slos = defaultdict(list)
    for slo in slo_parameters:
        slos[slo["hub"]].append(slo["slo"])
    tmp = []

    # check if all slos are in the report
    def compare_lists(l1, l2, hub):
        # compare as sets: every configured SLO must appear at least once in the report
        if set(l1) == set(l2):
            # logger.info(f"{hub} - Complete")
            print(f"{hub} - Complete")
        else:
            # logger.warning(f"{hub} - Incomplete")
            print(f"{hub} - Incomplete")

    for hub in data_from_datalake.keys():
        # only used for checking if all slos are in the report
        assigned_slos = []
        for x in data_from_datalake[hub]:
            for data in x:
                if data:
                    if data["name"] in slos[hub]:
                        timestamp = int(data["timeframe"].split(" to ")[0])
                        data["Date"] = datetime.datetime.fromtimestamp(
                            timestamp/1000).date()
                        data["HUB"] = hub
                        tmp.append(data)
                        # only used for checking if all slos are in the report
                        assigned_slos.append(data["name"])
                else:
                    null_data_count += 1
        # only used for checking if all slos are in the report
        compare_lists(assigned_slos, slos[hub], hub)
    df = pd.DataFrame(tmp)
    df[["description", "Touchpoint"]
       ] = df["description"].str.split("_", expand=True)
    df = df[['Date', 'HUB', 'id', 'enabled', 'name', 'description', 'Touchpoint', 'evaluatedPercentage', 'errorBudget',
             'status', 'error', 'target', 'warning', 'evaluationType', 'timeframe', 'metricExpression', 'filter']]
    # logger.info(f"Null Data Count: {null_data_count}")
    print(f"Null Data Count: {null_data_count}")
    return df

# TODO: Make process_data more efficient.
# def process_data(data_from_datalake: typing.Dict, time_df: pd.DataFrame) -> pd.DataFrame:
#     """
#     Processes data from the datalake and puts everything into a pandas dataframe.
#     """
#     null_data_count = 0
#     tmp = [dict(data[0], **{'Date': time_df.iloc[i % len(time_df)]['Date'], 'HUB': hub})
#            for hub in data_from_datalake.keys()
#            for i, data in enumerate(data_from_datalake[hub])
#            if data]
#     df = pd.DataFrame(tmp)
#     df[["description", "Touchpoint"]
#        ] = df["description"].str.split("_", expand=True)
#     df = df[['Date', 'HUB', 'id', 'enabled', 'name', 'description', 'Touchpoint', 'evaluatedPercentage', 'errorBudget',
#              'status', 'error', 'target', 'warning', 'evaluationType', 'timeframe', 'metricExpression', 'filter']]
#     print(f"Null Data Count: {null_data_count}")
#     # logger.info(f"Null Data Count: {null_data_count}")
#     return df

def write_to_excel(from_date: datetime.datetime, args: argparse.Namespace, daily_all: pd.DataFrame = None, ytd_all: pd.DataFrame = None) -> None:
    """
    Writes data from the pandas dataframes to an Excel file.
    Attributes
    from_date: datetime
        Used for building the calendar week in the filename for the weekly report.
    args: argparse.Namespace
        Arguments from the argument parser.
    daily_all: pd.DataFrame
        Pandas DataFrame containing the daily data.
    ytd_all: pd.DataFrame
        Pandas DataFrame containing the year to date data.
    """
    touchpoints = ["Vehicle", "Mobile"]
    if args.preSelect == "day":
        filename = f"./QM_Report_Datalake_{datetime.date.today() - datetime.timedelta(days=1)}.xlsx"
    else:
        filename = f"./QM_Report_Datalake_{from_date.isocalendar()[1]}.xlsx"
    writer = pd.ExcelWriter(filename, engine='xlsxwriter')
    if not daily_all.empty and "d" in str.lower(args.slices):
        daily_all = daily_all[daily_all["Touchpoint"].isin(touchpoints)]
        daily_all.to_excel(writer, sheet_name='daily', index=False)
    if not ytd_all.empty and "y" in str.lower(args.slices):
        ytd_all = ytd_all[ytd_all["Touchpoint"].isin(touchpoints)]
        ytd_all.to_excel(writer, sheet_name="ytd", index=False)
    # close() also saves the workbook; a separate save() call is deprecated
    writer.close()

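# Example filenames: a --preSelect day run on 2022-03-15 writes
# ./QM_Report_Datalake_2022-03-14.xlsx, while a run starting on 2022-03-07
# (ISO calendar week 10) writes ./QM_Report_Datalake_10.xlsx.
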
@timer
def main() -> None:
    parser = init_argparse()
    args = parser.parse_args()
    from_date, to_date = check_inputs(args)
    # logger.info(
    #     f"slices: {args.slices} | fromDate: {from_date} | toDate: {to_date}")
    print(
        f"slices: {args.slices} | fromDate: {from_date} | toDate: {to_date}")
    # build slice dataframes TODO: only build what's needed
    days = get_daily_slice_dataframe(from_date, to_date)
    # hours = get_hourly_slice_dataframe(from_date, to_date)
    slo_parameters = slo_parameter_builder(
        load_slo_parameter(), from_date, to_date)
    daily_all = pd.DataFrame()
    ytd_all = pd.DataFrame()
    if "d" in str.lower(args.slices):
        urls_request = build_request_links(
            "ALL_SLOS_DAILY", slo_parameters["ALL_SLOS_DAILY"], time_df=days)
        data_from_datalake = get_slos(urls_request)
        daily_all = process_data(
            data_from_datalake, slo_parameters["ALL_SLOS_DAILY"])
    if "h" in str.lower(args.slices):
        print("hourly not supported yet")
    if "y" in str.lower(args.slices):
        print("ytd not supported yet")
    if "t" in str.lower(args.slices):
        print("total not supported yet")
    write_to_excel(from_date, args, daily_all, ytd_all)


if __name__ == "__main__":
    main()