Source code for econuy.utils.operations

import inspect
import json
from os import PathLike
from pathlib import Path
from typing import Union, Optional, Dict

import pandas as pd
from sqlalchemy.engine.base import Connection, Engine
from sqlalchemy.exc import ProgrammingError, OperationalError

from econuy.utils import metadata, sql, get_project_root


def _read(
    data_loc: Union[str, PathLike, Connection, Engine],
    file_fmt: str = "csv",
    multiindex: Optional[str] = "included",
    table_name: Optional[str] = None,
) -> pd.DataFrame:
    """Read a DataFrame from SQL database or CSV/Excel.

    Parameters
    ----------
    data_loc : Union[str, PathLike, Connection, Engine]
        SQL object or path to file.
    file_fmt : {'csv', 'xlsx'}
        File format. Ignored if ``data_loc`` refers to a SQL object.
    multiindex : {'included', 'separate', None}
        How to handle multiindexes for metadata. ``None`` keeps only the first
        level, ``included`` expects them as DataFrame columns and ``separate``
        reads them from a dedicated "Metadata" sheet (only valid for Excel-type
        formats).
    table_name : Optional[str]
        Name of the input SQL table. Ignored if ``data_loc`` is a path.
    """
    try:
        if isinstance(data_loc, (Engine, Connection)):
            if multiindex is not None:
                previous_data = sql.read(con=data_loc, table_name=table_name)
            else:
                previous_data = pd.read_sql(
                    sql=table_name, con=data_loc, index_col="index", parse_dates=["index"]
                )
        else:
            date_format = None
            # these datasets store their dates as DD/MM/YYYY
            special_date_format = ["cattle_slaughter", "international_reserves_changes"]
            if Path(data_loc).stem in special_date_format:
                date_format = "%d/%m/%Y"
            if file_fmt == "csv":
                if multiindex == "included":
                    previous_data = pd.read_csv(
                        data_loc,
                        index_col=0,
                        header=list(range(9)),
                        float_precision="high",
                        encoding="latin1",
                    )
                    metadata._set(previous_data)
                else:
                    previous_data = pd.read_csv(
                        data_loc, index_col=0, header=0, float_precision="high", encoding="latin1"
                    )
            else:
                if multiindex == "included":
                    previous_data = pd.read_excel(
                        data_loc, index_col=0, header=list(range(9)), sheet_name="Data"
                    )
                    metadata._set(previous_data)
                elif multiindex == "separate":
                    excel = pd.ExcelFile(data_loc)
                    previous_data = pd.read_excel(excel, index_col=0, header=0, sheet_name="Data")
                    header = (
                        pd.read_excel(excel, sheet_name="Metadata", index_col=0)
                        .rename_axis("Indicador")
                        .T
                    )
                    previous_data.columns = pd.MultiIndex.from_frame(header.reset_index())
                    metadata._set(previous_data)
                else:
                    previous_data = pd.read_excel(
                        data_loc, index_col=0, header=0, sheet_name="Data"
                    )
            try:
                previous_data.index = pd.to_datetime(previous_data.index, format=date_format)
            except ValueError:
                # fall back to letting pandas infer the date format
                previous_data.index = pd.to_datetime(previous_data.index, format=None)

    except (ProgrammingError, OperationalError, FileNotFoundError):
        previous_data = pd.DataFrame()

    return previous_data
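
# A minimal usage sketch for ``_read``, assuming a previously saved CSV and a
# SQLite database (file, database and table names are hypothetical):
#
#     df = _read("econuy-data/cpi.csv", file_fmt="csv", multiindex="included")
#
#     from sqlalchemy import create_engine
#     eng = create_engine("sqlite:///econuy.db")
#     df = _read(eng, table_name="cpi")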


def _save(
    data: pd.DataFrame,
    data_loc: Union[str, PathLike, Connection, Engine],
    table_name: Optional[str] = None,
    file_fmt: str = "csv",
    multiindex: str = "included",
):
    """Save a DataFrame to SQL database or CSV/Excel.

    Parameters
    ----------
    data : pd.DataFrame
        Data to save.
    data_loc : Union[str, PathLike, Connection, Engine]
        SQL object or path to file.
    table_name : Optional[str]
        Name for output SQL table.
    file_fmt : {'csv', 'xlsx'}
        File format. Ignored if ``data_loc`` refers to a SQL object.
    multiindex : {'included', 'separate', None}
        How to handle multiindexes for metadata. ``None`` keeps only the first
        level, ``included`` keeps them as DataFrame columns and ``separate``
        saves them to another sheet (only valid for Excel-type formats).
    """
    if isinstance(data_loc, (Engine, Connection)):
        sql.df_to_sql(data, name=table_name, con=data_loc)
    else:
        data_proc = data.copy()
        if file_fmt == "csv":
            if multiindex != "included":
                data_proc.columns = data_proc.columns.get_level_values(0)
            data_proc.to_csv(data_loc, encoding="latin1")
        else:
            if multiindex == "included":
                data_proc.to_excel(data_loc, sheet_name="Data")
            elif multiindex == "separate":
                # named ``meta`` to avoid shadowing the imported ``metadata`` module
                meta = data_proc.columns.to_frame().set_index("Indicador").T
                data_proc.columns = data_proc.columns.get_level_values(0)
                with pd.ExcelWriter(data_loc) as f:
                    data_proc.to_excel(f, sheet_name="Data")
                    meta.to_excel(f, sheet_name="Metadata")
            else:
                data_proc.columns = data_proc.columns.get_level_values(0)
                data_proc.to_excel(data_loc, sheet_name="Data")

    return
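
# A minimal usage sketch for ``_save``, assuming ``df`` carries the nine-level
# metadata MultiIndex in its columns (the file name is hypothetical):
#
#     _save(df, "econuy-data/cpi.xlsx", file_fmt="xlsx", multiindex="separate")
#     # writes a "Data" sheet plus a "Metadata" sheet holding the column levels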


def _revise(
    new_data: pd.DataFrame, prev_data: pd.DataFrame, revise_rows: Union[int, str]
) -> pd.DataFrame:
    """Replace n rows of data at the end of a dataframe with new data."""
    if len(prev_data) == 0:
        return new_data
    frequency = pd.infer_freq(prev_data.index)
    # default number of rows to revise for annual, quarterly and monthly data
    freq_table = {"A": 3, "A-DEC": 3, "Q": 4, "Q-DEC": 4, "M": 12}
    new_data = new_data.apply(pd.to_numeric, errors="coerce")

    if isinstance(revise_rows, str) and revise_rows in ("nodup", "noduplicate"):
        prev_data.columns = new_data.columns
        updated = pd.concat([prev_data, new_data], axis=0)
        updated = updated.loc[~updated.index.duplicated(keep="last")]
        updated.sort_index(inplace=True)
        return updated

    elif isinstance(revise_rows, str) and revise_rows in ("auto", "automatic"):
        try:
            revise_rows = freq_table[frequency]
        except KeyError:
            revise_rows = 12
            if len(prev_data) <= 12 or len(new_data) <= 12:
                revise_rows = 3

    elif not isinstance(revise_rows, int):
        raise ValueError("`revise_rows` accepts int, 'nodup' or 'auto'")

    # keep all but the last ``revise_rows`` periods from the stored data and
    # take the rest (plus any newer periods) from the incoming data
    non_revised = prev_data[: len(prev_data) - revise_rows]
    revised = new_data[len(prev_data) - revise_rows :]
    non_revised.columns = new_data.columns
    updated = pd.concat([non_revised, revised], sort=False)

    return updated
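
# A minimal sketch of what ``_revise`` does, with hypothetical frames whose
# indexes start at the same period:
#
#     prev = pd.DataFrame({"x": [1.0, 2.0, 3.0]},
#                         index=pd.date_range("2020-01-31", periods=3, freq="M"))
#     new = pd.DataFrame({"x": [1.0, 2.1, 3.1, 4.0]},
#                        index=pd.date_range("2020-01-31", periods=4, freq="M"))
#     _revise(new, prev, revise_rows=2)
#     # keeps prev's first row and takes the last two periods, plus the newly
#     # available one, from ``new``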


def _io(
    operation: str,
    data_loc: Union[str, PathLike, Connection, Engine],
    name: str,
    data: Optional[pd.DataFrame] = None,
    file_fmt: str = "csv",
    multiindex: Optional[str] = "included",
) -> Optional[pd.DataFrame]:
    """Save/read a DataFrame to/from SQL database or CSV/Excel.

    Parameters
    ----------
    operation : {'save', 'read'}
        Whether to save or read.
    data_loc : Union[str, PathLike, Connection, Engine]
        SQL object or path to folder.
    name : str
        Name for the SQL table or output file.
    data : Optional[pd.DataFrame]
        Data to save. Only used when ``operation`` is ``'save'``.
    file_fmt : {'csv', 'xlsx', 'xls'}
        File format. Ignored if ``data_loc`` refers to a SQL object.
    multiindex : {'included', 'separate', None}
        How to handle multiindexes for metadata. ``None`` keeps only the first
        level, ``included`` keeps them as DataFrame columns and ``separate``
        saves them to another sheet (only valid for Excel-type formats).
    """
    valid_fmt = ["csv", "xlsx", "xls"]
    valid_operation = ["read", "save"]
    if file_fmt not in valid_fmt:
        raise ValueError(f"'file_fmt' must be one of {', '.join(valid_fmt)}.")
    if operation not in valid_operation:
        raise ValueError(f"'operation' must be one of {', '.join(valid_operation)}.")
    if multiindex not in ["included", "separate", None]:
        raise ValueError("'multiindex' must be one of 'included', 'separate' or None.")

    suffix = f".{file_fmt}"
    if operation == "read":
        if isinstance(data_loc, (str, PathLike)):
            full_update_loc = (Path(data_loc) / name).with_suffix(suffix)
        else:
            full_update_loc = data_loc
        return _read(full_update_loc, table_name=name, multiindex=multiindex, file_fmt=file_fmt)

    elif operation == "save":
        if isinstance(data_loc, (str, PathLike)):
            full_save_loc = (Path(data_loc) / name).with_suffix(suffix)
            full_save_loc.parent.mkdir(parents=True, exist_ok=True)
        else:
            full_save_loc = data_loc

        _save(
            data=data,
            data_loc=full_save_loc,
            file_fmt=file_fmt,
            multiindex=multiindex,
            table_name=name,
        )
        return
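
# A minimal usage sketch for ``_io``, assuming a local "econuy-data" folder
# (folder and dataset names are hypothetical):
#
#     df = _io(operation="read", data_loc="econuy-data", name="cpi")
#     _io(operation="save", data_loc="econuy-data", name="cpi", data=df)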


def load_datasets_info() -> Dict:
    with open(get_project_root() / "retrieval" / "datasets.json", "r") as f:
        return json.load(f)


DATASETS = load_datasets_info()


def get_name_from_function() -> str:
    return inspect.currentframe().f_back.f_code.co_name


def get_download_sources(name: str) -> Dict:
    return DATASETS[name]["sources"]["downloads"]
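
# A minimal usage sketch, assuming datasets.json defines a "cpi" entry with
# download sources (the dataset name is hypothetical):
#
#     sources = get_download_sources("cpi")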