Source code for econuy.transform.decompose

import warnings
from pathlib import Path
from typing import Union, Dict, Tuple
from os import PathLike, getcwd, path

import pandas as pd
from statsmodels.tools.sm_exceptions import X13Error, X13Warning
from statsmodels.tsa import x13 as x13_sm
from statsmodels.tsa.x13 import x13_arima_analysis as x13a
from statsmodels.tsa.seasonal import STL, seasonal_decompose

from econuy.utils import metadata, x13
from econuy.utils.transform import error_handler


# The `_open_and_read` function needs to be monkey-patched to specify the
# encoding or decomposition will fail on Windows
def _new_open_and_read(fname):
    with open(fname, "r", encoding="utf8") as fin:
        fout = fin.read()
    return fout


x13_sm._open_and_read = _new_open_and_read


[docs]def decompose( df: pd.DataFrame, component: str = "both", method: str = "x13", force_x13: bool = False, fallback: str = "loess", outlier: bool = True, trading: bool = True, x13_binary: Union[str, PathLike, None] = "search", search_parents: int = 0, ignore_warnings: bool = True, errors: str = "raise", **kwargs, ) -> Union[Dict[str, pd.DataFrame], pd.DataFrame]: """ Apply seasonal decomposition. By default returns both trend and seasonally adjusted components, unlike the class method referred below. See Also -------- :mod:`~econuy.core.Pipeline.decompose`. """ if errors not in ["raise", "coerce", "ignore"]: raise ValueError("method can only be 'x13', 'loess' or 'ma'.") if method not in ["x13", "loess", "ma"]: raise ValueError("method can only be 'x13', 'loess' or 'ma'.") if fallback not in ["loess", "ma"]: raise ValueError("method can only be 'loess' or 'ma'.") if component not in ["trend", "seas", "both"]: raise ValueError("component can only be 'trend', 'seas' or 'both'.") if "Seas. Adj." not in df.columns.names: raise ValueError("Input dataframe's multiindex requires the " "'Seas. Adj.' level.") binary_path = None if method == "x13": if x13_binary == "search": binary_path = x13._search_binary( start_path=getcwd(), n=search_parents, download_path=getcwd() ) elif isinstance(x13_binary, str): binary_path = x13_binary elif isinstance(x13_binary, PathLike): binary_path = Path(x13_binary).as_posix() else: binary_path = None if isinstance(binary_path, str) and path.isfile(binary_path) is False: raise FileNotFoundError("X13 binary missing. Try using 'x13_binary=search'.") checks = [x not in ["Tendencia", "SA"] for x in df.columns.get_level_values("Seas. Adj.")] passing = df.loc[:, checks] not_passing = df.loc[:, [not x for x in checks]] if any(checks): if not all(checks) and errors == "raise": error_df = df.loc[:, [not check for check in checks]] msg = f"{error_df.columns[0][0]} does not have the " f"appropiate metadata." return error_handler(df=df, errors=errors, msg=msg) passing_output = _decompose( passing, component=component, method=method, force_x13=force_x13, fallback=fallback, outlier=outlier, trading=trading, x13_binary=binary_path, ignore_warnings=ignore_warnings, errors=errors, **kwargs, ) if not_passing.shape[1] != 0: not_passing_output = error_handler(df=not_passing, errors=errors) else: not_passing_output = not_passing if isinstance(passing_output, pd.DataFrame): output = pd.concat([passing_output, not_passing_output], axis=1) output = output[df.columns.get_level_values(0)] return output elif isinstance(passing_output, Dict): output = {} for name, data in passing_output.items(): aux = pd.concat([data, not_passing_output], axis=1) output[name] = aux[df.columns.get_level_values(0)] return output else: return error_handler(df=df, errors=errors)
def _decompose( df: pd.DataFrame, component: str = "both", method: str = "x13", force_x13: bool = False, fallback: str = "loess", outlier: bool = True, trading: bool = True, x13_binary: Union[str, PathLike, None] = None, ignore_warnings: bool = True, errors: str = "raise", **kwargs, ) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]: if method not in ["x13", "loess", "ma"]: raise ValueError("method can only be 'x13', 'loess' or 'ma'.") if fallback not in ["loess", "ma"]: raise ValueError("method can only be 'loess' or 'ma'.") df_proc = df.copy() old_columns = df_proc.columns df_proc.columns = df_proc.columns.get_level_values(level=0) df_proc.index = pd.to_datetime(df_proc.index, errors="coerce") trends_array = [] seas_adjs_array = [] for col in df_proc.columns: col_df = df_proc[col].dropna() if method == "x13": try: with warnings.catch_warnings(): if ignore_warnings is True: action = "ignore" else: action = "default" warnings.filterwarnings(action=action, category=X13Warning) results = x13a( col_df, outlier=outlier, trading=trading, x12path=x13_binary, prefer_x13=True, **kwargs, ) trends = results.trend.reindex(df_proc.index).T seas_adjs = results.seasadj.reindex(df_proc.index).T except X13Error: if force_x13 is True: if outlier is True: try: warnings.warn( "X13 error found with selected " "parameters. Trying with outlier=False.", UserWarning, ) results = x13a( col_df, outlier=False, trading=trading, x12path=x13_binary, prefer_x13=True, **kwargs, ) except X13Error: try: warnings.warn( "X13 error found with trading=True. " "Trying with trading=False.", UserWarning, ) results = x13a( col_df, outlier=False, trading=False, x12path=x13_binary, prefer_x13=True, **kwargs, ) trends = results.trend.reindex(df_proc.index).T seas_adjs = results.seasadj.reindex(df_proc.index).T except X13Error: warnings.warn( "No combination of parameters " "successful. No decomposition " "performed.", UserWarning, ) trends = error_handler(df=col_df, errors=errors) seas_adjs = trends.copy() elif trading is True: try: warnings.warn( "X13 error found with trading=True. " "Trying with trading=False...", UserWarning, ) results = x13a( col_df, trading=False, x12path=x13_binary, prefer_x13=True, **kwargs, ) trends = results.trend.reindex(df_proc.index).T seas_adjs = results.seasadj.reindex(df_proc.index).T except X13Error: warnings.warn( "No combination of parameters " "successful. Filling with NaN.", UserWarning, ) trends = error_handler(df=col_df, errors=errors) seas_adjs = trends.copy() else: if fallback == "loess": results = STL(col_df).fit() else: results = seasonal_decompose(col_df, extrapolate_trend="freq") trends = results.trend.reindex(df_proc.index).T seas_adjs = (results.observed - results.seasonal).reindex(df_proc.index).T else: if method == "loess": results = STL(col_df).fit() else: results = seasonal_decompose(col_df, extrapolate_trend="freq") trends = results.trend.reindex(df_proc.index).T seas_adjs = (results.observed - results.seasonal).reindex(df_proc.index).T trends_array.append(trends) seas_adjs_array.append(seas_adjs) trends = pd.concat(trends_array, axis=1) seas_adjs = pd.concat(seas_adjs_array, axis=1) trends.columns = old_columns seas_adjs.columns = old_columns metadata._set(trends, seas_adj="Tendencia") metadata._set(seas_adjs, seas_adj="SA") if component == "both": output = {"trend": trends, "seas": seas_adjs} elif component == "seas": output = seas_adjs elif component == "trend": output = trends return output