msend_bhom2binary/msend_for_bhom.py

601 lines
21 KiB
Python

#!/usr/bin/env python3
################################################################################
# #
# Purpose : Python msend for BHOM #
# #
# (C) Copyright 2024 BMC Software, All Rights Reserved #
# #
################################################################################
import sys
import json
import getopt
import socket
import random
import requests
import os
import time
import logging
from logging.handlers import RotatingFileHandler
import datetime
import shutil
import configparser
import requests
from retrying import retry
import threading
import glob
################################################################################
# Constants and Error Messages
################################################################################
# --- Product identification (printed by header()) ---------------------------
ITM_COPYRIGHT = "(c) Copyright 2024 BMC Software, All Rights Reserved"
ITM_BUILD = "June 20th, 2024"
ITM_VERSION = "0.4"

# --- Command-line validation messages ---------------------------------------
ERR_CLI = "Invalid argument(s): {}\n"
ERR_DONOTHING = "No event input (eg {SourceFile}, '-' or -a Class) is provided"
ERR_FILECLASS = "{SourceFile} argument (or '-') is not compatible with -a, -b, -m, -o or -r options"
ERR_FILESTDIN = "{SourceFile} argument is not compatible with '-' (read from standard input)"
ERR_MISSINGCLASS = "Class (-a options) is missing"
ERR_NODESTORKEY = "BHOM destination or apiKey not properly provided\n"

# --- Host / cell lookup messages ('{}' placeholders are filled by .format) ---
ERR_CELLNAME = "Cannot get information about cell '{}' in directory file\n"
ERR_GETHOSTBYNAME = "gethostbyname failed on '{}': {}\n"
ERR_HOSTNAME = "Cannot get the hostname of the local machine: {}\n"

# --- File and socket I/O messages -------------------------------------------
ERR_OPEN = "Cannot open file '{}': {}\n"
ERR_SOCKCLOSE = "Couldn't disconnect on '{}:{}': {}\n"
ERR_SOCKCONNECT = "Couldn't connect to '{}:{}': {}\n"
ERR_SOCKPRINT = "Print failed on socket: {}\n"
################################################################################
# Loading Configuration from config.ini
################################################################################
# Settings are read from 'config.ini'.  The copy that sits next to this script
# is loaded first as a fallback, then the one in the current working directory;
# ConfigParser.read() silently skips missing files and later files override
# earlier ones, so a config.ini in the working directory still takes precedence
# while the tool also works when launched from another directory (cron, etc.).
config = configparser.ConfigParser()
config.read([
    os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini'),
    'config.ini',
])
################################################################################
# Logging and Verbose
################################################################################
# Console switches; parse_args() rebinds both from the -v / -q options.
verbose = False
quiet = False
# Provisional root-logger configuration, used until setup_logging() installs
# the rotating file handler further down in this file.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
)
logger = logging.getLogger()
def log_verbose(message,verbose_flag):
    """Write *message* to the module logger and echo it when verbose is on.

    A ``verbose_flag`` of 0 logs at INFO level; any other value logs at ERROR
    level.  The message is additionally printed when the module-level
    ``verbose`` switch is truthy.
    """
    emit = logger.info if verbose_flag == 0 else logger.error
    emit(message)
    if verbose:
        print(message)
