import yaml from decouple import config import dynatraceAPI import pandas as pd from pagination import Pagionation from key_request_parser import krparser from datetime import datetime, timedelta import datetime import json import typing from decorators import timer import requests import urllib.parse import time def get_slo(ENV: str, DTAPIToken: str, DTENV: str) -> pd.DataFrame: """ Returns SLO data from dynatrace Args: ENV (str): Environment (euprod, naprod, cnprod) DTAPIToken (str): Token for respective environment DTENV (str): Full URL for the respective environment Returns: pd.DataFrame: Dataframe containing data from dynatrace """ # DTENV = base url # DTAPIToken = sec token dtclient = dynatraceAPI.Dynatrace(DTENV, DTAPIToken) my_params_report = {"pageSize": 25} # gets all slos and filter later api_url_report = "/api/v2/slo" pages = dtclient.returnPageination(api_url_report, my_params_report, "slo") df = pd.DataFrame(pages.elements) df["env"] = ENV return df def build_params(params: typing.Dict) -> str: """ Builds the parameter dictionary to a formatted string Args: params (typing.Dict): Parameters as dictionary as stated on dynatrace documentation Returns: str: Returns the query string """ query_string = "&".join( f"{key}={urllib.parse.quote(value)}" for key, value in params.items() ) return query_string def get_data_from_dynatrace( throttling_rate: float | int, token: str, env_url: str, params: typing.Dict | str, route: str, ) -> typing.Dict: """ Sends out GET request to dynatrace Args: throttling (float | int ): If needed set timeout for throttling token (str): Token for dynatrace API env_url (str): Url for the respective environment params (typing.Dict | str): Parameters as dictionary as stated on dynatrace documentation route (str): Route for the request Returns: typing.Dict: Returns the response as """ time.sleep(throttling_rate) if type(params) is dict: params_string = f"?{build_params(params)}" elif type(params) is str: params_string = f"/{params}" headers = {"Authorization": f"Api-Token {token}"} host_response = requests.get( f"{env_url}/api/v2/{route}{params_string}", headers=headers, verify=False, ) if host_response.status_code == 200: return host_response.json() else: # TODO: proper error handling print(f"ERROR - {host_response.status_code}") def previous_week_range(date: datetime): start_date = date + timedelta(-date.weekday(), weeks=-1) end_date = date + timedelta(-date.weekday() - 1) return start_date, end_date def get_process_group_data(df: pd.DataFrame) -> typing.Dict: """ Gets process group data from dynatrace Args: df (pd.DataFrame): Dataframe with process group ids Returns: typing.Dict: Returns dictionary with unique process group data """ hub_data = {} with open("./environment.yaml") as file: env_doc = yaml.safe_load(file) for env, doc in env_doc.items(): token = dict(doc[2]) url = dict(doc[1]) hub_data[env] = {} if config(token.get("env-token-name")) != "": DTTOKEN = config(token.get("env-token-name")) DTURL = url.get("env-url") hub_data[env]["token"] = DTTOKEN hub_data[env]["url"] = DTURL unique_process_groups_per_hub = {} unique_hubs = df["environment"].unique() for hub in unique_hubs: unique_process_groups_per_hub[hub] = {} hub_value = hub process_groups_unique = df.query(f"environment == @hub_value") process_groups_unique = process_groups_unique["process_group_id"].unique() for process_group in process_groups_unique: params = { "entitySelector": f'type("PROCESS_GROUP"),entityId("{process_group}")', "fields": "firstSeenTms,tags", } data = get_data_from_dynatrace( 0.1, hub_data[hub]["token"], hub_data[hub]["url"], params, "entities" ) unique_process_groups_per_hub[hub][process_group] = data["entities"] return unique_process_groups_per_hub def build_dataframe_for_report(report_items: typing.Dict) -> pd.DataFrame: """ Builds a pandas dataframe based on received items from dynatrace Args: report_items (typing.Dict): Dictionary containing the data from dynatrace Returns: pd.DataFrame: Contains data as requested for further processing """ df = pd.DataFrame(report_items) process_group_data = get_process_group_data(df) for hub in process_group_data: for pgid in process_group_data[hub]: if len(process_group_data[hub][pgid]) == 0: # TODO: Custom device group returns null data - handling needed print(f"ERROR: {hub} - {pgid} | no data returned from dynatrace") else: df.loc[ (df["environment"] == hub) & (df["process_group_id"] == pgid), "process_group_name", ] = process_group_data[hub][pgid][0]["displayName"] df.loc[ (df["environment"] == hub) & (df["process_group_id"] == pgid), "first_seen_process_group", ] = process_group_data[hub][pgid][0]["firstSeenTms"] print("Writing to xlsx") write_xlsx(df) return df def write_xlsx(df: pd.DataFrame) -> None: """ Takes in a pandas dataframe and generates writes it into a XLSX file Args: df (pd.DataFrame): Dataframe containing the necessary data for the report """ filename = f"CoCo-APM-Report_{datetime.date.today()}.xlsx" writer = pd.ExcelWriter(filename, engine="xlsxwriter") df.to_excel(writer, sheet_name="hosts", index=False) writer.close() def develop_load_json(): with open("test-data-with-hosts-main.json", "r") as f: data = json.loads(f.read()) df_data = [] for hub in data: for slo in data[hub]: slo_name = data[hub][slo]["sloname"] if len(data[hub][slo]["services"]) > 0: for service in data[hub][slo]["services"]: if len(service["entities"]) > 0: for entity in service["entities"]: if "fromRelationships" in entity: if "runsOnHost" in entity["fromRelationships"]: for host in entity["fromRelationships"][ "runsOnHost" ]: df_data_item = { "slo_name": slo_name, "host_name": host["details"]["displayName"], "host_id": host["id"], "environment": hub, "process_group_id": "", "process_group_name": "", "licensing_tag_host": "", "licensing_tag_process_group": "", "first_seen_process_group": "", "first_seen_host": host["details"][ "firstSeenTms" ], } for tag in host["details"]["tags"]: if tag["key"] == "Platform": df_data_item["platform"] = tag["value"] if tag["key"] == "Namespace": df_data_item["namespace"] = tag["value"] if tag["key"] == "PaaS": df_data_item["paas"] = tag["value"] # TODO: rework - add else. so datastructure is complete if "runsOn" in entity["fromRelationships"]: for process_group in entity[ "fromRelationships" ]["runsOn"]: df_data_item[ "process_group_id" ] = process_group["id"] df_data.append(df_data_item) build_dataframe_for_report(df_data) # with open("./environment.yaml") as file: # env_doc = yaml.safe_load(file) # for env, doc in env_doc.items(): # # DEBUG # if env == "euprod": # token = dict(doc[2]) # url = dict(doc[1]) # if config(token.get("env-token-name")) != "": # print("Gather data, hold on a minute") # DTTOKEN = config(token.get("env-token-name")) # DTURL = url.get("env-url") # for slo in data[env]: # if len(data[env][slo]["services"]) == 0: # # DEBUG # print(f"ERROR: {slo} has no services") # else: # for service in data[env][slo]["services"]: # params = { # "entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")', # "fields": "fromRelationships,tags", # } # entities = get_data_from_dynatrace( # 0.5, DTTOKEN, DTURL, params, "entities" # ) # # TODO: it is possible that "entities" is empty. maybe create check. # service["entities"] = entities["entities"] # for hosts in service["entities"]: # if "fromRelationships" in hosts: # if "runsOnHost" in hosts["fromRelationships"]: # for host in hosts["fromRelationships"][ # "runsOnHost" # ]: # # TODO: make dynatrace call to /entites/{entityId} # host_response = get_data_from_dynatrace( # 0.5, DTTOKEN, DTURL, host["id"], "entities" # ) # host["details"] = host_response def check_if_service_already_exists(services: list, entity_id: str) -> bool: """ Requests point to the same service. This leads to double entries but we only need the data once. Args: services (list): List with services entity_id (str): Entity Id for lookup Returns: bool: Returns True if the service is already present else False. """ result = False for service in services: if service["entityId"] == entity_id: result = True return result def build_dataframe_data(data: typing.Dict) -> None: """ This function builds the data for the dataframe, which will be used to generate the report. Contains all data but process_groups. Args: data (typing.Dict): Takes in the dictionary containing all the raw data from dynatrace. """ df_data = [] for hub in data: for slo in data[hub]: slo_name = data[hub][slo]["sloname"] if len(data[hub][slo]["services"]) > 0: for service in data[hub][slo]["services"]: if len(service["entities"]) > 0: for entity in service["entities"]: if "fromRelationships" in entity: if "runsOnHost" in entity["fromRelationships"]: for host in entity["fromRelationships"][ "runsOnHost" ]: df_data_item = { "slo_name": slo_name, "host_name": host["details"]["displayName"], "host_id": host["id"], "environment": hub, "process_group_id": "", "process_group_name": "", "licensing_tag_host": "", "licensing_tag_process_group": "", "first_seen_process_group": "", "first_seen_host": host["details"][ "firstSeenTms" ], } for tag in host["details"]["tags"]: if tag["key"] == "Platform": df_data_item["platform"] = tag["value"] if tag["key"] == "Namespace": df_data_item["namespace"] = tag["value"] if tag["key"] == "PaaS": df_data_item["paas"] = tag["value"] # TODO: rework if "runsOn" in entity["fromRelationships"]: for process_group in entity[ "fromRelationships" ]["runsOn"]: df_data_item[ "process_group_id" ] = process_group["id"] df_data.append(df_data_item) build_dataframe_for_report(df_data) @timer def main() -> None: """ Entrypoint. """ throttling_rate: int | float = 0.25 # only tested with 0.5 reportItem = {} with open("./environment.yaml") as file: env_doc = yaml.safe_load(file) for env, doc in env_doc.items(): token = dict(doc[2]) url = dict(doc[1]) if config(token.get("env-token-name")) != "": print("Gather data, hold on a minute") DTTOKEN = config(token.get("env-token-name")) DTURL = url.get("env-url") # krp = krparser.KRParser(krparser.KROption.VALIDATE_EXISTS | krparser.KROption.VALIDATE_HASDATA ,DTURL, DTTOKEN) slosF = get_slo(env, DTTOKEN, DTURL) slosF = slosF[slosF["id"]=="9c5b0581-acc2-3e70-97d3-531700f78b65"] #slosF = slosF[slosF["name"].str.startswith("TP_")] # parse the metric Expression to get Services and Requests krs = [] krp = krparser.KRParser( krparser.KROption.VALIDATE_EXISTS # | krparser.KROption.VALIDATE_HASDATA | krparser.KROption.RESOLVESERVICES, DTURL, DTTOKEN, ) for index, row in slosF.iterrows(): krs.append(krp.parseBySLO(row)) # x = 0 # SLO Name | SERVICE | PROCESS GRUPPE | TAGS # {"sloname": { # "sloname":$sloname$, # "services":[{ # "serviceName": "$servicename$" # }] # }, # "sloname": { # "sloname":$sloname$, # "services":[{ # "serviceName": "$servicename$" # }] reportItem[str(env)] = {} for kr in krs: reportItem[str(env)][kr.metadata["sloName"]] = {} reportItem[str(env)][kr.metadata["sloName"]]["sloname"] = kr.metadata[ "sloName" ] reportItem[str(env)][kr.metadata["sloName"]]["services"] = [] reportItem[str(env)][kr.metadata["sloName"]]["requests"] = [] for key_request in kr.keyRequests: reportItem[str(env)][kr.metadata["sloName"]]["requests"].append( { "displayName": key_request["displayName"], "entityId": key_request["entityId"], } ) for service in key_request["services"]: # TODO: check if service already exists if ( len( reportItem[str(env)][kr.metadata["sloName"]]["services"] ) > 0 or len( reportItem[str(env)][kr.metadata["sloName"]]["services"] ) == 0 ): if not check_if_service_already_exists( reportItem[str(env)][kr.metadata["sloName"]][ "services" ], service["entityId"], ): reportItem[str(env)][kr.metadata["sloName"]][ "services" ].append( { "type": service["type"], "displayName": service["displayName"], "entityId": service["entityId"], } ) if ( len( reportItem[str(env)][kr.metadata["sloName"]][ "services" ] ) == 0 ): # DEBUG print( f"ERROR: {reportItem[str(env)][kr.metadata['sloName']]} has no services" ) else: for service in reportItem[str(env)][ kr.metadata["sloName"] ]["services"]: params = { "entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")', "fields": "fromRelationships,tags", } entities = get_data_from_dynatrace( throttling_rate, DTTOKEN, DTURL, params, "entities", ) # TODO: it is possible that "entities" is empty. maybe create check. service["entities"] = entities["entities"] for hosts in service["entities"]: if "fromRelationships" in hosts: if ( "runsOnHost" in hosts["fromRelationships"] ): for hosts in service["entities"]: for host in hosts[ "fromRelationships" ]["runsOnHost"]: host_response = ( get_data_from_dynatrace( throttling_rate, DTTOKEN, DTURL, host["id"], "entities", ) ) host["details"] = host_response build_dataframe_data(reportItem) # with open("test-data-with-hosts-main.json", "w") as f: # f.write(json.dumps(reportItem, indent=4)) if __name__ == "__main__": main() #develop_load_json()