Oid extractor from SNMP Mib files

I currently have the following 3 … versions… none working though.

They are at different levels, as would be seen, at min as a start I was thinking lets just get the data decomposed/decompiled so that it can be printed to console… as the database component can be added after…

NOTE: the name change for RFC-1215 from norm… this is due to files I tried to load needed that mib named as such and with the header matching…

The --mib-dirs contain:

total 472
drwxr-xr-x  11 george  staff     352 Jul  9 15:42 .
drwxr-xr-x  16 george  staff     512 Jul  9 21:06 ..
-rw-r--r--   1 george  staff     935 Jul  4 22:04 README.md
-rw-r--r--   1 george  staff    2692 Jul  9 13:18 RFC-1212.mib
-rw-r--r--   1 george  staff    2814 Jul  9 15:50 RFC-1215.mib
-rw-r--r--   1 george  staff    3077 Jul  9 13:18 RFC1155-SMI.mib
-rw-r--r--   1 george  staff  103000 Jun 28 21:12 RFC1213-MIB.mib
-rw-r--r--   1 george  staff   31385 Jul  9 13:18 RFC1231-MIB.mib
-rw-r--r--   1 george  staff   31946 Jun 28 21:12 SNMPv2-MIB.mib
-rw-r--r--   1 george  staff   10283 Jun 28 21:12 SNMPv2-SMI.mib
-rw-r--r--   1 george  staff   38034 Jul  6 15:55 SNMPv2-TC.mib

Option 1

#######################################################################################################################
#
#
#  	Project     	: 	SNMP MIB Loader/Ingester.
#
#   File            :   mib_parser.py
#
#   Description     :   Load the oid name/description data from MIB files directly into designated tables that cna be
#                   :   exposed into Apache Flink
#
#                   :   Database engines currently supported: (Redis, PostgreSQL or MySql).
#
#	By              :   George Leonard ( georgelza@gmail.com )
#
#   Created     	:   05 Jul 2025
#
#   TO BE
#
#   Example         :   python mib_parser.py \
#                           --mib-file mibs/RFC1213-MIB.mib \
#                           --mib-dirs mibstd \
#                           --db-type postgresql \
#                           --db-host localhost \
#                           --db-port 5433 \
#                           --db-name snmp \
#                           --db-schema public \
#                           --db-user dbadmin \
#                           --db-password dbpassword \
#                           --tbl-name snmp_oid_data
# OR
#                       python3 mib_parser.py \
#                           --mib-directory mibsx \
#                           --mib-dirs mibstd \
#                           --db-type mysqlsql \
#                           --db-host localhost \
#                           --db-port 3306 \
#                           --db-name snmp \
#                           --db-schema snmp \
#                           --db-user dbadmin \
#                           --db-password dbpassword \
#                           --tbl-name snmp_oid_data
#
#   The script now uses argparse to accept command-line arguments for:
#
#       --mib-file (mutually exclusive with --mib-directory): Path to a single MIB file.
#       --mib-directory (mutually exclusive with --mib-file): Path to a directory containing MIB files.
#           --mib-dirs:         Optional list of directories where dependent MIBs are located, aka standard mibs, 
#                               i.e: RFC-1212, RFC1155-SMI, RFC1213-MIB, RFC1215, RFC1231-MIB, SNMPv2-MIB, SNMPv2-SMI, SNMPv2-TC
#           --db-type           (required): postgresql, mysql, or redis.
#           --db-host           (required): Database hostname.
#           --db-port           (required): Database port.
#           --db-name:          Database name (for SQL) or DB index (for Redis).
#           --db-schema:        Schema name (for PostgreSQL:public or dbname for MySQL).
#           --db-user:          Username (for SQL).
#           --db-password:      Password (for all).
#           --tbl-name:         Target table to load data into (for PostgreSQL/MySQL).
#           --redis-key-prefix: Custom key prefix for Redis (defaults to oid:).
#
#   INTERIM Test
#
#       python mib_parser.py --mib-directory mibstxt --mib-file MIB-Dell-10892.mib
#       python mib_parser.py --mib-directory mibstxt --mib-file MIB-Dell-10892.mib --mib-dirs mibstd
#
########################################################################################################################
__author__      = "George Leonard"
__email__       = "georgelza@gmail.com"
__version__     = "0.0.1"
__copyright__   = "Copyright 2025, George Leonard"


import os
import re
import argparse         # Import argparse for command-line arguments
import pysnmp.debug     # Import pysnmp.debug for logging

from datetime           import datetime

# Import PySMI components for explicit MIB compilation
from pysmi.parser       import SmiStarParser
from pysmi.codegen      import PySnmpCodeGen

# Import pysnmp MIB builder and view for accessing compiled MIBs
from pysnmp.smi         import builder, view
from pysnmp.smi.error   import SmiError # Import specific error for catching

# Import the logger from utils.py
from utils              import logger

current_datetime = datetime.now()
datetime_str     = current_datetime.strftime("%Y%m%d_%H%M%S") # Example format: 20250709_165531
             
# Initialize the logger
# You can adjust these levels as needed. INFO for console, DEBUG for file.
log = logger(filename=f"mib_parser_{datetime_str}.log", console_debuglevel=1, file_debuglevel=0)

# Enable pysnmp debug logging for verbose output
# This will print detailed information about MIB loading and parsing.
# Now using our custom logger
pysnmp.debug.set_logger(pysnmp.debug.Debug('all', logger=log)) # Integrate pysnmp debug with our logger

