# coco_apm_dashboard_cleaner/main.py
# (file-viewer metadata: 334 lines, 13 KiB, Python)

import copy
import git
import glob
import logging
import os
import pandas as pd
import shutil
import stat
import time;
import yaml
from decouple import config
from dynatrace import Dynatrace
from pathlib import Path
# Timestamp used to tag generated report files (e.g. the Excel export).
t = time.strftime("%Y%m%d-%H%M%S")
# Root logger for the whole script: INFO level, "LEVEL: message" format.
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
def check_metadata(file):
    """Extract the dashboard id, name and owner from a Terraform dashboard file.

    The id comes from the second line when it carries a ``# LEGACY <id>``
    marker, otherwise from the ``<key> = <id>`` assignment on the first
    line.  ``name`` and ``owner`` are read from the (at most 5-line)
    ``dashboard_metadata { ... }`` block.

    Args:
        file: path of the ``.tf`` file to inspect.

    Returns:
        list: ``[dashboard_id, name, owner]``; ``name``/``owner`` are
        ``None`` when no ``dashboard_metadata`` block is present.
    """
    # Read the file once (the original opened it three times) and drop the
    # commented-out intermediate-variable experiments.
    with open(file, "r") as f:
        lines = f.readlines()
    if "LEGACY" in lines[1]:
        # "# LEGACY <id>" -> strip the comment marker, take the token after LEGACY.
        dashboard_id = lines[1].replace("#", "").strip().split(" ")[1].strip()
    else:
        # "<key> = <id>" -> take the right-hand side of the assignment.
        dashboard_id = lines[0].strip().split("=")[1].strip()
    name = None
    owner = None
    for x, line in enumerate(lines):
        if "dashboard_metadata {" in line:
            # The metadata block is assumed to span at most 5 lines.
            for md in lines[x:x + 5]:
                if "name" in md:
                    name = md.split("=")[1].strip().replace('"', "").strip()
                if "owner" in md:
                    owner = md.split("=")[1].strip().replace('"', "").strip()
    return [dashboard_id, name, owner]
def format_block(string, max):
    """Right-align *string* in a field *max* characters wide.

    Strings longer than *max* are returned unchanged (no truncation).
    """
    return string.rjust(max)
def onerror(func, path, exc_info):
    """
    Error handler for ``shutil.rmtree``.
    If the error is due to an access error (read only file)
    it attempts to add write permission and then retries.
    If the error is for another reason it re-raises the error.
    Usage : ``shutil.rmtree(path, onerror=onerror)``
    """
    # FIX: removed the redundant function-local `import stat`; the module
    # is already imported at file level.
    # Is the error an access error?
    if not os.access(path, os.W_OK):
        # Read-only entry: grant the owner write permission, then retry
        # the failed operation (e.g. os.remove / os.rmdir).
        os.chmod(path, stat.S_IWUSR)
        func(path)
    else:
        # Not a permission problem: re-raise the exception currently being
        # handled by shutil.rmtree (this handler runs inside its except).
        raise
def delete_directory(path):
    """Recursively delete *path*, fixing read-only entries via ``onerror``.

    Failures are logged, not raised — cleanup is best-effort.
    """
    logging.info("cleaning up...")
    try:
        shutil.rmtree(path, onerror=onerror)
        logging.info("%s directory successfully deleted", str(path))
    except OSError as e:
        # FIX: use logging like the rest of the module instead of print().
        logging.error("Error: %s - %s.", e.filename, e.strerror)
