#!/usr/bin/env python3
################################################################################
#                                                                              #
#  Purpose : Python msend for BHOM                                             #
#                                                                              #
#  (C) Copyright 2024 BMC Software, All Rights Reserved                        #
#                                                                              #
################################################################################

# Standard library
import configparser
import datetime
import getopt
import glob
import json
import logging
import os
import random
import shutil
import socket
import sys
import threading
import time
from logging.handlers import RotatingFileHandler

# Third-party
import requests
from retrying import retry

################################################################################
# Constants and Error Messages
################################################################################
ITM_VERSION = "0.4"
ITM_BUILD = "June 20th, 2024"
ITM_COPYRIGHT = "(c) Copyright 2024 BMC Software, All Rights Reserved"

# Message templates; the "{}" placeholders are filled with str.format().
ERR_CELLNAME = "Cannot get information about cell '{}' in directory file\n"
ERR_CLI = "Invalid argument(s): {}\n"
ERR_DONOTHING = "No event input (eg {SourceFile}, '-' or -a Class) is provided"
ERR_FILECLASS = "{SourceFile} argument (or '-') is not compatible with -a, -b, -m, -o or -r options"
ERR_FILESTDIN = "{SourceFile} argument is not compatible with '-' (read from standard input)"
ERR_GETHOSTBYNAME = "gethostbyname failed on '{}': {}\n"
ERR_HOSTNAME = "Cannot get the hostname of the local machine: {}\n"
ERR_MISSINGCLASS = "Class (-a options) is missing"
ERR_OPEN = "Cannot open file '{}': {}\n"
ERR_SOCKCLOSE = "Couldn't disconnect on '{}:{}': {}\n"
ERR_SOCKCONNECT = "Couldn't connect to '{}:{}': {}\n"
ERR_SOCKPRINT = "Print failed on socket: {}\n"
ERR_NODESTORKEY = "BHOM destination or apiKey not properly provided\n"

################################################################################
# Loading Configuration from config.ini
################################################################################
# The path is relative, so config.ini is looked up in the current working
# directory. ConfigParser.read() silently skips a file it cannot open; in that
# case the first config[...] lookup further down raises KeyError.
config = configparser.ConfigParser()
config.read('config.ini')
################################################################################
# Logging and Verbose
################################################################################
# Temporary console logging; replaced by the rotating file logger created by
# setup_logging() a few lines below.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
logger = logging.getLogger()
quiet = False
verbose = False


def log_verbose(message, verbose_flag):
    """Write *message* to the log and echo it to stdout in verbose mode.

    Args:
        message: Text to record.
        verbose_flag: Severity selector; 0 logs at INFO level, any other
            value logs at ERROR level.
    """
    if verbose_flag == 0:
        logger.info(message)
    else:
        logger.error(message)
    if verbose:
        print(message)