def extract_mib_details(mib_directory, mib_file, mib_dirs_list=None):
    """
    Extracts OID details from a MIB file using PySMI for explicit compilation.

    Args:
        mib_directory (str): The path to the directory containing the MIB file,
                             relative to the script's location.
        mib_file (str): The name of the MIB file (e.g., 'MIB-Dell-10892.mib').
        mib_dirs_list (list, optional): A list of additional directories where dependent MIBs can be found,
                                        relative to the script's location. Defaults to None.
    """
    # Get the directory where this script (mib_parser.py) is located
    script_dir              = os.path.dirname(os.path.abspath(__file__))

    # Resolve mib_directory to an absolute path relative to the script's directory
    absolute_mib_directory = os.path.join(script_dir, mib_directory)
    original_mib_file_path = os.path.join(absolute_mib_directory, mib_file)

    if not os.path.exists(original_mib_file_path):
        
        log.error(f"Error: MIB file not found at '{original_mib_file_path}'")
        return

    log.info(f"Parsing MIB file: {original_mib_file_path}\n")

    # Initialize PySMI parser and code generator
    mibParser   = SmiStarParser()
    mibCodeGen  = PySnmpCodeGen()

    # Initialize MibBuilder directly
    mibBuilder  = builder.MibBuilder() # This line is critical!
    mibView     = view.MibViewController(mibBuilder)

    # Create a temporary directory for compiled MIBs if it doesn't exist.
    # This directory will be used by pysnmp to store compiled versions of imported MIBs.
    compiled_mibs_dir = os.path.join(absolute_mib_directory, "compiled_mibs")
    os.makedirs(compiled_mibs_dir, exist_ok=True)

    module_name = None
    extracted_oids = [] # Initialize extracted_oids here to prevent UnboundLocalError

    try:
        # Determine the MIB module name from the file content.
        with open(original_mib_file_path, 'r') as f:
            for line in f:
                if 'DEFINITIONS ::=' in line:
                    module_name = line.split(' ')[0].strip()
                    break
            else:
                raise ValueError(f"Could not determine MIB module name from '{original_mib_file_path}'. "
                                 "Ensure it starts with 'MODULE-NAME DEFINITIONS ::= BEGIN'.")

        # Prepare MIB source readers for PySMI. We use pysnmp's own MibBuilder.add_mib_sources.
        # Order is crucial for MIB resolution:
        # 1. Writable directory for compiled MIBs (for caching compiled MIBs)
        # 2. Directory of the input MIB file (for the primary MIB and its local dependencies)
        # 3. User-specified MIB directories
        # 4. PySNMP's own standard MIBs (usually found automatically by pysnmp, so often not needed to explicitly add)

        # Add the directory for compiled MIBs (for caching)
        mibBuilder.add_mib_sources(builder.DirMibSource(compiled_mibs_dir))

        # Add the directory containing the primary MIB file
        mibBuilder.add_mib_sources(builder.DirMibSource(absolute_mib_directory))

        # Add user-specified MIB directories if provided, resolving them relative to script_dir
        if mib_dirs_list:
            for mib_dir_path_relative in mib_dirs_list:
                absolute_mib_dir_path = os.path.join(script_dir, mib_dir_path_relative)
                if os.path.isdir(absolute_mib_dir_path):
                    mibBuilder.add_mib_sources(builder.DirMibSource(absolute_mib_dir_path))
                else:
                    log.warning(f"Warning: MIB directory '{absolute_mib_dir_path}' (resolved from '{mib_dir_path_relative}') specified in --mib-dirs does not exist or is not a directory. Skipping.")

        # Explicitly compile the main MIB file and its dependencies
        try:
            # Read the content of the main MIB file
            with open(original_mib_file_path, 'r') as f:
                mib_content = f.read()

            # Parse the MIB content
            ast = mibParser.parse(mib_content)

            # Generate pysnmp MIB objects from the parsed AST
            # Pass the mibBuilder instance directly to codegen
            mibCodeGen.codegen(ast, mibBuilder, genTexts=True, genDescr=True)

            log.info(f"Successfully compiled and loaded MIB module: {module_name}")

        except SmiError as e:
            log.error(f"Error compiling MIB module '{module_name}': {e}")
            log.error(f"This often means a dependent MIB is missing or incorrectly named.")
            log.error(f"Please ensure all required IMPORTS for '{mib_file}' (e.g., RFC1155-SMI, RFC-1212, RFC-1215, RFC1213-MIB) "
                      f"are present and correctly named (e.g., 'RFC-1215.mib' if importing 'RFC-1215') "
                      f"in '{absolute_mib_directory}' or any of the specified --mib-dirs paths, or standard PySNMP MIB paths.")
            return # Exit if loading fails

        # Iterate through all loaded symbols/objects in the MIB builder.
        for mibVar in mibBuilder.getMibNodes():
            # Check if the object has a name and an OID, indicating it's a defined MIB object.
            if hasattr(mibVar, 'getName') and hasattr(mibVar, 'getOid'):
                oid_name   = str(mibVar.getName())
                oid_string = str(mibVar.getOid())

                # Skip common root OIDs and module definitions themselves.
                if oid_string in ['.1', '.1.3', '.1.3.6', '.1.3.6.1', '.1.3.6.1.4', '.1.3.6.1.4.1']:
                    continue
                if hasattr(mibVar, 'kind') and mibVar.kind == 'module':
                     continue

                oid_description = "No description available."
                if hasattr(mibVar, 'getDescription') and mibVar.getDescription():
                    oid_description = mibVar.getDescription().strip()

                syntax_type = "UNKNOWN"
                if hasattr(mibVar, 'syntax'):
                    syntax_obj = mibVar.syntax
                    if hasattr(syntax_obj, 'getName'):
                        syntax_type = syntax_obj.getName()
                    elif hasattr(syntax_obj, '__class__'):
                        syntax_type = syntax_obj.__class__.__name__
                elif hasattr(mibVar, '__class__'):
                    if 'MibModule' in str(mibVar.__class__) or 'MibNode' in str(mibVar.__class__):
                         syntax_type = "OBJECT IDENTIFIER"
                    else:
                         syntax_type = mibVar.__class__.__name__

                unit = None

                oid_type = "unknown"
                if isinstance(mibVar, builder.MibScalar):
                    oid_type = "scalar"
                    if not oid_string.endswith('.0'):
                        oid_string += '.0'

                elif isinstance(mibVar, builder.MibTable):
                    oid_type = "table"

                elif isinstance(mibVar, builder.MibTableRow):
                    oid_type = "tableRow"

                elif isinstance(mibVar, builder.MibTableColumn):
                    oid_type = "tableColumn"

                elif isinstance(mibVar, builder.MibNotification):
                    oid_type = "notification"

                elif 'OBJECT IDENTIFIER' in syntax_type or 'MibNode' in str(mibVar.__class__):
                     oid_type = "group"

                extracted_oids.append({
                    "oid_string":       oid_string,
                    "oid_name":         oid_name,
                    "oid_description":  oid_description,
                    "syntax_type":      syntax_type,
                    "unit":             unit,
                    "oid_type":         oid_type,
                })

    except SmiError as e:
        log.error(f"MIB parsing error: {e}")
        log.error("Please ensure the MIB file is well-formed and its IMPORTS are resolvable.")

    except Exception as e:
        log.exception(f"An unexpected error occurred: {e}")

    finally:
        pass # No specific cleanup for temp files needed with this approach

    if not extracted_oids:
        log.warning("No OIDs extracted. This might be due to an empty MIB, a MIB with only MODULE-IDENTITY, or issues in parsing.")
        log.warning("Ensure the MIB file contains OBJECT-TYPE, OBJECT IDENTIFIER, or TRAP-TYPE definitions.")
        return

    # Print the extracted details in a table format.
    log.info(f"\n--- Extracted {len(extracted_oids)} OIDs ---")
    log.info("{:<30} {:<30} {:<20} {:<15} {:<10} {:<15}".format(
        "OID String", "OID Name", "Syntax Type", "OID Type", "Unit", "Description"
    ))
    log.info("-" * 130)

    for oid_data in extracted_oids:
        display_description = (oid_data["oid_description"][:45] + '...') if len(oid_data["oid_description"]) > 48 else oid_data["oid_description"]

        log.info("{:<30} {:<30} {:<20} {:<15} {:<10} {:<15}".format(
            oid_data["oid_string"],
            oid_data["oid_name"],
            oid_data["syntax_type"],
            oid_data["oid_type"],
            oid_data["unit"] if oid_data["unit"] else "N/A",
            display_description
        ))
    log.info("-" * 130)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Extract OID details from a MIB file.")
    parser.add_argument(
        "--mib-directory",
        type=str,
        required=True,
        help="Path to the directory containing the MIB file, relative to this script's location."
    )

    parser.add_argument(
        "--mib-file",
        type=str,
        required=True,
        help="Name of the MIB file (e.g., 'MIB-Dell-10892.mib')."
    )

    parser.add_argument(
        "--mib-dirs",
        type=str,
        nargs='+', # Allows one or more arguments
        help="Additional directories where dependent MIBs are located, relative to this script's location. "
             "Provide multiple paths separated by spaces. E.g., 'standard_mibs vendor_mibs'."
    )

    args = parser.parse_args()

    extract_mib_details(args.mib_directory, args.mib_file, args.mib_dirs)

