Source code for econuy.transform.convert

from typing import Optional, Union
from datetime import datetime

import pandas as pd

from econuy.transform import rolling, resample
from econuy.utils import metadata
from econuy.utils.transform import error_handler


def convert_usd(df: pd.DataFrame, pipeline=None, errors: str = "raise") -> pd.DataFrame:
    """Express the UYU-denominated columns of ``df`` in US dollars.

    A column is eligible when its ``Moneda`` column level equals ``"UYU"``.
    Eligible columns are passed to ``_convert_usd`` together with the
    ``nxr_monthly`` dataset fetched through ``pipeline``; every other column
    is handed to ``error_handler``.

    Parameters
    ----------
    df : pd.DataFrame
        Input data. Its columns must carry a ``Moneda`` level.
    pipeline : optional
        Object exposing ``get(name=...)`` and a ``dataset`` attribute. When
        ``None`` a fresh ``econuy.core.Pipeline`` is instantiated.
    errors : {"raise", "coerce", "ignore"}
        Forwarded to ``error_handler`` for columns that are not eligible.

    Returns
    -------
    pd.DataFrame
        The converted data, or whatever ``error_handler`` returns when no
        conversion is possible.

    Raises
    ------
    ValueError
        If ``errors`` is not one of the accepted options or the ``Moneda``
        level is missing.

    See Also
    --------
    :mod:`~econuy.core.Pipeline.convert`.
    """
    if errors not in ("raise", "coerce", "ignore"):
        raise ValueError("'errors' must be one of 'raise', 'coerce' or 'ignore'.")
    if "Moneda" not in df.columns.names:
        raise ValueError("Input dataframe's multiindex requires the 'Moneda' level.")

    if pipeline is None:
        # Imported lazily, as in the rest of this module.
        from econuy.core import Pipeline

        pipeline = Pipeline()

    is_uyu = [currency == "UYU" for currency in df.columns.get_level_values("Moneda")]
    if not any(is_uyu):
        return error_handler(df=df, errors=errors)

    if errors == "raise" and not all(is_uyu):
        # Report the first column that is not in UYU.
        offender = next(name for name, ok in zip(df.columns, is_uyu) if not ok)
        return error_handler(
            df=df,
            errors=errors,
            msg=f"{offender[0]} does not have the appropiate metadata.",
        )

    pipeline.get(name="nxr_monthly")
    exchange_rate = pipeline.dataset

    # When all columns share the same metadata the frame is converted in one go.
    shared_metadata = df.columns.droplevel("Indicador")
    reference = shared_metadata[0]
    if all(entry == reference for entry in shared_metadata):
        return _convert_usd(df=df, nxr=exchange_rate)

    # Otherwise each column is processed on its own and the pieces are glued
    # back together in the original column order.
    pieces = []
    for name, ok in zip(df.columns, is_uyu):
        single = df[[name]]
        if ok:
            pieces.append(_convert_usd(df=single, nxr=exchange_rate))
        else:
            pieces.append(
                error_handler(
                    df=single,
                    errors=errors,
                    msg=f"{name[0]} does not have the appropiate metadata.",
                )
            )
    return pd.concat(pieces, axis=1)