def _format_gmt_offset(offset):
    """Render a UTC offset (datetime.timedelta) as "+HHMM" / "-HHMM"."""
    total_minutes = int(offset.total_seconds() // 60)
    sign = '-' if total_minutes < 0 else '+'
    # Split the absolute value: floor-dividing a negative offset would turn
    # -03:30 into "-0430".
    hours, minutes = divmod(abs(total_minutes), 60)
    return "{}{:02d}{:02d}".format(sign, hours, minutes)


def setup_logging(log_file, max_bytes=int(config['Logging']['max_bytes']), backup_count=int(config['Logging']['backup_count'])):
    """Create the rotating file logger used by the script.

    Args:
        log_file: Path of the log file.
        max_bytes: Size at which the log file is rotated.
        backup_count: Number of rotated files to keep.

    Returns:
        The configured 'MyLogger' logger. It does not propagate to the root
        logger, so records only reach the rotating file handler.
    """
    # Reset the root logger; 'MyLogger' sets no level of its own and therefore
    # inherits the DEBUG level configured here.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(level=logging.DEBUG)

    # Offset of the local time zone from UTC, formatted as "+HHMM"/"-HHMM".
    gmt_offset = _format_gmt_offset(datetime.datetime.now().astimezone().utcoffset())

    file_logger = logging.getLogger('MyLogger')
    file_logger.propagate = False

    # Drop handlers left by an earlier call so records are not written twice.
    for stale_handler in file_logger.handlers[:]:
        file_logger.removeHandler(stale_handler)
        stale_handler.close()

    formatter = logging.Formatter('%(asctime)s GMT{} [%(levelname)s] : %(message)s'.format(gmt_offset), datefmt='%Y-%m-%d %H:%M:%S')

    handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    handler.setLevel(logging.INFO)
    handler.setFormatter(formatter)
    file_logger.addHandler(handler)
    return file_logger


# Initialize logging
log_file = config['Logging']['log_file']
logger = setup_logging(log_file)

################################################################################
# Helper Functions
################################################################################


def header():
    """Print the version/copyright banner unless quiet mode is active."""
    if not quiet:
        print("Helix Monitor %s Python Simple Sender (build: %s)" % (ITM_VERSION, ITM_BUILD))
        print(ITM_COPYRIGHT)


def usage(exit_code=0):
    """Print the banner and the command-line help, then exit.

    Args:
        exit_code: Process exit status passed to sys.exit().
    """
    header()
    print("""
usage: python msend_4_bhom.py [-a Class] [-h|-?] -n Helix_Monitor_Hostname:apiKey [-q] [-v]
       [- | {SourceFile} | -a Class [-p] [-b SlotSetValue] [-m Message]
       [-o source_identifier] [-r Severity]]

  -a  Send object of class Class
  -b  Add SlotSetValue settings (format: "slot=value;...")
  -f  Read BAROC input from the given file
  -h  Print this help and exit
  -m  Send event message to Message
  -n  Connect to BHOM Instance with apiKey (colon separated)
  -o  Set event source_identifier
  -p  For PATROL_EV class only, when sending event from file or standard
      input stream, randomize the p_origin_key value (overriding existing value)
  -q  quiet execution (no banner)
  -r  Send event severity to Severity
  -v  Verbose
  -z  Print version number and exit
  -   BAROC Input from standard input stream

examples:
  1) echo 'EVENT;severity=MAJOR;msg=Test_Message;END' | python msend_for_bhom.py -n : -v -
  2) python msend_for_bhom.py -n : -v -f "sample.baroc"
  3) python msend_for_bhom.py -n : -a HELIX_SM_EV -r CRITICAL -m "Testing from Python CLI" -b "object_class=test-object-class" -v

server sample-itom-demo.onbmc.com
""")
    sys.exit(exit_code)


################################################################################
# Parsing arguments
################################################################################


def parse_args():
    """Parse sys.argv and resolve the BHOM destination.

    Side effects: sets the module globals ``verbose``, ``quiet``, ``opts``
    (the raw getopt option list) and ``slot_values``. Exits through usage()
    on invalid options or when no server/apiKey can be determined.

    Returns:
        Tuple ``(server, apiKey, event_class, message, severity, slot_values,
        read_from_stdin, file_path, source_identifier, p_origin_key)``.
        ``p_origin_key`` is a random key when -p was given, otherwise None.
    """
    global verbose, quiet, opts, slot_values

    args = sys.argv[1:]
    file_path = None
    verbose = False
    quiet = False

    if not args:
        usage(1)

    try:
        # "-l" takes a value and is accepted but has no effect.
        opts, args = getopt.getopt(args, "a:b:f:h?l:m:n:o:pqr:vz")
    except getopt.GetoptError as err:
        message = ERR_CLI.format(err)
        print(message)
        log_verbose("parse_args: " + message, 1)
        usage(1)

    # getopt never reports a lone "-" as an option; it is left in the
    # positional arguments. Checking there (rather than in the raw argv)
    # avoids treating an option value such as `-m -` as "read stdin".
    read_from_stdin = '-' in args

    server = apiKey = event_class = message = severity = None
    slot_values = {}
    source_identifier = "msend4bhom.py_{}".format(random.randint(0, 99999))  # Default
    p_origin_key = None

    for opt, arg in opts:
        if opt in ("-h", "-?"):
            usage()
        elif opt == "-z":
            header()
            sys.exit(0)
        elif opt == "-v":
            verbose = True
        elif opt == "-q":
            quiet = True
            verbose = False
        elif opt == "-n":
            # partition() tolerates a value without ':' (apiKey stays empty
            # and is then looked up in config.ini below).
            server, _, apiKey = arg.partition(":")
        elif opt == "-a":
            event_class = arg
        elif opt == "-r":
            severity = arg
        elif opt == "-m":
            message = arg
        elif opt == "-b":
            for pair in arg.split(";"):
                if "=" in pair:
                    slot, value = pair.split("=", 1)
                    slot_values[slot] = value
        elif opt == "-f":
            file_path = arg
        elif opt == "-o":
            source_identifier = arg
        elif opt == "-p":
            # Always generate the key here; whether it applies is decided in
            # convert_raw_to_json(), once the final event class is known (the
            # class may come from the file/stdin and -a may follow -p).
            p_origin_key = "msend4bhom.py_{}".format(random.randint(0, 99999))

    # Anything not supplied through -n falls back to the [Server] section.
    if not server or not apiKey:
        try:
            server = server or config['Server']['server']
            apiKey = apiKey or config['Server']['api_key']
        except KeyError:
            pass
    if not server or not apiKey:
        print(ERR_NODESTORKEY)
        log_verbose("parse_args: " + ERR_NODESTORKEY, 1)
        usage(1)

    return server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key


################################################################################
# Convert raw into JSON event
################################################################################


def convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address):
    """Build the JSON payload (a one-element list) for a single event.

    Args:
        event_class, message, severity, source_identifier, source_hostname,
        location, source_address: Core event attributes.
        slot_values: Extra slots; they are applied after the core attributes
            and therefore override a core attribute of the same name.
        p_origin_key: Key generated for the -p option (None/empty when -p was
            not given). Applied only to PATROL_EV events, replacing any
            p_origin_key supplied through the slots.

    Returns:
        The event serialized as a JSON string.
    """
    event = {
        "class": event_class,
        "msg": message,
        "severity": severity,
        "source_identifier": source_identifier,
        "source_hostname": source_hostname,
        "location": location,
        "source_address": source_address,
    }
    event.update(slot_values)

    if event_class == "PATROL_EV" and p_origin_key:
        event["p_origin_key"] = p_origin_key

    json_event = json.dumps([event])
    if not quiet:
        print("JSON EVENT: ", json_event, "\n")
    return json_event