Option 2

__author__      = "George Leonard"
__email__       = "georgelza@gmail.com"
__version__     = "0.0.1"
__copyright__   = "Copyright 2025, George Leonard"


import os
import json
import argparse
from pysnmp.smi import builder, view, error
from pysnmp.smi.rfc1902 import ObjectType, NotificationType # Import necessary types
from datetime import datetime # For timestamp in logs
from utils import logger # Import our custom logger from utils.py
import sys # To find pysnmp mibs path

# Initialize logger instance globally for this script
LOG_TAG             = 'snmp_mib_ingester'
LOG_FILE            = f'{LOG_TAG}.log'
CONSOLE_DEBUG_LEVEL = 0 # Set to DEBUG for detailed MIB loading info
FILE_DEBUG_LEVEL    = 0 # Set to DEBUG for detailed MIB loading info
CUSTOM_LOG_FORMAT   = f'{LOG_TAG}, %(asctime)s, %(levelname)s, %(message)s' # Added levelname
logger_instance     = logger(LOG_FILE, CONSOLE_DEBUG_LEVEL, FILE_DEBUG_LEVEL, CUSTOM_LOG_FORMAT)

# Conditional imports for database connectors
# These will need to be installed:
# pip install pysnmp psycopg2-binary mysql-connector-python redis
try:
    import psycopg2

except ImportError:
    psycopg2 = None
    logger_instance.warning("psycopg2 not found. PostgreSQL database option will not be available.")

# end try

try:
    import mysql.connector

except ImportError:
    mysql = None
    logger_instance.warning("mysql-connector-python not found. MySQL database option will not be available.")

# end try

try:
    import redis

except ImportError:
    redis = None
    logger_instance.warning("redis not found. Redis database option will not be available.")

# end try


