# Source code for utils.database

import sqlite3
from pathlib import Path
import pandas as pd
from py_neuromodulation.utils.types import _PathLike
from py_neuromodulation.utils.io import generate_unique_filename


[docs] class NMDatabase: """ Class to create a database and insert data into it. Parameters ---------- out_dir : _PathLike The directory to save the database. csv_path : str, optional The path to save the csv file. If not provided, it will be saved in the same folder as the database. """ def __init__( self, name: str, out_dir: _PathLike, csv_path: _PathLike | None = None, ): # Make sure out_dir exists Path(out_dir).mkdir(parents=True, exist_ok=True) self.db_path = Path(out_dir, f"{name}.db") self.table_name = f"{name}_data" # change to param? self.table_created = False if self.db_path.exists(): self.db_path = generate_unique_filename(self.db_path) name = self.db_path.stem if csv_path is None: self.csv_path = Path(out_dir, f"{name}.csv") else: self.csv_path = Path(csv_path) self.csv_path.parent.mkdir(parents=True, exist_ok=True) self.conn = sqlite3.connect(self.db_path) self.cursor = self.conn.cursor() # Database config and optimization, prioritize data integrity self.cursor.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging mode self.cursor.execute("PRAGMA synchronous=FULL") # Sync on every commit self.cursor.execute("PRAGMA temp_store=MEMORY") # Store temp tables in memory self.cursor.execute( "PRAGMA wal_autocheckpoint = 1000" ) # WAL checkpoint every 1000 pages (default, 4MB, might change) self.cursor.execute( f"PRAGMA mmap_size = {2 * 1024 * 1024 * 1024}" ) # 2GB of memory mapped
[docs] def infer_type(self, value): """Infer the type of the value to create the table schema. Parameters ---------- value : int, float, str The value to infer the type.""" if isinstance(value, (int, float)): return "REAL" elif isinstance(value, str): return "TEXT" else: return "BLOB"
[docs] def create_table(self, feature_dict: dict): """ Create a table in the database. Parameters ---------- feature_dict : dict The dictionary with the feature names and values. """ columns_schema = ", ".join( [ f'"{column}" {self.infer_type(value)}' for column, value in feature_dict.items() ] ) self.cursor.execute( f'CREATE TABLE IF NOT EXISTS "{self.table_name}" ({columns_schema})' ) # Create column names and placeholders for insert statement self.columns: str = ", ".join([f'"{column}"' for column in feature_dict.keys()]) # Use named placeholders for more resiliency against unexpected change in column order self.placeholders = ", ".join([f":{key}" for key in feature_dict.keys()])
[docs] def insert_data(self, feature_dict: dict): """ Insert data into the database. Parameters ---------- feature_dict : dict The dictionary with the feature names and values. """ if not self.table_created: self.create_table(feature_dict) self.table_created = True insert_sql = f'INSERT INTO "{self.table_name}" ({self.columns}) VALUES ({self.placeholders})' self.cursor.execute(insert_sql, feature_dict)
def commit(self): self.conn.commit()
[docs] def fetch_all(self): """ " Fetch all the data from the database. Returns ------- pd.DataFrame The data in a pandas DataFrame. """ return pd.read_sql_query(f'SELECT * FROM "{self.table_name}"', self.conn)
[docs] def head(self, n: int = 5): """ " Returns the first N rows of the database. Parameters ---------- n : int, optional The number of rows to fetch, by default 1 ------- pd.DataFrame The data in a pandas DataFrame. """ return pd.read_sql_query( f'SELECT * FROM "{self.table_name}" LIMIT {n}', self.conn )
def save_as_csv(self): df = self.fetch_all() df.to_csv(self.csv_path, index=False) def close(self): # Optimize before closing is recommended: # https://www.sqlite.org/pragma.html#pragma_optimize self.cursor.execute("PRAGMA optimize") self.conn.close()