################################################################################
# Networking
################################################################################
# State shared between main(), retry_buffered_events() and send():
#   retry_flag - True while previously buffered events are being re-sent
#   filepath   - buffer file of the event currently being re-sent
retry_flag = False
filepath = None

# Seconds to wait for BHOM before a request is treated as failed. Optional
# "request_timeout" key in the [REST API retry] section; defaults to 30.
REQUEST_TIMEOUT_SECONDS = config.getfloat('REST API retry', 'request_timeout', fallback=30.0)


def _is_request_error(exc):
    """Return True for errors raised by the requests library."""
    return isinstance(exc, requests.exceptions.RequestException)


# The retry decorator lives on this helper, which lets the exception escape.
# On send() it never fired because send() handles the exception itself.
# wait_fixed is expressed in milliseconds by the retrying library.
@retry(wait_fixed=int(config['REST API retry']['wait_time']),
       stop_max_attempt_number=int(config['REST API retry']['max_retry_count']),
       retry_on_exception=_is_request_error)
def _post_event(url, headers, json_event):
    """POST one event; raise requests.RequestException on any failure."""
    response = requests.post(url, headers=headers, data=json_event, timeout=REQUEST_TIMEOUT_SECONDS)
    # Raise an exception for non-success status codes
    response.raise_for_status()
    return response


def send(server, apiKey, json_event):
    """Send a JSON event to BHOM, keeping a buffered copy until it succeeds.

    A new event is first written to the buffer directory; while buffered
    events are being retried (``retry_flag`` is True) the existing buffer file
    named by the global ``filepath`` is used instead. After a successful POST
    the buffer file is moved or deleted by move_buffered_event(); when all
    attempts fail it stays in the buffer directory for a later run.

    Args:
        server: BHOM host name.
        apiKey: API key placed in the Authorization header.
        json_event: Event payload as a JSON string.
    """
    if retry_flag:
        file_path = filepath
    else:
        file_path = buffer_event(json.loads(json_event))

    if not quiet:
        print("Sending JSON event to BHOM:\n", json_event, "\n")

    headers = {
        "Authorization": "apiKey {}".format(apiKey),
        "Content-Type": "application/json"
    }
    url = "https://{}/events-service/api/v1.0/events".format(server)

    try:
        response = _post_event(url, headers, json_event)
    except requests.exceptions.RequestException as e:
        print("Error sending event:", e)
        log_verbose("send: Failed event {} kept in the {} directory for retry".format(file_path, buffer_dir), 1)
        return

    if not quiet:
        print("Helix Monitor response:\n", response.content.decode(), "\n")
    # buffer_event() returns None when the buffer file could not be written.
    if file_path is not None:
        move_buffered_event(file_path)