def parse_mib_files(mib_file_path=None, mib_directory_path=None, mib_dirs=None, logger_instance=None):
    """
    Parses MIB files from a given path (file or directory) and extracts OID metadata.

    Args:
        mib_file_path (str, optional): The path to a single MIB file to parse.
        mib_directory_path (str, optional): The path to a directory containing MIB files.
        mib_dirs (list, optional): A list of directories where dependent MIBs
                                   might be located. Defaults to None.
        logger_instance (logging.Logger, optional): The logger instance to use.

    Returns:
        list: A list of dictionaries, each representing an OID's metadata.
              Returns an empty list if parsing fails or no OIDs are found.
    """
    if logger_instance is None:
        import logging
        logger_instance = logging.getLogger(__name__) # Fallback if not provided

    mib_data = []
    mibBuilder = builder.MibBuilder()

    # --- Configure MIB Sources ---
    # 1. Add the directory where the user's MIB file/directory is located
    user_mib_base_dir = None
    if mib_file_path:
        user_mib_base_dir = os.path.dirname(os.path.abspath(mib_file_path))
    elif mib_directory_path:
        user_mib_base_dir = os.path.abspath(mib_directory_path)

    if user_mib_base_dir and os.path.isdir(user_mib_base_dir):
        mibBuilder.add_mib_sources(builder.DirMibSource(user_mib_base_dir)) # Corrected: add_mib_sources
        logger_instance.debug(f"Added user-specified MIB source: {user_mib_base_dir}")
    else:
        logger_instance.error(f"User-specified MIB source '{user_mib_base_dir}' does not exist or is not a directory. Exiting.")
        return []

    # 2. Add pysnmp's default MIB sources (critical for RFC MIBs like SNMPv2-SMI, RFC1213-MIB)
    try:
        # A more robust way to get the pysnmp MIBs path by iterating through sys.path
        # and looking for 'pysnmp/smi/mibs' within site-packages or similar.
        pysnmp_mibs_path = None
        for p in sys.path:
            potential_path = os.path.join(p, 'pysnmp', 'smi', 'mibs')
            if os.path.isdir(potential_path):
                pysnmp_mibs_path = potential_path
                break
        
        if pysnmp_mibs_path:
            mibBuilder.add_mib_sources(builder.DirMibSource(pysnmp_mibs_path)) # Corrected: add_mib_sources
            logger_instance.debug(f"Added pysnmp default MIB source: {pysnmp_mibs_path}")
        else:
            logger_instance.warning("Could not automatically find pysnmp default MIBs path. "
                                    "Ensure standard RFC MIBs are manually placed in your MIB directory or specified via --mib-dirs.")

    except Exception as e:
        logger_instance.warning(f"Error while trying to add pysnmp default MIB source: {e}")
        logger_instance.warning("Ensure standard RFC MIBs are manually placed in your MIB directory or specified via --mib-dirs.")

    # 3. Add any additional MIB directories provided via --mib-dirs
    if mib_dirs:
        for mib_dir in mib_dirs:
            abs_mib_dir = os.path.abspath(mib_dir)
            if os.path.isdir(abs_mib_dir):
                mibBuilder.add_mib_sources(builder.DirMibSource(abs_mib_dir)) # Corrected: add_mib_sources
                logger_instance.debug(f"Added additional MIB source: {abs_mib_dir}")
            else:
                logger_instance.warning(f"Additional MIB directory '{abs_mib_dir}' does not exist or is not a directory. Skipping.")

    # --- Debugging: Print all MIB sources configured ---
    logger_instance.debug("Current MIB Builder search paths:")
    for source in mibBuilder.get_mib_sources(): # Corrected: Use get_mib_sources()
        logger_instance.debug(f"- {source}")
    logger_instance.debug("-" * 30)
    # --- End Debugging ---

    # --- Determine MIB files to load ---
    mib_files_to_load = []
    if mib_file_path:
        mib_files_to_load.append(os.path.basename(mib_file_path))
    elif mib_directory_path:
        for filename in os.listdir(os.path.abspath(mib_directory_path)):
            if filename.endswith(('.mib', '.txt')):
                mib_files_to_load.append(filename)
    
    if not mib_files_to_load:
        logger_instance.warning("No MIB files found to load in the specified path(s).")
        return []

    # --- Load MIB modules ---
    for mib_filename in mib_files_to_load:
        mib_module_name = os.path.splitext(mib_filename)[0]
        logger_instance.info(f"Attempting to load MIB module: {mib_module_name}")
        try:
            # Use load_modules (with underscore) for the non-deprecated method
            mibBuilder.load_modules(mib_module_name) 
            logger_instance.info(f"Successfully loaded MIB module: {mib_module_name}")
        except error.SmiError as e:
            logger_instance.error(f"Error loading MIB module {mib_module_name} (from file '{mib_filename}'): {e}")
            logger_instance.error(f"  This often means a MIB file it IMPORTS is missing from the search paths.")
            logger_instance.error(f"  Please ensure all MIB files imported by '{mib_filename}' are in your --mib-directory or --mib-dirs.")
        except Exception as e:
            logger_instance.error(f"An unexpected error occurred while loading MIB module {mib_module_name} from '{mib_filename}': {e}")

    # --- Extract OID data from loaded MIBs ---
    # Iterate over all loaded MIB objects using mibBuilder.mibSymbols.values()
    for mibVar in mibBuilder.mibSymbols.values():
        try:
            # Only process ObjectType and NotificationType (which covers TRAP-TYPE)
            if not (isinstance(mibVar, ObjectType) or isinstance(mibVar, NotificationType)):
                continue

            oid_name    = str(mibVar.getName())
            oid_string  = str(mibVar.getOid())
            description = getattr(mibVar, 'getDescription', lambda: '')().strip()
            syntax_type = ""
            oid_type    = ""
            unit        = ""

            if isinstance(mibVar, ObjectType):
                if mibVar.isScalar():
                    oid_type = "scalar"
                elif mibVar.isColumn():
                    oid_type = "table" # Use "table" for columns as per your schema
                else: # Could be a table root, or other ObjectType
                    oid_type = "other" # Or refine this if you need to distinguish table roots

                syntax = mibVar.getSyntax()
                if hasattr(syntax, 'getLabel'):
                    syntax_type = syntax.getLabel()
                else:
                    syntax_type = syntax.__class__.__name__
            elif isinstance(mibVar, NotificationType):
                oid_type = "notification"
                syntax_type = "Notification" # Generic for notifications

            # Basic unit extraction (can be expanded if needed)
            desc_lower = description.lower()
            if "seconds" in desc_lower:
                unit = "seconds"
            elif "bytes" in desc_lower:
                unit = "bytes"
            elif "kilobytes" in desc_lower:
                unit = "kilobytes"
            elif "bits" in desc_lower:
                unit = "bits"
            elif "percent" in desc_lower:
                unit = "percent"


            mib_data.append({
                "oid_string":   oid_string,
                "oid_name":     oid_name,
                "description":  description,
                "syntax_type":  syntax_type,
                "unit":         unit,
                "oid_type":     oid_type,
            })
        except Exception as e:
            logger_instance.warning(f"Could not process MIB object {mibVar} ({type(mibVar)}): {e}")
            continue
    
    # Sort for consistent output
    mib_data.sort(key=lambda x: x['oid_string'])
    return mib_data


