# qm_report/createReport.py
#!/usr/bin/python3.8
import logging
from tracemalloc import start
from typing import Dict
from decouple import config
import sys
import yaml
import datetime
import time
import pandas as pd
import argparse
import warnings
import os
import dynatraceAPI
from pagination import Pagionation
# Silence all warnings globally (deliberate: pandas/openpyxl are noisy here).
warnings.filterwarnings("ignore")
# warning, there are warnings which are ignored!
# Force the process timezone to Europe/Berlin so every time.mktime() call
# below converts to German local time regardless of the host's TZ setting.
try:
    os.environ["TZ"] = "Europe/Berlin"  # set new timezone
    time.tzset()  # NOTE: time.tzset() is Unix-only; on other platforms this lands in the except
except Exception as e:
    print(f"This error was encounterted : {e}")
# Reduced column set written to the per-slice CSV exports.
COLUMNS_IN_CSV = ["Date",
                  "HUB",
                  "id",
                  "name",
                  "evaluatedPercentage",
                  "status"]
# Full column set written to each worksheet of the XLSX report.
# "Date", "HUB", "Touchpoint" and "type" are added locally; the rest come
# from the Dynatrace SLO API response.
COLUMNS_IN_XLSX = [
    "Date",
    "HUB",
    "id",
    "enabled",
    "name",
    "description",
    "Touchpoint",
    "evaluatedPercentage",
    "errorBudget",
    "status",
    "error",
    "target",
    "warning",
    "evaluationType",
    "timeframe",
    "metricExpression",
    "filter",
    "type"
]
def previous_day_range(date):
    """Return (start, end) bounding the single day before *date* (both equal)."""
    yesterday = date - datetime.timedelta(days=1)
    return yesterday, yesterday
def previous_week_range(date):
    """Return (Monday, Sunday) of the calendar week before *date*'s week."""
    monday_this_week = date - datetime.timedelta(days=date.weekday())
    start_date = monday_this_week - datetime.timedelta(weeks=1)
    end_date = monday_this_week - datetime.timedelta(days=1)
    return start_date, end_date
def previous_month_range(date):
    """Return (first day, last day) of the month before *date*'s month."""
    last_of_prev_month = date.replace(day=1) - datetime.timedelta(days=1)
    return last_of_prev_month.replace(day=1), last_of_prev_month
def getSLO(
    DTAPIToken, DTENV, fromDate, toDate, selector_var, selector_type, header_name
):
    """Query the Dynatrace SLO v2 API for one time range and return the
    evaluated SLOs as a DataFrame (one row per SLO, paginated client-side)."""
    # DTENV = base url, DTAPIToken = sec token
    client = dynatraceAPI.Dynatrace(
        DTENV, DTAPIToken, logging.Logger("ERROR"), None, None, 0, 2 * 1000
    )
    query = {
        "pageSize": 25,
        "from": int(fromDate),
        "to": int(toDate),
        "timeFrame": "GTF",
        "evaluate": "true",
        # name = exact name, text = like
        "sloSelector": f"""{selector_type}("{header_name}")""",
    }
    # gets all slos and filter later
    pages = client.returnPageination("/api/v2/slo", query, "slo")
    return pd.DataFrame(pages.elements)
def get_daily_slice(start_date, end_date):
    """Build one row per day from start_date through end_date (inclusive).

    Each row carries the day ("Date") and its window as local-time epoch
    milliseconds ("startTime"/"endTime", 24 hours apart). Always yields at
    least one row (the start day), matching the original behavior.
    """
    rows = []
    current = start_date
    # Collect plain dicts and build the DataFrame once at the end instead of
    # pd.concat-ing inside the loop (which copies the frame every iteration).
    while True:
        next_day = current + datetime.timedelta(hours=24)
        rows.append({
            "Date": current,
            "startTime": time.mktime(current.timetuple()) * 1000,
            "endTime": time.mktime(next_day.timetuple()) * 1000,
        })
        if current >= end_date:
            break
        current = next_day
    return pd.DataFrame(rows)
