"""
Base functions to load data from various sources.
.. autosummary::
~load_spec
~load_csv
~load_databroker
~load_table
~is_Bluesky_specfile
~db_query
~show_meta
~collect_meta
~lookup_position
"""
# Copyright (c) 2020-2021, UChicago Argonne, LLC.
# See LICENSE file for details.
from pandas import read_csv, DataFrame
from os.path import join
from spec2nexus.spec import SpecDataFile
from warnings import warn
import copy
from datetime import datetime
from collections import OrderedDict
from pyRestTable import Table
from databroker.queries import TimeRange
from apstools.utils import getDatabase
from polartools.area_detector_handlers import EigerHandler, LambdaHDF5Handler
# TODO: This should be just temp fix
LambdaHDF5Handler.specs = {"AD_HDF5_lambda"} | LambdaHDF5Handler.specs
[docs]
def load_catalog(name=None, query=None, handlers=None):
"""
Loads a databroker catalog and register data handlers.
Parameters
----------
name : str, optional
Name of the database. Defaults to 4-ID-D name.
query : dict, optional
Dictionary with search parameters for the database.
handlers : dict, optional
Dictionary organized as {handler_name: handler_class}. If None,
defaults to handlers used at 4-ID-D.
Returns
-------
cat : databroker catalog
Catalog after running the query, and registering the handler.
"""
cat = getDatabase(catalog_name=name)
if query is not None:
cat = db_query(cat, query)
if handlers is None:
handlers = dict(
AD_HDF5_Lambda250k_APSPolar=LambdaHDF5Handler,
AD_HDF5_lambda=LambdaHDF5Handler, # Temporary fix
AD_EIGER_APSPolar=EigerHandler,
)
for name, handler in handlers.items():
cat.register_handler(name, handler, overwrite=True)
return cat
[docs]
def load_spec(scan_id, spec_file, folder=""):
"""
Load data from spec file.
If `spec_file` is the file name, it will load the spec file internally
which is time consuming.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
spec_file : string or spec2nexus.spec.SpecDataFile
Either the spec file name or a SpecDataFile instance.
folder : string, optional
Folder where spec file is located.
Returns
-------
data : pandas.DataFrame
Table with the data from scan.
See also
--------
:func:`spec2nexus.spec.SpecDataFile`
"""
if isinstance(spec_file, str):
path = join(folder, spec_file)
spec_file = SpecDataFile(path)
return DataFrame(spec_file.getScan(scan_id).data)
[docs]
def load_csv(scan_id, folder="", name_format="scan_{}_primary.csv"):
"""
Load data from the 'primary' stream from exported csv files.
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved.
folder : string, optional
Folder where csv files are located.
name_format : string, optional
General format of file name. The correct name must be
retrievable through: `file_name_format.format(scan_id)`
Returns
-------
data : pandas.DataFrame
Table with the data from the primary stream.
See also
--------
:func:`pandas.read_csv`
"""
return read_csv(join(folder, name_format.format(scan_id)))
[docs]
def load_databroker(
scan_id, db=None, stream="primary", query=None, use_db_v1=True
):
"""
Load data of the first scan with the provided scan_id.
Currently defaults to databroker.v1 because it is faster. See issue #28.
For further details, refer to the `databroker` `documentation`_.
.. _documentation: https://blueskyproject.io/databroker/
Parameters
----------
scan_id : int
Scan_id of the scan to be retrieved
db :
`databroker` database
stream : string, optional
Selects the stream from which data will be loaded.
query : dict, optional
Dictionary with search parameters for the database.
use_db_v1 : bool, optional
Chooses databroker API version between 'v1' or 'v2', defaults to 'v1'.
Returns
-------
data : pandas.DataFrame
Table with the data from the primary stream.
"""
db = getDatabase(db=db)
_db = db_query(db, query) if query else db
if use_db_v1:
if stream in _db.v1[scan_id].stream_names:
return _db.v1[scan_id].table(stream_name=stream)
else:
raise ValueError(
f"The stream {stream} does not exist in scan {scan_id}."
)
else:
try:
return getattr(_db.v2[scan_id], stream).read().to_dataframe()
except AttributeError:
raise ValueError(
f"The stream {stream} does not exist in scan {scan_id}."
)
[docs]
def load_table(scan, source=None, **kwargs):
"""
Automated scan table loader.
The automation is based on the source argument.
- if source == 'csv' -> uses `load_csv`.
- else if source is a string or nexus2spec.spec.SpecDataFile -> uses\
`load_spec`.
- else -> uses `load_databroker`.
Parameters
----------
scan : int
Scan_id our uid. If scan_id is passed, it will load the last scan with
that scan_id. See kwargs for search options.
source : databroker database, name of the spec file, or 'csv'
Note that applicable kwargs depend on this selection.
kwargs :
The necessary kwargs are passed to the loading functions defined by the
`source` argument:
- csv -> possible kwargs: folder, name_format.
- spec -> possible kwargs: folder.
- databroker -> possible kwargs: stream, query, use_db_v1.
Note that a warning will be printed if the an unnecessary kwarg is
passed.
Returns
-------
table : pandas.DataFrame
Table with the scan data.
See also
--------
:func:`polartools.load_data.load_databroker`
:func:`polartools.load_data.load_csv`
:func:`polartools.load_data.load_spec`
"""
_kwargs = copy.deepcopy(kwargs)
folder = _kwargs.pop("folder", "")
if source == "csv":
name_format = kwargs.pop("name_format", "scan_{}_primary.csv")
table = load_csv(scan, folder=folder, name_format=name_format)
elif isinstance(source, str) or isinstance(source, SpecDataFile):
table = load_spec(scan, source, folder=folder)
else:
stream = _kwargs.pop("stream", "primary")
query = _kwargs.pop("query", None)
use_db_v1 = _kwargs.pop("use_db_v1", True)
table = load_databroker(
scan, db=source, stream=stream, query=query, use_db_v1=use_db_v1
)
if len(_kwargs) != 0:
warn(f"The following kwargs were not used! {list(_kwargs.keys())}")
return table
[docs]
def is_Bluesky_specfile(source, folder=""):
"""
Check if the specfile was created by Bluesky.
It looks for a "Bluesky" comment in the file header.
Parameters
----------
source : string or spec2nexus.spec.SpecDataFile
Either the spec file name or a SpecDataFile instance.
folder : string, optional
Folder where spec file is located.
Returns
-------
y : bool
True if spec_file was generated by Bluesky, False otherwise.
See also
--------
:func:`spec2nexus.spec.SpecDataFile`
"""
if isinstance(source, str):
path = join(folder, source)
source = SpecDataFile(path)
if len(source.headers[0].comments):
return source.headers[0].comments[0].startswith("Bluesky")
else:
return False
[docs]
def db_query(db, query):
"""
Searches the databroker v2 database.
Parameters
----------
db :
`databroker` database.
query: dict
Search parameters.
Returns
-------
_db :
Subset of db that satisfy the search parameters. Note that it has the
same format as db.
See also
--------
:func:`databroker.catalog.search`
"""
since = query.pop("since", None)
until = query.pop("until", None)
if since or until:
if not since:
since = "2010"
if not until:
until = "2050"
_db = db.v2.search(TimeRange(since=since, until=until))
else:
_db = db
if len(query) != 0:
_db = _db.v2.search(query)
return _db
def _flatten_list(inp):
"""Only handles lists to second level"""
output = []
for item in inp:
if isinstance(item, list):
output += [i for i in item]
else:
output.append(item)
return output
[docs]
def lookup_position(db, scan, search_string="", query=None):
"""
Lookup positioner values in past scans.
Parameters
----------
db : databroker database
Searcheable database
scan : integer
Scan numbers or uids.
search_string : string
Full or part of positioner name.
query: dict
Search parameters.
Returns
-------
output: list
"""
db_range = db_query(db, query=query) if query else db
baseline = load_databroker(scan, db_range, "baseline", use_db_v1=True)
if len(baseline["time"]) == 2:
date1 = baseline["time"][1].strftime("%m/%d/%y %H:%M:%S")
date2 = baseline["time"][2].strftime("%m/%d/%y %H:%M:%S")
print("=".center(100, "="))
print(f"{'Positioner':>50}{date1:>25}{date2:>25}")
print("-".center(100, "-"))
for key in baseline.keys():
if search_string in key:
if isinstance(baseline[key][1], list):
print(f"{key:>50}{baseline[key][1]}{baseline[key][2]}")
else:
print(
f"{key:>50}{baseline[key][1]:>25}{baseline[key][2]:>25}"
)
else:
date1 = baseline["time"][1].strftime("%m/%d/%y %H:%M:%S")
print("=".center(100, "="))
print(f"{'Positioner':>50}{date1:>25}")
print("-".center(100, "-"))
for key in baseline.keys():
if search_string in key:
if isinstance(baseline[key][1], list):
print(f"{key:>50}{baseline[key][1]}")
else:
print(f"{key:>50}{baseline[key][1]:>25}")
print("-".center(100, "-"))