I currently have the following three versions... none of them working though.
They are at different levels of completeness, as you will see. As a minimum starting point I was thinking let's just get the MIB data decomposed/decompiled so that it can be printed to the console... the database component can be added afterwards.
NOTE: the name change for RFC-1215 away from the norm is because the files I tried to load needed that MIB named as such, with the module header matching the filename.
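A quick way to check that (hypothetical path, same split logic Option 1 uses below): the module name on the DEFINITIONS line has to match the filename stem, since that is how the MIB searchers locate the file, as far as I can tell.

# Sanity check: the module name inside the MIB must match the filename stem.
from pathlib import Path

mib_path = Path("mibstd/RFC-1215.mib")   # example location, adjust as needed
first_def = next(line for line in mib_path.read_text().splitlines()
                 if "DEFINITIONS ::=" in line)
module_name = first_def.split()[0].strip()
assert module_name == mib_path.stem, f"{module_name!r} != {mib_path.stem!r}"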
The --mib-dirs directory contains:
total 472
drwxr-xr-x 11 george staff 352 Jul 9 15:42 .
drwxr-xr-x 16 george staff 512 Jul 9 21:06 ..
-rw-r--r-- 1 george staff 935 Jul 4 22:04 README.md
-rw-r--r-- 1 george staff 2692 Jul 9 13:18 RFC-1212.mib
-rw-r--r-- 1 george staff 2814 Jul 9 15:50 RFC-1215.mib
-rw-r--r-- 1 george staff 3077 Jul 9 13:18 RFC1155-SMI.mib
-rw-r--r-- 1 george staff 103000 Jun 28 21:12 RFC1213-MIB.mib
-rw-r--r-- 1 george staff 31385 Jul 9 13:18 RFC1231-MIB.mib
-rw-r--r-- 1 george staff 31946 Jun 28 21:12 SNMPv2-MIB.mib
-rw-r--r-- 1 george staff 10283 Jun 28 21:12 SNMPv2-SMI.mib
-rw-r--r-- 1 george staff 38034 Jul 6 15:55 SNMPv2-TC.mib
Option 1
#######################################################################################################################
#
#
# Project : SNMP MIB Loader/Ingester.
#
# File : mib_parser.py
#
# Description : Load the OID name/description data from MIB files directly into designated tables that can be
# : exposed to Apache Flink.
#
# : Database engines currently supported: Redis, PostgreSQL or MySQL.
#
# By : George Leonard ( georgelza@gmail.com )
#
# Created : 05 Jul 2025
#
# TO BE
#
# Example : python mib_parser.py \
# --mib-file mibs/RFC1213-MIB.mib \
# --mib-dirs mibstd \
# --db-type postgresql \
# --db-host localhost \
# --db-port 5433 \
# --db-name snmp \
# --db-schema public \
# --db-user dbadmin \
# --db-password dbpassword \
# --tbl-name snmp_oid_data
# OR
# python3 mib_parser.py \
# --mib-directory mibsx \
# --mib-dirs mibstd \
# --db-type mysql \
# --db-host localhost \
# --db-port 3306 \
# --db-name snmp \
# --db-schema snmp \
# --db-user dbadmin \
# --db-password dbpassword \
# --tbl-name snmp_oid_data
#
# The script now uses argparse to accept command-line arguments for:
#
# --mib-file (mutually exclusive with --mib-directory): Path to a single MIB file.
# --mib-directory (mutually exclusive with --mib-file): Path to a directory containing MIB files.
# --mib-dirs: Optional list of directories where dependent MIBs are located, aka standard mibs,
# i.e: RFC-1212, RFC1155-SMI, RFC1213-MIB, RFC1215, RFC1231-MIB, SNMPv2-MIB, SNMPv2-SMI, SNMPv2-TC
# --db-type (required): postgresql, mysql, or redis.
# --db-host (required): Database hostname.
# --db-port (required): Database port.
# --db-name: Database name (for SQL) or DB index (for Redis).
# --db-schema: Schema name (public for PostgreSQL; for MySQL this is effectively the database name).
# --db-user: Username (for SQL).
# --db-password: Password (for all).
# --tbl-name: Target table to load data into (for PostgreSQL/MySQL).
# --redis-key-prefix: Custom key prefix for Redis (defaults to oid:).
#
# INTERIM Test
#
# python mib_parser.py --mib-directory mibstxt --mib-file MIB-Dell-10892.mib
# python mib_parser.py --mib-directory mibstxt --mib-file MIB-Dell-10892.mib --mib-dirs mibstd
#
########################################################################################################################
__author__ = "George Leonard"
__email__ = "georgelza@gmail.com"
__version__ = "0.0.1"
__copyright__ = "Copyright 2025, George Leonard"
import os
import re
import argparse # Import argparse for command-line arguments
import pysnmp.debug # Import pysnmp.debug for logging
from datetime import datetime
# Import PySMI components for explicit MIB compilation
from pysmi.parser import SmiStarParser
from pysmi.codegen import PySnmpCodeGen
# Import pysnmp MIB builder and view for accessing compiled MIBs
from pysnmp.smi import builder, view
from pysnmp.smi.error import SmiError # Import specific error for catching
# Import the logger from utils.py
from utils import logger
current_datetime = datetime.now()
datetime_str = current_datetime.strftime("%Y%m%d_%H%M%S") # Example format: 20250709_165531
# Initialize the logger
# You can adjust these levels as needed. INFO for console, DEBUG for file.
log = logger(filename=f"mib_parser_{datetime_str}.log", console_debuglevel=1, file_debuglevel=0)
# Enable pysnmp debug logging for verbose output
# This will print detailed information about MIB loading and parsing.
# Now using our custom logger
pysnmp.debug.set_logger(pysnmp.debug.Debug('all', logger=log)) # Integrate pysnmp debug with our logger
def extract_mib_details(mib_directory, mib_file, mib_dirs_list=None):
"""
Extracts OID details from a MIB file using PySMI for explicit compilation.
Args:
mib_directory (str): The path to the directory containing the MIB file,
relative to the script's location.
mib_file (str): The name of the MIB file (e.g., 'MIB-Dell-10892.mib').
mib_dirs_list (list, optional): A list of additional directories where dependent MIBs can be found,
relative to the script's location. Defaults to None.
"""
# Get the directory where this script (mib_parser.py) is located
script_dir = os.path.dirname(os.path.abspath(__file__))
# Resolve mib_directory to an absolute path relative to the script's directory
absolute_mib_directory = os.path.join(script_dir, mib_directory)
original_mib_file_path = os.path.join(absolute_mib_directory, mib_file)
if not os.path.exists(original_mib_file_path):
log.error(f"Error: MIB file not found at '{original_mib_file_path}'")
return
log.info(f"Parsing MIB file: {original_mib_file_path}\n")
# Initialize PySMI parser and code generator
mibParser = SmiStarParser()
mibCodeGen = PySnmpCodeGen()
# Initialize MibBuilder directly
mibBuilder = builder.MibBuilder() # This line is critical!
mibView = view.MibViewController(mibBuilder)
# Create a temporary directory for compiled MIBs if it doesn't exist.
# This directory will be used by pysnmp to store compiled versions of imported MIBs.
compiled_mibs_dir = os.path.join(absolute_mib_directory, "compiled_mibs")
os.makedirs(compiled_mibs_dir, exist_ok=True)
module_name = None
extracted_oids = [] # Initialize extracted_oids here to prevent UnboundLocalError
try:
# Determine the MIB module name from the file content.
with open(original_mib_file_path, 'r') as f:
for line in f:
if 'DEFINITIONS ::=' in line:
module_name = line.split(' ')[0].strip()
break
else:
raise ValueError(f"Could not determine MIB module name from '{original_mib_file_path}'. "
"Ensure it starts with 'MODULE-NAME DEFINITIONS ::= BEGIN'.")
# Prepare MIB source readers for PySMI. We use pysnmp's own MibBuilder.add_mib_sources.
# Order is crucial for MIB resolution:
# 1. Writable directory for compiled MIBs (for caching compiled MIBs)
# 2. Directory of the input MIB file (for the primary MIB and its local dependencies)
# 3. User-specified MIB directories
# 4. PySNMP's own standard MIBs (usually found automatically by pysnmp, so often not needed to explicitly add)
# Add the directory for compiled MIBs (for caching)
mibBuilder.add_mib_sources(builder.DirMibSource(compiled_mibs_dir))
# Add the directory containing the primary MIB file
mibBuilder.add_mib_sources(builder.DirMibSource(absolute_mib_directory))
# Add user-specified MIB directories if provided, resolving them relative to script_dir
if mib_dirs_list:
for mib_dir_path_relative in mib_dirs_list:
absolute_mib_dir_path = os.path.join(script_dir, mib_dir_path_relative)
if os.path.isdir(absolute_mib_dir_path):
mibBuilder.add_mib_sources(builder.DirMibSource(absolute_mib_dir_path))
else:
log.warning(f"Warning: MIB directory '{absolute_mib_dir_path}' (resolved from '{mib_dir_path_relative}') specified in --mib-dirs does not exist or is not a directory. Skipping.")
# Explicitly compile the main MIB file and its dependencies
try:
# Read the content of the main MIB file
with open(original_mib_file_path, 'r') as f:
mib_content = f.read()
# Parse the MIB content
ast = mibParser.parse(mib_content)
# Generate pysnmp MIB objects from the parsed AST
# Pass the mibBuilder instance directly to codegen
mibCodeGen.codegen(ast, mibBuilder, genTexts=True, genDescr=True)
log.info(f"Successfully compiled and loaded MIB module: {module_name}")
except SmiError as e:
log.error(f"Error compiling MIB module '{module_name}': {e}")
log.error(f"This often means a dependent MIB is missing or incorrectly named.")
log.error(f"Please ensure all required IMPORTS for '{mib_file}' (e.g., RFC1155-SMI, RFC-1212, RFC-1215, RFC1213-MIB) "
f"are present and correctly named (e.g., 'RFC-1215.mib' if importing 'RFC-1215') "
f"in '{absolute_mib_directory}' or any of the specified --mib-dirs paths, or standard PySNMP MIB paths.")
return # Exit if loading fails
# Iterate through all loaded symbols/objects in the MIB builder.
for mibVar in mibBuilder.getMibNodes():
# Check if the object has a name and an OID, indicating it's a defined MIB object.
if hasattr(mibVar, 'getName') and hasattr(mibVar, 'getOid'):
oid_name = str(mibVar.getName())
oid_string = str(mibVar.getOid())
# Skip common root OIDs and module definitions themselves.
if oid_string in ['.1', '.1.3', '.1.3.6', '.1.3.6.1', '.1.3.6.1.4', '.1.3.6.1.4.1']:
continue
if hasattr(mibVar, 'kind') and mibVar.kind == 'module':
continue
oid_description = "No description available."
if hasattr(mibVar, 'getDescription') and mibVar.getDescription():
oid_description = mibVar.getDescription().strip()
syntax_type = "UNKNOWN"
if hasattr(mibVar, 'syntax'):
syntax_obj = mibVar.syntax
if hasattr(syntax_obj, 'getName'):
syntax_type = syntax_obj.getName()
elif hasattr(syntax_obj, '__class__'):
syntax_type = syntax_obj.__class__.__name__
elif hasattr(mibVar, '__class__'):
if 'MibModule' in str(mibVar.__class__) or 'MibNode' in str(mibVar.__class__):
syntax_type = "OBJECT IDENTIFIER"
else:
syntax_type = mibVar.__class__.__name__
unit = None
oid_type = "unknown"
if isinstance(mibVar, builder.MibScalar):
oid_type = "scalar"
if not oid_string.endswith('.0'):
oid_string += '.0'
elif isinstance(mibVar, builder.MibTable):
oid_type = "table"
elif isinstance(mibVar, builder.MibTableRow):
oid_type = "tableRow"
elif isinstance(mibVar, builder.MibTableColumn):
oid_type = "tableColumn"
elif isinstance(mibVar, builder.MibNotification):
oid_type = "notification"
elif 'OBJECT IDENTIFIER' in syntax_type or 'MibNode' in str(mibVar.__class__):
oid_type = "group"
extracted_oids.append({
"oid_string": oid_string,
"oid_name": oid_name,
"oid_description": oid_description,
"syntax_type": syntax_type,
"unit": unit,
"oid_type": oid_type,
})
except SmiError as e:
log.error(f"MIB parsing error: {e}")
log.error("Please ensure the MIB file is well-formed and its IMPORTS are resolvable.")
except Exception as e:
log.exception(f"An unexpected error occurred: {e}")
finally:
pass # No specific cleanup for temp files needed with this approach
if not extracted_oids:
log.warning("No OIDs extracted. This might be due to an empty MIB, a MIB with only MODULE-IDENTITY, or issues in parsing.")
log.warning("Ensure the MIB file contains OBJECT-TYPE, OBJECT IDENTIFIER, or TRAP-TYPE definitions.")
return
# Print the extracted details in a table format.
log.info(f"\n--- Extracted {len(extracted_oids)} OIDs ---")
log.info("{:<30} {:<30} {:<20} {:<15} {:<10} {:<15}".format(
"OID String", "OID Name", "Syntax Type", "OID Type", "Unit", "Description"
))
log.info("-" * 130)
for oid_data in extracted_oids:
display_description = (oid_data["oid_description"][:45] + '...') if len(oid_data["oid_description"]) > 48 else oid_data["oid_description"]
log.info("{:<30} {:<30} {:<20} {:<15} {:<10} {:<15}".format(
oid_data["oid_string"],
oid_data["oid_name"],
oid_data["syntax_type"],
oid_data["oid_type"],
oid_data["unit"] if oid_data["unit"] else "N/A",
display_description
))
log.info("-" * 130)
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Extract OID details from a MIB file.")
parser.add_argument(
"--mib-directory",
type=str,
required=True,
help="Path to the directory containing the MIB file, relative to this script's location."
)
parser.add_argument(
"--mib-file",
type=str,
required=True,
help="Name of the MIB file (e.g., 'MIB-Dell-10892.mib')."
)
parser.add_argument(
"--mib-dirs",
type=str,
nargs='+', # Allows one or more arguments
help="Additional directories where dependent MIBs are located, relative to this script's location. "
"Provide multiple paths separated by spaces. E.g., 'standard_mibs vendor_mibs'."
)
args = parser.parse_args()
extract_mib_details(args.mib_directory, args.mib_file, args.mib_dirs)
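A side note on Option 1: from what I can tell, pysmi's parser and code generator are normally driven through pysmi.compiler.MibCompiler rather than being called directly the way I do above. A rough sketch of that route, assuming the mibsx/mibstd layout from the examples and that the Dell module really is named MIB-Dell-10892 (method names shown are the classic camelCase pysmi ones; newer pysmi releases may prefer the snake_case equivalents):

# Sketch only, not the working script: compile one MIB (plus its IMPORTS) to pysnmp .py modules.
import os
from pysmi.reader import FileReader
from pysmi.searcher import PyFileSearcher
from pysmi.writer import PyFileWriter
from pysmi.parser import SmiStarParser
from pysmi.codegen import PySnmpCodeGen
from pysmi.compiler import MibCompiler

dst = os.path.abspath("compiled_mibs")                           # cache for generated modules
compiler = MibCompiler(SmiStarParser(), PySnmpCodeGen(), PyFileWriter(dst))
compiler.addSources(FileReader("mibsx"), FileReader("mibstd"))   # vendor MIB + dependency MIBs
compiler.addSearchers(PyFileSearcher(dst))                       # skip MIBs already compiled
results = compiler.compile("MIB-Dell-10892", genTexts=True)      # genTexts keeps DESCRIPTION clauses
print(results)                                                   # e.g. {'MIB-Dell-10892': 'compiled', ...}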
Option 2
__author__ = "George Leonard"
__email__ = "georgelza@gmail.com"
__version__ = "0.0.1"
__copyright__ = "Copyright 2025, George Leonard"
import os
import json
import argparse
from pysnmp.smi import builder, view, error
from pysnmp.smi.rfc1902 import ObjectType, NotificationType # Import necessary types
from datetime import datetime # For timestamp in logs
from utils import logger # Import our custom logger from utils.py
import sys # To find pysnmp mibs path
import logging # Fallback logger in DatabaseManager uses logging.getLogger()
# Initialize logger instance globally for this script
LOG_TAG = 'snmp_mib_ingester'
LOG_FILE = f'{LOG_TAG}.log'
CONSOLE_DEBUG_LEVEL = 0 # Set to DEBUG for detailed MIB loading info
FILE_DEBUG_LEVEL = 0 # Set to DEBUG for detailed MIB loading info
CUSTOM_LOG_FORMAT = f'{LOG_TAG}, %(asctime)s, %(levelname)s, %(message)s' # Added levelname
logger_instance = logger(LOG_FILE, CONSOLE_DEBUG_LEVEL, FILE_DEBUG_LEVEL, CUSTOM_LOG_FORMAT)
# Conditional imports for database connectors
# These will need to be installed:
# pip install pysnmp psycopg2-binary mysql-connector-python redis
try:
import psycopg2
except ImportError:
psycopg2 = None
logger_instance.warning("psycopg2 not found. PostgreSQL database option will not be available.")
# end try
try:
import mysql.connector
except ImportError:
mysql = None
logger_instance.warning("mysql-connector-python not found. MySQL database option will not be available.")
# end try
try:
import redis
except ImportError:
redis = None
logger_instance.warning("redis not found. Redis database option will not be available.")
# end try
def parse_mib_files(mib_file_path=None, mib_directory_path=None, mib_dirs=None, logger_instance=None):
"""
Parses MIB files from a given path (file or directory) and extracts OID metadata.
Args:
mib_file_path (str, optional): The path to a single MIB file to parse.
mib_directory_path (str, optional): The path to a directory containing MIB files.
mib_dirs (list, optional): A list of directories where dependent MIBs
might be located. Defaults to None.
logger_instance (logging.Logger, optional): The logger instance to use.
Returns:
list: A list of dictionaries, each representing an OID's metadata.
Returns an empty list if parsing fails or no OIDs are found.
"""
if logger_instance is None:
import logging
logger_instance = logging.getLogger(__name__) # Fallback if not provided
mib_data = []
mibBuilder = builder.MibBuilder()
# --- Configure MIB Sources ---
# 1. Add the directory where the user's MIB file/directory is located
user_mib_base_dir = None
if mib_file_path:
user_mib_base_dir = os.path.dirname(os.path.abspath(mib_file_path))
elif mib_directory_path:
user_mib_base_dir = os.path.abspath(mib_directory_path)
if user_mib_base_dir and os.path.isdir(user_mib_base_dir):
mibBuilder.add_mib_sources(builder.DirMibSource(user_mib_base_dir)) # Corrected: add_mib_sources
logger_instance.debug(f"Added user-specified MIB source: {user_mib_base_dir}")
else:
logger_instance.error(f"User-specified MIB source '{user_mib_base_dir}' does not exist or is not a directory. Exiting.")
return []
# 2. Add pysnmp's default MIB sources (critical for RFC MIBs like SNMPv2-SMI, RFC1213-MIB)
try:
# A more robust way to get the pysnmp MIBs path by iterating through sys.path
# and looking for 'pysnmp/smi/mibs' within site-packages or similar.
pysnmp_mibs_path = None
for p in sys.path:
potential_path = os.path.join(p, 'pysnmp', 'smi', 'mibs')
if os.path.isdir(potential_path):
pysnmp_mibs_path = potential_path
break
if pysnmp_mibs_path:
mibBuilder.add_mib_sources(builder.DirMibSource(pysnmp_mibs_path)) # Corrected: add_mib_sources
logger_instance.debug(f"Added pysnmp default MIB source: {pysnmp_mibs_path}")
else:
logger_instance.warning("Could not automatically find pysnmp default MIBs path. "
"Ensure standard RFC MIBs are manually placed in your MIB directory or specified via --mib-dirs.")
except Exception as e:
logger_instance.warning(f"Error while trying to add pysnmp default MIB source: {e}")
logger_instance.warning("Ensure standard RFC MIBs are manually placed in your MIB directory or specified via --mib-dirs.")
# 3. Add any additional MIB directories provided via --mib-dirs
if mib_dirs:
for mib_dir in mib_dirs:
abs_mib_dir = os.path.abspath(mib_dir)
if os.path.isdir(abs_mib_dir):
mibBuilder.add_mib_sources(builder.DirMibSource(abs_mib_dir)) # Corrected: add_mib_sources
logger_instance.debug(f"Added additional MIB source: {abs_mib_dir}")
else:
logger_instance.warning(f"Additional MIB directory '{abs_mib_dir}' does not exist or is not a directory. Skipping.")
# --- Debugging: Print all MIB sources configured ---
logger_instance.debug("Current MIB Builder search paths:")
for source in mibBuilder.get_mib_sources(): # Corrected: Use get_mib_sources()
logger_instance.debug(f"- {source}")
logger_instance.debug("-" * 30)
# --- End Debugging ---
# --- Determine MIB files to load ---
mib_files_to_load = []
if mib_file_path:
mib_files_to_load.append(os.path.basename(mib_file_path))
elif mib_directory_path:
for filename in os.listdir(os.path.abspath(mib_directory_path)):
if filename.endswith(('.mib', '.txt')):
mib_files_to_load.append(filename)
if not mib_files_to_load:
logger_instance.warning("No MIB files found to load in the specified path(s).")
return []
# --- Load MIB modules ---
for mib_filename in mib_files_to_load:
mib_module_name = os.path.splitext(mib_filename)[0]
logger_instance.info(f"Attempting to load MIB module: {mib_module_name}")
try:
# Use load_modules (with underscore) for the non-deprecated method
mibBuilder.load_modules(mib_module_name)
logger_instance.info(f"Successfully loaded MIB module: {mib_module_name}")
except error.SmiError as e:
logger_instance.error(f"Error loading MIB module {mib_module_name} (from file '{mib_filename}'): {e}")
logger_instance.error(f" This often means a MIB file it IMPORTS is missing from the search paths.")
logger_instance.error(f" Please ensure all MIB files imported by '{mib_filename}' are in your --mib-directory or --mib-dirs.")
except Exception as e:
logger_instance.error(f"An unexpected error occurred while loading MIB module {mib_module_name} from '{mib_filename}': {e}")
# --- Extract OID data from loaded MIBs ---
# Iterate over all loaded MIB objects using mibBuilder.mibSymbols.values()
for mibVar in mibBuilder.mibSymbols.values():
try:
# Only process ObjectType and NotificationType (which covers TRAP-TYPE)
if not (isinstance(mibVar, ObjectType) or isinstance(mibVar, NotificationType)):
continue
oid_name = str(mibVar.getName())
oid_string = str(mibVar.getOid())
description = getattr(mibVar, 'getDescription', lambda: '')().strip()
syntax_type = ""
oid_type = ""
unit = ""
if isinstance(mibVar, ObjectType):
if mibVar.isScalar():
oid_type = "scalar"
elif mibVar.isColumn():
oid_type = "table" # Use "table" for columns as per your schema
else: # Could be a table root, or other ObjectType
oid_type = "other" # Or refine this if you need to distinguish table roots
syntax = mibVar.getSyntax()
if hasattr(syntax, 'getLabel'):
syntax_type = syntax.getLabel()
else:
syntax_type = syntax.__class__.__name__
elif isinstance(mibVar, NotificationType):
oid_type = "notification"
syntax_type = "Notification" # Generic for notifications
# Basic unit extraction (can be expanded if needed)
desc_lower = description.lower()
if "seconds" in desc_lower:
unit = "seconds"
elif "bytes" in desc_lower:
unit = "bytes"
elif "kilobytes" in desc_lower:
unit = "kilobytes"
elif "bits" in desc_lower:
unit = "bits"
elif "percent" in desc_lower:
unit = "percent"
mib_data.append({
"oid_string": oid_string,
"oid_name": oid_name,
"description": description,
"syntax_type": syntax_type,
"unit": unit,
"oid_type": oid_type,
})
except Exception as e:
logger_instance.warning(f"Could not process MIB object {mibVar} ({type(mibVar)}): {e}")
continue
# Sort for consistent output
mib_data.sort(key=lambda x: x['oid_string'])
return mib_data
class DatabaseManager:
"""
Manages database connections and data insertion for various database types.
"""
def __init__(self, db_type, host, port, user=None, password=None, dbname=None, schema=None, tbl_name="snmp_oid_metadata", key_prefix="oid:", logger_instance=None):
self.db_type = db_type.lower()
self.host = host
self.port = port
self.user = user
self.password = password
self.dbname = dbname
self.schema = schema # Schema for PostgreSQL
self.tbl_name = tbl_name # Table name for SQL databases
self.key_prefix = key_prefix
self.connection = None
self.cursor = None # For SQL databases
self.logger = logger_instance if logger_instance else logging.getLogger(__name__)
# end def
def connect(self):
"""Establishes a connection to the specified database."""
try:
if self.db_type == 'postgresql':
if not psycopg2:
raise ImportError("psycopg2 is not installed. Cannot connect to PostgreSQL.")
# end if
self.connection = psycopg2.connect(
host = self.host,
port = self.port,
user = self.user,
password= self.password,
dbname = self.dbname
)
self.connection.autocommit = True # Auto-commit changes
self.cursor = self.connection.cursor()
self.logger.info(f"Connected to PostgreSQL database: {self.dbname} on {self.host}:{self.port}")
if self.schema:
self.cursor.execute(f"SET search_path TO {self.schema}, public;")
self.logger.info(f"PostgreSQL search_path set to: {self.schema}, public")
# end if
elif self.db_type == 'mysql':
if not mysql:
raise ImportError("mysql-connector-python is not installed. Cannot connect to MySQL.")
# end if
self.connection = mysql.connector.connect(
host = self.host,
port = self.port,
user = self.user,
password = self.password,
database = self.dbname
)
self.cursor = self.connection.cursor()
self.logger.info(f"Connected to MySQL database: {self.dbname} on {self.host}:{self.port}")
elif self.db_type == 'redis':
if not redis:
raise ImportError("redis is not installed. Cannot connect to Redis.")
#end if
self.connection = redis.Redis(
host = self.host,
port = self.port,
db = self.dbname, # In Redis, 'database' is an integer index
password = self.password
)
self.connection.ping() # Test connection
self.logger.info(f"Connected to Redis database: {self.dbname} on {self.host}:{self.port}")
else:
raise ValueError(f"Unsupported database type: {self.db_type}")
#end if
except Exception as e:
self.logger.error(f"Error connecting to {self.db_type} database: {e}")
self.connection = None
self.cursor = None
raise
#end try
# end def
def insert_oid_metadata(self, oid_data_list):
"""
Inserts a list of OID metadata dictionaries into the connected database.
Uses UPSERT logic for SQL databases.
"""
if not self.connection:
self.logger.error("No active database connection. Please connect first.")
return
# end if
if self.db_type == 'postgresql':
table_full_name = f"{self.schema}.{self.tbl_name}" if self.schema else self.tbl_name
sql = f"""
INSERT INTO {table_full_name} (oid_string, oid_name, description, syntax_type, unit, oid_type)
VALUES (%s, %s, %s, %s, %s, %s)
ON CONFLICT (oid_string) DO UPDATE SET
oid_name = EXCLUDED.oid_name,
description = EXCLUDED.description,
syntax_type = EXCLUDED.syntax_type,
unit = EXCLUDED.unit,
oid_type = EXCLUDED.oid_type;
"""
try:
for oid in oid_data_list:
self.cursor.execute(sql, (
oid['oid_string'],
oid['oid_name'],
oid['description'],
oid['syntax_type'],
oid['unit'],
oid['oid_type']
))
# end for
self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into PostgreSQL table '{table_full_name}'.")
except Exception as e:
self.logger.error(f"Error inserting into PostgreSQL table '{table_full_name}': {e}")
# end try
elif self.db_type == 'mysql':
# For MySQL, schema is part of the database connection, not table name directly in query
sql = f"""
INSERT INTO {self.tbl_name} (oid_string, oid_name, description, syntax_type, unit, oid_type)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
oid_name = VALUES(oid_name),
description = VALUES(description),
syntax_type = VALUES(syntax_type),
unit = VALUES(unit),
oid_type = VALUES(oid_type);
"""
try:
for oid in oid_data_list:
self.cursor.execute(sql, (
oid['oid_string'],
oid['oid_name'],
oid['description'],
oid['syntax_type'],
oid['unit'],
oid['oid_type']
))
# end for
self.connection.commit()
self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into MySQL table '{self.tbl_name}'.")
except Exception as e:
self.logger.error(f"Error inserting into MySQL table '{self.tbl_name}': {e}")
self.connection.rollback()
# end try
elif self.db_type == 'redis':
try:
pipe = self.connection.pipeline()
for oid in oid_data_list:
redis_key = f"{self.key_prefix}{oid['oid_string']}"
pipe.set(redis_key, json.dumps(oid))
# end for
pipe.execute()
self.logger.info(f"Successfully inserted/updated {len(oid_data_list)} OIDs into Redis (keys prefixed with '{self.key_prefix}').")
except Exception as e:
self.logger.error(f"Error inserting into Redis: {e}")
# end try
else:
self.logger.warning(f"Insertion not supported for database type: {self.db_type}")
# end if
# end insert_oid_metadata
def close(self):
"""Closes the database connection."""
if self.connection:
if self.db_type in ['postgresql', 'mysql'] and self.cursor:
self.cursor.close()
# end if
self.connection.close()
self.logger.info(f"Closed {self.db_type} database connection.")
else:
self.logger.info("No active connection to close.")
# end if
# end def close
# end class DatabaseManager
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse SNMP MIB files and insert OID metadata into a database.")
# Create a mutually exclusive group for mib_file and mib_directory
mib_input_group = parser.add_mutually_exclusive_group(required=True)
mib_input_group.add_argument("--mib-file", help="Path to a single MIB file to parse.")
mib_input_group.add_argument("--mib-directory", help="Path to a directory containing MIB files.")
parser.add_argument("--mib-dirs", nargs='*', default=[], help="Optional: List of directories where dependent MIBs are located.")
parser.add_argument("--db-type", choices=['postgresql', 'mysql', 'redis'], required=True, help="Type of database to connect to (postgresql, mysql, redis).")
parser.add_argument("--db-host", required=True, help="Database hostname or IP address.")
parser.add_argument("--db-port", type=int, required=True, help="Database port number.")
parser.add_argument("--db-name", help="Database name (for PostgreSQL/MySQL) or DB index (for Redis).")
parser.add_argument("--db-user", help="Database username (for SQL).")
parser.add_argument("--db-password", help="Database password (for PostgreSQL/MySQL/Redis).")
parser.add_argument("--db-schema", help="Database schema name (for PostgreSQL).")
parser.add_argument("--tbl-name", default="snmp_oid_metadata", help="Table name for SQL databases (PostgreSQL/MySQL). Defaults to 'snmp_oid_metadata'.")
parser.add_argument("--redis-key-prefix", default="oid:", help="Prefix for Redis keys (e.g., 'oid:' for 'oid:1.3.6.1.2.1.1.3.0'). Only for Redis.")
args = parser.parse_args()
# Parse the MIB file(s) or directory
# Pass the appropriate argument based on which one was provided
parsed_oids = parse_mib_files(
mib_file_path = args.mib_file,
mib_directory_path = args.mib_directory,
mib_dirs = args.mib_dirs,
logger_instance = logger_instance
)
if not parsed_oids:
logger_instance.info("No OID data extracted. Exiting.")
exit()
# end if
logger_instance.info("\n--- Parsed OID Metadata Summary ---")
# Changed to debug level for individual OID details to keep INFO logs cleaner
for oid in parsed_oids:
logger_instance.debug(f"OID String: {oid['oid_string']}")
logger_instance.debug(f"OID Name: {oid['oid_name']}")
logger_instance.debug(f"Description: {oid['description']}")
logger_instance.debug(f"Syntax Type: {oid['syntax_type']}")
logger_instance.debug(f"Unit: {oid['unit']}")
logger_instance.debug(f"OID Type: {oid['oid_type']}")
logger_instance.debug("-" * 20)
# end for
logger_instance.info(f"Total OIDs parsed: {len(parsed_oids)}")
# Database Insertion
db_manager = None
try:
db_manager = DatabaseManager(
db_type = args.db_type,
host = args.db_host,
port = args.db_port,
user = args.db_user,
password = args.db_password,
dbname = args.db_name,
schema = args.db_schema,
tbl_name = args.tbl_name,
key_prefix = args.redis_key_prefix,
logger_instance = logger_instance # Pass the logger instance
)
db_manager.connect()
db_manager.insert_oid_metadata(parsed_oids)
except Exception as e:
logger_instance.error(f"An error occurred during database operations: {e}")
finally:
if db_manager:
db_manager.close()
# end if
# end try
# end main
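One thing I noticed while staring at Option 2's extraction loop: as far as I can tell, mibBuilder.mibSymbols is keyed by module name and each value is itself a dict of symbol name to MIB object, so iterating .values() directly hands back dicts and the isinstance() checks never match (which would explain the empty output). The classes to test against also seem to live in the builder's SNMPv2-SMI module rather than pysnmp.smi.rfc1902. A rough, untested sketch of the walk I have in mind, using a MIB that ships precompiled with pysnmp so the snippet runs standalone:

# Sketch only (untested): after load_modules(), walk every symbol of every loaded module.
# snake_case names (load_modules, import_symbols) are the newer pysnmp spellings;
# older releases use loadModules / importSymbols.
from pysnmp.smi import builder

mibBuilder = builder.MibBuilder()
mibBuilder.load_modules('SNMPv2-MIB')   # ships precompiled with pysnmp, so no text compile needed here

# The node classes to test against come from the builder's SNMPv2-SMI module,
# not from pysnmp.smi.rfc1902.
MibScalar, MibTableColumn, NotificationType = mibBuilder.import_symbols(
    'SNMPv2-SMI', 'MibScalar', 'MibTableColumn', 'NotificationType')

for mod_name, symbols in mibBuilder.mibSymbols.items():
    for sym_name, node in symbols.items():
        if not isinstance(node, (MibScalar, MibTableColumn, NotificationType)):
            continue
        oid_string = '.'.join(str(x) for x in node.getName())   # getName() returns the OID tuple
        description = node.getDescription() if hasattr(node, 'getDescription') else ''
        print(mod_name, sym_name, oid_string, description[:40])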
Option 3
__author__ = "George Leonard"
__email__ = "georgelza@gmail.com"
__version__ = "0.0.1"
__copyright__ = "Copyright 2025, George Leonard"
import os
import json
from pysnmp.smi import builder, view, error
from pysnmp.smi.rfc1902 import ObjectType, NotificationType # Corrected: Removed TrapType
import argparse
from datetime import datetime # For timestamp in logs
from utils import * # Import our custom logger from utils.py
# Initialize logger instance globally for this script
LOG_TAG = 'snmp_mib_ingester'
LOG_FILE = f'{LOG_TAG}.log'
CONSOLE_DEBUG_LEVEL = 1 # INFO
FILE_DEBUG_LEVEL = 0 # DEBUG
CUSTOM_LOG_FORMAT = f'{LOG_TAG}, %(asctime)s, %(message)s' # Custom format: "{LOG_TAG}, {time}, message"
logger_instance = logger(LOG_FILE, CONSOLE_DEBUG_LEVEL, FILE_DEBUG_LEVEL, CUSTOM_LOG_FORMAT)
# --- Configuration ---
# IMPORTANT: Ensure these MIB files are placed in the './mibs' directory
# along with any other MIBs they might import (e.g., RFC1155-SMI, RFC-1212, RFC-1215
# if they are not part of pysnmp's default MIBs or are custom versions).
MIB_FILES = [
"pfSenseMIB.mib"
,"RFC1213-MIB.mib"
]
# MIB_FILES = [
# "RFC1213-MIB.mib"
# ,"SNMPv2-MIB.mib"
# ,"SNMPv2-SMI.mib"
# ,"SNMPv2-TC.mib"
# ,"10892.mib"
# ,"TRUENAS-MIB.mib"
# ,"FreeBSD-MIB.mib"
# ,"pfSenseMIB.mib"
# ,"UBNT-MIB.mib"
# ,"UBNT-MIB1.mib"
# ,"UBNT-MIB2.mib"
# ,"UBNT-UniFi-MIB.mib"
# ]
# MIB_DIR = "./mibs" # Directory where your MIB files are located
def get_mib_oids(mib_files, mib_dir):
"""
Parses MIB files and extracts OID metadata.
Args:
mib_files (list): A list of MIB filenames to parse.
mib_dir (str): The directory where MIB files are located.
Returns:
list: A list of dictionaries, each representing an OID record.
"""
mib_builder = builder.MibBuilder()
# Add default MIB sources (where pysnmp finds standard RFC MIBs)
# and our custom MIB directory. This is crucial for resolving IMPORTS.
mib_builder.add_mib_sources(
builder.DirMibSource(os.path.join(os.path.dirname(os.path.abspath(__file__)), 'pysnmp_mibs')), # A common place for pysnmp's own MIBs
builder.DirMibSource(mib_dir)
)
oids_data = []
# Load all specified MIB modules
for mib_file in mib_files:
mib_module_name = os.path.splitext(mib_file)[0]
print("mib_module_name ", mib_module_name)
try:
# Load the MIB module. This will also load any imported modules.
mib_builder.load_modules(mib_module_name)
print(f"Successfully loaded MIB: {mib_module_name}")
except error.SmiError as e:
print(f"Warning: Could not load MIB file {mib_file}: {e}")
print(f" Ensure all IMPORTS in {mib_file} are available in {mib_dir} or standard pysnmp MIB paths.")
continue
except Exception as e:
print(f"An unexpected error occurred while loading {mib_file}: {e}")
continue
# end try
# end for
# After loading all modules, iterate through all symbols in the MibBuilder
# This ensures we capture objects from all loaded MIBs and their dependencies.
for mib_node_name, mib_node_obj in mib_builder.mibSymbols.items():
# Filter for OBJECT-TYPE and NOTIFICATION-TYPE (which includes SMIv1 TRAP-TYPE)
if not (isinstance(mib_node_obj, ObjectType) or
isinstance(mib_node_obj, NotificationType)):
continue
#end if
oid_string = str(mib_node_obj.getName())
oid_name = mib_node_name # The symbolic name (e.g., 'sysDescr')
oid_description = getattr(mib_node_obj, 'getDescription', lambda: '')()
syntax_type = None
oid_type = None
if isinstance(mib_node_obj, ObjectType):
# Differentiate between scalar objects and table columns
if mib_node_obj.isScalar():
oid_type = "scalar"
elif mib_node_obj.isColumn():
oid_type = "table"
#end if
syntax = mib_node_obj.getSyntax()
# For Textual Conventions, get its label. For base types, get class name.
if hasattr(syntax, 'getLabel'):
syntax_type = syntax.getLabel()
else:
syntax_type = syntax.__class__.__name__
#end if
elif isinstance(mib_node_obj, NotificationType):
oid_type = "notification" # Covers both SMIv1 TRAP-TYPE and SMIv2 NOTIFICATION-TYPE
syntax_type = "Notification" # A generic type for notifications themselves
#end if
unit = None # Highly difficult to infer reliably from MIB description without NLP
# Only add if we successfully determined a relevant OID type
if oid_type:
oids_data.append({
"oid_string": oid_string,
"oid_name": oid_name,
"oid_description": oid_description.strip(),
"syntax_type": syntax_type,
"unit": unit,
"oid_type": oid_type
})
#end if
# Sort for consistent output
oids_data.sort(key=lambda x: x['oid_string'])
return oids_data
#end def
def generate_redis_commands(oids_data):
"""
Generates Redis SET commands for the OID metadata.
"""
commands = []
commands.append("# Redis SET commands for SNMP OID Metadata")
for oid in oids_data:
json_value = {
"oid_name": oid["oid_name"],
"description": oid["oid_description"],
"syntax_type": oid["syntax_type"],
"unit": oid["unit"],
"oid_type": oid["oid_type"]
}
# Use json.dumps to convert dict to JSON string.
# ensure_ascii=False allows non-ASCII characters directly (e.g., in descriptions).
json_string = json.dumps(json_value, ensure_ascii=False)
# Escape double quotes within the JSON string for shell command.
# This is crucial for bash to interpret the entire string as a single argument.
escaped_json_string = json_string.replace('"', '\\"')
command = f'SET "oid:{oid["oid_string"]}" "{escaped_json_string}"'
commands.append(command)
#end for
return "\n".join(commands)
#end def
def generate_sql_inserts(oids_data, db_type="postgresql"):
"""
Generates SQL INSERT statements for the OID metadata.
"""
inserts = []
inserts.append(f"-- INSERT statements for snmp.snmp_oid_metadata ({db_type.upper()})")
for oid in oids_data:
# Escape single quotes for SQL string literals.
# MySQL and PostgreSQL both use '' for escaping single quotes.
description_sql = oid['oid_description'].replace("'", "''")
# Handle NULL for the 'unit' field in SQL
unit_sql = f"'{oid['unit']}'" if oid['unit'] else "NULL"
insert_statement = f"""INSERT INTO snmp.snmp_oid_metadata (
oid_string,
oid_name,
oid_description,
syntax_type,
unit,
oid_type
) VALUES (
'{oid['oid_string']}',
'{oid['oid_name']}',
'{description_sql}',
'{oid['syntax_type']}',
{unit_sql},
'{oid['oid_type']}'
);"""
inserts.append(insert_statement)
#end for
return "\n".join(inserts)
#end def
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Parse SNMP MIB files and insert OID metadata into a database.")
# # Create a mutually exclusive group for mib_file and mib_directory
# mib_input_group = parser.add_mutually_exclusive_group(required=True)
parser.add_argument("--mib-files", nargs='*', default=[], help="Path to a single MIB file to parse.")
parser.add_argument("--mib-directory", nargs='*', default=[], help="Path to a directory containing MIB files.")
args = parser.parse_args()
# MIB_FILES = args.mib_files,
# MIB_DIR = args.mib_directory,
MIB_DIR = "mibsx"
print("FilePath : ", MIB_FILES)
print("FileDir : ", MIB_DIR)
print(f"Please ensure the following MIB files are placed in the '{MIB_DIR}' directory:")
for mib_file in MIB_FILES:
print(f"- {mib_file}")
#end for
print("\nAttempting to load MIBs and extract OID data...")
oids = get_mib_oids(MIB_FILES, MIB_DIR)
if oids:
print(f"\nSuccessfully extracted {len(oids)} OID entries.")
# Generate Redis Commands
redis_commands = generate_redis_commands(oids)
# Save to a file that can be used by your entrypoint.sh
with open("redis_commands_generated.txt", "w", encoding="utf-8") as f:
f.write(redis_commands)
#end with
print("\n--- Redis SET Commands (saved to redis_commands_generated.txt) ---")
# Print a snippet for quick review
print(redis_commands.splitlines()[0])
print(redis_commands.splitlines()[1])
print("...")
print(redis_commands.splitlines()[-1])
# Generate MySQL Inserts
mysql_inserts = generate_sql_inserts(oids, "mysql")
with open("mysql_inserts_generated.sql", "w", encoding="utf-8") as f:
f.write(mysql_inserts)
#end with
print("\n--- MySQL INSERT Statements (saved to mysql_inserts_generated.sql) ---")
# Print a snippet for quick review
print(mysql_inserts.splitlines()[0])
print(mysql_inserts.splitlines()[1])
print("...")
print(mysql_inserts.splitlines()[-1])
# Generate PostgreSQL Inserts
postgresql_inserts = generate_sql_inserts(oids, "postgresql")
with open("postgresql_inserts_generated.sql", "w", encoding="utf-8") as f:
f.write(postgresql_inserts)
#end with
print("\n--- PostgreSQL INSERT Statements (saved to postgresql_inserts_generated.sql) ---")
# Print a snippet for quick review
print(postgresql_inserts.splitlines()[0])
print(postgresql_inserts.splitlines()[1])
print("...")
print(postgresql_inserts.splitlines()[-1])
else:
print("No OID data extracted. Please check MIB files, MIB_DIR, and any error messages during MIB loading.")
#end if
#end main
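A note on the SQL generation above: because the values are spliced straight into the statement text, anything the simple quote-doubling misses (backslashes in MySQL, control characters inside DESCRIPTION clauses) can still break the generated file. Once the database leg gets wired in properly, a parameterized insert sidesteps that. A minimal sketch with psycopg2, reusing the column list from generate_sql_inserts and the connection details from the Option 1 example header (all assumed, and it presumes oid_string is the table's primary key, as the Option 2 upsert implies):

# Sketch: parameterized bulk insert instead of string-built SQL (PostgreSQL shown).
import psycopg2

def load_oids_postgres(oids_data):
    conn = psycopg2.connect(host="localhost", port=5433, dbname="snmp",
                            user="dbadmin", password="dbpassword")
    try:
        with conn, conn.cursor() as cur:             # commits on success, rolls back on error
            cur.executemany(
                """INSERT INTO snmp.snmp_oid_metadata
                       (oid_string, oid_name, oid_description, syntax_type, unit, oid_type)
                   VALUES (%s, %s, %s, %s, %s, %s)
                   ON CONFLICT (oid_string) DO NOTHING""",
                [(o["oid_string"], o["oid_name"], o["oid_description"],
                  o["syntax_type"], o["unit"], o["oid_type"]) for o in oids_data],
            )
    finally:
        conn.close()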
Utils.py
#######################################################################################################################
#
#
# Project : Generic utils for programs
#
# File : utils.py
#
# Description :
#
# Created : 22 November 2024
#
# Notes : Python Logging Package:
# : https://docs.python.org/3/library/logging.html
# : https://realpython.com/python-logging/
#
########################################################################################################################
__author__ = "George Leonard"
__email__ = "georgelza@gmail.com"
__version__ = "3.0.0"
__copyright__ = "Copyright 2025, - G Leonard"
import logging
"""
Common Generic Logger setup, used by master loop for console and common file.
"""
def logger(filename, console_debuglevel, file_debuglevel, log_format=None):
"""
Configures and returns a logger instance with console and file handlers.
Args:
filename (str): The name of the log file.
console_debuglevel (int): Debug level for console output (0=DEBUG, 1=INFO, etc.).
file_debuglevel (int): Debug level for file output (0=DEBUG, 1=INFO, etc.).
log_format (str, optional): Custom log format string.
Defaults to '%(asctime)s - %(levelname)s - %(processName)s - %(message)s'.
Returns:
logging.Logger: Configured logger instance.
"""
logger = logging.getLogger(__name__)
# Ensure handlers are not added multiple times if logger is called more than once
if not logger.handlers:
logger.setLevel(logging.DEBUG) # Set overall logger level to DEBUG to allow all messages to pass to handlers
# Define default format if not provided
default_console_format = '%(asctime)s - %(levelname)s - %(processName)s - %(message)s'
default_file_format = '%(asctime)s - %(levelname)s - %(message)s'
# Create console handler
ch = logging.StreamHandler()
# Use provided format or default for console
console_formatter = logging.Formatter(log_format if log_format else default_console_format)
ch.setFormatter(console_formatter)
# Set console log level
if console_debuglevel == 0:
ch.setLevel(logging.DEBUG)
elif console_debuglevel == 1:
ch.setLevel(logging.INFO)
elif console_debuglevel == 2:
ch.setLevel(logging.WARNING)
elif console_debuglevel == 3:
ch.setLevel(logging.ERROR)
elif console_debuglevel == 4:
ch.setLevel(logging.CRITICAL)
else:
ch.setLevel(logging.INFO) # Default
logger.addHandler(ch)
# Create file handler
fh = logging.FileHandler(filename)
# Use provided format or default for file
file_formatter = logging.Formatter(log_format if log_format else default_file_format)
fh.setFormatter(file_formatter)
# Set file log level
if file_debuglevel == 0:
fh.setLevel(logging.DEBUG)
elif file_debuglevel == 1:
fh.setLevel(logging.INFO)
elif file_debuglevel == 2:
fh.setLevel(logging.WARNING)
elif file_debuglevel == 3:
fh.setLevel(logging.ERROR)
elif file_debuglevel == 4:
fh.setLevel(logging.CRITICAL)
else:
fh.setLevel(logging.INFO) # Default
logger.addHandler(fh)
return logger
# end logger
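For reference, this is how the parser scripts above consume it; the log file name here is just an example:

# Example usage of utils.logger (log file name is hypothetical).
from utils import logger

log = logger("example_run.log", console_debuglevel=1, file_debuglevel=0)  # INFO to console, DEBUG to file
log.info("logger initialised")
log.debug("this line only lands in example_run.log")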
requirements.txt
pysnmp
psycopg2-binary
mysql-connector-python
redis
pysmi