import re from KRParser import patterns, keyrequests, helper from enum import Flag, auto import logging import threading import concurrent.futures import time from jsonmerge import merge class KROption(Flag): VALIDATE_EXISTS = auto() VALIDATE_HASDATA = auto() RESOLVEKEYREQUETS = auto() RESOLVESERVICES = auto() class KRParser: #threadLimiter = threading.BoundedSemaphore(3) patterns=[patterns.Pattern1(), patterns.Pattern2(), patterns.Pattern3(), patterns.Pattern5(), patterns.Pattern4() ] lock = threading.Lock() def normalize(self,x): #tmp=x.replace("~","") tmp=x.replace("\n","") #tmp=tmp.replace("\"/","\"") #tmp=tmp.replace("\"/","") -_>was active #tmp=tmp.replace("/\"","\"") tmp=tmp.replace("/\"","") tmp=tmp.replace("\"","") tmp=tmp.replace("\t","") tmp=re.sub("([\s]*)\)", ")", tmp) tmp=re.sub("\([\s\n\r]*", "(", tmp) tmp=re.sub("\,[\s\n\r]*", ",", tmp) tmp=re.sub("\)[\s\n\r]*,", "),", tmp) tmp=re.sub("in[\s\n\r]*\(", "in(", tmp) return tmp def applyPatterns(self,subject): groups=None for p in self.patterns: groups=p.parseServicesAndMethods(subject) if len(groups) > 0: break return groups def checkKeyRequetsHasData(self,kr, tfrom, DTAPIURL, DTAPIToken): DTAPIURL = DTAPIURL + "/api/v2/entities" headers = { 'Content-Type': 'application/json', 'Authorization': 'Api-Token ' + DTAPIToken } for gid, group in enumerate(kr.matchedGroups): params={"entitySelector": group["existsQuery"], "from":tfrom["tfrom"], "fields": "fromRelationships"} response = helper.get_request(DTAPIURL, headers, params) entities = (response.json())['entities'] if len(entities) > 0: y=0 for method in kr.keyRequests: if method["groupId"] == gid: found = [x for x in entities if x[method["comparer"]] == method[method["comparer"]]] if len(found) > 0: method["hasData"][tfrom["label"]]=True #method["displayName"]=found[0]["displayName"] #method["entityId"]=found[0]["entityId"] #method["services"]=found[0]["fromRelationships"]["isServiceMethodOfService"] # for idx,o in enumerate(method["services"]): # tmpS=[p for p in kr.services if p["entityId"]==o["id"]] # if len(tmpS)>0: # method["services"][idx]=tmpS[0] else: method["hasData"][tfrom["label"]]=False def resolveServices(self,services, DTAPIURL, DTAPIToken): #DTAPIURL = DTAPIURL + "/api/v2/entities" headers = { 'Content-Type': 'application/json', 'Authorization': 'Api-Token ' + DTAPIToken } for gid, service in enumerate(services): query="type(SERVICE),entityId("+service["id"]+")" params=merge(self.serviceLookupParams,{"entitySelector": query}) #params={"entitySelector": query,"from":"now-2y", "fields":"tags"} response = helper.get_request(DTAPIURL, headers, params) entities = (response.json())['entities'] if len(entities)>0: services[gid]=entities[0] def resolveKeyRequests(self,kr, DTAPIURL, DTAPIToken, options): DTAPIURL = DTAPIURL + "/api/v2/entities" headers = { 'Content-Type': 'application/json', 'Authorization': 'Api-Token ' + DTAPIToken } for gid, k in enumerate(kr.keyRequests): try: query="type(service_method)" group=kr.matchedGroups[k["groupId"]] if len(group["services"])> 0: if group["services"][0].startswith("SERVICE-"): query+=",fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityId(\""+'","'.join(group["services"])+"\"))" else: query+=",fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityName.in(\""+'","'.join(group["services"])+"\"))" if k["comparer"]=="entityId": query+=",entityId("+k["entityId"]+")" else: query+=",entityName.in(\""+k["displayName"]+"\")" params={"entitySelector": query, "from":"now-1y","fields": "fromRelationships"} response = helper.get_request(DTAPIURL, headers, params) entities = (response.json())['entities'] # if len(entities) > 1: # kr.keyRequests[gid]['foundCount']=len(entities) # print("Multiple keyrequest found: ") if len(entities)> 0: kr.keyRequests[gid]["found"]=True kr.keyRequests[gid]['foundCount']=len(entities) kr.keyRequests[gid]["displayName"]=entities[0]["displayName"] kr.keyRequests[gid]["entityId"]=entities[0]["entityId"] if "isServiceMethodOfService" in entities[0]["fromRelationships"]: kr.keyRequests[gid]["services"]=entities[0]["fromRelationships"]["isServiceMethodOfService"] if KROption.RESOLVESERVICES in options and len( kr.keyRequests[gid]["services"])>0: self.resolveServices(kr.keyRequests[gid]["services"], DTAPIURL, DTAPIToken) except Exception as err: kr.keyRequests[gid]["exception"]="resolveKeyRequests failed: "+repr(err) #kr.mergeServices(entities) def getKeyRequestsByServices(self, services): #type(SERVICE_METHOD),fromRelationship.isServiceMethodOfService(type("SERVICE"),entityName.in("btc-user-composite-service - PROD")) DTAPIURL = self.DTAPIURL + "/api/v2/entities" headers = { 'Content-Type': 'application/json', 'Authorization': 'Api-Token ' + self.DTAPIToken } if len(services) > 0: if services[0].startswith("SERVICE-"): query="type(service_method),fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityId(\""+'","'.join(services)+"\"))" else: query="type(service_method),fromRelationship.isServiceMethodOfService(type(\"SERVICE\"),entityName.in(\""+'","'.join(services)+"\"))" params={"entitySelector": query} response = helper.get_request(DTAPIURL, headers, params) entities = (response.json())['entities'] return entities def process(self, kr): for gid, group in enumerate(kr.matchedGroups): if len(group["services"]) > 0 and len(group["methods"])==0: tmp_methods=self.getKeyRequestsByServices(group["services"]) for m in tmp_methods: tmp={"displayName": None,"comparer": "entityId", "entityId":m["entityId"], "groupId":gid, "hasData":{}, "services":[], "found":False, "foundCount":0, "exception":""} #"exists":None, 'hasData_1W':None, kr.keyRequests.append(tmp) for method in group["methods"]: if method.startswith('SERVICE_METHOD-'): tmp={"displayName": None,"comparer": "entityId", "entityId":method, "groupId":gid, "hasData":{}, "services":[], "found":False, "foundCount":0, "exception":""} #"exists":None, 'hasData_1W':None, else: tmp={"displayName":method,"comparer": "displayName", "entityId":None, "groupId":gid, "hasData":{}, "services":[], "found":False, "foundCount":0, "exception":""} #"exists":None, 'hasData_1W':None, kr.keyRequests.append(tmp) # for service in group["services"]: # if service.startswith('SERVICE-'): # tmp={"displayName": None,"comparer": "entityId", "entityId":service, "groupId":gid, "hasData":{}, "keyReuqests":[], "found":False, "foundCount":0, "exception":""} #"exists":None, 'hasData_1W':None, # else: # tmp={"displayName":service,"comparer": "displayName", "entityId":None, "groupId":gid, "hasData":{}, "keyReuqests":[], "found":False, "foundCount":0, "exception":""} #"exists":None, 'hasData_1W':None, # kr.services.append(tmp) if self.options: if KROption.RESOLVEKEYREQUETS in self.options: self.resolveKeyRequests(kr,self.DTAPIURL, self.DTAPIToken, self.options) if KROption.VALIDATE_HASDATA in self.options: self.checkKeyRequetsHasData(kr,{"label":"1W", "tfrom":"now-1w"},self.DTAPIURL, self.DTAPIToken) self.checkKeyRequetsHasData(kr,{"label":"1M", "tfrom":"now-1M"},self.DTAPIURL, self.DTAPIToken) # elif KROption.RESOLVEKEYREQUETS in self.options: # self.checkKeyRequetsHasData(kr, {"label":"1W", "tfrom":"now-1w"},self.DTAPIURL, self.DTAPIToken) # if KROption.RESOLVESERVICES in self.options: # self.resolveServices(kr,self.DTAPIURL, self.DTAPIToken) return kr def parseBySLO(self,index,row): #normalize print(index) try: normFilter=self.normalize(row['filter']) normExpresseion=self.normalize(row['metricExpression']) tmp_KR = keyrequests.KR({"sloName":row["name"], "env":row["env"], "metricExpression": normExpresseion, "filter": normFilter, "matchedGroups": None}) #SLO with Filter if normFilter.upper().startswith("TYPE(SERVICE_METHOD),") or normFilter.upper().startswith("TYPE(SERVICE),"): subject=normFilter else: subject=normExpresseion groups=self.applyPatterns(subject) tmp_KR.matchedGroups.append(groups) # for g in groups: # #if g["methods"] != None and len(g["methods"]) > 0: # tmp_KR.matchedGroups.append(g) #self.process(tmp_KR) kr=self.process(tmp_KR) with self.lock: self.krs.append(kr) except Exception as err: print(repr(err)) #return self.process(tmp_KR) def parseBySLO_Threaded(self, slosF): self.krs=[] #i=1 # threads = list() # for index, row in slosF.iterrows(): # logging.info("Main : create and start thread %d.", index) # x = threading.Thread(target=self.parseBySLO, args=(row,)) # threads.append(x) # x.start() # #krs.append(krp.parseBySLO(row)) # for index, thread in enumerate(threads): # logging.info("Main : before joining thread %d.", index) # thread.join() # logging.info("Main : thread %d done", index) # #resultSlos.extend(krs) with concurrent.futures.ThreadPoolExecutor(10) as executor: for index, row in slosF.iterrows(): # if i % 25 == 0: # time.sleep(0) #args={index:index, } executor.submit(self.parseBySLO, index,row) # print(str(i)+"\n") # i=i+1 # x = threading.Thread(target=self.parseBySLO, args=(row,)) # threads.append(x) # x.start() return self.krs def __init__(self, options: KROption=None ,serviceLookupParams={}, DTAPIURL=None, DTAPIToken=None ): self.DTAPIURL= DTAPIURL self.DTAPIToken=DTAPIToken self.options=options self.serviceLookupParams=merge({"from":"now-2y"},serviceLookupParams) self.krs=[]