# coco_apm_slo_report_host_pg/create_report.py
# (463 lines, 19 KiB, Python)
import yaml
from decouple import config
import dynatraceAPI
import pandas as pd
from pagination import Pagionation
from KRParser import krparser
from datetime import datetime, timedelta
import datetime
import json
import typing
from decorators import timer
import requests
import urllib.parse
import time
def get_slo(ENV: str, DTAPIToken: str, DTENV: str) -> pd.DataFrame:
    """
    Fetch all SLO definitions for one environment from Dynatrace.

    Args:
        ENV (str): Environment identifier (euprod, naprod, cnprod)
        DTAPIToken (str): API token for the respective environment
        DTENV (str): Base URL of the environment's Dynatrace tenant

    Returns:
        pd.DataFrame: One row per SLO, tagged with an ``env`` column
    """
    client = dynatraceAPI.Dynatrace(DTENV, DTAPIToken)
    # Pull every SLO page by page; name-based filtering happens downstream.
    pagination = client.returnPageination("/api/v2/slo", {"pageSize": 25}, "slo")
    frame = pd.DataFrame(pagination.elements)
    frame["env"] = ENV
    return frame
def build_params(params: typing.Dict) -> str:
    """
    Serialize a parameter mapping into a URL query string.

    Args:
        params (typing.Dict): Parameters as a dictionary, as documented by
            the Dynatrace API

    Returns:
        str: ``key=value`` pairs joined with ``&``; values are percent-encoded
    """
    encoded_pairs = []
    for key, value in params.items():
        encoded_pairs.append(f"{key}={urllib.parse.quote(value)}")
    return "&".join(encoded_pairs)
def get_data_from_dynatrace(
throttling_rate: float | int,
token: str,
env_url: str,
params: typing.Dict | str,
route: str,
) -> typing.Dict:
"""
Sends out GET request to dynatrace
Args:
throttling (float | int ): If needed set timeout for throttling
token (str): Token for dynatrace API
env_url (str): Url for the respective environment
params (typing.Dict | str): Parameters as dictionary as stated on dynatrace documentation
route (str): Route for the request
Returns:
typing.Dict: Returns the response as
"""
time.sleep(throttling_rate)
if type(params) is dict:
params_string = f"?{build_params(params)}"
elif type(params) is str:
params_string = f"/{params}"
headers = {"Authorization": f"Api-Token {token}"}
host_response = requests.get(
f"{env_url}/api/v2/{route}{params_string}",
headers=headers,
verify=False,
)
if host_response.status_code == 200:
return host_response.json()
else:
# TODO: proper error handling
print(f"ERROR - {host_response.status_code}")
def check_if_service_already_exists(services: list, entity_id: str) -> bool:
    """
    Requests point to the same service. This leads to double entries but we
    only need the data once.

    Args:
        services (list): List of service dicts, each with an ``entityId`` key
        entity_id (str): Entity id to look up

    Returns:
        bool: True if the service is already present, else False.
    """
    # any() short-circuits on the first match instead of scanning the
    # whole list like the original flag-variable loop did.
    return any(service["entityId"] == entity_id for service in services)
def get_process_group_data(df: pd.DataFrame) -> typing.Dict:
    """
    Look up process-group metadata in Dynatrace for every hub present in *df*.

    Args:
        df (pd.DataFrame): Must contain ``environment`` and
            ``process_group_id`` columns

    Returns:
        typing.Dict: ``{hub: {process_group_id: entities}}`` with the raw
        entity payloads returned by Dynatrace
    """
    # Collect per-hub credentials from environment.yaml / decouple config.
    hub_credentials = {}
    with open("./environment.yaml") as file:
        environments = yaml.safe_load(file)
    for env_name, env_config in environments.items():
        token_config = dict(env_config[2])
        url_config = dict(env_config[1])
        hub_credentials[env_name] = {}
        token_value = config(token_config.get("env-token-name"))
        if token_value != "":
            hub_credentials[env_name]["token"] = token_value
            hub_credentials[env_name]["url"] = url_config.get("env-url")
    result = {}
    for hub in df["environment"].unique():
        result[hub] = {}
        hub_rows = df.query("environment == @hub")
        for process_group in hub_rows["process_group_id"].unique():
            lookup_params = {
                "entitySelector": f'type("PROCESS_GROUP"),entityId("{process_group}")',
                "fields": "firstSeenTms,lastSeenTms,tags",
            }
            # Throttle at 0.1s per call to stay under the API rate limit.
            payload = get_data_from_dynatrace(
                0.1,
                hub_credentials[hub]["token"],
                hub_credentials[hub]["url"],
                lookup_params,
                "entities",
            )
            if payload is None:
                print(f"{process_group} returned None")
            else:
                result[hub][process_group] = payload["entities"]
    return result