################################################################################
# Persistent Buffering
################################################################################
buffer_dir = config['Buffering']['buffer_dir']
max_buffer_files = int(config['Buffering']['max_buffer_files'])
max_buffer_size_bytes = int(config['Buffering']['max_buffer_size_bytes'])
lock_file = config['Buffering']['lock_filename']
max_lock_file_size = int(config['Buffering']['max_lock_file_size'])
success_flag = config.getboolean('EventHandling', 'success_flag')
success_dir = config['EventHandling']['success_dir']
max_success_files = int(config['EventHandling']['max_success_files'])
max_success_size_bytes = int(config['EventHandling']['max_success_size_bytes'])

# Define the external log file
lock_log_file = "lock_log.txt"

try:
    os.makedirs(success_dir, exist_ok=True)
    os.makedirs(buffer_dir, exist_ok=True)
except OSError as e:
    message = "Error in creating directory: {}".format(e)
    print(message)
    log_verbose("sys: " + message, 1)
    # Non-zero status: the script cannot work without these directories.
    sys.exit(1)


def _prune_oldest_events(directory, max_files, max_bytes):
    """Delete the oldest event_*.json files until both limits are met.

    Files are ordered by name, which is the creation timestamp written by
    buffer_event().
    """
    paths = sorted(glob.glob(os.path.join(directory, 'event_*.json')))
    sizes = [os.path.getsize(path) for path in paths]
    remaining_files = len(paths)
    remaining_bytes = sum(sizes)
    for path, size in zip(paths, sizes):
        if remaining_files <= max_files and remaining_bytes <= max_bytes:
            break
        os.remove(path)
        remaining_files -= 1
        remaining_bytes -= size


def buffer_event(event):
    """Persist *event* as event_<epoch-ms>.json in the buffer directory.

    The lock entry for the new file is held while it is written and is always
    released before returning.

    Args:
        event: JSON-serializable event object.

    Returns:
        Path of the buffer file, or None when the file could not be written.
    """
    # Kept as a module global for backward compatibility.
    global filename
    filename = int(time.time() * 1000)
    file_path = os.path.join(buffer_dir, "event_{}.json".format(filename))

    # Exits the process if the lock cannot be obtained.
    check_lock_status(file_path, lock_file)
    try:
        check_and_trim_file(lock_file, max_lock_file_size)

        # Write event data to the buffer file
        try:
            with open(file_path, 'w') as file:
                json.dump(event, file)
        except OSError as e:
            message = "Error writing to file {}: {}".format(file_path, e)
            print(message)
            log_verbose("buffer_event: " + message, 1)
            return None
        log_verbose("buffer_event: Locking File - {}.json".format(filename), 0)

        # Append the file path to the lock log file
        try:
            with open(lock_log_file, 'a') as lock_log:
                lock_log.write(file_path + '\n')
        except OSError as e:
            message = "Error writing to lock log file {}: {}".format(lock_log_file, e)
            print(message)
            log_verbose("buffer_event: " + message, 1)

        # Perform rolling saves if the buffer exceeds its limits
        _prune_oldest_events(buffer_dir, max_buffer_files, max_buffer_size_bytes)
        return file_path
    finally:
        # Release the lock
        release_lock(file_path, lock_file)
        log_verbose("buffer_event: Unlocking File - {}.json".format(filename), 0)


def move_buffered_event(file_path):
    """Dispose of a buffer file after its event was delivered.

    With ``success_flag`` set the file is moved to ``success_dir`` (which is
    then pruned to its configured limits); otherwise it is deleted.

    Args:
        file_path: Path of the buffer file.
    """
    if not success_flag:
        # Delete the buffer file if success_flag is False
        try:
            os.remove(file_path)
        except OSError as e:
            message = "Error deleting file {}: {}".format(file_path, e)
            print(message)
            log_verbose("move_buffered_event: " + message, 1)
        else:
            log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event deleted", 0)
        return

    # basename() works with both path separators; splitting on '\\' only
    # worked on Windows.
    dest_path = os.path.join(success_dir, os.path.basename(str(file_path)))
    try:
        shutil.move(file_path, dest_path)
    except OSError as e:  # shutil.Error is a subclass of OSError
        message = "Error moving file {} to {}: {}".format(file_path, dest_path, e)
        print(message)
        log_verbose("move_buffered_event: " + message, 1)
        return
    log_verbose("move_buffered_event: JSON event sent to BHOM successfully, Buffered event moved into {} directory".format(success_dir), 0)

    # Perform rolling saves if the success directory exceeds its limits
    _prune_oldest_events(success_dir, max_success_files, max_success_size_bytes)


