""" Routines to:
Parse cat files
Run SPFIT and/or SPCAT
"""
import os
import subprocess
import shutil
import json
import types
from typing import List, Any, Union, Dict, Tuple
from glob import glob
from warnings import warn
import ruamel.yaml as yaml
import numpy as np
import joblib
import paramiko
def run_spcat(filename: str, temperature=None):
    """
    Run SPCAT on ``filename.int`` with ``filename.var`` as the parameter file.

    If the ``.var`` file does not exist, the ``.par`` file is copied to
    ``filename.var`` and used instead.

    Parameters
    ----------
    filename : str
        Path stem shared by the SPCAT files, without an extension
    temperature : str or number, optional
        Text to search for in the SPCAT standard output. It is converted
        with ``str`` and matched as a substring, so it should be written the
        way SPCAT prints it (presumably e.g. "300.000" - confirm against
        real SPCAT output).

    Returns
    -------
    float or None
        The second whitespace-separated field of the first output line
        containing ``temperature``, taken as the partition function. None
        when no temperature is given or no line matches.

    Raises
    ------
    FileNotFoundError
        If neither the ``.var`` nor the ``.par`` file exists
    """
    parameter_file = filename + ".var"
    if not os.path.isfile(parameter_file):
        print("VAR file unavailable. Attempting to run with PAR file.")
        if not os.path.isfile(filename + ".par"):
            raise FileNotFoundError("No .var or .par file found.")
        shutil.copy2(filename + ".par", parameter_file)
    # subprocess.run drains the pipe while it waits; calling wait() on a
    # process with a piped stdout can block forever once the pipe buffer fills.
    completed = subprocess.run(
        ["spcat", filename + ".int", parameter_file],
        stdout=subprocess.PIPE,  # capture rather than echo the SPCAT output
    )
    if temperature is None:
        return None
    # Decode the captured bytes properly instead of splitting their repr
    target = str(temperature)
    for line in completed.stdout.decode(errors="replace").splitlines():
        if target in line:
            return float(line.split()[1])
    return None
def run_calbak(filename: str):
    """
    Run the calbak routine, which generates a .lin file from the .cat file.

    Parameters
    ----------
    filename : str
        Path stem shared by the ``.cat`` and ``.lin`` files, without an
        extension

    Raises
    ------
    FileNotFoundError
        If ``filename.cat`` does not exist
    RuntimeError
        If the ``.lin`` file written by calbak contains no lines
    """
    if not os.path.isfile(filename + ".cat"):
        raise FileNotFoundError(filename + ".cat is missing; cannot run calbak.")
    process = subprocess.Popen(
        ["calbak", filename + ".cat", filename + ".lin"], stdout=subprocess.DEVNULL
    )
    process.wait()
    with open(filename + ".lin") as read_file:
        lin_lines = read_file.readlines()
    # readlines() returns a list, so test for emptiness rather than comparing
    # the list itself with 0 (which is never true).
    if not lin_lines:
        raise RuntimeError("No lines produced in calbak! Check .cat file.")
def run_spfit(filename: str):
    """
    Run SPFIT on the ``.lin`` and ``.par`` files that share a path stem.

    The program output is captured rather than printed, and the call is
    given a 20 second timeout.

    Parameters
    ----------
    filename : str
        Path stem of the ``.lin`` and ``.par`` files, without an extension

    Raises
    ------
    OSError
        If SPFIT exits with a non-zero return code
    """
    command = ["spfit", filename + ".lin", filename + ".par"]
    completed = subprocess.run(command, timeout=20.0, capture_output=True)
    if completed.returncode == 0:
        return
    raise OSError("SPFIT failed to run.")
def list_chunks(target: List[Any], n: int):
    """
    Break a list up into consecutive pieces of at most ``n`` elements.

    Every piece holds ``n`` elements except possibly the last one, which
    holds whatever is left over.

    Parameters
    ----------
    target: list
        List to split into chunks
    n: int
        Number of elements per chunk

    Returns
    -------
    split_list: list
        Nested list of chunks
    """
    split_list = []
    for start in range(0, len(target), n):
        stop = start + n
        split_list.append(target[start:stop])
    return split_list
