601 lines
21 KiB
Python
601 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
################################################################################
|
|
# #
|
|
# Purpose : Python msend for BHOM #
|
|
# #
|
|
# (C) Copyright 2024 BMC Software, All Rights Reserved #
|
|
# #
|
|
################################################################################
|
|
|
|
import sys
|
|
import json
|
|
import getopt
|
|
import socket
|
|
import random
|
|
import requests
|
|
import os
|
|
import time
|
|
import logging
|
|
from logging.handlers import RotatingFileHandler
|
|
import datetime
|
|
import shutil
|
|
import configparser
|
|
import requests
|
|
from retrying import retry
|
|
import threading
|
|
import glob
|
|
|
|
|
|
################################################################################
|
|
# Constants and Error Messages
|
|
################################################################################
|
|
|
|
|
|
# Banner values printed by header().
ITM_VERSION = "0.4"
ITM_BUILD = "June 20th, 2024"
ITM_COPYRIGHT = "(c) Copyright 2024 BMC Software, All Rights Reserved"

# Error message templates. "{}" placeholders are filled with str.format().
# ERR_DONOTHING, ERR_FILECLASS and ERR_FILESTDIN contain the literal text
# "{SourceFile}" and must be printed as-is (str.format() would raise KeyError).
ERR_CELLNAME = "Cannot get information about cell '{}' in directory file\n"
ERR_CLI = "Invalid argument(s): {}\n"
ERR_DONOTHING = "No event input (eg {SourceFile}, '-' or -a Class) is provided"
ERR_FILECLASS = "{SourceFile} argument (or '-') is not compatible with -a, -b, -m, -o or -r options"
ERR_FILESTDIN = "{SourceFile} argument is not compatible with '-' (read from standard input)"
ERR_GETHOSTBYNAME = "gethostbyname failed on '{}': {}\n"
ERR_HOSTNAME = "Cannot get the hostname of the local machine: {}\n"
ERR_MISSINGCLASS = "Class (-a options) is missing"
ERR_OPEN = "Cannot open file '{}': {}\n"
ERR_SOCKCLOSE = "Couldn't disconnect on '{}:{}': {}\n"
ERR_SOCKCONNECT = "Couldn't connect to '{}:{}': {}\n"
ERR_SOCKPRINT = "Print failed on socket: {}\n"
ERR_NODESTORKEY = "BHOM destination or apiKey not properly provided\n"
|
|
|
|
|
|
################################################################################
|
|
# Loading Configuration from config.ini
|
|
################################################################################
|
|
|
|
# Load settings from config.ini.
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did read, so an empty result means no configuration was loaded.
# Without this check the script would fail later with an opaque KeyError.
config = configparser.ConfigParser()
if not config.read('config.ini'):
    # Nothing in the current directory: fall back to a config.ini that sits
    # next to this script (e.g. when invoked from cron or another cwd).
    _script_config = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
    if not config.read(_script_config):
        sys.stderr.write(
            "Cannot read configuration file 'config.ini' "
            "(looked in the current directory and at {})\n".format(_script_config)
        )
        sys.exit(1)