def retry_buffered_events(server, apiKey):
    """Re-send every event left in the buffer directory, oldest name first.

    Args:
        server: BHOM host name.
        apiKey: API key for BHOM.
    """
    global filepath, retry_flag
    previous_retry_flag = retry_flag
    # send() must reuse the existing buffer file instead of buffering again.
    retry_flag = True
    count = 1
    try:
        for file_name in sorted(os.listdir(buffer_dir)):
            file_path = os.path.join(buffer_dir, file_name)
            if not os.path.isfile(file_path):
                continue
            log_verbose("retry_buffered_events: Retrying Buffered Event - {}: {}".format(str(count), file_name), 0)
            filepath = file_path
            check_lock_status(file_path, lock_file)
            try:
                check_and_trim_file(lock_file, max_lock_file_size)
                try:
                    with open(file_path, 'r') as file:
                        event = json.load(file)
                except (OSError, ValueError) as e:
                    # One unreadable/corrupt file must not block the others.
                    message = "Skipping unreadable buffered event {}: {}".format(file_path, e)
                    print(message)
                    log_verbose("retry_buffered_events: " + message, 1)
                    continue
                send(server, apiKey, json.dumps(event))
            finally:
                # Without this the entry stays in the lock file and the next
                # run finds the buffer file "locked".
                release_lock(file_path, lock_file)
            count += 1
    finally:
        retry_flag = previous_retry_flag


# Function to acquire lock for a buffer file
def acquire_lock(file_name, lock_file):
    """Record *file_name* in the lock file unless it is already listed.

    Note: the check and the append are two separate steps, so this is not
    atomic across processes.

    Returns:
        True when the entry was added, False when it was already present.
    """
    with open(lock_file, 'a+') as f:
        f.seek(0)
        # Whole-line comparison, matching what release_lock() removes.
        held = {line.strip() for line in f.read().splitlines()}
        if file_name in held:
            return False  # Lock not acquired, file already in use
        f.write(file_name + '\n')  # Append file name to lock file
        return True  # Lock acquired successfully


# Function to release lock for a buffer file
def release_lock(file_name, lock_file):
    """Remove every line equal to *file_name* from the lock file."""
    try:
        with open(lock_file, 'r') as f:
            lines = f.readlines()
    except FileNotFoundError:
        return  # Nothing to release
    with open(lock_file, 'w') as f:
        # Rewrite lock file without the released file name
        f.writelines(line for line in lines if line.strip() != file_name)


# Function to check if a buffer file is locked
def check_lock_status(file_path, lock_file):
    """Acquire the lock for *file_path*, retrying as configured.

    The retry interval (seconds) and count come from the [Buffering] keys
    retry_check_buffer_status_time / retry_check_buffer_status_count. Calls
    sys.exit(1) when the lock is still held after the last retry.
    """
    retry_time = int(config['Buffering']['retry_check_buffer_status_time'])
    retry_count = int(config['Buffering']['retry_check_buffer_status_count'])

    if acquire_lock(file_path, lock_file):
        return

    for retry_val in range(retry_count):
        log_verbose("Buffer file {} is found locked, retrying after {} seconds, retries remaining - {}".format(file_path, retry_time, str(int(retry_count - retry_val - 1))), 1)
        time.sleep(retry_time)  # Wait for retry_time seconds before trying again
        # Re-attempt the lock; the first result was previously reused for
        # every iteration, so waiting could never succeed.
        if acquire_lock(file_path, lock_file):
            return

    print("Buffer file is locked. Exiting.")
    log_verbose("buffer_event: Buffer file {} is still found locked after {} seconds, hence exited.".format(file_path, str(int(retry_time * retry_count))), 1)
    sys.exit(1)


def check_and_trim_file(file_path, max_size):
    """Shrink *file_path* to its last line when it is larger than *max_size*.

    Errors are reported and logged, not raised.

    Args:
        file_path: File to check.
        max_size: Size limit in bytes.
    """
    try:
        if os.path.getsize(file_path) <= max_size:
            return
        with open(file_path, 'r+') as file:
            lines = file.readlines()
            # Trim the file content to remove the oldest records/lines
            file.seek(0)
            file.truncate()
            file.writelines(lines[-1:])  # Keep the last line
    except OSError as e:
        message = "Error while checking and trimming file: {}".format(e)
        print(message)
        log_verbose("check_and_trim_file: " + message, 1)


################################################################################
# Read event from a file
################################################################################


