import logging
|
|
from tracemalloc import start
|
|
from typing import Dict
|
|
from decouple import config
|
|
import sys
|
|
import yaml
|
|
import datetime
|
|
import time
|
|
import pandas as pd
|
|
import argparse
|
|
import warnings
|
|
import os
|
|
import dynatraceAPI
|
|
from pagination import Pagionation
|
|
|
|
warnings.filterwarnings("ignore")

# NOTE(review): ALL warnings are globally suppressed — this also hides e.g.
# urllib3 insecure-request and pandas deprecation warnings.

try:
    # Pin the process to German local time so all time.mktime() epoch
    # conversions below are stable regardless of the host's configured zone.
    os.environ["TZ"] = "Europe/Berlin"  # set new timezone
    # Apply the TZ change to the C library; only available on Unix.
    time.tzset()
except Exception as e:
    # Best effort: on platforms without tzset() (e.g. Windows) keep the
    # host timezone and continue.
    print(f"This error was encounterted : {e}")
|
|
|
|
"""
|
|
def make_request(url, headers,verify,parameters):
|
|
try:
|
|
response = requests.get(url, headers=headers,verify=verify,params=parameters)
|
|
response.raise_for_status()
|
|
except requests.exceptions.HTTPError as errh:
|
|
return "An Http Error occurred:" + repr(errh)
|
|
except requests.exceptions.ConnectionError as errc:
|
|
return "An Error Connecting to the API occurred:" + repr(errc)
|
|
except requests.exceptions.Timeout as errt:
|
|
return "A Timeout Error occurred:" + repr(errt)
|
|
except requests.exceptions.RequestException as err:
|
|
return "An Unknown Error occurred" + repr(err)
|
|
|
|
return response
|
|
"""
|
|
|
|
|
|
def previous_day_range(date):
    """Return (start, end) both set to the day before *date* (i.e. yesterday)."""
    yesterday = date - datetime.timedelta(days=1)
    return yesterday, yesterday
|
|
|
|
|
|
def previous_week_range(date):
    """Return (Monday, Sunday) of the calendar week before *date*'s week."""
    monday_this_week = date - datetime.timedelta(days=date.weekday())
    start_date = monday_this_week - datetime.timedelta(weeks=1)
    end_date = monday_this_week - datetime.timedelta(days=1)
    return start_date, end_date
|
|
|
|
|
|
def previous_month_range(date):
    """Return (first day, last day) of the month preceding *date*'s month."""
    last_of_previous = date.replace(day=1) - datetime.timedelta(days=1)
    first_of_previous = last_of_previous.replace(day=1)
    return first_of_previous, last_of_previous
|
|
|
|
|
|
"""
|
|
def getSLO(DTAPIToken, DTENV, fromDate, toDate):
|
|
|
|
env = DTENV
|
|
DTAPIToken = DTAPIToken
|
|
|
|
if (DTENV.find('dynatracemgd') != -1):
|
|
verify=False
|
|
if (DTENV.find('dyna-synt') != -1):
|
|
verify=False
|
|
else:
|
|
verify=True
|
|
|
|
DTAPIURL= env + "/api/v2/slo"
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Authorization': 'Api-Token ' + DTAPIToken
|
|
}
|
|
|
|
parameters = {
|
|
"pageSize": 25,
|
|
"from": int(fromDate),
|
|
"to": int(toDate),
|
|
"timeFrame": "GTF",
|
|
"evaluate": "true",
|
|
"sloSelector": "text(\"CoCo-QM-Report\")"
|
|
}
|
|
print(DTAPIURL)
|
|
print("parameter",parameters)
|
|
r = make_request(DTAPIURL,headers=headers,parameters=parameters,verify=verify)
|
|
try:
|
|
cont = r.json()
|
|
#python3
|
|
#df = pd.json_normalize(cont['slo'])
|
|
|
|
#python2
|
|
df = pd.DataFrame.from_dict(cont['slo'])
|
|
except Exception as e:
|
|
print(e)
|
|
print("No SLO json returned")
|
|
print(parameters)
|
|
print(r)
|
|
df = pd.DataFrame()
|
|
|
|
|
|
return df
|
|
"""
|
|
|
|
|
|
def getSLO(
    DTAPIToken, DTENV, fromDate, toDate, selector_var, selector_type, header_name
):
    """Fetch evaluated SLOs from the Dynatrace v2 API for one timeframe.

    DTENV is the base URL, DTAPIToken the API token. fromDate/toDate are epoch
    milliseconds. selector_type is "name" (exact match) or "text" (contains);
    header_name is the value matched by the selector. selector_var is currently
    unused but kept for interface compatibility. Returns a DataFrame with one
    row per SLO (may be empty).
    """
    client = dynatraceAPI.Dynatrace(
        DTENV, DTAPIToken, logging.Logger("ERROR"), None, None, 0, 2 * 1000
    )
    # name = exact name, text = like
    query = {
        "pageSize": 25,
        "from": int(fromDate),
        "to": int(toDate),
        "timeFrame": "GTF",
        "evaluate": "true",
        "sloSelector": f"""{selector_type}("{header_name}")""",
    }
    # The client transparently follows pagination; "slo" is the payload key.
    pages = client.returnPageination("/api/v2/slo", query, "slo")
    return pd.DataFrame(pages.elements)