def build_dataframe_for_report(report_items: typing.Dict) -> pd.DataFrame:
    """
    Enrich the raw report rows with process-group details and export them.

    Args:
        report_items (typing.Dict): Row dictionaries gathered from Dynatrace

    Returns:
        pd.DataFrame: The enriched frame (also written to an XLSX file)
    """
    df = pd.DataFrame(report_items)
    pg_details = get_process_group_data(df)
    for hub, groups in pg_details.items():
        for pgid, entities in groups.items():
            if not entities:
                # TODO: Custom device group returns null data - handling needed
                print(f"ERROR: {hub} - {pgid} | no data returned from dynatrace")
                continue
            # Fill name and first-seen timestamp from the first entity match.
            row_mask = (df["environment"] == hub) & (df["process_group_id"] == pgid)
            first_entity = entities[0]
            df.loc[row_mask, "process_group_name"] = first_entity["displayName"]
            df.loc[row_mask, "first_seen_process_group"] = first_entity["firstSeenTms"]
    print("Writing to xlsx")
    write_xlsx(df)
    return df
def write_xlsx(df: pd.DataFrame) -> None:
    """
    Write the report dataframe to a date-stamped XLSX file.

    Args:
        df (pd.DataFrame): Dataframe containing the necessary data for the report
    """
    # Relies on the module-level `import datetime` (which shadows the earlier
    # `from datetime import datetime`), so `datetime.date.today()` is valid.
    filename = f"CoCo-APM-Report_{datetime.date.today()}.xlsx"
    # Context manager guarantees the workbook is flushed and closed even if
    # to_excel raises (the original leaked the writer on error).
    with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
        df.to_excel(writer, sheet_name="hosts", index=False)