def _convert_usd(df: pd.DataFrame, nxr: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """Divide ``df`` by an exchange-rate series and tag the result as USD.

    Parameters
    ----------
    df : pd.DataFrame
        Data to convert. The ``Tipo`` column level of its first column
        selects the treatment; for anything other than ``"Stock"`` the
        ``Acum. períodos`` level is read as well.
    nxr : pd.DataFrame, optional
        Exchange-rate data. When ``None`` the ``nxr_monthly`` dataset is
        fetched through a new ``econuy.core.Pipeline``.

    Returns
    -------
    pd.DataFrame
        ``df`` (resampled to monthly if its frequency was daily, weekly or
        could not be inferred) divided row-wise by the selected rate, with
        its currency metadata set to ``"USD"``.
    """
    if nxr is None:
        from econuy.core import Pipeline

        fallback = Pipeline()
        fallback.get("nxr_monthly")
        nxr = fallback.dataset

    freq = pd.infer_freq(df.index)
    if freq in ("D", "B", "C", "W", "W-SUN", None):
        # Higher-than-monthly (or unknown) frequency: collapse to monthly,
        # summing flows and keeping the last observation otherwise.
        is_flow = df.columns.get_level_values("Tipo")[0] == "Flujo"
        monthly = df.resample("M")
        df = monthly.sum() if is_flow else monthly.last()
        freq = pd.infer_freq(df.index)

    # NOTE(review): block structure was reconstructed from a flattened source
    # line; the rolling mean is assumed to apply to the non-"Stock" branch
    # only. Confirm against the upstream file.
    if df.columns.get_level_values("Tipo")[0] != "Stock":
        metadata._set(nxr, ts_type="Flujo")
        # First column of the exchange-rate frame, period mean.
        rate = resample(nxr, rule=freq, operation="mean").iloc[:, [0]]
        window = int(df.columns.get_level_values("Acum. períodos")[0])
        rate = rolling(rate, window=window, operation="mean")
    else:
        metadata._set(nxr, ts_type="Stock")
        # Second column of the exchange-rate frame, last value in the period.
        rate = resample(nxr, rule=freq, operation="last").iloc[:, [1]]

    aligned_rate = rate.reindex(df.index).iloc[:, 0]
    in_dollars = df.div(aligned_rate, axis=0)
    metadata._set(in_dollars, currency="USD")
    return in_dollars
def convert_real(
    df: pd.DataFrame,
    start_date: Union[str, datetime, None] = None,
    end_date: Union[str, datetime, None] = None,
    pipeline=None,
    errors: str = "raise",
) -> pd.DataFrame:
    """Deflate the nominal UYU columns of ``df`` using the ``cpi`` dataset.

    A column is eligible when its ``Moneda`` level equals ``"UYU"`` and its
    ``Inf. adj.`` level does not contain ``"Const."``. Eligible columns are
    passed to ``_convert_real``; every other column is handed to
    ``error_handler``.

    Parameters
    ----------
    df : pd.DataFrame
        Input data. Its columns must carry an ``Inf. adj.`` level (the
        ``Moneda`` level is read as well).
    start_date, end_date : str, datetime or None
        Forwarded unchanged to ``_convert_real``.
    pipeline : optional
        Object exposing ``get(name=...)`` and a ``dataset`` attribute. When
        ``None`` a fresh ``econuy.core.Pipeline`` is instantiated.
    errors : {"raise", "coerce", "ignore"}
        Forwarded to ``error_handler`` for columns that are not eligible.

    Returns
    -------
    pd.DataFrame
        The converted data, or whatever ``error_handler`` returns when no
        conversion is possible.

    Raises
    ------
    ValueError
        If ``errors`` is not one of the accepted options or the
        ``Inf. adj.`` level is missing.

    See Also
    --------
    :mod:`~econuy.core.Pipeline.convert`.
    """
    if errors not in ("raise", "coerce", "ignore"):
        raise ValueError("'errors' must be one of 'raise', 'coerce' or 'ignore'.")
    if "Inf. adj." not in df.columns.names:
        raise ValueError("Input dataframe's multiindex requires the 'Inf. adj.' level.")

    if pipeline is None:
        # Imported lazily, as in the rest of this module.
        from econuy.core import Pipeline

        pipeline = Pipeline()

    currencies = df.columns.get_level_values("Moneda")
    adjustments = df.columns.get_level_values("Inf. adj.")
    is_nominal_uyu = [
        currency == "UYU" and "Const." not in adjustment
        for currency, adjustment in zip(currencies, adjustments)
    ]
    if not any(is_nominal_uyu):
        return error_handler(df=df, errors=errors)

    if errors == "raise" and not all(is_nominal_uyu):
        # Report the first column that cannot be deflated.
        offender = next(name for name, ok in zip(df.columns, is_nominal_uyu) if not ok)
        return error_handler(
            df=df,
            errors=errors,
            msg=f"{offender[0]} does not have the appropiate metadata.",
        )

    pipeline.get(name="cpi")
    price_index = pipeline.dataset

    # When all columns share the same metadata the frame is converted in one go.
    shared_metadata = df.columns.droplevel("Indicador")
    reference = shared_metadata[0]
    if all(entry == reference for entry in shared_metadata):
        return _convert_real(df=df, start_date=start_date, end_date=end_date, cpi=price_index)

    # Otherwise each column is processed on its own and the pieces are glued
    # back together in the original column order.
    pieces = []
    for name, ok in zip(df.columns, is_nominal_uyu):
        single = df[[name]]
        if ok:
            pieces.append(
                _convert_real(
                    df=single, start_date=start_date, end_date=end_date, cpi=price_index
                )
            )
        else:
            pieces.append(
                error_handler(
                    df=single,
                    errors=errors,
                    msg=f"{name[0]} does not have the appropiate metadata.",
                )
            )
    return pd.concat(pieces, axis=1)
def _convert_real(
    df: pd.DataFrame,
    start_date: Union[str, datetime, None] = None,
    end_date: Union[str, datetime, None] = None,
    cpi: Optional[pd.DataFrame] = None,
) -> pd.DataFrame:
    """Divide ``df`` by a price index and label the result as constant prices.

    Parameters
    ----------
    df : pd.DataFrame
        Data to deflate. The ``Acum. períodos`` column level of its first
        column sets the rolling window applied to the price index; the
        ``Tipo`` level is read when the data has to be resampled to monthly.
    start_date : str, datetime or None
        * ``None``: the data is simply divided by the index and labelled
          ``"Const."`` (``end_date`` is ignored in this case).
        * given without ``end_date``: the result is additionally multiplied
          by the index value at the date in ``df.index`` closest to
          ``start_date`` and labelled ``"Const. YYYY-MM"``.
    end_date : str, datetime or None
        When given together with ``start_date``, the result is multiplied by
        the mean of the index over ``start_date:end_date`` and labelled
        ``"Const. YYYY-MM_YYYY-MM"`` (or ``"Const. YYYY-MM"`` if both dates
        fall in the same month).
    cpi : pd.DataFrame, optional
        Price-index data. When ``None`` the ``cpi`` dataset is fetched
        through a new ``econuy.core.Pipeline``.

    Returns
    -------
    pd.DataFrame
        Deflated data, reindexed to ``df.index`` (after any monthly
        resampling), with its inflation-adjustment metadata updated.
    """
    if cpi is None:
        from econuy.core import Pipeline

        pipeline = Pipeline()
        pipeline.get("cpi")
        cpi = pipeline.dataset

    inferred_freq = pd.infer_freq(df.index)
    if inferred_freq in ["D", "B", "C", "W", "W-SUN", None]:
        # Higher-than-monthly (or unknown) frequency: collapse to monthly,
        # summing flows and averaging everything else.
        if df.columns.get_level_values("Tipo")[0] == "Flujo":
            df = df.resample("M").sum()
        else:
            df = df.resample("M").mean()
        inferred_freq = pd.infer_freq(df.index)

    metadata._set(cpi, ts_type="Flujo")
    cpi_freq = resample(cpi, rule=inferred_freq, operation="mean").iloc[:, [0]]
    cum_periods = int(df.columns.get_level_values("Acum. períodos")[0])
    cpi_to_use = rolling(cpi_freq, window=cum_periods, operation="mean").squeeze()

    deflated = df.div(cpi_to_use, axis=0)
    if start_date is None:
        converted_df = deflated
        col_text = "Const."
    elif end_date is None:
        start_date = pd.to_datetime(start_date)
        # Base period: the observation in df closest to the requested date.
        month = df.index.to_series().sub(start_date).abs().idxmin()
        converted_df = deflated * cpi_to_use.loc[month]
        m_start = start_date.strftime("%Y-%m")
        col_text = f"Const. {m_start}"
    else:
        # The slice deliberately receives the caller's original values so
        # that pandas' label/partial-string slicing semantics are unchanged.
        converted_df = deflated * cpi_to_use[start_date:end_date].mean()
        # pd.to_datetime accepts str, datetime and Timestamp alike, whereas
        # datetime.strptime raised TypeError for non-string input even though
        # the signature advertises datetime support.
        m_start = pd.to_datetime(start_date).strftime("%Y-%m")
        m_end = pd.to_datetime(end_date).strftime("%Y-%m")
        if m_start == m_end:
            col_text = f"Const. {m_start}"
        else:
            col_text = f"Const. {m_start}_{m_end}"

    converted_df = converted_df.reindex(df.index)
    metadata._set(converted_df, inf_adj=col_text)
    return converted_df
def convert_gdp(df: pd.DataFrame, pipeline=None, errors: str = "raise") -> pd.DataFrame:
    """Express the eligible columns of ``df`` as a percentage of GDP.

    A column is eligible when its ``Área`` level is neither ``"Regional"``
    nor ``"Global"`` and its ``Unidad`` level does not already mark it as a
    share of GDP. Eligible columns are passed to ``_convert_gdp`` together
    with the ``_monthly_interpolated_gdp`` dataset fetched through
    ``pipeline``; every other column is handed to ``error_handler``.

    Parameters
    ----------
    df : pd.DataFrame
        Input data. Its columns must carry the ``Área`` and ``Unidad``
        levels.
    pipeline : optional
        Object exposing ``get(name=...)`` and a ``dataset`` attribute. When
        ``None`` a fresh ``econuy.core.Pipeline`` is instantiated.
    errors : {"raise", "coerce", "ignore"}
        Forwarded to ``error_handler`` for columns that are not eligible.

    Returns
    -------
    pd.DataFrame
        The converted data, or whatever ``error_handler`` returns when no
        conversion is possible.

    Raises
    ------
    ValueError
        If ``errors`` is not one of the accepted options or a required
        column level is missing.

    See Also
    --------
    :mod:`~econuy.core.Pipeline.convert`.
    """
    if errors not in ["raise", "coerce", "ignore"]:
        raise ValueError("'errors' must be one of 'raise', 'coerce' or 'ignore'.")
    if any(x not in df.columns.names for x in ["Área", "Unidad"]):
        raise ValueError("Input dataframe's multiindex requires the 'Área' and 'Unidad' levels.")

    if pipeline is None:
        from econuy.core import Pipeline

        pipeline = Pipeline()

    # ``_convert_gdp`` labels its output with unit "% PBI" (with a space),
    # so both spellings must count as "already a share of GDP"; checking only
    # "%PBI" would let previously converted data be divided by GDP again.
    # NOTE(review): assumes metadata._set(unit=...) writes to the "Unidad"
    # level -- confirm against econuy.utils.metadata.
    gdp_share_tags = ("%PBI", "% PBI")
    checks = [
        area not in ["Regional", "Global"] and not any(tag in unit for tag in gdp_share_tags)
        for area, unit in zip(
            df.columns.get_level_values("Área"), df.columns.get_level_values("Unidad")
        )
    ]
    if not any(checks):
        return error_handler(df=df, errors=errors)

    if not all(checks) and errors == "raise":
        error_df = df.loc[:, [not check for check in checks]]
        msg = f"{error_df.columns[0][0]} does not have the appropiate metadata."
        return error_handler(df=df, errors=errors, msg=msg)

    pipeline.get(name="_monthly_interpolated_gdp")
    gdp_data = pipeline.dataset

    # When all columns share the same metadata the frame is converted in one go.
    all_metadata = df.columns.droplevel("Indicador")
    if all(x == all_metadata[0] for x in all_metadata):
        return _convert_gdp(df=df, gdp=gdp_data)

    # Otherwise convert column by column, preserving the column order.
    columns = []
    for column_name, check in zip(df.columns, checks):
        df_column = df[[column_name]]
        if check:
            columns.append(_convert_gdp(df=df_column, gdp=gdp_data))
        else:
            msg = f"{column_name[0]} does not have the appropiate metadata."
            columns.append(error_handler(df=df_column, errors=errors, msg=msg))
    return pd.concat(columns, axis=1)
def _convert_gdp(df: pd.DataFrame, gdp: Optional[pd.DataFrame] = None) -> pd.DataFrame:
    """Divide ``df`` by a GDP series, multiply by 100 and tag it ``% PBI``.

    Parameters
    ----------
    df : pd.DataFrame
        Data to convert. The ``Acum. períodos`` and ``Moneda`` column levels
        of its first column are always read; ``Tipo`` is read when the data
        may need to be accumulated or resampled.
    gdp : pd.DataFrame, optional
        GDP data. When ``None`` the ``_monthly_interpolated_gdp`` dataset is
        fetched through a new ``econuy.core.Pipeline``. Its second column is
        used when the data is in ``"USD"``, its first column otherwise.

    Returns
    -------
    pd.DataFrame
        ``df`` (possibly accumulated or resampled) as a percentage of the
        GDP series aligned to its index.

    Raises
    ------
    ValueError
        If the inferred frequency of ``df`` is not one of the handled ones.
    """
    if gdp is None:
        from econuy.core import Pipeline

        fallback = Pipeline()
        fallback.get("_monthly_interpolated_gdp")
        gdp = fallback.dataset

    freq = pd.infer_freq(df.index)
    accumulated = int(df.columns.get_level_values("Acum. períodos")[0])

    def annualise(frame: pd.DataFrame, periods_in_year: int) -> pd.DataFrame:
        # Flows that do not already cover a full year are summed over the
        # number of windows needed to reach one.
        if (
            accumulated != periods_in_year
            and frame.columns.get_level_values("Tipo")[0] == "Flujo"
        ):
            return rolling(frame, window=int(periods_in_year / accumulated), operation="sum")
        return frame

    if freq in ("M", "MS"):
        gdp = resample(gdp, rule=freq, operation="upsample", interpolation="linear")
        df = annualise(df, 12)
    elif freq in ("Q", "Q-DEC"):
        gdp = gdp.resample(freq, convention="end").asfreq()
        df = annualise(df, 4)
    elif freq in ("A", "A-DEC"):
        gdp = gdp.resample(freq, convention="end").asfreq()
    elif freq in ("D", "B", "C", "W", "W-SUN", None):
        # Higher-than-monthly (or unknown) frequency: collapse to monthly,
        # summing flows and averaging everything else.
        is_flow = df.columns.get_level_values("Tipo")[0] == "Flujo"
        monthly = df.resample("M")
        df = monthly.sum() if is_flow else monthly.mean()
        gdp = resample(gdp, rule="M", operation="upsample", interpolation="linear")
    else:
        raise ValueError(
            "Frequency of input dataframe not any of 'D', 'C', "
            "'W', 'B', 'M', 'MS', 'Q', 'Q-DEC', 'A' or 'A-DEC'."
        )

    gdp_column = 1 if df.columns.get_level_values("Moneda")[0] == "USD" else 0
    denominator = gdp.iloc[:, gdp_column].to_frame().reindex(df.index).iloc[:, 0]

    as_share = df.div(denominator, axis=0).multiply(100)
    metadata._set(as_share, unit="% PBI")
    return as_share