Source code for ananke.services.collection.importers

"""Module containing all logic to import collections."""
from __future__ import annotations

import logging

from typing import TYPE_CHECKING, List, Type, TypeVar

import pandas as pd

from ananke.configurations.events import Interval
from ananke.models.detector import Detector
from ananke.models.event import Hits, Records, Sources
from ananke.models.interfaces import DataFrameFacade
from ananke.schemas.event import RecordType, SourceType, Types
from tqdm import tqdm


if TYPE_CHECKING:
    from ananke.models.collection import Collection

import os

from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Union


DataFrameFacade_ = TypeVar("DataFrameFacade_", bound=DataFrameFacade)


class AbstractCollectionImporter(ABC):
    """Abstract parent class for collection importers."""

    def __init__(self, collection: Collection):
        """Constructor of the Abstract Collection Importer.

        Args:
            collection: Path to collection or collection
        """
        self.collection = collection
        self.logger = logging.getLogger(self.__class__.__name__)

    @abstractmethod
    def import_data(self, **kwargs) -> None:
        """Abstract stub for the import of a collection.

        Args:
            kwargs: Additional importer args
        """
        pass
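
# Illustrative only: a minimal sketch of a concrete importer, assuming nothing
# beyond the interface above plus the collection.storage calls used further
# below. The class name, the CSV source and its column layout are hypothetical
# and not part of this module.
#
# class CsvRecordsImporter(AbstractCollectionImporter):
#     """Hypothetical importer reading records from a CSV file."""
#
#     def import_data(self, import_path: Union[str, bytes, os.PathLike], **kwargs) -> None:
#         # Assumes the CSV columns already match the Records schema.
#         df = pd.read_csv(import_path)
#         with self.collection:
#             self.collection.storage.set_records(Records(df=df))
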
# class JuliaArrowCollectionImporter(AbstractCollectionImporter):
#     """Concrete implementation for Julia Arrow imports."""
#
#     def __read_file(self, filename: Union[str, bytes, os.PathLike]):
#
#         with pa.ipc.open_file(filename) as reader:
#             df = reader.read_pandas()
#             print(df.dtypes)
#
#     def import_data(
#         self,
#         import_path: Union[str, bytes, os.PathLike],
#         **kwargs: object
#     ) -> None:
#         """Import of a julia arrow collection.
#
#         Args:
#             import_path: File path to import
#             **kwargs: Additional importer args
#         """
#         directory = os.fsencode(import_path)
#         for file in os.listdir(directory):
#             filename = os.path.join(import_path, os.fsdecode(file))
#             if filename.endswith(".arrow"):
#                 self.__read_file(filename)

class LegacyCollectionKeys(str, Enum):
    """Enum containing keys for the collection file."""

    RECORDS = "records"
    HITS = "hits"
    SOURCES = "sources"
    DETECTOR = "detector"

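
# The enum values double as HDF group names in the legacy file layout used by
# LegacyCollection below: records and the detector live under "/records" and
# "/detector", while hits and sources are stored per record under
# "/hits/<record_id>" and "/sources/<record_id>". Sketch of the path scheme,
# inferred from LegacyCollection.__get_hdf_path:
#
# assert "/{key}/".format(key=LegacyCollectionKeys.HITS.value) + "42" == "/hits/42"
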
class LegacyCollection:
    """Class combining all data frames to a complete record collection."""

    def __init__(
        self,
        data_path: str,
        complevel: int = 3,
        complib: str = "lzo",
        read_only: bool = False,
    ):
        """Constructor for the collection.

        Args:
            data_path: Path to store/read collection from
            complevel: Compression level for data file
            complib: Compression algorithm to use
            read_only: Ensure that no data is written
        """
        logging.debug("Instantiated collection with path {}".format(data_path))
        file_extensions = (".hdf", ".h5")
        if not str(data_path).lower().endswith(file_extensions):
            raise ValueError(
                "Only {} and {} are supported file extensions".format(
                    file_extensions[0], file_extensions[1]
                )
            )
        self.data_path = data_path
        self.complevel = complevel
        self.complib = complib
        self.read_only = read_only
        self.store: pd.HDFStore = self.__get_store()

    def __get_store(self) -> pd.HDFStore:
        """Gets the stored hdf object.

        Returns:
            stored hdf object.
        """
        mode = "a"
        if self.read_only:
            mode = "r"
        return pd.HDFStore(self.data_path, mode=mode)

    def __del__(self):
        """Close store on delete."""
        self.store.close()

    def __get_hdf_path(
        self,
        collection_key: LegacyCollectionKeys,
        record_id: int | str | pd.Series,
    ) -> str:
        """Gets a proper hdf path for all subgrouped datasets.

        Args:
            collection_key: collection key to start with
            record_id: record id of the record to save

        Returns:
            string combining the two elements
        """
        if not (
            collection_key == LegacyCollectionKeys.HITS
            or collection_key == LegacyCollectionKeys.SOURCES
        ):
            raise ValueError("Paths only possible for hits and sources.")
        if isinstance(record_id, pd.Series):
            record_id = record_id.astype(str)
        elif not isinstance(record_id, str):
            record_id = str(record_id)
        return "/{key}/".format(key=collection_key.value) + record_id

    def __get_data(
        self,
        key: str,
        facade_class: Type[DataFrameFacade_],
        where: Optional[Union[str, list]] = None,
    ) -> Optional[DataFrameFacade_]:
        """Gets data frame facade from file.

        Args:
            key: Key to get data by
            facade_class: Data frame facade class to instantiate
            where: Optional where clause to filter the selection

        Returns:
            Data frame facade containing data or None
        """
        logging.debug(
            "Get {} with key {} at '{}'".format(facade_class, key, self.data_path)
        )
        try:
            store = self.store
            df = pd.DataFrame(store.select(key=key, where=where))
            if "type" in df.columns:
                # Map legacy lowercase type names to the current enum values.
                new_dict = {}
                for record_type in RecordType:
                    new_dict[record_type.name.lower()] = record_type.value
                for source_type in SourceType:
                    new_dict[source_type.name.lower()] = source_type.value
                df["type"] = df["type"].map(new_dict)
            facade = facade_class(df=df)
            return facade
        except KeyError:
            return None

    def get_detector(self) -> Optional[Detector]:
        """Gets detector of the collection.

        Returns:
            Detector of the collection.
        """
        return self.__get_data(
            key=LegacyCollectionKeys.DETECTOR, facade_class=Detector
        )

    def get_records(
        self, record_type: Optional[Union[List[Types], Types]] = None
    ) -> Optional[Records]:
        """Gets records of the collection.

        Args:
            record_type: record type(s) to include

        Returns:
            Records of the collection.
        """
        where = None
        if record_type is not None:
            if not isinstance(record_type, list):
                record_type = [record_type]
            # Combine multiple requested types with OR so a record matching
            # any of them is selected.
            wheres = ["type={}".format(current_type) for current_type in record_type]
            where = "({})".format(" | ".join(wheres))
        return self.__get_data(
            key=LegacyCollectionKeys.RECORDS, facade_class=Records, where=where
        )

    def __get_subgroup_dataset(
        self,
        base_key: LegacyCollectionKeys,
        facade_class: Type[DataFrameFacade_],
        record_id: int | str,
        interval: Optional[Interval] = None,
    ) -> Optional[DataFrameFacade_]:
        """Gets data of group based on record id.

        Args:
            base_key: key of the subgroup
            facade_class: Data frame facade class to instantiate
            record_id: Record id to get the data for
            interval: Optional interval to restrict the data in time

        Returns:
            Data frame facade with the data from disc.
        """
        key = self.__get_hdf_path(base_key, record_id)
        # TODO: Check right place
        where = None
        if interval is not None:
            where = "(time < {end_time} & time >= {start_time})".format(
                end_time=interval.end, start_time=interval.start
            )
        return self.__get_data(key=key, facade_class=facade_class, where=where)

    # TODO: Allow multiple or all
    def get_sources(
        self, record_id: int | str, interval: Optional[Interval] = None
    ) -> Optional[Sources]:
        """Gets sources by a specific record id.

        Args:
            record_id: Record id to get sources for
            interval: Interval to get sources in

        Returns:
            None if not available or sources for record id
        """
        return self.__get_subgroup_dataset(
            base_key=LegacyCollectionKeys.SOURCES,
            facade_class=Sources,
            record_id=record_id,
            interval=interval,
        )

    # TODO: Allow multiple or all
    def get_hits(
        self, record_id: int | str, interval: Optional[Interval] = None
    ) -> Optional[Hits]:
        """Gets hits by a specific record id.

        Args:
            record_id: Record id to get hits from
            interval: Interval to get hits in

        Returns:
            None if not available or hits for record id
        """
        return self.__get_subgroup_dataset(
            base_key=LegacyCollectionKeys.HITS,
            facade_class=Hits,
            record_id=record_id,
            interval=interval,
        )

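
# A small usage sketch for reading a legacy HDF5 collection, assuming a file
# "legacy_collection.h5" written with the layout described above; the path is
# a placeholder. An optional Interval (from ananke.configurations.events) can
# be passed to get_hits/get_sources to restrict the selection in time.
#
# legacy = LegacyCollection("legacy_collection.h5", read_only=True)
# detector = legacy.get_detector()
# records = legacy.get_records()
# if records is not None:
#     for record_id in records.record_ids:
#         hits = legacy.get_hits(record_id=record_id)
#         sources = legacy.get_sources(record_id=record_id)
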
class LegacyCollectionImporter(AbstractCollectionImporter):
    """Class to import a legacy collection into the current one."""

    def import_data(
        self, import_path: Union[str, bytes, os.PathLike], **kwargs
    ) -> None:
        """Imports data from legacy collection.

        Args:
            import_path: Path to import data from
            **kwargs: additional args for data import.
        """
        self.logger.info(
            "Starting Legacy import from path '{}'".format(str(import_path))
        )
        legacy_collection = LegacyCollection(import_path)
        legacy_detector = legacy_collection.get_detector()
        legacy_records = legacy_collection.get_records()

        if legacy_records is None:
            raise ValueError("No records to import")

        number_of_records = len(legacy_records)

        with self.collection:
            if legacy_detector is not None:
                self.collection.storage.set_detector(legacy_detector)
            new_ids = self.collection.storage.get_next_record_ids(number_of_records)
            legacy_records_ids = legacy_records.record_ids
            legacy_records.df["record_id"] = new_ids
            if legacy_records is not None:
                self.collection.storage.set_records(legacy_records)
            with tqdm(total=len(legacy_records), mininterval=0.5) as pbar:
                for index, legacy_record_id in enumerate(legacy_records_ids):
                    new_id = new_ids[index]
                    legacy_hits = legacy_collection.get_hits(
                        record_id=legacy_record_id
                    )
                    legacy_sources = legacy_collection.get_sources(
                        record_id=legacy_record_id
                    )
                    if legacy_hits is not None:
                        legacy_hits.df["record_id"] = new_id
                        self.collection.storage.set_hits(legacy_hits)
                    if legacy_sources is not None:
                        legacy_sources.df["record_id"] = new_id
                        self.collection.storage.set_sources(legacy_sources)
                    pbar.update()
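
# Usage sketch for the importer above, assuming a Collection instance has been
# created elsewhere (its construction lives in ananke.models.collection and is
# outside this module); the path is a placeholder.
#
# importer = LegacyCollectionImporter(collection)
# importer.import_data(import_path="old_data.h5")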