class DatabaseManager:
    """
    Manages database connections and data insertion for various database types.
    """
    def __init__(self, db_type, host, port, user=None, password=None, dbname=None, schema=None, tbl_name="snmp_oid_metadata", key_prefix="oid:", logger_instance=None):
        self.db_type    = db_type.lower()
        self.host       = host
        self.port       = port
        self.user       = user
        self.password   = password
        self.dbname     = dbname
        self.schema     = schema        # Schema for PostgreSQL
        self.tbl_name   = tbl_name      # Table name for SQL databases
        self.key_prefix = key_prefix
        self.connection = None
        self.cursor     = None          # For SQL databases
        self.logger     = logger_instance if logger_instance else logging.getLogger(__name__)

    # end def

    def connect(self):
        """Establishes a connection to the specified database."""
        try:
            if self.db_type == 'postgresql':
                if not psycopg2:
                    raise ImportError("psycopg2 is not installed. Cannot connect to PostgreSQL.")

                # end if
                self.connection = psycopg2.connect(
                    host    = self.host,
                    port    = self.port,
                    user    = self.user,
                    password= self.password,
                    dbname  = self.dbname
                )
                
                self.connection.autocommit  = True # Auto-commit changes
                self.cursor                 = self.connection.cursor()

                self.logger.info(f"Connected to PostgreSQL database: {self.dbname} on {self.host}:{self.port}")

                if self.schema:
                    self.cursor.execute(f"SET search_path TO {self.schema}, public;")
                    self.logger.info(f"PostgreSQL search_path set to: {self.schema}, public")

                # end if
            elif self.db_type == 'mysql':
                if not mysql:
                    raise ImportError("mysql-connector-python is not installed. Cannot connect to MySQL.")

                # end if
                
                self.connection = mysql.connector.connect(
                    host        = self.host,
                    port        = self.port,
                    user        = self.user,
                    password    = self.password,
                    database    = self.dbname
                )

                self.cursor = self.connection.cursor()
                self.logger.info(f"Connected to MySQL database: {self.dbname} on {self.host}:{self.port}")

            elif self.db_type == 'redis':
                if not redis:
                    raise ImportError("redis is not installed. Cannot connect to Redis.")

                #end if
                
                self.connection = redis.Redis(
                    host     = self.host,
                    port     = self.port,
                    db       = self.dbname,     # In Redis, 'database' is an integer index
                    password = self.password
                )

                self.connection.ping()          # Test connection
                self.logger.info(f"Connected to Redis database: {self.dbname} on {self.host}:{self.port}")

            else:
                raise ValueError(f"Unsupported database type: {self.db_type}")

            #end if
        except Exception as e:
            self.logger.error(f"Error connecting to {self.db_type} database: {e}")
            self.connection = None
            self.cursor     = None
            raise
        
        #end try
    # end def


    def insert_oid_metadata(self, oid_data_list):
        """
        Inserts a list of OID metadata dictionaries into the connected database.
        Uses UPSERT logic for SQL databases.
        """
        if not self.connection:
            self.logger.error("No active database connection. Please connect first.")
            return

        # end if
        
        if self.db_type == 'postgresql':
            table_full_name = f"{self.schema}.{self.tbl_name}" if self.schema else self.tbl_name
            sql = f"""
                INSERT INTO {table_full_name} (oid_string, oid_name, description, syntax_type, unit, oid_type)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON CONFLICT (oid_string) DO UPDATE SET
                    oid_name    = EXCLUDED.oid_name,
                    description = EXCLUDED.description,
                    syntax_type = EXCLUDED.syntax_type,
                    unit        = EXCLUDED.unit,
                    oid_type    = EXCLUDED.oid_type;
            """
            try:
                for oid in oid_data_list:
                    self.cursor.execute(sql, (
                        oid['oid_string'],
                        oid['oid_name'],
                        oid['description'],
                        oid['syntax_type'],
                        oid['unit'],
                        oid['oid_type']
                    ))
                
                # end for
                self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into PostgreSQL table '{table_full_name}'.")

            except Exception as e:
                self.logger.error(f"Error inserting into PostgreSQL table '{table_full_name}': {e}")

            # end try
        elif self.db_type == 'mysql':
            # For MySQL, schema is part of the database connection, not table name directly in query
            sql = f"""
                INSERT INTO {self.tbl_name} (oid_string, oid_name, description, syntax_type, unit, oid_type)
                VALUES (%s, %s, %s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    oid_name    = VALUES(oid_name),
                    description = VALUES(description),
                    syntax_type = VALUES(syntax_type),
                    unit        = VALUES(unit),
                    oid_type    = VALUES(oid_type);
            """
            try:
                for oid in oid_data_list:
                    self.cursor.execute(sql, (
                        oid['oid_string'],
                        oid['oid_name'],
                        oid['description'],
                        oid['syntax_type'],
                        oid['unit'],
                        oid['oid_type']
                    ))

                # end for
                self.connection.commit()
                self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into MySQL table '{self.tbl_name}'.")

            except Exception as e:
                self.logger.error(f"Error inserting into MySQL table '{self.tbl_name}': {e}")
                self.connection.rollback()

            # enf try
        elif self.db_type == 'redis':
            try:
                pipe = self.connection.pipeline()
                for oid in oid_data_list:
                    redis_key = f"{self.key_prefix}{oid['oid_string']}"
                    pipe.set(redis_key, json.dumps(oid))

                # end fr
                pipe.execute()
                self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into Redis (keys prefixed with '{self.key_prefix}').")

            except Exception as e:
                self.logger.error(f"Error inserting into Redis: {e}")
            
            # end try
        else:
            self.logger.warning(f"Insertion not supported for database type: {self.db_type}")

        # end if
    # end insert_oid_metadata


    def close(self):
        """Closes the database connection."""
        if self.connection:
            if self.db_type in ['postgresql', 'mysql'] and self.cursor:
                self.cursor.close()

            # end if
            self.connection.close()
            self.logger.info(f"Closed {self.db_type} database connection.")

        else:
            self.logger.info("No active connection to close.")
            
        # end if
    # end def close
