"""Build a CoCo APM report: pull SLO, service and host data from Dynatrace,
enrich it with process-group data and write the result to an XLSX file."""

import datetime
import json
import time
import typing
import urllib.parse

import pandas as pd
import requests
import yaml
from decouple import config

import dynatraceAPI
from decorators import timer
from KRParser import krparser
from pagination import Pagionation  # noqa: F401  -- kept from original file

# NOTE(review): the original also had `from datetime import datetime, timedelta`,
# which was immediately shadowed by `import datetime`; the dead import was removed.


def get_slo(ENV: str, DTAPIToken: str, DTENV: str) -> pd.DataFrame:
    """
    Returns SLO data from dynatrace

    Args:
        ENV (str): Environment (euprod, naprod, cnprod)
        DTAPIToken (str): Token for respective environment
        DTENV (str): Full URL for the respective environment

    Returns:
        pd.DataFrame: Dataframe containing data from dynatrace, with an added
            "env" column naming the source environment
    """
    dtclient = dynatraceAPI.Dynatrace(DTENV, DTAPIToken)
    # Fetch every SLO page-wise; callers filter the result afterwards.
    pages = dtclient.returnPageination("/api/v2/slo", {"pageSize": 25}, "slo")
    df = pd.DataFrame(pages.elements)
    df["env"] = ENV
    return df


def build_params(params: typing.Dict) -> str:
    """
    Builds the parameter dictionary to a formatted, URL-encoded query string.

    Args:
        params (typing.Dict): Parameters as dictionary as stated on dynatrace
            documentation (values must be strings)

    Returns:
        str: Query string of the form "key1=value1&key2=value2"
    """
    return "&".join(
        f"{key}={urllib.parse.quote(value)}" for key, value in params.items()
    )


def get_data_from_dynatrace(
    throttling_rate: float | int,
    token: str,
    env_url: str,
    params: typing.Dict | str,
    route: str,
) -> typing.Optional[typing.Dict]:
    """
    Sends out a GET request to dynatrace.

    Args:
        throttling_rate (float | int): If needed set timeout for throttling
        token (str): Token for dynatrace API
        env_url (str): Url for the respective environment
        params (typing.Dict | str): Query parameters as a dict, or an entity id
            as a plain string (appended as a path segment)
        route (str): Route for the request (e.g. "entities")

    Returns:
        typing.Dict | None: Parsed JSON response on HTTP 200, None otherwise.

    Raises:
        TypeError: If params is neither a dict nor a str.
    """
    time.sleep(throttling_rate)
    if isinstance(params, dict):
        params_string = f"?{build_params(params)}"
    elif isinstance(params, str):
        params_string = f"/{params}"
    else:
        # BUG FIX: an unsupported type previously fell through to a NameError
        # on params_string; fail explicitly instead.
        raise TypeError(f"params must be dict or str, got {type(params).__name__}")
    headers = {"Authorization": f"Api-Token {token}"}
    # NOTE(review): verify=False disables TLS certificate validation. Kept for
    # backwards compatibility with the existing environments, but this should
    # be revisited.
    host_response = requests.get(
        f"{env_url}/api/v2/{route}{params_string}",
        headers=headers,
        verify=False,
    )
    if host_response.status_code == 200:
        return host_response.json()
    # TODO: proper error handling
    print(f"ERROR - {host_response.status_code}")
    return None


def check_if_service_already_exists(services: list, entity_id: str) -> bool:
    """
    Requests point to the same service. This leads to double entries but we
    only need the data once.

    Args:
        services (list): List with services
        entity_id (str): Entity Id for lookup

    Returns:
        bool: True if the service is already present, else False.
    """
    return any(service["entityId"] == entity_id for service in services)


def _load_hub_credentials() -> typing.Dict:
    """
    Read ./environment.yaml and return credentials per environment.

    Returns:
        typing.Dict: {env: {"token": ..., "url": ...}} for every environment
            whose token is configured (non-empty).
    """
    hub_data = {}
    with open("./environment.yaml") as file:
        env_doc = yaml.safe_load(file)
    for env, doc in env_doc.items():
        # environment.yaml layout: doc[1] holds the url mapping, doc[2] the
        # token-name mapping -- TODO confirm against the yaml file.
        token = dict(doc[2])
        url = dict(doc[1])
        if config(token.get("env-token-name")) != "":
            hub_data[env] = {
                "token": config(token.get("env-token-name")),
                "url": url.get("env-url"),
            }
    return hub_data