|
|
|
|
|
|
def get_daily_slice(start_date, end_date):
    """Build one row per calendar day from start_date through end_date (inclusive).

    Parameters
    ----------
    start_date, end_date : datetime.date or datetime.datetime
        Inclusive range to slice into 24-hour windows.

    Returns
    -------
    pd.DataFrame
        Columns "Date" (window start), "startTime" and "endTime"
        (epoch milliseconds in the process-local timezone, 24h apart).
    """
    one_day = datetime.timedelta(hours=24)
    rows = []
    current = start_date
    while True:
        window_end = current + one_day
        rows.append(
            {
                "Date": current,
                "startTime": time.mktime(current.timetuple()) * 1000,
                "endTime": time.mktime(window_end.timetuple()) * 1000,
            }
        )
        # Inclusive upper bound: the row for end_date itself is emitted
        # before the loop terminates (matches the original semantics).
        if not current < end_date:
            break
        current = window_end
    # Build the frame once — avoids the quadratic cost of pd.concat per row
    # and removes the duplicated "first day" block from the original.
    return pd.DataFrame(rows)
|
|
|
|
|
|
def get_hourly_slice(start_date, end_date):
    """Build one row per hour from start_date 00:00 up to and including the
    midnight following end_date.

    Parameters
    ----------
    start_date, end_date : datetime.date
        Inclusive day range to slice into 1-hour windows.

    Returns
    -------
    pd.DataFrame
        Columns "Date" (window start as datetime), "startTime" and "endTime"
        (epoch milliseconds in the process-local timezone, 1h apart).
        Note: mirrors the original loop bound, so a single day yields 25 rows —
        the last row starts at midnight of the following day.
    """
    one_hour = datetime.timedelta(hours=1)
    # promote the date objects to datetimes: first slice starts at midnight …
    current = datetime.datetime(start_date.year, start_date.month, start_date.day)
    # … and the loop runs while we are before the last instant of end_date
    final_end = datetime.datetime.combine(end_date, datetime.datetime.max.time())

    rows = []
    while True:
        window_end = current + one_hour
        rows.append(
            {
                "Date": current,
                "startTime": time.mktime(current.timetuple()) * 1000,
                "endTime": time.mktime(window_end.timetuple()) * 1000,
            }
        )
        if not current < final_end:
            break
        current = window_end
    # Single DataFrame construction replaces the per-row pd.concat (quadratic)
    # and the duplicated "first slice" block of the original.
    return pd.DataFrame(rows)
|
|
|
|
|
|
def init_argparse():
    """Build the command-line parser for the SLO gathering script."""
    parser = argparse.ArgumentParser(
        usage="%(prog)s [--fromDate] [toDate] or [preSelect]",
        description="gather SLO in daily slices for given Timeframe",
    )
    # (short flag, long flag, help text) — all options are plain strings,
    # validated later in check_inputs().
    option_specs = (
        ("-f", "--fromDate", "YYYY-mm-dd e.g. 2022-01-01"),
        ("-t", "--toDate", "YYYY-mm-dd e.g. 2022-01-31"),
        (
            "-p",
            "--preSelect",
            "day | week | month - gathers the data for the last full day, week or month",
        ),
        (
            "-s",
            "--slices",
            "h | d | t | y - writes the slices hourly, daily, total or year to date into ecxel. given in any order",
        ),
    )
    for short_flag, long_flag, help_text in option_specs:
        parser.add_argument(short_flag, long_flag, help=help_text)

    return parser