def build_dataframe_data(data: typing.Dict) -> None:
    """
    This function builds the data for the dataframe, which will be used to generate the report. Contains all data but process_groups.

    Walks the nested ``{hub: {slo: {...}}}`` structure and emits one row per
    (service entity, host) pair, then hands the rows to
    build_dataframe_for_report().

    Args:
        data (typing.Dict): Takes in the dictionary containing all the raw data from dynatrace.
    """
    df_data = []
    for hub in data:
        for slo in data[hub]:
            slo_name = data[hub][slo]["sloname"]
            if len(data[hub][slo]["services"]) > 0:
                for service in data[hub][slo]["services"]:
                    if len(service["entities"]) > 0:
                        for entity in service["entities"]:
                            # get compass id of service here. in tags
                            compass_id_service = []
                            if "tags" in entity:
                                for tag in entity["tags"]:
                                    if tag["key"] == "compass-id":
                                        compass_id_service.append(tag["value"])
                            # Multiple compass ids are flattened into one CSV string.
                            compass_id_service = ",".join(compass_id_service)
                            # get container name here
                            container_name = "None"
                            if "properties" in entity:
                                if "softwareTechnologies" in entity["properties"]:
                                    for technology in entity["properties"][
                                        "softwareTechnologies"
                                    ]:
                                        if (
                                            technology["type"] == "DOCKER"
                                            or technology["type"] == "CONTAINERD"
                                        ):
                                            container_name = entity["properties"][
                                                "detectedName"
                                            ]
                            if "fromRelationships" in entity:
                                if "runsOnHost" in entity["fromRelationships"]:
                                    for host in entity["fromRelationships"][
                                        "runsOnHost"
                                    ]:
                                        # One report row per host the entity runs on.
                                        # "licensing_tag_*" columns are placeholders,
                                        # filled elsewhere (if at all).
                                        df_data_item = {
                                            "slo_name": slo_name,
                                            "host_name": host["details"]["displayName"],
                                            "host_id": host["id"],
                                            "environment": hub,
                                            "container_name": container_name,
                                            "process_group_id": "",
                                            "process_group_name": "",
                                            "licensing_tag_host": "",
                                            "licensing_tag_process_group": "",
                                            "first_seen_process_group": "",
                                            "first_seen_host": host["details"][
                                                "firstSeenTms"
                                            ],
                                            "last_seen_host": host["details"][
                                                "lastSeenTms"
                                            ],
                                            "compass_id_host": "",
                                            "compass_id_service": compass_id_service,
                                        }
                                        compass_id = []
                                        namespace = []
                                        # NOTE(review): assumes host["details"]["tags"]
                                        # is always present in the entities payload —
                                        # confirm; a host fetched without tags would
                                        # raise KeyError here.
                                        for tag in host["details"]["tags"]:
                                            if tag["key"] == "Platform":
                                                df_data_item["platform"] = tag["value"]
                                            if tag["key"] == "Namespace":
                                                # df_data_item["namespace"] = tag["value"]
                                                namespace.append(tag["value"])
                                            if tag["key"] == "PaaS":
                                                df_data_item["paas"] = tag["value"]
                                            if tag["key"] == "compass-id":
                                                # df_data_item["compass_id"] = tag[
                                                #     "value"
                                                # ]
                                                if "value" in tag:
                                                    compass_id.append(tag["value"])
                                        df_data_item["compass_id_host"] = ",".join(
                                            compass_id
                                        )
                                        df_data_item["namespace"] = ",".join(namespace)
                                        # TODO: rework
                                        # NOTE(review): if several process groups are
                                        # attached, only the LAST one's id survives
                                        # (each iteration overwrites the field).
                                        if "runsOn" in entity["fromRelationships"]:
                                            for process_group in entity[
                                                "fromRelationships"
                                            ]["runsOn"]:
                                                df_data_item[
                                                    "process_group_id"
                                                ] = process_group["id"]
                                        df_data.append(df_data_item)
    build_dataframe_for_report(df_data)