def setup_logging(log_file, max_bytes=int(config['Logging']['max_bytes']), backup_count=int(config['Logging']['backup_count'])):
    """Create the 'MyLogger' logger writing to a size-rotated file.

    Args:
        log_file: Path of the log file handed to RotatingFileHandler.
        max_bytes: Rotation threshold in bytes (default from config.ini,
            section 'Logging', key 'max_bytes').
        backup_count: Number of rotated files kept (default from config.ini,
            section 'Logging', key 'backup_count').

    Returns:
        The configured ``logging.Logger`` named 'MyLogger'.  It does not
        propagate to the root logger.
    """
    # Drop whatever handlers an earlier basicConfig() call installed on root.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(level=logging.DEBUG)

    # Local UTC offset, rendered as "+HHMM" / "-HHMM" for the log prefix.
    # The sign is split off before divmod: flooring a negative offset such as
    # -03:30 hour-wise would otherwise yield "-0430".
    now = datetime.datetime.now()
    local_tz = now.astimezone().tzinfo
    offset_minutes = int(local_tz.utcoffset(now).total_seconds() // 60)
    sign = '-' if offset_minutes < 0 else '+'
    offset_hours, offset_rest = divmod(abs(offset_minutes), 60)
    gmt_offset = "{}{:02d}{:02d}".format(sign, offset_hours, offset_rest)

    # Create a custom logger that keeps its records out of the root handlers.
    logger = logging.getLogger('MyLogger')
    logger.propagate = False

    formatter = logging.Formatter('%(asctime)s GMT{} [%(levelname)s] : %(message)s'.format(gmt_offset), datefmt='%Y-%m-%d %H:%M:%S')

    # Rotating file handler; records below INFO are filtered by the handler.
    handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
# Initialize logging
# Log file path comes from config.ini, section 'Logging', key 'log_file'.
log_file = config['Logging']['log_file']
# Rebinds the module-level 'logger' (previously the root logger) to the
# rotating-file logger returned by setup_logging(); log_verbose() uses it.
logger = setup_logging(log_file)
################################################################################
# Helper Functions
################################################################################
def header():
    """Print the product banner and copyright unless quiet mode is active."""
    if quiet:
        return
    banner = "Helix Monitor %s Python Simple Sender (build: %s)" % (ITM_VERSION, ITM_BUILD)
    print(banner)
    print(ITM_COPYRIGHT)
def usage(exit_code=0):
    """Print the banner and the command-line help, then exit.

    Args:
        exit_code: Process exit status passed to ``sys.exit`` (default 0).

    The synopsis names the script 'msend_for_bhom.py', matching the examples,
    and documents the -f option that parse_args() accepts and example 2 uses.
    """
    header()
    print("""
usage:
python msend_for_bhom.py [-a Class] [-h|-?] -n Helix_Monitor_Hostname:apiKey [-q] [-v] [- | -f {SourceFile} | -a Class [-p] [-b SlotSetValue] [-m Message] [-o source_identifier] [-r Severity]]
-a Send object of class Class
-b Add SlotSetValue settings (format: "slot=value;...")
-f BAROC Input from file {SourceFile}
-h Print this help and exit
-m Send event message to Message
-n Connect to BHOM Instance with apiKey (colon separated)
-o Set event source_identifier
-p For PATROL_EV class only, when sending event from file or standard input stream, randomize the p_origin_key value (overriding existing value)
-q quiet execution (no banner)
-r Send event severity to Severity
-v Verbose
-z Print version number and exit
- BAROC Input from standard input stream
examples:
1) echo 'EVENT;severity=MAJOR;msg=Test_Message;END' | python msend_for_bhom.py -n <tenant>:<apikey> -v -
2) python msend_for_bhom.py -n <tenant>:<apikey> -v -f "sample.baroc"
3) python msend_for_bhom.py -n <tenant>:<apikey> -a HELIX_SM_EV -r CRITICAL -m "Testing from Python CLI" -b "object_class=test-object-class" -v server sample-itom-demo.onbmc.com
""")
    sys.exit(exit_code)
################################################################################
# Parsing arguments
################################################################################
def parse_args():
    """Parse ``sys.argv`` into the settings needed to build and send an event.

    Side effects: rebinds the module globals ``verbose``, ``quiet``, ``opts``
    (the raw getopt option list) and ``slot_values`` (dict filled from -b).
    Prints help and exits through ``usage()`` on -h/-?, on an empty command
    line, on an unknown option, or when no destination/apiKey can be found.

    Returns:
        A 10-tuple ``(server, apiKey, event_class, message, severity,
        slot_values, read_from_stdin, file_path, source_identifier,
        p_origin_key)``.  Values not supplied on the command line are None,
        except ``source_identifier`` (random default) and ``slot_values``
        (empty dict).
    """
    global verbose, quiet, opts, slot_values
    verbose = False
    quiet = False

    args = sys.argv[1:]
    if not args:
        usage(1)
    try:
        opts, args = getopt.getopt(args, "a:b:f:h?l:m:n:o:pqr:vz")
    except getopt.GetoptError as err:
        message = ERR_CLI.format(err)
        print(message)
        log_verbose("parse_args: " + message, 1)
        usage(1)

    # getopt stops at a lone '-' and leaves it in the positional arguments, so
    # checking the leftovers avoids mistaking an option VALUE of '-' (e.g.
    # "-m -") for the read-from-stdin marker.
    read_from_stdin = '-' in args

    server = apiKey = event_class = message = severity = None
    file_path = None
    slot_values = {}
    source_identifier = "msend4bhom.py_{}".format(random.randint(0, 99999))  # Default
    randomize_origin_key = False

    for opt, arg in opts:
        if opt in ("-h", "-?"):
            usage()
        elif opt == "-z":
            header()
            sys.exit(0)
        elif opt == "-v":
            verbose = True
        elif opt == '-q':
            quiet = True
            verbose = False
        elif opt == "-n":
            # Expected form is "host:apiKey"; report a usage error instead of
            # crashing with ValueError when the colon or either half is missing.
            server, separator, apiKey = arg.partition(":")
            if not (separator and server and apiKey):
                print(ERR_NODESTORKEY)
                log_verbose("parse_args: " + ERR_NODESTORKEY, 1)
                usage(1)
        elif opt == "-a":
            event_class = arg
        elif opt == "-r":
            severity = arg
        elif opt == "-m":
            message = arg
        elif opt == "-b":
            for pair in arg.split(";"):
                if "=" in pair:
                    slot, value = pair.split("=", 1)
                    slot_values[slot] = value
        elif opt == "-f":
            file_path = arg
        elif opt == "-o":
            source_identifier = arg
        elif opt == "-p":
            # -p takes no argument; just remember it so the result does not
            # depend on whether -a appeared before or after it.
            randomize_origin_key = True
        # "-l" is accepted by the option string but has no effect here.

    # The key is generated whenever -p is given; convert_raw_to_json() only
    # applies it to PATROL_EV events.
    p_origin_key = None
    if randomize_origin_key:
        p_origin_key = "msend4bhom.py_{}".format(random.randint(0, 99999))

    if server is None:
        # No -n on the command line: fall back to the [Server] section of
        # config.ini.
        try:
            server = config['Server']['server']
            apiKey = config['Server']['api_key']
        except KeyError:
            print(ERR_NODESTORKEY)
            log_verbose("parse_args: " + ERR_NODESTORKEY, 1)
            usage(1)

    return server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key
################################################################################
# Convert raw into JSON event
################################################################################
def convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address):
    """Build the JSON payload (a one-element list) for a single event.

    Args:
        event_class, message, severity, source_identifier, source_hostname,
        location, source_address: Stored under the keys "class", "msg",
            "severity", "source_identifier", "source_hostname", "location"
            and "source_address".
        slot_values: Extra slot/value pairs merged last, so they override the
            base keys above when names collide.
        p_origin_key: Value stored as "p_origin_key" when the event class is
            PATROL_EV, -p was given, and the slot is not already present.

    Returns:
        The JSON text of ``[event_dict]``.
    """
    event = {
        "class": event_class,
        "msg": message,
        "severity": severity,
        "source_identifier": source_identifier,
        "source_hostname": source_hostname,
        "location": location,
        "source_address": source_address,
    }
    event.update(slot_values)

    # 'opts' is the getopt result, a list of (option, value) tuples, so the
    # -p flag has to be looked up by option name; a plain "'p' in opts" test
    # can never match.  The lookup tolerates parse_args() not having run yet.
    parsed_opts = globals().get("opts") or []
    randomize_requested = any(option == "-p" for option, _ in parsed_opts)
    if (event_class == "PATROL_EV" and randomize_requested
            and p_origin_key and 'p_origin_key' not in event):
        event["p_origin_key"] = p_origin_key

    json_event = json.dumps([event])
    if (quiet==False):print("JSON EVENT: ", json_event, "\n")
    return json_event