|
|
|
|
################################################################################
|
|
# Logging and Verbose
|
|
################################################################################
|
|
|
|
# Provisional root-logger configuration; `logger` is rebound further down once
# setup_logging() has built the rotating file logger.
logging.basicConfig(
    format='%(asctime)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger()

# Console output switches; parse_args() resets both and applies -q / -v.
quiet = False
verbose = False
|
|
|
|
def log_verbose(message, verbose_flag):
    """Write *message* to the log and echo it on the console in verbose mode.

    Despite its name, ``verbose_flag`` selects the log level: 0 logs at INFO,
    any other value logs at ERROR. The console echo is controlled by the
    module-level ``verbose`` switch.
    """
    emit = logger.info if verbose_flag == 0 else logger.error
    emit(message)
    if verbose:
        print(message)
|
|
|
|
def _format_gmt_offset(offset_seconds):
    """Format a UTC offset given in seconds as a signed "+HHMM" / "-HHMM" string."""
    # Work on the absolute value: floor division / modulo on a negative number
    # would turn e.g. -03:30 into "-0430".
    sign = '-' if offset_seconds < 0 else '+'
    hours, remainder = divmod(abs(int(offset_seconds)), 3600)
    return "{}{:02d}{:02d}".format(sign, hours, remainder // 60)


def setup_logging(log_file, max_bytes=int(config['Logging']['max_bytes']), backup_count=int(config['Logging']['backup_count'])):
    """Build and return the rotating file logger used by the script.

    Args:
        log_file: Path of the log file.
        max_bytes: Size at which the log file is rotated (defaults to the
            [Logging] max_bytes setting, read when the function is defined).
        backup_count: Number of rotated files kept (defaults to the [Logging]
            backup_count setting, read when the function is defined).

    Returns:
        The 'MyLogger' logger with a single RotatingFileHandler attached.
    """
    # Reset the root logger so the provisional console configuration is dropped.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(level=logging.DEBUG)

    # The local UTC offset is computed once and baked into the format string.
    now = datetime.datetime.now()
    local_tz = now.astimezone().tzinfo
    gmt_offset = _format_gmt_offset(local_tz.utcoffset(now).total_seconds())

    logger = logging.getLogger('MyLogger')
    logger.propagate = False

    # Drop handlers left by an earlier call so records are not written twice.
    for stale in logger.handlers[:]:
        logger.removeHandler(stale)
        stale.close()

    formatter = logging.Formatter(
        '%(asctime)s GMT{} [%(levelname)s] : %(message)s'.format(gmt_offset),
        datefmt='%Y-%m-%d %H:%M:%S',
    )

    handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)
    logger.addHandler(handler)

    return logger
|
|
|
|
# Initialize logging
|
|
# Rebind the module-level `logger` to the rotating file logger; the log file
# path comes from the [Logging] section of the configuration.
log_file = config['Logging']['log_file']
logger = setup_logging(log_file)
|
|
|
|
|
|
################################################################################
|
|
# Helper Functions
|
|
################################################################################
|
|
|
|
|
|
def header():
    """Print the product banner and the copyright line unless quiet mode is on."""
    if quiet:
        return
    banner = "Helix Monitor %s Python Simple Sender (build: %s)" % (ITM_VERSION, ITM_BUILD)
    print(banner)
    print(ITM_COPYRIGHT)
|
|
|
|
|
|
def usage(exit_code=0):
    """Print the banner and the command-line help, then exit.

    Args:
        exit_code: Process exit status passed to sys.exit() (0 for -h,
            non-zero when called because of a usage error).
    """
    header()
    # Show the name the script was actually started with, so the synopsis and
    # the examples cannot disagree about the file name.
    prog = os.path.basename(sys.argv[0]) or "msend_4_bhom.py"
    print("""
usage:

python {prog} [-a Class] [-h|-?] [-z] [-n Helix_Monitor_Hostname:apiKey] [-q] [-v] [- | -f SourceFile | -a Class [-p] [-b SlotSetValue] [-m Message] [-o source_identifier] [-r Severity]]

-a Send object of class Class
-b Add SlotSetValue settings (format: "slot=value;...")
-f Read BAROC input from file SourceFile
-h Print this help and exit
-m Set event message to Message
-n Connect to BHOM Instance with apiKey (colon separated); when omitted, the [Server] section of config.ini is used
-o Set event source_identifier
-p For PATROL_EV class only, when sending event from file or standard input stream, randomize the p_origin_key value (overriding existing value)
-q quiet execution (no banner)
-r Set event severity to Severity
-v Verbose
-z Print version number and exit
- BAROC Input from standard input stream

examples:
1) echo 'EVENT;severity=MAJOR;msg=Test_Message;END' | python {prog} -n <tenant>:<apikey> -v -
2) python {prog} -n <tenant>:<apikey> -v -f "sample.baroc"
3) python {prog} -n <tenant>:<apikey> -a HELIX_SM_EV -r CRITICAL -m "Testing from Python CLI" -b "object_class=test-object-class" -v server sample-itom-demo.onbmc.com
""".format(prog=prog))
    sys.exit(exit_code)
|
|
|
|
|
|
################################################################################
|
|
# Parsing arguments
|
|
################################################################################
|
|
|
|
|
|
def parse_args():
    """Parse the command line and return the sender settings.

    Side effects: sets the module-level ``verbose``, ``quiet``, ``opts`` (the
    raw getopt option pairs) and ``slot_values`` (slots given with -b).
    Prints the usage text and exits on invalid or missing arguments.

    Returns:
        Tuple ``(server, apiKey, event_class, message, severity, slot_values,
        read_from_stdin, file_path, source_identifier, p_origin_key)``.
    """
    global verbose, quiet, opts, slot_values

    args = sys.argv[1:]
    # A lone '-' argument means "read the event from standard input". getopt
    # never reports it as an option, so it is detected on the raw arguments.
    read_from_stdin = '-' in args
    file_path = None
    verbose = False
    quiet = False

    if not args:
        usage(1)

    try:
        # -l is accepted for command-line compatibility but currently ignored.
        opts, args = getopt.getopt(args, "a:b:f:h?l:m:n:o:pqr:vz")
    except getopt.GetoptError as err:
        message = ERR_CLI.format(err)
        print(message)
        log_verbose("parse_args: " + message, 1)
        usage(1)

    server = apiKey = event_class = message = severity = None
    slot_values = {}
    source_identifier = "msend4bhom.py_{}".format(random.randint(0, 99999))  # Default
    p_origin_key = None
    destination_given = False
    randomize_origin_key = False

    for opt, arg in opts:
        if opt in ("-h", "-?"):
            usage()
        elif opt == "-z":
            header()
            sys.exit(0)
        elif opt == "-v":
            verbose = True
        elif opt == "-q":
            quiet = True
            verbose = False
        elif opt == "-n":
            destination_given = True
            # partition() never raises; a value without ':' leaves apiKey
            # empty and is rejected by the validation below.
            server, _, apiKey = arg.partition(":")
        elif opt == "-a":
            event_class = arg
        elif opt == "-r":
            severity = arg
        elif opt == "-m":
            message = arg
        elif opt == "-b":
            for pair in arg.split(";"):
                if "=" in pair:
                    slot, value = pair.split("=", 1)
                    slot_values[slot] = value
        elif opt == "-f":
            file_path = arg
        elif opt == "-o":
            source_identifier = arg
        elif opt == "-p":
            # Resolved after the loop so the result does not depend on the
            # position of -p relative to the other options.
            randomize_origin_key = True

    if randomize_origin_key:
        p_origin_key = "msend4bhom.py_{}".format(random.randint(0, 99999))

    if not destination_given:
        # No -n on the command line: fall back to the [Server] configuration.
        try:
            server = config['Server']['server']
            apiKey = config['Server']['api_key']
        except KeyError:
            server = apiKey = None

    if not server or not apiKey:
        print(ERR_NODESTORKEY)
        log_verbose("parse_args: " + ERR_NODESTORKEY, 1)
        usage(1)

    return server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key
|
|
|
|
|
|
|
|
################################################################################
|
|
# Convert raw into JSON event
|
|
################################################################################
|
|
|
|
|
|
def convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address):
    """Build the JSON payload (a one-element list) for a single event.

    Args:
        event_class, message, severity: Core event attributes.
        slot_values: Extra slots; applied last, so they override the core
            attributes when the names collide.
        source_identifier, source_hostname, location, source_address: Origin
            attributes of the event.
        p_origin_key: Value used for the ``p_origin_key`` slot when -p was
            given for a PATROL_EV event.

    Returns:
        The event serialized as a JSON string.
    """
    record = {
        "class": event_class,
        "msg": message,
        "severity": severity,
        "source_identifier": source_identifier,
        "source_hostname": source_hostname,
        "location": location,
        "source_address": source_address,
    }
    record.update(slot_values)

    # `opts` holds getopt's (flag, value) pairs, so the flag has to be compared
    # with the first element of each pair; testing a bare string against the
    # list never matches.
    randomize_requested = any(flag == "-p" for flag, _ in opts)
    # NOTE(review): the usage text says -p overrides an existing value, while
    # this condition keeps a p_origin_key that is already present - confirm.
    if event_class == "PATROL_EV" and randomize_requested and "p_origin_key" not in record:
        record["p_origin_key"] = p_origin_key or "msend4bhom.py_{}".format(random.randint(0, 99999))

    json_event = json.dumps([record])
    if not quiet:
        print("JSON EVENT: ", json_event, "\n")

    return json_event