|
|
|
|
|
|
def check_inputs(args):
    """
    Single point of truth for CLI argument validation. If new arguments are
    added they need to be handled in here.

    Accepts either an explicit --fromDate/--toDate pair or a --preSelect
    keyword (day | week | month); --slices must contain at least one of
    h, d, t, y (case-insensitive). Prints a message and exits the process
    on any invalid combination.

    Returns
    -------
    (fromDate, toDate) : tuple
        datetime.datetime objects for the explicit-range path, datetime.date
        objects for the --preSelect path.
    """
    if args.preSelect and (args.fromDate or args.toDate):
        print(
            "--preSelect must not be used in conjunction with --fromDate and/or --toDate"
        )
        sys.exit()
    elif args.fromDate and not args.toDate:
        print("--fromDate only in conjunction with --toDate")
        sys.exit()
    elif args.toDate and not args.fromDate:
        print("--toDate only in conjunction with --fromDate")
        sys.exit()
    elif args.toDate and args.fromDate and not args.preSelect:
        try:
            # strptime instead of date.fromisoformat keeps Python <3.7 compat
            fromDate = datetime.datetime.strptime(args.fromDate, "%Y-%m-%d")
            toDate = datetime.datetime.strptime(args.toDate, "%Y-%m-%d")
        except Exception as e:
            print("Program closed: " + str(e))
            sys.exit()

        if toDate < fromDate:
            print("--toDate can't be older than --fromDate")
            sys.exit()

        if toDate > datetime.datetime.today() or fromDate > datetime.datetime.today():
            print("--toDate or --fromDate can't be in the future")
            sys.exit()
    elif args.preSelect and not args.fromDate and not args.toDate:
        date = datetime.date.today()

        if args.preSelect == "week":
            fromDate, toDate = previous_week_range(date)
        elif args.preSelect == "month":
            fromDate, toDate = previous_month_range(date)
        elif args.preSelect == "day":
            fromDate, toDate = previous_day_range(date)
        else:
            print("--preSelect must be day, week or month")
            sys.exit()
    else:
        print("Invalid arguments, please use --help")
        sys.exit()
    if args.slices is None:
        print(
            "-s or --slices must not be null and needs at least one letter of h d t or y, lower- or uppercase."
        )
        sys.exit()
    elif not any(letter in args.slices.lower() for letter in "hdty"):
        print(
            "-s or --slices must have at least one letter of h d t or y, lower- or uppercase."
        )
        sys.exit()
    return fromDate, toDate
|
|
|
|
|
|
def get_one_slice(
    item, DTTOKEN, DTURL, slice, out_df, selector_var, selector_type, header_name
):
    """Fetch SLO values for every time window in *slice* and append them to *out_df*.

    item: HUB label stored in the "HUB" column of every fetched row.
    DTTOKEN / DTURL: Dynatrace API token and base URL, passed through to getSLO().
    slice: DataFrame with "Date", "startTime", "endTime" columns (one row per
        window, as produced by get_daily_slice()/get_hourly_slice()).
        NOTE(review): the parameter name shadows the builtin `slice`; kept for
        interface compatibility.
    out_df: accumulator DataFrame the results are concatenated onto.
    selector_var / selector_type / header_name: SLO selector settings, passed
        through to getSLO().

    Returns a new DataFrame (out_df plus the fetched rows); out_df itself is
    not mutated.
    """
    # Collect the SLO rows for every window of this slice.
    df = pd.DataFrame()
    for index, row in slice.iterrows():
        num_probs = len(slice)
        # Progress as "NNNN von NNNN = PPP.PP %"; splitting on "." yields the
        # integer and fraction parts for zero-padded formatting below.
        percentage = str(round((100 * (index + 1)) / num_probs, 2)).split(".")
        print(
            "{:0>4d} von {:0>4d} = {:0>3d}.{:0>2d} %".format(
                index + 1, num_probs, int(percentage[0]), int(percentage[1])
            ),
            end="\r",  # overwrite the same line — simple in-place progress bar
        )
        temp_df = getSLO(
            DTTOKEN,
            DTURL,
            row["startTime"],
            row["endTime"],
            selector_var,
            selector_type,
            header_name,
        )
        # Tag every fetched row with its window date and the HUB it came from.
        temp_df["Date"] = row["Date"]
        temp_df["HUB"] = item
        df = pd.concat([df, temp_df], ignore_index=True)

    # sort columns in a try block - if API is returning columns which are non exist, this will not fail the script
    try:
        # "description" is assumed to be "<description>_<touchpoint>" — TODO confirm
        df[["description", "Touchpoint"]] = df["description"].str.split(
            "_", expand=True
        )
    except Exception as e:
        print(f"This error was encounterted : {e}")
    try:
        # Fixed column order for the Excel report; missing columns raise and
        # leave df unchanged.
        df = df[
            [
                "Date",
                "HUB",
                "id",
                "enabled",
                "name",
                "description",
                "Touchpoint",
                "evaluatedPercentage",
                "errorBudget",
                "status",
                "error",
                "target",
                "warning",
                "evaluationType",
                "timeframe",
                "metricExpression",
                "filter",
            ]
        ]
    except Exception as e:
        print("Could not rearrange columns: " + str(e))
    out_df = pd.concat([out_df, df], ignore_index=True)
    print()  # newline to remove \r from progress bar
    return out_df