@timer
def main() -> None:
    """
    Entrypoint.

    Reads ./environment.yaml for per-environment token names and URLs,
    resolves the TP_-prefixed SLOs via KRParser, fetches the backing service
    and host entities from Dynatrace, and hands the collected structure to
    build_dataframe_data() for report generation.
    """
    throttling_rate: int | float = 0  # only tested with 0.5
    reportItem = {}
    with open("./environment.yaml") as file:
        env_doc = yaml.safe_load(file)
    for env, doc in env_doc.items():
        # doc[1] carries the URL config, doc[2] the token config.
        token = dict(doc[2])
        url = dict(doc[1])
        # Skip environments whose token is not configured.
        if config(token.get("env-token-name")) != "":
            print("Gather data, hold on a minute")
            DTTOKEN = config(token.get("env-token-name"))
            DTURL = url.get("env-url")
            # krp = krparser.KRParser(krparser.KROption.VALIDATE_EXISTS | krparser.KROption.VALIDATE_HASDATA ,DTURL, DTTOKEN)
            slosF = get_slo(env, DTTOKEN, DTURL)
            # slosF = slosF[slosF["id"]=="9c5b0581-acc2-3e70-97d3-531700f78b65"]
            # Only SLOs whose name starts with "TP_" belong in the report.
            slosF = slosF[slosF["name"].str.startswith("TP_")]
            # parse the metric Expression to get Services and Requests
            krs = []
            # krp = krparser.KRParser(options=krparser.KROption.RESOLVEKEYREQUETS | krparser.KROption.RESOLVESERVICES, DTAPIURL=DTURL, DTAPIToken=DTTOKEN)
            krp = krparser.KRParser(
                name=env,
                options=krparser.KROption.RESOLVESERVICES
                | krparser.KROption.VALIDATE_HASDATA,
                config={
                    "threads": 10,
                    "serviceLookupParams": {"fields": "tags,fromRelationships"},
                    "extendResultObjects": {"env": env},
                },
                DTAPIURL=DTURL,
                DTAPIToken=DTTOKEN,
            )
            krs = krp.parse(slosF)
            reportItem[str(env)] = {}
            for kr in krs:
                # Per SLO: its name plus deduplicated services and key requests.
                reportItem[str(env)][kr.metadata["sloName"]] = {}
                reportItem[str(env)][kr.metadata["sloName"]]["sloname"] = kr.metadata[
                    "sloName"
                ]
                reportItem[str(env)][kr.metadata["sloName"]]["services"] = []
                reportItem[str(env)][kr.metadata["sloName"]]["requests"] = []
                for key_request in kr.keyRequests:
                    reportItem[str(env)][kr.metadata["sloName"]]["requests"].append(
                        {
                            "displayName": key_request["displayName"],
                            "entityId": key_request["entityId"],
                        }
                    )
                    for service in key_request["services"]:
                        # TODO: check if service already exists
                        # NOTE(review): this condition is ALWAYS True
                        # (len > 0 or len == 0 covers every case) — dead guard.
                        if (
                            len(
                                reportItem[str(env)][kr.metadata["sloName"]]["services"]
                            )
                            > 0
                            or len(
                                reportItem[str(env)][kr.metadata["sloName"]]["services"]
                            )
                            == 0
                        ):
                            if not check_if_service_already_exists(
                                reportItem[env][kr.metadata["sloName"]]["services"],
                                service["entityId"],
                            ):
                                reportItem[str(env)][kr.metadata["sloName"]][
                                    "services"
                                ].append(
                                    {
                                        "type": service["type"],
                                        "displayName": service["displayName"],
                                        "entityId": service["entityId"],
                                    }
                                )
                            # NOTE(review): right after the append above this can
                            # never be 0 — presumably a leftover debug branch.
                            if (
                                len(
                                    reportItem[str(env)][kr.metadata["sloName"]][
                                        "services"
                                    ]
                                )
                                == 0
                            ):
                                # DEBUG
                                print(
                                    f"ERROR: {reportItem[str(env)][kr.metadata['sloName']]} has no services"
                                )
                            else:
                                # NOTE(review): this `service` SHADOWS the loop
                                # variable of the enclosing for-loop, and the
                                # whole services list is re-fetched on every
                                # pass — O(n^2) API calls.
                                for service in reportItem[str(env)][
                                    kr.metadata["sloName"]
                                ]["services"]:
                                    params = {
                                        "entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")',
                                        "fields": "fromRelationships,tags,properties",
                                    }
                                    entities = get_data_from_dynatrace(
                                        throttling_rate,
                                        DTTOKEN,
                                        DTURL,
                                        params,
                                        "entities",
                                    )
                                    # print(entities["entities"])
                                    # TODO: it is possible that "entities" is empty. maybe create check.
                                    # NOTE(review): entities may also be None on a
                                    # non-200 response — this would raise TypeError.
                                    service["entities"] = entities["entities"]
                                    for hosts in service["entities"]:
                                        if "fromRelationships" in hosts:
                                            if (
                                                "runsOnHost"
                                                in hosts["fromRelationships"]
                                            ):
                                                # NOTE(review): re-iterates the same
                                                # list and SHADOWS `hosts` from the
                                                # outer loop — hosts are re-fetched
                                                # once per entity with relationships.
                                                for hosts in service["entities"]:
                                                    for host in hosts[
                                                        "fromRelationships"
                                                    ]["runsOnHost"]:
                                                        # Attach the full host entity
                                                        # payload for the report rows.
                                                        host_response = (
                                                            get_data_from_dynatrace(
                                                                throttling_rate,
                                                                DTTOKEN,
                                                                DTURL,
                                                                host["id"],
                                                                "entities",
                                                            )
                                                        )
                                                        host["details"] = host_response
    build_dataframe_data(reportItem)
# Allow importing this module without triggering the report run.
if __name__ == "__main__":
    main()