master
ermisw 2022-12-16 09:59:01 +01:00
commit c151e57310
12 changed files with 942 additions and 0 deletions

138
.gitignore vendored Normal file
View File

@ -0,0 +1,138 @@
.vscode
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
### Terraform stuff
**/.terraform/*
crash.log
*.tfvars
#excel reports
*.xlsx

133
SLO.py Normal file
View File

@ -0,0 +1,133 @@
try:
# Python 3
from collections.abc import MutableSequence
except ImportError:
# Python 2.7
from collections import MutableSequence
class KeyRequestGroup(MutableSequence):
"""A container for manipulating lists of hosts"""
def __init__(self, data=None):
"""Initialize the class"""
super(KeyRequestGroup, self).__init__()
if (data is not None):
self._list = list(data)
else:
self._list = list()
def __repr__(self):
return "<{0} {1}>".format(self.__class__.__name__, self._list)
def __len__(self):
"""List length"""
return len(self._list)
def __getitem__(self, ii):
"""Get a list item"""
if isinstance(ii, slice):
return self.__class__(self._list[ii])
else:
return self._list[ii]
def __delitem__(self, ii):
"""Delete an item"""
del self._list[ii]
def __setitem__(self, ii, val):
# optional: self._acl_check(val)
self._list[ii] = val
def __str__(self):
return str(self._list)
def createExistsQuery(self, val):
query="type(service_method)"
#case Service Names exists
if len(val["services"]) > 0:
if val["services"][0].startswith("SERVICE-"):
query+=",fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityId(\""+'","'.join(val["services"])+"\"))"
else:
query+=",fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityName.in(\""+'","'.join(val["services"])+"\"))"
if val["methods"][0].startswith("SERVICE_METHOD-"):
query+=",entityId(\""+'","'.join(val["methods"])+"\")"
else:
query+=",entityName.in(\""+'","'.join(val["methods"])+"\")"
val["existsQuery"]= query
def insert(self, ii, val):
self.createExistsQuery(val)
self._list.insert(ii, val)
def append(self, val):
if len(self._list) == 0:
#self._list.insert(ii, val)
self.insert(len(self._list), val)
return
for group in self._list:
if len(set(group["services"]) - set(val["services"])) > 0 or len(set(group["methods"]) - set(val["methods"])) > 0:
self.insert(len(self._list), val)
from helper import get_request,contains
class SLO:
def checkKeyRequetsExists(self, DTAPIURL, DTAPIToken):
DTAPIURL = DTAPIURL + "/api/v2/entities"
headers = {
'Content-Type': 'application/json',
'Authorization': 'Api-Token ' + DTAPIToken
}
for group in self.keyRequestGroup:
params={"entitySelector": group["existsQuery"]}
response = get_request(DTAPIURL, headers, params)
entities = (response.json())['entities']
for method in group["methods"]:
comparer=None
if method.startswith('SERVICE_METHOD-'):
comparer="entityId"
else:
comparer="displayName"
found = [x for x in entities if x[comparer] == method]
if len(found) > 0:
#Keyrequest exists
tmp=found[0]
tmp["exists"]=True
else:
#Keyrequest not exists
tmp={"displayName":method,"type": "SERVICE_METHOD", "entityId":method, "exists":False}
self.keyRequests.append(tmp)
def checkKeyRequestsHasData(self):
pass
def __init__(self,
sloName,
metricExpression,
keyRequests_groups: KeyRequestGroup = None):
self.sloName=sloName
self.metricExpression=metricExpression
if keyRequests_groups == None:
self.keyRequestGroup = KeyRequestGroup()
else:
self.keyRequestGroup = keyRequests_groups
self.keyRequests=[]

346
createKeyRequestReport.py Normal file
View File

@ -0,0 +1,346 @@
from tracemalloc import start
from decouple import config
import sys
import yaml
import datetime
import time
import pandas as pd
#import requests
#import openpyxl
import argparse
import warnings
import os
import re
#import glob
import dynatraceAPI
from pagination import Pagionation
import types
import SLO
from patterns.Pattern1 import Pattern1, Pattern2, Pattern3
# import importlib
# from dynamic_import import *
# from patterns import a
warnings.filterwarnings("ignore")
# module = __import__("patterns.a")
# my_class = getattr(module, "a.APattern")
# instance = my_class()
# mod, modCl = dynamic_imp("patterns", "APattern")
# modCl.APattern("asdsad")
# path = os.path.dirname(os.path.abspath(__file__))+"/patterns"
# for py in [f[:-3] for f in os.listdir(path) if f.endswith('.py') and f != '__init__.py']:
# mod = __import__('.'.join([__name__, py]), fromlist=[py])
# classes = [getattr(mod, x) for x in dir(mod) if isinstance(getattr(mod, x), type)]
# for cls in classes:
# setattr(sys.modules[__name__], cls.__name__, cls)
# module = importlib.import_module('patterns.a')
# my_class = getattr(module, 'MyClass')
# my_instance = my_class()
#warning, there are warnings which are ignored!
# patterns=[
# {"regex": "type\(\"?service_method\"?\),.*fromRelationship\.isServiceMethodOfService\(.*type\(\"?service\"?\),entityName\.in\(([^\)]*).*,entityName\.in\(([^\)]*)"}
# ];
patterns=[Pattern1(), Pattern2(), Pattern3()]
def get_request(url, headers):
try:
response = requests.get(url, headers=headers)
response.raise_for_status()
except requests.exceptions.HTTPError as errh:
return "An Http Error occurred:" + repr(errh)
except requests.exceptions.ConnectionError as errc:
return "An Error Connecting to the API occurred:" + repr(errc)
except requests.exceptions.Timeout as errt:
return "A Timeout Error occurred:" + repr(errt)
except requests.exceptions.RequestException as err:
return "An Unknown Error occurred" + repr(err)
return response
def getSLO(DTAPIToken, DTENV):
# DTENV = base url
# DTAPIToken = sec token
dtclient = dynatraceAPI.Dynatrace(DTENV, DTAPIToken)
my_params_report = {'pageSize': 25}
# gets all slos and filter later
api_url_report = "/api/v2/slo"
pages = dtclient.returnPageination(api_url_report, my_params_report, "slo")
#only_wanted = [x for x in pages.elements if str.lower(selector) in str.lower(x['description'])]
df = pd.DataFrame(pages.elements)
return df
def init_argparse():
parser = argparse.ArgumentParser(
usage="%(prog)s [--fromDate] [toDate] or [preSelect]",
description="gather SLO in daily slices for given Timeframe"
)
parser.add_argument(
"-f","--fromDate",
help = "YYYY-mm-dd e.g. 2022-01-01"
)
parser.add_argument(
"-t","--toDate",
help = "YYYY-mm-dd e.g. 2022-01-31"
)
parser.add_argument(
"-p","--preSelect",
help = "day | week | month - gathers the data for the last full day, week or month"
)
parser.add_argument(
"-s","--slices",
help = "h | d | t | y - writes the slices hourly, daily, total or year to date into ecxel. given in any order"
)
return parser
def check_inputs(args):
'''
This functions is the single point of true for arguments. If new arguments are added they need to be added in here. Returns from and to date.
'''
if args.preSelect and (args.fromDate or args.toDate):
print("--preSelect must not be used in conjuntion with --fromDate and/or --toDate")
sys.exit()
elif args.fromDate and not args.toDate:
print("--fromDate only in conjunction with --toDate")
sys.exit()
elif args.toDate and not args.fromDate:
print("--toDate only in conjunction with --fromDate")
sys.exit()
elif args.toDate and args.fromDate and not args.preSelect:
try:
#fromDate = datetime.date.fromisoformat(args.fromDate)
fromDate = datetime.datetime.strptime(args.fromDate, "%Y-%m-%d")
#toDate = datetime.date.fromisoformat(args.toDate)
toDate = datetime.datetime.strptime(args.toDate, "%Y-%m-%d")
except Exception as e:
print("Progam closed: " + str(e))
sys.exit()
if toDate < fromDate:
print("--toDate can't be older than --fromDate")
sys.exit()
if toDate > datetime.date.today() or fromDate > datetime.date.today():
print("--toDate or --fromDate can't be in the future")
sys.exit()
elif args.preSelect and not args.fromDate and not args.toDate:
date = datetime.date.today()
if args.preSelect == "week":
fromDate, toDate = previous_week_range(date)
elif args.preSelect == "month":
fromDate, toDate = previous_month_range(date)
elif args.preSelect == "day":
fromDate, toDate = previous_day_range(date)
else:
print("--preSelect must be week or month")
sys.exit()
else:
print("Invalid arguments, please use --help")
sys.exit()
if args.slices == None:
print("-s or --slices must not be null and needs at least one letter of h d t or y, lower- or uppercase.")
sys.exit()
elif sum([1 if one_inp in str.lower(args.slices) else 0 for one_inp in ['h','d','t','y'] ]) == 0:
print("-s or --slices must has at least one letter of h d t or y, lower- or uppercase.")
sys.exit()
return fromDate, toDate
def write_slo_to_excel(args, fromDate, hourlyall, dailyall, totalall, ytd):
touchpoints = ['Vehicle' , 'Mobile']
if args.preSelect == 'day':
today = datetime.date.today()
yesterday = today - datetime.timedelta(days = 1)
fileName = "./QM_Report_"+ str(yesterday) +".xlsx"
else:
fileName = "./QM_Report_" + str(fromDate.isocalendar()[1]) + ".xlsx"
writer = pd.ExcelWriter(fileName)
if not totalall.empty and 't' in str.lower(args.slices):
totalall = totalall[totalall['Touchpoint'].isin(touchpoints)]
totalall.to_excel(writer, sheet_name='total')
if not dailyall.empty and 'd' in str.lower(args.slices):
dailyall = dailyall[dailyall['Touchpoint'].isin(touchpoints)]
dailyall.to_excel(writer, sheet_name='daily')
if not hourlyall.empty and 'h' in str.lower(args.slices):
hourlyall = hourlyall[hourlyall['Touchpoint'].isin(touchpoints)]
hourlyall.to_excel(writer, sheet_name='hourly')
if not ytd.empty and 'y' in str.lower(args.slices):
ytd = ytd[ytd['Touchpoint'].isin(touchpoints)]
ytd.to_excel(writer, sheet_name='YTD')
writer.save()
writer.close()
def testProcess(matches):
services=[s.strip() for s in matches.group(1).split(",")]
methods=[s.strip() for s in matches.group(2).split(",")]
query="type(service_method),fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityName.in(\""+'","'.join(services)+"\")),entityName.in(\""+'","'.join(methods)+"\")"
# env = DTENV
# DTAPIToken = DTAPIToken
# DTAPIURL = env + "/api/v2/apiTokens"
# headers = {
# 'Content-Type': 'application/json',
# 'Authorization': 'Api-Token ' + DTAPIToken
# }
# data = '{"name":"' + tokenname + '","scopes":["InstallerDownload","ReadConfig","WriteConfig","DataExport"]}'
# #data = '{"name":"' + tokenname + '","scopes":["ReadConfig","DataExport","entities.read"]}'
# r = post_request(DTAPIURL,headers,data, proxies)
return
def parseAndCreateSLOObject(row):
normalizedMetric=normalize(row['metricExpression'])
tmp_SLO=SLO.SLO(row["name"], normalizedMetric, None)
for p in patterns:
services, methods=p.parseServicesAndMethods(normalizedMetric)
if methods != None and len(methods) > 0:
tmp_SLO.keyRequestGroup.append({"services":services,"methods":methods})
break
return tmp_SLO
def normalize(x):
tmp=x.replace("~","")
tmp=tmp.replace("\n","")
#tmp=tmp.replace("\"/","\"")
tmp=tmp.replace("\"/","")
#tmp=tmp.replace("/\"","\"")
tmp=tmp.replace("/\"","")
tmp=tmp.replace("\"","")
return tmp
# def start(dfSLOS):
# #dfSLOS['normalizedMetricExpression']= dfSLOS.apply(lambda row : normalize(row['metricExpression'], row['filter']))
# for index, row in dfSLOS.iterrows():
# #if row["id"]=="3a2d280d-83bb-3e45-bafc-e79c1429c79b":
# findKeyrequests(normalize(row['metricExpression']))
def main(slo_path):
with open('./environment.yaml') as file:
env_doc = yaml.safe_load(file)
slos=[]
#iterate through all environments
for item, doc in env_doc.items():
token = dict(doc[2])
url = dict(doc[1])
if(config(token.get('env-token-name')) != ""):
print("Gather data, hold on a minute")
DTTOKEN = config(token.get('env-token-name'))
DTURL = url.get('env-url')
slosF=getSLO(DTTOKEN, DTURL)
for index, row in slosF.iterrows():
#if row['id'] == "75165058-75c6-385e-a78e-b6ea3457f87d":
slos.append(parseAndCreateSLOObject(row))
print("huhu")
for slo in slos:
slo.checkKeyRequetsExists(DTURL, DTTOKEN)
x=0
# slo_configs = load_slo_parameter(slo_path)
# for one_slo_config in slo_configs:
# hub, selector_type, selector_var, yearstart, header_name = one_slo_config
# print(f"For the slo config was '{slo_path}' used with the config '{header_name}'.")
# for item, doc in env_doc.items():
# if not item in hub:
# print(f"{item} will be skipped since it is not in {hub}, which was selected in {slo_path}")
# continue
# token = dict(doc[2])
# url = dict(doc[1])
# print("Crawling through: " + item)
# print("Check if token exists in environment...")
# if(config(token.get('env-token-name')) != ""):
# print("Gather data, hold on a minute")
# DTTOKEN = config(token.get('env-token-name'))
# DTURL = url.get('env-url')
# ###Calc daily SLO
# if 'd' in str.lower(args.slices):
# dailyall = get_one_slice(item, DTTOKEN, DTURL, days, dailyall, selector_var, selector_type)
# #Calc hourly SLO
# if 'h' in str.lower(args.slices):
# hourlyall = get_one_slice(item, DTTOKEN, DTURL, hours, hourlyall, selector_var, selector_type)
# ###Calc Overall YTD SLO
# if 'y' in str.lower(args.slices):
# ytd = get_slice_ytd_total(DTTOKEN,DTURL,item, yearstart, days['endTime'].max(), 'Date', fromDate.year, ytd, selector_var, selector_type)
# ###Calc Overall SLO
# if 't' in str.lower(args.slices):
# totalall = get_slice_ytd_total(DTTOKEN,DTURL,item, days['startTime'].min(), days['endTime'].max(), 'Date', fromDate.isocalendar()[1], totalall, selector_var, selector_type)
# else:
# print("token not found, skipping " + item)
# write_slo_to_excel(args, fromDate, hourlyall, dailyall, totalall, ytd)
# print("It took {} seconds to run this script".format(time.time()-start_timer))
if __name__ == "__main__":
# for file in glob(os.path.join(os.path.dirname(os.path.abspath(__file__)), "*.py")):
# name = os.path.splitext(os.path.basename(file))[0]
# if name == "Animal" or name == "main": # avoid main.py and Animal.py
# continue
# # add package prefix to name, if required
# module = __import__(name)
# try:
# # get the class
# cls = getattr(module, name)
# # instantiate the class and call the method
# cls().feed()
# except Exception as e:
# print(e)
main('./slo_parameter.yaml')

40
dynatraceAPI.py Normal file
View File

@ -0,0 +1,40 @@
import logging
from typing import Dict
import os
import sys
file_dir = os.path.dirname(__file__)
sys.path.append(file_dir)
from more_utils.httpClient import HttpClient
import pagination
class Dynatrace:
def __init__(
self,
base_url: str,
token: str,
log: logging.Logger = None,
proxies: Dict = None,
too_many_requests_strategy=None,
retries: int = 0,
retry_delay_ms: int = 0,
):
self.__http_client = HttpClient(
base_url, token, log, proxies, too_many_requests_strategy, retries, retry_delay_ms
)
def returnPageination(self,path,params,list_item):
page = pagination.Pagionation(self.__http_client,path,params,list_item=list_item)
return page
def returnSingle(self,path):
response = self.__http_client.make_request(path)
json_response = response.json()
return json_response

31
environment.yaml Normal file
View File

@ -0,0 +1,31 @@
---
euprod:
- name: "EUprod"
- env-url: "https://xxu26128.live.dynatrace.com"
- env-token-name: "EUPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
#eupreprod:
- name: "eupreprod"
- env-url: "https://qqk70169.live.dynatrace.com"
- env-token-name: "EUPREPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
#napreprod:
- name: "napreprod"
- env-url: "https://onb44935.live.dynatrace.com"
- env-token-name: "NAPREPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
naprod:
- name: "naprod"
- env-url: "https://wgv50241.live.dynatrace.com"
- env-token-name: "NAPROD_TOKEN_VAR"
- jenkins: "https://jaws.bmwgroup.net/opapm/"
cnprod:
- name: "cnprod"
- env-url: "https://dyna-synth-cn.bmwgroup.com.cn/e/b921f1b9-c00e-4031-b9d1-f5a0d530757b"
- env-token-name: "CNPROD_TOKEN_VAR"
- jenkins: "https://jaws-china.bmwgroup.net/opmaas/"
#cnpreprod:
- name: "cnpreprod"
- env-url: "https://dynatracemgd-tsp.bmwgroup.net/e/ab88c03b-b7fc-45f0-9115-9e9ecc0ced35"
- env-token-name: "CNPREPROD_TOKEN_VAR"
- jenkins: "https://jaws-china.bmwgroup.net/opmaas/"

22
helper.py Normal file
View File

@ -0,0 +1,22 @@
import requests
def get_request(url, headers, params):
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status()
except requests.exceptions.HTTPError as errh:
return "An Http Error occurred:" + repr(errh)
except requests.exceptions.ConnectionError as errc:
return "An Error Connecting to the API occurred:" + repr(errc)
except requests.exceptions.Timeout as errt:
return "A Timeout Error occurred:" + repr(errt)
except requests.exceptions.RequestException as err:
return "An Unknown Error occurred" + repr(err)
return response
def contains(list, filter):
for x in list:
if filter(x):
return True
return False

0
more_utils/__init__.py Normal file
View File

116
more_utils/httpClient.py Normal file
View File

@ -0,0 +1,116 @@
import logging
from typing import Dict, Optional, Any
import time
import requests
import urllib3
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
TOO_MANY_REQUESTS_WAIT = "wait"
##Not sure where/why this is here
class DynatraceRetry(Retry):
def get_backoff_time(self):
return self.backoff_factor
class HttpClient:
def __init__(
self,
base_url: str,
token: str,
log: logging.Logger = None,
proxies: Dict = None,
too_many_requests_strategy=None,
retries: int = 0,
retry_delay_ms: int = 0,
#mc_jsession_id: Optional[str] = None,
#mc_b925d32c: Optional[str] = None,
#mc_sso_csrf_cookie: Optional[str] = None,
):
while base_url.endswith("/"):
base_url = base_url[:-1]
self.base_url = base_url
if proxies is None:
proxies = {}
self.proxies = proxies
self.auth_header = {"Authorization": f"Api-Token {token}"}
self.log = log
if self.log is None:
self.log = logging.getLogger(__name__)
self.log.setLevel(logging.WARNING)
st = logging.StreamHandler()
fmt = logging.Formatter("%(asctime)s - %(levelname)s - %(name)s - %(thread)d - %(filename)s:%(lineno)d - %(message)s")
st.setFormatter(fmt)
self.log.addHandler(st)
self.too_many_requests_strategy = too_many_requests_strategy
retry_delay_s = retry_delay_ms / 1000
try:
self.retries = Retry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
allowed_methods=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
except TypeError: # Older version of urllib3?
self.retries = Retry(
total=retries,
backoff_factor=retry_delay_s,
status_forcelist=[400, 401, 403, 404, 413, 429, 500, 502, 503, 504],
method_whitelist=["TRACE", "PUT", "DELETE", "OPTIONS", "HEAD", "GET", "POST"],
raise_on_status=False,
)
# This is for internal dynatrace usage
#self.mc_jsession_id = mc_jsession_id
#self.mc_b925d32c = mc_b925d32c
#self.mc_sso_csrf_cookie = mc_sso_csrf_cookie
def make_request(
self, path: str, params: Optional[Any] = None, headers: Optional[Dict] = None, method="GET", data=None, files=None, query_params=None
) -> requests.Response:
url = f"{self.base_url}{path}"
body = None
if method in ["POST", "PUT"]:
body = params
params = query_params
if headers is None:
headers = {}
if files is None and "content-type" not in [key.lower() for key in headers.keys()]:
headers.update({"content-type": "application/json"})
headers.update(self.auth_header)
cookies = None
#if self.mc_b925d32c and self.mc_sso_csrf_cookie and self.mc_jsession_id:
# headers.update({"Cookie": f"JSESSIONID={self.mc_jsession_id}; ssoCSRFCookie={self.mc_sso_csrf_cookie}; b925d32c={self.mc_b925d32c}"})
# cookies = {"JSESSIONID": self.mc_jsession_id, "ssoCSRFCookie": self.mc_sso_csrf_cookie, "b925d32c": self.mc_b925d32c}
s = requests.Session()
s.mount("https://", HTTPAdapter(max_retries=self.retries))
self.log.debug(f"Making {method} request to '{url}' with params {params} and body: {body}")
r = s.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies, data=data, cookies=cookies, files=files)
self.log.debug(f"Received response '{r}'")
while r.status_code == 429 and self.too_many_requests_strategy == TOO_MANY_REQUESTS_WAIT:
sleep_amount = int(r.headers.get("retry-after", 5))
self.log.warning(f"Sleeping for {sleep_amount}s because we have received an HTTP 429")
time.sleep(sleep_amount)
r = requests.request(method, url, headers=headers, params=params, json=body, verify=False, proxies=self.proxies)
if r.status_code >= 400:
raise Exception(f"Error making request to {url}: {r}. Response: {r.text}")
return r

68
pagination.py Normal file
View File

@ -0,0 +1,68 @@
from more_utils.httpClient import HttpClient
class Pagionation():
def __init__(self, http_client, target_url, target_params=None, headers=None, list_item="result"):
#self.__target_class = target_class
self.__http_client: HttpClient = http_client
self.__target_url = target_url
self.__target_params = target_params
self.__headers = headers
self.__list_item = list_item
self._has_next_page = True
self.__total_count = None
self.__page_size = None
self.elements = self._get_next_page()
def __iter__(self):# -> Iterator[T]:
for element in self.__elements:
yield element
while self._has_next_page:
new_elements = self._get_next_page()
for element in new_elements:
yield element
def __len__(self):
return self.__total_count or len(self.__elements)
def _get_next_page(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
data = []
if json_response.get("nextPageKey", None):
self._has_next_page = True
self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
else:
self._has_next_page = False
if self.__list_item in json_response:
elements = json_response[self.__list_item]
self.__total_count = json_response.get("totalCount") or len(elements)
while self._has_next_page == True:
self.__target_url = self.__target_url.split("?")[0]
elements += self._get_response()
return elements
def _get_response(self):
response = self.__http_client.make_request(self.__target_url, params=self.__target_params, headers=self.__headers)
json_response = response.json()
data = []
if json_response.get("nextPageKey", None):
self._has_next_page = True
self.__target_params = {"nextPageKey": json_response["nextPageKey"]}
else:
self._has_next_page = False
if self.__list_item in json_response:
elements = json_response[self.__list_item]
self.__total_count = json_response.get("totalCount") or len(elements)
return elements

41
patterns/Pattern1.py Normal file
View File

@ -0,0 +1,41 @@
import re
class Pattern1:
def parseServicesAndMethods(self, metricExpression):
result = re.findall(r"type\(\"?service_method\"?\)[\s\n\r]*,[\s\n\r]*fromRelationship[\s\n\r]*\.[\s\n\r]*isServiceMethodOfService[\s\n\r]*\([\s\n\r]*type\(\"?service\"?\)[\s\n\r]*,[\s\n\r]*entityName[\s\n\r]*\.[\s\n\r]*in[\s\n\r]*\([\s\n\r]*([^\)]*)\)[\s\n\r]*\)[\s\n\r]*\,[\s\n\r]*entityName[\s\n\r]*\.[\s\n\r]*in\([\s\n\r]*([^\)]*)[\s\n\r]*\)", metricExpression,flags=re.IGNORECASE|re.X|re.MULTILINE)
services=[]
methods=[]
if result:
for r in result:
services=[s.strip() for s in r[0].split(",")]
methods=[s.strip() for s in r[1].split(",")]
return services, methods
class Pattern2:
def parseServicesAndMethods(self, metricExpression):
result = re.findall(r"type\(\"?service_method\"?\)[\s\n\r]*,[\s\n\r]*fromRelationship[\s\n\r]*\.[\s\n\r]*isServiceMethodOfService[\s\n\r]*\([\s\n\r]*type\(\"?service\"?\)[\s\n\r]*,[\s\n\r]*entityName[\s\n\r]*\.[\s\n\r]*in[\s\n\r]*\([\s\n\r]*([^\)]*)\),[\s\n\r]*tag\([^\)]*[\s\n\r]*\)[\s\n\r]*\)[\s\n\r]*\,[\s\n\r]*entityName[\s\n\r]*\.[\s\n\r]*in\([\s\n\r]*([^\)]*)[\s\n\r]*\)", metricExpression,flags=re.IGNORECASE|re.X|re.MULTILINE)
services=[]
methods=[]
if result:
for r in result:
services=[s.strip() for s in r[0].split(",")]
methods=[s.strip() for s in r[1].split(",")]
return services, methods
class Pattern3:
def parseServicesAndMethods(self, metricExpression):
result = re.findall(r"type\(\"?service_method\"?\)[\s\n\r]*,[\s\n\r]*entityId[\s\n\r]*[\s\n\r]*\([\s\n\r]*[\s\n\r]*([^\)]*)[\s\n\r]*\)", metricExpression,flags=re.IGNORECASE|re.X|re.MULTILINE)
services=[]
methods=[]
if result:
for r in result:
methods=[s.strip() for s in r.split(",")]
return services, methods

0
patterns/__init__.py Normal file
View File

7
requirements.txt Normal file
View File

@ -0,0 +1,7 @@
python-decouple
pyyaml
pandas
requests
datetime
argparse
openpyxl