# coco_apm_dashboard_cleaner/main.py
# (file-viewer metadata: 426 lines, 16 KiB, Python)
import copy
import git
import glob
import logging
import os
import pandas as pd
import shutil
import time
import yaml
from decouple import config
from dynatrace import Dynatrace
from pathlib import Path
# Timestamp for this run; used in output file names (deleted-files log, Excel export).
t = time.strftime("%Y%m%d-%H%M%S")
# Plain "LEVEL: message" log lines.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
# Maximum attempts for git_push().
MAX_RETRIES = 3
def git_push(repo, origin, branch, message, max_retries=None):
    """Stage all changes, commit them with *message* and push *branch* to *origin*.

    The push is retried up to *max_retries* times (defaults to the module
    constant MAX_RETRIES).  Failures are logged, never raised (best-effort,
    matching the original behavior).

    Bug fixed: the original re-ran ``git add``/``git commit`` on every retry;
    the second commit fails with "nothing to commit", so a failed push was
    never actually retried.  Commit now happens once, only the push retries.
    """
    attempts = max_retries if max_retries is not None else MAX_RETRIES
    try:
        repo.git.add(all=True)
        repo.git.commit("-m", message)
    except Exception as e:
        # e.g. "nothing to commit" -- log and still attempt the push.
        logging.info(e)
    for attempt in range(attempts):
        try:
            repo.git.push(origin, branch)
            break
        except Exception as e:
            logging.info(e)
            logging.info("retry attempt %d/%d" % (attempt + 1, attempts))
def delete_dashboard(dt_client, environment, current_db):
    """Delete the dashboard described by *current_db* (dict with "id"/"name")
    in *environment* via the Dynatrace client.

    Returns the API response, or None when the deletion raised.

    Bug fixed: ``response`` was only assigned inside the try block, so the
    final ``return response`` raised UnboundLocalError whenever the delete
    call failed.
    """
    response = None
    try:
        response = dt_client.dashboards.delete(str(current_db["id"]))
        logging.info("Deletion of dashbord %s (%s) in %s successful" %
                     (str(current_db["name"]), current_db["id"],
                      str(environment)))
    except Exception as e:
        logging.info("During deletion of dashbaord the following exception has"
                     " been encountered: %s", e)
    return response
def get_credentials(e, environment):
    """Look up the Dynatrace URL and API token for environment *e*.

    *environment* maps env name -> list of single-key dicts as loaded from
    environment.yaml (index 1 holds "env-url", index 2 the token name;
    the token value itself comes from decouple's config()).
    Returns [url, token]; falls through to an implicit None when *e* is
    not present.
    """
    wanted = str(e)
    for name, doc in environment.items():
        if str(name) != wanted:
            continue
        url = dict(doc[1]).get("env-url")
        token = config(dict(doc[2]).get("env-token-name"), default='')
        return [url, token]
def delete_file(branch, file):
    """Remove *file* and append an audit line to ./log_deleted_<t>.txt.

    Returns True when the file was removed and logged, False on any OSError
    (from the removal or the audit-log write).
    """
    try:
        os.remove(file)
        audit = Path("./log_deleted_" + str(t) + ".txt")
        with open(audit, "a+", encoding="utf-8") as log:
            log.write("File on branch %s in %s has been deleted\n" % (branch,
                                                                      file))
        logging.debug("File on branch %s in %s has been deleted" % (branch,
                                                                    file))
        return True
    except OSError as e:
        logging.info("Error: %s - %s." % (e.filename, e.strerror))
        return False
def check_dashboard(branch, file, current_db, list_environments,
                    dict_dashboards):
    """Check whether *current_db* is listed as obsolete in any environment.

    Deletion is keyed on the dashboard id only -- without an id nothing
    matches and nothing is deleted.  On a match, the backing file is removed
    via delete_file().

    Returns (is_deleted, environment) where *environment* is the matching
    environment name, or None when nothing matched.

    Bugs fixed: the original returned the loop variable on a miss (i.e. an
    arbitrary, *last-checked* environment) and raised UnboundLocalError when
    *list_environments* was empty.
    """
    for env in list_environments:
        for obsolete in dict_dashboards[env]["obsolete"].values():
            if current_db["id"] == obsolete["id"]:
                logging.debug("Obsolete dashboard on branch %s in %s",
                              str(branch), str(file))
                return delete_file(branch, Path(file)), env
    return False, None