|
|
|
|
|
|
def get_slice_ytd_total(
    DTTOKEN,
    DTURL,
    item,
    start_date,
    end_date,
    time_name,
    time_val,
    out_df,
    selector_var,
    selector_type,
    header_name,
):
    """Fetch one aggregated SLO slice (year-to-date or total) and append it to out_df.

    start_date/end_date are epoch milliseconds; time_name/time_val define the
    label column written for this slice (e.g. "Date" -> year or week number).
    item is stored in the "HUB" column. Returns the combined DataFrame;
    out_df itself is not mutated.
    """
    result = getSLO(
        DTTOKEN, DTURL, start_date, end_date, selector_var, selector_type, header_name
    )
    result[time_name] = time_val
    result["HUB"] = item

    # Split "description" into description + touchpoint; guarded so an
    # unexpected API payload does not abort the whole run.
    try:
        result[["description", "Touchpoint"]] = result["description"].str.split(
            "_", expand=True
        )
    except Exception as e:
        print(f"This error was encounterted : {e}")

    # Fixed column order for the report; if a column is missing the original
    # frame is kept as-is.
    try:
        report_columns = [
            "Date",
            "HUB",
            "id",
            "enabled",
            "name",
            "description",
            "Touchpoint",
            "evaluatedPercentage",
            "errorBudget",
            "status",
            "error",
            "target",
            "warning",
            "evaluationType",
            "timeframe",
            "metricExpression",
            "filter",
        ]
        result = result[report_columns]
    except Exception as e:
        print("Could not rearrange columns: " + str(e))

    return pd.concat([out_df, result], ignore_index=True)
|
|
|
|
def load_slo_parameter(path):
    """Read the SLO parameter YAML and return the valid configurations.

    Each top-level YAML entry must carry exactly 13 keys and at least the
    mandatory fields below; invalid entries are reported and skipped.

    Returns a list of [hub, selector_type, selector_var, yearstart_ms,
    header_name] lists, where hub is a comma-joined "name-type" string and
    yearstart_ms is the year start as epoch milliseconds.
    """
    mandatory_fields = ["hubs", "selector_type", "selector_var", "yearstart"]
    valid_configs = []

    with open(path) as file:
        slo_doc = yaml.safe_load(file)

    for header_name, configs in slo_doc.items():
        # A well-formed configuration has exactly 13 keys.
        if len(configs) != 13:
            print(f"Slo Configuration {header_name} is broken")
            continue

        tmp_dict = dict(configs)
        if not all(field in tmp_dict for field in mandatory_fields):
            print(f"Slo Configuration {header_name} is broken")
            continue

        # strptime keeps Python <3.7 compatibility; normalize to midnight and
        # convert to epoch milliseconds in the process-local timezone.
        parsed = datetime.datetime.strptime(tmp_dict["yearstart"], "%Y-%m-%d")
        midnight = datetime.datetime(parsed.year, parsed.month, parsed.day)
        yearstart = time.mktime(midnight.timetuple()) * 1000

        selector_type = tmp_dict["selector_type"]  # name if exact name is wanted
        selector_var = tmp_dict["selector_var"]

        # "hubname-type" per hub, comma-joined into a single selector string.
        hub = ",".join(
            key + "-" + tmp_dict["hubs"][key]["type"] for key in tmp_dict["hubs"].keys()
        )

        valid_configs.append(
            [hub, selector_type, selector_var, yearstart, header_name]
        )

    return valid_configs
