463 lines
19 KiB
Python
463 lines
19 KiB
Python
import yaml
|
|
from decouple import config
|
|
import dynatraceAPI
|
|
import pandas as pd
|
|
from pagination import Pagionation
|
|
from KRParser import krparser
|
|
|
|
|
|
from datetime import datetime, timedelta
|
|
import datetime
|
|
import json
|
|
|
|
import typing
|
|
from decorators import timer
|
|
import requests
|
|
import urllib.parse
|
|
import time
|
|
|
|
|
|
def get_slo(ENV: str, DTAPIToken: str, DTENV: str) -> pd.DataFrame:
    """
    Fetches the SLO list of one dynatrace environment as a dataframe

    Args:
        ENV (str): Environment (euprod, naprod, cnprod); written into the "env" column
        DTAPIToken (str): Token for respective environment
        DTENV (str): Full URL for the respective environment

    Returns:
        pd.DataFrame: One row per element delivered by the paginated
            "/api/v2/slo" request plus an "env" column holding ENV
    """
    # No filtering happens here: all SLOs are requested (page size 25) and the
    # caller narrows them down afterwards.
    client = dynatraceAPI.Dynatrace(DTENV, DTAPIToken)
    slo_pages = client.returnPageination("/api/v2/slo", {"pageSize": 25}, "slo")
    return pd.DataFrame(slo_pages.elements).assign(env=ENV)
|
|
|
|
|
|
def build_params(params: typing.Dict) -> str:
    """
    Builds the parameter dictionary to a formatted string

    Every value is percent-encoded with ``urllib.parse.quote`` (its default
    keeps "/" unescaped); keys are emitted verbatim. The pairs are joined with
    "&" in the iteration order of the dictionary.

    Args:
        params (typing.Dict): Parameters as dictionary as stated on dynatrace documentation

    Returns:
        str: Returns the query string without a leading "?"; an empty
            dictionary yields an empty string
    """

    def _encode(value) -> str:
        # quote() only accepts str/bytes; numbers such as {"pageSize": 25}
        # would raise TypeError, so convert everything else to str first.
        if not isinstance(value, (str, bytes)):
            value = str(value)
        return urllib.parse.quote(value)

    return "&".join(f"{key}={_encode(value)}" for key, value in params.items())
|
|
|
|
|
|
def get_data_from_dynatrace(
|
|
throttling_rate: float | int,
|
|
token: str,
|
|
env_url: str,
|
|
params: typing.Dict | str,
|
|
route: str,
|
|
) -> typing.Dict:
|
|
"""
|
|
Sends out GET request to dynatrace
|
|
|
|
Args:
|
|
throttling (float | int ): If needed set timeout for throttling
|
|
token (str): Token for dynatrace API
|
|
env_url (str): Url for the respective environment
|
|
params (typing.Dict | str): Parameters as dictionary as stated on dynatrace documentation
|
|
route (str): Route for the request
|
|
|
|
Returns:
|
|
typing.Dict: Returns the response as
|
|
"""
|
|
|
|
time.sleep(throttling_rate)
|
|
|
|
if type(params) is dict:
|
|
params_string = f"?{build_params(params)}"
|
|
elif type(params) is str:
|
|
params_string = f"/{params}"
|
|
|
|
headers = {"Authorization": f"Api-Token {token}"}
|
|
host_response = requests.get(
|
|
f"{env_url}/api/v2/{route}{params_string}",
|
|
headers=headers,
|
|
verify=False,
|
|
)
|
|
if host_response.status_code == 200:
|
|
return host_response.json()
|
|
else:
|
|
# TODO: proper error handling
|
|
print(f"ERROR - {host_response.status_code}")
|
|
|
|
|
|
def check_if_service_already_exists(services: list, entity_id: str) -> bool:
    """
    Requests point to the same service. This leads to double entries but we only need the data once.

    Args:
        services (list): List with services; every item must provide an "entityId" key
        entity_id (str): Entity Id for lookup

    Returns:
        bool: Returns True if the service is already present else False.
    """
    # any() stops at the first match instead of walking the whole list.
    return any(service["entityId"] == entity_id for service in services)