def human2pickett(name: str, reduction="A", linear=True, nuclei=0):
    """
    Translate a Hamiltonian parameter name into a Pickett identifier.

    The rotational constant "B" is hard-coded; every other name is looked up
    in ``~/.pyspectools/pickett_terms.yml``.

    An alternative way of doing this is to programmatically generate the
    Pickett identifiers, and just use a format string to output the
    identifier.

    Parameters
    ----------
    name : str
        Human-readable parameter name
    reduction : str, optional
        Key used to select the entry for names containing "D_" or "del",
        by default "A"
    linear : bool, optional
        Selects the identifier returned for "B": 100 when true, 20000
        otherwise. By default True
    nuclei : int, optional
        Value substituted into the identifier template of the hyperfine
        terms "eQq" and "eQq/2", by default 0

    Returns
    -------
    int or str
        The Pickett identifier

    Raises
    ------
    KeyError
        If the name is not present in the lookup table
    """
    # Compare by value: identity checks against a string literal only work
    # by accident of interning. B is resolved before touching the disk.
    if name == "B":
        return 100 if linear else 20000
    pickett_parameters = read_yaml(
        os.path.join(os.path.expanduser("~"), ".pyspectools", "pickett_terms.yml")
    )
    # Hyperfine terms are stored as templates that take the nucleus index
    if name in ("eQq", "eQq/2"):
        return str(pickett_parameters[name]).format(nuclei)
    if "D_" in name or "del" in name:
        return str(pickett_parameters[name][reduction])
    try:
        return pickett_parameters[name]
    except KeyError as err:
        # Fail with a meaningful error rather than an unbound local variable
        raise KeyError(f"Parameter name unknown: {name!r}") from err
def read_json(json_filepath: str) -> Dict[Any, Any]:
    """
    Read a JSON file from disk and return the decoded contents.

    Parameters
    ----------
    json_filepath : str
        Path to the JSON file

    Returns
    -------
    Dict[Any, Any]
        Dictionary from JSON file
    """
    with open(json_filepath, "r") as handle:
        return json.load(handle)
def dump_json(json_filepath: str, json_dict: Dict[Any, Any]):
    """
    Write a Python dictionary to disk as a JSON file.

    The output is pretty printed with a four space indent and sorted keys.

    Parameters
    ----------
    json_filepath : str
        Path to the JSON file to save to
    json_dict : Dict[Any, Any]
        Dictionary to be serialized
    """
    formatting = {"indent": 4, "sort_keys": True}
    with open(json_filepath, "w+") as handle:
        json.dump(json_dict, handle, **formatting)
def read_yaml(yaml_filepath: str) -> Dict[Any, Any]:
    """
    Read a YAML file from disk and return the parsed contents.

    Parameters
    ----------
    yaml_filepath : str
        Path to the YAML file

    Returns
    -------
    Dict[Any, Any]
        Dictionary based on the YAML contents
    """
    # NOTE(review): yaml.Loader is not the safe loader, so this should only
    # be pointed at trusted files. Newer ruamel.yaml releases reportedly drop
    # the module-level load(); confirm against the pinned version.
    with open(yaml_filepath) as handle:
        return yaml.load(handle, Loader=yaml.Loader)
def dump_yaml(yaml_filepath: str, yaml_dict: Dict[Any, Any]):
    """
    Write a Python dictionary to disk as a YAML file.

    Parameters
    ----------
    yaml_filepath : str
        Path to the YAML file
    yaml_dict : Dict[Any, Any]
        Dictionary to be serialized
    """
    with open(yaml_filepath, "w+") as handle:
        yaml.dump(yaml_dict, handle)
def generate_folder():
    """
    Create the folder for the next calculation in the working directory.

    Existing directories whose names parse as integers are treated as
    previous calculations; the new folder is named one above the largest of
    them, or "1" when there are none.

    Returns
    -------
    int
        Number of the newly created calculation folder
    """
    calculation_numbers = []
    for entry in list_directories():
        try:
            number = int(entry)
        except ValueError:
            # Directory name is not an integer, so it is not a calculation
            continue
        calculation_numbers.append(number)
    next_calc = max(calculation_numbers, default=0) + 1
    os.mkdir(str(next_calc))
    return next_calc
def decimal_length(value: float):
    """
    Count the characters on each side of the decimal point of a value.

    The value is converted to its string form and split at ".", so a value
    whose string has no decimal point yields a single length.

    Parameters
    ----------
    value : float
        Value to measure

    Returns
    -------
    list
        Length of each "."-separated part of ``str(value)``
    """
    pieces = str(value).split(".")
    return list(map(len, pieces))