def read_event_from_file(file_path):
    """Return the stripped text of *file_path*; exit with status 1 on error."""
    try:
        with open(file_path, 'r') as file:
            return file.read().strip()
    except (OSError, UnicodeError) as e:
        message = "Error reading from file %s" % (e)
        print(message)
        log_verbose("read_event_from_file: " + message, 1)
        sys.exit(1)


################################################################################
# Parse Input
################################################################################


def _local_host_details():
    """Return (source_hostname, location, source_address) for this machine.

    A failed name lookup is logged and replaced by a fallback (the plain host
    name for location, the loopback address for source_address) so that the
    event can still be sent.
    """
    source_hostname = socket.getfqdn()
    hostname = socket.gethostname()
    try:
        location = socket.gethostbyaddr(hostname)[0]
    except OSError as e:
        log_verbose("parse_input: " + ERR_HOSTNAME.format(e), 1)
        location = hostname
    try:
        source_address = socket.gethostbyname(hostname)
    except OSError as e:
        log_verbose("parse_input: " + ERR_GETHOSTBYNAME.format(hostname, e), 1)
        source_address = "127.0.0.1"
    return source_hostname, location, source_address


def parse_input(stdin_input):
    """Extract event attributes from ';'-separated "key=value" text.

    Parts without '=' (such as the leading class token and the trailing END)
    are ignored. Keys other than the recognized core attributes are stored in
    the module-level ``slot_values`` dict.

    Args:
        stdin_input: Event text, e.g. 'EVENT;severity=MAJOR;msg=Test;END'.

    Returns:
        Tuple ``(event_class, severity, message, source_hostname, location,
        source_address)``; attributes missing from the input keep their
        defaults (PATROL_EV, MAJOR, empty message, local host details).
    """
    # Default values
    event_class = "PATROL_EV"
    severity = "MAJOR"
    message = ""
    source_hostname, location, source_address = _local_host_details()

    # Normally created by parse_args(); make sure it exists.
    extra_slots = globals().setdefault('slot_values', {})

    for part in stdin_input.split(';'):
        key, separator, value = part.partition('=')
        if not separator:
            continue
        # Multi-line input leaves newlines/indentation around each pair.
        key = key.strip()
        value = value.strip()
        if key in ('class', 'event_class'):
            event_class = value
        elif key == 'severity':
            severity = value
        elif key in ('msg', 'message'):
            message = value
        elif key == 'source_hostname':
            source_hostname = value
        elif key == 'location':
            location = value
        elif key == 'source_address':
            source_address = value
        else:
            extra_slots[key] = value

    return event_class, severity, message, source_hostname, location, source_address


################################################################################
# Main Logic
################################################################################


def main():
    """Retry buffered events, then build and send the event requested."""
    global verbose_flag, retry_flag
    verbose_flag = 0
    log_verbose("-----------------------------xxx-----------------------------", 0)
    server, apiKey, event_class, message, severity, slot_values, read_from_stdin, file_path, source_identifier, p_origin_key = parse_args()
    header()
    log_verbose("main: Beginning main", 0)

    retry_flag = True
    log_verbose("main: Retrying buffered events if any", 0)
    retry_buffered_events(server, apiKey)
    retry_flag = False

    if read_from_stdin:
        log_verbose("main: Reading from stdinput", 0)
        stdin_input = sys.stdin.read().strip()
        event_class, severity, message, source_hostname, location, source_address = parse_input(stdin_input)
    elif file_path:
        log_verbose("main: Reading from file", 0)
        file_input = read_event_from_file(file_path)
        event_class, severity, message, source_hostname, location, source_address = parse_input(file_input)
    else:
        log_verbose("main: Parsing given arguments", 0)
        # Take the defaults and host details from parse_input() and overlay
        # the command-line values directly. Formatting them into a BAROC
        # string turned missing options into the text "None" and broke
        # messages containing ';' or '='.
        default_class, default_severity, default_message, source_hostname, location, source_address = parse_input("")
        if event_class is None:
            event_class = default_class
        if severity is None:
            severity = default_severity
        if message is None:
            message = default_message

    log_verbose("main: Creating event", 0)
    event = convert_raw_to_json(event_class, message, severity, slot_values, source_identifier, p_origin_key, source_hostname, location, source_address)
    log_verbose("main: Sending JSON event to BHOM", 0)
    send(server, apiKey, event)
    log_verbose("-----------------------------xxx-----------------------------", 0)


if __name__ == "__main__":
    main()