def get_hourly_slice(start_date, end_date):
    """Build one row per hour from 00:00 of start_date up to the end of end_date.

    Each row carries the slice start ("Date") plus its window as local-time
    epoch milliseconds ("startTime"/"endTime", 1 hour apart).

    NOTE(review): as in the original, the last generated slice starts at
    00:00 of the day AFTER end_date (25 rows for a single day) — preserved
    here; confirm whether that trailing slice is intended.
    """
    # date object to datetime at midnight
    current = datetime.datetime(start_date.year, start_date.month, start_date.day)
    # date object to datetime at 23:59:59.999999
    final_end = datetime.datetime.combine(end_date, datetime.datetime.max.time())
    rows = []
    # Collect dicts and build the DataFrame once — avoids the quadratic cost
    # of pd.concat inside the loop.
    while True:
        slice_end = current + datetime.timedelta(hours=1)
        rows.append({
            "Date": current,
            "startTime": time.mktime(current.timetuple()) * 1000,
            "endTime": time.mktime(slice_end.timetuple()) * 1000,
        })
        if current >= final_end:
            break
        current = slice_end
    return pd.DataFrame(rows)
def init_argparse():
    """Build and return the CLI argument parser for the report script."""
    parser = argparse.ArgumentParser(
        usage="%(prog)s [--fromDate] [toDate] or [preSelect]",
        description="gather SLO in daily slices for given Timeframe",
    )
    # (short flag, long flag, help text) for every optional string argument.
    options = [
        ("-f", "--fromDate", "YYYY-mm-dd e.g. 2022-01-01"),
        ("-t", "--toDate", "YYYY-mm-dd e.g. 2022-01-31"),
        ("-p", "--preSelect",
         "day | week | month - gathers the data for the last full day, week or month"),
        ("-s", "--slices",
         "h | d | t | y - writes the slices hourly, daily, total or year to date into ecxel. given in any order"),
        ("-o", "--output",
         "x | c - creates xlsx (x) and/or CSV (c) file. The CSV file will include a reduced list per sheet"),
    ]
    for short_flag, long_flag, help_text in options:
        parser.add_argument(short_flag, long_flag, help=help_text)
    return parser
def check_inputs(args):
    """
    This functions is the single point of true for arguments. If new arguments are added they need to be added in here. Returns from and to date.

    Validates the mutually-exclusive date selection (--fromDate/--toDate vs
    --preSelect), the --slices letters (h/d/t/y) and --output letters (x/c,
    defaulting to "x"). Exits the process on any invalid combination.
    """
    if args.preSelect and (args.fromDate or args.toDate):
        print(
            "--preSelect must not be used in conjuntion with --fromDate and/or --toDate"
        )
        sys.exit()
    elif args.fromDate and not args.toDate:
        print("--fromDate only in conjunction with --toDate")
        sys.exit()
    elif args.toDate and not args.fromDate:
        print("--toDate only in conjunction with --fromDate")
        sys.exit()
    elif args.toDate and args.fromDate and not args.preSelect:
        try:
            fromDate = datetime.datetime.strptime(args.fromDate, "%Y-%m-%d")
            toDate = datetime.datetime.strptime(args.toDate, "%Y-%m-%d")
        # strptime raises ValueError on a bad date, TypeError on a non-string.
        except (ValueError, TypeError) as e:
            print("Progam closed: " + str(e))
            sys.exit()
        if toDate < fromDate:
            print("--toDate can't be older than --fromDate")
            sys.exit()
        if toDate > datetime.datetime.today() or fromDate > datetime.datetime.today():
            print("--toDate or --fromDate can't be in the future")
            sys.exit()
    elif args.preSelect and not args.fromDate and not args.toDate:
        date = datetime.date.today()
        if args.preSelect == "week":
            fromDate, toDate = previous_week_range(date)
        elif args.preSelect == "month":
            fromDate, toDate = previous_month_range(date)
        elif args.preSelect == "day":
            fromDate, toDate = previous_day_range(date)
        else:
            print("--preSelect must be day, week or month")
            sys.exit()
    else:
        print("Invalid arguments, please use --help")
        sys.exit()
    # --slices must contain at least one of h/d/t/y (case-insensitive).
    if args.slices is None:
        print(
            "-s or --slices must not be null and needs at least one letter of h d t or y, lower- or uppercase."
        )
        sys.exit()
    elif not any(letter in str.lower(args.slices) for letter in "hdty"):
        print(
            "-s or --slices must has at least one letter of h d t or y, lower- or uppercase."
        )
        sys.exit()
    # --output defaults to xlsx only; otherwise it needs x and/or c.
    if not args.output:
        args.output = "x"
    elif not any(letter in str.lower(args.output) for letter in "xc"):
        print(
            "-o or --output requires at least one letter of x or c, lower- or uppercase."
        )
        sys.exit()
    return fromDate, toDate