################################################################################
# Networking
################################################################################
@retry(wait_fixed=int(config['REST API retry']['wait_time']), stop_max_attempt_number=int(config['REST API retry']['max_retry_count']))
def send(server, apiKey, json_event):
    """POST one JSON event to the BHOM events endpoint.

    When the module global ``retry_flag`` is False the event is first written
    to the buffer directory via ``buffer_event``; otherwise the buffer file
    named by the module global ``filepath`` (set by ``retry_buffered_events``)
    is reused.  On HTTP success the buffer file is handed to
    ``move_buffered_event``; on a ``requests`` error the file is left in place
    for a later run.

    ``requests`` errors are handled here and never reach the ``@retry``
    decorator, which therefore only re-runs the function on other exceptions.

    Args:
        server: Host name used to build ``https://<server>/events-service/...``.
        apiKey: Key sent in the ``Authorization: apiKey <key>`` header.
        json_event: JSON text to post.
    """
    if(retry_flag==False):
        file_path = buffer_event(json.loads(json_event))
    else:
        file_path = filepath
    if (quiet==False):print("Sending JSON event to BHOM:\n", json_event, "\n")
    headers = {
        "Authorization": "apiKey {}".format(apiKey),
        "Content-Type": "application/json"
    }
    url = "https://{}/events-service/api/v1.0/events".format(server)
    # Without a timeout requests can block forever on an unresponsive server.
    # Optional override: [REST API retry] request_timeout (seconds).
    timeout_seconds = config.getfloat('REST API retry', 'request_timeout', fallback=30.0)
    try:
        response = requests.post(url, headers=headers, data=json_event, timeout=timeout_seconds)
        # Raise an exception for non-200 status codes
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print("Error sending event:", e)
        log_verbose("send: Failed event {} kept in the {} directory for retry".format(file_path,buffer_dir),1)
        return
    if (quiet==False):print("Helix Monitor response:\n", response.content.decode(), "\n")
    if file_path is None:
        # buffer_event() could not write the buffer file; nothing to clean up.
        return
    try:
        move_buffered_event(file_path)
    except OSError as e:
        # The event was already delivered: a housekeeping failure must not
        # escape into @retry, which would post the same event again.
        message = "Error handling buffered event {} after successful send: {}".format(file_path, e)
        print(message)
        log_verbose("send: " + message, 1)
