Source code for spec2nexus.writer

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# -----------------------------------------------------------------------------
# :author:    Pete R. Jemian
# :email:     prjemian@gmail.com
# :copyright: (c) 2014-2020, Pete R. Jemian
#
# Distributed under the terms of the Creative Commons Attribution 4.0 International Public License.
#
# The full license is in the file LICENSE.txt, distributed with this software.
# -----------------------------------------------------------------------------


"""(internal library) Parses SPEC data using spec2nexus.eznx API (only requires h5py)"""


import h5py
import numpy as np

from . import eznx
from . import spec
from . import utils


# see: http://download.nexusformat.org/doc/html/classes/base_classes/index.html
# CONTAINER_CLASS = 'NXlog'          # information that is recorded against time
CONTAINER_CLASS = "NXnote"  # any additional freeform information not covered by the other base classes
# CONTAINER_CLASS = 'NXparameters'   # Container for parameters, usually used in processing or analysis
# CONTAINER_CLASS = 'NXcollection'    # Use NXcollection to gather together any set of terms


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -


[docs]class Writer(object): """ writes out scans from SPEC data file to NeXus HDF5 file :param obj spec_data: instance of :class:`~spec2nexus.spec.SpecDataFile` """ def __init__(self, spec_data): self.spec = spec_data
[docs] def save(self, hdf_file, scan_list=None): """ save the information in this SPEC data file to a NeXus HDF5 file Each scan in scan_list will be converted to a **NXentry** group. Scan data will be placed in a **NXdata** group where the attribute **signal=1** is the last column and the corresponding attribute **axes=<name of the first column>**. There are variations on this for 2-D and higher dimensionality data, such as mesh scans. In general, the tree structure of the NeXus HDF5 file is:: hdf5_file: NXroot @default="S1" definition="NXspecdata" # attributes S1:NXentry @default="data" # attributes and metadata fields data:NXdata @signal=<name of signal field> @axes=<name(s) of axes of signal> @<axis>_indices=<list of indices in "axis1"> <signal_is_the_last_column>:NX_NUMBER[number of points] = ... data ... @signal=1 @axes='<axis_is_name_of_first_column>' @<axis>_indices=<list of indices in "axis1" used as dimension scales of the "signal"> <axis_is_name_of_first_column>:NX_NUMBER[number of points] = ... data ... # other columns from the scan :param str hdf_file: name of NeXus/HDF5 file to be written :param [int] scanlist: list of scan numbers to be read """ scan_list = scan_list or [] root = eznx.makeFile(hdf_file, **self.root_attributes()) pick_first_entry = True for key in scan_list: nxentry = eznx.makeGroup(root, "S" + str(key), "NXentry") eznx.makeDataset( nxentry, "definition", "NXspecdata", description="NeXus application definition (status pending)", ) self.save_scan(nxentry, self.spec.getScan(key)) if pick_first_entry: pick_first_entry = False eznx.addAttributes(root, default="S" + str(key)) if "data" not in nxentry: # NXentry MUST have a NXdata group with data for default plot nxdata = eznx.makeGroup( nxentry, "data", "NXdata", signal="no_y_data", axes="no_x_data", no_x_data_indices=[0,], ) eznx.makeDataset( nxdata, "no_x_data", (0, 1), units="none", long_name="no data points in this scan", ) eznx.makeDataset( nxdata, "no_y_data", (0, 1), units="none", long_name="no data points in this scan", ) root.close() # be CERTAIN to close the file
[docs] def root_attributes(self): """*internal*: returns the attributes to be written to the root element as a dict""" from spec2nexus._version import get_versions version = get_versions()["version"] header0 = self.spec.headers[0] dd = dict( spec2nexus_version=version, SPEC_file=self.spec.specFile, SPEC_epoch=header0.epoch, SPEC_date=utils.iso8601(header0.date), SPEC_comments="\n".join(header0.comments), SPEC_num_headers=len(self.spec.headers), h5py_version=h5py.__version__, HDF5_Version=h5py.version.hdf5_version, numpy_version=h5py.version.numpy.version.full_version, ) try: c = header0.comments[0] user = c[c.find("User = ") :].split("=")[1].strip() dd["SPEC_user"] = user except Exception: pass return dd
[docs] def save_scan(self, nxentry, scan): """*internal*: save the data from each SPEC scan to its own NXentry group""" scan.interpret() # ensure interpretation is complete eznx.addAttributes(nxentry, default="data") eznx.write_dataset(nxentry, "title", str(scan)) eznx.write_dataset(nxentry, "scan_number", scan.scanNum) eznx.write_dataset(nxentry, "command", scan.scanCmd) for func in scan.header.h5writers.values(): # ask the header plugins to save their part func(nxentry, self, scan.header, nxclass=CONTAINER_CLASS) for func in scan.h5writers.values(): # ask the scan plugins to save their part func(nxentry, self, scan, nxclass=CONTAINER_CLASS)
[docs] def save_dict(self, group, data): """*internal*: store a dictionary""" for k, v in data.items(): self.write_ds(group, k, v)
[docs] def save_data(self, nxdata, scan): """*internal*: store the scan data""" scan_type = scan.scanCmd.split()[0] if scan_type in ("mesh", "hklmesh"): # hklmesh H 1.9 2.1 100 K 1.9 2.1 100 -800000 signal, axes = self.mesh(nxdata, scan) elif scan_type in ("hscan", "kscan", "lscan", "hklscan"): # hklscan 1.00133 1.00133 1.00133 1.00133 2.85 3.05 200 -400000 signal = self.oneD(nxdata, scan)[0] axes = [] h_0, h_N, k_0, k_N, l_0, l_N = scan.scanCmd.split()[1:7] if h_0 != h_N: axes.append("H") if k_0 != k_N: axes.append("K") if l_0 != l_N: axes.append("L") axes = ":".join(axes) else: signal, axes = self.oneD(nxdata, scan) # these locations suggested to NIAC, easier to parse than attached to dataset! # if len(signal) == 0: # pass # Syntax of axes attribute (http://wiki.nexusformat.org/2014_axes_and_uncertainties): # @axes="H:K" INCOREECT # @axes="H", "K" CORRECT if axes.find(":") >= 0: def fixer(s): # h5py requires list of strings to be encoded # see: https://stackoverflow.com/questions/23220513/storing-a-list-of-strings-to-a-hdf5-dataset-from-python return s.encode("ascii", "ignore") axes = list(map(fixer, axes.split(":"))) eznx.addAttributes(nxdata, signal=signal, axes=axes) indices = [ 0, ] # 0-based reference if isinstance(axes, str): eznx.addAttributes(nxdata, **{axes + "_indices": indices}) else: for axis_name in axes: axis_name = axis_name.decode("utf-8") # assume here that "axis_name" has rank=1 # if scan.data[axis_name] != 1: # pass # TODO: and do what? k = "%s%s" % (axis_name, "_indices") eznx.addAttributes(nxdata, **{k: indices})
[docs] def oneD(self, nxdata, scan): """*internal*: generic data parser for 1-D column data, returns signal and axis""" for column in scan.L: self.write_ds(nxdata, column, scan.data[column]) signal = utils.clean_name(scan.column_last) # primary Y axis axis = utils.clean_name(scan.column_first) # primary X axis self.mca_spectra(nxdata, scan, axis) # records any MCA data return signal, axis
[docs] def mca_spectra(self, nxdata, scan, primary_axis_label): """*internal*: parse for optional 2-D MCA spectra""" if spec.MCA_DATA_KEY in scan.data: # calibration for all spectra a, b, c = 0, 0, 0 if hasattr(scan, "MCA"): mca = scan.MCA if "CALIB" in mca: a = mca["CALIB"].get("a", 0) b = mca["CALIB"].get("b", 0) c = mca["CALIB"].get("c", 0) if a == b and b == c and a == 0: a, b, c = 1, 0, 0 # save each spectrum for key, spectrum in sorted( scan.data[spec.MCA_DATA_KEY].items() ): ds_name = "_" + key + "_" axes = primary_axis_label + ":" + ds_name + "channel_" self.write_ds( nxdata, ds_name, spectrum, axes=axes, units="counts" ) # save calibrated channel data for each spectrum, in case spectra are different lengths # _mca_ _mca_channel_ # _mca1_ _mca1_channel_ # _mca2_ _mca2_channel_ # ... channels = np.arange(1, len(spectrum[0]) + 1, dtype=int) _mca_x_ = a + channels * (b + channels * c) self.write_ds( nxdata, ds_name + "channel_", channels, units="channel" ) self.write_ds( nxdata, ds_name + "channel_scaled_x", _mca_x_, units="" )
[docs] def mesh(self, nxdata, scan): """*internal*: data parser for 2-D mesh and hklmesh""" # TODO: refactor to use NeXus data model: signal, axes, data # 2-D parser: http://www.certif.com/spec_help/mesh.html # mesh motor1 start1 end1 intervals1 motor2 start2 end2 intervals2 time # 2-D parser: http://www.certif.com/spec_help/hklmesh.html # hklmesh Q1 start1 end1 intervals1 Q2 start2 end2 intervals2 time # mesh: data/33id_spec.dat scan 22 # hklmesh: data/33bm_spec.dat scan 17 ( label1, _start1, _end1, intervals1, label2, _start2, _end2, intervals2, _time, ) = scan.scanCmd.split()[1:] if label1 not in scan.data: label1 = scan.L[0] # mnemonic v. name if label2 not in scan.data: label2 = scan.L[1] # mnemonic v. name axis1 = scan.data.get(label1) axis2 = scan.data.get(label2) intervals1, intervals2 = map(int, (intervals1, intervals2)) # unused: start1, end1, start2, end2, time = map(float, (start1, end1, start2, end2, time)) if ( len(axis1) < intervals1 ): # stopped scan before second row started signal, axes = self.oneD(nxdata, scan) # fallback support else: axis1 = axis1[0 : intervals1 + 1] axis2 = [ axis2[row] for row in range(len(axis2)) if row % (intervals1 + 1) == 0 ] column_labels = scan.L column_labels.remove(label1) # special handling column_labels.remove(label2) # special handling if scan.scanCmd.startswith("hkl"): # find the reciprocal space axis held constant label3 = [ key for key in ("H", "K", "L") if key not in (label1, label2) ][0] axis3 = scan.data.get(label3)[0] self.write_ds(nxdata, label3, axis3) self.write_ds(nxdata, label1, axis1) # 1-D array self.write_ds(nxdata, label2, axis2) # 1-D array # build 2-D data objects (do not build label1, label2, [or label3] as 2-D objects) data_shape = [len(axis1), len(axis2)] for label in column_labels: if label not in nxdata: axis = np.array(scan.data.get(label)) self.write_ds( nxdata, label, utils.reshape_data(axis, data_shape) ) # else: # pass signal = utils.clean_name(scan.column_last) axes = ":".join([label1, label2]) if spec.MCA_DATA_KEY in scan.data: # 3-D array(s) # save each spectrum for key, spectrum in sorted( scan.data[spec.MCA_DATA_KEY].items() ): num_channels = len(spectrum[0]) data_shape.append(num_channels) mca = np.array(spectrum) data = utils.reshape_data(mca, data_shape) channels = range(1, num_channels + 1) ds_name = "_" + key + "_" self.write_ds( nxdata, ds_name, data, axes=axes + ":" + ds_name + "channel_", units="counts", ) self.write_ds( nxdata, ds_name + "channel_", channels, units="channel" ) return signal, axes
[docs] def write_ds(self, group, label, data, **attr): """*internal*: writes a dataset to the HDF5 file, records the SPEC name as an attribute""" clean_name = utils.clean_name(label) eznx.write_dataset( group, clean_name, data, spec_name=label, **attr )