# end class DatabaseManager


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Parse SNMP MIB files and insert OID metadata into a database.")

    # Create a mutually exclusive group for mib_file and mib_directory
    mib_input_group = parser.add_mutually_exclusive_group(required=True)
    
    mib_input_group.add_argument("--mib-file",      help="Path to a single MIB file to parse.")
    mib_input_group.add_argument("--mib-directory", help="Path to a directory containing MIB files.")

    parser.add_argument("--mib-dirs",               nargs='*', default=[], help="Optional: List of directories where dependent MIBs are located.")
    parser.add_argument("--db-type",                choices=['postgresql', 'mysql', 'redis'], required=True, help="Type of database to connect to (postgresql, mysql, redis).")
    parser.add_argument("--db-host",                required=True, help="Database hostname or IP address.")
    parser.add_argument("--db-port",                type=int, required=True, help="Database port number.")
    parser.add_argument("--db-name",                help="Database name (for PostgreSQL/MySQL) or DB index (for Redis).")
    parser.add_argument("--db-user",                help="Database username (for SQL).")
    parser.add_argument("--db-password",            help="Database password (for PostgreSQL/MySQL/Redis).")
    parser.add_argument("--db-schema",              help="Database schema name (for PostgreSQL).")
    parser.add_argument("--tbl-name",               default="snmp_oid_metadata", help="Table name for SQL databases (PostgreSQL/MySQL). Defaults to 'snmp_oid_metadata'.")
    parser.add_argument("--redis-key-prefix",       default="oid:", help="Prefix for Redis keys (e.g., 'oid:' for 'oid:1.3.6.1.2.1.1.3.0'). Only for Redis.")

    args = parser.parse_args()

    # Parse the MIB file(s) or directory
    # Pass the appropriate argument based on which one was provided
    parsed_oids = parse_mib_files(
        mib_file_path       = args.mib_file,
        mib_directory_path  = args.mib_directory,
        mib_dirs            = args.mib_dirs,
        logger_instance     = logger_instance
    )

    if not parsed_oids:
        logger_instance.info("No OID data extracted. Exiting.")
        exit()

    # end if
    
    logger_instance.info("\n--- Parsed OID Metadata Summary ---")
    # Changed to debug level for individual OID details to keep INFO logs cleaner
    for oid in parsed_oids:
        logger_instance.debug(f"OID String:     {oid['oid_string']}")
        logger_instance.debug(f"OID Name:       {oid['oid_name']}")
        logger_instance.debug(f"Description:    {oid['description']}")
        logger_instance.debug(f"Syntax Type:    {oid['syntax_type']}")
        logger_instance.debug(f"Unit:           {oid['unit']}")
        logger_instance.debug(f"OID Type:       {oid['oid_type']}")
        logger_instance.debug("-" * 20)
    
    # end for
    
    logger_instance.info(f"Total OIDs parsed: {len(parsed_oids)}")

    # Database Insertion
    db_manager = None
    try:
        db_manager = DatabaseManager(
            db_type     = args.db_type,
            host        = args.db_host,
            port        = args.db_port,
            user        = args.db_user,
            password    = args.db_password,
            dbname      = args.db_name,
            schema      = args.db_schema,
            tbl_name    = args.tbl_name,
            key_prefix  = args.redis_key_prefix,
            logger_instance = logger_instance # Pass the logger instance
        )

        db_manager.connect()
        db_manager.insert_oid_metadata(parsed_oids)

    except Exception as e:
        logger_instance.error(f"An error occurred during database operations: {e}")

    finally:
        if db_manager:
            db_manager.close()
        
        # end if
    # end try
# end main

Option 3

__author__      = "George Leonard"
__email__       = "georgelza@gmail.com"
__version__     = "0.0.1"
__copyright__   = "Copyright 2025, George Leonard"


import os
import json
from pysnmp.smi import builder, view, error
from pysnmp.smi.rfc1902 import ObjectType, NotificationType             # Corrected: Removed TrapType
import argparse
from datetime import datetime                                           # For timestamp in logs
from utils import *                                                     # Import our custom logger from utils.py

# Initialize logger instance globally for this script
LOG_TAG             = 'snmp_mib_ingester'
LOG_FILE            = f'{LOG_TAG}.log'
CONSOLE_DEBUG_LEVEL = 1                                                 # INFO
FILE_DEBUG_LEVEL    = 0                                                 # DEBUG
CUSTOM_LOG_FORMAT   = f'{LOG_TAG}, %(asctime)s, %(message)s'            # Custom format: "{LOG_TAG}, {time}, message"
logger_instance     = logger(LOG_FILE, CONSOLE_DEBUG_LEVEL, FILE_DEBUG_LEVEL, CUSTOM_LOG_FORMAT)

# --- Configuration ---
# IMPORTANT: Ensure these MIB files are placed in the './mibs' directory
# along with any other MIBs they might import (e.g., RFC1155-SMI, RFC-1212, RFC-1215
# if they are not part of pysnmp's default MIBs or are custom versions).

MIB_FILES = [
     "pfSenseMIB.mib"
    ,"RFC1213-MIB.mib"
]

# MIB_FILES = [
#      "RFC1213-MIB.mib"
#     ,"SNMPv2-MIB.mib"
#     ,"SNMPv2-SMI.mib"
#     ,"SNMPv2-TC.mib"
#     ,"10892.mib"
#     ,"TRUENAS-MIB.mib"
#     ,"FreeBSD-MIB.mib"
#     ,"pfSenseMIB.mib"
#     ,"UBNT-MIB.mib"
#     ,"UBNT-MIB1.mib"
#     ,"UBNT-MIB2.mib"
#     ,"UBNT-UniFi-MIB.mib"
# ]

# MIB_DIR = "./mibs" # Directory where your MIB files are located