################################################################################
# Persistent Buffering
################################################################################
# Buffer directory limits and lock-file settings ([Buffering] section).
buffer_dir = config['Buffering']['buffer_dir']
max_buffer_files = int(config['Buffering']['max_buffer_files'])
max_buffer_size_bytes = int(config['Buffering']['max_buffer_size_bytes'])
lock_file = config['Buffering']['lock_filename']
max_lock_file_size = int(config['Buffering']['max_lock_file_size'])
# Handling of successfully sent events ([EventHandling] section): when
# success_flag is true they are moved to success_dir, otherwise deleted.
success_flag = config.getboolean('EventHandling','success_flag')
success_dir = config['EventHandling']['success_dir']
max_success_files = int(config['EventHandling']['max_success_files'])
max_success_size_bytes = int(config['EventHandling']['max_success_size_bytes'])
# Define the external log file
lock_log_file = "lock_log.txt"
# Create both working directories up front.  exist_ok avoids the
# check-then-create race when several senders start at the same time.
try:
    os.makedirs(success_dir, exist_ok=True)
    os.makedirs(buffer_dir, exist_ok=True)
except OSError as e:
    message="Error in creating directory: {}".format(e)
    print(message)
    log_verbose("sys: "+message,1)
    # Non-zero status: this is a failure, callers/scripts must be able to see it.
    sys.exit(1)