|
|
|
|
|
|
def get_process_group_data(df: pd.DataFrame) -> typing.Dict:
    """
    Gets process group data from dynatrace

    Token and url of every hub are read from "./environment.yaml" (the token
    itself is resolved through decouple's config). For every hub found in the
    "environment" column each distinct, non-empty "process_group_id" is looked
    up once.

    Args:
        df (pd.DataFrame): Dataframe with process group ids; needs the columns
            "environment" and "process_group_id"

    Returns:
        typing.Dict: Returns dictionary with unique process group data, shaped
            {hub: {process_group_id: [entities]}}. Empty if df has no rows or
            lacks one of the required columns.
    """
    # An empty report produces a frame without any columns; there is nothing
    # to look up, so do not fail with a KeyError on df["environment"].
    required_columns = {"environment", "process_group_id"}
    if df.empty or not required_columns.issubset(df.columns):
        return {}

    hub_data = {}
    with open("./environment.yaml") as file:
        env_doc = yaml.safe_load(file)

    for env, doc in env_doc.items():
        token = dict(doc[2])
        url = dict(doc[1])

        # Keyed by str(env) because the "environment" column is filled from
        # string keys by the caller.
        credentials = hub_data[str(env)] = {}
        dt_token = config(token.get("env-token-name"))
        if dt_token != "":
            credentials["token"] = dt_token
            credentials["url"] = url.get("env-url")

    unique_process_groups_per_hub = {}
    for hub in df["environment"].unique():
        unique_process_groups_per_hub[hub] = {}

        credentials = hub_data.get(hub) or {}
        if "token" not in credentials or "url" not in credentials:
            # Previously this surfaced as a KeyError on hub_data[hub]["token"].
            print(f"ERROR: {hub} - no token/url configured, process groups skipped")
            continue

        process_groups_unique = df.loc[
            df["environment"] == hub, "process_group_id"
        ].unique()
        for process_group in process_groups_unique:
            if process_group == "":
                # Rows without a resolved process group carry an empty id;
                # requesting entityId("") cannot return anything useful.
                continue
            params = {
                "entitySelector": f'type("PROCESS_GROUP"),entityId("{process_group}")',
                "fields": "firstSeenTms,lastSeenTms,tags",
            }
            data = get_data_from_dynatrace(
                0.1, credentials["token"], credentials["url"], params, "entities"
            )
            if data is not None:
                unique_process_groups_per_hub[hub][process_group] = data["entities"]
            else:
                print(f"{process_group} returned None")

    return unique_process_groups_per_hub
|
|
|
|
|
|
def build_dataframe_for_report(report_items: typing.Dict) -> pd.DataFrame:
    """
    Turns the collected report rows into a dataframe, enriches it with
    process group data and writes it out as XLSX

    Args:
        report_items (typing.Dict): Row data gathered from dynatrace, handed to pd.DataFrame

    Returns:
        pd.DataFrame: The frame with "process_group_name" and
            "first_seen_process_group" filled wherever dynatrace returned
            data for the process group
    """
    df = pd.DataFrame(report_items)
    process_group_data = get_process_group_data(df)

    for hub, process_groups in process_group_data.items():
        for pgid, entities in process_groups.items():
            if len(entities) == 0:
                # TODO: Custom device group returns null data - handling needed
                print(f"ERROR: {hub} - {pgid} | no data returned from dynatrace")
                continue

            # Only the first returned entity is used for both columns.
            first_entity = entities[0]
            matching_rows = (df["environment"] == hub) & (
                df["process_group_id"] == pgid
            )
            df.loc[matching_rows, "process_group_name"] = first_entity["displayName"]
            df.loc[matching_rows, "first_seen_process_group"] = first_entity[
                "firstSeenTms"
            ]

    print("Writing to xlsx")
    write_xlsx(df)
    return df
|
|
|
|
|
|
def write_xlsx(df: pd.DataFrame) -> None:
    """
    Takes in a pandas dataframe and writes it into a XLSX file

    The file is created in the current working directory and is named
    "CoCo-APM-Report_<today>.xlsx"; the data goes into the sheet "hosts"
    without the index column.

    Args:
        df (pd.DataFrame): Dataframe containing the necessary data for the report
    """
    # `datetime` has to be the module here (file level `import datetime`).
    filename = f"CoCo-APM-Report_{datetime.date.today()}.xlsx"
    # The context manager closes the writer even when to_excel raises.
    with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
        df.to_excel(writer, sheet_name="hosts", index=False)