def check_metadata(file):
    """Extract [id, name, owner] metadata from a dashboard .tf file.

    The id is taken from the first two lines:
      * "# LEGACY <id>" on line 2, else
      * "# ID <id>" on line 1, else
      * a "DEFINE = <id>" assignment on line 1.
    name/owner come from the first ``dashboard_metadata {`` block (next four
    lines) or, failing that, from a ``dashboardName =`` line.  Missing values
    are returned as None.

    Fixes over the original: the file is read once instead of three times;
    files shorter than two lines no longer raise StopIteration; and a
    name/owner that was found is no longer reset to None by later, unrelated
    lines (the old loop's else-branch clobbered earlier matches).
    """
    with open(file, "r") as f:
        lines = f.readlines()

    def _hash_field(line, index):
        # "#  KEY value" -> token at *index* after stripping '#' markers.
        return line.strip().replace("\n", "").replace("#", "").strip()\
            .split(" ")[index].strip()

    def _rhs(line):
        # 'key = "value"' -> 'value'
        return line.strip().replace("\n", "").split("=")[1].strip()\
            .replace('"', "").strip()

    first = lines[0] if len(lines) > 0 else ""
    second = lines[1] if len(lines) > 1 else ""
    db_id = None
    if "LEGACY" in second:
        db_id = _hash_field(second, 1)
    elif "ID" in first:
        db_id = _hash_field(first, 1)
    elif "DEFINE" in first:
        db_id = first.strip().replace("\n", "").split("=")[1].strip()

    name = None
    owner = None
    for x, line in enumerate(lines):
        if "dashboard_metadata {" in line:
            for md in lines[x:x + 5]:
                if "name" in md:
                    name = _rhs(md)
                if "owner" in md:
                    owner = _rhs(md)
            break
        if "dashboardName" in line:
            name = _rhs(line)
            owner = None
            break
    return [db_id, name, owner]
def format_block(string, max):
    """Left-pad *string* with spaces to a total width of *max* characters.

    Strings already longer than *max* are returned unchanged (same as the
    original f-string padding, where the space run collapses to empty).
    """
    return string.rjust(max)
def onerror(func, path, exc_info):
    """
    Error handler for ``shutil.rmtree``.
    When the failure looks like an access error (read-only entry), grant the
    owner write permission and retry the failed operation; for anything else,
    re-raise the original exception.
    Usage : ``shutil.rmtree(path, onerror=onerror)``
    """
    import stat
    if os.access(path, os.W_OK):
        # Not a permission problem: we are invoked from inside shutil's
        # except block, so a bare raise re-raises the original exception.
        raise
    os.chmod(path, stat.S_IWUSR)
    func(path)
def delete_dir(path):
    """Recursively remove *path*, using onerror() to clear read-only entries.

    OSError is logged, not raised (best-effort cleanup).
    """
    logging.info("cleaning up...")
    try:
        shutil.rmtree(path, onerror=onerror)
    except OSError as e:
        logging.info("Error: %s - %s." % (e.filename, e.strerror))
    else:
        logging.info("%s successfully deleted", str(path))