def get_one_slice(
    item, DTTOKEN, DTURL, slice, out_df, selector_var, selector_type, header_name, env_type
):
    """Fetch SLO data for every time window in *slice* (one API call per row),
    tag each result with its Date/HUB/type, and append everything to *out_df*.

    Prints a carriage-return progress line while crawling. Returns the grown
    DataFrame (out_df is not mutated in place).
    """
    frames = []
    total_rows = len(slice)  # hoisted: invariant across iterations
    for index, row in slice.iterrows():
        # Progress output, e.g. "0001 von 0024 = 004.17 %"
        percentage = str(round((100 * (index + 1)) / total_rows, 2)).split(".")
        print(
            "{:0>4d} von {:0>4d} = {:0>3d}.{:0>2d} %".format(
                index + 1, total_rows, int(percentage[0]), int(percentage[1])
            ),
            end="\r",
        )
        temp_df = getSLO(
            DTTOKEN,
            DTURL,
            row["startTime"],
            row["endTime"],
            selector_var,
            selector_type,
            header_name,
        )
        temp_df["Date"] = row["Date"]
        temp_df["HUB"] = item
        temp_df["type"] = env_type
        # Collect the per-window frames and concatenate once afterwards —
        # concatenating inside the loop copies the whole frame each iteration.
        frames.append(temp_df)
    df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
    # sort columns in a try block - if API is returning columns which are non exist, this will not fail the script
    try:
        df[["description", "Touchpoint"]] = df["description"].str.split(
            "_", expand=True
        )
    except Exception as e:
        print(f"This error was encounterted : {e}")
    out_df = pd.concat([out_df, df], ignore_index=True)
    print()  # newline to remove \r from progress bar
    return out_df
def get_slice_ytd_total(
    DTTOKEN,
    DTURL,
    item,
    start_date,
    end_date,
    time_name,
    time_val,
    out_df,
    selector_var,
    selector_type,
    header_name,
    env_type
):
    """Fetch SLOs once for the whole [start_date, end_date] window, tag the
    rows with *time_val* (under column *time_name*), HUB and type, and
    return *out_df* with the new rows appended."""
    slo_frame = getSLO(
        DTTOKEN, DTURL, start_date, end_date, selector_var, selector_type, header_name
    )
    slo_frame[time_name] = time_val
    slo_frame["HUB"] = item
    slo_frame["type"] = env_type
    # Best-effort split "description_Touchpoint"; a missing or oddly shaped
    # description column must not abort the run.
    try:
        split_cols = slo_frame["description"].str.split("_", expand=True)
        slo_frame[["description", "Touchpoint"]] = split_cols
    except Exception as e:
        print(f"This error was encounterted : {e}")
    return pd.concat([out_df, slo_frame], ignore_index=True)
def load_slo_parameter(path):
    """Read the SLO YAML file at *path* and return the valid configurations.

    Each returned entry is a list:
        [hub, selector_type, selector_var, yearstart_ms, header_name]
    where hub is a comma-joined "name-type" string built from the config's
    "hubs" mapping and yearstart_ms is the local-time epoch in milliseconds.
    Configurations that fail validation are reported and skipped.
    """
    mandatory_fields = ["hubs", "selector_type", "selector_var", "yearstart"]
    all_yaml_configs = []
    with open(path) as file:
        slo_doc = yaml.safe_load(file)
    for header_name, configs in slo_doc.items():
        # NOTE(review): a valid config is expected to have exactly 13 keys —
        # confirm this count against the shared slo_parameter.yaml schema.
        if len(configs) != 13 or not all(field in configs for field in mandatory_fields):
            print(f"Slo Configuration {header_name} is broken")
            continue
        tmp_dict = dict(configs)
        # Parse yearstart (YYYY-mm-dd) and convert midnight local time to epoch ms.
        yearstart_dt = datetime.datetime.strptime(tmp_dict["yearstart"], "%Y-%m-%d")
        yearstart_dt = datetime.datetime(
            yearstart_dt.year, yearstart_dt.month, yearstart_dt.day
        )
        yearstart = time.mktime(yearstart_dt.timetuple()) * 1000
        selector_type = tmp_dict["selector_type"]  # name if exact name is wanted
        selector_var = tmp_dict["selector_var"]
        hub = ",".join(
            f"{hub_name}-{hub_cfg['type']}"
            for hub_name, hub_cfg in tmp_dict["hubs"].items()
        )
        all_yaml_configs.append(
            [hub, selector_type, selector_var, yearstart, header_name]
        )
    return all_yaml_configs