def buffer_event(event):
    """Persist *event* as ``event_<millis>.json`` in the buffer directory.

    The file name is derived from the current time in milliseconds and is
    also stored in the module global ``filename``.  The file is guarded by an
    entry in the lock file while it is written; afterwards the oldest buffer
    files are removed while the configured count/size limits are exceeded.

    Args:
        event: JSON-serialisable object to store.

    Returns:
        The path of the buffer file, or None when writing it failed.
    """
    global filename
    filename = int(time.time()*1000)
    file_path = os.path.join(buffer_dir, "event_{}.json".format(filename))

    # Acquire the lock BEFORE entering the try block: check_lock_status()
    # exits the process when it cannot get the lock, and in that case the
    # entry belongs to someone else and must not be released below.
    check_lock_status(file_path, lock_file)
    try:
        check_and_trim_file(lock_file,max_lock_file_size)
        # Write event data to the buffer file
        try:
            with open(file_path, 'w') as file:
                json.dump(event, file)
        except IOError as e:
            message="Error writing to file {}: {}".format(file_path,e)
            print(message)
            log_verbose("buffer_event: "+message,1)
            return None
        log_verbose("buffer_event: Locking File - {}.json".format(filename),0)
        # Append the file path to the lock log file
        try:
            with open(lock_log_file, 'a') as log_file:
                log_file.write(file_path + '\n')
        except IOError as e:
            message="Error writing to lock log file {}: {}".format(lock_log_file,e)
            print(message)
            log_verbose("buffer_event: "+message,1)
        # Perform rolling saves if the number of buffer files exceeds the limit
        buffer_files = sorted(glob.glob(os.path.join(buffer_dir, 'event_*.json')))
        buffer_total_size = sum(os.path.getsize(file) for file in buffer_files)
        while buffer_files and (len(buffer_files) > max_buffer_files or buffer_total_size > max_buffer_size_bytes):
            oldest_file = buffer_files.pop(0)
            buffer_total_size -= os.path.getsize(oldest_file)
            os.remove(oldest_file)
        return file_path
    finally:
        # Release the lock
        release_lock(file_path, lock_file)
        log_verbose("buffer_event: Unlocking File - {}.json".format(filename),0)
def move_buffered_event(file_path):
    """Dispose of a buffer file whose event was delivered successfully.

    With ``success_flag`` set the file is moved into ``success_dir`` and the
    oldest files there are removed while the configured count/size limits are
    exceeded; otherwise the buffer file is simply deleted.  Failures to move
    or delete are printed and logged, not raised.

    Args:
        file_path: Path of the buffer file to move or delete.
    """
    if not success_flag:
        # Delete the buffer file if success_flag is False
        try:
            os.remove(file_path)
        except OSError as e:
            message="Error deleting file {}: {}".format(file_path,e)
            print(message)
            log_verbose("move_buffered_event: "+message,1)
        else:
            log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event deleted",0)
        return

    # os.path.basename handles the platform's own separator; splitting on a
    # backslash only works on Windows and left the whole relative path (with
    # its directory part) as the "file name" everywhere else.
    file_name = os.path.basename(str(file_path))
    dest_path = os.path.join(success_dir, file_name)
    # Move the file to the success directory
    try:
        shutil.move(file_path, dest_path)
    except (shutil.Error, OSError) as e:
        message="Error moving file {} to {}: {}".format(file_path,dest_path,e)
        print(message)
        log_verbose("move_buffered_event: "+message,1)
        return
    log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event moved into {} directory".format(success_dir),0)

    # Perform rolling saves if the number of success files exceeds the limit
    success_files = sorted(glob.glob(os.path.join(success_dir, 'event_*.json')))
    total_size = sum(os.path.getsize(file) for file in success_files)
    while success_files and (len(success_files) > max_success_files or total_size > max_success_size_bytes):
        oldest_file = success_files.pop(0)
        total_size -= os.path.getsize(oldest_file)
        os.remove(oldest_file)
def retry_buffered_events(server, apiKey):
    """Re-send every event file currently present in the buffer directory.

    For each file the lock entry is acquired, the JSON content is loaded and
    passed to ``send``.  The module global ``filepath`` is set to the file
    being processed because ``send`` reads it while ``retry_flag`` is True.

    Args:
        server: Destination host, forwarded to ``send``.
        apiKey: API key, forwarded to ``send``.
    """
    global filepath
    # Sorted so that files are retried in a stable, name-based order.
    for count, file_name in enumerate(sorted(os.listdir(buffer_dir)), start=1):
        log_verbose("retry_buffered_events: Retrying Buffered Event - {}: {}".format(str(count),file_name),0)
        file_path = os.path.join(buffer_dir, file_name)
        filepath = file_path
        check_lock_status(file_path, lock_file)
        try:
            check_and_trim_file(lock_file,max_lock_file_size)
            try:
                with open(file_path,'r') as file:
                    event = json.load(file)
            except (OSError, ValueError) as e:
                # An unreadable or corrupt buffer file must not stop the
                # remaining retries or the new event from being sent.
                message="Error reading buffered event {}: {}".format(file_path,e)
                print(message)
                log_verbose("retry_buffered_events: "+message,1)
                continue
            send(server, apiKey, json.dumps(event))
        finally:
            # Always drop the lock entry; a leftover entry would make the next
            # run treat this file as locked, wait, and exit.
            release_lock(file_path, lock_file)