|
|
|
|
|
|
def _collect_tag_values(tags: typing.List[typing.Dict], key: str) -> typing.List:
    """Returns the values of all tags named `key`; tags without a "value" are skipped."""
    return [tag["value"] for tag in tags if tag.get("key") == key and "value" in tag]


def _detect_container_name(entity: typing.Dict) -> str:
    """Returns the entity's detected name if it lists DOCKER or CONTAINERD, else the string "None"."""
    properties = entity.get("properties", {})
    for technology in properties.get("softwareTechnologies", []):
        if technology.get("type") in ("DOCKER", "CONTAINERD"):
            return properties.get("detectedName", "None")
    return "None"


def _build_host_row(
    slo_name: str,
    hub: str,
    host: typing.Dict,
    container_name: str,
    process_group_id: str,
    compass_id_service: str,
) -> typing.Dict:
    """Builds the report row for one host a service entity runs on."""
    # "details" is None/absent when the host lookup failed; keep the row with
    # blank host fields instead of aborting the whole report.
    details = host.get("details") or {}
    if not details:
        print(f"ERROR: {hub} - {host['id']} | no host details returned from dynatrace")

    row = {
        "slo_name": slo_name,
        "host_name": details.get("displayName", ""),
        "host_id": host["id"],
        "environment": hub,
        "container_name": container_name,
        "process_group_id": process_group_id,
        "process_group_name": "",
        "licensing_tag_host": "",
        "licensing_tag_process_group": "",
        "first_seen_process_group": "",
        "first_seen_host": details.get("firstSeenTms", ""),
        "last_seen_host": details.get("lastSeenTms", ""),
        "compass_id_host": "",
        "compass_id_service": compass_id_service,
    }

    compass_ids = []
    namespaces = []
    # Single pass in tag order: "platform"/"paas" are only added to the row
    # when the tag exists, exactly like before.
    for tag in details.get("tags", []):
        if "value" not in tag:
            continue
        tag_key = tag.get("key")
        if tag_key == "Platform":
            row["platform"] = tag["value"]
        elif tag_key == "Namespace":
            namespaces.append(tag["value"])
        elif tag_key == "PaaS":
            row["paas"] = tag["value"]
        elif tag_key == "compass-id":
            compass_ids.append(tag["value"])

    row["compass_id_host"] = ",".join(compass_ids)
    row["namespace"] = ",".join(namespaces)
    return row


def build_dataframe_data(data: typing.Dict) -> None:
    """
    This function builds the data for the dataframe, which will be used to generate the report. Contains all data but process_groups.

    One row is produced per host of every service entity of every SLO. The
    rows are handed to build_dataframe_for_report, which adds the process
    group details and writes the report.

    Args:
        data (typing.Dict): Takes in the dictionary containing all the raw data from dynatrace,
            shaped {hub: {slo: {"sloname": ..., "services": [{"entities": [...]}]}}}.
    """
    df_data = []

    for hub, slos in data.items():
        for slo in slos.values():
            slo_name = slo["sloname"]
            for service in slo["services"]:
                for entity in service.get("entities") or []:
                    relationships = entity.get("fromRelationships", {})
                    hosts = relationships.get("runsOnHost", [])
                    if not hosts:
                        continue

                    # compass id of the service lives in the entity tags
                    compass_id_service = ",".join(
                        _collect_tag_values(entity.get("tags", []), "compass-id")
                    )
                    container_name = _detect_container_name(entity)

                    # TODO: rework - only the last process group id is kept
                    process_group_id = ""
                    for process_group in relationships.get("runsOn", []):
                        process_group_id = process_group["id"]

                    for host in hosts:
                        df_data.append(
                            _build_host_row(
                                slo_name,
                                hub,
                                host,
                                container_name,
                                process_group_id,
                                compass_id_service,
                            )
                        )

    build_dataframe_for_report(df_data)