def get_mib_oids(mib_files, mib_dir):
    
    """
    Parses MIB files and extracts OID metadata.

    Args:
        mib_files (list): A list of MIB filenames to parse.
        mib_dir (str): The directory where MIB files are located.

    Returns:
        list: A list of dictionaries, each representing an OID record.
    """
    
    mib_builder = builder.MibBuilder()
    
    # Add default MIB sources (where pysnmp finds standard RFC MIBs)
    # and our custom MIB directory. This is crucial for resolving IMPORTS.
    mib_builder.add_mib_sources(
        builder.DirMibSource(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'pysnmp_mibs')), # A common place for pysnmp's own MIBs
        builder.DirMibSource(mib_dir)
    )

    oids_data = []

    # Load all specified MIB modules
    for mib_file in mib_files:
        mib_module_name = os.path.splitext(mib_file)[0]
        print("mib_module_name ", mib_module_name)
        
        try:
            # Load the MIB module. This will also load any imported modules.
            mib_builder.load_modules(mib_module_name)
            print(f"Successfully loaded MIB: {mib_module_name}")
            
        except error.SmiError as e:
            print(f"Warning: Could not load MIB file {mib_file}: {e}")
            print(f"  Ensure all IMPORTS in {mib_file} are available in {mib_dir} or standard pysnmp MIB paths.")
            continue
        
        except Exception as e:
            print(f"An unexpected error occurred while loading {mib_file}: {e}")
            continue

        # end try
    # end for
    
    
    # After loading all modules, iterate through all symbols in the MibBuilder
    # This ensures we capture objects from all loaded MIBs and their dependencies.
    for mib_node_name, mib_node_obj in mib_builder.mibSymbols.items():
        # Filter for OBJECT-TYPE and NOTIFICATION-TYPE (which includes SMIv1 TRAP-TYPE)
        if not (isinstance(mib_node_obj, ObjectType) or
                isinstance(mib_node_obj, NotificationType)):
            continue

        #end f
        
        oid_string      = str(mib_node_obj.getName())
        oid_name        = mib_node_name # The symbolic name (e.g., 'sysDescr')
        oid_description = getattr(mib_node_obj, 'getDescription', lambda: '')()
        syntax_type     = None
        oid_type        = None

        if isinstance(mib_node_obj, ObjectType):
            # Differentiate between scalar objects and table columns
            if mib_node_obj.isScalar():
                oid_type = "scalar"
                
            elif mib_node_obj.isColumn():
                oid_type = "table"
            
            #end fif
            
            syntax = mib_node_obj.getSyntax()
            # For Textual Conventions, get its label. For base types, get class name.
            if hasattr(syntax, 'getLabel'):
                syntax_type = syntax.getLabel()
                
            else:
                syntax_type = syntax.__class__.__name__
        
            #end if
            
        elif isinstance(mib_node_obj, NotificationType):
            oid_type = "notification" # Covers both SMIv1 TRAP-TYPE and SMIv2 NOTIFICATION-TYPE
            syntax_type = "Notification" # A generic type for notifications themselves

        #end if
        
        unit = None # Highly difficult to infer reliably from MIB description without NLP
        
        # Only add if we successfully determined a relevant OID type
        if oid_type:
            oids_data.append({
                "oid_string":       oid_string,
                "oid_name":         oid_name,
                "oid_description":  oid_description.strip(),
                "syntax_type":      syntax_type,
                "unit":             unit,
                "oid_type":         oid_type
            })

        #end if
        
    # Sort for consistent output
    oids_data.sort(key=lambda x: x['oid_string'])
    return oids_data

#end def

def generate_redis_commands(oids_data):

    """
    Generates Redis SET commands for the OID metadata.
    """
    commands = []
    commands.append("# Redis SET commands for SNMP OID Metadata")

    for oid in oids_data:
        json_value = {
            "oid_name":     oid["oid_name"],
            "description":  oid["oid_description"],
            "syntax_type":  oid["syntax_type"],
            "unit":         oid["unit"],
            "oid_type":     oid["oid_type"]
        }
        # Use json.dumps to convert dict to JSON string.
        # ensure_ascii=False allows non-ASCII characters directly (e.g., in descriptions).
        json_string = json.dumps(json_value, ensure_ascii=False)
        
        # Escape double quotes within the JSON string for shell command.
        # This is crucial for bash to interpret the entire string as a single argument.
        escaped_json_string = json_string.replace('"', '\\"')

        command = f'SET "oid:{oid["oid_string"]}" "{escaped_json_string}"'
        commands.append(command)

    #end for
    return "\n".join(commands)
#end def


def generate_sql_inserts(oids_data, db_type="postgresql"):
    """
    Generates SQL INSERT statements for the OID metadata.
    """
    inserts = []
    inserts.append(f"-- INSERT statements for snmp.snmp_oid_metadata ({db_type.upper()})")
    for oid in oids_data:
        # Escape single quotes for SQL string literals.
        # MySQL and PostgreSQL both use '' for escaping single quotes.
        description_sql = oid['oid_description'].replace("'", "''")
        
        # Handle NULL for the 'unit' field in SQL
        unit_sql = f"'{oid['unit']}'" if oid['unit'] else "NULL"
        
        insert_statement = f"""INSERT INTO snmp.snmp_oid_metadata (
            oid_string,
            oid_name,
            oid_description,
            syntax_type,
            unit,
            oid_type
        ) VALUES (
            '{oid['oid_string']}',
            '{oid['oid_name']}',
            '{description_sql}',
            '{oid['syntax_type']}',
             {unit_sql},
            '{oid['oid_type']}'
        );"""
        inserts.append(insert_statement)
    #end for
    
    return "\n".join(inserts)
#end def