|
|
|
|
|
|
# def write_slo_to_excel(args, fromDate, hourlyall, dailyall, totalall, ytd):
|
|
# touchpoints = ["Vehicle", "Mobile"]
|
|
# if args.preSelect == "day":
|
|
# today = datetime.date.today()
|
|
# yesterday = today - datetime.timedelta(days=1)
|
|
# fileName = "./QM_Report_" + str(yesterday) + ".xlsx"
|
|
# else:
|
|
# fileName = "./QM_Report_" + str(fromDate.isocalendar()[1]) + ".xlsx"
|
|
|
|
# writer = pd.ExcelWriter(fileName)
|
|
# workbook = writer.book
|
|
|
|
# if not totalall.empty and "t" in str.lower(args.slices):
|
|
# totalall = totalall[totalall["Touchpoint"].isin(touchpoints)]
|
|
# totalall.to_excel(writer, sheet_name="total", index=False)
|
|
# worksheet = writer.sheets["total"]
|
|
# worksheet.autofilter(0, 0, totalall.shape[0], totalall.shape[1])
|
|
|
|
# if not dailyall.empty and "d" in str.lower(args.slices):
|
|
# dailyall = dailyall[dailyall["Touchpoint"].isin(touchpoints)]
|
|
# dailyall["Date"] = (
|
|
# dailyall["Date"].astype("datetime64[ns]").dt.strftime("%Y-%m-%d")
|
|
# )
|
|
# dailyall.to_excel(writer, sheet_name="daily", index=False)
|
|
# worksheet = writer.sheets["daily"]
|
|
# worksheet.autofilter(0, 0, dailyall.shape[0], dailyall.shape[1])
|
|
|
|
# if not hourlyall.empty and "h" in str.lower(args.slices):
|
|
# hourlyall = hourlyall[hourlyall["Touchpoint"].isin(touchpoints)]
|
|
# hourlyall["Date"] = hourlyall["Date"].astype("datetime64[ns]")
|
|
# hourlyall.to_excel(writer, sheet_name="hourly", index=False)
|
|
# worksheet = writer.sheets["hourly"]
|
|
# worksheet.autofilter(0, 0, hourlyall.shape[0], hourlyall[1])
|
|
|
|
# if not ytd.empty and "y" in str.lower(args.slices):
|
|
# ytd = ytd[ytd["Touchpoint"].isin(touchpoints)]
|
|
# ytd.to_excel(writer, sheet_name="YTD", index=False)
|
|
# worksheet = writer.sheets["YTD"]
|
|
# worksheet.autofilter(0, 0, ytd.shape[0], ytd.shape[1])
|
|
|
|
# workbook.close()
|
|
# writer.save()
|
|
# # writer.close()
|
|
|
|
|
|
# function Arnel
|
|
def write_slo_to_excel(args, fromDate, hourlyall, dailyall, totalall, ytd):
    """Write the gathered SLO frames into the QM report workbook.

    One sheet per requested slice letter in --slices (t/d/h/y); only rows
    whose Touchpoint is "Vehicle" or "Mobile" are kept. Daily reports are
    named after yesterday's date, all others after the ISO week of fromDate.
    """
    touchpoints = ["Vehicle", "Mobile"]
    if args.preSelect == "day":
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        fileName = "./QM_Report_" + str(yesterday) + ".xlsx"
    else:
        fileName = "./QM_Report_" + str(fromDate.isocalendar()[1]) + ".xlsx"

    writer = pd.ExcelWriter(fileName)
    wanted = str.lower(args.slices)

    if not totalall.empty and "t" in wanted:
        totalall = totalall[totalall["Touchpoint"].isin(touchpoints)]
        totalall.to_excel(writer, sheet_name="total")

    if not dailyall.empty and "d" in wanted:
        dailyall = dailyall[dailyall["Touchpoint"].isin(touchpoints)]
        # Render dates as plain YYYY-mm-dd strings in the daily sheet.
        dailyall["Date"] = (
            dailyall["Date"].astype("datetime64[ns]").dt.strftime("%Y-%m-%d")
        )
        dailyall.to_excel(writer, sheet_name="daily")

    if not hourlyall.empty and "h" in wanted:
        hourlyall = hourlyall[hourlyall["Touchpoint"].isin(touchpoints)]
        hourlyall["Date"] = hourlyall["Date"].astype("datetime64[ns]")
        hourlyall.to_excel(writer, sheet_name="hourly")

    if not ytd.empty and "y" in wanted:
        ytd = ytd[ytd["Touchpoint"].isin(touchpoints)]
        ytd.to_excel(writer, sheet_name="YTD")

    writer.close()
