added data structure
parent
daa21616ff
commit
22ce1c53c1
241
create_report.py
241
create_report.py
|
|
@ -11,6 +11,9 @@ import json
|
|||
|
||||
import typing
|
||||
from decorators import timer
|
||||
import requests
|
||||
import urllib.parse
|
||||
import time
|
||||
|
||||
|
||||
def get_slo(ENV, DTAPIToken, DTENV) -> pd.DataFrame:
|
||||
|
|
@ -26,16 +29,168 @@ def get_slo(ENV, DTAPIToken, DTENV) -> pd.DataFrame:
|
|||
return df
|
||||
|
||||
|
||||
def build_params(params: typing.Dict) -> str:
|
||||
"""
|
||||
Builds the parameter dictionary to a formatted string
|
||||
|
||||
Args:
|
||||
params (typing.Dict): Parameters as dictionary as stated on dynatrace documentation
|
||||
|
||||
Returns:
|
||||
str: Returns the query string
|
||||
"""
|
||||
query_string = "&".join(
|
||||
f"{key}={urllib.parse.quote(value)}" for key, value in params.items()
|
||||
)
|
||||
return query_string
|
||||
|
||||
|
||||
# TODO: remove env parameter
|
||||
def get_hosts_from_dynatrace(
|
||||
env: str, token: str, env_url: str, params: typing.Dict, route: str
|
||||
) -> typing.Dict:
|
||||
"""
|
||||
Sends out GET request to dynatrace
|
||||
|
||||
Args:
|
||||
env (str): Environment (euprod|naprod|cnprod)
|
||||
token (str): Token for dynatrace API
|
||||
env_url (str): Url for the respective environment
|
||||
params (typing.Dict): Parameters as dictionary as stated on dynatrace documentation
|
||||
route (str): Route for the request
|
||||
|
||||
Returns:
|
||||
typing.Dict: Returns the response as
|
||||
"""
|
||||
|
||||
# TODO: add nextpage key feature
|
||||
|
||||
time.sleep(0.5)
|
||||
|
||||
if type(params) is dict:
|
||||
params_string = f"?{build_params(params)}"
|
||||
elif type(params) is str:
|
||||
params_string = f"/{params}"
|
||||
|
||||
headers = {"Authorization": f"Api-Token {token}"}
|
||||
host_response = requests.get(
|
||||
f"{env_url}/api/v2/{route}{params_string}",
|
||||
headers=headers,
|
||||
verify=False,
|
||||
)
|
||||
if host_response.status_code == 200:
|
||||
return host_response.json()
|
||||
else:
|
||||
# TODO: proper error handling
|
||||
print(f"ERROR - {host_response.status_code}")
|
||||
|
||||
|
||||
def previous_week_range(date: datetime):
|
||||
start_date = date + timedelta(-date.weekday(), weeks=-1)
|
||||
end_date = date + timedelta(-date.weekday() - 1)
|
||||
return start_date, end_date
|
||||
|
||||
|
||||
def build_dataframe_for_report(report_items: typing.Dict) -> pd.DataFrame:
|
||||
"""
|
||||
Builds a pandas dataframe based on received items from dynatrace
|
||||
|
||||
Args:
|
||||
report_items (typing.Dict): Dictionary containing the data from dynatrace
|
||||
|
||||
Returns:
|
||||
pd.DataFrame: Contains data as requested for further processing
|
||||
"""
|
||||
|
||||
df_data = [] # fill list with dictionary objects which contain requested data
|
||||
|
||||
df = pd.DataFrame(
|
||||
df_data,
|
||||
columns=[
|
||||
"slo_name",
|
||||
"host_name",
|
||||
"host_id",
|
||||
"environment",
|
||||
"paas",
|
||||
"platform",
|
||||
"process_group_id",
|
||||
"process_group_name",
|
||||
"namespace",
|
||||
"licensing_tag_host",
|
||||
"licensing_tag_process_group",
|
||||
"first_seen_process_group",
|
||||
"first_seen_host",
|
||||
],
|
||||
)
|
||||
|
||||
print(df)
|
||||
return df
|
||||
|
||||
|
||||
def write_xlsx(df: pd.DataFrame) -> bool:
|
||||
pass
|
||||
|
||||
|
||||
def develop_load_json():
|
||||
with open("test-data.json", "r") as f:
|
||||
data = json.loads(f.read())
|
||||
|
||||
with open("./environment.yaml") as file:
|
||||
env_doc = yaml.safe_load(file)
|
||||
|
||||
for env, doc in env_doc.items():
|
||||
# DEBUG
|
||||
if env == "euprod":
|
||||
token = dict(doc[2])
|
||||
url = dict(doc[1])
|
||||
|
||||
if config(token.get("env-token-name")) != "":
|
||||
print("Gather data, hold on a minute")
|
||||
DTTOKEN = config(token.get("env-token-name"))
|
||||
DTURL = url.get("env-url")
|
||||
|
||||
for slo in data[env]:
|
||||
if len(data[env][slo]["services"]) == 0:
|
||||
# DEBUG
|
||||
print(f"ERROR: {slo} has no services")
|
||||
else:
|
||||
for service in data[env][slo]["services"]:
|
||||
params = {
|
||||
"entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")',
|
||||
"fields": "fromRelationships,tags",
|
||||
}
|
||||
entities = get_hosts_from_dynatrace(
|
||||
env, DTTOKEN, DTURL, params, "entities"
|
||||
)
|
||||
# TODO: it is possible that "entities" is empty. maybe create check.
|
||||
service["entities"] = entities["entities"]
|
||||
for hosts in service["entities"]:
|
||||
if "fromRelationships" in hosts:
|
||||
if "runsOnHost" in hosts["fromRelationships"]:
|
||||
for host in hosts["fromRelationships"][
|
||||
"runsOnHost"
|
||||
]:
|
||||
# TODO: make dynatrace call to /entites/{entityId}
|
||||
print(f'{slo} - {host["id"]}')
|
||||
# host_response = get_hosts_from_dynatrace(
|
||||
# env, DTTOKEN, DTURL, host["id"], "entities"
|
||||
# )
|
||||
# host["details"] = host_response
|
||||
|
||||
with open("test-data-with-hosts-2.json", "w") as f:
|
||||
f.write(json.dumps(data, indent=4))
|
||||
|
||||
|
||||
def check_if_service_already_exists(services: list, entity_id: str) -> bool:
|
||||
result = False
|
||||
for service in services:
|
||||
if service["entityId"] == entity_id:
|
||||
result = True
|
||||
return result
|
||||
|
||||
|
||||
@timer
|
||||
def main() -> None:
|
||||
# Get All SLOs
|
||||
|
||||
reportItem = {}
|
||||
with open("./environment.yaml") as file:
|
||||
env_doc = yaml.safe_load(file)
|
||||
|
|
@ -102,14 +257,82 @@ def main() -> None:
|
|||
)
|
||||
|
||||
for service in key_request["services"]:
|
||||
reportItem[str(env)][kr.metadata["sloName"]]["services"].append(
|
||||
{
|
||||
"type": service["type"],
|
||||
"displayName": service["displayName"],
|
||||
"entityId": service["entityId"],
|
||||
}
|
||||
)
|
||||
# TODO: check if service already exists
|
||||
if (
|
||||
len(
|
||||
reportItem[str(env)][kr.metadata["sloName"]]["services"]
|
||||
)
|
||||
> 0
|
||||
or len(
|
||||
reportItem[str(env)][kr.metadata["sloName"]]["services"]
|
||||
)
|
||||
== 0
|
||||
):
|
||||
if not check_if_service_already_exists(
|
||||
reportItem[str(env)][kr.metadata["sloName"]][
|
||||
"services"
|
||||
],
|
||||
service["entityId"],
|
||||
):
|
||||
reportItem[str(env)][kr.metadata["sloName"]][
|
||||
"services"
|
||||
].append(
|
||||
{
|
||||
"type": service["type"],
|
||||
"displayName": service["displayName"],
|
||||
"entityId": service["entityId"],
|
||||
}
|
||||
)
|
||||
if (
|
||||
len(
|
||||
reportItem[str(env)][kr.metadata["sloName"]][
|
||||
"services"
|
||||
]
|
||||
)
|
||||
== 0
|
||||
):
|
||||
# DEBUG
|
||||
print(
|
||||
f"ERROR: {reportItem[str(env)][kr.metadata['sloName']]} has no services"
|
||||
)
|
||||
else:
|
||||
for service in reportItem[str(env)][
|
||||
kr.metadata["sloName"]
|
||||
]["services"]:
|
||||
params = {
|
||||
"entitySelector": f'type("SERVICE"),entityId("{service["entityId"]}")',
|
||||
"fields": "fromRelationships,tags",
|
||||
}
|
||||
entities = get_hosts_from_dynatrace(
|
||||
env, DTTOKEN, DTURL, params, "entities"
|
||||
)
|
||||
# TODO: it is possible that "entities" is empty. maybe create check.
|
||||
service["entities"] = entities["entities"]
|
||||
for hosts in service["entities"]:
|
||||
if "fromRelationships" in hosts:
|
||||
if (
|
||||
"runsOnHost"
|
||||
in hosts["fromRelationships"]
|
||||
):
|
||||
for hosts in service["entities"]:
|
||||
for host in hosts[
|
||||
"fromRelationships"
|
||||
]["runsOnHost"]:
|
||||
host_response = (
|
||||
get_hosts_from_dynatrace(
|
||||
env,
|
||||
DTTOKEN,
|
||||
DTURL,
|
||||
host["id"],
|
||||
"entities",
|
||||
)
|
||||
)
|
||||
host["details"] = host_response
|
||||
|
||||
with open("test-data-with-hosts-main.json", "w") as f:
|
||||
f.write(json.dumps(reportItem, indent=4))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
# develop_load_json()
|
||||
|
|
|
|||
Loading…
Reference in New Issue