def get_process_group_data(df: pd.DataFrame) -> typing.Dict:
    """
    Gets process group data from dynatrace.

    Args:
        df (pd.DataFrame): Dataframe with process group ids

    Returns:
        typing.Dict: {hub: {process_group_id: entities}} with unique process
            group data per environment.
    """
    hub_data = _load_hub_credentials()
    unique_process_groups_per_hub = {}
    for hub in df["environment"].unique():
        unique_process_groups_per_hub[hub] = {}
        per_hub = df[df["environment"] == hub]
        for process_group in per_hub["process_group_id"].unique():
            params = {
                "entitySelector": f'type("PROCESS_GROUP"),entityId("{process_group}")',
                "fields": "firstSeenTms,lastSeenTms,tags",
            }
            data = get_data_from_dynatrace(
                0.1, hub_data[hub]["token"], hub_data[hub]["url"], params, "entities"
            )
            if data is not None:
                unique_process_groups_per_hub[hub][process_group] = data["entities"]
            else:
                print(f"{process_group} returned None")
    return unique_process_groups_per_hub


def build_dataframe_for_report(report_items: typing.Dict) -> pd.DataFrame:
    """
    Builds a pandas dataframe based on received items from dynatrace, enriches
    it with process group names / first-seen timestamps and writes the XLSX.

    Args:
        report_items (typing.Dict): Dictionary containing the data from dynatrace

    Returns:
        pd.DataFrame: Contains data as requested for further processing
    """
    df = pd.DataFrame(report_items)
    process_group_data = get_process_group_data(df)
    for hub, groups in process_group_data.items():
        for pgid, entities in groups.items():
            if not entities:
                # TODO: Custom device group returns null data - handling needed
                print(f"ERROR: {hub} - {pgid} | no data returned from dynatrace")
                continue
            mask = (df["environment"] == hub) & (df["process_group_id"] == pgid)
            df.loc[mask, "process_group_name"] = entities[0]["displayName"]
            df.loc[mask, "first_seen_process_group"] = entities[0]["firstSeenTms"]
    print("Writing to xlsx")
    write_xlsx(df)
    return df


def write_xlsx(df: pd.DataFrame) -> None:
    """
    Takes in a pandas dataframe and writes it into a XLSX file.

    Args:
        df (pd.DataFrame): Dataframe containing the necessary data for the report
    """
    filename = f"CoCo-APM-Report_{datetime.date.today()}.xlsx"
    # Context manager guarantees the workbook is closed even on error.
    with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
        df.to_excel(writer, sheet_name="hosts", index=False)


def _extract_compass_ids(entity: typing.Dict) -> str:
    """Return the comma-joined compass-id tag values of a service entity."""
    return ",".join(
        tag["value"] for tag in entity.get("tags", []) if tag["key"] == "compass-id"
    )


def _extract_container_name(entity: typing.Dict) -> str:
    """Return the detected container name if the entity runs on Docker or
    containerd, else the literal string "None"."""
    container_name = "None"
    technologies = entity.get("properties", {}).get("softwareTechnologies", [])
    for technology in technologies:
        if technology["type"] in ("DOCKER", "CONTAINERD"):
            container_name = entity["properties"]["detectedName"]
    return container_name


def _apply_host_tags(df_data_item: typing.Dict, host_details: typing.Dict) -> None:
    """Copy Platform / Namespace / PaaS / compass-id host tags into the row."""
    compass_id = []
    namespace = []
    for tag in host_details["tags"]:
        if tag["key"] == "Platform":
            df_data_item["platform"] = tag["value"]
        if tag["key"] == "Namespace":
            namespace.append(tag["value"])
        if tag["key"] == "PaaS":
            df_data_item["paas"] = tag["value"]
        # compass-id tags may lack a value, hence the extra check.
        if tag["key"] == "compass-id" and "value" in tag:
            compass_id.append(tag["value"])
    df_data_item["compass_id_host"] = ",".join(compass_id)
    df_data_item["namespace"] = ",".join(namespace)