|
|
|
|
|
|
def main(slo_path):
    """Gather Dynatrace SLO data per environment and write the Excel report.

    slo_path: path to the SLO parameter YAML (see load_slo_parameter()).
    Reads "environment.yaml" from the current directory for the per-HUB API
    URLs and token names, pulls the slices requested via --slices, and writes
    everything with write_slo_to_excel().
    """
    start_timer = time.time()
    parser = init_argparse()
    args = parser.parse_args()
    # Validates all argument combinations; exits the process on bad input.
    fromDate, toDate = check_inputs(args)
    print("slices", args.slices)
    print("fromDate: " + str(fromDate))
    print("toDate: " + str(toDate))

    # Precompute the daily and hourly time windows once; reused for every HUB.
    days = get_daily_slice(fromDate, toDate)
    hours = get_hourly_slice(fromDate, toDate)
    # basename() strips the "./" — effectively opens "environment.yaml" in the CWD.
    with open(os.path.basename("./environment.yaml")) as file:
        env_doc = yaml.safe_load(file)

    # Accumulators, one per report sheet.
    hourlyall = pd.DataFrame()
    dailyall = pd.DataFrame()
    totalall = pd.DataFrame()
    ytd = pd.DataFrame()

    slo_configs = load_slo_parameter(slo_path)

    for one_slo_config in slo_configs:
        hub, selector_type, selector_var, yearstart, header_name = one_slo_config
        print(
            f"For the slo config was '{slo_path}' used with the config '{header_name}'."
        )
        for item, doc in env_doc.items():
            # hub is a comma-joined "name-type" string; substring check decides
            # whether this environment participates in the current config.
            if not item in hub:
                print(
                    f"{item} will be skipped since it is not in {hub}, which was selected in {slo_path}"
                )
                continue
            # NOTE(review): doc is assumed to be a list where [0] holds the
            # display name, [1] the URL mapping and [2] the token mapping —
            # verify against environment.yaml.
            token = dict(doc[2])
            url = dict(doc[1])
            print("Crawling through: " + item)
            print("Check if token exists in environment...")
            # config() resolves the token from the environment (python-decouple).
            if config(token.get("env-token-name")) != "":
                print("Gather data, hold on a minute")
                DTTOKEN = config(token.get("env-token-name"))
                DTURL = url.get("env-url")

                # Calc daily SLO
                if "d" in str.lower(args.slices):
                    dailyall = get_one_slice(
                        doc[0]["name"],
                        DTTOKEN,
                        DTURL,
                        days,
                        dailyall,
                        selector_var,
                        selector_type,
                        header_name,
                    )
                # Calc hourly SLO
                if "h" in str.lower(args.slices):
                    hourlyall = get_one_slice(
                        doc[0]["name"],
                        DTTOKEN,
                        DTURL,
                        hours,
                        hourlyall,
                        selector_var,
                        selector_type,
                        header_name,
                    )
                # Calc Overall YTD SLO (from configured year start to range end)
                if "y" in str.lower(args.slices):
                    ytd = get_slice_ytd_total(
                        DTTOKEN,
                        DTURL,
                        doc[0]["name"],
                        yearstart,
                        days["endTime"].max(),
                        "Date",
                        fromDate.year,
                        ytd,
                        selector_var,
                        selector_type,
                        header_name,
                    )
                # Calc Overall SLO (whole selected range, labelled by ISO week)
                if "t" in str.lower(args.slices):
                    totalall = get_slice_ytd_total(
                        DTTOKEN,
                        DTURL,
                        doc[0]["name"],
                        days["startTime"].min(),
                        days["endTime"].max(),
                        "Date",
                        fromDate.isocalendar()[1],
                        totalall,
                        selector_var,
                        selector_type,
                        header_name,
                    )
            else:
                print("token not found, skipping " + item)
    write_slo_to_excel(args, fromDate, hourlyall, dailyall, totalall, ytd)
    print("\n")
    print("It took {} seconds to run this script".format(time.time() - start_timer))
|
|
|
|
|
|
if __name__ == "__main__":
    # Entry point: the SLO parameter file is expected in the working directory.
    main("./slo_parameter.yaml")
|