import logging
import asyncio
import configparser
import pathlib
from passlib.hash import bcrypt
from typing import Union
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from pymongo.results import InsertOneResult
logging.basicConfig(level=logging.INFO)
class Database:
"""
An asynchronous class for working with the MongoDB database.
"""
def __init__(self, url: str):
self.client: AsyncIOMotorClient = AsyncIOMotorClient(url)
self.db: AsyncIOMotorDatabase = self.client["NauticDemoDataBase"]
self.users_collection: AsyncIOMotorCollection = self.db["users"]
async def close(self):
if self.client is not None:
await self.client.close()
async def save_user(self, user_data: dict) -> bool:
try:
insert_result: InsertOneResult = await self.users_collection.insert_one(user_data)
return insert_result.acknowledged
except Exception as e:
logging.error("Error saving user data: %s", e)
return False
async def test_add_data_to_database(database: Database) -> dict:
try:
data = {
"username": "Shepard",
"email": "naiticai@dbmongodb.com",
"password": bcrypt.hash("12345678"),
"risk_level": 0,
}
insert_result: InsertOneResult = await database.users_collection.insert_one(data)
if insert_result.acknowledged:
inserted_id = insert_result.inserted_id
logging.info("Data successfully added to database. ID of inserted document: %s", inserted_id)
print("Added data:", data)
return data
otherwise:
logging.error("Failed to confirm adding data to database.")
return {}
except Exception as e:
logging.error("Error adding data to database: %s", e)
return {}
async def connect_to_database_mongo(url: str) -> Union[None, Database]:
try:
database = Database(url)
await database.client.admin.command('ping')
logging.info("Connected to MongoDB. Ping successful!")
return database
except Exception as e:
logging.error("Failed to connect to MongoDB: %s", e)
return None
def get_mongo_url() -> str:
"""
We get the URL to connect to MongoDB from the settings.ini configuration file.
Returns:
str: URL to connect to the MongoDB database.
"""
file_config = pathlib.Path(__file__).parent.parent.joinpath("settings.ini")
config = configparser.ConfigParser()
config.read(file_config)
mongo_user = config.get("mongo", "user")
mongo_pass = config.get("mongo", "password")
mongo_db = config.get("mongo", "db_name")
mongo_domain = config.get("mongo", "domain")
url = f"mongodb+srv://{mongo_user}:{mongo_pass}@{mongo_domain}/?retryWrites=true&w=majority"
return url
async def perform_initialization() -> None:
url = get_mongo_url()
database = await connect_to_database_mongo(url)
if database is not None:
await test_add_data_to_database(database)
await database.close()
if __name__ == "__main__":
asyncio.run(perform_initialization())
I am getting the following error
Traceback (most recent call last):
Nauticai_backend\Nauticai_backend\src\database\connect.py", line 99, in
asyncio.run(perform_initialization())
Nauticai_backend\Nauticai_backend\src\database\connect.py", line 95, in perform_initialization
await database.close()
\Nauticai_backend\Nauticai_backend\src\database\connect.py", line 25, in close
await self.client.close()
TypeError: object NoneType can’t be used in ‘await’ expression