if __name__ == "__main__":

    parser = argparse.ArgumentParser(description="Parse SNMP MIB files and insert OID metadata into a database.")

    # # Create a mutually exclusive group for mib_file and mib_directory
    # mib_input_group = parser.add_mutually_exclusive_group(required=True)
    
    parser.add_argument("--mib-files",      nargs='*', default=[], help="Path to a single MIB file to parse.")
    parser.add_argument("--mib-directory",  nargs='*', default=[], help="Path to a directory containing MIB files.")

    args        = parser.parse_args()

    # MIB_FILES   = args.mib_files,
    # MIB_DIR     = args.mib_directory,
    MIB_DIR       = "mibsx"
    
    print("FilePath   : ", MIB_FILES)
    print("FileDir    : ", MIB_DIR)
    
    print(f"Please ensure the following MIB files are placed in the '{MIB_DIR}' directory:")
    for mib_file in MIB_FILES:
        print(f"- {mib_file}")
    
    #end for
    
    print("\nAttempting to load MIBs and extract OID data...")

    oids = get_mib_oids(MIB_FILES, MIB_DIR)

    if oids:
        print(f"\nSuccessfully extracted {len(oids)} OID entries.")
        
        # Generate Redis Commands
        redis_commands = generate_redis_commands(oids)
        # Save to a file that can be used by your entrypoint.sh
        with open("redis_commands_generated.txt", "w", encoding="utf-8") as f:
            f.write(redis_commands)

        #end with

        print("\n--- Redis SET Commands (saved to redis_commands_generated.txt) ---")
        
        # Print a snippet for quick review
        print(redis_commands.splitlines()[0])
        print(redis_commands.splitlines()[1])
        print("...")
        print(redis_commands.splitlines()[-1])
        
        # Generate MySQL Inserts
        mysql_inserts = generate_sql_inserts(oids, "mysql")
        with open("mysql_inserts_generated.sql", "w", encoding="utf-8") as f:
            f.write(mysql_inserts)
        
        #end with
        
        print("\n--- MySQL INSERT Statements (saved to mysql_inserts_generated.sql) ---")
        # Print a snippet for quick review
        print(mysql_inserts.splitlines()[0])
        print(mysql_inserts.splitlines()[1])
        print("...")
        print(mysql_inserts.splitlines()[-1])

        # Generate PostgreSQL Inserts
        postgresql_inserts = generate_sql_inserts(oids, "postgresql")
        with open("postgresql_inserts_generated.sql", "w", encoding="utf-8") as f:
            f.write(postgresql_inserts)
            
        #end with
        
        print("\n--- PostgreSQL INSERT Statements (saved to postgresql_inserts_generated.sql) ---")
        
        # Print a snippet for quick review
        print(postgresql_inserts.splitlines()[0])
        print(postgresql_inserts.splitlines()[1])
        print("...")
        print(postgresql_inserts.splitlines()[-1])

    else:
        print("No OID data extracted. Please check MIB files, MIB_DIR, and any error messages during MIB loading.")
        
    #end if
#end def

Utils.py

#######################################################################################################################
#
#
#  	Project     	: 	Genericl utils for programs
#
#   File            :   utils.py
#
#   Description     :
#
#   Created     	:   22 November 2024
#
#   Notes           :   Python Logging Package:
#                   :   https://docs.python.com/3/library/logging.html
#                   :   https://realpython.com/python-logging/
#
########################################################################################################################
__author__      = "George Leonard"
__email__       = "georgelza@gmail.com"
__version__     = "3.0.0"
__copyright__   = "Copyright 2025, - G Leonard"


import logging

"""
Common Generic Logger setup, used by master loop for console and common file.
"""
def logger(filename, console_debuglevel, file_debuglevel, log_format=None):
    
    """
    Configures and returns a logger instance with console and file handlers.

    Args:
        filename (str): The name of the log file.
        console_debuglevel (int): Debug level for console output (0=DEBUG, 1=INFO, etc.).
        file_debuglevel (int): Debug level for file output (0=DEBUG, 1=INFO, etc.).
        log_format (str, optional): Custom log format string.
                                    Defaults to '%(asctime)s - %(levelname)s - %(processName)s - %(message)s'.

    Returns:
        logging.Logger: Configured logger instance.
    """
    
    logger = logging.getLogger(__name__)
    # Ensure handlers are not added multiple times if logger is called more than once
    
    if not logger.handlers:
        logger.setLevel(logging.DEBUG) # Set overall logger level to DEBUG to allow all messages to pass to handlers

        # Define default format if not provided
        default_console_format = '%(asctime)s - %(levelname)s - %(processName)s - %(message)s'
        default_file_format    = '%(asctime)s - %(levelname)s - %(message)s'
       
        # Create console handler
        ch = logging.StreamHandler()
        # Use provided format or default for console
        console_formatter = logging.Formatter(log_format if log_format else default_console_format)
        ch.setFormatter(console_formatter)

        # Set console log level
        if console_debuglevel == 0:
            ch.setLevel(logging.DEBUG)
            
        elif console_debuglevel == 1:
            ch.setLevel(logging.INFO)
            
        elif console_debuglevel == 2:
            ch.setLevel(logging.WARNING)
            
        elif console_debuglevel == 3:
            ch.setLevel(logging.ERROR)
            
        elif console_debuglevel == 4:
            ch.setLevel(logging.CRITICAL)
            
        else:
            ch.setLevel(logging.INFO) # Default
            
        logger.addHandler(ch)

        # Create file handler
        fh = logging.FileHandler(filename)
        
        # Use provided format or default for file
        file_formatter = logging.Formatter(log_format if log_format else default_file_format)
        fh.setFormatter(file_formatter)

        # Set file log level
        if file_debuglevel == 0:
            fh.setLevel(logging.DEBUG)
            
        elif file_debuglevel == 1:
            fh.setLevel(logging.INFO)
            
        elif file_debuglevel == 2:
            fh.setLevel(logging.WARNING)
            
        elif file_debuglevel == 3:
            fh.setLevel(logging.ERROR)
            
        elif file_debuglevel == 4:
            fh.setLevel(logging.CRITICAL)
            
        else:
            fh.setLevel(logging.INFO)  # Default
            
        logger.addHandler(fh)

    return logger

# end logger

requirements

pysnmp
psycopg2-binary 
mysql-connector-python 
redis
pysmi