def copy_template():
    """
    Interactively copy one of the bundled JSON templates to the working
    directory.

    The ``templates`` folder next to this module is searched for ``*.json``
    files, their names are printed, and the user is prompted to pick one.
    The chosen template is copied to ``parameters.json`` in the current
    working directory; if the answer is not one of the listed names, a
    message is printed and nothing is copied.
    """
    script_location = os.path.dirname(os.path.realpath(__file__))
    templates_folder = os.path.join(script_location, "templates")
    # basename/join keep this working with any platform's path separator,
    # whereas splitting on "/" only works for POSIX-style paths.
    available_templates = [
        os.path.basename(template)
        for template in glob(os.path.join(templates_folder, "*.json"))
    ]
    print("The templates available are:")
    for template in available_templates:
        print(template)
    target = input("Please specify which template to copy: ")
    if target not in available_templates:
        print("Not a template; probably a typo.")
        print("Please re-run the script.")
        return
    shutil.copy2(
        os.path.join(templates_folder, target),
        os.path.join(os.getcwd(), "parameters.json"),
    )
    print("Copied template " + target + " to your folder as parameters.json.")
    print("Edit the .json input file and re-run the script.")
def flatten_list(input_list: List[List[Any]]):
    """
    Flatten a nested list by one level.

    Each element is first treated as an iterable and its members are added
    to the output; an element that cannot be iterated is added as-is.

    Parameters
    ----------
    input_list: list
        List of values, where some of the elements are lists

    Returns
    -------
    output_list: list
        Flattened version of input_list
    """
    flattened = []
    for element in input_list:
        try:
            # Optimistically assume the element is itself a sequence
            flattened.extend(element)
        except TypeError:
            # Not iterable, so keep the bare value
            flattened.append(element)
    return flattened
def list_directories():
    """
    List the directories found in the current working directory.

    Returns
    -------
    list
        Names of the entries in the working directory that are directories
    """
    return list(filter(os.path.isdir, os.listdir()))
def backup_files(molecule_name, save_location):
    """
    Copy the files belonging to a molecule into a backup location.

    For each of the extensions .cat, .var, .par, .int, .json and .lin, the
    file ``molecule_name + extension`` is copied to ``save_location`` if it
    exists, and a message is printed for every file copied. Missing files
    are silently skipped.

    Parameters
    ----------
    molecule_name : str
        Path stem of the files to back up, without an extension
    save_location : str
        Destination passed to ``shutil.copy2``
    """
    for extension in (".cat", ".var", ".par", ".int", ".json", ".lin"):
        candidate = molecule_name + extension
        if not os.path.isfile(candidate):
            continue
        shutil.copy2(candidate, save_location)
        print("Backing up " + candidate + " to " + save_location)
def isnotebook():
    """
    Report whether the code is running inside a Jupyter notebook/qtconsole.

    Returns
    -------
    bool
        True only when ``get_ipython`` exists and the shell class is named
        "ZMQInteractiveShell"; False for terminal IPython, any other shell,
        and the standard Python interpreter.
    """
    try:
        shell_name = get_ipython().__class__.__name__
    except NameError:
        # get_ipython is not defined: probably the standard interpreter
        return False
    # Terminal IPython ("TerminalInteractiveShell") and anything else
    # unrecognised do not count as a notebook.
    return shell_name == "ZMQInteractiveShell"
def save_obj(obj: Any, filepath: str, **kwargs):
    """
    Serialize an object to disk with ``joblib.dump``.

    Defaults to gzip compression at level 6 and pickle protocol 4; any
    keyword arguments given override or extend these settings and are
    forwarded to ``joblib.dump``.

    Parameters
    ----------
    obj : Any
        Instance of the object to be serialized
    filepath : str
        Path to save to
    """
    settings = {"compress": ("gzip", 6), "protocol": 4, **kwargs}
    joblib.dump(obj, filepath, **settings)
def read_obj(filepath: str):
    """
    Load an object from disk; a thin wrapper around ``joblib.load``.

    Parameters
    ----------
    filepath : str
        Path to the serialized object

    Returns
    -------
    Any
        Whatever ``joblib.load`` returns for the file
    """
    # NOTE(review): joblib files are pickle based, so only load trusted files
    return joblib.load(filepath)
def dump_packages():
    """
    Collect the versions of the modules bound in this module's globals.

    Only names in the global namespace of the module defining this function
    are inspected. The modules sys, __builtins__, types and os are ignored,
    as are modules without a ``__version__`` attribute.

    The original author noted that this does not work the way they wanted.

    Returns
    -------
    dict
        Maps module name to version
    """
    ignored = ("sys", "__builtins__", "types", "os")
    versions = {}
    for candidate in globals().values():
        if not isinstance(candidate, types.ModuleType):
            continue
        if candidate.__name__ in ignored:
            continue
        try:
            versions[candidate.__name__] = candidate.__version__
        except AttributeError:
            # Module does not advertise a version
            pass
    return versions
