Source code for econuy.utils.sql

from os import PathLike, path, listdir
from pathlib import Path
from typing import Union, Optional, Iterable

import pandas as pd
import sqlalchemy as sqla
from pandas.errors import ParserError
from sqlalchemy import select, table, column, and_


def read(
    con: sqla.engine.base.Connection,
    command: Optional[str] = None,
    table_name: Optional[str] = None,
    cols: Union[str, Iterable[str], None] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **kwargs,
) -> pd.DataFrame:
    """
    Convenience wrapper around `pandas.read_sql_query <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html>`_.

    Deals with multiindex column names: when a ``<table_name>_metadata`` table
    exists in the database, its rows are used to rebuild the column MultiIndex
    of the result. Tables without such a metadata table are returned with
    their flat column names.

    Parameters
    ----------
    con : sqlalchemy.engine.base.Connection
        Connection to SQL database.
    command : str, sqlalchemy.sql.Selectable or None, default None
        Command to pass to `pandas.read_sql_query <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html>`_.
        If this parameter is not None, `start_date` and `end_date` are
        ignored, and `table_name` / `cols` are only used to look up and filter
        the column metadata.
    table_name : str or None, default None
        String representing which table should be retrieved from the database.
    cols : str, iterable or None, default None
        Column(s) to retrieve. By default (or with ``"*"``), gets all columns.
    start_date : str or None, default None
        Dates to filter. Inclusive.
    end_date : str or None, default None
        Dates to filter. Inclusive.
    **kwargs
        Keyword arguments passed to `pandas.read_sql_query <https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_sql_query.html>`_.

    Returns
    -------
    SQL queried table : pd.DataFrame
    """
    # Materialize non-string iterables once: the column list is used both to
    # build the query and to filter the metadata, and a generator would be
    # exhausted after its first use.
    if cols is not None and not isinstance(cols, str):
        cols = list(cols)

    if command is not None:
        output = pd.read_sql_query(sql=command, con=con, index_col="index", **kwargs)
    elif cols is None and start_date is None and end_date is None:
        # Whole table, no filtering: let pandas read it by name.
        output = pd.read_sql(
            sql=table_name, con=con, index_col="index", parse_dates="index", **kwargs
        )
    else:
        query = _build_select(table_name, cols, start_date, end_date)
        output = pd.read_sql(
            sql=query, con=con, index_col="index", parse_dates="index", **kwargs
        )

    metadata_name = f"{table_name}_metadata"
    # Metadata only exists for tables whose columns were a MultiIndex when
    # written; without a table name there is nothing to look up.
    if table_name is not None and sqla.inspect(con).has_table(metadata_name):
        metadata = pd.read_sql(sql=metadata_name, con=con, index_col="index")
        if cols is not None and cols != "*":
            selected = [cols] if isinstance(cols, str) else cols
            # Keep only the requested indicators, in the requested order, so
            # rows line up with the selected columns.
            metadata = metadata.set_index("Indicador").loc[selected].reset_index()
        output.columns = pd.MultiIndex.from_frame(metadata)
    return output.rename_axis(None)


def _build_select(table_name, cols, start_date, end_date):
    """Build a SELECT on ``table_name`` for ``cols`` with an inclusive date filter on ``index``."""
    if cols is None or cols == "*":
        # A raw "*" string is rejected by select() in SQLAlchemy 1.4+;
        # literal_column renders it verbatim.
        selection = [sqla.literal_column("*")]
    else:
        names = [cols] if isinstance(cols, str) else cols
        # The "index" column is always needed to build the DataFrame index.
        selection = [column(name) for name in names] + [column("index")]
    query = select(*selection).select_from(table(table_name))
    dates = column("index")
    # Successive where() calls are combined with AND.
    if start_date is not None:
        query = query.where(dates >= f"{start_date}")
    if end_date is not None:
        query = query.where(dates <= f"{end_date}")
    return query
def df_to_sql(
    df: pd.DataFrame, name: str, con: sqla.engine.base.Connection, if_exists: str = "replace"
) -> None:
    """Write ``df`` to the SQL table ``name``, storing MultiIndex column levels separately.

    When the columns of ``df`` are a MultiIndex, the full column index is first
    written to the ``<name>_metadata`` table (one row per column, one SQL
    column per level). The data table then keeps only the first level as its
    column names. The dataframe index is written under the label ``"index"``.

    Parameters
    ----------
    df : pd.DataFrame
        Data to write. It is not modified; a copy is flattened instead.
    name : str
        Name of the destination table.
    con : sqlalchemy.engine.base.Connection
        Connection to the SQL database.
    if_exists : str, default "replace"
        Passed to ``DataFrame.to_sql`` for both the data and metadata tables.
    """
    flat = df.copy()
    header = flat.columns
    if isinstance(header, pd.MultiIndex):
        level_table = header.to_frame(index=False)
        level_table.to_sql(name=f"{name}_metadata", con=con, if_exists=if_exists)
        flat.columns = header.get_level_values(level=0)
    flat.to_sql(name=name, con=con, if_exists=if_exists, index_label="index")
def insert_csvs(con: sqla.engine.base.Connection, directory: Union[str, Path, PathLike]) -> None:
    """Insert all CSV files in a data directory into a SQL database.

    Parameters
    ----------
    con : sqlalchemy.engine.base.Connection
        Connection to the target database. Each CSV replaces any existing
        table of the same name.
    directory : str, Path or PathLike
        Directory containing the CSV files. If a file path is given, its
        parent directory is scanned instead.

    Notes
    -----
    Each file is first parsed with a 9-row column header (a 9-level column
    MultiIndex). If that raises ``ParserError``, it is re-read with a single
    header row. Tables are named after the file name without its extension,
    and files are processed in sorted order.
    """
    base = Path(directory)
    if base.is_file():
        # Path.parent yields "." for a bare file name, whereas os.path.dirname
        # yields "", which os.listdir rejects.
        base = base.parent
    csv_files = sorted(
        entry for entry in base.iterdir() if entry.name.endswith(".csv") and entry.is_file()
    )
    for full_path in csv_files:
        try:
            data = pd.read_csv(
                full_path,
                index_col=0,
                header=list(range(9)),
                float_precision="high",
                parse_dates=True,
                encoding="latin1",
            )
        except ParserError:
            # Fall back to a flat, single-row header.
            data = pd.read_csv(
                full_path, index_col=0, float_precision="high", parse_dates=True, encoding="latin1"
            )
        df_to_sql(df=data, name=full_path.stem, con=con, if_exists="replace")
        print(f"Inserted {full_path.name} into {con.engine.url}.")