# Function to acquire lock for a buffer file
def acquire_lock(file_name, lock_file):
    """Try to register *file_name* in the lock file.

    The lock file holds one locked name per line.  Names are compared as
    whole lines (whitespace-stripped, the same comparison release_lock uses),
    so a name that merely occurs inside another entry is not treated as
    locked.

    Args:
        file_name: Name/path to lock.
        lock_file: Path of the lock file; created if it does not exist.

    Returns:
        True when the name was appended (lock acquired), False when it was
        already listed.
    """
    with open(lock_file, 'a+') as f:
        f.seek(0)  # 'a+' positions at the end; rewind to read existing entries
        held = {line.strip() for line in f.read().splitlines()}
        if file_name.strip() in held:
            return False  # Lock not acquired, file already in use
        f.write(file_name + '\n')  # Append file name to lock file
    return True  # Lock acquired successfully
# Function to release lock for a buffer file
def release_lock(file_name, lock_file):
    """Remove *file_name* from the lock file.

    Every line whose stripped text equals *file_name* is dropped; all other
    lines are written back unchanged.  A missing lock file means there is
    nothing to release, so the call returns quietly instead of raising
    (this function is called from ``finally`` blocks).

    Args:
        file_name: Name/path to unlock.
        lock_file: Path of the lock file.
    """
    try:
        with open(lock_file, 'r') as f:
            lines = f.readlines()
    except FileNotFoundError:
        return
    remaining = [line for line in lines if line.strip() != file_name]
    with open(lock_file, 'w') as f:
        f.writelines(remaining)  # Rewrite lock file without the released file name
# Function to check if a buffer file is locked
def check_lock_status(file_path, lock_file):
    """Acquire the lock for *file_path*, waiting and retrying if it is held.

    The wait time and number of retries come from config.ini, section
    'Buffering' (``retry_check_buffer_status_time`` in seconds and
    ``retry_check_buffer_status_count``).  Returns normally once the lock is
    acquired; prints/logs an error and exits with status 1 if it is still
    held after the last retry.

    Args:
        file_path: Buffer file to lock.
        lock_file: Path of the lock file.
    """
    retry_time = int(config['Buffering']['retry_check_buffer_status_time'])
    retry_count = int(config['Buffering']['retry_check_buffer_status_count'])
    if acquire_lock(file_path, lock_file):
        return
    for retry_val in range(retry_count):  # Try retry_count more times
        log_verbose("Buffer file {} is found locked, retrying after {} seconds, retries remaining - {}".format(file_path, retry_time,str(int(retry_count-retry_val-1))),1)
        time.sleep(retry_time)  # Wait for retry_time seconds before trying again
        # Re-attempt the acquisition after every wait; checking only the
        # result of the first attempt made the whole wait loop pointless.
        if acquire_lock(file_path, lock_file):
            return
    print("Buffer file is locked. Exiting.")
    log_verbose("buffer_event: Buffer file {} is still found locked after {} seconds, hence exited.".format(file_path,str(int(retry_time*retry_count))),1)
    sys.exit(1)
def check_and_trim_file(file_path, max_size):
    """Shrink *file_path* to its last line when it is larger than *max_size*.

    Errors while inspecting or rewriting the file are printed and logged, not
    raised.

    Args:
        file_path: File to inspect (used in this module for the lock file).
        max_size: Size limit in bytes.
    """
    try:
        # Nothing to do while the file is within the limit.
        if os.path.getsize(file_path) <= max_size:
            return
        with open(file_path, 'r+') as file:
            lines = file.readlines()
            # Trim the file content to remove the oldest records/lines
            file.seek(0)
            file.truncate()
            file.writelines(lines[-1:])  # Keep the last line
    except (OSError, ValueError) as e:
        # Format the exception into the text: concatenating str + exception
        # raises TypeError and would mask the real error.
        message="Error while checking and trimming file: {}".format(e)
        print(message)
        log_verbose("check_and_trim_file: "+message,1)