|
|
|
|
|
|
################################################################################
|
|
# Networking
|
|
################################################################################
|
|
|
|
|
|
def _is_request_error(exc):
    """Return True for HTTP/transport failures, the only errors worth retrying."""
    return isinstance(exc, requests.exceptions.RequestException)


@retry(wait_fixed=int(config['REST API retry']['wait_time']),
       stop_max_attempt_number=int(config['REST API retry']['max_retry_count']),
       retry_on_exception=_is_request_error)
def _post_event(url, headers, json_event):
    """POST one event; raises RequestException so the retry decorator can act."""
    response = requests.post(url, headers=headers, data=json_event)
    # Raise an exception for non-2xx status codes
    response.raise_for_status()
    return response


def send(server, apiKey, json_event):
    """Send a JSON event to BHOM, buffering it on disk until it is delivered.

    A new event (``retry_flag`` false) is first written to the buffer
    directory; during a buffer replay (``retry_flag`` true) the file being
    replayed is taken from the module-level ``filepath``. The POST is retried
    according to the [REST API retry] settings. The retry wraps only the HTTP
    call, so the event is buffered once and failures are actually retried
    instead of being swallowed on the first attempt.

    Args:
        server: BHOM host name.
        apiKey: API key sent in the Authorization header.
        json_event: Event payload as a JSON string.
    """
    if not retry_flag:
        file_path = buffer_event(json.loads(json_event))
    else:
        file_path = filepath

    if not quiet:
        print("Sending JSON event to BHOM:\n", json_event, "\n")

    headers = {
        "Authorization": "apiKey {}".format(apiKey),
        "Content-Type": "application/json",
    }
    url = "https://{}/events-service/api/v1.0/events".format(server)

    try:
        response = _post_event(url, headers, json_event)
    except requests.exceptions.RequestException as e:
        # All attempts failed: leave the buffered file in place for a later run.
        print("Error sending event:", e)
        log_verbose("send: Failed event {} kept in the {} directory for retry".format(file_path, buffer_dir), 1)
        return

    if not quiet:
        print("Helix Monitor response:\n", response.content.decode(), "\n")

    # buffer_event() returns None when the buffer file could not be written;
    # there is nothing to move or delete in that case.
    if file_path is not None:
        move_buffered_event(file_path)
