not tested yet and pull request via atlassian needs to be implemented

master
Arnel Arnautovic 2023-07-18 16:58:59 +02:00
parent 95d6cbfdcb
commit ff32507e77
3 changed files with 636 additions and 545 deletions

185
main.py
View File

@ -5,8 +5,7 @@ import logging
import os import os
import pandas as pd import pandas as pd
import shutil import shutil
import stat import time
import time;
import yaml import yaml
from decouple import config from decouple import config
@ -16,26 +15,102 @@ from pathlib import Path
t = time.strftime("%Y%m%d-%H%M%S") t = time.strftime("%Y%m%d-%H%M%S")
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s') logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
MAX_RETRIES = 3
def git_push(repo, origin, branch, message):
for _ in range(MAX_RETRIES):
try:
repo.git.add(all=True)
repo.git.commit("-m", message)
# set_origin = repo.remote(name=origin)
repo.git.push(origin, branch)
break
except Exception as e:
logging.info(e)
logging.info("retry attempt %d/%d" % (_+1, MAX_RETRIES))
def delete_dashboard(dt_client, environment, current_db):
try:
response = dt_client.dashboards.delete(str(current_db["id"]))
logging.info("Deletion of dashbord %s (%s) in %s successful" %
(str(current_db["name"]), current_db["id"],
str(environment)))
except Exception as e:
logging.info("During deletion of dashbaord the following exception has\
been encountered: %s", e)
return response
def get_credentials(e, environment):
for env, doc in environment.items():
if str(e) == str(env):
DT_URL = dict(doc[1]).get("env-url")
DT_TOKEN = config(dict(doc[2]).get("env-token-name"), default='')
return [DT_URL, DT_TOKEN]
def delete_file(branch, file):
is_deleted = False
try:
os.remove(file)
with open(Path("./log_deleted_" + str(t) + ".txt"), "a+",
encoding="utf-8") as f:
f.write("File on branch %s in %s has been deleted\n" % (branch,
file))
logging.debug("File on branch %s in %s has been deleted" % (branch,
file))
is_deleted = True
except OSError as e:
logging.info("Error: %s - %s." % (e.filename, e.strerror))
is_deleted = False
return is_deleted
# delete based only by id! if there is no id, delete not possible!
def check_dashboard(branch, file, current_db, list_environments,
dict_dashboards):
is_deleted = False
is_stop = False
for e in list_environments:
for k, v in dict_dashboards[e]["obsolete"].items():
if current_db["id"] == v["id"]:
is_stop = True
logging.debug("Obsolete dashboard on branch %s in %s",
str(branch), str(file))
# return True
is_deleted = delete_file(branch, Path(file))
break
if is_stop == True:
break
return is_deleted, e
def check_metadata(file): def check_metadata(file):
id = None
name = None
owner = None
with open(file, "r") as f: with open(file, "r") as f:
lines = [next(f) for _ in range(2)] lines = [next(f) for _ in range(2)]
if "LEGACY" in str(lines[1]): if "LEGACY" in str(lines[1]):
# l_stripped = lines[1].strip().replace("\n", "").replace("#", "").strip().split(" ")[1].strip() id = lines[1].strip().replace("\n", "").replace("#", "").strip()\
# l_replaced = l_stripped.replace("\n", "") .split(" ")[1].strip()
# l_rr = l_replaced.replace("#", "") elif "ID" in str(lines[0]):
# l_s = l_rr.strip() id = lines[0].strip().replace("\n", "").replace("#", "").strip()\
# l_splitted = l_s.split(" ") .split(" ")[1].strip()
# legacy_id = l_splitted[1].strip() elif "DEFINE" in str(lines[0]):
id = lines[1].strip().replace("\n", "").replace("#", "").strip().split(" ")[1].strip()
else:
# stripped = lines[0].strip().replace("\n", "").split("=")[1].strip()
# replaced = stripped.replace("\n", "")
# splitted = replaced.split("=")
# id = splitted[1].strip()
id = lines[0].strip().replace("\n", "").split("=")[1].strip() id = lines[0].strip().replace("\n", "").split("=")[1].strip()
else:
id = None
with open(file, "r") as f: with open(file, "r") as f:
num_lines = sum(1 for _ in f) num_lines = sum(1 for _ in f)
@ -48,9 +123,18 @@ def check_metadata(file):
metadata = lines[x:x+5] metadata = lines[x:x+5]
for md in metadata: for md in metadata:
if "name" in md: if "name" in md:
name = md.strip().replace("\n", "").split("=")[1].strip().replace('"',"").strip() name = md.strip().replace("\n", "").split("=")[1].strip()\
.replace('"',"").strip()
if "owner" in md: if "owner" in md:
owner = md.strip().replace("\n", "").split("=")[1].strip().replace('"',"").strip() owner = md.strip().replace("\n", "").split("=")[1].strip()\
.replace('"',"").strip()
elif "dashboardName" in line:
name = line.strip().replace("\n", "").split("=")[1].strip()\
.replace('"',"").strip()
owner = None
else:
name = None
owner = None
return [id, name, owner] return [id, name, owner]
@ -58,6 +142,7 @@ def check_metadata(file):
def format_block(string, max): def format_block(string, max):
string_length = len(string) string_length = len(string)
string = (f'{" "*(max-string_length)}{string}') string = (f'{" "*(max-string_length)}{string}')
return string return string
@ -81,13 +166,13 @@ def onerror(func, path, exc_info):
raise raise
def delete_directory(path): def delete_dir(path):
logging.info("cleaning up...") logging.info("cleaning up...")
try: try:
shutil.rmtree(path, onerror=onerror) shutil.rmtree(path, onerror=onerror)
logging.info("%s directory successfully deleted", str(path)) logging.info("%s successfully deleted", str(path))
except OSError as e: except OSError as e:
print("Error: %s - %s." % (e.filename, e.strerror)) logging.info("Error: %s - %s." % (e.filename, e.strerror))
def checkout_master(repo): def checkout_master(repo):
@ -182,7 +267,6 @@ def evaluate(env, data):
dict_metric_queries_copy = copy.deepcopy(dict_metric_queries) dict_metric_queries_copy = copy.deepcopy(dict_metric_queries)
list_metric_query_copy_ids = copy.deepcopy(list_metric_query_ids) list_metric_query_copy_ids = copy.deepcopy(list_metric_query_ids)
for x, (m, metric_query) in enumerate(dict_metric_queries.items()): for x, (m, metric_query) in enumerate(dict_metric_queries.items()):
if metric_query["id"] not in list_dashboard_ids: if metric_query["id"] not in list_dashboard_ids:
legacy[x] = {"id" : metric_query["id"], legacy[x] = {"id" : metric_query["id"],
@ -254,21 +338,23 @@ def initDtClient(env, DT_URL, DT_TOKEN):
logging.debug("%s init Dynatrace client...", str(env)) logging.debug("%s init Dynatrace client...", str(env))
DT_CLIENT = Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"), None, None, DT_CLIENT = Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"), None, None,
0, 10*1000) 0, 10*1000)
return DT_CLIENT return DT_CLIENT
if __name__ == "__main__": if __name__ == "__main__":
dictionary_dashboards = {} dict_dashboards = {}
list_environments = [] list_environments = []
# do it manually for CD_TS-CMS # do it manually for CD_TS-CMS
list_exclude_branches = ["HEAD", "master", "template", "CD_TS-CMS"] list_exclude_branches = ["HEAD", "master", "template", "CD_TS-CMS"]
list_exclude_files = ["providers.tf", "data_source.tf"] list_exclude_files = ["providers.tf", "data_source.tf"]
with open(Path("./environment.yaml")) as env_cfg: with open(Path("./environment.yaml")) as env_cfg:
environment = yaml.safe_load(env_cfg) environments = yaml.safe_load(env_cfg)
for env, doc in environment.items(): for env, doc in environments.items():
logging.debug("%s checking token...", str(env)) logging.debug("%s checking token...", str(env))
if config(dict(doc[2]).get("env-token-name"), default='') != "": if config(dict(doc[2]).get("env-token-name"), default='') != "":
@ -289,7 +375,7 @@ if __name__ == "__main__":
result = evaluate(env, data) result = evaluate(env, data)
# writeToExcel(env, t, result) # writeToExcel(env, t, result)
dictionary_dashboards[env] = result dict_dashboards[env] = result
list_environments.append(env) list_environments.append(env)
repo = fetch_repository(config("REPOSITORY_URL"), config("REPOSITORY_PATH")) repo = fetch_repository(config("REPOSITORY_URL"), config("REPOSITORY_PATH"))
@ -302,32 +388,37 @@ if __name__ == "__main__":
repo_ = repo repo_ = repo
wd = Path(repo_.git.working_dir) wd = Path(repo_.git.working_dir)
try: # try:
# with open(Path("./dashboards.txt"), "a+", encoding="utf-8") as f: # with open(Path("./dashboards.txt"), "a+", encoding="utf-8") as f:
for i, branch in enumerate(list_branches): for i, branch in enumerate(list_branches):
repo_.git.checkout(branch) is_commit = False
logging.info("%d - branch: %s", i, str(branch)) repo_.git.checkout(branch)
for file in glob.glob(str(wd) + '/**/dashboard/*.tf', recursive=True): logging.info("%d - branch: %s", i, str(branch))
if os.path.basename(file) not in list_exclude_files: files = glob.glob(str(wd) + '/**/dashboard/*.tf', recursive=True)
# f.write("%s | %s\n" % (format_block(branch, 50), file)) for file in files:
id, name, owner = check_metadata(file) is_deleted = False
current_db = {"id": id, "name" : name ,"owner" : owner} if os.path.basename(file) not in list_exclude_files:
for e in list_environments: # f.write("%s | %s\n" % (format_block(branch, 50), file))
for k, v in dictionary_dashboards[e]["obsolete"].items(): iid, nname, oowner = check_metadata(file)
if current_db == v: current_db = {"id": iid, "name" : nname ,"owner" : oowner}
print(current_db) is_deleted, environment = check_dashboard(branch, file,
print(v) current_db,
print("DELETING", "BRANCH:", str(branch), "FILE:", file) list_environments,
print("") dict_dashboards)
else: if is_deleted == True:
print(current_db) is_commit = True
print(v) dt_url, dt_token = get_credentials(environment,
print("") environments)
dt_client = initDtClient(dt_url, dt_token)
# I have not tested the deletion yet !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# delete_dashboard(dt_client, environment, current_db)
if is_commit == True:
git_push(repo_, "origin", branch, "Dashboard cleanup")
except Exception as e: # except Exception as e:
print("Exception:", e) # print("FINAL Exception:", e)
# delete_directory(Path(config("REPOSITORY_PATH"))) # delete_dir(Path(config("REPOSITORY_PATH")))
logging.info("finished") logging.info("finished")