def build_dataframe_data(data: typing.Dict) -> None:
    """
    This function builds the data for the dataframe, which will be used to
    generate the report. Contains all data but process_groups.

    Args:
        data (typing.Dict): Takes in the dictionary containing all the raw
            data from dynatrace.
    """
    df_data = []
    for hub in data:
        for slo in data[hub]:
            slo_name = data[hub][slo]["sloname"]
            for service in data[hub][slo]["services"]:
                for entity in service.get("entities", []):
                    compass_id_service = _extract_compass_ids(entity)
                    container_name = _extract_container_name(entity)
                    relationships = entity.get("fromRelationships", {})
                    for host in relationships.get("runsOnHost", []):
                        df_data_item = {
                            "slo_name": slo_name,
                            "host_name": host["details"]["displayName"],
                            "host_id": host["id"],
                            "environment": hub,
                            "container_name": container_name,
                            "process_group_id": "",
                            "process_group_name": "",
                            "licensing_tag_host": "",
                            "licensing_tag_process_group": "",
                            "first_seen_process_group": "",
                            "first_seen_host": host["details"]["firstSeenTms"],
                            "last_seen_host": host["details"]["lastSeenTms"],
                            "compass_id_host": "",
                            "compass_id_service": compass_id_service,
                        }
                        _apply_host_tags(df_data_item, host["details"])
                        # TODO: rework -- hosts without a runsOn relationship
                        # produce no row at all (original behavior preserved).
                        for process_group in relationships.get("runsOn", []):
                            # BUG FIX: append a copy per process group. The
                            # original appended the SAME dict repeatedly, so
                            # every row aliased one object and ended up with
                            # the last process group id.
                            row = dict(df_data_item)
                            row["process_group_id"] = process_group["id"]
                            df_data.append(row)
    build_dataframe_for_report(df_data)


@timer
def main() -> None:
    """
    Entrypoint. Gathers SLO/service/host data per environment, resolves key
    requests via KRParser and hands the result to the report builder.
    """
    throttling_rate: int | float = 0  # only tested with 0.5
    reportItem = {}
    with open("./environment.yaml") as file:
        env_doc = yaml.safe_load(file)
    for env, doc in env_doc.items():
        token = dict(doc[2])
        url = dict(doc[1])
        if config(token.get("env-token-name")) == "":
            continue  # environment has no token configured -- skip it
        print("Gather data, hold on a minute")
        DTTOKEN = config(token.get("env-token-name"))
        DTURL = url.get("env-url")
        slosF = get_slo(env, DTTOKEN, DTURL)
        # Only team-prefixed SLOs are relevant for the report.
        slosF = slosF[slosF["name"].str.startswith("TP_")]
        # Parse the metric expression to get services and requests.
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES
            | krparser.KROption.VALIDATE_HASDATA,
            config={
                "threads": 10,
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )
        krs = krp.parse(slosF)
        reportItem[str(env)] = {}
        for kr in krs:
            slo_name = kr.metadata["sloName"]
            slo_entry = {"sloname": slo_name, "services": [], "requests": []}
            reportItem[str(env)][slo_name] = slo_entry
            for key_request in kr.keyRequests:
                slo_entry["requests"].append(
                    {
                        "displayName": key_request["displayName"],
                        "entityId": key_request["entityId"],
                    }
                )
                for service in key_request["services"]:
                    # BUG FIX: the original guard `len(...) > 0 or len(...) == 0`
                    # was always true and has been removed; dedup is handled by
                    # check_if_service_already_exists alone.
                    if not check_if_service_already_exists(
                        slo_entry["services"], service["entityId"]
                    ):
                        slo_entry["services"].append(
                            {
                                "type": service["type"],
                                "displayName": service["displayName"],
                                "entityId": service["entityId"],
                            }
                        )
            if len(slo_entry["services"]) == 0:
                # DEBUG
                print(f"ERROR: {slo_entry} has no services")
            else:
                for service in slo_entry["services"]:
                    params = {
                        "entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")',
                        "fields": "fromRelationships,tags,properties",
                    }
                    entities = get_data_from_dynatrace(
                        throttling_rate, DTTOKEN, DTURL, params, "entities"
                    )
                    # TODO: it is possible that "entities" is empty. maybe create check.
                    service["entities"] = entities["entities"]
                    # BUG FIX: the original shadowed its loop variable and
                    # re-iterated ALL entities whenever one had a runsOnHost
                    # relationship, fetching host details repeatedly and
                    # risking a KeyError on entities without that key.
                    for svc_entity in service["entities"]:
                        runs_on_host = svc_entity.get("fromRelationships", {}).get(
                            "runsOnHost", []
                        )
                        for host in runs_on_host:
                            host["details"] = get_data_from_dynatrace(
                                throttling_rate, DTTOKEN, DTURL, host["id"], "entities"
                            )
    build_dataframe_data(reportItem)


if __name__ == "__main__":
    main()