|
|
|
|
|
|
################################################################################
|
|
# Persistent Buffering
|
|
################################################################################
|
|
|
|
|
|
# Buffering settings: where undelivered events are kept, the limits applied by
# the rolling cleanup, and the lock file that guards buffer files.
buffer_dir = config['Buffering']['buffer_dir']
max_buffer_files = int(config['Buffering']['max_buffer_files'])
max_buffer_size_bytes = int(config['Buffering']['max_buffer_size_bytes'])
lock_file = config['Buffering']['lock_filename']
max_lock_file_size = int(config['Buffering']['max_lock_file_size'])

# Handling of delivered events: keep them in success_dir (success_flag true)
# or delete them, plus the limits for the success directory.
success_flag = config.getboolean('EventHandling', 'success_flag')
success_dir = config['EventHandling']['success_dir']
max_success_files = int(config['EventHandling']['max_success_files'])
max_success_size_bytes = int(config['EventHandling']['max_success_size_bytes'])

# Define the external log file
lock_log_file = "lock_log.txt"

try:
    # exist_ok avoids the race between an existence check and the creation.
    os.makedirs(success_dir, exist_ok=True)
    os.makedirs(buffer_dir, exist_ok=True)
except OSError as e:
    message = "Error in creating directory: {}".format(e)
    print(message)
    log_verbose("sys: " + message, 1)
    # Non-zero status: the script cannot work without these directories.
    sys.exit(1)
