import copy
import glob
import logging
import os
import shutil
import stat
import time

import git
import pandas as pd
import yaml
from decouple import config
from dynatrace import Dynatrace
from pathlib import Path

t = time.strftime("%Y%m%d-%H%M%S")
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

MAX_RETRIES = 3


def git_push(repo, origin, branch, message):
    for attempt in range(MAX_RETRIES):
        try:
            repo.git.add(all=True)
            repo.git.commit("-m", message)
            repo.git.push(origin, branch)
            break
        except Exception as e:
            logging.info(e)
            logging.info("retry attempt %d/%d", attempt + 1, MAX_RETRIES)


def delete_dashboard(dt_client, environment, current_db):
    response = None
    try:
        response = dt_client.dashboards.delete(str(current_db["id"]))
        logging.info("Deletion of dashboard %s (%s) in %s successful",
                     current_db["name"], current_db["id"], environment)
    except Exception as e:
        logging.info("During deletion of dashboard the following exception "
                     "has been encountered: %s", e)
    return response


def get_credentials(e, environments):
    for env, doc in environments.items():
        if str(e) == str(env):
            DT_URL = dict(doc[1]).get("env-url")
            DT_TOKEN = config(dict(doc[2]).get("env-token-name"), default='')
            return [DT_URL, DT_TOKEN]


def delete_file(branch, file):
    is_deleted = False
    try:
        os.remove(file)
        with open(Path("./log_deleted_" + str(t) + ".txt"), "a+",
                  encoding="utf-8") as f:
            f.write("File on branch %s in %s has been deleted\n"
                    % (branch, file))
        logging.debug("File on branch %s in %s has been deleted", branch, file)
        is_deleted = True
    except OSError as e:
        logging.info("Error: %s - %s.", e.filename, e.strerror)
        is_deleted = False
    return is_deleted


# Deletion is keyed on the dashboard id only; without an id no deletion is
# possible!
def check_dashboard(branch, file, current_db, list_environments,
                    dict_dashboards):
    is_deleted = False
    is_stop = False
    environment = None
    for e in list_environments:
        for k, v in dict_dashboards[e]["obsolete"].items():
            if current_db["id"] == v["id"]:
                is_stop = True
                environment = e
                logging.debug("Obsolete dashboard on branch %s in %s",
                              branch, file)
                is_deleted = delete_file(branch, Path(file))
                break
        if is_stop:
            break
    return is_deleted, environment


def check_metadata(file):
    db_id = None
    name = None
    owner = None
    with open(file, "r") as f:
        lines = f.readlines()
    # The dashboard id is encoded in a comment on the first or second line.
    if len(lines) > 1 and "LEGACY" in lines[1]:
        db_id = lines[1].replace("#", "").strip().split(" ")[1].strip()
    elif lines and "ID" in lines[0]:
        db_id = lines[0].replace("#", "").strip().split(" ")[1].strip()
    elif lines and "DEFINE" in lines[0]:
        db_id = lines[0].strip().split("=")[1].strip()
    # Name and owner come either from a dashboard_metadata block or from a
    # single dashboardName attribute.
    for x, line in enumerate(lines):
        if "dashboard_metadata {" in line:
            for md in lines[x:x + 5]:
                if "name" in md:
                    name = md.split("=")[1].strip().replace('"', "").strip()
                if "owner" in md:
                    owner = md.split("=")[1].strip().replace('"', "").strip()
        elif "dashboardName" in line:
            name = line.split("=")[1].strip().replace('"', "").strip()
    return [db_id, name, owner]
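
# A minimal sketch of the header variants check_metadata() can parse,
# inferred from the string handling above; the exact comment wording in the
# real *.tf files may differ, and all values here are placeholders:
#
#   # ID 6f1c3f2e-0000-0000-0000-000000000000        <- "ID" on line 1
#   # LEGACY 6f1c3f2e-0000-0000-0000-000000000000    <- "LEGACY" on line 2
#   # DEFINE id = 6f1c3f2e-0000-0000-0000-000000000000
#
# and, for name/owner, either a metadata block or a single attribute:
#
#   dashboard_metadata {
#     name  = "My Dashboard"
#     owner = "jane.doe@example.com"
#   }
#
#   dashboardName = "My Dashboard"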
def format_block(string, width):
    # Right-align the string within the given width (pad with spaces).
    return string.rjust(width)


def onerror(func, path, exc_info):
    """Error handler for ``shutil.rmtree``.

    If the error is due to an access error (read-only file), it attempts to
    add write permission and then retries. If the error is for another
    reason, it re-raises the error.

    Usage: ``shutil.rmtree(path, onerror=onerror)``
    """
    # Is the error an access error?
    if not os.access(path, os.W_OK):
        os.chmod(path, stat.S_IWUSR)
        func(path)
    else:
        raise


def delete_dir(path):
    logging.info("cleaning up...")
    try:
        shutil.rmtree(path, onerror=onerror)
        logging.info("%s successfully deleted", path)
    except OSError as e:
        logging.info("Error: %s - %s.", e.filename, e.strerror)


def checkout_master(repo):
    logging.info("master branch name is: %s", repo.heads.master.name)
    logging.info("checking active branch ...")
    if repo.active_branch.name != repo.heads.master.name:
        logging.info("active branch name is: %s", repo.active_branch.name)
        logging.info("active branch (%s) is detached: %s",
                     repo.active_branch.name, repo.active_branch.is_detached)
        logging.info("checking out master...")
        repo.git.checkout("master")
        logging.info("checkout to master successful")
    logging.info("active branch is %s and is detached: %s",
                 repo.active_branch.name, repo.active_branch.is_detached)
    return repo


def fetch_branches(repo):
    logging.info("fetching branches...")
    branch_list = [r.remote_head for r in repo.remote().refs]
    return branch_list


def fetch_repository(REPOSITORY_URL, REPOSITORY_PATH):
    # The repository is expected to be cloned already; open it from the
    # local path instead of cloning from REPOSITORY_URL.
    logging.info("repository path %s", REPOSITORY_PATH)
    repo = git.Repo(Path(REPOSITORY_PATH))
    return repo


def writeToExcel(env, t, result):
    frames = {}
    for category in ["available", "legacy", "obsolete"]:
        rows = [[v["id"], v["name"], v["owner"]]
                for v in result[category].values()]
        frames[category] = pd.DataFrame(rows, columns=['id', 'name', 'owner'])
    filename = os.path.join("log",
                            str(t) + "_" + str(env) + '_dashboards.xlsx')
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with pd.ExcelWriter(filename) as writer:
        for category, df in frames.items():
            df.to_excel(writer, sheet_name=category)
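
# A hedged sketch of the data-structure contract between adaptDataStructure()
# and evaluate() below (ids are placeholders):
#
#   data = [
#       {0: {"id": "abc", "name": "My Dashboard", "owner": "jane.doe"}},  # dict_dashboards
#       ["abc"],                                                          # list_dashboard_ids
#       {0: {"id": "abc", "name": None, "owner": None}},                  # dict_metric_queries
#       ["abc"],                                                          # list_metric_query_ids
#   ]
#
# evaluate() then buckets every dashboard as "available" (has view counts),
# "obsolete" (no view counts) or "legacy" (metric data for an already
# deleted dashboard).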
def evaluate(env, data):
    legacy = {}
    available = {}
    obsolete = {}
    dict_dashboards = data[0]
    list_dashboard_ids = data[1]
    dict_metric_queries = data[2]
    list_metric_query_ids = data[3]
    dict_metric_queries_copy = copy.deepcopy(dict_metric_queries)
    list_metric_query_copy_ids = copy.deepcopy(list_metric_query_ids)
    # Metric queries referencing a dashboard id that no longer exists belong
    # to dashboards that have been deleted in the past.
    for x, (m, metric_query) in enumerate(dict_metric_queries.items()):
        if metric_query["id"] not in list_dashboard_ids:
            legacy[x] = {"id": metric_query["id"],
                         "name": metric_query["name"],
                         "owner": metric_query["owner"]}
            del dict_metric_queries_copy[m]
            list_metric_query_copy_ids.remove(metric_query["id"])
    logging.debug("%s %s dashboards have been deleted in the past",
                  env, len(legacy))
    logging.debug("%s %s dashboards with viewCount and active",
                  env, len(dict_metric_queries_copy))
    for i, (d, dashboard) in enumerate(dict_dashboards.items()):
        if dashboard["id"] in list_metric_query_copy_ids:
            available[i] = dashboard
        else:
            obsolete[i] = dashboard
    logging.info("%s %s dashboards with viewCount!", env, len(available))
    logging.info("%s %s dashboards with 0 viewCount!", env, len(obsolete))
    return {"available": available, "legacy": legacy, "obsolete": obsolete}


def adaptDataStructure(dashboards, metric_queries):
    dict_dashboards = {}
    list_dashboard_ids = []
    dict_metric_queries = {}
    list_metric_query_ids = []
    # Note: _PaginatedList__elements is a private attribute of the dynatrace
    # client's PaginatedList.
    for s, stub in enumerate(getattr(dashboards, "_PaginatedList__elements")):
        dict_dashboards[s] = {"id": getattr(stub, "id"),
                              "name": getattr(stub, "name"),
                              "owner": getattr(stub, "owner")}
        list_dashboard_ids.append(getattr(stub, "id"))
    # Use one running index across all collections so that entries from a
    # later collection do not overwrite entries from an earlier one.
    m = 0
    for collection in getattr(metric_queries, "_PaginatedList__elements"):
        for q in getattr(collection, "data"):
            dict_metric_queries[m] = {"id": getattr(q, "dimension_map")["id"],
                                      "name": None,
                                      "owner": None}
            list_metric_query_ids.append(getattr(q, "dimension_map")["id"])
            m += 1
    return [dict_dashboards, list_dashboard_ids,
            dict_metric_queries, list_metric_query_ids]


def getDashboardsWithViewCount(env, client, METRIC_SELECTOR, RESOLUTION,
                               FROM_DATE, TO_DATE):
    logging.debug("%s get dashboards with viewCount, resolution %s ...",
                  env, RESOLUTION)
    metric_query = client.metrics.query(METRIC_SELECTOR, RESOLUTION,
                                        FROM_DATE, TO_DATE)
    n_metric_query = getattr(metric_query, "_PaginatedList__total_count")
    logging.debug("%s %s dashboards with viewCount and older than 6 months",
                  env, n_metric_query)
    return metric_query


def getDashboards(env, client):
    logging.debug("%s get all dashboards...", env)
    dashboards = client.dashboards.list(owner=None, tags=None)
    n_dashboards = getattr(dashboards, "_PaginatedList__total_count")
    logging.info("%s %s total dashboards", env, n_dashboards)
    return dashboards


def initDtClient(env, DT_URL, DT_TOKEN):
    logging.debug("%s init Dynatrace client...", env)
    # Pass a dedicated logger to the Dynatrace client.
    DT_CLIENT = Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"),
                          None, None, 0, 10 * 1000)
    return DT_CLIENT
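
# Configuration is read from environment.yaml plus a decouple-managed .env
# file. A hedged sketch of the expected layout, inferred from the list
# indices used in the main block below (entries at unused indices and the
# environment name "PROD" are placeholders):
#
#   environment.yaml:
#     PROD:
#       - ...
#       - env-url: https://abc12345.live.dynatrace.com
#       - env-token-name: PROD_API_TOKEN   # name of the .env variable
#       - ...
#       - ...
#       - metricSelector: ...
#       - resolution: ...
#       - fromDate: ...
#       - toDate: ...
#
#   .env:
#     PROD_API_TOKEN=...                   # one token per environment
#     REPOSITORY_URL=...
#     REPOSITORY_PATH=...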
if __name__ == "__main__":
    dict_dashboards = {}
    list_environments = []
    # CD_TS-CMS has to be handled manually
    list_exclude_branches = ["HEAD", "master", "template", "CD_TS-CMS"]
    list_exclude_files = ["providers.tf", "data_source.tf"]

    with open(Path("./environment.yaml")) as env_cfg:
        environments = yaml.safe_load(env_cfg)

    for env, doc in environments.items():
        logging.debug("%s checking token...", env)
        if config(dict(doc[2]).get("env-token-name"), default='') != "":
            DT_URL = dict(doc[1]).get("env-url")
            DT_TOKEN = config(dict(doc[2]).get("env-token-name"), default='')
            METRIC_SELECTOR = dict(doc[5]).get("metricSelector")
            RESOLUTION = dict(doc[6]).get("resolution")
            FROM_DATE = dict(doc[7]).get("fromDate")
            TO_DATE = dict(doc[8]).get("toDate")
            client = initDtClient(env, DT_URL, DT_TOKEN)
            dashboards = getDashboards(env, client)
            metric_queries = getDashboardsWithViewCount(env, client,
                                                        METRIC_SELECTOR,
                                                        RESOLUTION,
                                                        FROM_DATE, TO_DATE)
            data = adaptDataStructure(dashboards, metric_queries)
            result = evaluate(env, data)
            # writeToExcel(env, t, result)
            dict_dashboards[env] = result
            list_environments.append(env)

    repo = fetch_repository(config("REPOSITORY_URL"),
                            config("REPOSITORY_PATH"))
    list_branches = fetch_branches(repo)
    for b in list_exclude_branches:
        if b in list_branches:
            list_branches.remove(b)

    # repo_ = checkout_master(repo)
    repo_ = repo
    wd = Path(repo_.git.working_dir)

    for i, branch in enumerate(list_branches):
        is_commit = False
        repo_.git.checkout(branch)
        logging.info("%d - branch: %s", i, branch)
        files = glob.glob(str(wd) + '/**/dashboard/*.tf', recursive=True)
        for file in files:
            is_deleted = False
            if os.path.basename(file) not in list_exclude_files:
                iid, nname, oowner = check_metadata(file)
                current_db = {"id": iid, "name": nname, "owner": oowner}
                is_deleted, environment = check_dashboard(branch, file,
                                                          current_db,
                                                          list_environments,
                                                          dict_dashboards)
                if is_deleted:
                    is_commit = True
                    dt_url, dt_token = get_credentials(environment,
                                                       environments)
                    dt_client = initDtClient(environment, dt_url, dt_token)
                    # WARNING: the deletion has not been tested yet!
                    # delete_dashboard(dt_client, environment, current_db)
        if is_commit:
            git_push(repo_, "origin", branch, "Dashboard cleanup")
            # ToDo: create pull request and merge --> manually or via code?

    # delete_dir(Path(config("REPOSITORY_PATH")))
    logging.info("finished")
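
# A minimal sketch for the ToDo above: opening a pull request via the REST
# API of a GitHub-style host, to be called right after git_push(). The
# endpoint, repository slug ("ORG/coco_apm_terraform_onboarding") and the
# GIT_API_TOKEN variable are assumptions; adapt them to the actual hosting
# service before use.
def create_pull_request(branch):
    import requests  # assumed available; not used elsewhere in this script

    # Hypothetical repository slug; placeholder only.
    api_url = ("https://api.github.com/repos/"
               "ORG/coco_apm_terraform_onboarding/pulls")
    headers = {"Authorization": "token " + config("GIT_API_TOKEN", default="")}
    payload = {"title": "Dashboard cleanup (%s)" % branch,
               "head": branch,
               "base": "master"}
    response = requests.post(api_url, json=payload, headers=headers)
    response.raise_for_status()
    return response.json()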