Source code for econuy.session

from __future__ import annotations
import logging
import copy
import traceback
from datetime import datetime
from os import PathLike
from pathlib import Path
from typing import Optional, Union, Sequence, Dict, List

import pandas as pd
from sqlalchemy.engine.base import Connection, Engine

from econuy import transform
from econuy.core import Pipeline
from econuy.utils import operations
from econuy.utils import logging as log_utils
from econuy.utils.exceptions import RetryLimitError
from econuy.utils.operations import DATASETS


class Session(object):
    """
    A download and transformation session that creates a Pipeline object and
    simplifies working with multiple datasets.

    Alternatively, can be created directly from a Pipeline by using the
    :mod:`~econuy.session.Session.from_pipeline` class method.

    Attributes
    ----------
    location : str, os.PathLike, SQLAlchemy Connection or Engine, or None, \
               default None
        Either Path or path-like string pointing to a directory where to
        find a CSV for updating and saving, SQLAlchemy connection or engine
        object, or ``None``, don't save or update.
    download : bool, default True
        If False the ``get`` method will only try to retrieve data on disk.
    always_save : bool, default True
        If True, save every retrieved dataset to the specified ``location``.
    read_fmt : {'csv', 'xls', 'xlsx'}
        File format of previously downloaded data. Ignored if ``location``
        points to a SQL object.
    save_fmt : {'csv', 'xls', 'xlsx'}
        File format for saving. Ignored if ``location`` points to a SQL
        object.
    read_header : {'included', 'separate', None}
        Location of dataset metadata headers. 'included' means they are in
        the first 9 rows of the dataset. 'separate' means they are in a
        separate Excel sheet (if ``read_fmt='csv'``, headers are discarded).
        None means there are no metadata headers.
    save_header : {'included', 'separate', None}
        Location of dataset metadata headers. 'included' means they will be
        set as the first 9 rows of the dataset. 'separate' means they will
        be saved in a separate Excel sheet (if ``save_fmt='csv'``, headers
        are discarded). None discards any headers.
    errors : {'raise', 'coerce', 'ignore'}
        How to handle errors that arise from transformations. ``raise`` will
        raise a ValueError, ``coerce`` will force the data into ``np.nan``
        and ``ignore`` will leave the input data as is.
    log : {str, 0, 1, 2}
        Controls how logging works. ``0``: don't log; ``1``: log to console;
        ``2``: log to console and file with default file; ``str``: log to
        console and file with filename=str.
    logger : logging.Logger, default None
        Logger object. For most cases this attribute should be ``None``,
        allowing :attr:`log` to control how logging works.
    max_retries : int, default 3
        Number of retries for :mod:`get` in case any of the selected
        datasets cannot be retrieved.
    """

    def __init__(
        self,
        location: Union[str, PathLike, Connection, Engine, None] = None,
        download: bool = True,
        always_save: bool = True,
        read_fmt: str = "csv",
        read_header: Optional[str] = "included",
        save_fmt: str = "csv",
        save_header: Optional[str] = "included",
        errors: str = "raise",
        log: Union[int, str] = 1,
        logger: Optional[logging.Logger] = None,
        max_retries: int = 3,
    ):
        self.location = location
        self.download = download
        self.always_save = always_save
        self.read_fmt = read_fmt
        self.read_header = read_header
        self.save_fmt = save_fmt
        self.save_header = save_header
        self.errors = errors
        self.log = log
        self.logger = logger
        self.max_retries = max_retries
        self._datasets = {}
        self._retries = 1

        if logger is not None:
            self.log = "custom"
        else:
            if isinstance(log, int) and (log < 0 or log > 2):
                raise ValueError(
                    "'log' takes either 0 (don't log info),"
                    " 1 (log to console), 2 (log to console and"
                    " default file), or str (log to console and "
                    "file with filename=str)."
                )
            elif log == 2:
                logfile = Path(self.location) / "info.log"
                log_obj = log_utils.setup(file=logfile)
            elif isinstance(log, str) and log != "custom":
                logfile = (Path(self.location) / log).with_suffix(".log")
                log_obj = log_utils.setup(file=logfile)
            elif log == 1:
                log_obj = log_utils.setup(file=None)
            else:
                log_obj = log_utils.setup(null=True)
            self.logger = log_obj
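
    # Usage sketch (names are illustrative): create a Session that
    # reads/writes CSVs in a local "data" directory and logs to console plus
    # the default "info.log" file.
    #
    #     s = Session(location="data", log=2)
    #     s.logger.info("Session ready")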

    @classmethod
    def from_pipeline(cls, pipeline: Pipeline) -> Session:
        # Alternative constructor.
        s = Session(
            location=pipeline.location,
            download=pipeline.download,
            always_save=pipeline.always_save,
            read_fmt=pipeline.read_fmt,
            read_header=pipeline.read_header,
            save_fmt=pipeline.save_fmt,
            save_header=pipeline.save_header,
            errors=pipeline.errors,
        )
        if not pipeline.dataset.empty:
            s._datasets = {pipeline.name: pipeline.dataset}
        return s
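
    # Usage sketch: carry a Pipeline's settings (and any retrieved dataset)
    # into a Session. "cpi" is an illustrative dataset name; check
    # available_datasets for valid keys.
    #
    #     p = Pipeline(location="data")
    #     p.get("cpi")
    #     s = Session.from_pipeline(p)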

    @property
    def pipeline(self) -> Pipeline:
        # Define a property so that changes to Session attributes are passed
        # down to the Pipeline and taken into account in get().
        p = Pipeline(
            location=self.location,
            download=self.download,
            always_save=self.always_save,
            read_fmt=self.read_fmt,
            read_header=self.read_header,
            save_fmt=self.save_fmt,
            save_header=self.save_header,
            errors=self.errors,
        )
        return p

    @property
    def datasets(self) -> Dict[str, pd.DataFrame]:
        """Holds retrieved datasets.

        Returns
        -------
        Datasets : Dict[str, pd.DataFrame]
        """
        return self._datasets

    @property
    def datasets_flat(self) -> Dict[str, pd.DataFrame]:
        """Holds retrieved datasets, with metadata column levels dropped.

        Returns
        -------
        Datasets : Dict[str, pd.DataFrame]
        """
        nometa = copy.deepcopy(self._datasets)
        for v in nometa.values():
            v.columns = v.columns.get_level_values(0)
        return nometa

    def __repr__(self):
        return (
            f"Session(location={self.location})\n"
            f"Current dataset(s): {list(self.datasets.keys())}"
        )
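
    # Example: `datasets` keeps the metadata MultiIndex on columns, while
    # `datasets_flat` collapses it to plain indicator names, which is handy
    # for plotting or export ("cpi" is an illustrative key).
    #
    #     s.datasets["cpi"].columns.nlevels       # metadata levels (e.g. 9)
    #     s.datasets_flat["cpi"].columns.nlevels  # 1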

    def copy(self, deep: bool = False) -> Session:
        """Copy or deepcopy a Session object.

        Parameters
        ----------
        deep : bool, default False
            If True, deepcopy.

        Returns
        -------
        :class:`~econuy.session.Session`
        """
        if deep:
            return copy.deepcopy(self)
        else:
            return copy.copy(self)

    @property
    def available_datasets(self) -> Dict[str, Dict]:
        """Return available ``dataset`` arguments for use in
        :mod:`~econuy.session.Session.get`.

        Returns
        -------
        Datasets : Dict[str, Dict]
        """
        return {
            name: metadata
            for name, metadata in DATASETS.items()
            if not metadata["disabled"] and not metadata["auxiliary"]
        }

    def _select_datasets(
        self, select: Union[str, int, Sequence[str], Sequence[int]]
    ) -> List[str]:
        """Generate a list of dataset names based on the selection.

        Parameters
        ----------
        select : str, int, Sequence[str] or Sequence[int], default "all"
            Datasets in :attr:`datasets` to apply transformations on. Can be
            defined with their names or index positions.

        Returns
        -------
        List[str]
            List of dataset names.

        Raises
        ------
        ValueError
            If names and indexes are combined.
        """
        keys = list(self.datasets.keys())
        if isinstance(select, Sequence) and not isinstance(select, str):
            if not all(isinstance(select[i], type(select[0])) for i in range(len(select))):
                raise ValueError("`select` must be all `int` or all `str`")
            if isinstance(select[0], int):
                proc_select = [keys[i] for i in select]
            else:
                proc_select = [i for i in keys if i in select]
        elif isinstance(select, int):
            proc_select = [keys[select]]
        elif select == "all":
            proc_select = keys
        else:
            proc_select = [select]
        return proc_select

    def _apply_transformation(
        self,
        transformation: str,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
        **kwargs,
    ) -> Dict[str, pd.DataFrame]:
        """Helper method to apply transformations on :attr:`datasets`.

        Parameters
        ----------
        transformation : {'resample', 'chg_diff', 'convert', 'decompose', \
                          'rolling', 'rebase'}
            String representing transformation methods in
            :class:`~econuy.core.Pipeline`.
        select : str, int, Sequence[str] or Sequence[int], default "all"
            Datasets in :attr:`datasets` to apply transformations on.

        Returns
        -------
        Transformed datasets : Dict[str, pd.DataFrame]
        """
        p = self.pipeline.copy()
        methods = {
            "resample": p.resample,
            "chg_diff": p.chg_diff,
            "convert": p.convert,
            "decompose": p.decompose,
            "rolling": p.rolling,
            "rebase": p.rebase,
        }
        proc_select = self._select_datasets(select=select)

        new_kwargs = {}
        for k, v in kwargs.items():
            if not isinstance(v, Sequence) or isinstance(v, str):
                new_kwargs.update({k: [v] * len(proc_select)})
            elif len(v) != len(proc_select):
                raise ValueError(f"Wrong number of arguments for '{k}'")
            else:
                new_kwargs.update({k: v})

        output = self.datasets.copy()
        for i, name in enumerate(proc_select):
            current_kwargs = {k: v[i] for k, v in new_kwargs.items()}
            p._dataset = self.datasets[name]
            methods[transformation](**current_kwargs)
            transformed = p.dataset
            output.update({name: transformed})
        return output
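
    # Selection semantics sketch for the internal helper: `select` accepts
    # "all", a single name or index, or a homogeneous sequence of names or
    # indexes (mixing the two raises ValueError). Keys are illustrative.
    #
    #     s._select_datasets("all")           # every retrieved dataset
    #     s._select_datasets(0)               # first retrieved dataset
    #     s._select_datasets(["cpi", "nxr"])  # by name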

    def get(self, names: Union[str, Sequence[str]]) -> Session:
        """
        Main download method.

        Parameters
        ----------
        names : Union[str, Sequence[str]]
            Dataset(s) to download, see available options in
            :mod:`~econuy.session.Session.available_datasets`. Either a
            string representing a dataset name or a sequence of strings in
            order to download several datasets.

        Raises
        ------
        ValueError
            If an invalid string is found in the ``names`` argument.
        """
        if isinstance(names, str):
            names = [names]
        if any(x not in self.pipeline.available_datasets.keys() for x in names):
            raise ValueError("Invalid dataset selected.")

        # Deepcopy the Pipeline so that its dataset attribute is not
        # overwritten each time it's accessed within this method.
        p = self.pipeline.copy()
        failed = []
        for name in names:
            try:
                p.get(name=name)
                self._datasets.update({name: p.dataset})
            except BaseException:
                traceback.print_exc()
                failed.append(name)
                continue
        if len(failed) > 0:
            if self._retries < self.max_retries:
                self._retries += 1
                self.logger.info(
                    f"Failed to retrieve {', '.join(failed)}. "
                    f"Retrying (run {self._retries})."
                )
                self.get(names=failed)
            else:
                self.logger.info(f"Could not retrieve {', '.join(failed)}")
                self._retries = 1
                raise RetryLimitError(
                    f"Maximum retries ({self.max_retries}) reached."
                )
            self._retries = 1
            return self
        self._retries = 1
        return self
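
    # Usage sketch: retrieve several datasets in one call; failed downloads
    # are retried up to `max_retries` times before RetryLimitError is
    # raised. Names are illustrative.
    #
    #     s = Session(location="data")
    #     s.get(names=["cpi", "nxr_monthly"])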

    def get_bulk(self, names: str) -> Session:
        """
        Get datasets in bulk.

        Parameters
        ----------
        names : {'all', 'original', 'custom', 'activity', 'prices', \
                 'fiscal', 'labor', 'external', 'financial', 'income', \
                 'global_', 'regional'}
            Type of data to download. ``all`` gets all available datasets,
            ``original`` gets all original datasets and ``custom`` gets all
            custom datasets. The remaining options get all datasets for that
            area.

        Raises
        ------
        ValueError
            If an invalid string is given to the ``names`` argument.
        """
        valid_datasets = [
            "all",
            "original",
            "custom",
            "activity",
            "prices",
            "fiscal",
            "labor",
            "external",
            "financial",
            "income",
            "global_",
            "regional",
        ]
        if names not in valid_datasets:
            raise ValueError(
                f"'names' can only be one of {', '.join(valid_datasets)}."
            )

        original_datasets = {
            name: metadata
            for name, metadata in self.available_datasets.items()
            if not metadata["custom"]
        }
        custom_datasets = {
            name: metadata
            for name, metadata in self.available_datasets.items()
            if metadata["custom"]
        }
        if names == "original":
            self.get(names=list(original_datasets))
        elif names == "custom":
            self.get(names=list(custom_datasets))
        elif names == "all":
            self.get(names=list(self.available_datasets))
        else:
            datasets = [
                name
                for name, metadata in self.available_datasets.items()
                if names == metadata["area"]
            ]
            self.get(names=datasets)
        return self
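
    # Usage sketch: download every dataset belonging to one area, assuming
    # "prices" is used as an area tag in DATASETS metadata.
    #
    #     s.get_bulk(names="prices")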

    def resample(
        self,
        rule: Union[pd.DateOffset, pd.Timedelta, str, List],
        operation: Union[str, List] = "sum",
        interpolation: Union[str, List] = "linear",
        warn: Union[bool, List] = False,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
    ) -> Session:
        """
        Resample to target frequencies.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.resample`
        """
        output = self._apply_transformation(
            select=select,
            transformation="resample",
            rule=rule,
            operation=operation,
            interpolation=interpolation,
            warn=warn,
        )
        self._datasets = output
        return self
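
    # Vectorized-argument sketch: scalar kwargs are broadcast to every
    # selected dataset, while list kwargs must match the selection length,
    # so each dataset can get its own rule.
    #
    #     s.resample(rule=["Q-DEC", "A-DEC"], operation="mean", select=[0, 1])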

    def chg_diff(
        self,
        operation: Union[str, List] = "chg",
        period: Union[str, List] = "last",
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
    ) -> Session:
        """
        Calculate pct change or difference.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.chg_diff`.
        """
        output = self._apply_transformation(
            select=select, transformation="chg_diff", operation=operation, period=period
        )
        self._datasets = output
        return self
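
    # Usage sketch: period-on-period percent change with the defaults, or a
    # year-over-year comparison (assuming "inter" is the interannual period
    # option of the underlying chg_diff).
    #
    #     s.chg_diff(operation="chg", period="inter")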

    def decompose(
        self,
        component: Union[str, List] = "seas",
        method: Union[str, List] = "x13",
        force_x13: Union[bool, List] = False,
        fallback: Union[str, List] = "loess",
        trading: Union[bool, List] = True,
        outlier: Union[bool, List] = True,
        x13_binary: Union[str, PathLike, List] = "search",
        search_parents: Union[int, List] = 0,
        ignore_warnings: Union[bool, List] = True,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
        **kwargs,
    ) -> Session:
        """
        Apply seasonal decomposition.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.decompose`.
        """
        valid_component = ["seas", "trend"]
        if component not in valid_component:
            raise ValueError(
                f"Only {', '.join(valid_component)} are allowed. "
                "See underlying 'decompose'."
            )
        output = self._apply_transformation(
            select=select,
            transformation="decompose",
            component=component,
            method=method,
            force_x13=force_x13,
            fallback=fallback,
            trading=trading,
            outlier=outlier,
            x13_binary=x13_binary,
            search_parents=search_parents,
            ignore_warnings=ignore_warnings,
            **kwargs,
        )
        self._datasets = output
        return self
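
    # Usage sketch: extract the seasonally adjusted component, letting the
    # method fall back to Loess if the X13 binary cannot be found.
    #
    #     s.decompose(component="seas", method="x13", fallback="loess")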

    def convert(
        self,
        flavor: Union[str, List],
        start_date: Union[str, datetime, None, List] = None,
        end_date: Union[str, datetime, None, List] = None,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
    ) -> Session:
        """Convert to other units.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.convert`.
        """
        if flavor not in ["usd", "real", "gdp", "pcgdp"]:
            raise ValueError(
                "'flavor' can be one of 'usd', 'real', 'gdp' or 'pcgdp'."
            )

        if flavor == "real":
            output = self._apply_transformation(
                select=select,
                transformation="convert",
                flavor=flavor,
                start_date=start_date,
                end_date=end_date,
            )
        else:
            output = self._apply_transformation(
                select=select, transformation="convert", flavor=flavor
            )
        self._datasets = output
        return self
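
    # Usage sketch: 'real' deflates over an optional base range and is the
    # only flavor that uses start/end dates; the other flavors take none.
    #
    #     s.convert(flavor="real", start_date="2010-01-01",
    #               end_date="2010-12-31")
    #     s.convert(flavor="usd")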

    def rebase(
        self,
        start_date: Union[str, datetime, List],
        end_date: Union[str, datetime, None, List] = None,
        base: Union[float, List] = 100.0,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
    ) -> Session:
        """
        Scale to a period or range of periods.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.rebase`.
        """
        output = self._apply_transformation(
            select=select,
            transformation="rebase",
            start_date=start_date,
            end_date=end_date,
            base=base,
        )
        self._datasets = output
        return self
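
    # Usage sketch: index every selected dataset so that its 2010 average
    # equals 100.
    #
    #     s.rebase(start_date="2010-01-01", end_date="2010-12-31", base=100)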

    def rolling(
        self,
        window: Union[int, List, None] = None,
        operation: Union[str, List] = "sum",
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
    ) -> Session:
        """
        Calculate rolling averages or sums.

        See Also
        --------
        :mod:`~econuy.core.Pipeline.rolling`.
        """
        output = self._apply_transformation(
            select=select, transformation="rolling", window=window, operation=operation
        )
        self._datasets = output
        return self
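
    # Usage sketch: a 12-period rolling sum, commonly used to annualize
    # monthly flow series.
    #
    #     s.rolling(window=12, operation="sum")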

    def concat(
        self,
        select: Union[str, int, Sequence[str], Sequence[int]] = "all",
        concat_name: Optional[str] = None,
        force_suffix: bool = False,
    ) -> Session:
        """
        Concatenate datasets in :attr:`datasets` and add as a new dataset.

        Resample to lowest frequency of selected datasets.

        Parameters
        ----------
        select : str, int, Sequence[str] or Sequence[int], default "all"
            Datasets to concatenate.
        concat_name : Optional[str], default None
            Name used as a key for the output dataset. The default None sets
            the name to "concat_{dataset_1_name}_..._{dataset_n_name}".
        force_suffix : bool, default False
            Whether to include each dataset's full name as a prefix in all
            indicator columns.
        """
        proc_select = self._select_datasets(select=select)
        proc_select = [x for x in proc_select if "concat_" not in x]
        selected_datasets = [d for n, d in self.datasets.items() if n in proc_select]
        indicator_names = [
            col for df in selected_datasets for col in df.columns.get_level_values(0)
        ]
        if len(indicator_names) > len(set(indicator_names)) or force_suffix is True:
            for n, d in zip(proc_select, selected_datasets):
                full_name = self.available_datasets[n]["description"]
                d.columns = d.columns.set_levels(
                    f"{full_name}_" + d.columns.get_level_values(0), level=0
                )

        freqs = [pd.infer_freq(df.index) for df in selected_datasets]
        if all(freq == freqs[0] for freq in freqs):
            combined = pd.concat(selected_datasets, axis=1)
        else:
            for freq_opt in ["A-DEC", "A", "Q-DEC", "Q", "M", "2W-SUN", "W-SUN"]:
                if freq_opt in freqs:
                    output = []
                    for df in selected_datasets:
                        freq_df = pd.infer_freq(df.index)
                        if freq_df == freq_opt:
                            df_match = df.copy()
                        else:
                            type_df = df.columns.get_level_values("Tipo")[0]
                            unit_df = df.columns.get_level_values("Unidad")[0]
                            if type_df == "Stock":
                                df_match = transform.resample(
                                    df, rule=freq_opt, operation="last"
                                )
                            elif type_df == "Flujo" and not any(
                                x in unit_df for x in ["%", "=", "Cambio"]
                            ):
                                df_match = transform.resample(
                                    df, rule=freq_opt, operation="sum"
                                )
                            else:
                                df_match = transform.resample(
                                    df, rule=freq_opt, operation="mean"
                                )
                        output.append(df_match)
                    combined = pd.concat(output, axis=1)
                    break
                else:
                    continue

        if concat_name is None:
            concat_name = "concat_" + "_".join(proc_select)
        elif not concat_name.startswith("concat_"):
            concat_name = "concat_" + concat_name
        self._datasets.update({concat_name: combined})
        return self
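
    # Usage sketch: combine two retrieved datasets into a new "concat_..."
    # entry, resampled to the lowest common frequency. Names are
    # illustrative.
    #
    #     s.get(names=["cpi", "nxr_monthly"])
    #     s.concat(select=["cpi", "nxr_monthly"], concat_name="prices_fx")
    #     "concat_prices_fx" in s.datasets  # True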

    def save(self, select: Union[str, int, Sequence[str], Sequence[int]] = "all"):
        """Write datasets.

        Parameters
        ----------
        select : str, int, Sequence[str] or Sequence[int], default "all"
            Datasets to save.

        Raises
        ------
        ValueError
            If ``self.location`` is None.
        """
        if self.location is None:
            raise ValueError("No save location defined.")
        proc_select = self._select_datasets(select=select)
        for name, dataset in self.datasets.items():
            if name in proc_select:
                operations._io(
                    operation="save",
                    data_loc=self.location,
                    data=dataset,
                    name=name,
                    file_fmt=self.save_fmt,
                    multiindex=self.save_header,
                )
        return
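
    # Usage sketch: persist every retrieved dataset to the Session's
    # location in the configured format ("data" and "cpi" are illustrative).
    #
    #     s = Session(location="data", save_fmt="xlsx", save_header="separate")
    #     s.get(names="cpi")
    #     s.save(select="all")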