def write_slo_to_csv(fileName: str, slice: str, df: pd.DataFrame):
    """Write *df* to '<fileName>_<slice>.csv' using the reduced CSV column set.

    Falls back to the frame's own columns when the reduction fails
    (e.g. a column is missing), so the export never aborts the run.
    """
    try:
        df = df[COLUMNS_IN_CSV]
    except Exception as e:
        print("Could not rearrange columns: " + str(e))
    target = f"{fileName}_{slice}.csv"
    df.to_csv(target, encoding='utf-8', index=False)
def write_slo_to_excel(writer, sheet: str, df: pd.DataFrame):
    """Write *df* as worksheet *sheet* on *writer*, using the full XLSX column set.

    Falls back to the frame's own columns when the reduction fails, so a
    missing column never aborts the export.
    """
    try:
        ordered = df[COLUMNS_IN_XLSX]
    except Exception as e:
        print("Could not rearrange columns: " + str(e))
        ordered = df
    ordered.to_excel(writer, sheet_name=sheet)
def create_report_files(args, fromDate, hourlyall, dailyall, totalall, ytd):
    """Filter the gathered SLO frames to the known touchpoints and write the
    requested output files.

    Which slices are written depends on args.slices (h/d/t/y) and which
    formats on args.output (x → one XLSX with a sheet per slice, c → one CSV
    per slice). Empty frames and unselected slices are skipped.
    """
    # Only rows whose Touchpoint is one of these make it into the report.
    touchpoints = ["Vehicle", "Mobile"]
    if args.preSelect == "day":
        # Daily report: name the file after yesterday's date.
        today = datetime.date.today()
        yesterday = today - datetime.timedelta(days=1)
        fileName = "./QM_Report_" + str(yesterday)
    else:
        # Otherwise: name it after the ISO calendar week of the start date.
        fileName = "./QM_Report_" + str(fromDate.isocalendar()[1])
    if "x" in str.lower(args.output):
        writer = pd.ExcelWriter(fileName + ".xlsx")
    if not totalall.empty and "t" in str.lower(args.slices):
        totalall = totalall[totalall["Touchpoint"].isin(touchpoints)]
        if "x" in str.lower(args.output):
            write_slo_to_excel(writer, "total", totalall)
        if "c" in str.lower(args.output):
            write_slo_to_csv(fileName, "total", totalall)
    if not dailyall.empty and "d" in str.lower(args.slices):
        dailyall = dailyall[dailyall["Touchpoint"].isin(touchpoints)]
        # Normalize the Date column to plain YYYY-mm-dd strings for the daily sheet.
        dailyall["Date"] = (dailyall["Date"].astype("datetime64[ns]").dt.strftime("%Y-%m-%d"))
        if "x" in str.lower(args.output):
            write_slo_to_excel(writer, "daily", dailyall)
        if "c" in str.lower(args.output):
            write_slo_to_csv(fileName, "daily", dailyall)
    if not hourlyall.empty and "h" in str.lower(args.slices):
        hourlyall = hourlyall[hourlyall["Touchpoint"].isin(touchpoints)]
        hourlyall["Date"] = hourlyall["Date"].astype("datetime64[ns]")
        if "x" in str.lower(args.output):
            write_slo_to_excel(writer, "hourly", hourlyall)
        if "c" in str.lower(args.output):
            write_slo_to_csv(fileName, "hourly", hourlyall)
    if not ytd.empty and "y" in str.lower(args.slices):
        ytd = ytd[ytd["Touchpoint"].isin(touchpoints)]
        if "x" in str.lower(args.output):
            write_slo_to_excel(writer, "YTD", ytd)
        if "c" in str.lower(args.output):
            write_slo_to_csv(fileName, "YTD", ytd)
    if "x" in str.lower(args.output):
        # NOTE(review): if every frame was empty or unselected, the workbook
        # has no sheets and close() may raise — confirm whether that can occur.
        writer.close()
