read a list of SPEC data files (or directories) and plot images of all scans

The images are stored in files within a directory structure
that is organized chronologically,
such as: ``yyyy/mm/spec_file/s1.svg``.
The root of the directory is either specified by the
command line ``-d`` option or  defaults to the current
working directory.  The ``yyyy/mm`` (year and month) are
taken from the ``#D`` line of the SPEC data file.
The ``spec_file`` is the file name with file extension
and directory name removed.  The image file names are
derived from the scan numbers.

    # every five minutes (generates no output from outer script)
    0-59/5 * * * *  /some/directory/ -d /web/page/dir /spec/data/file/dir

If this script is called too frequently and the list of plots to be generated
is large enough, it is possible for more than one process to be running.
In one extreme case, many processes were found running due to problems with the data files.
To identify and stop all processes of this program::

    kill -9 `ps -ef | grep python | awk '/ {print $2}' -`


import datetime
import getpass
import json
import logging
import os
import shutil
import socket
import sys

from . import __version__
from . import spec
from . import specplot

except NameError:
    FileNotFoundError = IOError  # py27 compatibility

MTIME_CACHE_FILE = "mtime_cache.json"
PLOT_TYPE = ".svg"
HTML_INDEX_FILE = "index.html"

[docs]class DirectoryNotFoundError(ValueError): """Exception: The requested directory does not exist"""
[docs]class PathIsNotDirectoryError(ValueError): """Exception: The path is not a directory"""
[docs]class PlotSpecFileScans(object): """ read a SPEC data file and plot thumbnail images of all its scans :param [str] filelist: list of SPEC data files to be checked :param str plotDir: name of base directory to store output image thumbnails .. autosummary:: ~specFileUpdated ~plot_all_scans ~getPlotDir ~getBaseDir ~href_format """ def __init__( self, filelist, plotDir=None, reverse_chronological=False ): """Read SPEC files and plot all scans.""" self.filelist = filelist self.plotDir = plotDir or os.getcwd() self.reversed = reverse_chronological for specFile in filelist: self.plot_all_scans(specFile)
[docs] def specFileUpdated(self, specFile): """ Report if specFile has been updated. Return mtime cache entry (or `None` if not updated) """ result = None if os.path.exists(specFile): mtime_cache = Cache_File_Mtime(self.plotDir) if mtime_cache.was_file_updated(specFile): result = mtime_cache.get(specFile) return result
[docs] def plot_all_scans(self, specFile): """Plot all the recognized scans from file ``specFile``.""" if not spec.is_spec_file(specFile): raise spec.NotASpecDataFile(specFile) try: logger("SPEC data file: %s" % specFile) sd = specplot.openSpecFile(specFile) except FileNotFoundError: return # could not open file, be silent about it if len(sd.headers) == 0: # no scan header found, again, silence return plot_path = self.getPlotDir(specFile) logger(" plot directory: %s" % plot_path) if not os.path.exists(plot_path): os.makedirs(plot_path) logger("creating directory: " + plot_path) # list the plots we expect scans = {} for scan_n in sd.getScanNumbers(): spec_scan = sd.getScan(scan_n) # make certain that plot files will sort lexically: S1 --> s00001 base = "s%05s" + PLOT_TYPE basePlotFile = (base % str(spec_scan.scanNum)).replace( " ", "0" ) altText = ( "#" + str(spec_scan.scanNum) + ": " + spec_scan.scanCmd ) href = self.href_format(basePlotFile, altText) full = os.path.join(plot_path, basePlotFile) scans[scan_n] = dict( make=True, base=(base % str(spec_scan.scanNum)).replace(" ", "0"), full=full, href=href, exists=os.path.exists(full), spec_scan=spec_scan, ) # delete any existing plots that must be remade last_cache = Cache_File_Mtime(self.plotDir).get(specFile) cache = self.specFileUpdated(specFile) if cache is None: return plot_list = [ k for k in sorted(os.listdir(plot_path)) if k.endswith(PLOT_TYPE) ] if ( len(plot_list) > 0 and cache["size"] > 0 and cache["size"] != last_cache["size"] ): if cache["size"] > last_cache["size"]: # Was last scan updated with more data? Look at file. with open(specFile, "r") as fp: # skip the part already considered["size"]) # look at the addition buf = # only delete last plot if addition not start with #S if not buf.lstrip().startswith("#S "): k = plot_list.pop() os.remove(os.path.join(plot_path, k)) else: # remake all the plots for k in plot_list: # TODO: needs unit test os.remove(os.path.join(plot_path, k)) plot_list = [] # make plots as needed remake_index_file = False problem_scans = [] for scan in scans.values(): scan["make"] = scan["base"] not in plot_list if scan["make"]: try: logger( " creating SPEC data scan image: " + scan["base"] ) selector = specplot.Selector() image_maker =["spec_scan"]) plotter = image_maker() plotter.plot_scan(scan["spec_scan"], scan["full"]) remake_index_file = True except Exception as _exc_obj: msg = "<b>%s</b>" % type(_exc_obj).__name__ msg += ": <tt>#S %s</tt>" % str( scan["spec_scan"].scanNum ) # msg += " (%s)" % specFile problem_scans.append(msg) # (re)make the index.html file as needed htmlFile = os.path.join(plot_path, HTML_INDEX_FILE) if remake_index_file or not os.path.exists(htmlFile): logger(" creating/updating index.html file") # list the plottable files def sorter(d): return d["spec_scan"].epoch plot_list = [ scan["href"] for scan in sorted( scans.values(), key=sorter, reverse=self.reversed ) if os.path.exists(scan["full"]) ] html = buildIndexHtml(specFile, plot_list, problem_scans) with open(htmlFile, "w") as f: f.write(html) # copy specFile to the plot_path, if newer target = os.path.join(plot_path, os.path.basename(specFile)) if not os.path.exists(target) or ( os.path.getmtime(target) < os.path.getmtime(specFile) ): shutil.copyfile(specFile, target) # touch to update the mtime on the plot_path os.utime(plot_path, None)
[docs] def getPlotDir(self, specFile): """ Return the plot directory based on the specFile. :param str specFile: name of SPEC data file (relative or absolute) """ data_file_root_name = os.path.splitext(os.path.split(specFile)[1])[ 0 ] date_str = getSpecFileDate(specFile) if date_str is None: return return self.getBaseDir(data_file_root_name, date_str)
[docs] def getBaseDir(self, basename, date): """ Find the path based on the date in the spec file. """ return os.path.join(self.plotDir, datePath(date), basename)
def href_format(self, basePlotFile, altText): href = '<a href="' + basePlotFile + '">' href += '<img src="' + basePlotFile + '"' href += ' width="150" height="75"' href += ' alt="' + altText + '"/>' href += "</a>" return href
[docs]class Cache_File_Mtime(object): """ Maintain a list of all known data file modification times. :param str base_dir: name of base directory to store output image thumbnails This list will allow the code to avoid unnecessary work reparsing and plotting of unchanged SPEC data files. """ def __init__(self, base_dir): if not os.path.exists(base_dir): msg = "directory does not exist: " + base_dir raise DirectoryNotFoundError(msg) self.base_dir = os.path.abspath(base_dir) self.cache_file = os.path.join(self.base_dir, MTIME_CACHE_FILE) self.cache =
[docs] def read(self): """Read the cache from storage.""" cache = {} if os.path.exists(self.cache_file): with open(self.cache_file, "r") as fp: cache = json.load(fp) return cache
[docs] def write(self): """Write the cache to storage.""" with open(self.cache_file, "w") as fp: json.dump(self.cache, fp, indent=4)
[docs] def get(self, fname, default=dict(mtime=0, size=0)): """ Get the mtime cache entry for data file ``fname``. :param str fname: file name, already known to exist :return: time (float) cached value of when fname was last modified or None if not known """ return self.cache.get(fname, default)
[docs] def was_file_updated(self, fname): """ Compare the mtime between disk and :param str fname: file name, already known to exist :return bool: True if file is newer than the cache (or new to the cache) """ assert os.path.exists(fname) updated = False cache = self.get(fname) file_mtime = os.path.getmtime(fname) file_size = os.path.getsize(fname) if file_mtime != cache["mtime"] and file_size != cache["size"]: updated = True if updated: logger("SPEC data file updated: " + fname) self.cache[fname] = dict( mtime=file_mtime, size=os.path.getsize(fname) ) self.write() return updated
[docs]def datePath(date): """ Convert the date into a path: yyyy/mm. :param str date: text date from SPEC file #D line: 'Thu Jun 19 12:21:55 2014' """ import time dateStr = time.strptime(date, "%a %b %d %H:%M:%S %Y") yyyy = "%04d" % dateStr.tm_year mm = "%02d" % dateStr.tm_mon return os.path.join(yyyy, mm)
[docs]def getSpecFileDate(specFile): """ Return the #D date of the SPEC data file or None. :param str specFile: name of SPEC data file (relative or absolute) """ # Get that without parsing the data file, it's on the 3rd line of the file. # #F 06_19_Tony.dat # #E 1403198515 # #D Thu Jun 19 12:21:55 2014 if not os.path.exists(specFile): return None # read the first lines of the file and validate for SPEC data file format f = open(specFile, "r") line = f.readline() if not line.startswith("#F "): return None line = f.readline() if not line.startswith("#E "): return None line = f.readline() if not line.startswith("#D "): return None f.close() return line[2:].strip() # 'Thu Jun 19 12:21:55 2014'
[docs]def needToMakePlot(fullPlotFile, mtime_specFile): """ Determine if a plot needs to be (re)made. Use mtime as the basis. :return bool: ``True`` if plot should be made again """ remake_plot = True if os.path.exists(fullPlotFile): mtime_plotFile = os.path.getmtime(fullPlotFile) if mtime_plotFile > mtime_specFile: # plot was made after the data file was updated remake_plot = False # don't remake the plot return remake_plot
[docs]def timestamp(): """ current time as yyyy-mm-dd hh:mm:ss :return str: """ ts = str( ts = " ".join(ts.split("T")) # convert to modified ISO8601 format ts = ts.split(".")[0] # strip off the fractional seconds return ts
[docs]def buildIndexHtml(specFile, plotted_scans, problem_scans): """ Build index.html content. :param str specFile: name of SPEC data file (relative or absolute) :param [str] plotList: list of HTML `<a>` elements, one for each plot image """ baseSpecFile = os.path.basename(specFile) comment = "\n" comment += " written by: %s\n" % sys.argv[0] comment += " date: %s\n" % timestamp() comment += " workstation: %s\n" % socket.gethostname() comment += " username: %s\n" % getpass.getuser() comment += " version: %s\n" % __version__ comment += " pid: %d\n" % os.getpid() href = "<a href='%s'>%s</a>" % (baseSpecFile, specFile) html = "<html>\n" html += " <head>\n" html += ( " <title>SPEC scans from %s</title>\n" % os.path.split(specFile)[-1] ) html += " <!-- %s -->\n" % comment html += " </head>\n" html += " <body>\n" html += ( " <h1>SPEC scans from: %s</h1>\n" % os.path.split(specFile)[-1] ) html += "\n" html += " spec file: %s\n" % href html += " <br />\n" html += "\n" if len(problem_scans) > 0: html += " <h2>%d scan(s) with plotting problems</h2>\n" % len( problem_scans ) html += "\n" html += " <ul>\n" for item in problem_scans: html += " <li>%s</li>\n" % item html += " </ul>\n" html += "\n" html += " <h2>%d plotted scan(s)</h2>\n" % len(plotted_scans) html += "\n" html += "\n".join(plotted_scans) html += "\n" html += "\n" html += " <hr />\n" ref = "page created: " + str( ref += ', specplot_gallery documentation: <a href="%s">%s</a>' % ( DOC_URL, DOC_URL, ) html += " <center><small>%s</small></center>\n" % ref html += "\n" html += " </body>\n" html += "</html>\n" return html
[docs]def logger(message): """ Log a message or report from this module. :param str message: text to be logged """ now = str( name = os.path.basename(sys.argv[0]) pid = os.getpid() text = "(%d,%s,%s) %s" % (pid, name, now, message)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - def main(): import argparse global SPECPLOTS_DIR doc = __doc__.strip().splitlines()[0] p = argparse.ArgumentParser(description=doc) p.add_argument( "paths", nargs="+", help="SPEC data file name(s) or directory(s) with SPEC data files", ) p.add_argument( "-r", action="store_true", default=False, dest="reverse_chronological", help="sort images from each data file in reverse chronolgical order", ) pwd = os.getcwd() msg = "base directory for output" msg += " (default:" + pwd + ")" p.add_argument("-d", "--dir", help=msg) args = p.parse_args() specplots_dir = args.dir or pwd if not os.path.exists(specplots_dir): raise DirectoryNotFoundError(specplots_dir) if not os.path.isdir(specplots_dir): raise PathIsNotDirectoryError(specplots_dir) file_list = [] def only_accept_spec_files(fname): if os.path.exists(fname) and spec.is_spec_file(fname): file_list.append(fname) for item in args.paths: if os.path.exists(item): if os.path.isfile(item): only_accept_spec_files(item) elif os.path.isdir(item): for subitem in os.listdir(item): only_accept_spec_files(os.path.join(item, subitem)) log_file = os.path.join(specplots_dir, "specplot_files_processing.log") logging.basicConfig(filename=log_file, level=logging.INFO) logger(">" * 10 + " starting") # TODO: do not start this process if it is running from previous call PlotSpecFileScans( file_list, specplots_dir, reverse_chronological=args.reverse_chronological, ) logger("<" * 10 + " finished")
[docs]def developer(): """ Supply a file and a directory as command-line arguments to "paths". """ import tempfile tempdir = tempfile.mkdtemp() sys.argv.append("-d") sys.argv.append(tempdir) sys.argv.append(os.path.join("data", "02_03_setup.dat")) sys.argv.append(os.path.join("..", "..", "tests", "data")) main() logging.disable(logging.CRITICAL) logging.shutdown() shutil.rmtree(tempdir) logging.disable(logging.NOTSET)
if __name__ == "__main__": main() # developer()