Compare commits

..

No commits in common. "master" and "production" have entirely different histories.

22 changed files with 42 additions and 930 deletions

135
.gitignore vendored
View File

@ -1,135 +0,0 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### Terraform stuff
**/.terraform/*
crash.log
*.tfvars

89
Jenkinsfile vendored
View File

@ -1,46 +1,14 @@
//not required right now as CN is reachable from EMEA as well
def loopEnvironments(environments){
print env.JENKINS_URL
environments.each { key, val ->
//Execute only if you are on the same environment
//not required right now as CN is reachable from EMEA as well
if (env.JENKINS_URL == environments."${key}"[3].'jenkins')
{
envname = environments."${key}"[0].'name'
envurl = environments."${key}"[1].'env-url'
tokenname = environments."${key}"[2].'env-token-name'
sh 'python createReport.py "${envname}"'
}
}
}
pipeline {
options {
ansiColor('xterm')
}
//label libraryBuild is available in CN JAWS and ROW JAWS, therefore this one was used; no additional intents
agent {label 'libraryBuild'}
parameters {
string(name: 'maxUsers', defaultValue: '2', description: 'Enter the number of users, to consider a group as empty')
string(name: 'lastLogin', defaultValue: '60', description: 'Enter the amount of days since the last login to consider a user inactive')
}
agent{label 'libraryBuild'}
//here comes the trigger according to crontabs - jenkins is in UTC
//here comes the trigger according to crontabs
triggers {
//every 1st of every month at 00:00
cron('0 0 1 * *')
//every day at 08:00
//cron('0 8 * * *')
//every monday at 08:00
//cron('0 8 * * MON')
}
environment {
//ProxySettings
@ -54,56 +22,25 @@
HTTPS_PROXY="${https_proxy}"
NO_PROXY="${no_proxy}"
ACCOUNT_TOKEN_VAR = credentials('ACCOUNT_TOKEN_VAR')
EUPROD_TOKEN_VAR = credentials('EUPROD_TOKEN_VAR')
EUPREPROD_TOKEN_VAR = credentials('EUPREPROD_TOKEN_VAR')
NAPROD_TOKEN_VAR = credentials('NAPROD_TOKEN_VAR')
NAPREPROD_TOKEN_VAR = credentials('NAPREPROD_TOKEN_VAR')
CNPROD_TOKEN_VAR = credentials('CNPROD_TOKEN_VAR')
CNPREPROD_TOKEN_VAR = credentials('CNPREPROD_TOKEN_VAR')
}
stages {
stage('install required python packages') {
stage('install required packages') {
steps {
sh '''
pip3 install --user -r requirements.txt
pip install -upgrade pip
pip install -r requirements.txt
'''
print env.JENKINS_URL
}
}
stage('Execute Reporting Script') {
steps {
script{
def SCRIPT_PARAMETER = ''
SCRIPT_PARAMETER = " -mU " + maxUsers.toString()
SCRIPT_PARAMETER = SCRIPT_PARAMETER + " -ll " + lastLogin.toString()
sh """cd ./src
python3 createReport.py ${SCRIPT_PARAMETER}
pwd
ls -la
"""
}
//Only required once CN is not reachable from EMEA
//loopEnvironments(environments)
}
}
stage('Send report') {
steps {
script {
try {
emailext subject: env.JOB_NAME,
body: 'Please find the output of your reports attached',
to: 'bmw.dynatrace@nttdata.com, coco-apm@bmw.de, ops-xibix@list.bmw.com, omo-xibix@list.bmw.com, omo@bmwgroup.com',
replyTo: 'coco-apm@bmw.de',
attachmentsPattern: '*.xlsx'
}
catch ( mailExc ){
echo "Sending Email Failed: ${mailExc}"
}
}
}
}
}
}
post {
always {
cleanWs()

View File

@ -1,21 +1 @@
# Dynatrace Account Managment Reporting
This repository holds the code for creating the following reporting data from the Dynatrace Account Management API
- configured User Groups without any users
- local User groups including it's users
- User groups with identical permissions
- Users which have not logged in to Dynatrace since 60 days
- Users who never logged in to Dynatrace
The report is sent monthly via mail to coco-apm@bmw.de and bmw.dynatrace@nttdata.com
***
## Jenkins environments
EMEA & NA: https://jaws.bmwgroup.net/opapm/
## Run the script / parameters
The script takes 2 parameters
- -mU --> Maximum amount of users within a group to consider this group as empty(default is "2" to catch groups with only one user)
- -ll --> Days since the last login to consider an user as inactive
> createReport.py -mU 2 -ll 60
init repo

Binary file not shown.

Before

Width:  |  Height:  |  Size: 40 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 141 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 53 KiB

View File

@ -1,6 +1,24 @@
AccountAPI:
- name: "AccountAPI"
- accound-id: "0d209e1f-8c3b-46b5-8437-aeca260ef22f"
- client-id: "dt0s02.AIHLLMYX"
- scope: "account-idm-read account-idm-write"
- env-token-name: "ACCOUNT_TOKEN_VAR"
euprod:
- name: "euprod"
- env-url: "https://xxu26128.live.dynatrace.com"
- env-token-name: "EUPROD_TOKEN_VAR"
eupreprod:
- name: "eupreprod"
- env-url: "https://qqk70169.live.dynatrace.com"
- env-token-name: "EUPREPROD_TOKEN_VAR"
napreprod:
- name: "napreprod"
- env-url: "https://onb44935.live.dynatrace.com"
- env-token-name: "NAPREPROD_TOKEN_VAR"
naprod:
- name: "naprod"
- env-url: "https://wgv50241.live.dynatrace.com"
- env-token-name: "NAPROD_TOKEN_VAR"
cnprod:
- name: "cnprod"
- env-url: "https://dynatracemgd-cn.bmwgroup.net/e/b921f1b9-c00e-4031-b9d1-f5a0d530757b"
- env-token-name: "CNPROD_TOKEN_VAR"
cnpreprod:
- name: "cnpreprod"
- env-url: "https://dynatracemgd-cn.bmwgroup.net/e/b921f1b9-c00e-4031-b9d1-f5a0d530757b"
- env-token-name: "CNPREPROD_TOKEN_VAR"

View File

@ -1,6 +1,6 @@
pandas
python-decouple
PyYAML
pyyaml
pandas
requests
urllib3
openpyxl
datetime
argparse

Binary file not shown.

View File

@ -1,89 +0,0 @@
import pandas as pd
from datetime import datetime, timedelta
from dtaccountapi.services.groups import GroupOwner
from dtaccountapi.services.users import userStatus
from dtaccountapi import DynatraceAccountAPI
def flattend_pd(permissions,permname):
data = {}
for perm in permissions:
data.update({perm["scope"] + '_' + perm["permissionName"] : 1.0})
row = pd.Series(data, name=permname)
return row
def createReportData(client_id, client_secret, account_id, scope, maxUser, lastLogin):
dtAPI = DynatraceAccountAPI(account_id, client_id, client_secret, scope)
emptyGroups = {}
localGroups = {}
permissions = {}
permission_df = pd.DataFrame()
maxLogin = datetime.now() - timedelta(days=60)
for group in dtAPI.groups.list():
#get local groups
local = False
if group.owner == GroupOwner.LOCAL:
localGroups[group.name] = group.json()
local = True
#get all group permissions for comparison
permission_df = permission_df.append(flattend_pd(dtAPI.groups.get_permissions(group.uuid).json()["permissions"],group.name))
#get users in groups
users = dtAPI.groups.get_users(group.uuid)
#if group is local, add data to report
if local == True:
localGroups[group.name]["users"] = [user.json() for user in users]
#get empty groups
if len(users) < 4:
emptyGroups[group.name] = group.json()
emptyGroups[group.name]["users"] = [user.json() for user in users]
#find groups with identical permissions
permission_df = permission_df.fillna(0)
duplicated_df = permission_df[permission_df.duplicated(subset=permission_df.columns.difference(["name"]),keep=False)]
nunique = duplicated_df.nunique()
cols_to_drop = nunique[nunique == 1].index
duplicated_df = duplicated_df.drop(cols_to_drop,axis=1)
notLoggedInUsers = {}
neverLoggedInUsers = {}
for user in dtAPI.users.list():
if user.userStatus == userStatus.ACTIVE and user.userLoginMetadata is not None:
if user.userLoginMetadata.lastSuccessfulLogin < maxLogin:
notLoggedInUsers[user.email] = {}
notLoggedInUsers[user.email]["name"] = user.name
notLoggedInUsers[user.email]["surname"] = user.surname
notLoggedInUsers[user.email]["lastLogin"] = user.userLoginMetadata.lastSuccessfulLogin
notLoggedInUsers[user.email]["loginCount"] = user.userLoginMetadata.successfulLoginCounter
else:
neverLoggedInUsers[user.email] = {}
neverLoggedInUsers[user.email]["name"] = user.name
neverLoggedInUsers[user.email]["surname"] = user.surname
neverLoggedInUsers[user.email]["status"] = user.userStatus
writer = pd.ExcelWriter("../AccountReport.xlsx")
df_emptyGroups = pd.DataFrame.from_dict(emptyGroups,orient="index")
df_emptyGroups.to_excel(writer,sheet_name="EmptyGroups")
df_localGroups = pd.DataFrame.from_dict(localGroups,orient="index")
df_localGroups.to_excel(writer,sheet_name="LocalGroups")
df_notLoggedInUsers = pd.DataFrame.from_dict(notLoggedInUsers, orient="index")
df_notLoggedInUsers.to_excel(writer,sheet_name="NotLoggedInUsers_60d")
df_neverLoggedInUsers = pd.DataFrame.from_dict(neverLoggedInUsers, orient="index")
df_neverLoggedInUsers.to_excel(writer,sheet_name="NeverLoggedInUsers")
duplicated_df.to_excel(writer,sheet_name="IdenticalGroups")
writer.save()
writer.close()

View File

@ -1,48 +0,0 @@
from decouple import config
import yaml
import argparse
from createAccMgmtReport import createReportData
def init_argparse():
parser = argparse.ArgumentParser()
parser.add_argument(
"-mU","--maxUser",
default = 0,
type = int,
help = "Maximum User in a group to consider a group empty"
)
parser.add_argument(
"-ll","--lastLogin",
default = 60,
type = int,
help = "Days since last login to report inactive users"
)
return parser
def main():
parser = init_argparse()
args = parser.parse_args()
with open('../environment.yaml') as file:
doc = yaml.safe_load(file)
for item, doc in doc.items():
account_id = dict(doc[1]).get('accound-id')
client_id = dict(doc[2]).get('client-id')
scope = dict(doc[3]).get('scope')
client_secret = dict(doc[4])
print("Crawling through: " + item)
print("Check if token exists in environment...")
if(config(client_secret.get('env-token-name')) != ""):
print("Gather data, hold on a minute")
client_secret = config(client_secret.get('env-token-name'))
createReportData(client_id, client_secret, account_id, scope, args.maxUser, args.lastLogin)
else:
print("token not found, skipping " + item)
if __name__ == "__main__":
main()

View File

@ -1,2 +0,0 @@
from dtaccountapi.main import DynatraceAccountAPI
from dtaccountapi.http_client import TOO_MANY_REQUESTS_WAIT

View File

@ -1,36 +0,0 @@
import pprint
from typing import Any, Dict, Optional
from requests import Response
from dtaccountapi.http_client import HttpClient
class DynatraceObject:
def __init__(self,
http_client: Optional[HttpClient] = None,
headers: Optional[Dict[str, str]] = None,
raw_element: Optional[Dict[str, Any]] = None):
if raw_element is None:
raw_element = {}
self._http_client = http_client
self._headers = headers
self._raw_element = raw_element
self._create_from_raw_data(raw_element)
def _create_from_raw_data(self, raw_element: Dict[str, Any]):
pass
def __repr__(self):
return f"{self.__class__.__name__}({pprint.pformat(self._raw_element, width=130)})"
def _make_request(self,
path: str,
params: Optional[Dict] = None,
headers: Optional[Dict] = None,
method = "GET",
data = None) -> Response:
return self._http_client.make_request(path, params, headers, method, data)
def json(self):
return self._raw_element

View File

@ -1,171 +0,0 @@
import json
import logging
from typing import Dict, Optional, Any
import time
from datetime import datetime
import requests
import urllib3
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
TOO_MANY_REQUESTS_WAIT = "wait"
class DynatraceRetry(Retry):
def get_backoff_time(self):
return self.backoff_factor
class HttpClient:
def __init__(
self,
auth_url: str,
base_url: str,
account_id: str,
client_id: str,
client_secret: str,
scope: str,
log: logging.Logger = None,
proxies: Dict = None,
too_many_requests_strategy=None,
retries: int = 0,
retry_delay_ms: int = 0,
print_bodies: bool = False,
):
while base_url.endswith("/"):
base_url = base_url[:-1]
self.base_url = base_url
self.auth_url = auth_url
self.account_id = account_id
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self.token_expires = 300
self.print_bodies = print_bodies
self.bearer = ""
self.token_age = ""
if proxies is None:
proxies = {}
self.proxies = proxies
self.log = log
if self.log is None:
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.WARNING)
st = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(thread)d - %(filename)s:%(lineno)d - %(message)s")
st.setFormatter(fmt)
self.log.addHandler(st)
self.too_many_requests_strategy = too_many_requests_strategy
retry_delay_s = retry_delay_ms / 1000
try:
self.retries = DynatraceRetry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
allowed_methods=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
except TypeError: # Older version of urllib3?
self.retries = DynatraceRetry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
method_whitelist=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
self.authorize()
self.auth_header = {"Authorization": f"Bearer {self.bearer}"}
def authorize(self) -> requests.Response:
headers = {}
headers.update({"content-type": "application/x-www-form-urlencoded"})
body = {
"grant_type" : "client_credentials",
"client_id" : self.client_id,
"client_secret" : self.client_secret,
"scope" : self.scope,
"resource" : f"urn:dtaccount:{self.account_id}"
}
self.log.debug(f"Auhtorization request to '{self.auth_url}'...")
if self.print_bodies:
print(body, self.auth_url)
if body:
print(json.dumps(body, indent=2))
r = requests.post(self.auth_url, data=body, headers=headers, verify=False, proxies=self.proxies)
self.log.debug(f"Received response '{r}'")
while r.status_code == 429 and self.too_many_requests_strategy == TOO_MANY_REQUESTS_WAIT:
sleep_amount = int(r.headers.get("retry-after", 5))
self.log.warning(f"Sleeping for {sleep_amount}s because we have received an HTTP 429")
time.sleep(sleep_amount)
r = requests.post(self.auth_url, data=body, headers=headers, verify=False, proxies=self.proxies)
if r.status_code >= 400:
raise Exception(f"Error making request to {self.auth_url}: {r}. Response: {r.text}")
self.bearer = r.json()['access_token']
self.token_age = datetime.now()
self.token_expires = r.json()['expires_in']
return r
def make_request(
self, path: str, params: Optional[Any] = None, headers: Optional[Dict] = None, method="GET", data=None, files=None, query_params=None
) -> requests.Response:
url = f"{self.base_url}/{self.account_id}{path}"
body = None
if method in ["POST", "PUT"]:
body = params
params = query_params
if headers is None:
headers = {}
if files is None and "content-type" not in [key.lower() for key in headers.keys()]:
headers.update({"content-type": "application/json"})
diff = datetime.now() - self.token_age
#reduce maximum token age by 20 seconds to avoid race conditions
if diff.seconds > (self.token_expires-20):
self.authorize()
self.auth_header = {"Authorization": f"Bearer {self.bearer}"}
headers.update(self.auth_header)
cookies = None
s = requests.Session()
s.mount("https://", HTTPAdapter(max_retries=self.retries))
self.log.debug(f"Making {method} request to '{url}' with params {params} and body: {body}")
if self.print_bodies:
print(method, url)
if body:
print(json.dumps(body, indent=2))
r = s.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies, data=data, cookies=cookies, files=files)
self.log.debug(f"Received response '{r}'")
while r.status_code == 429 and self.too_many_requests_strategy == TOO_MANY_REQUESTS_WAIT:
sleep_amount = int(r.headers.get("retry-after", 5))
self.log.warning(f"Sleeping for {sleep_amount}s because we have received an HTTP 429")
time.sleep(sleep_amount)
r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies)
if r.status_code >= 400:
raise Exception(f"Error making request to {url}: {r}. Response: {r.text}")
return r

View File

@ -1,29 +0,0 @@
import logging
import requests
from dtaccountapi.http_client import HttpClient
from dtaccountapi.services.groups import GroupService
from dtaccountapi.services.users import UserService
class DynatraceAccountAPI:
def __init__(
self,
account_id: str,
client_id: str,
client_secret: str,
scope: str
):
self.__http_client = HttpClient("https://sso.dynatrace.com/sso/oauth2/token","https://api.dynatrace.com/iam/v1/accounts/",account_id = account_id, client_id = client_id,client_secret = client_secret,scope = scope,print_bodies=False)
self.groups: GroupService = GroupService(self.__http_client)
self.users: UserService = UserService(self.__http_client)
def makeRequest(self,path: str, method="GET", data=None, query_params=None) -> requests.Response:
response = self.__http_client.make_request(path=path, method=method, data=data, query_params=query_params)
json_response = response.json()
return json_response

View File

@ -1,96 +0,0 @@
from typing import Generic, TypeVar, Iterator, TYPE_CHECKING
from dtaccountapi.dynatrace_object import DynatraceObject
from dtaccountapi.http_client import HttpClient
T = TypeVar("T", bound=DynatraceObject)
class PaginatedList(Generic[T]):
def __init__(self, target_class, http_client, target_url, target_params=None, headers=None, list_item="result"):
self.__target_class = target_class
self.__http_client: HttpClient = http_client
self.__target_url = target_url
self.__target_params = target_params
self.__headers = headers
self.__list_item = list_item
self._has_next_page = True
self.__total_count = None
self.__page_size = None
self.__elements = self._get_next_page()
def __getitem__(self, index):
pass
def __iter__(self) -> Iterator[T]:
for element in self.__elements:
yield element
while self._has_next_page:
new_elements = self._get_next_page()
for element in new_elements:
yield element
def __len__(self):
return self.__total_count or len(self.__elements)
def _get_next_page(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
data = []
if json_response.get("nextPageKey", None):
self._has_next_page = True
self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
else:
self._has_next_page = False
if self.__list_item in json_response:
elements = json_response[self.__list_item]
self.__total_count = json_response.get("count") or len(elements)
data = [self.__target_class(self.__http_client, response.headers, element) for element in elements]
return data
class HeaderPaginatedList(Generic[T]):
def __init__(self, target_class, http_client, target_url, target_params=None, headers=None):
self.__elements = list()
self.__target_class = target_class
self.__http_client: HttpClient = http_client
self.__target_url = target_url
self.__target_params = target_params
self.__headers = headers
self._has_next_page = True
self.__total_count = None
self.__page_size = None
def __getitem__(self, index):
pass
def __iter__(self) -> Iterator[T]:
for element in self.__elements:
yield element
while self._has_next_page:
new_elements = self._get_next_page()
for element in new_elements:
yield element
def __len__(self):
return self.__total_count or len(self.__elements)
def _get_next_page(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
headers = response.headers
if "next-page-key" in headers:
self._has_next_page = True
self.__target_params = {"nextPageKey": headers["next-page-key"]}
else:
self._has_next_page = False
elements = json_response
self.__total_count = headers.get("total-count") or len(elements)
data = [self.__target_class(self.__http_client, response.headers, element) for element in elements]
return data

View File

@ -1,74 +0,0 @@
from enum import Enum
from typing import List, Optional, Dict, Any
from datetime import datetime
from dynatrace_object import DynatraceObject
from pagination import PaginatedList
from http_client import HttpClient
class Permission(DynatraceObject):
def _create_from_raw_data(self, raw_element):
self.permissionName: Permission = Permission(raw_element["permissionName"])
self.scope: str = (raw_element["scope"])
self.scopeType: scopeType = scopeType(raw_element["scopeType"])
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
class Group(DynatraceObject):
def _create_from_raw_data(self, raw_element: Dict[str, Any]):
self.uuid: Optional[str] = raw_element.get("uuid")
self.name: str = raw_element["name"]
self.owner: GroupOwner = GroupOwner(raw_element["owner"])
self.description: Optional[str] = raw_element["description"]
self.federatedAttributeValues: Optional[List[raw_element["federatedAttributeValues"]]]
self.hidden: bool = raw_element["hidden"]
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
self.permissions: Optional[List[Permission]] = [Permission(raw_element=permission) for permission in raw_element.get("permissionName", [])]
class GroupStub(DynatraceObject):
def _create_from_raw_data(self, raw_element: Dict[str, Any]):
self.uuid: Optional[str] = raw_element.get("uuid")
self.name: str = raw_element["name"]
self.owner: GroupOwner = GroupOwner(raw_element["owner"])
self.description: Optional[str] = raw_element["description"]
self.federatedAttributeValues: Optional[List[raw_element["federatedAttributeValues"]]]
self.hidden: bool = raw_element["hidden"]
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
class GroupService:
ENDPOINT = "/groups"
def __init__(self, http_client: HttpClient):
self.__http_client = http_client
def list(self) -> PaginatedList[GroupStub]:
"""
Lists all alerting profiles in the environmemt. No configurable parameters.
"""
return PaginatedList(GroupStub, self.__http_client, f"{self.ENDPOINT}", list_item="items")
class GroupOwner(Enum):
SAML = "SAML"
LOCAL = "LOCAL"
class scopeType(Enum):
tenant = "tenant"
account = "account"
class permissionName(Enum):
account_company_info = "account-company-info"
account_user_management = "account-user-management"
account_viewer = "account-viewer"
tenant_viewer = "tenant-viewer"
tenant_manage_settings = "tenant-manage-settings"
tenant_agent_install = "tenant-agent-install"
tenant_logviewer = "tenant-logviewer"
tenant_view_sensitive_request_data = "tenant-view-sensitive-request-data"
tenant_configure_request_capture_data = "tenant-configure-request-capture-data"
tenant_replay_sessions_with_masking = "tenant-replay-sessions-with-masking"
tenant_replay_sessions_without_masking = "tenant-replay-sessions-without-masking"
tenant_manage_security_problems = "tenant-manage-security-problems"
tenant_manage_support_tickets = "tenant-manage-support-tickets"

View File

@ -1,80 +0,0 @@
from enum import Enum
from typing import List, Optional, Dict, Any
from datetime import datetime
from dtaccountapi.dynatrace_object import DynatraceObject
from dtaccountapi.pagination import PaginatedList
from dtaccountapi.http_client import HttpClient
from dtaccountapi.services.users import User
class Permission(DynatraceObject):
def _create_from_raw_data(self, raw_element):
self.permissionName: Permission = Permission(raw_element["permissionName"])
self.scope: str = (raw_element["scope"])
self.scopeType: scopeType = scopeType(raw_element["scopeType"])
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
class Group(DynatraceObject):
def _create_from_raw_data(self, raw_element: Dict[str, Any]):
self.uuid: Optional[str] = raw_element.get("uuid")
self.name: str = raw_element["name"]
self.owner: GroupOwner = GroupOwner(raw_element["owner"])
self.description: Optional[str] = raw_element["description"]
self.federatedAttributeValues: Optional[List[raw_element["federatedAttributeValues"]]]
self.hidden: bool = raw_element["hidden"]
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
self.permissions: Optional[List[Permission]] = [Permission(raw_element=permission) for permission in raw_element.get("permissionName", [])]
self.users: Optional[List[User]] = [User(raw_element=user) for user in raw_element.get("items", [])]
class GroupStub(DynatraceObject):
def _create_from_raw_data(self, raw_element: Dict[str, Any]):
self.uuid: Optional[str] = raw_element.get("uuid")
self.name: str = raw_element["name"]
self.owner: GroupOwner = GroupOwner(raw_element["owner"])
self.description: Optional[str] = raw_element["description"]
self.federatedAttributeValues: Optional[List[raw_element["federatedAttributeValues"]]]
self.hidden: bool = raw_element["hidden"]
self.createdAt: Optional[datetime] = raw_element["createdAt"]
self.updatedAt: Optional[datetime] = raw_element["updatedAt"]
class GroupService:
ENDPOINT = "/groups"
def __init__(self, http_client: HttpClient):
self.__http_client = http_client
def list(self) -> PaginatedList[Group]:
return PaginatedList(Group, self.__http_client, f"{self.ENDPOINT}", list_item="items")
def get_permissions(self, group_uuid: str) -> Group:
response = self.__http_client.make_request(f"{self.ENDPOINT}/{group_uuid}/permissions")
return Group(http_client=self.__http_client, raw_element=response.json())
def get_users(self, group_uuid: str) -> Group:
return PaginatedList(target_class = User, http_client = self.__http_client, target_url=f"{self.ENDPOINT}/{group_uuid}/users",list_item="items")
class GroupOwner(Enum):
SAML = "SAML"
LOCAL = "LOCAL"
class scopeType(Enum):
tenant = "tenant"
account = "account"
class permissionName(Enum):
account_company_info = "account-company-info"
account_user_management = "account-user-management"
account_viewer = "account-viewer"
tenant_viewer = "tenant-viewer"
tenant_manage_settings = "tenant-manage-settings"
tenant_agent_install = "tenant-agent-install"
tenant_logviewer = "tenant-logviewer"
tenant_view_sensitive_request_data = "tenant-view-sensitive-request-data"
tenant_configure_request_capture_data = "tenant-configure-request-capture-data"
tenant_replay_sessions_with_masking = "tenant-replay-sessions-with-masking"
tenant_replay_sessions_without_masking = "tenant-replay-sessions-without-masking"
tenant_manage_security_problems = "tenant-manage-security-problems"
tenant_manage_support_tickets = "tenant-manage-support-tickets"

View File

@ -1,52 +0,0 @@
from enum import Enum
from typing import List, Optional, Dict, Any
from datetime import datetime
from dtaccountapi.dynatrace_object import DynatraceObject
from dtaccountapi.pagination import PaginatedList
from dtaccountapi.http_client import HttpClient
from dtaccountapi.utils import iso8601_to_datetime
#from dtaccountapi.services.groups import Group
class User(DynatraceObject):
def _create_from_raw_data(self, raw_element):
self.uid: str = (raw_element["uid"])
self.email: str = (raw_element["email"])
self.name: str = (raw_element.get("name"))
self.surname: str = (raw_element.get("surname"))
self.emergencyContact: Optional[bool] = (raw_element.get("emergencyContact"))
self.userStatus: Optional[userStatus] = userStatus(raw_element.get("userStatus"))
self.userLoginMetadata: Optional[userLoginMetadata] = userLoginMetadata(raw_element = raw_element["userLoginMetadata"]) if raw_element.get("userLoginMetadata") else None
self.groups: Optional[List[Group]] = [Group(raw_element=g) for g in raw_element.get("groups",[]) ]
class userLoginMetadata(DynatraceObject):
def _create_from_raw_data(self, raw_element):
self.successfulLoginCounter: int = (raw_element["successfulLoginCounter"])
self.failedLoginCounter: int (raw_element["failedLoginCounter"])
self.lastSuccessfulLogin: Optional[datetime] = iso8601_to_datetime(raw_element.get("lastSuccessfulLogin"))
self.lastFailedLogin: Optional[datetime] = iso8601_to_datetime(raw_element.get("lastFailedLogin"))
self.resetPasswordTokenSentAt: Optional[datetime] = (raw_element.get("resetPasswordTokenSentAt"))
self.lastSuccessfulBasicAuthentication: Optional[datetime] = (raw_element.get("lastSuccessfulBasicAuthentication"))
self.createdAt: datetime = (raw_element.get("createdAt"))
self.updatedAt: Optional[datetime] = (raw_element.get("updatedAt"))
class UserService:
ENDPOINT = "/users"
def __init__(self, http_client: HttpClient):
self.__http_client = http_client
def list(self) -> PaginatedList[User]:
return PaginatedList(User, self.__http_client, f"{self.ENDPOINT}", list_item="items")
def get_user(self, mail: str) -> User:
response = self.__http_client.make_request(f"{self.ENDPOINT}/{mail}")
return User(http_client=self.__http_client, raw_element=response.json())
class userStatus(Enum):
ACTIVE = "ACTIVE"
INACTIVE = "INACTIVE"
PENDING = "PENDING"
DELETED = "DELETED"
ECUSTOMS_MANUALLY_BLOCKED = "ECUSTOMS_MANUALLY_BLOCKED"
NONE = None

View File

@ -1,11 +0,0 @@
from datetime import datetime
from typing import Optional
ISO_8601 = "%Y-%m-%dT%H:%M:%SZ"
def iso8601_to_datetime(timestamp: Optional[str]) -> Optional[datetime]:
if isinstance(timestamp, str):
return datetime.strptime(timestamp, ISO_8601)
return timestamp