################################################################################
# Read event from a file
################################################################################
def read_event_from_file(file_path):
    """Return the whitespace-stripped text content of *file_path*.

    Any error while opening or reading is printed, logged, and ends the
    process with exit status 1.
    """
    try:
        with open(file_path,'r') as source:
            raw_text = source.read()
            return raw_text.strip()
    except Exception as exc:
        failure = "Error reading from file %s" % (exc)
        print(failure)
        log_verbose("read_event_from_file: " + failure, 1)
        sys.exit(1)
################################################################################
# Parse Input
################################################################################
def parse_input(stdin_input):
    """Extract event fields from ';'-separated ``key=value`` text.

    Parts without '=' (such as a leading class token or the trailing END) are
    ignored.  Recognised keys are class/event_class, severity, msg/message,
    source_hostname, location and source_address; every other key is stored
    in the module-level ``slot_values`` dict (created by ``parse_args``).

    Keys are stripped of surrounding whitespace so that input spread over
    several lines ("severity=MAJOR;\\n  msg=...;") is recognised as well.

    Args:
        stdin_input: The raw event text.

    Returns:
        ``(event_class, severity, message, source_hostname, location,
        source_address)``.  Defaults: class "PATROL_EV", severity "MAJOR",
        empty message, and host details looked up from the local machine.
    """
    # Default values
    event_class = "PATROL_EV"
    severity = "MAJOR"
    message = ""
    source_hostname = socket.getfqdn()
    local_name = socket.gethostname()
    # Name resolution of the local host fails on machines whose hostname is
    # not resolvable; fall back instead of aborting the whole send.
    try:
        location = socket.gethostbyaddr(local_name)[0]
    except OSError:
        location = source_hostname
    try:
        source_address = socket.gethostbyname(local_name)
    except OSError:
        # NOTE(review): loopback is a placeholder fallback - confirm the
        # preferred value for an unresolvable local host.
        source_address = "127.0.0.1"

    for part in stdin_input.split(';'):
        if '=' not in part:
            continue
        key, value = part.split('=', 1)
        key = key.strip()
        if key in ('class', 'event_class'):
            event_class = value
        elif key == 'severity':
            severity = value
        elif key in ('msg', 'message'):
            message = value
        elif key == 'source_hostname':
            source_hostname = value
        elif key == 'location':
            location = value
        elif key == 'source_address':
            source_address = value
        else:
            slot_values[key] = value
    return event_class, severity, message, source_hostname, location, source_address
################################################################################
# Main Logic
################################################################################
def main():
    """Entry point: retry buffered events, then build and send the new event.

    Steps: parse the command line, print the banner, re-send any events left
    in the buffer directory (with the module global ``retry_flag`` set to
    True), then read the new event from standard input, from the -f file, or
    from the -a/-r/-m options, convert it to JSON and send it.
    """
    global verbose_flag, retry_flag
    verbose_flag = 0
    log_verbose("-----------------------------xxx-----------------------------",0)
    server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key = parse_args()
    header()
    log_verbose("main: Beginning main",0)

    # send() reuses the existing buffer file while retry_flag is True and
    # creates a fresh one when it is False.
    retry_flag = True
    log_verbose("main: Retrying buffered events if any",0)
    retry_buffered_events(server, apiKey)
    retry_flag = False

    if read_from_stdin:
        log_verbose("main: Reading from stdinput",0)
        stdin_input = sys.stdin.read().strip()
        event_class, severity, message, source_hostname, location, source_address = parse_input(stdin_input)
    elif file_path:
        log_verbose("main: Reading from file",0)
        file_input = read_event_from_file(file_path)
        event_class, severity, message, source_hostname, location, source_address = parse_input(file_input)
    else:
        log_verbose("main: Parsing given arguments",0)
        # Take only the defaults and host details from parse_input() and keep
        # the command-line values as they are.  Formatting them into a
        # "key=value;" string turned missing options into the text "None" and
        # let ';' or '=' inside a message be parsed as extra slots.
        default_class, default_severity, default_message, source_hostname, location, source_address = parse_input("")
        if event_class is None:
            event_class = default_class
        if severity is None:
            severity = default_severity
        if message is None:
            message = default_message

    log_verbose("main: Creating event",0)
    event = convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address)
    log_verbose("main: Sending JSON event to BHOM",0)
    send(server, apiKey, event)
    log_verbose("-----------------------------xxx-----------------------------",0)


# Run only when executed as a script, not when imported.
if __name__ == "__main__":
    main()