|
|
|
|
|
|
def _attach_service_entities(
    service: typing.Dict,
    throttling_rate: int | float,
    token: str,
    url: str,
) -> None:
    """Fetches the SERVICE entity of `service` plus the details of its hosts and stores both in place."""
    entity_id = service["entityId"]
    params = {
        "entitySelector": f'type("SERVICE"),entityId("{entity_id}")',
        "fields": "fromRelationships,tags,properties",
    }
    response = get_data_from_dynatrace(
        throttling_rate, token, url, params, "entities"
    )
    if response is None:
        # Failed request: keep the service with an empty entity list instead
        # of crashing on None["entities"].
        print(f"ERROR: {entity_id} | no data returned from dynatrace")
        service["entities"] = []
        return

    service["entities"] = response["entities"]
    for entity in service["entities"]:
        # Entities without host relationship are simply skipped.
        for host in entity.get("fromRelationships", {}).get("runsOnHost", []):
            host["details"] = get_data_from_dynatrace(
                throttling_rate, token, url, host["id"], "entities"
            )


@timer
def main() -> None:
    """
    Entrypoint.

    For every environment of "./environment.yaml" with a non-empty token:
    loads the SLOs, keeps those whose name starts with "TP_", resolves their
    key requests and services via KRParser, fetches each distinct service (and
    the hosts it runs on) once from dynatrace and finally hands the collected
    data to build_dataframe_data.
    """
    throttling_rate: int | float = 0  # only tested with 0.5
    reportItem = {}
    with open("./environment.yaml") as file:
        env_doc = yaml.safe_load(file)

    for env, doc in env_doc.items():
        token = dict(doc[2])
        url = dict(doc[1])

        DTTOKEN = config(token.get("env-token-name"))
        if DTTOKEN == "":
            continue

        print("Gather data, hold on a minute")
        DTURL = url.get("env-url")

        slosF = get_slo(env, DTTOKEN, DTURL)
        if "name" not in slosF.columns:
            # Happens when no SLO came back; filtering on "name" would raise.
            print(f"ERROR: {env} | SLO data has no 'name' column, environment skipped")
            continue
        slosF = slosF[slosF["name"].str.startswith("TP_")]

        # parse the metric Expression to get Services and Requests
        krp = krparser.KRParser(
            name=env,
            options=krparser.KROption.RESOLVESERVICES
            | krparser.KROption.VALIDATE_HASDATA,
            config={
                "threads": 10,
                "serviceLookupParams": {"fields": "tags,fromRelationships"},
                "extendResultObjects": {"env": env},
            },
            DTAPIURL=DTURL,
            DTAPIToken=DTTOKEN,
        )
        krs = krp.parse(slosF)

        # One consistent key; reportItem[env] and reportItem[str(env)] were mixed before.
        env_key = str(env)
        reportItem[env_key] = {}

        for kr in krs:
            slo_name = kr.metadata["sloName"]
            services = []
            key_requests = []
            slo_entry = {
                "sloname": slo_name,
                "services": services,
                "requests": key_requests,
            }
            reportItem[env_key][slo_name] = slo_entry

            for key_request in kr.keyRequests:
                key_requests.append(
                    {
                        "displayName": key_request["displayName"],
                        "entityId": key_request["entityId"],
                    }
                )
                # Several requests can point to the same service; keep it once.
                for service in key_request["services"]:
                    if not check_if_service_already_exists(
                        services, service["entityId"]
                    ):
                        services.append(
                            {
                                "type": service["type"],
                                "displayName": service["displayName"],
                                "entityId": service["entityId"],
                            }
                        )

            if not services:
                # DEBUG
                print(f"ERROR: {slo_entry} has no services")
                continue

            # Every distinct service is fetched exactly once, after the list is
            # complete (previously all collected services were re-fetched on
            # each loop iteration).
            for service in services:
                _attach_service_entities(service, throttling_rate, DTTOKEN, DTURL)

    if not reportItem:
        print("ERROR: no environment with a token configured, nothing to report")
        return

    build_dataframe_data(reportItem)
|
|
|
|
|
|
# Script entry point: build the report only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()
|