|
|
|
|
def buffer_event(event):
    """Persist *event* as a JSON file in the buffer directory.

    The file is named ``event_<epoch-milliseconds>.json`` and is written while
    holding its entry in the lock file. Afterwards the oldest buffer files are
    removed while the configured file-count or total-size limit is exceeded.

    Args:
        event: JSON-serializable event object.

    Returns:
        Path of the buffer file, or None when it could not be written.
    """
    global filename
    filename = int(time.time() * 1000)
    file_path = os.path.join(buffer_dir, "event_{}.json".format(filename))

    # Acquire the lock before entering the try block: if acquisition fails
    # (check_lock_status exits), the finally clause must not remove a lock
    # entry that belongs to someone else.
    check_lock_status(file_path, lock_file)
    try:
        check_and_trim_file(lock_file, max_lock_file_size)

        # Write event data to the buffer file
        try:
            with open(file_path, 'w') as file:
                json.dump(event, file)
        except IOError as e:
            message = "Error writing to file {}: {}".format(file_path, e)
            print(message)
            log_verbose("buffer_event: " + message, 1)
            return None

        log_verbose("buffer_event: Locking File - {}.json".format(filename), 0)

        # Append the file path to the lock log file
        try:
            with open(lock_log_file, 'a') as lock_log:
                lock_log.write(file_path + '\n')
        except IOError as e:
            message = "Error writing to lock log file {}: {}".format(lock_log_file, e)
            print(message)
            log_verbose("buffer_event: " + message, 1)

        # Perform rolling saves if the buffer exceeds its limits. At least one
        # file (the newest, i.e. the event just written) is always kept, which
        # also prevents popping from an empty list.
        buffer_files = sorted(glob.glob(os.path.join(buffer_dir, 'event_*.json')))
        buffer_total_size = sum(os.path.getsize(path) for path in buffer_files)
        while len(buffer_files) > 1 and (
                len(buffer_files) > max_buffer_files or buffer_total_size > max_buffer_size_bytes):
            oldest_file = buffer_files.pop(0)
            buffer_total_size -= os.path.getsize(oldest_file)
            os.remove(oldest_file)

        return file_path
    finally:
        # Release the lock
        release_lock(file_path, lock_file)
        log_verbose("buffer_event: Unlocking File - {}.json".format(filename), 0)
|
|
|
|
|
|
def move_buffered_event(file_path):
    """Dispose of a buffer file after its event was delivered.

    With ``success_flag`` set the file is moved to ``success_dir`` and the
    oldest success files are removed while the configured limits are exceeded;
    otherwise the buffer file is deleted.

    Args:
        file_path: Path of the delivered buffer file.
    """
    if not success_flag:
        # Delete the buffer file if success_flag is False
        try:
            os.remove(file_path)
        except OSError as e:
            message = "Error deleting file {}: {}".format(file_path, e)
            print(message)
            log_verbose("move_buffered_event: " + message, 1)
            return
        log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event deleted", 0)
        return

    # basename() handles the path separator of the running platform; splitting
    # on a backslash only worked for Windows-style paths.
    dest_path = os.path.join(success_dir, os.path.basename(str(file_path)))

    # Move the file to the success directory
    try:
        shutil.move(file_path, dest_path)
    except OSError as e:  # shutil.Error is a subclass of OSError
        message = "Error moving file {} to {}: {}".format(file_path, dest_path, e)
        print(message)
        log_verbose("move_buffered_event: " + message, 1)
        return
    log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event moved into {} directory".format(success_dir), 0)

    # Perform rolling saves if the number of success files exceeds the limit
    success_files = sorted(glob.glob(os.path.join(success_dir, 'event_*.json')))
    total_size = sum(os.path.getsize(path) for path in success_files)
    while success_files and (
            len(success_files) > max_success_files or total_size > max_success_size_bytes):
        oldest_file = success_files.pop(0)
        total_size -= os.path.getsize(oldest_file)
        os.remove(oldest_file)
|
|
|
|
|
|
def retry_buffered_events(server, apiKey):
    """Re-send every event file found in the buffer directory.

    Each file is locked, loaded and passed to send(); the module-level
    ``filepath`` tells send() which buffer file is being replayed. The lock is
    always released afterwards, so an event that fails again can be retried by
    the next run instead of being reported as locked.

    Args:
        server: BHOM host name.
        apiKey: API key for the BHOM instance.
    """
    global filepath
    count = 1
    # Sorted so events named event_<timestamp>.json are replayed oldest first.
    for file_name in sorted(os.listdir(buffer_dir)):
        file_path = os.path.join(buffer_dir, file_name)
        if not os.path.isfile(file_path):
            continue

        log_verbose("retry_buffered_events: Retrying Buffered Event - {}: {}".format(str(count), file_name), 0)
        filepath = file_path

        check_lock_status(file_path, lock_file)
        try:
            check_and_trim_file(lock_file, max_lock_file_size)

            try:
                with open(file_path, 'r') as file:
                    event = json.load(file)
            except (OSError, ValueError) as e:
                # An unreadable or corrupt file must not block the other events.
                message = "Cannot read buffered event {}: {}".format(file_path, e)
                print(message)
                log_verbose("retry_buffered_events: " + message, 1)
                continue

            send(server, apiKey, json.dumps(event))
        finally:
            release_lock(file_path, lock_file)
        count += 1
