Source code for econuy.core

from __future__ import annotations
import copy
import importlib
from typing import Union, Optional, Dict, Callable
from os import PathLike
from datetime import datetime

import pandas as pd
from sqlalchemy.engine.base import Engine, Connection

from econuy.utils import operations
from econuy import transform
from econuy.utils.operations import DATASETS


class Pipeline(object):
    """
    Main class to access download and transformation methods.

    Attributes
    ----------
    location : str, os.PathLike, SQLAlchemy Connection or Engine, or None, \
              default None
        Either Path or path-like string pointing to a directory where to
        find a CSV for updating and saving, SQLAlchemy connection or engine
        object, or ``None`` to not save or update.
    download : bool, default True
        If False, the ``get`` method will only try to retrieve data on disk.
    always_save : bool, default True
        If True, save every retrieved dataset to the specified ``location``.
    read_fmt : {'csv', 'xls', 'xlsx'}
        File format of previously downloaded data. Ignored if ``location``
        points to a SQL object.
    save_fmt : {'csv', 'xls', 'xlsx'}
        File format for saving. Ignored if ``location`` points to a SQL
        object.
    read_header : {'included', 'separate', None}
        Location of dataset metadata headers. 'included' means they are in
        the first 9 rows of the dataset. 'separate' means they are in a
        separate Excel sheet (if ``read_fmt='csv'``, headers are discarded).
        None means there are no metadata headers.
    save_header : {'included', 'separate', None}
        Location of dataset metadata headers. 'included' means they will be
        set as the first 9 rows of the dataset. 'separate' means they will
        be saved in a separate Excel sheet (if ``save_fmt='csv'``, headers
        are discarded). None discards any headers.
    errors : {'raise', 'coerce', 'ignore'}
        How to handle errors that arise from transformations. ``raise`` will
        raise a ValueError, ``coerce`` will force the data into ``np.nan``
        and ``ignore`` will leave the input data as is.

    """

    def __init__(
        self,
        location: Union[str, PathLike, Engine, Connection, None] = None,
        download: bool = True,
        always_save: bool = True,
        read_fmt: str = "csv",
        read_header: Optional[str] = "included",
        save_fmt: str = "csv",
        save_header: Optional[str] = "included",
        errors: str = "raise",
    ):
        self.location = location
        self.download = download
        self.always_save = always_save
        self.read_fmt = read_fmt
        self.read_header = read_header
        self.save_fmt = save_fmt
        self.save_header = save_header
        self.errors = errors
        self._name = None
        self._dataset = pd.DataFrame()
        self._download_commodity_weights = False

    def _get_retrieval_function(self, name: str) -> Callable:
        """Parse function string and return function."""
        function_string = self.available_datasets[name]["function"]
        module, function = function_string.split(".")
        path_prefix = "econuy.retrieval."
        module = importlib.import_module(path_prefix + module)
        function = getattr(module, function)
        return function

    @property
    def dataset(self) -> pd.DataFrame:
        """Get dataset."""
        return self._dataset

    @property
    def dataset_flat(self) -> pd.DataFrame:
        """Get dataset with no metadata in its column names."""
        nometa = self._dataset.copy(deep=True)
        nometa.columns = nometa.columns.get_level_values(0)
        return nometa

    @property
    def name(self) -> str:
        """Get dataset name."""
        return self._name

    @property
    def description(self) -> Optional[str]:
        """Get dataset description."""
        try:
            return self.available_datasets[self.name]["description"]
        except KeyError:
            return None

    @property
    def available_datasets(self) -> Dict:
        """Get a dictionary with all available datasets.

        The dictionary is separated by original and custom keys, which
        denote whether the dataset has been modified in some way or is as
        provided by the source.
        """
        return {
            name: metadata
            for name, metadata in DATASETS.items()
            if not metadata["disabled"]
        }

    def __repr__(self):
        return f"Pipeline(location={self.location})\n" f"Current dataset: {self.name}"
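
    # Illustrative usage sketch (not part of the library): build a Pipeline
    # and inspect the catalog. "data" is a hypothetical directory, and
    # "nxr_monthly" is one of the dataset names referenced in get() below.
    #
    #     p = Pipeline(location="data", download=True)
    #     print(len(p.available_datasets))  # enabled datasets only
    #     p.get("nxr_monthly")
    #     print(p.name, p.description)
    #     flat = p.dataset_flat             # columns without metadata levels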
    def copy(self, deep: bool = False) -> Pipeline:
        """Copy or deepcopy a Pipeline object.

        Parameters
        ----------
        deep : bool, default False
            If True, deepcopy.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        """
        if deep:
            return copy.deepcopy(self)
        else:
            return copy.copy(self)
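
    # Sketch: deep-copying lets two branches of transformations share a
    # single download (hedged example; "cpi_measures" is a dataset name
    # used in get() below).
    #
    #     base = Pipeline().get("cpi_measures")
    #     branch = base.copy(deep=True)
    #     base.chg_diff(operation="chg", period="inter")    # y/y % change
    #     branch.decompose(component="seas", method="loess")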
    def get(self, name: str) -> Pipeline:
        """
        Main download method.

        Parameters
        ----------
        name : str
            Dataset to download, see available options in
            :attr:`~econuy.core.Pipeline.available_datasets`.

        Raises
        ------
        ValueError
            If an invalid string is given to the ``name`` argument.

        """
        if name not in self.available_datasets.keys():
            raise ValueError("Invalid dataset selected.")

        if self.location is None:
            prev_data = pd.DataFrame()
        else:
            prev_data = operations._io(
                operation="read",
                data_loc=self.location,
                name=name,
                file_fmt=self.read_fmt,
                multiindex=self.read_header,
            )
        if not self.download and not prev_data.empty:
            self._dataset = prev_data
            print(f"{name}: previous data found.")
        else:
            if not self.download:
                print(f"{name}: previous data not found, downloading.")
            if name in [
                "trade_balance",
                "terms_of_trade",
                "rxr_custom",
                "commodity_index",
                "cpi_measures",
                "_monthly_interpolated_gdp",
                "net_public_debt",
                "fiscal_balance_summary",
                "core_industrial_production",
                "regional_embi_yields",
                "regional_rxr",
                "real_wages",
                "labor_rates_persons",
                "nxr_monthly",
            ]:
                # Some datasets require retrieving other datasets. Passing the
                # class instance allows running these retrieval operations
                # with the same parameters (for example, save file formats).
                new_data = self._get_retrieval_function(name)(pipeline=self)
            elif name in ["international_reserves_changes"]:
                # Datasets that require many requests (mostly daily data)
                # benefit from having previous data, so they can start
                # requests from the last available date.
                new_data = self._get_retrieval_function(name)(
                    pipeline=self, previous_data=prev_data
                )
            else:
                new_data = self._get_retrieval_function(name)()
            data = operations._revise(
                new_data=new_data, prev_data=prev_data, revise_rows="nodup"
            )
            self._dataset = data
        self._name = name
        if (
            self.always_save
            and (self.download or prev_data.empty)
            and self.location is not None
        ):
            self.save()
        return self
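
    # Sketch: with download=False, get() serves previously saved data from
    # ``location`` and only downloads when nothing is found on disk (hedged
    # example; "data" is a hypothetical directory).
    #
    #     offline = Pipeline(location="data", download=False)
    #     offline.get("nxr_monthly")  # prints "nxr_monthly: previous data
    #                                 # found." if a prior run saved it there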
    def resample(
        self,
        rule: Union[pd.DateOffset, pd.Timedelta, str],
        operation: str = "sum",
        interpolation: str = "linear",
        warn: bool = False,
    ) -> Pipeline:
        """
        Wrapper for the `resample method
        <https://pandas.pydata.org/pandas-docs/stable/reference/api/
        pandas.DataFrame.resample.html>`_ in Pandas that integrates with
        econuy dataframes' metadata.

        Trim partial bins, i.e. do not calculate the resampled period if it
        is not complete, unless the input dataframe has no defined
        frequency, in which case no trimming is done.

        Parameters
        ----------
        rule : pd.DateOffset, pd.Timedelta or str
            Target frequency to resample to. See
            `Pandas offset aliases <https://pandas.pydata.org/pandas-docs/
            stable/user_guide/timeseries.html#offset-aliases>`_
        operation : {'sum', 'mean', 'last', 'upsample'}
            Operation to use for resampling.
        interpolation : str, default 'linear'
            Method to use when missing data are produced as a result of
            resampling, for example when upsampling to a higher frequency.
            See `Pandas interpolation methods <https://pandas.pydata.org/
            pandas-docs/stable/reference/api/pandas.Series.interpolate.html>`_
        warn : bool, default False
            If False, don't raise warnings with incomplete time-range bins.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        Raises
        ------
        ValueError
            If ``operation`` is not one of available options.
        ValueError
            If the input dataframe's columns do not have the appropriate
            levels.

        Warns
        -----
        UserWarning
            If input frequencies cannot be assigned a numeric value,
            preventing incomplete bin trimming.

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        output = transform.resample(
            self.dataset,
            rule=rule,
            operation=operation,
            interpolation=interpolation,
            warn=warn,
        )
        self._dataset = output
        return self
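
    # Sketch: aggregate monthly data to quarterly means (hedged example;
    # "Q-DEC" is the pandas offset alias for calendar quarters).
    #
    #     p = Pipeline().get("nxr_monthly")
    #     p.resample(rule="Q-DEC", operation="mean")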
    def chg_diff(self, operation: str = "chg", period: str = "last") -> Pipeline:
        """Wrapper for the `pct_change
        <https://pandas.pydata.org/pandas-docs/stable/reference/api/
        pandas.DataFrame.pct_change.html>`_ and `diff
        <https://pandas.pydata.org/pandas-docs/stable/reference/api/
        pandas.DataFrame.diff.html>`_ Pandas methods.

        Calculate percentage change or difference for dataframes. The
        ``period`` argument takes into account the frequency of the
        dataframe, i.e., ``inter`` (for interannual) will calculate pct
        change/differences with ``periods=4`` for quarterly frequency, but
        ``periods=12`` for monthly frequency.

        Parameters
        ----------
        operation : {'chg', 'diff'}
            ``chg`` for percent change or ``diff`` for differences.
        period : {'last', 'inter', 'annual'}
            Period with which to calculate change or difference. ``last``
            for previous period (last month for monthly data), ``inter``
            for same period last year, ``annual`` for same period last year
            but taking annual sums.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        Raises
        ------
        ValueError
            If the dataframe is not of frequency ``M`` (month), ``Q`` or
            ``Q-DEC`` (quarter), or ``A`` or ``A-DEC`` (year).
        ValueError
            If the ``operation`` parameter does not have a valid argument.
        ValueError
            If the ``period`` parameter does not have a valid argument.
        ValueError
            If the input dataframe's columns do not have the appropriate
            levels.

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        output = transform.chg_diff(self.dataset, operation=operation, period=period)
        self._dataset = output
        return self
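
    # Sketch: year-over-year percent change on monthly data, where
    # period="inter" resolves to periods=12 (hedged example).
    #
    #     p = Pipeline().get("cpi_measures")
    #     p.chg_diff(operation="chg", period="inter")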
    def decompose(
        self,
        component: str = "seas",
        method: str = "x13",
        force_x13: bool = False,
        fallback: str = "loess",
        trading: bool = True,
        outlier: bool = True,
        x13_binary: Union[str, PathLike] = "search",
        search_parents: int = 0,
        ignore_warnings: bool = True,
        **kwargs,
    ) -> Pipeline:
        """Apply seasonal decomposition.

        Decompose the series in a Pandas dataframe using either X13 ARIMA,
        Loess or moving averages. X13 can be forced in case of failure by
        altering the underlying function's parameters. If not, it will fall
        back to one of the other methods. If the X13 method is chosen, the
        X13 binary has to be provided. Please refer to the README for
        instructions on where to get this binary.

        Parameters
        ----------
        component : {'seas', 'trend'}
            Return seasonally adjusted or trend component.
        method : {'x13', 'loess', 'ma'}
            Decomposition method. ``X13`` refers to X13 ARIMA from the US
            Census, ``loess`` refers to Loess decomposition and ``ma``
            refers to moving average decomposition, in all cases as
            implemented by `statsmodels
            <https://www.statsmodels.org/dev/tsa.html>`_.
        force_x13 : bool, default False
            Whether to try different ``outlier`` and ``trading`` parameters
            in statsmodels' `x13 arima analysis
            <https://www.statsmodels.org/dev/generated/
            statsmodels.tsa.x13.x13_arima_analysis.html>`_ for each series
            that fails. If ``False``, jump to the ``fallback`` method for
            the whole dataframe at the first error.
        fallback : {'loess', 'ma'}
            Decomposition method to fall back to if ``method="x13"`` fails
            and ``force_x13=False``.
        trading : bool, default True
            Whether to automatically detect trading days in X13 ARIMA.
        outlier : bool, default True
            Whether to automatically detect outliers in X13 ARIMA.
        x13_binary : str, os.PathLike or None, default 'search'
            Location of the X13 binary. If ``search`` is used, will attempt
            to find the binary in the project structure. If ``None``,
            statsmodels will handle it.
        search_parents : int, default 0
            If ``x13_binary=search``, this parameter controls how many
            parent directories to go up before recursively searching for
            the binary.
        ignore_warnings : bool, default True
            Whether to suppress X13Warnings from statsmodels.
        kwargs
            Keyword arguments passed to statsmodels' ``x13_arima_analysis``,
            ``STL`` and ``seasonal_decompose``.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        Raises
        ------
        ValueError
            If the ``method`` parameter does not have a valid argument.
        ValueError
            If the ``component`` parameter does not have a valid argument.
        ValueError
            If the ``fallback`` parameter does not have a valid argument.
        ValueError
            If the ``errors`` parameter does not have a valid argument.
        FileNotFoundError
            If the path provided for the X13 binary does not point to a
            file and ``method='x13'``.

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        valid_component = ["seas", "trend"]
        if component not in valid_component:
            raise ValueError(
                f"Only {', '.join(valid_component)} are allowed. "
                f"See underlying 'decompose'."
            )
        output = transform.decompose(
            self.dataset,
            component=component,
            method=method,
            force_x13=force_x13,
            fallback=fallback,
            trading=trading,
            outlier=outlier,
            x13_binary=x13_binary,
            search_parents=search_parents,
            ignore_warnings=ignore_warnings,
            errors=self.errors,
            **kwargs,
        )
        self._dataset = output
        return self
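
    # Sketch: seasonally adjust with Loess, which avoids the external X13
    # binary requirement entirely (hedged example).
    #
    #     p = Pipeline().get("cpi_measures")
    #     p.decompose(component="seas", method="loess")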
    def convert(
        self,
        flavor: str,
        start_date: Union[str, datetime, None] = None,
        end_date: Union[str, datetime, None] = None,
    ) -> Pipeline:
        """Convert dataframe from UYU to USD, from UYU to real UYU or from
        UYU/USD to % GDP.

        ``flavor=usd``: Convert a dataframe's columns from Uruguayan pesos
        to US dollars. Call the :mod:`~econuy.core.Pipeline.get` method to
        obtain nominal exchange rates, and take into account whether the
        input dataframe's ``Type``, as defined by its multiindex, is flow
        or stock, in order to choose end of period or monthly average NXR.
        Also take into account the input dataframe's frequency and whether
        columns represent rolling averages or sums.

        ``flavor=real``: Convert a dataframe's columns to real prices. Call
        the :mod:`~econuy.core.Pipeline.get` method to obtain the consumer
        price index. Take into account the input dataframe's frequency and
        whether columns represent rolling averages or sums. Allow choosing
        a single period, a range of dates or no period as a base (i.e.,
        period for which the average/sum of input dataframe and output
        dataframe is the same).

        ``flavor=gdp``: Convert a dataframe's columns to percentage of GDP.
        Call the :mod:`~econuy.core.Pipeline.get` method to obtain UYU and
        USD quarterly GDP series. Take into account the input dataframe's
        currency for choosing UYU or USD GDP. If frequency of input
        dataframe is higher than quarterly, GDP will be upsampled and
        linear interpolation will be performed to complete missing data. If
        input dataframe's "Acum." level is not 12 for monthly frequency or
        4 for quarterly frequency, calculate rolling input dataframe.

        In all cases, if input dataframe's frequency is higher than monthly
        (daily, business, etc.), resample to monthly frequency.

        Error handling follows the Pipeline's ``errors`` attribute. For
        example, when a column is not expressed in Uruguayan pesos,
        ``raise`` will raise a ValueError, ``coerce`` will force the entire
        column into ``np.nan`` and ``ignore`` will leave the input column
        as is.

        Parameters
        ----------
        flavor : {'usd', 'real', 'gdp'}
            Target conversion.
        start_date : str, datetime.date or None, default None
            Only used if ``flavor=real``. If set to a date-like string or a
            date, and ``end_date`` is None, the base period will be
            ``start_date``.
        end_date : str, datetime.date or None, default None
            Only used if ``flavor=real``. If ``start_date`` is set,
            calculate so that the data is in constant prices of
            ``start_date-end_date``.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        Raises
        ------
        ValueError
            If the ``flavor`` parameter does not have a valid argument.
        ValueError
            If the input dataframe's columns do not have the appropriate
            levels.

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        if flavor not in ["usd", "real", "gdp", "pcgdp"]:
            raise ValueError("'flavor' can be one of 'usd', 'real', or 'gdp'.")

        if flavor == "usd":
            output = transform.convert_usd(
                self.dataset, errors=self.errors, pipeline=self
            )
        elif flavor == "real":
            output = transform.convert_real(
                self.dataset,
                start_date=start_date,
                end_date=end_date,
                errors=self.errors,
                pipeline=self,
            )
        else:
            output = transform.convert_gdp(
                self.dataset, errors=self.errors, pipeline=self
            )
        self._dataset = output
        return self
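
    # Sketch: deflate a peso-denominated dataset to constant prices with a
    # 2010 average base period (hedged example; dates are illustrative and
    # "fiscal_balance_summary" is assumed to hold UYU flows).
    #
    #     p = Pipeline().get("fiscal_balance_summary")
    #     p.convert(flavor="real", start_date="2010-01-01",
    #               end_date="2010-12-31")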
    def rebase(
        self,
        start_date: Union[str, datetime],
        end_date: Union[str, datetime, None] = None,
        base: Union[float, int] = 100.0,
    ) -> Pipeline:
        """Rebase all dataframe columns to a date or range of dates.

        Parameters
        ----------
        start_date : string or datetime.datetime
            Date to which series will be rebased.
        end_date : string or datetime.datetime, default None
            If specified, series will be rebased to the average between
            ``start_date`` and ``end_date``.
        base : float, default 100
            Float for which ``start_date`` == ``base`` or average between
            ``start_date`` and ``end_date`` == ``base``.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        output = transform.rebase(
            self.dataset, start_date=start_date, end_date=end_date, base=base
        )
        self._dataset = output
        return self
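
    # Sketch: set the 2015 average of every column to 100 (hedged example;
    # dates are illustrative).
    #
    #     p = Pipeline().get("cpi_measures")
    #     p.rebase(start_date="2015-01-01", end_date="2015-12-31", base=100)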
    def rolling(self, window: Optional[int] = None, operation: str = "sum") -> Pipeline:
        """
        Wrapper for the `rolling method
        <https://pandas.pydata.org/pandas-docs/stable/reference/api/
        pandas.DataFrame.rolling.html>`_ in Pandas that integrates with
        econuy dataframes' metadata.

        If ``window`` is ``None``, try to infer the frequency and set
        ``window`` according to the following logic:
        ``{'A': 1, 'Q-DEC': 4, 'M': 12}``, that is, each period will be
        calculated as the sum or mean of the last year.

        Parameters
        ----------
        window : int, default None
            How many periods the window should cover.
        operation : {'sum', 'mean'}
            Operation used to calculate rolling windows.

        Returns
        -------
        :class:`~econuy.core.Pipeline`

        Raises
        ------
        ValueError
            If ``operation`` is not one of available options.
        ValueError
            If the input dataframe's columns do not have the appropriate
            levels.

        Warns
        -----
        UserWarning
            If the input dataframe is a stock time series, for which
            rolling operations are not recommended.

        """
        if self.dataset.empty:
            raise ValueError(
                "Can't use transformation methods without retrieving a dataset first."
            )
        output = transform.rolling(self.dataset, window=window, operation=operation)
        self._dataset = output
        return self
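
    # Sketch: 12-month rolling sum of a monthly flow series; window=None
    # would infer the same 12-period window from the frequency (hedged
    # example).
    #
    #     p = Pipeline().get("trade_balance")
    #     p.rolling(window=12, operation="sum")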
    def save(self):
        """Write held dataset.

        Raises
        ------
        ValueError
            If `dataset` is an empty DataFrame or `self.location` is None.

        """
        if self.dataset.empty:
            raise ValueError("Can't save without retrieving a dataset first.")

        if self.location is None:
            raise ValueError("No save location defined.")
        else:
            operations._io(
                operation="save",
                data_loc=self.location,
                name=self.name,
                file_fmt=self.save_fmt,
                multiindex=self.save_header,
                data=self.dataset,
            )
        return
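

# Sketch: with always_save=False, transformations can be applied before a
# single explicit save() writes the final result (hedged example; "data"
# is a hypothetical directory).
#
#     p = Pipeline(location="data", always_save=False)
#     p.get("cpi_measures").chg_diff(operation="chg", period="inter")
#     p.save()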