def find_nearest(array: np.ndarray, value: Union[float, int]) -> Tuple[np.ndarray, int]:
    """
    Locate the element of an array that lies closest to a given value.

    Parameters
    ----------
    array : np.ndarray
        NumPy 1D array
    value : float
        Value to search the array for

    Returns
    -------
    Tuple[np.ndarray, int]
        Returns the closest value, as well as the index
    """
    candidates = np.asarray(array)
    distances = np.abs(candidates - value)
    nearest_index = distances.argmin()
    return candidates[nearest_index], nearest_index
class RemoteClient(paramiko.SSHClient):
    """
    SSH client that connects on construction and keeps an SFTP channel open.

    Parameters
    ----------
    hostname : str, optional
        Host to connect to
    username : str, optional
        User to log in as
    **kwargs
        Additional keyword arguments passed to ``paramiko.SSHClient.connect``
    """

    def __init__(self, hostname=None, username=None, **kwargs):
        super().__init__()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # prompting; confirm this is acceptable for the target hosts.
        self.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.connect(hostname=hostname, username=username, **kwargs)
        self.sftp = self.open_sftp()

    @classmethod
    def from_file(cls, filepath: str):
        """
        Reload a remote session from a pickle file created by the save_session.

        :param filepath: str path to RemoteClient pickle file
        :return: RemoteClient object
        :raises TypeError: if the file does not hold a RemoteClient
        """
        remote = read_obj(filepath)
        # Instances have no __name__ attribute, so check the type directly,
        # and hand back the object already loaded instead of reading twice.
        if not isinstance(remote, RemoteClient):
            raise TypeError(
                "File was not a RemoteClient session; {}".format(
                    type(remote).__name__
                )
            )
        return remote

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Context manager exit hook, called when a ``with`` block using this
        client is left. Closes the SFTP channel and the SSH connection.
        """
        self.sftp.close()
        self.close()

    def get_file(self, remote_path: str, local_path=None):
        """
        Download a file from remote server to disk. If no local path is provided, defaults
        to the current working directory.

        :param remote_path: str remote file path target
        :param local_path: str optional path to save the file to; when it is
            an existing directory the remote file name is kept
        """
        # Resolve the working directory at call time; a default of
        # os.getcwd() in the signature is frozen when the module is imported.
        if local_path is None:
            local_path = os.getcwd()
        # The SFTP download needs a file path, so place the file inside the
        # directory when one is given.
        if os.path.isdir(local_path):
            local_path = os.path.join(local_path, os.path.basename(remote_path))
        self.sftp.get(remote_path, local_path)

    def run_command(self, command: str):
        """
        Execute a command on the remote host.

        Any output on the remote standard error is treated as a failure.

        :param command: str command line to execute
        :return: list of lines read from the remote standard output
        :raises Exception: if the command wrote anything to standard error
        """
        stdin, stdout, stderr = self.exec_command(command)
        error_msg = stderr.read()
        if error_msg:
            raise Exception(f"Error in running command: {error_msg}")
        return stdout.readlines()

    def open_remote(self, remote_path: str):
        """
        Function to stream the file contents of a remote file. Can be used to directly
        provide data into memory without downloading it to disk.

        :param remote_path: str remote path to target file
        :return: list of contents of the target file
        """
        # NOTE(review): the path is inserted into the command unquoted
        return self.run_command("cat {}".format(remote_path))

    def ls(self, remote_path=""):
        """
        Function to get the list of files present in a specified directory.
        Defaults to the current ssh directory.

        :param remote_path: str remote path to inspect
        :return: list of files and folders
        """
        # NOTE(review): the path is inserted into the command unquoted
        return self.run_command("ls {}".format(remote_path))

    def save_session(self, filepath="ssh.pkl", **kwargs):
        """
        Function to dump the ssh settings object to a pickle file. Keep in mind
        that while this is a matter of convenience, the file is unencrypted and
        so storing passwords in here is not exactly the safest thing to do!

        :param filepath: str optional path to save the session to.
        :param kwargs: additional keyword arguments passed to save_obj
        """
        save_obj(self, filepath, **kwargs)
def group_consecutives(vals: List[float], step=1):
    """
    Split a list into runs of consecutive values.

    A value continues the current run when it equals the previous value plus
    ``step``; otherwise it starts a new run. The primary purpose is to split
    concatenated spectra, given as a single list of frequencies, into
    individual windows.

    Parameters
    ----------
    vals : list
        List of floats to be split
    step : int, optional
        Spacing between neighbouring values of the same run, by default 1

    Returns
    -------
    list
        Nested list with one inner list per run; an empty input gives a
        single empty run
    """
    groups = [[]]
    expected = None
    for value in vals:
        # Open a new run once a value breaks the expected spacing
        if expected is not None and not (value == expected):
            groups.append([])
        groups[-1].append(value)
        expected = value + step
    return groups