|
|
|
|
|
|
# Function to acquire lock for a buffer file
|
|
def acquire_lock(file_name, lock_file):
    """Try to register *file_name* in the lock file.

    The lock file holds one locked name per line and is created when missing.
    Names are compared line by line (as release_lock does), so a name that is
    merely a substring of another entry is not treated as locked.

    Args:
        file_name: Name (path) of the buffer file to lock.
        lock_file: Path of the lock file.

    Returns:
        True when the lock was acquired, False when the name is already listed.
    """
    with open(lock_file, 'a+') as f:
        f.seek(0)
        locked_names = {line.strip() for line in f.read().splitlines()}
        if file_name in locked_names:
            return False  # Lock not acquired, file already in use
        f.write(file_name + '\n')  # Append mode: always written at the end
    return True  # Lock acquired successfully
|
|
|
|
|
|
# Function to release lock for a buffer file
|
|
def release_lock(file_name, lock_file):
    """Remove *file_name* from the lock file.

    Every line equal to *file_name* (ignoring surrounding whitespace) is
    dropped; all other entries are kept. A missing lock file means there is
    nothing to release, which matters because this runs in ``finally`` blocks.

    Args:
        file_name: Name (path) of the buffer file to unlock.
        lock_file: Path of the lock file.
    """
    try:
        with open(lock_file, 'r') as f:
            lines = f.readlines()
    except FileNotFoundError:
        return

    remaining = [line for line in lines if line.strip() != file_name]
    with open(lock_file, 'w') as f:
        f.writelines(remaining)  # Rewrite lock file without the released name
|
|
|
|
|
|
# Function to check if a buffer file is locked
|
|
def check_lock_status(file_path, lock_file):
    """Acquire the lock for *file_path*, retrying while it is held elsewhere.

    Up to ``retry_check_buffer_status_count`` additional attempts are made,
    sleeping ``retry_check_buffer_status_time`` seconds before each one. The
    process exits with status 1 when the lock is still unavailable.

    Args:
        file_path: Buffer file to lock.
        lock_file: Path of the lock file.
    """
    retry_time = int(config['Buffering']['retry_check_buffer_status_time'])
    retry_count = int(config['Buffering']['retry_check_buffer_status_count'])

    lock_status = acquire_lock(file_path, lock_file)
    for retry_val in range(retry_count):
        if lock_status:
            break
        log_verbose("Buffer file {} is found locked, retrying after {} seconds, retries remaining - {}".format(file_path, retry_time, str(int(retry_count - retry_val - 1))), 1)
        time.sleep(retry_time)  # Wait for retry_time seconds before trying again
        # Re-attempt the acquisition; without this the loop only sleeps and
        # can never observe that the lock was released.
        lock_status = acquire_lock(file_path, lock_file)

    if not lock_status:
        print("Buffer file is locked. Exiting.")
        log_verbose("buffer_event: Buffer file {} is still found locked after {} seconds, hence exited.".format(file_path, str(int(retry_time * retry_count))), 1)
        sys.exit(1)
|
|
|
|
|
|
def check_and_trim_file(file_path, max_size):
    """Trim *file_path* to its last line when it is larger than *max_size*.

    Errors are reported on the console and in the log but never raised.

    Args:
        file_path: File to check.
        max_size: Size limit in bytes.
    """
    try:
        if os.path.getsize(file_path) <= max_size:
            return
        with open(file_path, 'r+') as file:
            lines = file.readlines()
            # Drop the oldest records: rewrite the file with its last line only.
            file.seek(0)
            file.truncate()
            file.writelines(lines[-1:])
    except (OSError, ValueError) as e:
        # Format the exception; concatenating it to a str raises TypeError.
        message = "Error while checking and trimming file: {}".format(e)
        print(message)
        log_verbose("check_and_trim_file: " + message, 1)
|
|
|
|
|
|
################################################################################
|
|
# Read event from a file
|
|
################################################################################
|
|
|
|
|
|
def read_event_from_file(file_path):
    """Return the content of *file_path* with surrounding whitespace removed.

    On any error the problem is printed and logged and the process exits with
    status 1.
    """
    try:
        with open(file_path, 'r') as source:
            contents = source.read()
        return contents.strip()
    except Exception as exc:
        failure = "Error reading from file %s" % (exc,)
        print(failure)
        log_verbose("read_event_from_file: " + failure, 1)
        sys.exit(1)