def checkout_master(repo):
    """Ensure *repo* has its master branch checked out; return *repo*.

    Bug fixed: the original if/else on ``is_detached`` executed byte-identical
    code in both arms -- the duplicate branch is collapsed; the emitted log
    lines are unchanged.

    NOTE(review): GitPython exposes ``is_detached`` on ``repo.head``; confirm
    the objects used here really provide ``active_branch.is_detached``.
    """
    logging.info("master branch name is: %s", str(repo.heads.master.name))
    logging.info("checking active branch ...")
    if repo.active_branch.name != repo.heads.master.name:
        logging.info("active branch name is: %s", str(repo.active_branch.name))
        logging.info("active branch (%s) is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
        logging.info("checking out master...")
        repo.git.checkout("master")
        logging.info("checkout to master successful")
        logging.info("active branch is %s and is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
    else:
        logging.info("active branch is already master (%s) and is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
    return repo
def fetch_branches(repo):
    """Return the branch names of *repo*'s default remote as a list."""
    logging.info("fetching branches...")
    return [ref.remote_head for ref in repo.remote().refs]
def fetch_repository(REPOSITORY_URL, REPOSITORY_PATH):
    """Open the already-cloned git repository at REPOSITORY_PATH.

    REPOSITORY_URL is currently unused -- the clone step is commented out
    upstream and kept for reference.
    """
    logging.info("repository path %s", str(REPOSITORY_PATH))
    return git.Repo(Path(REPOSITORY_PATH))
def writeToExcel(env, t, result):
    """Dump *env*'s available/legacy/obsolete dashboards to an .xlsx file.

    *result* is the dict returned by evaluate(); the workbook is written to
    log/<t>_<env>_dashboards.xlsx with one sheet per category.

    Fixes over the original: the Windows-only literal path ".\\log" is
    replaced with os.path.join (portable); the triple duplicated if-chain is
    collapsed into one comprehension per category.
    """
    frames = {}
    for category in ("available", "legacy", "obsolete"):
        rows = [[entry["id"], entry["name"], entry["owner"]]
                for entry in result[category].values()]
        frames[category] = pd.DataFrame(rows, columns=["id", "name", "owner"])
    filename = os.path.join("log",
                            str(t) + "_" + str(env) + "_dashboards.xlsx")
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with pd.ExcelWriter(filename) as writer:
        for category, df in frames.items():
            df.to_excel(writer, sheet_name=category)
def evaluate(env, data):
    """Classify the dashboards of *env* into available / legacy / obsolete.

    data = [dict_dashboards, list_dashboard_ids,
            dict_metric_queries, list_metric_query_ids]
    legacy:    metric-query entries whose id no longer exists as a dashboard
               (dashboard deleted in the past)
    available: dashboards that still have a surviving (non-legacy) metric
               query, i.e. a viewCount
    obsolete:  dashboards with no surviving metric query (0 viewCount)

    Returns {"available": ..., "legacy": ..., "obsolete": ...}; keys are the
    original enumeration indices of the inputs, as before.

    Fixed: the original deep-copied both query structures and called
    list.remove() inside the loop (quadratic, plus repeated O(n) membership
    tests on lists).  Sets give the same classification in O(n).
    """
    dict_dashboards, list_dashboard_ids, dict_metric_queries, _ = data
    dashboard_ids = set(list_dashboard_ids)
    legacy = {}
    surviving_ids = set()
    for x, metric_query in enumerate(dict_metric_queries.values()):
        if metric_query["id"] in dashboard_ids:
            surviving_ids.add(metric_query["id"])
        else:
            legacy[x] = {"id": metric_query["id"],
                         "name": metric_query["name"],
                         "owner": metric_query["owner"]}
    logging.debug("%s %s have been deleted in the past", str(env), len(legacy))
    logging.debug("%s %s dashboards with viewCount and active", str(env),
                  len(dict_metric_queries) - len(legacy))
    available = {}
    obsolete = {}
    for i, dashboard in enumerate(dict_dashboards.values()):
        if dashboard["id"] in surviving_ids:
            available[i] = dashboard
        else:
            obsolete[i] = dashboard
    logging.info("%s %s dashboards with viewCount!", str(env), len(available))
    logging.info("%s %s dashboards with 0 viewCount!", str(env), len(obsolete))
    return {"available": available, "legacy": legacy, "obsolete": obsolete}
def adaptDataStructure(dashboards, metric_queries):
    """Flatten the Dynatrace PaginatedList responses into plain dicts/lists.

    Returns [dict_dashboards, list_dashboard_ids,
             dict_metric_queries, list_metric_query_ids].

    Bug fixed: the original numbered metric queries with an enumerate() that
    restarted at 0 for every result collection, so entries from later
    collections overwrote earlier ones in dict_metric_queries while the id
    list kept growing.  A running counter keeps every entry.
    """
    dict_dashboards = {}
    list_dashboard_ids = []
    for s, stub in enumerate(getattr(dashboards, "_PaginatedList__elements")):
        dict_dashboards[s] = {"id": getattr(stub, "id"),
                              "name": getattr(stub, "name"),
                              "owner": getattr(stub, "owner")}
        list_dashboard_ids.append(getattr(stub, "id"))
    dict_metric_queries = {}
    list_metric_query_ids = []
    counter = 0
    for collection in getattr(metric_queries, "_PaginatedList__elements"):
        for q in getattr(collection, "data"):
            query_id = getattr(q, "dimension_map")["id"]
            dict_metric_queries[counter] = {"id": query_id,
                                            "name": None,
                                            "owner": None}
            list_metric_query_ids.append(query_id)
            counter += 1
    return [dict_dashboards, list_dashboard_ids, dict_metric_queries,
            list_metric_query_ids]
def getDashboardsWithViewCount(env, client, METRIC_SELECTOR, RESOLUTION,
                               FROM_DATE, TO_DATE):
    """Run the dashboard viewCount metric query for *env* and return the
    raw paginated result (total count is logged at debug level)."""
    logging.debug("%s get dashboards with viewCount, resolution %s ...",
                  str(env), RESOLUTION)
    result = client.metrics.query(METRIC_SELECTOR, RESOLUTION, FROM_DATE,
                                  TO_DATE)
    total = getattr(result, "_PaginatedList__total_count")
    logging.debug("%s %s dashboards with viewCount and older than 6 Months",
                  str(env), str(total))
    return result
def getDashboards(env, client):
    """Fetch all dashboards of *env* (no owner/tag filter) and return the
    raw paginated result; the total count is logged."""
    logging.debug("%s get all dashboards...", str(env))
    result = client.dashboards.list(owner=None, tags=None)
    logging.info("%s %s total dashboards", str(env),
                 str(getattr(result, "_PaginatedList__total_count")))
    return result
def initDtClient(env, DT_URL, DT_TOKEN):
    """Create a Dynatrace API client for *env*.

    Passes an error-level logger plus positional settings (no proxies,
    0 retries, 10s timeout) -- NOTE(review): positional arguments; confirm
    against the installed dynatrace package's constructor signature.
    """
    logging.debug("%s init Dynatrace client...", str(env))
    return Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"), None, None,
                     0, 10 * 1000)
if __name__ == "__main__":
    dict_dashboards = {}
    list_environments = []
    # do it manually for CD_TS-CMS
    list_exclude_branches = ["HEAD", "master", "template", "CD_TS-CMS"]
    list_exclude_files = ["providers.tf", "data_source.tf"]
    # environment.yaml: per environment a list of single-key dicts
    # (index 1 env-url, 2 env-token-name, 5 metricSelector, 6 resolution,
    #  7 fromDate, 8 toDate) -- NOTE(review): index-based access is fragile.
    with open(Path("./environment.yaml")) as env_cfg:
        environments = yaml.safe_load(env_cfg)
    # Phase 1: for every environment with a configured token, fetch all
    # dashboards plus their viewCount metric and classify them.
    for env, doc in environments.items():
        logging.debug("%s checking token...", str(env))
        if config(dict(doc[2]).get("env-token-name"), default='') != "":
            DT_URL = dict(doc[1]).get("env-url")
            DT_TOKEN = config(dict(doc[2]).get("env-token-name"), default='')
            METRIC_SELECTOR = dict(doc[5]).get("metricSelector")
            RESOLUTION = dict(doc[6]).get("resolution")
            FROM_DATE = dict(doc[7]).get("fromDate")
            TO_DATE = dict(doc[8]).get("toDate")
            client = initDtClient(env, DT_URL, DT_TOKEN)
            dashboards = getDashboards(env, client)
            metric_queries = getDashboardsWithViewCount(env, client,
                                                        METRIC_SELECTOR,
                                                        RESOLUTION, FROM_DATE,
                                                        TO_DATE)
            data = adaptDataStructure(dashboards, metric_queries)
            result = evaluate(env, data)
            # writeToExcel(env, t, result)
            dict_dashboards[env] = result
            list_environments.append(env)
    # Phase 2: walk every remote branch of the terraform repo and delete
    # dashboard files whose id was classified as obsolete above.
    repo = fetch_repository(config("REPOSITORY_URL"), config("REPOSITORY_PATH"))
    list_branches = fetch_branches(repo)
    for b in list_exclude_branches:
        # NOTE(review): raises ValueError if an excluded branch is missing
        # from the remote -- presumably intentional fail-fast; confirm.
        list_branches.remove(b)
    # repo_ = checkout_master(repo)
    repo_ = repo
    wd = Path(repo_.git.working_dir)
    for i, branch in enumerate(list_branches):
        is_commit = False
        repo_.git.checkout(branch)
        logging.info("%d - branch: %s", i, str(branch))
        files = glob.glob(str(wd) + '/**/dashboard/*.tf', recursive=True)
        for file in files:
            if os.path.basename(file) in list_exclude_files:
                continue
            iid, nname, oowner = check_metadata(file)
            current_db = {"id": iid, "name": nname, "owner": oowner}
            is_deleted, environment = check_dashboard(branch, file,
                                                      current_db,
                                                      list_environments,
                                                      dict_dashboards)
            if is_deleted:
                is_commit = True
                dt_url, dt_token = get_credentials(environment,
                                                   environments)
                # BUG FIX: initDtClient takes (env, url, token); the original
                # passed only (dt_url, dt_token) -> TypeError at runtime.
                dt_client = initDtClient(environment, dt_url, dt_token)
                # Deletion in Dynatrace itself is still disabled (untested):
                # delete_dashboard(dt_client, environment, current_db)
        if is_commit:
            git_push(repo_, "origin", branch, "Dashboard cleanup")
        # ToDo Create Pull Request and Merge --> Manually or via Code?
    logging.info("finished")