def checkout_master(repo):
    """Ensure *repo* has its master branch checked out.

    Args:
        repo: a GitPython ``Repo``-like object.

    Returns:
        The same *repo*, with master checked out when it was not active.
    """
    logging.info("master branch name is: %s", str(repo.heads.master.name))
    logging.info("checking active branch ...")
    if repo.active_branch.name != repo.heads.master.name:
        logging.info("active branch name is: %s", str(repo.active_branch.name))
        # FIX: the original if/else on is_detached ran byte-identical code
        # in both branches — the duplication has been collapsed.
        logging.info("active branch (%s) is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
        logging.info("checking out master...")
        repo.git.checkout("master")
        logging.info("checkout to master successful")
        logging.info("active branch is %s and is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
    else:
        logging.info("active branch is already master (%s) and is detached: %s",
                     str(repo.active_branch.name),
                     str(repo.active_branch.is_detached))
    return repo
def fetch_branches(repo):
    """Return the short (remote-head) names of all branches on the remote."""
    logging.info("fetching branches...")
    branch_names = []
    for ref in repo.remote().refs:
        branch_names.append(ref.remote_head)
    return branch_names
def fetch_repository(REPOSITORY_URL, REPOSITORY_PATH):
    """Open the already-cloned Git repository at REPOSITORY_PATH.

    REPOSITORY_URL is accepted for interface compatibility but is unused —
    the clone step is disabled upstream.
    """
    logging.info("repository path %s", str(REPOSITORY_PATH))
    return git.Repo(Path(REPOSITORY_PATH))
def writeToExcel(env, t, result):
    """Dump available/legacy/obsolete dashboards into one Excel workbook.

    Args:
        env: environment name (becomes part of the output file name).
        t: timestamp string (becomes part of the output file name).
        result: mapping with "available"/"legacy"/"obsolete" keys, each a
            dict of ``{"id": ..., "name": ..., "owner": ...}`` records.

    Writes ``./log/<t>_<env>_dashboards.xlsx`` with one sheet per category.
    """
    columns = ['id', 'name', 'owner']
    # One DataFrame per category; a dict replaces the triple if-chain the
    # original ran on every iteration.
    frames = {}
    for category in ("available", "legacy", "obsolete"):
        rows = [[v["id"], v["name"], v["owner"]]
                for v in result[category].values()]
        frames[category] = pd.DataFrame(rows, columns=columns)
    # FIX: ".\log" used a literal backslash in a non-raw string (wrong
    # separator off Windows, SyntaxWarning on Python 3.12+); build the
    # path portably.
    filename = os.path.join(".", "log",
                            str(t) + "_" + str(env) + '_dashboards.xlsx')
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    with pd.ExcelWriter(filename) as writer:
        for category, frame in frames.items():
            frame.to_excel(writer, sheet_name=category)
def evaluate(env, data):
    """Classify dashboards as available, legacy or obsolete.

    Args:
        env: environment name (used for logging only).
        data: ``[dict_dashboards, list_dashboard_ids, dict_metric_queries,
                 list_metric_query_ids]`` as produced by adaptDataStructure.

    Returns:
        dict: ``{"available", "legacy", "obsolete"}`` where legacy holds
        view-count metric entries whose dashboard no longer exists,
        available holds dashboards that still receive views, and obsolete
        holds dashboards with no recorded views.
    """
    dict_dashboards, list_dashboard_ids, dict_metric_queries, _ = data
    # FIX: sets give O(1) membership tests; the original deep-copied both
    # metric-query structures and did O(n) list.remove() inside the loop.
    dashboard_ids = set(list_dashboard_ids)
    legacy = {}
    viewed_ids = set()
    for x, (m, metric_query) in enumerate(dict_metric_queries.items()):
        if metric_query["id"] not in dashboard_ids:
            # Metric still reports views for a dashboard deleted in the past.
            legacy[x] = {"id": metric_query["id"],
                         "name": metric_query["name"],
                         "owner": metric_query["owner"]}
        else:
            viewed_ids.add(metric_query["id"])
    logging.debug("%s %s have been deleted in the past", str(env), len(legacy))
    logging.debug("%s %s dashboards with viewCount and active", str(env),
                  len(dict_metric_queries) - len(legacy))
    available = {}
    obsolete = {}
    for i, (d, dashboard) in enumerate(dict_dashboards.items()):
        # The two membership tests are mutually exclusive -> single if/else.
        if dashboard["id"] in viewed_ids:
            available[i] = dashboard
        else:
            obsolete[i] = dashboard
    logging.info("%s %s dashboards with viewCount!", str(env), len(available))
    logging.info("%s %s dashboards with 0 viewCount!", str(env), len(obsolete))
    return {"available": available, "legacy": legacy, "obsolete": obsolete}
def adaptDataStructure(dashboards, metric_queries):
    """Flatten Dynatrace paginated API objects into plain dicts and lists.

    Args:
        dashboards: paginated dashboard stubs (id/name/owner attributes).
        metric_queries: paginated metric-query collections; each element's
            ``data`` entries carry a ``dimension_map`` with an "id" key.

    Returns:
        list: ``[dict_dashboards, list_dashboard_ids,
                 dict_metric_queries, list_metric_query_ids]``.
    """
    dict_dashboards = {}
    list_dashboard_ids = []
    dict_metric_queries = {}
    list_metric_query_ids = []
    for s, stub in enumerate(getattr(dashboards, "_PaginatedList__elements")):
        dict_dashboards[s] = {"id": stub.id,
                              "name": stub.name,
                              "owner": stub.owner}
        list_dashboard_ids.append(stub.id)
    # BUG FIX: the original keyed metric queries with enumerate() that
    # restarted at 0 for every collection, so entries from later result
    # pages silently overwrote earlier ones while the id list kept growing.
    # A single running index keeps dict and list consistent.
    index = 0
    for collection in getattr(metric_queries, "_PaginatedList__elements"):
        for q in collection.data:
            query_id = q.dimension_map["id"]
            dict_metric_queries[index] = {"id": query_id,
                                          "name": None,
                                          "owner": None}
            list_metric_query_ids.append(query_id)
            index += 1
    return [dict_dashboards, list_dashboard_ids, dict_metric_queries,
            list_metric_query_ids]
def getDashboardsWithViewCount(env, client, METRIC_SELECTOR, RESOLUTION,
                               FROM_DATE, TO_DATE):
    """Query the dashboard view-count metric and return the raw result."""
    logging.debug("%s get dashboards with viewCount, resolution %s ...",
                  str(env), RESOLUTION)
    result = client.metrics.query(METRIC_SELECTOR, RESOLUTION, FROM_DATE,
                                  TO_DATE)
    total = getattr(result, "_PaginatedList__total_count")
    logging.debug("%s %s dashboards with viewCount and older than 6 Months",
                  str(env), str(total))
    return result
def getDashboards(env, client):
    """List every dashboard in the environment and log the total count."""
    logging.debug("%s get all dashboards...", str(env))
    result = client.dashboards.list(owner=None, tags=None)
    total = getattr(result, "_PaginatedList__total_count")
    logging.info("%s %s total dashboards", str(env), str(total))
    return result
def initDtClient(env, DT_URL, DT_TOKEN):
    """Build a Dynatrace API client for the given environment."""
    logging.debug("%s init Dynatrace client...", str(env))
    # NOTE(review): logging.Logger("ERROR") creates a logger *named*
    # "ERROR", not one set to ERROR level — confirm this is intended.
    return Dynatrace(DT_URL, DT_TOKEN, logging.Logger("ERROR"), None, None,
                     0, 10 * 1000)
if __name__ == "__main__":
    # Pass 1: query each configured Dynatrace environment (skipping those
    # without an API token) and classify its dashboards.
    dictionary_dashboards = {}
    list_environments = []
    # do it manually for CD_TS-CMS
    list_exclude_branches = ["HEAD", "master", "template", "CD_TS-CMS"]
    list_exclude_files = ["providers.tf", "data_source.tf"]
    with open(Path("./environment.yaml")) as env_cfg:
        environment = yaml.safe_load(env_cfg)
    for env, doc in environment.items():
        logging.debug("%s checking token...", str(env))
        if config(dict(doc[2]).get("env-token-name"), default='') != "":
            DT_URL = dict(doc[1]).get("env-url")
            DT_TOKEN = config(dict(doc[2]).get("env-token-name"), default='')
            METRIC_SELECTOR = dict(doc[5]).get("metricSelector")
            RESOLUTION = dict(doc[6]).get("resolution")
            FROM_DATE = dict(doc[7]).get("fromDate")
            TO_DATE = dict(doc[8]).get("toDate")
            client = initDtClient(env, DT_URL, DT_TOKEN)
            dashboards = getDashboards(env, client)
            metric_queries = getDashboardsWithViewCount(env, client,
                                                        METRIC_SELECTOR,
                                                        RESOLUTION, FROM_DATE,
                                                        TO_DATE)
            data = adaptDataStructure(dashboards, metric_queries)
            result = evaluate(env, data)
            # writeToExcel(env, t, result)
            dictionary_dashboards[env] = result
            list_environments.append(env)
    # Pass 2: walk every branch of the onboarding repository and compare
    # each dashboard .tf file against the obsolete dashboards per env.
    repo = fetch_repository(config("REPOSITORY_URL"), config("REPOSITORY_PATH"))
    list_branches = fetch_branches(repo)
    for b in list_exclude_branches:
        # FIX: list.remove() raised ValueError when an excluded branch was
        # not present on the remote; skip missing entries instead.
        if b in list_branches:
            list_branches.remove(b)
    # repo_ = checkout_master(repo)
    repo_ = repo
    wd = Path(repo_.git.working_dir)
    try:
        for i, branch in enumerate(list_branches):
            repo_.git.checkout(branch)
            logging.info("%d - branch: %s", i, str(branch))
            for file in glob.glob(str(wd) + '/**/dashboard/*.tf', recursive=True):
                if os.path.basename(file) not in list_exclude_files:
                    # FIX: renamed `id` -> `dashboard_id` (builtin shadowing).
                    dashboard_id, name, owner = check_metadata(file)
                    current_db = {"id": dashboard_id, "name": name, "owner": owner}
                    for e in list_environments:
                        for k, v in dictionary_dashboards[e]["obsolete"].items():
                            if current_db == v:
                                print(current_db)
                                print(v)
                                print("DELETING", "BRANCH:", str(branch), "FILE:", file)
                                print("")
                            else:
                                print(current_db)
                                print(v)
                                print("")
    except Exception:
        # FIX: log with traceback instead of a bare print so failures in the
        # best-effort branch walk remain diagnosable.
        logging.exception("branch walk failed")
    # delete_directory(Path(config("REPOSITORY_PATH")))
    logging.info("finished")