def main(slo_path):
    """Entry point: parse CLI args, gather SLO data for every configured
    hub/environment, then write the report files.

    slo_path: path to the shared SLO parameter YAML consumed by
    load_slo_parameter().
    """
    start_timer = time.time()
    parser = init_argparse()
    args = parser.parse_args()
    fromDate, toDate = check_inputs(args)
    print("slices", args.slices)
    print("fromDate: " + str(fromDate))
    print("toDate: " + str(toDate))
    days = get_daily_slice(fromDate, toDate)
    hours = get_hourly_slice(fromDate, toDate)
    # NOTE(review): basename("./environment.yaml") is just "environment.yaml",
    # so the file is opened relative to the current working directory —
    # confirm that is intended.
    with open(os.path.basename("./environment.yaml")) as file:
        env_doc = yaml.safe_load(file)
    hourlyall = pd.DataFrame()
    dailyall = pd.DataFrame()
    totalall = pd.DataFrame()
    ytd = pd.DataFrame()
    slo_configs = load_slo_parameter(slo_path)
    for one_slo_config in slo_configs:
        hub, selector_type, selector_var, yearstart, header_name = one_slo_config
        print(
            f"For the slo config was '{slo_path}' used with the config '{header_name}'."
        )
        for item, doc in env_doc.items():
            # Only crawl environments that appear in this config's hub list.
            if not item in hub:
                print(
                    f"{item} will be skipped since it is not in {hub}, which was selected in {slo_path}"
                )
                continue
            # doc appears to be a list of mappings: doc[0]→name, doc[1]→url,
            # doc[2]→token, doc[4]→type (inferred from the indices used below —
            # TODO confirm against environment.yaml).
            token = dict(doc[2])
            url = dict(doc[1])
            print("Crawling through: " + item)
            print("Check if token exists in environment...")
            # The API token is resolved from the process environment via decouple.
            if config(token.get("env-token-name"), default='') != "":
                print("Gather data, hold on a minute")
                DTTOKEN = config(token.get("env-token-name"), default='')
                DTURL = url.get("env-url")
                # Calc daily SLO
                if "d" in str.lower(args.slices):
                    dailyall = get_one_slice(
                        doc[0]["name"],
                        DTTOKEN,
                        DTURL,
                        days,
                        dailyall,
                        selector_var,
                        selector_type,
                        header_name,
                        doc[4]["type"]
                    )
                # Calc hourly SLO
                if "h" in str.lower(args.slices):
                    hourlyall = get_one_slice(
                        doc[0]["name"],
                        DTTOKEN,
                        DTURL,
                        hours,
                        hourlyall,
                        selector_var,
                        selector_type,
                        header_name,
                        doc[4]["type"]
                    )
                # Calc Overall YTD SLO
                if "y" in str.lower(args.slices):
                    ytd = get_slice_ytd_total(
                        DTTOKEN,
                        DTURL,
                        doc[0]["name"],
                        yearstart,
                        days["endTime"].max(),
                        "Date",
                        fromDate.year,
                        ytd,
                        selector_var,
                        selector_type,
                        header_name,
                        doc[4]["type"]
                    )
                # Calc Overall SLO
                if "t" in str.lower(args.slices):
                    totalall = get_slice_ytd_total(
                        DTTOKEN,
                        DTURL,
                        doc[0]["name"],
                        days["startTime"].min(),
                        days["endTime"].max(),
                        "Date",
                        fromDate.isocalendar()[1],
                        totalall,
                        selector_var,
                        selector_type,
                        header_name,
                        doc[4]["type"]
                    )
            else:
                print("token not found, skipping " + item)
    create_report_files(args, fromDate, hourlyall, dailyall, totalall, ytd)
    print("\n")
    print("It took {} seconds to run this script".format(time.time() - start_timer))
# Run the report against the shared SLO parameter file when executed directly.
if __name__ == "__main__":
    main("../shared_configuration/slo_parameter.yaml")