|
|
|
|
|
|
################################################################################
|
|
# Parse Input
|
|
################################################################################
|
|
|
|
|
|
def parse_input(stdin_input):
    """Parse a ';'-separated ``key=value`` event description.

    Recognized keys set the returned attributes; every other key is stored in
    the module-level ``slot_values``. Parts without '=' (such as the leading
    ``EVENT`` and trailing ``END`` tokens) are ignored. Keys are stripped of
    surrounding whitespace so input spread over several lines parses the same
    as single-line input.

    Host information missing from the input is looked up locally; the lookups
    run only for the fields that were not supplied. A failed lookup is printed
    and logged and the process exits with status 1.

    Args:
        stdin_input: Raw event text.

    Returns:
        Tuple ``(event_class, severity, message, source_hostname, location,
        source_address)``.
    """
    # Default values
    event_class = "PATROL_EV"
    severity = "MAJOR"
    message = ""
    source_hostname = location = source_address = None

    for part in stdin_input.split(';'):
        if '=' not in part:
            continue
        key, value = part.split('=', 1)
        key = key.strip()
        if key in ('class', 'event_class'):
            event_class = value
        elif key == 'severity':
            severity = value
        elif key in ('msg', 'message'):
            message = value
        elif key == 'source_hostname':
            source_hostname = value
        elif key == 'location':
            location = value
        elif key == 'source_address':
            source_address = value
        else:
            slot_values[key] = value

    if source_hostname is None:
        source_hostname = socket.getfqdn()

    if location is None or source_address is None:
        local_name = socket.gethostname()
        try:
            if location is None:
                location = socket.gethostbyaddr(local_name)[0]
            if source_address is None:
                source_address = socket.gethostbyname(local_name)
        except OSError as e:  # socket.herror / socket.gaierror
            failure = ERR_GETHOSTBYNAME.format(local_name, e)
            print(failure)
            log_verbose("parse_input: " + failure, 1)
            sys.exit(1)

    return event_class, severity, message, source_hostname, location, source_address
|
|
|
|
|
|
################################################################################
|
|
# Main Logic
|
|
################################################################################
|
|
|
|
|
|
def main():
    """Run the sender: replay buffered events, then build and send the new one.

    The event comes from standard input ('-'), from a file (-f) or from the
    -a/-r/-m command-line options, in that order of precedence.
    """
    global verbose_flag, retry_flag

    separator = "-----------------------------xxx-----------------------------"
    verbose_flag = 0
    log_verbose(separator, 0)

    server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key = parse_args()
    header()
    log_verbose("main: Beginning main", 0)

    # retry_flag tells send() whether it is replaying an already buffered file
    # (True) or handling a new event that still has to be buffered (False).
    retry_flag = True
    log_verbose("main: Retrying buffered events if any", 0)
    retry_buffered_events(server, apiKey)
    retry_flag = False

    if read_from_stdin:
        log_verbose("main: Reading from stdinput", 0)
        stdin_input = sys.stdin.read().strip()
        event_class, severity, message, source_hostname, location, source_address = parse_input(stdin_input)

    elif file_path:
        log_verbose("main: Reading from file", 0)
        file_input = read_event_from_file(file_path)
        event_class, severity, message, source_hostname, location, source_address = parse_input(file_input)

    else:
        if not event_class:
            # Neither '-', -f nor -a was given: there is no event to send.
            print(ERR_DONOTHING)
            log_verbose("main: " + ERR_DONOTHING, 1)
            usage(1)

        log_verbose("main: Parsing given arguments", 0)
        # Take only the defaults (severity, message, host information) from
        # parse_input and apply the command-line values directly. Rendering
        # them into a ';'-separated string would corrupt values containing
        # ';' or '=' and turn missing options into the text "None".
        _, default_severity, default_message, source_hostname, location, source_address = parse_input("")
        if severity is None:
            severity = default_severity
        if message is None:
            message = default_message

    log_verbose("main: Creating event", 0)
    event = convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address)

    log_verbose("main: Sending JSON event to BHOM", 0)
    send(server, apiKey, event)

    log_verbose(separator, 0)
|
|
|
|
|
|
|
|
# Run the sender only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()