Source code for pyspectools.spectra.assignment

"""
    `assignment` module

    This module contains three main classes for performing analysis of
    broadband spectra. The `AssignmentSession` class is the one the user
    will mainly interact with: it digests a spectrum, finds peaks, makes
    assignments and keeps track of them, and generates reports at the end.

    To perform the assignments, the user can use the `LineList` class, which
    does the grunt work of homogenizing the different sources of frequency
    and molecular information: it is able to take SPCAT and .lin formats, as
    well as simply a list of frequencies. `LineList` then interacts with the
    `AssignmentSession` class, which handles the assignments.

    The smallest building block in this procedure is the `Transition` class;
    every peak, every molecular transition, and every artifact is treated as
    a `Transition` object. A `LineList` holds a list of `Transition` objects,
    and the peaks found by the `AssignmentSession` are likewise kept as a
    `LineList`.

"""

import os
from shutil import rmtree
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Union, Type, Any
from copy import copy, deepcopy
from itertools import combinations
import warnings
import logging
from pathlib import Path

import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from tqdm.autonotebook import tqdm
from lmfit.models import GaussianModel
from IPython.display import display, HTML
from periodictable import formula
from plotly.offline import plot
from plotly import graph_objs as go
from uncertainties import ufloat
from jinja2 import Template
from monsterurl import get_monster
from scipy.ndimage.filters import gaussian_filter

from pyspectools import routines, parsers, figurefactory
from pyspectools import ftmw_analysis as fa
from pyspectools import fitting
from pyspectools import units
from pyspectools import database
from pyspectools.astro import analysis as aa
from pyspectools.spectra import analysis


@dataclass
class Transition:
    """
    DataClass for handling assignments. Attributes are assigned in order to
    be sufficiently informative for a line assignment to be unambiguous and
    reproducible later in a form that is both machine and human readable.

    Attributes
    ----------
    name : str
        IUPAC/common name; the former is preferred to be unambiguous
    formula : str
        Chemical formula, or usually the stoichiometry
    smiles : str
        SMILES code that provides a machine and human readable chemical
        specification
    frequency : float
        Observed frequency in MHz
    intensity : float
        Observed intensity, in whatever units the experiments are in.
        Examples are Jy/beam, or microvolts.
    catalog_frequency : float
        Catalog frequency in MHz
    catalog_intensity : float
        Catalog line intensity, typically in SPCAT units
    S : float
        Theoretical line strength; differs from the catalog line strength
        as it may be used for intrinsic line strength S u^2
    peak_id : int
        Peak id from a specific experiment
    uline : bool
        Flag to indicate whether the line is identified or not
    composition : list of str
        A list of atomic symbols specifying what the experimental elemental
        composition is. Influences which molecules are considered possible
        in the Splatalogue assignment procedure.
    v_qnos : list of int
        Quantum numbers for vibrational modes. Index corresponds to mode,
        and int value to number of quanta. Length should be equal to 3N-6.
    r_qnos : str
        Rotational quantum numbers.
        TODO - better way of managing rotational quantum numbers
    experiment : int
        Experiment ID to use as a prefix/suffix for record keeping
    weighting : float
        Value for the weighting factor used in the automated assignment
    fit : dict
        Contains the fitted parameters and model
    ustate_energy : float
        Energy of the upper state in Kelvin
    lstate_energy : float
        Energy of the lower state in Kelvin
    interference : bool
        Flag to indicate if this assignment is not molecular in nature
    source : str
        Indicates what the source used for this assignment is
    public : bool
        Flag to indicate if the information for this assignment is
        public/published
    velocity : float
        Velocity of the source used to make the assignment in km/s
    discharge : bool
        Whether or not the line is discharge dependent
    magnet : bool
        Whether or not the line is magnet dependent (i.e. open shell)
    """

    name: str = ""
    smiles: str = ""
    formula: str = ""
    frequency: float = 0.0
    catalog_frequency: float = 0.0
    catalog_intensity: float = 0.0
    deviation: float = 0.0
    intensity: float = 0.0
    uncertainty: float = 0.0
    S: float = 0.0
    peak_id: int = 0
    experiment: int = 0
    uline: bool = True
    composition: List[str] = field(default_factory=list)
    v_qnos: List[int] = field(default_factory=list)
    r_qnos: str = ""
    fit: Dict = field(default_factory=dict)
    ustate_energy: float = 0.0
    lstate_energy: float = 0.0
    interference: bool = False
    weighting: float = 0.0
    source: str = "Catalog"
    public: bool = True
    velocity: float = 0.0
    discharge: bool = False
    magnet: bool = False
    multiple: List[str] = field(default_factory=list)
    final: bool = False

    def __eq__(self, other: object) -> bool:
        """
        Dunder method for comparing molecules. This method is simply a
        shortcut to see if two molecules are the same based on their SMILES
        code, the chemical name, and frequency.
        """
        if not isinstance(other, Transition):
            return NotImplemented
        comparisons = [
            self.smiles == other.smiles,
            self.name == other.name,
            self.frequency == other.frequency,
            self.v_qnos == other.v_qnos,
        ]
        return all(comparisons)

    def __str__(self):
        """
        Dunder method for representing a Transition, which returns the name
        of the line and the frequency.

        Returns
        -------
        str
            Name and frequency of the Transition
        """
        if self.uline is True:
            frequency = self.frequency
        else:
            frequency = self.catalog_frequency
        return f"{self.name}, {frequency:,.4f}"

    def __repr__(self):
        if self.uline is True:
            frequency = self.frequency
        else:
            frequency = self.catalog_frequency
        return f"{self.name}, {frequency:,.4f}"

    def __lt__(self, other):
        return self.frequency < other.frequency

    def __gt__(self, other):
        return self.frequency > other.frequency

    def check_molecule(self, other):
        """
        Check equivalency based on a common carrier. Compares the name,
        formula, and smiles of this `Transition` object with another.

        Returns
        -------
        bool
            True if the two `Transition` objects belong to the same carrier.
        """
        assert type(other) == type(self)
        return all(
            [
                self.name == other.name,
                self.formula == other.formula,
                self.smiles == other.smiles,
            ]
        )

    def calc_intensity(self, Q: float, T=300.0):
        """
        Convert linestrength into intensity.

        Parameters
        ----------
        Q : float
            Partition function for the molecule at temperature T
        T : float
            Temperature to calculate the intensity at in Kelvin

        Returns
        -------
        I : float
            log10 of the intensity in SPCAT format
        """
        # Take the larger of the two frequency values to calculate I
        frequency = max([self.frequency, self.catalog_frequency])
        I = units.S2I(
            self.intensity,
            Q,
            frequency,
            units.calc_E_lower(frequency, self.ustate_energy),
            T,
        )
        self.S = I
        return I

    def calc_linestrength(self, Q: float, T=300.0):
        """
        Convert intensity into linestrength.

        Parameters
        ----------
        Q : float
            Partition function for the molecule at temperature T
        T : float
            Temperature to calculate the intensity at in Kelvin

        Returns
        -------
        intensity : float
            Intrinsic linestrength of the transition
        """
        # Take the larger of the two frequency values to calculate S
        frequency = max([self.frequency, self.catalog_frequency])
        intensity = units.I2S(
            self.S, Q, frequency, units.calc_E_lower(frequency, self.ustate_energy), T
        )
        self.intensity = intensity
        return intensity

    def to_file(self, filepath: str, format="yaml"):
        """
        Save a Transition object to disk with a specified file format.
        Defaults to YAML.

        Parameters
        ----------
        filepath : str
            Path to the output file
        format : str, optional
            Denotes the syntax used for dumping. Defaults to YAML.
        """
        if "." not in filepath:
            if format == "json":
                filepath += ".json"
            else:
                filepath += ".yml"
        if format == "json":
            writer = routines.dump_json
        else:
            writer = routines.dump_yaml
        writer(filepath, self.__dict__)

    def get_spectrum(self, x: np.ndarray):
        """
        Generate a synthetic peak by supplying the x axis for a particular
        spectrum. This method assumes that some fit parameters have been
        determined previously.

        Parameters
        ----------
        x : Numpy 1D array
            Frequency bins from an experiment to simulate the line features.

        Returns
        -------
        Numpy 1D array
            Values of the model function spectrum at each particular value of x
        """
        if hasattr(self, "fit"):
            return self.fit.eval(x=x)
        else:
            raise Exception("get_spectrum() with no fit data available!")

    @classmethod
    def from_dict(cls, data_dict: Dict):
        """
        Method for generating an Assignment object from a dictionary. All
        this method does is unpack a dictionary into the __init__ method.

        Parameters
        ----------
        data_dict : dict
            Dictionary containing all of the Assignment DataClass fields
            that are to be populated.

        Returns
        -------
        Transition
            Converted Assignment object from the input dictionary
        """
        assignment_obj = cls(**data_dict)
        return assignment_obj

    @classmethod
    def from_yml(cls, yaml_path: str):
        """
        Method for initializing an Assignment object from a YAML file.

        Parameters
        ----------
        yaml_path : str
            Path to the YAML file

        Returns
        -------
        Transition
            Assignment object loaded from a YAML file.
        """
        yaml_dict = routines.read_yaml(yaml_path)
        assignment_obj = cls(**yaml_dict)
        return assignment_obj

    @classmethod
    def from_json(cls, json_path: str):
        """
        Method for initializing an Assignment object from a JSON file.

        Parameters
        ----------
        json_path : str
            Path to the JSON file

        Returns
        -------
        Transition
            Assignment object loaded from a JSON file.
        """
        json_dict = routines.read_json(json_path)
        assignment_obj = cls(**json_dict)
        return assignment_obj

    def reset_assignment(self):
        """
        Function to reset an assigned line to its original state. The only
        information that is kept relates to the frequency, intensity, and
        aspects of the experiment.
        """
        remain = {
            "frequency": self.frequency,
            "intensity": self.intensity,
            "experiment": self.experiment,
            "velocity": self.velocity,
            "source": self.source,
        }
        empty = Transition()
        self.__dict__.update(**empty.__dict__)
        self.__dict__.update(**remain)

    def choose_assignment(self, index: int):
        """
        Function to manually pick an assignment from a list of multiple
        possible assignments found during `process_linelist`. After the new
        assignment is copied over, the `final` attribute is set to True and
        will no longer throw a warning during finalize_assignments.

        Parameters
        ----------
        index : int
            Index of the candidate to use for the assignment.
        """
        assert len(self.multiple) != 0
        assert index < len(self.multiple)
        remain = {
            "frequency": self.frequency,
            "intensity": self.intensity,
            "experiment": self.experiment,
            "velocity": self.velocity,
            "source": self.source,
            "multiple": self.multiple,
        }
        chosen = deepcopy(self.multiple[index])
        chosen.__dict__.update(**remain)
        self.__dict__.update(**chosen.__dict__)
        self.final = True

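Since `Transition` is a plain dataclass, building, comparing, and serializing assignments takes little ceremony. A minimal sketch with invented values; note that equality deliberately ignores intensity:

```
from pyspectools.spectra.assignment import Transition

line = Transition(name="HC3N", formula="HC3N", frequency=18196.31, intensity=5.2)
other = Transition(name="HC3N", formula="HC3N", frequency=18196.31, intensity=4.9)

# Equality compares SMILES, name, frequency, and vibrational quanta,
# so the differing intensities do not matter here
assert line == other
# __lt__/__gt__ order Transitions by frequency
print(sorted([other, line]))
# Serialization defaults to YAML; this writes line.yml
line.to_file("line")
```
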
@dataclass
class LineList:
    """
    Class for handling and homogenizing all of the possible line lists:
    from peaks to assignments to catalog files.

    Attributes
    ----------
    name : str, optional
        Name of the line list. Can be used to identify the molecule, or to
        simply state the purpose of the list.
    formula : str, optional
        Chemical formula for the molecule, if applicable.
    smi : str, optional
        SMILES representation of the molecule, if applicable.
    filecontents : str, optional
        String representation of the file contents used to make the line list.
    filepath : str, optional
        Path to the file used to make the list.
    transitions : list, optional
        A designated list for holding Transition objects. This is the bulk
        of the information for a given line list.
    """

    name: str = ""
    formula: str = ""
    smi: str = ""
    filecontents: str = ""
    filepath: str = ""
    transitions: List = field(default_factory=list)
    frequencies: List[float] = field(default_factory=list)
    catalog_frequencies: List[float] = field(default_factory=list)
    source: str = ""

    def __str__(self):
        nentries = len(self.transitions)
        return f"Line list for: {self.name} Formula: {self.formula}, Number of entries: {nentries}"

    def __repr__(self):
        nentries = len(self.transitions)
        return f"Line list name: {self.name}, Number of entries: {nentries}"

    def __len__(self):
        return len(self.transitions)

    def __post_init__(self):
        if len(self.transitions) != 0:
            self.frequencies = [obj.frequency for obj in self.transitions]
            self.catalog_frequencies = [
                obj.catalog_frequency for obj in self.transitions
            ]

    def __eq__(self, other: object) -> bool:
        """
        Dunder method for comparison of LineLists. Since users can
        accidentally use different names/formulas yet use the same
        catalog/lin file to create the LineList, we only perform the check
        on the list of transitions.

        Parameters
        ----------
        other : LineList object
            The other LineList to be used for comparison.

        Returns
        -------
        bool
            If True, the two LineList objects are equal.
        """
        # Assert that we're comparing LineList objects
        if not isinstance(other, LineList):
            return NotImplemented
        return self.transitions == other.transitions

    def __add__(self, transition_obj: Type[Transition]):
        """
        Dunder method to add a Transition to the LineList.

        Parameters
        ----------
        transition_obj : Transition object
            Transition to append to the list of transitions.
        """
        assert type(transition_obj) == Transition
        self.transitions.append(transition_obj)

    def __iter__(self):
        """
        Sets up syntax for looping over a LineList. This is recommended
        more for users, but not for programming. When writing new code in
        the module, iterate over the transitions attribute explicitly.
        """
        yield from self.transitions

    @classmethod
    def from_catalog(
        cls,
        name: str,
        formula: str,
        filepath: str,
        min_freq=0.0,
        max_freq=1e12,
        max_lstate=9000.0,
        **kwargs,
    ):
        """
        Create a LineList object from an SPCAT catalog.

        Parameters
        ----------
        name : str
            Name of the molecule the catalog belongs to
        formula : str
            Chemical formula of the molecule
        filepath : str
            Path to the catalog file.
        min_freq : float, optional
            Minimum frequency in MHz for the frequency cutoff
        max_freq : float, optional
            Maximum frequency in MHz for the frequency cutoff
        max_lstate : float, optional
            Maximum lower state energy to filter out absurd lines
        kwargs : optional
            Additional attributes that are passed into the Transition objects.

        Returns
        -------
        linelist_obj
            Instance of LineList with the digested catalog.
        """
        catalog_df = parsers.parse_cat(filepath, low_freq=min_freq, high_freq=max_freq)
        try:
            # Create a formatted quantum number string
            columns = ["N'", "J'", "N''", "J''"]
            catalog_df["qno"] = catalog_df[columns].apply(
                lambda x: "N'={}, J'={} - N''={}, J''={}".format(*x), axis=1
            )
            # Calculate E upper to have a complete set of data
            catalog_df["Upper state energy"] = units.calc_E_upper(
                catalog_df["Frequency"], catalog_df["Lower state energy"]
            )
            # Filter out the lower states
            catalog_df = catalog_df.loc[catalog_df["Lower state energy"] <= max_lstate]
            # Vectorized generation of all the Transition objects
            vfunc = np.vectorize(Transition)
            transitions = vfunc(
                catalog_frequency=catalog_df["Frequency"],
                catalog_intensity=catalog_df["Intensity"],
                uncertainty=catalog_df["Uncertainty"],
                lstate_energy=catalog_df["Lower state energy"],
                ustate_energy=catalog_df["Upper state energy"],
                r_qnos=catalog_df["qno"],
                source="Catalog",
                name=name,
                formula=formula,
                uline=False,
                **kwargs,
            )
            linelist_obj = cls(
                name,
                formula,
                filepath=filepath,
                transitions=list(transitions),
                source="Catalog",
            )
            return linelist_obj
        except IndexError:
            return None

    @classmethod
    def from_dataframe(
        cls,
        dataframe: pd.DataFrame,
        name="Peaks",
        freq_col="Frequency",
        int_col="Intensity",
        **kwargs,
    ):
        """
        Specialized class method for creating a LineList object from a
        Pandas DataFrame. This method is called by the
        AssignmentSession.df2ulines function to generate a Peaks LineList
        during peak detection.

        Parameters
        ----------
        dataframe : pandas DataFrame
            DataFrame containing frequency and intensity information
        name : str, optional
            Name for the resulting LineList; defaults to "Peaks"
        freq_col : str, optional
            Name of the frequency column
        int_col : str, optional
            Name of the intensity column
        kwargs
            Optional settings are passed into the creation of Transition objects.

        Returns
        -------
        LineList
        """
        vfunc = np.vectorize(Transition)
        transitions = vfunc(
            frequency=dataframe[freq_col],
            intensity=dataframe[int_col],
            uline=True,
            **kwargs,
        )
        linelist_obj = cls(name=name, transitions=list(transitions), source="Peaks")
        return linelist_obj

    @classmethod
    def from_list(cls, name: str, frequencies: List[float], formula="", **kwargs):
        """
        Generic, low-level method for creating a LineList object from a
        list of frequencies. This method can be used when neither a lin
        file, a catalog, nor Splatalogue is appropriate and you would like
        to create the list manually from handpicked frequencies.

        Parameters
        ----------
        name : str
            Name of the species - doesn't have to be its real name, just
            an identifier.
        frequencies : list
            A list of floats corresponding to the "catalog" frequencies.
        formula : str, optional
            Formula of the species, if known.
        kwargs
            Optional settings are passed into the creation of Transition objects.

        Returns
        -------
        LineList
        """
        vfunc = np.vectorize(Transition)
        frequencies = np.asarray(frequencies)
        transitions = vfunc(
            catalog_frequency=frequencies,
            uline=False,
            name=name,
            formula=formula,
            **kwargs,
        )
        linelist_obj = cls(name=name, transitions=list(transitions), source="Catalog")
        return linelist_obj

    @classmethod
    def from_pgopher(cls, name: str, filepath: str, formula="", **kwargs):
        """
        Method to take the output of a PGopher file and create a LineList
        object. The PGopher output must be in the comma delimited
        specification. This is actually the ideal way to generate LineList
        objects: it fills in all of the relevant fields, such as
        linestrength and state energies.

        Parameters
        ----------
        name : str
            Name of the molecule
        filepath : str
            Path to the PGopher CSV output
        formula : str, optional
            Chemical formula of the molecule, defaults to an empty string.

        Returns
        -------
        LineList
        """
        pgopher_df = pd.read_csv(filepath, skiprows=1)
        pgopher_df = pgopher_df.iloc[:-1]
        vfunc = np.vectorize(Transition)
        transitions = vfunc(
            catalog_frequency=pgopher_df["Position"].astype(float),
            catalog_intensity=pgopher_df["Intensity"].astype(float),
            ustate_energy=pgopher_df["Eupper"].apply(units.MHz2cm),
            lstate_energy=pgopher_df["Elower"].apply(units.MHz2cm),
            S=pgopher_df["Spol"],
            uline=False,
            name=name,
            formula=formula,
        )
        linelist_obj = cls(name=name, transitions=list(transitions), source="Catalog")
        return linelist_obj

    @classmethod
    def from_lin(cls, name: str, filepath: str, formula="", **kwargs):
        """
        Generate a LineList object from a .lin file. This method should be
        used for intermediate assignments, when one does not know the
        identity of a molecule but has measured some frequency data.

        Parameters
        ----------
        name : str
            Name of the molecule
        filepath : str
            File path to the .lin file.
        formula : str, optional
            Chemical formula of the molecule, if known.
        kwargs
            Additional kwargs are passed into the Transition objects.

        Returns
        -------
        LineList
        """
        lin_df = parsers.parse_lin(filepath)
        vfunc = np.vectorize(Transition)
        transitions = vfunc(
            name=name,
            formula=formula,
            catalog_frequency=lin_df["Frequency"],
            uncertainty=lin_df["Uncertainty"],
            r_qnos=lin_df["Quantum numbers"],
            uline=False,
            source="Line file",
            **kwargs,
        )
        linelist_obj = cls(
            name,
            formula,
            filepath=filepath,
            transitions=list(transitions),
            source="Line file",
        )
        return linelist_obj

    @classmethod
    def from_splatalogue_query(cls, dataframe: pd.DataFrame, **kwargs):
        """
        Method for converting a Splatalogue query dataframe into a LineList
        object. This is designed with the intention of pre-querying a set
        of molecules ahead of time, so that the user can have direct
        control over which molecules are specifically targeted without
        having to generate specific catalog files.

        Parameters
        ----------
        dataframe : pandas DataFrame
            DataFrame generated by the function `analysis.search_molecule`

        Returns
        -------
        LineList
        """
        vfunc = np.vectorize(Transition)
        name = dataframe["Chemical Name"].unique()[0]
        transitions = vfunc(
            name=dataframe["Chemical Name"].values,
            catalog_frequency=dataframe["Frequency"].values,
            catalog_intensity=dataframe["CDMS/JPL Intensity"].values,
            ustate_energy=dataframe["E_U (K)"].values,
            formula=dataframe["Species"].values,
            r_qnos=dataframe["Resolved QNs"].values,
            uline=False,
            source="Splatalogue",
            public=True,
            **kwargs,
        )
        linelist_obj = cls(
            name=name, transitions=list(transitions), source="Splatalogue"
        )
        return linelist_obj

    @classmethod
    def from_artifacts(cls, frequencies: List[float], **kwargs):
        """
        Specialized class method for creating a LineList object
        specifically for artifacts/RFI. These Transitions are specially
        flagged as Artifacts.

        Parameters
        ----------
        frequencies : iterable of floats
            List or array of floats corresponding to known artifact
            frequencies.
        kwargs
            Kwargs are passed into the Transition object creation.

        Returns
        -------
        LineList
        """
        vfunc = np.vectorize(Transition)
        transitions = vfunc(
            name="Artifact",
            catalog_frequency=np.asarray(frequencies),
            uline=False,
            source="Artifact",
            public=False,
            **kwargs,
        )
        linelist_obj = cls(
            name="Artifacts", transitions=list(transitions), source="Artifacts"
        )
        return linelist_obj

    @classmethod
    def from_clock(cls, max_multi=64, clock=65000.0, **kwargs):
        """
        Method for generating a LineList object by calculating all possible
        combinations of the clock sub-harmonics, including their sum and
        difference frequencies.

        Parameters
        ----------
        max_multi : int, optional
            Maximum harmonic order of the clock to consider, by default 64
        clock : float, optional
            Clock frequency to calculate sub-harmonics of, in units of MHz.
            Defaults to 65,000 MHz, which corresponds to the Keysight AWG.

        Returns
        -------
        LineList object
            LineList object with the full list of possible clock spurs, as
            harmonics, sum, and difference frequencies.
        """
        frequencies = [clock / i for i in range(1, max_multi + 1)]
        for pair in combinations(frequencies, 2):
            # Round to 4 decimal places
            frequencies.append(np.round(sum(pair), 4))
            frequencies.append(np.round(pair[0] - pair[1], 4))
        # Remove duplicates
        frequencies = list(set(frequencies))
        frequencies = sorted(frequencies)
        # Generate Transition objects from this list of frequencies
        vfunc = np.vectorize(Transition)
        transitions = vfunc(
            name="Artifact",
            catalog_frequency=np.asarray(frequencies),
            uline=False,
            source="Artifact",
            public=False,
        )
        linelist_obj = cls(
            name="ClockSpurs", transitions=list(transitions), source="Artifacts"
        )
        return linelist_obj

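    # Worked example for from_clock (illustrative numbers): with
    # max_multi=3 and the default 65,000 MHz clock, the sub-harmonics are
    # 65000.0, 32500.0, and ~21666.6667 MHz. The pairwise sums
    # (97500.0, ~86666.6667, ~54166.6667) and differences
    # (32500.0, ~43333.3333, ~10833.3333) are appended, and the duplicate
    # 32500.0 is removed by the set() call, leaving 8 unique spurs:
    #
    #     spurs = LineList.from_clock(max_multi=3)
    #     assert len(spurs) == 8
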
    def to_dataframe(self):
        """
        Convert the transition data into a Pandas DataFrame.

        Returns
        -------
        dataframe
            Pandas DataFrame with all of the transitions in the line list.
        """
        list_rep = [obj.__dict__ for obj in self.transitions]
        return pd.DataFrame(list_rep)

    def to_ftb(self, filepath=None, thres=-10.0, shots=500, dipole=1.0, **kwargs):
        """
        Function to create an FTB file from a LineList object. This will
        create entries for every transition entry above a certain intensity
        threshold, in whatever units the intensities are in; i.e. SPCAT
        will be in log units, while experimental peaks will be in whatever
        arbitrary voltage scale.

        Parameters
        ----------
        filepath : None or str, optional
            Path to write the ftb file to. If None (default), uses the
            name of the LineList and writes to the ftb folder.
        thres : float, optional
            Threshold for including transitions in the ftb file.
            Transitions with less intensity than this value will not be
            included. Units are the same as whatever the LineList units are.
        shots : int, optional
            Number of shots to integrate.
        dipole : float, optional
            Target dipole moment for the species
        kwargs
            Additional kwargs are passed into the ftb creation, e.g.
            magnet, discharge, etc.
        """
        # If no path is given, use the default naming scheme.
        if filepath is None:
            filepath = f"ftb/{self.name}-batch.ftb"
        # If the source of information is experimentally measured peaks,
        # use the corresponding attributes.
        if self.source == "Peaks":
            freq_attr = "frequency"
            int_attr = "intensity"
        else:
            freq_attr = "catalog_frequency"
            int_attr = "catalog_intensity"
        # Get all the frequencies that have intensities above the threshold
        frequencies = [
            getattr(obj, freq_attr)
            for obj in self.transitions
            if getattr(obj, int_attr) >= thres
        ]
        ftb_kwargs = {"dipole": dipole}
        ftb_kwargs.update(**kwargs)
        ftb_str = ""
        # Loop over the frequencies
        for frequency in frequencies:
            ftb_str += fa.generate_ftb_line(
                np.round(frequency, 4), shots=shots, **ftb_kwargs
            )
        with open(filepath, "w+") as write_file:
            write_file.write(ftb_str)

    def to_pickle(self, filepath=None):
        """
        Function to serialize the LineList to a Pickle file. If no filepath
        is provided, the function will default to using the name attribute
        of the LineList to name the file.

        Parameters
        ----------
        filepath : str or None, optional
            If None, uses the name attribute for the filename, and saves
            to the linelists folder.
        """
        if filepath is None:
            filepath = "linelists/{}-linelist.pkl".format(self.name)
        routines.save_obj(self, filepath)

    def find_nearest(self, frequency: float, tol=1e-3):
        """
        Look up transitions to find the nearest in frequency to the query.
        If the matched frequency is within a tolerance, then the function
        will return the corresponding Transition. Otherwise, it returns
        None.

        Parameters
        ----------
        frequency : float
            Frequency in MHz to search for.
        tol : float, optional
            Maximum tolerance for the deviation between the LineList
            frequency and the query frequency

        Returns
        -------
        Transition object or None
        """
        match_freq, index = routines.find_nearest(self.frequencies, frequency)
        deviation = np.abs(frequency - match_freq)
        if deviation <= tol:
            return self.transitions[index]
        else:
            return None

    def find_candidates(
        self,
        frequency: float,
        lstate_threshold=4.0,
        freq_tol=1e-1,
        int_tol=-10.0,
        max_uncertainty=0.2,
    ):
        """
        Function for searching the LineList for candidates. The first step
        uses pure Python to isolate transitions that meet four criteria:
        the lower state energy, the catalog intensity, the frequency
        distance, and the catalog uncertainty.

        If no candidates are found, the function will return None.
        Otherwise, it will return the list of transitions and a list of
        associated normalized weights.

        Parameters
        ----------
        frequency : float
            Frequency in MHz to try and match.
        lstate_threshold : float, optional
            Lower state energy threshold in Kelvin
        freq_tol : float, optional
            Frequency tolerance in MHz for matching two frequencies
        int_tol : float, optional
            log Intensity threshold
        max_uncertainty : float, optional
            Maximum catalog uncertainty to accept for a candidate

        Returns
        -------
        transitions, weighting or None
            If candidates are found, lists of the transitions and the
            associated weights are returned. Otherwise, returns None.
        """
        # Filter out all transition objects quickly with a list
        # comprehension and if statement
        if self.source == "Peaks":
            freq_attr = "frequency"
        else:
            freq_attr = "catalog_frequency"
        transitions = [
            obj
            for obj in self.transitions
            if all(
                [
                    obj.lstate_energy <= lstate_threshold,
                    obj.catalog_intensity >= int_tol,
                    abs(getattr(obj, freq_attr) - frequency) <= freq_tol,
                    obj.uncertainty <= max_uncertainty,
                ]
            )
        ]
        # If there are candidates, calculate the weights associated with
        # each transition
        if len(transitions) != 0:
            transition_frequencies = np.array(
                [getattr(obj, freq_attr, np.nan) for obj in transitions]
            )
            transition_intensities = np.array(
                [getattr(obj, "catalog_intensity", np.nan) for obj in transitions]
            )
            # If there are actually no catalog intensities, they should sum
            # up to zero, in which case we won't use the intensities in the
            # weight factors
            if np.nansum(transition_intensities) == 0.0:
                transition_intensities = None
            weighting = analysis.line_weighting(
                frequency, transition_frequencies, transition_intensities
            )
            # Only normalize if there's more than one candidate
            if len(weighting) > 1:
                weighting /= weighting.max()
            return transitions, weighting
        else:
            return None

    def update_transition(self, index: int, **kwargs):
        """
        Function for updating a specific Transition object within the LineList.

        Parameters
        ----------
        index : int
            Index of the Transition object in the transitions list
        kwargs : optional
            Updates to apply to the Transition object
        """
        self.transitions[index].__dict__.update(**kwargs)

    def update_linelist(self, transition_objs: List[Transition]):
        """
        Adds transitions to a LineList if they do not exist in the list
        already.

        Parameters
        ----------
        transition_objs : list
            List of Transition objects
        """
        self.transitions.extend(
            [obj for obj in transition_objs if obj not in self.transitions]
        )

    def get_ulines(self):
        """
        Function for retrieving unidentified lines in a LineList.

        Returns
        -------
        uline_objs : list
            List of all of the Transition objects where the uline flag is
            set to True.
        """
        uline_objs = [obj for obj in self.transitions if obj.uline is True]
        return uline_objs

    def get_assignments(self):
        """
        Function for retrieving assigned lines in a LineList.

        Returns
        -------
        assign_objs : list
            List of all of the Transition objects where the uline flag is
            set to False.
        """
        assign_objs = [obj for obj in self.transitions if obj.uline is False]
        return assign_objs

    def get_frequencies(self, numpy=False):
        """
        Method to extract all of the frequencies out of a LineList.

        Parameters
        ----------
        numpy : bool, optional
            If True, returns a NumPy `ndarray` with the frequencies.

        Returns
        -------
        List or np.ndarray
            List of transition frequencies
        """
        frequencies = [transition.frequency for transition in self.transitions]
        if numpy:
            frequencies = np.array(frequencies)
        return frequencies

    def get_multiple(self):
        """
        Convenience function to extract all the transitions within a
        LineList that have multiple possible assignments.

        Returns
        -------
        List
            List of `Transition` objects that have multiple assignments
            remaining.
        """
        assign_objs = list()
        for transition in self.transitions:
            if transition.final is False and len(transition.multiple) != 0:
                assign_objs.append(transition)
        return assign_objs

    def add_uline(self, frequency: float, intensity: float, **kwargs):
        """
        Function to manually add a U-line to the LineList. The function
        creates a Transition object with the frequency and intensity
        values provided by a user, which is then compared with the other
        transition entries within the LineList. If it doesn't already
        exist, the new Transition is added to the LineList. Kwargs are
        passed to the creation of the Transition object.

        Parameters
        ----------
        frequency, intensity : float
            Floats corresponding to the frequency and intensity of the
            line in a given unit.
        """
        transition = Transition(
            frequency=frequency, intensity=intensity, uline=True, **kwargs
        )
        if transition not in self.transitions:
            self.transitions.append(transition)

    def add_ulines(self, data: List[Tuple[float, float]], **kwargs):
        """
        Function to add multiple pairs of frequency/intensity to the
        current LineList. Kwargs are passed to the creation of the
        Transition objects.

        Parameters
        ----------
        data : iterable of 2-tuple
            List-like of 2-tuples corresponding to frequency and
            intensity. Data should look like this example:
            [(12345.213, 5.), (18623.125, 12.3)]
        """
        for line in data:
            # This assertion makes sure that every line has a specified
            # frequency AND intensity value
            assert len(line) == 2
            self.add_uline(*line, **kwargs)

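A minimal usage sketch for hand-built line lists; the names and frequencies are invented. The second half illustrates `find_nearest`, which matches against the `frequencies` attribute (populated from the measured `frequency` field by `__post_init__`), so a manual refresh is shown after `add_ulines`:

```
from pyspectools.spectra.assignment import LineList

# Handpicked "catalog" frequencies for a suspected carrier
candidate = LineList.from_list("unknown-A", [10245.112, 14236.334, 18227.556])
print(candidate)  # Line list for: unknown-A ...

# A peak-style LineList built up manually
peaks = LineList(name="Peaks", source="Peaks")
peaks.add_ulines([(10245.112, 5.0), (14236.334, 12.3)])
# __post_init__ ran on an empty list, so refresh the lookup table
peaks.frequencies = peaks.get_frequencies()
match = peaks.find_nearest(10245.11, tol=1e-2)
print(match)  # the Transition at 10245.112, or None outside tolerance
```
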
@dataclass
class Session:
    """
    Data class for handling parameters used for an AssignmentSession. The
    user generally shouldn't need to directly interact with this class,
    but it can give some level of dynamic control and bookkeeping over how
    and what molecules can be assigned, particularly with the composition,
    the frequency thresholds for matching, and the noise statistics.

    Attributes
    ----------
    experiment : int
        ID for the experiment
    composition : list of str
        List of atomic symbols. Used for filtering out species in the
        Splatalogue assignment procedure.
    temperature : float
        Temperature in K. Used for filtering transitions in the automated
        assignment, which are cut off at 3 times this value.
    doppler : float
        Doppler width in km/s; default value is about 5 kHz at 15 GHz.
        Used for simulating lineshapes and for lineshape analysis.
    velocity : float
        Radial velocity of the source in km/s; used to offset the
        frequency spectrum.
    freq_prox : float
        Frequency cutoff for line assignments. If the freq_abs attribute
        is True, this value is taken as the absolute value. Otherwise, it
        is a percentage of the frequency being compared.
    freq_abs : bool
        If True, the freq_prox attribute is taken as the absolute value of
        frequency, otherwise as a decimal percentage of the frequency
        being compared.
    baseline : float
        Baseline level of signal used for intensity calculations and peak
        detection.
    noise_rms : float
        RMS of the noise used for intensity calculations and peak
        detection.
    noise_region : 2-tuple of floats
        The frequency region used to define the noise floor.
    max_uncertainty : float
        Value to use as the maximum uncertainty for considering a
        transition for assignments.
    """

    experiment: int
    composition: List[str] = field(default_factory=list)
    temperature: float = 4.0
    doppler: float = 0.01
    velocity: float = 0.0
    freq_prox: float = 0.1
    freq_abs: bool = True
    baseline: float = 0.0
    noise_rms: float = 0.0
    noise_region: List[float] = field(default_factory=list)
    max_uncertainty: float = 0.2

    def __str__(self):
        form = (
            f"Experiment: {self.experiment}, "
            f"Composition: {self.composition}, "
            f"Temperature: {self.temperature} K"
        )
        return form

class AssignmentSession:
    """
    Main class for bookkeeping and analyzing broadband spectra. This class
    revolves around operating on a single continuous spectrum, using the
    class functions to automatically assess the noise statistics, find
    peaks, and do the bulk of the bookkeeping on what molecules are
    assigned to what peak.
    """

    @classmethod
    def load_session(cls, filepath: str):
        """
        Load an AssignmentSession from disk, once it has been saved with
        the save_session method, which creates a pickle file.

        Parameters
        ----------
        filepath : str
            Path to the AssignmentSession pickle file; typically
            sessions/{experiment_id}.pkl

        Returns
        -------
        AssignmentSession
            Instance of the AssignmentSession loaded from disk
        """
        session = routines.read_obj(filepath)
        # If the pickle file is read independently, just make all of the
        # folders ahead of time
        folders = [
            "assignment_objs",
            "queries",
            "sessions",
            "clean",
            "figures",
            "reports",
            "logs",
            "outputs",
            "ftb",
            "linelists",
        ]
        for folder in folders:
            if os.path.isdir(folder) is False:
                os.mkdir(folder)
        session._init_logging()
        session.logger.info(f"Reloading session: {filepath}")
        return session

    @classmethod
    def from_ascii(
        cls,
        filepath: str,
        experiment: int,
        composition=["C", "H"],
        delimiter="\t",
        temperature=4.0,
        velocity=0.0,
        col_names=None,
        freq_col="Frequency",
        int_col="Intensity",
        skiprows=0,
        verbose=False,
        **kwargs,
    ):
        """
        Class method for AssignmentSession to generate a session using an
        ASCII file. This is the preferred method for starting an
        AssignmentSession. The ASCII files are parsed using the pandas
        method `read_csv`, with the arguments for reading simply passed to
        that function.

        Example based on a blackchirp spectrum:

        The first row in an ASCII output from blackchirp contains the
        headers, which typically should be renamed to "Frequency,
        Intensity". This can be done with this call:

        ```
        session = AssignmentSession.from_ascii(
            filepath="ft1020.txt",
            experiment=0,
            col_names=["Frequency", "Intensity"],
            skiprows=1
            )
        ```

        Example based on astronomical spectra:

        File formats are not homogenized, and delimiters may change. This
        example reads in a comma-separated spectrum, with a radial
        velocity of +26.2 km/s.

        ```
        session = AssignmentSession.from_ascii(
            filepath="spectrum.mid.dat",
            experiment=0,
            col_names=["Frequency", "Intensity"],
            velocity=26.2,
            delimiter=","
            )
        ```

        Parameters
        ----------
        filepath : str
            Filepath to the ASCII spectrum
        experiment : int
            Integer identifier for the experiment
        composition : list of str, optional
            List of atomic symbols, representing the atomic composition of
            the experiment
        delimiter : str, optional
            Delimiter character used in the ASCII file. For example, "\t",
            "\s", ","
        temperature : float, optional
            Rotational temperature in Kelvin used for the experiment.
        velocity : float, optional
            Radial velocity to offset the frequency in km/s.
        col_names : None or list of str, optional
            Names to rename the columns. If None, this is ignored.
        freq_col : str, optional
            Name of the column to be used for the frequency axis
        int_col : str, optional
            Name of the column to be used for the intensity axis
        skiprows : int, optional
            Number of rows to skip reading.
        verbose : bool, optional
            If True, the logging module will also print statements and
            display any interaction that happens.
        kwargs
            Additional kwargs are passed onto initializing the Session class

        Returns
        -------
        AssignmentSession
        """
        spec_df = parsers.parse_ascii(filepath, delimiter, col_names, skiprows=skiprows)
        session = cls(
            spec_df,
            experiment,
            composition,
            temperature,
            velocity,
            freq_col,
            int_col,
            verbose,
            **kwargs,
        )
        return session

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if hasattr(self, "log_handlers"):
            for _, handler in self.log_handlers.items():
                handler.close()

    def __repr__(self):
        return f"Experiment {self.session.experiment}"

    def __deepcopy__(self, memodict={}):
        # Kill all of the loggers prior to copying, otherwise the thread
        # lock prevents pickling
        if hasattr(self, "log_handlers"):
            for key, handler in self.log_handlers.items():
                handler.close()
        settings = {
            "exp_dataframe": self.data,
            "experiment": self.session.experiment,
            "composition": self.session.composition,
        }
        new_copy = AssignmentSession(**settings)
        new_copy.__dict__.update(**self.__dict__)
        new_copy.session = deepcopy(self.session)
        return new_copy

    def __init__(
        self,
        exp_dataframe: pd.DataFrame,
        experiment: int,
        composition: List[str],
        temperature=4.0,
        velocity=0.0,
        freq_col="Frequency",
        int_col="Intensity",
        verbose=True,
        **kwargs,
    ):
        """
        init method for AssignmentSession.

        Attributes
        ----------
        session : `Session` object
            Class containing parameters for the experiment, including the
            chemical composition, the noise statistics, radial velocity, etc.
        data : Pandas DataFrame
            This pandas dataframe contains the actual x/y data for the
            spectrum being analyzed.
        freq_col, int_col : str
            Names of the frequency and intensity columns that are contained
            in the `self.data` dataframe
        t_threshold : float
            This value is used to cut off upper states for assignment. It
            corresponds to three times the user-specified temperature for
            the experiment.
        umols : list
            TODO this list should be used to keep track of unidentified
            molecules during the assignment process. Later on, if the
            carrier is identified, we should be able to update everything
            consistently.
        verbose : bool
            Specifies whether the logging is printed in addition to being
            dumped to file.
        line_lists : dict
            Dictionary containing all of the `LineList` objects being used
            for assignments. When the `find_peaks` function is run, a
            `LineList` is generated, holding every peak found as a
            corresponding `Transition`. This `LineList` is then referenced
            by the "Peaks" key in the line_lists dictionary.
        """
        # Make folders for organizing output
        folders = [
            "assignment_objs",
            "queries",
            "sessions",
            "clean",
            "figures",
            "reports",
            "logs",
            "outputs",
            "ftb",
            "linelists",
        ]
        for folder in folders:
            if os.path.isdir(folder) is False:
                os.mkdir(folder)
        # Initialize a Session dataclass
        self.session = Session(experiment, composition, temperature, velocity=velocity)
        # Update additional settings
        self.session.__dict__.update(**kwargs)
        self.data = exp_dataframe
        # Set the temperature threshold for transitions to be 3x the set value
        self.t_threshold = self.session.temperature * 3.0
        # Initial threshold for peak detection is set to None
        self.threshold = None
        self.umol_names: Dict[str, str] = dict()
        self.verbose = verbose
        # Holds catalogs
        self.line_lists: Dict[str, Type[LineList]] = dict()
        self._init_logging()
        # Default settings for columns
        if freq_col not in self.data.columns:
            self.freq_col = self.data.columns[0]
        else:
            self.freq_col = freq_col
        if int_col not in self.data.columns:
            self.int_col = self.data.columns[1]
        else:
            self.int_col = int_col
        if velocity != 0.0:
            self.set_velocity(velocity)

    def __truediv__(self, other: "AssignmentSession", copy=True):
        """
        Method to divide the spectral intensity of the current experiment
        by another. This gives the ratio of spectral intensities, and can
        be useful for determining whether a line becomes stronger or
        weaker between experiments.

        If the copy keyword is True, this method creates a deep copy of
        the current experiment and returns the copy with the updated
        intensities. Otherwise, the spectrum of the current experiment is
        modified.

        Parameters
        ----------
        other : AssignmentSession object
            Other AssignmentSession object to compare spectra with. Acts
            as the denominator.
        copy : bool, optional
            If True, returns a copy of the experiment, with the ID number added

        Returns
        -------
        new_experiment : AssignmentSession object
            Generated new experiment with the divided spectrum, if copy is True
        """
        if copy is True:
            new_experiment = deepcopy(self)
            new_experiment.data.loc[:, self.int_col] = (
                new_experiment.data[self.int_col] / other.data[other.int_col]
            )
            new_experiment.session.experiment += other.session.experiment
            return new_experiment
        else:
            self.data.loc[:, self.int_col] = (
                self.data[self.int_col] / other.data[other.int_col]
            )

    def __sub__(self, other: "AssignmentSession", copy=True, window=0.2):
        """
        Dunder method to blank the current experiment with another. This
        method will take every detected frequency in the reference
        experiment, and blank the corresponding regions from the current
        experiment. In effect, this leaves behind only the features that
        are unique to the current experiment.

        Parameters
        ----------
        other : AssignmentSession object
            Reference experiment whose peaks are blanked from this one.
        copy : bool, optional
            If True, operates on a deep copy of the current experiment.
        window : float, optional
            Width of the region to blank around each reference frequency.

        Returns
        -------
        AssignmentSession object
            Experiment with the blanked spectrum.
        """
        if copy is True:
            experiment = deepcopy(self)
        else:
            experiment = self
        blank_freqs = list()
        # Get regions to blank
        for transition in other.line_lists["Peaks"].transitions:
            # Use the measured frequency where possible
            if transition.frequency != 0.0:
                frequency = transition.frequency
            else:
                frequency = transition.catalog_frequency
            blank_freqs.append(frequency)
        try:
            blank_spectrum = analysis.blank_spectrum(
                self.data,
                blank_freqs,
                self.session.baseline,
                self.session.noise_rms,
                self.freq_col,
                self.int_col,
                window,
                df=False,
            )
            experiment.data[self.int_col] = blank_spectrum
            new_id = self.session.experiment - other.session.experiment
            experiment.session.experiment = new_id
            return experiment
        except AttributeError:
            raise Exception("Peak/Noise detection not yet run!")

    def __contains__(self, item: Union["LineList", str]) -> bool:
        """
        Dunder method to check if a molecule is contained within this
        experiment.

        Parameters
        ----------
        item : Union[LineList, str]
            Item to check; can be a `LineList` or a `str`. If the item is
            a `LineList`, check and see if the `LineList` is present in
            the current list. If the item is a `str`, check in the
            assignment table for a name or a formula.

        Returns
        -------
        bool
            True if present in the experiment.
        """
        if isinstance(item, str):
            check = any(
                [item in self.table["name"].values, item in self.table["formula"].values]
            )
            return check
        elif isinstance(item, LineList):
            return item in self.line_lists.values()

    def __call__(self, frequency: float, **kwargs) -> pd.DataFrame:
        """
        Dunder method to look up a frequency contained within the
        experiment peak line list. Additional kwargs are passed into the
        `LineList.find_candidates` function, which allows one to specify
        tolerances for the lookup.

        Parameters
        ----------
        frequency : float
            Frequency to search the experiment for in MHz.

        Returns
        -------
        pd.DataFrame
            DataFrame of the matched frequencies.

        Raises
        ------
        AttributeError
            If peak detection has not been run yet.
        """
        if "Peaks" in self.line_lists:
            return self.line_lists["Peaks"].find_candidates(frequency, **kwargs)
        else:
            raise AttributeError("Experiment has no peaks!")

    def umol_gen(self, silly=True):
        """
        Generator for unidentified molecule names. Wraps either the
        monsterurl generator for silly names, or a simple incrementing
        counter.

        Yields
        ------
        str
            Name formatted either as "U-molecule-XXX" or as a randomly
            generated `AdjectiveAdjectiveAnimal`.
        """
        counter = 1
        while counter <= 200:
            # If we want to use silly names from the monsterurl generator
            if silly is True:
                name = get_monster()
            # Otherwise use boring counters
            else:
                name = f"U-molecule-{counter:03d}"
            yield name
            counter += 1

    def create_ulinelist(self, filepath: str, silly=True):
        """
        Create a LineList object for an unidentified molecule. This uses
        the class method `umol_gen` to automatically generate names for
        U-molecules, which can then be renamed once the molecule has been
        identified. The session attribute `umol_names` also keeps track of
        the mapping between filepaths and catalog names. If the filepath
        has been used previously, an Exception is raised noting that the
        filepath is already associated with another catalog.

        Parameters
        ----------
        filepath : str
            File path to the catalog or .lin file to use as a reference
        silly : bool, optional
            Flag whether to use boring numbered identifiers, or randomly
            generated `AdjectiveAdjectiveAnimal`.

        Returns
        -------
        LineList object
        """
        if filepath not in self.umol_names:
            path = Path(filepath)
            ext = path.suffix
            name = next(self.umol_gen(silly=silly))
            parameters = {"name": name, "formula": name, "filepath": filepath}
            if ext == ".lin":
                method = LineList.from_lin
            elif ext == ".cat":
                method = LineList.from_catalog
            else:
                raise Exception(f"File extension not recognized: {ext}.")
            linelist = method(**parameters)
            self.line_lists[name] = linelist
            # Give the first frequency just for bookkeeping
            frequency = linelist.transitions[0].catalog_frequency
            self.logger.info(
                f"Created a U-molecule: {name}, with frequency {frequency:,.4f}."
            )
            # Create a symlink so we know which catalog this molecule refers to
            sym_path = Path(f"linelists/{name}{ext}")
            sym_path.symlink_to(filepath)
            # Also for internal record keeping
            self.umol_names[filepath] = name
            return linelist
        else:
            name = self.umol_names[filepath]
            raise Exception(f"U-molecule already exists with name: {name}!")

    def rename_umolecule(self, name: str, new_name: str, formula=""):
        """
        Function to update the name of a LineList. This function should be
        used to update a LineList, particularly when the identity of an
        unidentified molecule is discovered.

        Parameters
        ----------
        name : str
            Old name of the LineList.
        new_name : str
            New name of the LineList - preferably, a real molecule name.
        formula : str, optional
            New formula of the LineList.
        """
        if name in self.line_lists:
            # Rename the LineList within the experiment
            self.line_lists[new_name] = self.line_lists.pop(name)
            self.line_lists[new_name].name = new_name
            self.line_lists[new_name].formula = formula
            # Create symlinks to the catalog with the new name for bookkeeping
            sym_path = Path(f"linelists/{new_name}.cat")
            sym_path.symlink_to(self.line_lists[new_name].filepath)
        else:
            raise Exception(
                f"{name} does not exist in {self.session.experiment} line_lists"
            )

    def add_ulines(self, data: List[Tuple[float, float]], **kwargs):
        """
        Function to manually add multiple pairs of frequency/intensity to
        the current experiment's Peaks list. Kwargs are passed to the
        creation of the Transition objects.

        Parameters
        ----------
        data : iterable of 2-tuple
            List-like of 2-tuples corresponding to frequency and
            intensity. Data should look like this example:
            [(12345.213, 5.), (18623.125, 12.3)]
        """
        default_dict = {
            "experiment": self.session.experiment,
            "velocity": self.session.velocity,
        }
        default_dict.update(**kwargs)
        if "Peaks" in self.line_lists:
            self.line_lists["Peaks"].add_ulines(data, **default_dict)
            self.logger.info(f"Added {len(data)} transitions to U-lines.")

    def _init_logging(self):
        """
        Set up the logging formatting and files. The levels are defined in
        the dictionary mapping, and so new levels and their warning level
        should be defined there. Additional log files can be added in the
        log_handlers dictionary. In the other routines, the logger
        attribute should be used.
        """
        mapping = {
            "debug": logging.DEBUG,
            "warning": logging.WARNING,
            "analysis": logging.INFO,
            "stream": logging.INFO,
        }
        logging.captureWarnings(True)
        self.logger = logging.getLogger(f"{self.session.experiment} log")
        self.logger.setLevel(logging.DEBUG)
        # Define file handlers for each type of log
        self.log_handlers = {
            "analysis": logging.FileHandler(
                f"./logs/{self.session.experiment}-analysis.log", mode="w"
            ),
            "warning": logging.FileHandler(
                f"./logs/{self.session.experiment}-warnings.log", mode="w"
            ),
            "debug": logging.FileHandler(
                f"./logs/{self.session.experiment}-debug.log", mode="w"
            ),
        }
        # If verbose is specified, the logging info is directly printed as well
        if self.verbose is True:
            self.log_handlers["stream"] = logging.StreamHandler()
        # Set up the formatting and the level definitions
        for key, handler in self.log_handlers.items():
            handler.setFormatter(
                logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            )
            # do not redefine the default warning levels
            handler.setLevel(mapping[key])
            self.logger.addHandler(handler)

    def set_velocity(self, value: float):
        """
        Set the radial velocity offset for the spectrum. The velocity is
        specified in km/s, with the convention that a positive velocity
        yields a redshifted spectrum (i.e. moving towards us). This method
        should be used to change the velocity, as it will automatically
        recalculate the dataframe frequency column for the new velocity.

        Parameters
        ----------
        value : float
            Velocity in km/s
        """
        # If this is the first time we're performing a frequency offset,
        # make a copy of the data
        if "Laboratory Frame" not in self.data.columns:
            self.data.loc[:, "Laboratory Frame"] = self.data.loc[:, self.freq_col]
        else:
            # If we have performed the shift before, copy back the unshifted data
            self.data.loc[:, self.freq_col] = self.data.loc[:, "Laboratory Frame"]
        doppler_offset = units.dop2freq(value, self.data["Laboratory Frame"].values)
        # Offset the values
        self.data.loc[:, self.freq_col] += doppler_offset
        self.session.velocity = value
        self.logger.info(f"Set the session velocity to {value}.")

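    # Worked example for the velocity correction: units.dop2freq is
    # assumed here to implement the classical Doppler offset
    # delta_f = f * (v / c). For a channel at 20,000 MHz with
    # v = +26.2 km/s:
    #
    #     delta_f = 20000.0 * 26.2 / 299792.458 ≈ 1.7479 MHz
    #
    # so that frequency column entry becomes ~20001.7479 MHz, while the
    # "Laboratory Frame" column retains the unshifted value.
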
    def detect_noise_floor(self, region=None, als=True, **kwargs):
        """
        Set the noise parameters for the current spectrum. Control over
        what "defines" the noise floor is specified with the parameter
        region. By default, if region is None then the function will
        perform an initial peak find using 20% of the maximum intensity as
        the threshold. The noise region will be established based on the
        largest gap between peaks, i.e. hopefully capturing as few
        features in the statistics as possible.

        The alternative method is invoked when the `als` argument is set
        to True, which will use the asymmetric least-squares method to
        determine the baseline. Afterwards, the baseline is decimated by
        an extremely heavy Gaussian blur, and one ends up with a smoothly
        varying baseline. In this case, there is no `noise_rms` attribute
        to be returned as it is not required to determine the minimum peak
        threshold.

        Parameters
        ----------
        region : 2-tuple or None, optional
            If None, use the automatic algorithm. Otherwise, a 2-tuple
            specifies the region of the spectrum in frequency to use for
            noise statistics.
        als : bool, optional
            If True, will use the asymmetric least squares method to
            determine the baseline.
        kwargs
            Additional kwargs are passed into the ALS function.

        Returns
        -------
        baseline : float
            Value of the noise floor
        rms : float
            Noise RMS/standard deviation
        """
        if als is False:
            if region is None:
                # Perform rudimentary peak detection
                threshold = 0.2 * self.data[self.int_col].max()
                peaks_df = analysis.peak_find(
                    self.data,
                    freq_col=self.freq_col,
                    int_col=self.int_col,
                    thres=threshold,
                )
                if len(peaks_df) >= 2:
                    # find the largest gap in the data, including the edges
                    # of the spectrum
                    freq_values = list(peaks_df["Frequency"].values)
                    freq_values.extend(
                        [self.data[self.freq_col].min(), self.data[self.freq_col].max()]
                    )
                    # sort the values
                    freq_values = sorted(freq_values)
                    self.logger.info("Possible noise region:")
                    self.logger.info(freq_values)
                    index = np.argmax(np.diff(freq_values))
                    # Define region as the largest gap
                    region = freq_values[index : index + 2]
                    # Add a 30 MHz offset to either end of the spectrum
                    region[0] = region[0] + 30.0
                    region[1] = region[1] - 30.0
                    # Make sure the frequencies are in ascending order
                    region = np.sort(region)
                    self.logger.info(
                        "Noise region defined as {} to {}.".format(*region)
                    )
                    noise_df = self.data.loc[self.data[self.freq_col].between(*region)]
                    if len(noise_df) < 50:
                        noise_df = self.data.sample(int(len(self.data) * 0.1))
                        self.logger.warning(
                            "Noise region too small; taking a statistical sample."
                        )
                else:
                    # If we haven't found any peaks, sample 10% of random
                    # channels and determine the baseline from those values
                    noise_df = self.data.sample(int(len(self.data) * 0.1))
                    self.logger.warning(
                        "No obvious peaks detected; taking a statistical sample."
                    )
            else:
                # A noise region was provided explicitly; use it directly
                noise_df = self.data.loc[self.data[self.freq_col].between(*region)]
            # Calculate statistics
            baseline = np.average(noise_df[self.int_col])
            rms = np.std(noise_df[self.int_col])
            self.session.noise_region = region
            self.session.noise_rms = rms
            self.session.baseline = baseline
            self.logger.info(f"Baseline signal set to {baseline}.")
            self.logger.info(f"Noise RMS set to {rms}.")
        elif als is True:
            # Use the asymmetric least-squares method to determine the baseline
            self.logger.info(
                "Using asymmetric least squares method for baseline determination."
            )
            als_params = {"lam": 1e2, "p": 0.05, "niter": 10}
            als_params.update(**kwargs)
            baseline = fitting.baseline_als(self.data[self.int_col], **als_params)
            # Decimate the noise with a huge Gaussian blur
            baseline = gaussian_filter(baseline, 200.0)
            rms = np.std(baseline)
            self.session.baseline = baseline
            self.session.noise_region = "als"
            self.session.noise_rms = rms
        return baseline, rms

    def find_peaks(
        self, threshold=None, region=None, sigma=6, min_dist=10, als=True, **kwargs
    ):
        """
        Find peaks in the experiment spectrum, with a specified threshold
        value or automatic threshold. The method calls the peak_find
        function from the analysis module, which in itself wraps
        peakutils. The function works by finding regions of the intensity
        where the first derivative goes to zero and changes sign. This
        gives peak frequencies/intensities from the digitized spectrum,
        which are then "refined" by interpolating over each peak and
        fitting a Gaussian to determine the peak.

        The peaks are then returned as a pandas DataFrame, which can also
        be accessed in the peaks_df attribute of AssignmentSession.

        When a value of threshold is not provided, the function will turn
        to automated methods for noise detection, either by taking a
        single value as the baseline (not ALS), or by using the asymmetric
        least-squares method for fitting the baseline. In both instances,
        the primary intensity column to be used for analysis will be
        changed to "SNR", which is the recommended approach.

        To use the ALS algorithm there may be some tweaking involved for
        the parameters. These are typically found empirically, but for
        reference here are some "optimal" values that have been tested.
        For millimeter-wave spectra, larger values of lambda are favored:

            lambda = 1e5
            p = 0.1

        This should get rid of periodic (fringe) baselines, and leave the
        "real" signal behind.

        Parameters
        ----------
        threshold : float or None, optional
            Peak detection threshold. If None, the threshold is determined
            automatically from the noise statistics (see `sigma`).
        region : 2-tuple or None, optional
            If None, use the automatic algorithm. Otherwise, a 2-tuple
            specifies the region of the spectrum in frequency to use for
            noise statistics.
        sigma : float, optional
            Defines the number of sigma (noise RMS) above the baseline to
            use as the peak detection threshold.
        min_dist : int, optional
            Number of channels between peaks to be detected.
        als : bool, optional
            If True, uses ALS fitting to determine a baseline.
        kwargs
            Additional keyword arguments are passed to the ALS fitting
            routine.

        Returns
        -------
        peaks_df : dataframe
            Pandas dataframe with Frequency/Intensity columns,
            corresponding to peaks
        """
        if self.int_col == "SNR":
            # If we run peak find again, the int_col would incorrectly be
            # set to SNR, giving us bad results. Use the last column instead.
            columns = self.data.columns.to_list()
            columns = [col for col in columns if col != "SNR"]
            self.int_col = columns[-1]
            self.logger.info(
                f"SNR set as int_col is invalid for peak finding. "
                f"Using {self.int_col} instead."
            )
        if threshold is None and als is False:
            # Use a quasi-intelligent method of determining the noise floor
            # and ultimately using noise + n sigma
            baseline, rms = self.detect_noise_floor(region)
            threshold = baseline + (rms * sigma)
            # Convert threshold into SNR units
            threshold /= baseline
            self.data["SNR"] = self.data[self.int_col] / np.abs(baseline)
            self.int_col = "SNR"
            self.logger.info("Now using SNR as primary intensity unit.")
        elif threshold is None and als is True:
            baseline, _ = self.detect_noise_floor(als=True, **kwargs)
            # Convert to SNR, and start using this instead
            self.data["SNR"] = self.data[self.int_col] / np.abs(baseline)
            self.int_col = "SNR"
            self.logger.info("Now using SNR as primary intensity unit.")
            # If using ALS, the sigma argument becomes the threshold value
            # in SNR units
            threshold = sigma
        self.threshold = threshold
        self.logger.info(f"Peak detection threshold is: {threshold}")
        peaks_df = analysis.peak_find(
            self.data,
            freq_col=self.freq_col,
            int_col=self.int_col,
            thres=threshold,
            min_dist=min_dist,
        )
        # Shift the peak intensities down by the noise baseline
        if als is False:
            baseline = getattr(self.session, "baseline", 0.0)
            peaks_df.loc[:, self.int_col] = peaks_df[self.int_col] - baseline
        self.logger.info(f"Found {len(peaks_df)} peaks in total.")
        # Reindex the peaks
        peaks_df.reset_index(drop=True, inplace=True)
        if len(peaks_df) != 0:
            # Generate U-lines
            self.df2ulines(peaks_df, self.freq_col, self.int_col)
            # Assign attribute
            self.peaks = peaks_df
            self.peaks.to_csv(
                f"./outputs/{self.session.experiment}-peaks.csv", index=False
            )
            return peaks_df
        else:
            return None

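    # A minimal usage sketch for peak detection (parameter values are
    # illustrative). With ALS baseline fitting, `sigma` acts directly as
    # the SNR threshold, and the lam/p values follow the millimeter-wave
    # guidance in the docstring above:
    #
    #     peaks = session.find_peaks(sigma=5, als=True, lam=1e5, p=0.1)
    #     print(len(session.line_lists["Peaks"]))
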
[docs] def df2ulines(self, dataframe: pd.DataFrame, freq_col=None, int_col=None):
        """
        Add a dataframe of frequencies and intensities to the experiment's
        "Peaks" LineList. This function provides more manual control over what
        can be processed in the assignment pipeline, as not everything can be
        picked up by the peak finding algorithm.

        Parameters
        ----------
        dataframe : pandas dataframe
            Dataframe containing a frequency and intensity column to add to
            the U-line list.
        freq_col : None or str
            Specify column to use for frequencies. If None, uses the session
            value freq_col.
        int_col : None or str
            Specify column to use for intensities. If None, uses the session
            value int_col.
        """
        if freq_col is None:
            freq_col = self.freq_col
        if int_col is None:
            int_col = self.int_col
        self.logger.info("Modifying the experiment LineList.")
        # Set up session information to be passed into each U-line
        skip = [
            "temperature",
            "doppler",
            "freq_abs",
            "freq_prox",
            "noise_rms",
            "baseline",
            "header",
            "noise_region",
            "composition",
            "name",
            "max_uncertainty",
        ]
        selected_session = {
            key: self.session.__dict__[key]
            for key in self.session.__dict__
            if key not in skip
        }
        # If the Peaks key has not been set up yet, we set it up now
        if "Peaks" not in self.line_lists:
            self.line_lists["Peaks"] = LineList.from_dataframe(
                dataframe,
                name="Peaks",
                freq_col=freq_col,
                int_col=int_col,
                **selected_session,
            )
        # Otherwise, we'll just update the existing LineList
        else:
            vfunc = np.vectorize(Transition)
            transitions = vfunc(
                frequency=dataframe[freq_col],
                intensity=dataframe[int_col],
                **selected_session,
            )
            self.line_lists["Peaks"].update_linelist(transitions)
        new_remaining = len(self.line_lists["Peaks"])
        self.logger.info(f"There are now {new_remaining} line entries in this session.")
        peaks_df = self.line_lists["Peaks"].to_dataframe()[["frequency", "intensity"]]
        peaks_df.columns = ["Frequency", "Intensity"]
        self.peaks = peaks_df
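    # Usage sketch (hypothetical): manually adding two features that the peak
    # finder missed, assuming an AssignmentSession instance `experiment`:
    #
    #   >>> manual = pd.DataFrame({"Frequency": [12345.67, 23456.78],
    #   ...                        "Intensity": [5.0, 3.2]})
    #   >>> experiment.df2ulines(manual, freq_col="Frequency", int_col="Intensity")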
[docs] def search_frequency(self, frequency: float):
        """
        Function for searching the experiment for a particular frequency. The
        search range is defined by the Session attribute freq_prox, and will
        first look for the frequency in the assigned features, if any have
        been made. If no assignment matches, the routine will then look for
        the nearest U-line.

        Parameters
        ----------
        frequency : float
            Center frequency in MHz

        Returns
        -------
        dataframe
            Pandas dataframe with the matching assignments, or a 2-tuple of
            the nearest U-line frequency and its index if no assignment is
            found.
        """
        slice_df = None
        if self.session.freq_abs is False:
            lower_freq = frequency * (1.0 - self.session.freq_prox)
            upper_freq = frequency * (1.0 + self.session.freq_prox)
        else:
            lower_freq = frequency - self.session.freq_prox
            upper_freq = frequency + self.session.freq_prox
        if hasattr(self, "table"):
            slice_df = self.table.loc[
                (self.table["frequency"] >= lower_freq)
                & (self.table["frequency"] <= upper_freq)
            ]
        # If hits turn up in the assignments, return those
        if slice_df is not None and len(slice_df) > 0:
            self.logger.info("Found assignments.")
            return slice_df
        # Otherwise, look for the frequency in the U-lines
        else:
            self.logger.info("No assignment found; searching U-lines")
            ulines = np.array(
                [
                    [index, uline.frequency]
                    for index, uline in enumerate(
                        self.line_lists["Peaks"].get_ulines()
                    )
                ]
            )
            nearest, array_index = routines.find_nearest(ulines[:, 1], frequency)
            uline_index = int(ulines[array_index, 0])
            return nearest, uline_index
[docs] def apply_filter(self, window: Union[str, List[str]], sigma=0.5, int_col=None):
        """
        Applies a filter to the spectral signal. If multiple window functions
        are to be used, a list of windows can be provided, which will then
        perform the convolution stepwise. With the exception of the gaussian
        window function, the other functions use the SciPy signal functions.

        A reference copy of the original signal is kept as the "Ref" column;
        if a new window function is applied, it operates on this reference
        copy rather than on the already convolved signal.

        Parameters
        ----------
        window: str, or iterable of str
            Name of the window function
        sigma: float, optional
            Specifies the magnitude of the gaussian blur. Only used when the
            window function asked for is "gaussian".
        int_col: None or str, optional
            Specifies which column to apply the window function to. If None,
            defaults to the session-wide intensity column
        """
        if int_col is None:
            int_col = self.int_col
        # If the reference spectrum exists, we'll use that as the source
        if "Ref" in self.data.columns:
            source_col = "Ref"
            self.logger.info("Using Ref signal for window function.")
        else:
            # Make a copy of the original signal
            self.data["Ref"] = self.data[self.int_col].copy()
            source_col = int_col
            self.logger.info("Copied signal to Ref column.")
        intensity = self.data[source_col].values
        self.logger.info(f"Applying {window} to column {source_col}.")
        # If window is an iterable, apply each filter stepwise
        if isinstance(window, list):
            for function in window:
                intensity = analysis.filter_spectrum(intensity, function, sigma)
        else:
            # In the event that window is not a list, just apply the filter
            intensity = analysis.filter_spectrum(intensity, window, sigma)
        # The windowed signal is usually too small for float printing in the
        # html report, so scale it up. Write to the working intensity column
        # so that the Ref copy is preserved.
        self.data[int_col] = intensity * 1000.0
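    # Usage sketch (hypothetical): applying a light gaussian blur, assuming an
    # AssignmentSession instance `experiment`; "gaussian" is the only window
    # name documented above, any others must be names recognized by
    # analysis.filter_spectrum:
    #
    #   >>> experiment.apply_filter("gaussian", sigma=0.5)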
[docs] def splat_assign_spectrum(self, auto=False):
        """
        Alias for `process_splatalogue`. This function is deprecated and will
        be removed in a later version.

        Parameters
        ----------
        auto : bool
            Specifies whether the assignment procedure is automatic.
        """
        warnings.warn(
            "splat_assign_spectrum is deprecated; use process_splatalogue instead.",
            DeprecationWarning,
        )
        self.process_splatalogue(auto=auto)
[docs] def process_clock_spurs(self, **kwargs):
        """
        Method that will generate a LineList corresponding to possible
        harmonics, sum, and difference frequencies based on a given clock
        frequency (default: 65,000 MHz). It is advisable to run this function
        at the end of the assignment process, owing to the sheer number of
        possible combinations of lines, which may interfere with real
        molecular features.

        Parameters
        ----------
        kwargs
            Optional kwargs are passed into the creation of the LineList with
            `LineList.from_clock`.
        """
        self.logger.info("Processing electronic RFI peaks.")
        clock_linelist = LineList.from_clock(**kwargs)
        self.process_linelist(name="Clock", linelist=clock_linelist)
        self.logger.info("Done processing electronic RFI peaks.")
[docs] def process_splatalogue(self, auto=True, progressbar=True):
        """
        Function that will provide an "interface" for interactive line
        assignment in a notebook environment.

        Basic functionality is looping over a series of peaks, which will
        query Splatalogue for known transitions in the vicinity. If the line
        is known in Splatalogue, it is placed into a Transition object and
        flagged as known. Conversely, if it's not known in Splatalogue,
        assignment is deferred and the line remains in the U-line pile.

        Parameters
        ----------
        auto: bool
            If True the assignment process does not require user input,
            otherwise will prompt user.
        progressbar: bool
            If True, wraps the iteration in a tqdm progress bar.
        """
        ulines = self.line_lists["Peaks"].get_ulines()
        self.logger.info(f"Beginning Splatalogue lookup on {len(ulines)} lines.")
        iterator = enumerate(ulines)
        if progressbar is True:
            iterator = tqdm(iterator)
        for index, uline in iterator:
            frequency = uline.frequency
            self.logger.info(f"Searching for frequency {frequency:,.4f}")
            # Call the Splatalogue API to search for the frequency
            if self.session.freq_abs is True:
                width = self.session.freq_prox
            else:
                width = self.session.freq_prox * frequency
            splat_df = analysis.search_center_frequency(frequency, width=width)
            if splat_df is not None:
                # Filter out lines that are far too unlikely on grounds of temperature
                splat_df = splat_df.loc[splat_df["E_U (K)"] <= self.t_threshold]
                # Filter out implausible elemental compositions
                for row_index, row in splat_df.iterrows():
                    # Convert the string into a chemical formula object
                    try:
                        # Clean vibrational state and torsional specification
                        cation_flag = False
                        clean_formula = row["Species"].split("v")[0]
                        # Remove stereochemistry/state labels
                        for label in [
                            "l-",
                            "c-",
                            "t-",
                            ",",
                            "-gauche",
                            "cis-",
                            "trans-",
                            "trans",
                            "anti",
                            "sym",
                            "=",
                        ]:
                            clean_formula = clean_formula.replace(label, "")
                        # Remove the torsional states
                        clean_formula = clean_formula.split("-")[-1]
                        # Cations are typically flagged with "+", which the
                        # chemical formula parsing does not handle well
                        if "+" in clean_formula:
                            cation_flag = True
                            clean_formula = clean_formula.replace("+", "")
                        formula_obj = formula(clean_formula)
                        # Add the charge back on
                        if cation_flag is True:
                            clean_formula += "+"
                        # Check if the proposed molecule contains atoms not
                        # expected in the composition
                        comp_check = all(
                            str(atom) in self.session.composition
                            for atom in formula_obj.atoms
                        )
                        if comp_check is False:
                            # If there are unexpected elements in the mix,
                            # forget about it
                            self.logger.info(f"Molecule {clean_formula} rejected.")
                            splat_df.drop(row_index, inplace=True)
                    except Exception:
                        self.logger.warning(
                            f"Could not parse molecule {row['Species']}; rejected."
                        )
                        splat_df.drop(row_index, inplace=True)
                nitems = len(splat_df)
                splat_df = analysis.calc_line_weighting(
                    frequency,
                    splat_df,
                    prox=self.session.freq_prox,
                    abs=self.session.freq_abs,
                )
                if splat_df is not None:
                    self.logger.info(
                        f"Found {len(splat_df)} candidates for frequency "
                        f"{frequency:,.4f}, index {index}."
                    )
                    if self.verbose is True:
                        display(HTML(splat_df.to_html()))
                    try:
                        if auto is False:
                            # If not automated, we need a human to look at the
                            # candidate frequencies printed for notebook viewing
                            splat_index = int(
                                input(
                                    "Please choose an assignment index: 0 - "
                                    + str(nitems - 1)
                                )
                            )
                        else:
                            # If automated, choose the closest frequency
                            splat_index = 0
                        self.logger.info(f"Index {splat_index} was chosen.")
                        ass_df = splat_df.iloc[[splat_index]]
                        splat_df.to_csv(
                            f"queries/{self.session.experiment}-{index}.csv",
                            index=False,
                        )
                        ass_dict = {
                            "uline": False,
                            "frequency": frequency,
                            "intensity": uline.intensity,
                            "name": ass_df["Chemical Name"].values[0],
                            "catalog_frequency": ass_df["Frequency"].values[0],
                            "catalog_intensity": ass_df["CDMS/JPL Intensity"].values[0],
                            "formula": ass_df["Species"].values[0],
                            "r_qnos": ass_df["Resolved QNs"].values[0],
                            # "S": ass_df["$S_{ij}^2 D^2$"].values[0],
                            "ustate_energy": ass_df["E_U (K)"].values[0],
                            "weighting": ass_df["Weighting"].values[0],
                            "source": "CDMS/JPL",
                            "deviation": frequency - ass_df["Frequency"].values[0],
                        }
                        # Update the Transition entry
                        uline.__dict__.update(**ass_dict)
                    except ValueError:
                        # If nothing matches, defer assignment and keep the
                        # line in the U-line pile.
                        self.logger.info(f"Deferring assignment for index {index}.")
                else:
                    # Throw into the U-line pile if no matches at all
                    self.logger.info(f"No species known for {frequency:,.4f}")
        self.logger.info("Splatalogue search finished.")
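    # Usage sketch (hypothetical): a fully automated Splatalogue pass over the
    # current U-lines, assuming an AssignmentSession instance `experiment`
    # whose peaks have already been found:
    #
    #   >>> experiment.process_splatalogue(auto=True, progressbar=True)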
[docs] def overlay_molecule(self, species: str, freq_range=None, threshold=-7.0):
        """
        Function to query splatalogue for a specific molecule. By default, the
        frequency range that will be requested corresponds to the spectral
        range available in the experiment.

        Parameters
        ----------
        species : str
            Identifier for a specific molecule, typically its name
        freq_range : 2-tuple or None, optional
            Frequency range in MHz to query. If None, defaults to the full
            spectral range of the experiment.
        threshold : float, optional
            Minimum CDMS/JPL log intensity for a transition to be included.

        Returns
        -------
        FigureWidget
            Plotly FigureWidget that shows the experimental spectrum along
            with the detected peaks, and the molecule spectrum.
        DataFrame
            Pandas DataFrame from the Splatalogue query.

        Raises
        ------
        Exception
            If no species are found in the query, raises Exception.
        """
        if freq_range is None:
            freq_range = [
                self.data[self.freq_col].min(),
                self.data[self.freq_col].max(),
            ]
        molecule_df = analysis.search_molecule(species, freq_range)
        # Threshold intensity
        molecule_df = molecule_df.loc[molecule_df["CDMS/JPL Intensity"] >= threshold]
        # Cap the intensities; this prevents missing values from blowing up
        # the normalization
        molecule_df.loc[
            molecule_df["CDMS/JPL Intensity"] > -1.0, "CDMS/JPL Intensity"
        ] = -1.0
        # Calculate the real intensity and normalize
        molecule_df["Intensity"] = 10 ** molecule_df["CDMS/JPL Intensity"]
        molecule_df["Normalized"] = (
            molecule_df["Intensity"] / molecule_df["Intensity"].max()
        )
        # Add annotations to the hovertext
        molecule_df["Annotation"] = (
            molecule_df["Resolved QNs"].astype(str)
            + "-"
            + molecule_df["E_U (K)"].astype(str)
        )
        if len(molecule_df) != 0:
            fig = self.plot_spectrum()
            fig.add_bar(
                x=molecule_df["Frequency"],
                y=molecule_df["Normalized"],
                hoverinfo="text",
                text=molecule_df["Annotation"],
                name=species,
                width=2.0,
            )
            molecule_df.to_csv(f"linelists/{species}-splatalogue.csv", index=False)
            return fig, molecule_df
        else:
            raise Exception(f"No molecules found in Splatalogue named {species}.")
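    # Usage sketch (hypothetical): overlaying a known molecule on the
    # experimental spectrum, assuming an AssignmentSession `experiment` and
    # that "HC3N" resolves to a species in Splatalogue:
    #
    #   >>> fig, molecule_df = experiment.overlay_molecule("HC3N", threshold=-6.0)
    #   >>> fig   # display the FigureWidget in the notebook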
[docs] def process_linelist_batch(self, param_dict=None, yml_path=None, **kwargs):
        """
        Function for processing a whole folder of catalog files. This takes a
        user-specified mapping scheme that will associate catalog files with
        molecule names, formulas, and any other `LineList`/`Transition`
        attributes. This can be in the form of a dictionary or a YAML file;
        one of the two has to be provided. An example scheme is given here:

        {
            "cyclopentadiene": {
                "formula": "c5h6",
                "filepath": "../data/catalogs/cyclopentadiene.cat"
            }
        }

        The top dictionary has keys corresponding to the name of the molecule,
        and the value as a sub dictionary containing the formula and filepath
        to the catalog file as minimum input.

        You can also provide additional details that are `Transition` attributes:

        {
            "benzene": {
                "formula": "c6h6",
                "filepath": "../data/catalogs/benzene.cat",
                "smiles": "c1ccccc1",
                "public": False
            }
        }

        Parameters
        ----------
        param_dict : dict or None, optional
            If not None, a dictionary containing the mapping scheme will be
            used to process the catalogs. Defaults to None.
        yml_path : str or None, optional
            If not None, corresponds to a str filepath to the YAML file to be read.
        kwargs
            Additional keyword arguments will be passed into the assignment
            process, which are the args for `process_linelist`.

        Raises
        ------
        ValueError:
            If neither param_dict nor yml_path is provided.
        """
        if param_dict is None and yml_path is None:
            raise ValueError("Please provide arguments to param_dict or yml_path.")
        root = None
        if yml_path:
            param_dict = routines.read_yaml(yml_path)
            yml_path = Path(yml_path)
            root = yml_path.parents[0]
        self.logger.info(f"Processing {len(param_dict)} catalog entries in batch mode.")
        for name, subdict in param_dict.items():
            try:
                # Resolve the catalog filepath relative to the YAML file,
                # if one was provided
                if root is not None:
                    subdict["filepath"] = str(root.joinpath(subdict["filepath"]))
                self.logger.info(f"Creating LineList for molecule {name}")
                extension = Path(subdict["filepath"]).suffix
                if extension == ".cat":
                    func = LineList.from_catalog
                elif extension == ".lin":
                    func = LineList.from_lin
                else:
                    raise ValueError(
                        f"File extension not recognized: {name} extension: {extension}"
                    )
                linelist_obj = func(name=name, **subdict)
                self.process_linelist(
                    name=linelist_obj.name, linelist=linelist_obj, **kwargs
                )
            except ValueError:
                self.logger.warning(f"Could not parse molecule {name}.")
                raise Exception(f"Could not parse molecule {name} from dictionary.")
        self.logger.info("Completed catalog batch analysis.")
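    # Usage sketch (hypothetical): batch-assigning a local catalog with an
    # in-memory mapping, assuming an AssignmentSession instance `experiment`
    # and a catalog file at the illustrative path below:
    #
    #   >>> scheme = {
    #   ...     "benzonitrile": {
    #   ...         "formula": "c6h5cn",
    #   ...         "filepath": "catalogs/benzonitrile.cat",
    #   ...     },
    #   ... }
    #   >>> experiment.process_linelist_batch(param_dict=scheme)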
[docs] def process_linelist(
        self,
        name=None,
        formula=None,
        filepath=None,
        linelist=None,
        auto=True,
        thres=-10.0,
        progressbar=True,
        tol=None,
        **kwargs,
    ):
        """
        General purpose function for performing line assignments using local
        catalog and line data. The two main ways of running this function are
        to provide either a linelist or a filepath argument. The type of the
        linelist argument determines how the catalog data is processed: a
        string is used to look up an existing LineList in the experiment's
        line_lists attribute, whereas a LineList object is used directly.

        Parameters
        ----------
        name: str, optional
            Name of the molecule being assigned. This should be specified when
            providing a new line list, which then gets added to the experiment.
        formula: str, optional
            Chemical formula for the molecule being assigned. Should be added
            in conjunction with name.
        filepath: str, optional
            If a linelist is not given, a filepath can be specified
            corresponding to a .cat or .lin file, which will be used to create
            a LineList object.
        linelist: str or LineList, optional
            Can be the name of a molecule or a LineList object; the former is
            specified as a string which looks up the experiment line_lists
            attribute for an existing LineList object. If a LineList object is
            provided, the function will use this directly.
        auto: bool, optional
            Specifies whether the assignment procedure works without
            intervention. If False, the user will be prompted to provide a
            candidate index.
        thres: float, optional
            log Intensity cut off used to screen candidates.
        progressbar: bool, optional
            If True, a tqdm progressbar will indicate assignment progress.
        tol: float, optional
            Tolerance for making assignments. If None, the function will
            default to the session-wide values of freq_abs and freq_prox to
            determine the tolerance.
        kwargs
            Kwargs are passed to the Transition object update when assignments
            are made.
        """
        # First case is if linelist is provided as a string corresponding to
        # a key in the line_lists attribute
        self.logger.info(f"Processing local catalog for molecule {name}.")
        if isinstance(linelist, str):
            linelist = self.line_lists[linelist]
        # In the event that linelist is an actual LineList object
        elif isinstance(linelist, LineList):
            if linelist.name not in self.line_lists:
                self.line_lists[linelist.name] = linelist
        elif filepath:
            # If a catalog is specified, create a LineList object with this
            # catalog. The parser is inferred from the file extension.
            if ".cat" in filepath:
                func = LineList.from_catalog
            elif ".lin" in filepath:
                func = LineList.from_lin
            else:
                raise Exception(
                    "File extension for reference line list not recognized!"
                )
            linelist = func(
                name=name,
                formula=formula,
                filepath=filepath,
                min_freq=self.data[self.freq_col].min(),
                max_freq=self.data[self.freq_col].max(),
            )
            if name not in self.line_lists:
                self.line_lists[name] = linelist
        else:
            raise Exception("Please specify an internal or external line list!")
        if linelist is not None:
            nassigned = 0
            # Sort the LineList by intensity if possible: if the strongest
            # line isn't there, the other lines shouldn't be there either.
            if linelist.source in ["Splatalogue", "Catalog"]:
                linelist.transitions = sorted(
                    linelist, key=lambda line: line.catalog_intensity
                )[::-1]
            # Filter out out-of-band transitions, but with some small
            # tolerance extending out just in case a line is close
            max_freq = max(self.line_lists["Peaks"].get_frequencies()) * 1.001
            min_freq = min(self.line_lists["Peaks"].get_frequencies()) * 0.999
            # Make sure we're only assigning in-band transitions with a
            # suitable uncertainty and a lower state energy within range
            transitions = [
                transition
                for transition in linelist.transitions
                if all(
                    [
                        min_freq <= transition.catalog_frequency <= max_freq,
                        transition.uncertainty <= self.session.max_uncertainty,
                        transition.lstate_energy <= self.t_threshold,
                    ]
                )
            ]
            # Loop over the LineList lines
            if progressbar is True:
                iterator = tqdm(transitions)
                iterator = enumerate(iterator)
            else:
                iterator = enumerate(transitions)
            for index, transition in iterator:
                # Control the flow so that we're not wasting time looking for lines if the strongest
                # transitions are not seen
                # if (index > 5) and (nassigned == 0) and (linelist.source in ["Catalog", "Splatalogue"]):
                #     self.logger.info(
                #         "Searched for five strongest transitions in {linelist.name}, and nothing; aborting."
                #     )
                #     break
                # If no value of tolerance is provided, determine it from the
                # session settings for each transition
                if tol is None:
                    if self.session.freq_abs is True:
                        search_tol = self.session.freq_prox
                    else:
                        search_tol = (
                            self.session.freq_prox * transition.catalog_frequency
                        )
                else:
                    search_tol = tol
                # Log the search
                self.logger.info(
                    f"Searching for frequency {transition.catalog_frequency:,.4f}"
                    f" with tolerances: {self.t_threshold:.2f} K, "
                    f" +/-{search_tol:.4f} MHz, {thres} intensity."
                )
                # Find transitions in the peak list that might match
                can_pkg = self.line_lists["Peaks"].find_candidates(
                    transition.catalog_frequency,
                    lstate_threshold=self.t_threshold,
                    freq_tol=search_tol,
                    int_tol=thres,
                )
                # If there are actual candidates instead of NoneType, we can process it.
                if can_pkg is not None:
                    candidates, weighting = can_pkg
                    ncandidates = len(candidates)
                    self.logger.info(f"Found {ncandidates} possible matches.")
                    make_assignment = True
                    # In auto mode, just take the highest weighting
                    if auto is True:
                        chosen = candidates[weighting.argmax()]
                        if chosen.uline is False:
                            # If this peak has previously been assigned to a
                            # different molecule, record the conflict in the
                            # "multiple" buffer rather than overwriting it
                            if not chosen.check_molecule(transition):
                                if transition not in chosen.multiple:
                                    # Only add more transitions if this is a
                                    # different molecule and it's not already
                                    # in the list
                                    chosen.multiple.append(transition)
                                make_assignment = False
                    elif auto is False:
                        # Show every candidate, then prompt the user once
                        for cand_idx, candidate in enumerate(candidates):
                            print(cand_idx, candidate)
                        chosen_idx = int(input("Please specify the candidate index. "))
                        chosen = candidates[chosen_idx]
                    if make_assignment is True:
                        self.logger.info(
                            f"Assigning {transition.name}"
                            f" (catalog {transition.catalog_frequency:,.4f})"
                            f" to peak {index} at {chosen.frequency:,.4f}."
                        )
                        # Create a copy of the Transition data from the LineList
                        assign_dict = copy(transition.__dict__)
                        # Update with the measured frequency and intensity
                        assign_dict["frequency"] = chosen.frequency
                        assign_dict["intensity"] = chosen.intensity
                        assign_dict["experiment"] = chosen.experiment
                        assign_dict["velocity"] = self.session.velocity
                        assign_dict["peak_id"] = chosen.peak_id
                        assign_dict["uline"] = False
                        # Add any other additional kwargs
                        assign_dict.update(**kwargs)
                        # Copy over the information from the assignment, and
                        # update the experimental peak information with it
                        chosen.__dict__.update(**assign_dict)
                        nassigned += 1
            self.logger.info(
                f"Assigned {nassigned} new transitions to {linelist.name}."
            )
        else:
            self.logger.warning("LineList was empty, and no lines were assigned.")
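    # Usage sketch (hypothetical): assigning peaks against a local SPCAT
    # catalog at an illustrative path, assuming an AssignmentSession instance
    # `experiment`:
    #
    #   >>> experiment.process_linelist(
    #   ...     name="vinyl cyanide",
    #   ...     formula="C2H3CN",
    #   ...     filepath="catalogs/vinyl-cyanide.cat",
    #   ...     auto=True,
    #   ... )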
[docs] def process_db(self, auto=True, dbpath=None):
        """
        Function for assigning peaks based on a local database file. The
        database is controlled with the SpectralCatalog class, which will
        handle all of the searching.

        Parameters
        ----------
        auto : bool, optional
            If True, the assignments are made automatically.
        dbpath : str or None, optional
            Filepath to the local database. If none is supplied, uses the
            default value from the user's home directory.
        """
        # Get the U-line list
        ulines = self.line_lists.get("Peaks").get_ulines()
        old_nulines = len(ulines)
        db = database.SpectralCatalog(dbpath)
        self.logger.info(f"Processing local database: {dbpath}")
        for index, uline in tqdm(enumerate(ulines)):
            self.logger.info(f"Searching database for {uline.frequency:.4f}")
            catalog_df = db.search_frequency(
                uline.frequency, self.session.freq_prox, self.session.freq_abs
            )
            if catalog_df is not None:
                catalog_df["frequency"].replace(0.0, np.nan, inplace=True)
                catalog_df["frequency"].fillna(
                    catalog_df["catalog_frequency"], inplace=True
                )
                if len(catalog_df) > 0:
                    sliced_catalog = analysis.line_weighting(
                        uline.frequency,
                        catalog_df,
                        prox=self.session.freq_prox,
                        abs=self.session.freq_abs,
                        freq_col="frequency",
                    )
                    if self.verbose is True:
                        display(HTML(sliced_catalog.to_html()))
                    # Use a separate name for the candidate index so the outer
                    # loop variable is not shadowed
                    if auto is False:
                        cand_index = int(input("Please choose a candidate by index."))
                    elif auto is True:
                        cand_index = 0
                    if cand_index < len(sliced_catalog):
                        select_df = sliced_catalog.iloc[cand_index]
                        # Create an approximate quantum number string
                        new_dict = {
                            "index": cand_index,
                            "frequency": uline.frequency,
                            "source": "Database",
                            "deviation": uline.frequency - select_df["frequency"],
                            "uline": False,
                        }
                        assign_dict = select_df.to_dict()
                        assign_dict.update(**new_dict)
                        # Update the Transition to mark the peak as assigned
                        uline.__dict__.update(**assign_dict)
                    else:
                        raise IndexError("Invalid index chosen for assignment.")
        # Update the internal table
        ass_df = pd.DataFrame(data=[ass_obj.__dict__ for ass_obj in self.assignments])
        remaining_ulines = self.line_lists.get("Peaks").get_ulines()
        self.table = ass_df
        self.logger.info(f"Prior number of ulines: {old_nulines}")
        self.logger.info(f"Current number of ulines: {len(remaining_ulines)}")
        self.logger.info("Finished processing local database.")
[docs] def copy_assignments(self, other: "AssignmentSession", thres_prox=1e-2):
        """
        Function to copy assignments from another experiment. This method
        wraps two analysis routines: first, correlations in detected peaks are
        found; the indices of the correlated peaks are then used to locate the
        corresponding Transition objects, whose data are copied over to the
        current experiment.

        Parameters
        ----------
        other : AssignmentSession object
            The reference AssignmentSession object to copy assignments from
        thres_prox : float, optional
            Threshold for considering coincidences between spectra.
        """
        self.logger.info(
            f"Copying assignments from Experiment {other.session.experiment}"
        )
        _, indices = analysis.correlate_experiments(
            [self, other], thres_prox=thres_prox
        )
        # Get the correlation matrix
        corr_mat = indices[other.session.experiment][1]
        analysis.copy_assignments(self, other, corr_mat)
        n_assign = len(np.where(corr_mat > 0)[0])
        self.logger.info(f"Copied {n_assign} assignments.")
[docs] def blank_spectrum(self, noise=0.0, noise_std=0.05, window=1.0):
        """
        Blanks a spectrum based on the lines already previously assigned. The
        required arguments are the average and standard deviation of the
        noise, typically estimated by picking a region free of spectral
        features.

        The spectra are sequentially blanked - online catalogs first, followed
        by literature species, and finally the private assignments.

        Parameters
        ----------
        noise : float
            Average noise value for the spectrum. Typically measured by
            choosing a region void of spectral lines.
        noise_std : float
            Standard deviation of the spectrum noise.
        window : float
            Value to use for the range to blank. The blanked region
            corresponds to frequency +/- window.
        """
        sources = ["CDMS/JPL", "Literature", "New"]
        slices = [
            self.table.loc[self.table["source"] == "CDMS/JPL"],
            self.table.loc[
                (self.table["source"] != "CDMS/JPL") & (self.table["public"] == True)
            ],
            self.table.loc[
                (self.table["source"] != "CDMS/JPL") & (self.table["public"] == False)
            ],
        ]
        last_source = "Catalog"
        for index, (df, source) in enumerate(zip(slices, sources)):
            try:
                if len(df) > 0:
                    if index == 0:
                        reference = self.int_col
                    else:
                        reference = last_source
                    blanked_spectrum = analysis.blank_spectrum(
                        self.data,
                        df["frequency"].values,
                        noise,
                        noise_std,
                        self.freq_col,
                        reference,
                        window,
                        df=False,
                    )
                    self.data[source] = blanked_spectrum
                    last_source = source
            except (KeyError, ValueError):
                self.logger.warning(
                    f"Could not blank spectrum {last_source} with {source}."
                )
    def _get_assigned_names(self):
        """
        Method for getting all of the unique molecules out of the assignments,
        and tallying up their counts.

        :return identifications: dict containing a tally of molecules identified
        """
        names = [
            ass_obj.name for ass_obj in self.line_lists["Peaks"].get_assignments()
        ]
        # Get unique names, preserving the order in which they appear
        seen = set()
        seen_add = seen.add
        self.names = [name for name in names if not (name in seen or seen_add(name))]
        # Tally up the molecules
        self.identifications = {name: names.count(name) for name in self.names}
        return self.identifications
[docs] def create_uline_ftb_batch(
        self,
        filepath=None,
        shots=500,
        dipole=1.0,
        threshold=0.0,
        sort_int=False,
        atten=None,
    ):
        """
        Create an FTB file for use in QtFTM based on the remaining ulines.
        This is used to provide cavity frequencies. If a filepath is not
        specified, a -uline.ftb file will be created in the ftb folder.

        The user can control the parameters of the batch by setting a global
        shot count, dipole moment, and a minimum intensity for inclusion.

        Parameters
        ----------
        filepath : str or None, optional
            Path to save the .ftb file to. If None, defaults to the session ID.
        shots : int, optional
            Number of shots to integrate on each frequency
        dipole : float, optional
            Dipole moment in Debye, used as the attenuation target for each
            frequency
        threshold : float, optional
            Minimum value for the line intensity to be considered. For
            example, if the spectrum is analyzed in units of SNR, this would
            be the minimum value of SNR to consider in the FTB file.
        sort_int : bool, optional
            If True, sorts the FTB entries in descending intensity order.
        atten : None or int, optional
            Value to use for the attenuation, overwriting the `dipole`
            argument. This is useful for forcing cavity power in the high band.
        """
        if filepath is None:
            filepath = f"./ftb/{self.session.experiment}-ulines.ftb"
        lines = ""
        transitions = self.line_lists["Peaks"].get_ulines()
        # Keep only lines above the intensity threshold
        transitions = [
            transition
            for transition in transitions
            if transition.intensity >= threshold
        ]
        if atten is not None:
            assert isinstance(atten, int)
            ftb_settings = {"atten": atten}
        else:
            ftb_settings = {"dipole": dipole}
        # Sort the intensities such that the strongest lines are scanned first
        if sort_int is True:
            transitions = sorted(transitions, key=lambda line: line.intensity)[::-1]
        for uline in transitions:
            lines += fa.generate_ftb_line(uline.frequency, shots, **ftb_settings)
        with open(filepath, "w+") as write_file:
            write_file.write(lines)
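    # Usage sketch (hypothetical): writing the remaining U-lines to an FTB
    # batch with the strongest lines scanned first, assuming an
    # AssignmentSession instance `experiment`:
    #
    #   >>> experiment.create_uline_ftb_batch(
    #   ...     shots=200, dipole=1.0, threshold=5.0, sort_int=True
    #   ... )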
[docs] def create_uline_dr_batch(
        self,
        filepath=None,
        select=None,
        shots=25,
        dipole=1.0,
        min_dist=500.0,
        thres=None,
        atten=None,
        drpower=13,
    ):
        """
        Create an FTB batch file for use in QtFTM to perform a DR experiment.
        A list of selected frequencies can be used as the cavity frequencies,
        which will subsequently be exhaustively DR'd against by all of the
        U-line frequencies remaining in this experiment. The file is then
        saved to "ftb/XXX-dr.ftb".

        Parameters
        ----------
        filepath: str, optional
            Path to save the ftb file to. Defaults to ftb/{}-dr.ftb
        select: list of floats, optional
            List of frequencies to use as cavity frequencies. Defaults to
            None, which will just DR every frequency against each other.
        shots: int, optional
            Number of integration shots
        dipole: float, optional
            Dipole moment used for attenuation setting
        min_dist: float, optional
            Minimum frequency difference between cavity and DR frequency to
            actually perform the experiment
        thres: None or float, optional
            Minimum value in absolute intensity units to consider in the DR
            batch. If None, this is ignored (default).
        atten: None or int, optional
            Value to use for the attenuation, overwriting the `dipole`
            argument. This is useful for forcing cavity power in the high band.
        drpower: int, optional
            Power setting used for the DR pulse when it is switched on.
        """
        ulines = self.line_lists["Peaks"].get_ulines()
        if select is None:
            cavity_freqs = [uline.frequency for uline in ulines]
            dr_freqs = [uline.frequency for uline in ulines]
        else:
            cavity_freqs = select
            dr_freqs = select
        # The intensity mask only makes sense when the DR list comes from the
        # U-lines themselves
        if thres is not None and select is None:
            intensities = np.array([uline.intensity for uline in ulines])
            mask = np.where(intensities >= thres)
            dr_freqs = np.asarray(dr_freqs)[mask]
        cavity_freqs = np.asarray(cavity_freqs)
        ftb_settings = {"drpower": -20, "skiptune": False, "dipole": dipole}
        if atten is not None:
            assert isinstance(atten, int)
            del ftb_settings["dipole"]
            ftb_settings["atten"] = int(atten)
        lines = ""
        for cindex, cavity in enumerate(cavity_freqs):
            for dindex, dr in enumerate(dr_freqs):
                if dindex == 0:
                    ftb_settings.update(**{"drpower": -20, "skiptune": False})
                    lines += fa.generate_ftb_line(cavity, shots, **ftb_settings)
                if np.abs(cavity - dr) >= min_dist:
                    ftb_settings.update(
                        **{"drpower": drpower, "skiptune": True, "drfreq": dr}
                    )
                    lines += fa.generate_ftb_line(cavity, shots, **ftb_settings)
        if filepath is None:
            filepath = f"ftb/{self.session.experiment}-dr.ftb"
        with open(filepath, "w+") as write_file:
            write_file.write(lines)
[docs] def create_full_dr_batch( self, cavity_freqs: List[float], filepath=None, shots=25, dipole=1.0, min_dist=500.0, atten=None, drpower=13, ): """ Create an FTB batch file for use in QtFTM to perform a DR experiment. A list of selected frequencies can be used as the cavity frequencies, which will subsequently be exhaustively DR'd against by ALL frequencies in the experiment. The file is then saved to "ftb/XXX-full-dr.ftb". The ``atten`` parameter provides a more direct way to control RF power; if this value is used, it will overwrite the dipole moment setting. Parameters ---------- cavity_freqs : iterable of floats Iterable of frequencies to tune to, in MHz. filepath : str, optional Path to save the ftb file to. Defaults to ftb/{}-dr.ftb shots : int, optional Number of integration shots dipole : float, optional Dipole moment used for attenuation setting min_dist : float, optional Minimum frequency difference between cavity and DR frequency to actually perform the experiment atten : None or int, optional Value to set the rf attenuation. By default, this is None, which will use the dipole moment instead to set the rf power. If a value is provided, it will overwrite whatever the dipole moment setting is. """ dr_freqs = np.array(self.line_lists["Peaks"].get_frequencies()) lines = "" ftb_settings = {"drpower": -20, "skiptune": False, "dipole": dipole} if atten is not None: assert type(atten) == int del ftb_settings["dipole"] ftb_settings["atten"] = int(atten) for cindex, cavity in enumerate(cavity_freqs): for dindex, dr in enumerate(dr_freqs): if dindex == 0: ftb_settings.update(**{"drpower": -20, "skiptune": False}) lines += fa.generate_ftb_line(cavity, shots, **ftb_settings) if np.abs(cavity - dr) >= min_dist: ftb_settings.update( **{"drpower": drpower, "skiptune": True, "drfreq": dr} ) lines += fa.generate_ftb_line(cavity, shots, **ftb_settings) if filepath is None: filepath = f"ftb/{self.session.experiment}-full-dr.ftb" with open(filepath, "w+") as write_file: write_file.write(lines)
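    # Usage sketch (hypothetical): exhaustively DR-testing two cavity
    # frequencies against every peak in the experiment, assuming an
    # AssignmentSession instance `experiment`:
    #
    #   >>> experiment.create_full_dr_batch(
    #   ...     [11234.567, 15678.910], shots=50, min_dist=300.0
    #   ... )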
[docs] def analyze_molecule(
        self, Q=None, T=None, name=None, formula=None, smiles=None, chi_thres=10.0
    ):
        """
        Function for providing some astronomically relevant parameters by
        analyzing Gaussian line shapes.

        Parameters
        ----------
        Q : float
            Partition function at temperature T
        T : float
            Temperature in Kelvin
        name : str, optional
            Name of the molecule to perform the analysis on. Can be used as a
            selector.
        formula : str, optional
            Chemical formula of the molecule to perform the analysis on. Can
            be used as a selector.
        smiles : str, optional
            SMILES code of the molecule to perform the analysis on. Can be
            used as a selector.
        chi_thres : float
            Threshold for the Chi squared value to consider fits for
            statistics. Any instances of fits with Chi squared values above
            this value will not be used to calculate line profile statistics.

        Returns
        -------
        return_data : list
            First element is the profile dataframe, and second element is the
            fitted velocity. If a rotational temperature analysis is also
            performed, the third element will be the least-squares regression.
        """
        if name:
            selector = "name"
            value = name
        elif formula:
            selector = "formula"
            value = formula
        elif smiles:
            selector = "smiles"
            value = smiles
        else:
            raise Exception(
                "No valid selector specified! Please give a name, formula, or SMILES code."
            )
        # Loop over all of the assignments
        mol_data = list()
        for index, ass_obj in enumerate(self.assignments):
            # If the assignment matches the criteria, we perform the analysis
            if ass_obj.__dict__[selector] == value:
                self.logger.info(
                    f"Performing line profile analysis on assignment index {index}."
                )
                # Perform a Gaussian fit whilst supplying as much information
                # as we can. The width is excluded because it changes
                # significantly between line profiles.
                fit_result, summary = analysis.fit_line_profile(
                    self.data,
                    center=ass_obj.frequency,
                    intensity=ass_obj.intensity,
                    freq_col=self.freq_col,
                    int_col=self.int_col,
                    logger=self.logger,
                )
                # If the fit actually converged and worked
                if fit_result:
                    # Calculate what the lab frame frequency would be in order
                    # to calculate the frequency offset
                    lab_freq = fit_result.best_values["center"] + units.dop2freq(
                        self.session.velocity, fit_result.best_values["center"]
                    )
                    summary["Frequency offset"] = lab_freq - ass_obj.catalog_frequency
                    summary["Doppler velocity"] = units.freq2vel(
                        ass_obj.catalog_frequency, summary["Frequency offset"]
                    )
                    if Q is not None and T is not None:
                        # Add the profile parameters to the list
                        profile_dict = aa.lineprofile_analysis(
                            fit_result, ass_obj.I, Q, T, ass_obj.ustate_energy
                        )
                        ass_obj.N = profile_dict["N cm$^{-2}$"]
                        summary.update(profile_dict)
                    ass_obj.fit = fit_result
                    mol_data.append(summary)
        # If there are successful analyses performed, format the results
        if len(mol_data) > 0:
            profile_df = pd.DataFrame(data=mol_data)
            # Sort the dataframe by ascending order of chi square - better
            # fits are at the top
            profile_df.sort_values(["Chi squared"], inplace=True)
            # Threshold the dataframe to ensure good statistics
            profile_df = profile_df.loc[profile_df["Chi squared"] <= chi_thres]
            # Calculate the weighted average VLSR based on the goodness-of-fit
            profile_df.loc[:, "Weight"] = (
                profile_df["Chi squared"].max() / profile_df["Chi squared"].values
            )
            weighted_avg = np.sum(
                profile_df["Weight"] * profile_df["Doppler velocity"]
            ) / np.sum(profile_df["Weight"])
            # Calculate the weighted standard deviation as the square root of
            # the weighted variance
            stdev = np.sqrt(
                np.sum(
                    profile_df["Weight"]
                    * (profile_df["Doppler velocity"] - weighted_avg) ** 2
                )
                / np.sum(profile_df["Weight"])
            )
            self.logger.info(
                f"Calculated VLSR: {weighted_avg:.3f}+/-{stdev:.3f}"
                f" based on {len(profile_df)} samples."
            )
            return_data = [profile_df, ufloat(weighted_avg, stdev)]
            # If there's data to perform a rotational temperature analysis,
            # then do it!
            if "L" in profile_df.columns:
                self.logger.info("Performing rotational temperature analysis.")
                rot_temp = aa.rotational_temperature_analysis(
                    profile_df["L"], profile_df["E upper"]
                )
                self.logger.info(rot_temp.fit_report())
                return_data.append(rot_temp)
            return return_data
        else:
            self.logger.info("No molecules found, or fits were unsuccessful!")
            return None
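    # Usage sketch (hypothetical): line profile and VLSR analysis for one
    # molecule, with illustrative partition function and temperature values:
    #
    #   >>> results = experiment.analyze_molecule(
    #   ...     Q=1500.0, T=5.0, name="benzonitrile"
    #   ... )
    #   >>> profile_df, vlsr = results[0], results[1]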
[docs] def finalize_assignments(self):
        """
        Function that will complete the assignment process by serializing
        DataClass objects and formatting a report.

        Creates summary pandas dataframes as self.table and self.profiles,
        which correspond to the assignments and fitted line profiles
        respectively.
        """
        assignments = self.line_lists["Peaks"].get_assignments()
        ulines = self.line_lists["Peaks"].get_ulines()
        if len(assignments) > 0:
            for obj in assignments:
                if len(obj.multiple) != 0 and obj.final is False:
                    warnings.warn(
                        f"Transition at {obj.frequency:.4f} has multiple candidates."
                    )
                    warnings.warn(f"Please choose assignment for peak {obj.peak_id}.")
                else:
                    # Set the transition as finalized
                    obj.final = True
                # Dump all the assignments into YAML format
                obj.to_file(f"assignment_objs/{obj.experiment}-{obj.peak_id}", "yaml")
                obj.deviation = obj.catalog_frequency - obj.frequency
            # Convert all of the assignment data into a CSV file
            assignment_df = pd.DataFrame(data=[obj.__dict__ for obj in assignments])
            self.table = assignment_df
            self.table.sort_values(["frequency"], ascending=True, inplace=True)
            # Generate a LaTeX table for publication
            self.create_latex_table()
            # Dump assignments to disk
            assignment_df.to_csv(f"reports/{self.session.experiment}.csv", index=False)
            # Dump U-line data to disk
            peak_data = [[peak.frequency, peak.intensity] for peak in ulines]
            peak_df = pd.DataFrame(peak_data, columns=["Frequency", "Intensity"])
            peak_df.to_csv(f"reports/{self.session.experiment}-ulines.csv", index=False)
            tally = self._get_assigned_names()
            combined_dict = {
                "assigned_lines": len(assignments),
                "ulines": len(ulines),
                "peaks": self.line_lists["Peaks"].get_frequencies(),
                "num_peaks": len(self.line_lists["Peaks"]),
                "tally": tally,
                "unique_molecules": self.names,
                "num_unique": len(self.names),
            }
            # Combine Session information
            combined_dict.update(self.session.__dict__)
            # Dump the session summary to disk
            routines.dump_yaml(
                f"sessions/{self.session.experiment}.yml", combined_dict
            )
            self._create_html_report()
            # Dump data to notebook output
            for key, value in combined_dict.items():
                self.logger.info(key + ": " + str(value))
        else:
            raise Exception(
                "No assignments made in this session - nothing to finalize!"
            )
[docs] def clean_folder(self, action=False):
        """
        Method for cleaning up all of the directories used by this routine.
        Use with caution!!! Requires passing action=True to actually delete
        anything.

        Parameters
        ----------
        action : bool
            If True, folders will be deleted. If False (default) nothing is done.
        """
        folders = ["assignment_objs", "queries", "sessions", "clean", "reports"]
        if action is True:
            for folder in folders:
                rmtree(folder)
[docs] def update_database(self, dbpath=None): """ Adds all of the entries to a specified SpectralCatalog database. The database defaults to the global database stored in the home directory. This method will remove everything in the database associated with this experiment's ID, and re-add the entries. Parameters ---------- dbpath : str, optional path to a SpectralCatalog database. Defaults to the system-wide catalog. """ with database.SpectralCatalog(dbpath) as db_obj: # Tabula rasa db_obj.remove_experiment(self.session.experiment) ass_list = [assignment.__dict__ for assignment in self.assignments] db_obj.insert_multiple(ass_list)
[docs] def simulate_sticks(
        self,
        catalogpath: str,
        N: float,
        Q: float,
        T: float,
        doppler=None,
        gaussian=False,
    ):
        """
        Simulates a stick spectrum with intensities in flux units (Jy) for a
        given catalog file, the column density, and the rotational partition
        function at temperature T.

        Parameters
        ----------
        catalogpath : str
            Path to the SPCAT catalog file
        N : float
            Column density in cm^-2
        Q : float
            Partition function at temperature T
        T : float
            Temperature in Kelvin
        doppler : float, optional
            Doppler width in km/s; defaults to the session-wide value
        gaussian : bool, optional
            If True, simulates Gaussian profiles instead of sticks

        Returns
        -------
        dataframe
            If gaussian is False, a dataframe of sticks; if True, a simulated
            Gaussian line profile spectrum.
        """
        # If no Doppler width provided, use the session-wide value
        if doppler is None:
            doppler = self.session.doppler
        catalog_df = aa.simulate_catalog(catalogpath, N, Q, T, doppler)
        # Take only the values within band
        catalog_df = catalog_df[
            (catalog_df["Frequency"] >= self.data[self.freq_col].min())
            & (catalog_df["Frequency"] <= self.data[self.freq_col].max())
        ]
        if gaussian is False:
            return catalog_df[["Frequency", "Flux (Jy)"]]
        else:
            # Convert Doppler width to frequency widths
            widths = units.dop2freq(doppler, catalog_df["Frequency"].values)
            # Calculate the Gaussian amplitude
            amplitudes = catalog_df["Flux (Jy)"] / np.sqrt(2.0 * np.pi ** 2.0 * widths)
            sim_y = self.simulate_spectrum(
                self.data[self.freq_col],
                catalog_df["Frequency"].values,
                widths,
                amplitudes,
            )
            simulated_df = pd.DataFrame(
                data=list(zip(self.data[self.freq_col], sim_y)),
                columns=["Frequency", "Flux (Jy)"],
            )
            return simulated_df
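    # Usage sketch (hypothetical): simulating Gaussian profiles for a catalog
    # at an illustrative path, with assumed column density, partition
    # function, and temperature:
    #
    #   >>> sim_df = experiment.simulate_sticks(
    #   ...     "catalogs/hc5n.cat", N=1e11, Q=580.0, T=10.0, gaussian=True
    #   ... )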
[docs] def simulate_spectrum(
        self,
        x: np.ndarray,
        centers: List[float],
        widths: List[float],
        amplitudes: List[float],
        fake=False,
    ):
        """
        Generate a synthetic spectrum of Gaussians with the specified
        parameters, on a given x axis. GaussianModel is used here to remain
        internally consistent with the rest of the code.

        Parameters
        ----------
        x : np.ndarray
            Array of x values to evaluate Gaussians on
        centers : array_like of float
            Array of Gaussian centers
        widths : array_like of float
            Array of Gaussian widths
        amplitudes : array_like of float
            Array of Gaussian amplitudes
        fake : bool, optional
            If True, each Gaussian is additionally scaled by its amplitude,
            producing exaggerated (fake) intensities for the simulation.

        Returns
        -------
        y : np.ndarray
            Array of y values
        """
        y = np.zeros(len(x))
        model = GaussianModel()
        for c, w, a in zip(centers, widths, amplitudes):
            if fake is True:
                scaling = a
            else:
                scaling = 1.0
            y += scaling * model.eval(x=x, center=c, sigma=w, amplitude=a)
        return y
[docs] def clean_spectral_assignments(self, window=1.0):
        """
        Function to blank regions of the spectrum that have already been
        assigned. This function takes the frequencies of assignments, and uses
        the noise statistics to generate white noise to replace each peak.
        This lets one focus on unidentified features, rather than be
        distracted by assignments with large amplitudes.

        Parameters
        ----------
        window: float, optional
            Frequency value in MHz to blank. The region corresponds to the
            frequency +/- this value.
        """
        # Make a backup of the full spectrum
        if "Full" not in self.data.columns:
            self.data["Full"] = self.data[self.int_col].copy()
        # If the backup exists, restore the backup first before blanking.
        else:
            self.data[self.int_col] = self.data["Full"].copy()
        assignments = self.line_lists["Peaks"].get_assignments()
        frequencies = np.array([transition.frequency for transition in assignments])
        self.logger.info(f"Blanking spectrum over {len(frequencies)} windows.")
        if self.session.noise_region == "als":
            # With an ALS-fitted baseline the working spectrum is in SNR
            # units, so the effective baseline is unity; estimate the noise
            # spread from the mean-normalized baseline array.
            baseline_array = np.asarray(self.session.baseline)
            baseline = 1.0
            rms = np.std(baseline_array / np.mean(baseline_array))
        else:
            baseline = self.session.baseline
            rms = self.session.noise_rms
        self.data[self.int_col] = analysis.blank_spectrum(
            self.data,
            frequencies,
            baseline,
            rms,
            freq_col=self.freq_col,
            int_col=self.int_col,
            window=window,
            df=False,
        )
[docs] def calculate_assignment_statistics(self): """ Function for calculating some aggregate statistics of the assignments and u-lines. This breaks the assignments sources up to identify what the dominant source of information was. The two metrics for assignments are the number of transitions and the intensity contribution assigned by a particular source. :return: dict """ reduced_table = self.table[ [ "frequency", "intensity", "formula", "name", "catalog_frequency", "deviation", "ustate_energy", "source", "public", ] ] artifacts = reduced_table.loc[reduced_table["name"] == "Artifact"] splat = reduced_table.loc[reduced_table["source"] == "CDMS/JPL"] local = reduced_table.loc[ (reduced_table["source"] != "Artifact") & (reduced_table["source"] != "CDMS/JPL") ] public = local.loc[local["public"] == True] private = local.loc[local["public"] == False] sources = [ "Artifacts", "Splatalogue", "Published molecules", "Unpublished molecules", ] # Added up the total number of lines total_lines = len(self.line_lists["Peaks"]) # Add up the total intensity ulines = self.line_lists.get("Peaks").get_ulines() total_intensity = np.sum([uline.intensity for uline in ulines]) total_intensity += np.sum(reduced_table["intensity"]) line_breakdown = [len(source) for source in [artifacts, splat, public, private]] intensity_breakdown = [ np.sum(source["intensity"]) for source in [artifacts, splat, public, private] ] # Calculate the aggregate statistics cum_line_breakdown = np.cumsum(line_breakdown) cum_int_breakdown = np.cumsum(intensity_breakdown) # Organize results into dictionary for return return_dict = { "sources": sources, # These are absolute values "abs": { "total lines": total_lines, "total intensity": total_intensity, "line breakdown": line_breakdown, "intensity breakdown": intensity_breakdown, "cumulative line breakdown": cum_line_breakdown, "cumulative intensity breakdown": cum_int_breakdown, }, # These are the corresponding values in percentage "percent": { "line breakdown": [ (value / total_lines) * 100.0 for value in line_breakdown ], "intensity breakdown": [ (value / total_intensity) * 100.0 for value in intensity_breakdown ], "cumulative line breakdown": [ (value / total_lines) * 100.0 for value in cum_line_breakdown ], "cumulative intensity breakdown": [ (value / total_intensity) * 100.0 for value in cum_int_breakdown ], }, "molecules": { "CDMS/JPL": { name: len(splat.loc[splat["name"] == name]) for name in splat["name"].unique() }, "Published": { name: len(public.loc[public["name"] == name]) for name in public["name"].unique() }, "Unpublished": { name: len(private.loc[private["name"] == name]) for name in private["name"].unique() }, }, } return return_dict
[docs] def plot_spectrum(self, simulate=False): """ Generates a Plotly figure of the spectrum. If U-lines are present, it will plot the simulated spectrum also. """ fig = go.FigureWidget() fig.layout["xaxis"]["title"] = "Frequency (MHz)" fig.layout["xaxis"]["tickformat"] = ".," fig.add_scattergl( x=self.data[self.freq_col], y=self.data[self.int_col], name="Experiment", opacity=0.6, ) if "Peaks" in self.line_lists: ulines = self.line_lists["Peaks"].get_ulines() labels = list(range(len(ulines))) amplitudes = np.array([uline.intensity for uline in ulines]) centers = np.array([uline.frequency for uline in ulines]) # Add sticks for U-lines fig.add_bar( x=centers, y=amplitudes, hoverinfo="text", text=labels, name="Peaks" ) if simulate is True: widths = units.dop2freq(self.session.doppler, centers) simulated = self.simulate_spectrum( self.data[self.freq_col].values, centers, widths, amplitudes, fake=True, ) self.simulated = pd.DataFrame( data=list(zip(self.data[self.freq_col].values, simulated)), columns=["Frequency", "Intensity"], ) fig.add_scattergl( x=self.simulated["Frequency"], y=self.simulated["Intensity"], name="Simulated spectrum", ) return fig
def _create_html_report(self, filepath=None): """ Function for generating an HTML report for sharing. The HTML report is rendered with Jinja2, and uses the template "report_template.html" located in the module directory. The report includes interactive plots showing statistics of the assignments/ulines and an overview of the spectrum. At the end of the report is a table of the assignments and uline data. Parameters ---------- filepath: str or None, optional Path to save the report to. If `None`, defaults to `reports/{id}-summary.html` """ template_path = os.path.join( os.path.dirname(os.path.abspath(__file__)), "report_template.html" ) with open(template_path) as read_file: template = Template(read_file.read()) html_dict = dict() # Say what the minimum value for peak detection is. html_dict["peak_threshold"] = self.threshold # The assigned molecules table reduced_table = self.table[ [ "frequency", "intensity", "formula", "name", "catalog_frequency", "deviation", "ustate_energy", "source", ] ] # Render pandas dataframe HTML with bar annotations reduced_table_html = ( reduced_table.style.bar( subset=["deviation", "ustate_energy"], align="mid", color=["#d65f5f", "#5fba7d"], ) .bar(subset=["intensity"], color="#5fba7d") .format( { "frequency": "{:.4f}", "catalog_frequency": "{:.4f}", "deviation": "{:.3f}", "ustate_energy": "{:.2f}", "intensity": "{:.3f}", } ) .set_table_attributes("""class = "data-table hover compact" """) .render(classes=""" "data-table hover compact" """) ) html_dict["assignments_table"] = reduced_table_html # The unidentified features table ulines = self.line_lists["Peaks"].get_ulines() uline_df = pd.DataFrame( [[uline.frequency, uline.intensity] for uline in ulines], columns=["Frequency", "Intensity"], ) html_dict["uline_table"] = ( uline_df.style.bar(subset=["Intensity"], color="#5fba7d") .format({"Frequency": "{:.4f}", "Intensity": "{:.2f}"}) .set_table_attributes("""class = "data-table hover compact" """) .render(classes=""" "data-table hover compact" """) ) # Plotly displays of the spectral feature breakdown and whatnot html_dict["plotly_breakdown"] = plot(self.plot_breakdown(), output_type="div") html_dict["plotly_figure"] = plot(self.plot_assigned(), output_type="div") # Render the template with Jinja and save the HTML report output = template.render(session=self.session, **html_dict) if filepath is None: filepath = f"reports/{self.session.experiment}-summary.html" with open(filepath, "w+") as write_file: write_file.write(output)
[docs] def create_latex_table(self, filepath=None, header=None, cols=None, **kwargs):
        """
        Method to create a LaTeX table summarizing the measurements in this
        experiment. Without any additional inputs, the table will be printed
        into a .tex file in the reports folder.

        The table will be created with the minimum amount of information
        required for a paper, including the frequency and intensity
        information, assignments, and the source of the information. The user
        can override the default settings by supplying `header` and `cols`
        arguments, and any other kwargs are passed into the `to_latex` pandas
        DataFrame method. The header and cols lengths must match.

        Parameters
        ----------
        filepath : str, optional
            Filepath to save the LaTeX table to; by default None
        header : iterable of str, optional
            An iterable of strings specifying the header to be printed. By
            default None
        cols : iterable of str, optional
            An iterable of strings specifying which columns to include. If
            this is changed, the header must also be changed to reflect the
            new columns.
        """
        table = self.line_lists["Peaks"].to_dataframe()
        # Numerical formatting
        formatters = {
            "frequency": "{:,.4f}".format,
            "intensity": "{:.2f}".format,
            "catalog_frequency": "{:,.4f}".format,
            "deviation": "{:,.4f}".format,
        }

        def ref_formatter(x):
            noform = ["CDMS/JPL", "Artifact", "U"]
            if x not in noform:
                return f"\\cite{{{x}}}"
            else:
                return x

        # Replace the source information to designate a U-line
        table.loc[table["uline"] == True, "source"] = "U"
        if header is None:
            header = [
                "Frequency",
                "Intensity",
                "Catalog frequency",
                "Deviation",
                "Name",
                "Formula",
                "Quantum numbers",
                "Source",
            ]
        if cols is None:
            cols = [
                "frequency",
                "intensity",
                "catalog_frequency",
                "deviation",
                "name",
                "formula",
                "r_qnos",
                "source",
            ]
        assert len(header) == len(cols)
        if "source" in cols:
            table["source"] = table["source"].apply(ref_formatter)
        if "formula" in cols:
            table["formula"] = table["formula"].apply(lambda x: f"\\ce{{{x}}}")
        if filepath is None:
            filepath = f"reports/{self.session.experiment}-table.tex"
        # Settings to be passed into the latex table creation
        table_settings = {
            "formatters": formatters,
            "index": False,
            "longtable": True,
            "header": header,
            "escape": False,
        }
        table_settings.update(**kwargs)
        # Write the table to file
        with open(filepath, "w+") as write_file:
            table[cols].to_latex(buf=write_file, **table_settings)
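    # Usage sketch (hypothetical): a trimmed table with only frequencies and
    # assignments, assuming an AssignmentSession instance `experiment`:
    #
    #   >>> experiment.create_latex_table(
    #   ...     header=["Frequency", "Name", "Source"],
    #   ...     cols=["frequency", "name", "source"],
    #   ... )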
[docs] def plot_breakdown(self): """ Generate two charts to summarize the breakdown of spectral features. The left column plot shows the number of ulines being assigned by the various sources of frequency data. Artifacts - instrumental interference, from the function `process_artifacts` Splatalogue - uses the astroquery API, from the function `splat_assign_spectrum` Published - local catalogs, but with the `public` kwarg flagged as True Unpublished - local catalogs, but with the `public` kwarg flagged as False :return: Plotly FigureWidget object """ fig = figurefactory.init_plotly_subplot( nrows=1, ncols=2, **{"subplot_titles": ("Reference Breakdown", "Intensity Breakdown")}, ) fig.layout["title"] = "Spectral Feature Breakdown" fig.layout["showlegend"] = False reduced_table = self.table[ [ "frequency", "intensity", "formula", "name", "catalog_frequency", "deviation", "ustate_energy", "source", "public", ] ] artifacts = reduced_table.loc[reduced_table["name"] == "Artifact"] splat = reduced_table.loc[reduced_table["source"] == "CDMS/JPL"] local = reduced_table.loc[ (reduced_table["source"] != "Artifact") & (reduced_table["source"] != "CDMS/JPL") ] public = local.loc[local["public"] == True] private = local.loc[local["public"] == False] sources = [ "Artifacts", "Splatalogue", "Published molecules", "Unpublished molecules", ] # Added up the total number of lines ulines = self.line_lists["Peaks"].get_ulines() assignments = self.line_lists["Peaks"].get_assignments() total_lines = len(ulines) + len(assignments) # Add up the total intensity total_intensity = np.sum([uline.intensity for uline in ulines]) total_intensity += np.sum(reduced_table["intensity"]) line_breakdown = [total_lines] intensity_breakdown = [total_intensity] for source in [artifacts, splat, public, private]: line_breakdown.append(-len(source)) intensity_breakdown.append(-np.sum(source["intensity"])) line_breakdown = np.cumsum(line_breakdown) intensity_breakdown = np.cumsum(intensity_breakdown) labels = ["Total"] + sources colors = ["#d7191c", "#fdae61", "#ffffbf", "#abdda4", "#2b83ba"] # Left column plot of the number of lines assigned fig.add_trace( go.Scattergl(x=labels, y=line_breakdown, fill="tozeroy", hoverinfo="x+y"), 1, 1, ) # Bar charts showing the number of lines from each source fig.add_trace( go.Bar( x=labels, y=[0.0] + [len(source) for source in [artifacts, splat, public, private]], hoverinfo="x+y", width=0.5, marker={"color": colors}, ), 1, 1, ) # Right column plot of the intensity contributions fig.add_trace( go.Scattergl( x=labels, y=intensity_breakdown, fill="tozeroy", hoverinfo="x+y" ), 1, 2, ) # Bar chart showing the intensity contribution from each source fig.add_trace( go.Bar( x=labels, y=[0.0] + [ np.sum(source["intensity"]) for source in [artifacts, splat, public, private] ], hoverinfo="x+y", width=0.5, marker={"color": colors}, ), 1, 2, ) fig["layout"]["xaxis1"].update(title="Source", showgrid=True) fig["layout"]["yaxis1"].update( title="Cumulative number of assignments", range=[0.0, total_lines * 1.05] ) fig["layout"]["xaxis2"].update(title="Source", showgrid=True) fig["layout"]["yaxis2"].update( title="Cumulative intensity", range=[0.0, total_intensity * 1.05] ) return fig
[docs] def plot_assigned(self):
        """
        Generates a Plotly figure with the assignments overlaid on the
        experimental spectrum.

        Does not require any parameters, but requires that the assignment and
        peak finding functions have been run previously.
        """
        fig = go.FigureWidget()
        fig.layout["title"] = f"Experiment {self.session.experiment}"
        fig.layout["xaxis"]["title"] = "Frequency (MHz)"
        fig.layout["xaxis"]["tickformat"] = ",.2f"
        # Update the peaks table
        self.peaks = pd.DataFrame(
            data=[
                [uline.frequency, uline.intensity]
                for uline in self.line_lists["Peaks"].get_ulines()
            ],
            columns=["Frequency", "Intensity"],
        )
        fig.add_scattergl(
            x=self.data[self.freq_col],
            y=self.data[self.int_col],
            name="Experiment",
            opacity=0.6,
        )
        fig.add_bar(
            x=self.table["catalog_frequency"],
            y=self.table["intensity"],
            width=1.0,
            hoverinfo="text",
            text=self.table["name"].astype(str) + "-" + self.table["r_qnos"].astype(str),
            name="Assignments",
        )
        ulines = np.array(
            [
                [uline.intensity, uline.frequency]
                for uline in self.line_lists["Peaks"].get_ulines()
            ]
        )
        # Guard against an empty U-line list
        if len(ulines) > 0:
            fig.add_bar(x=ulines[:, 1], y=ulines[:, 0], width=1.0, name="U-lines")
        return fig
[docs]    def stacked_plot(self, frequencies: List[float], int_col=None, freq_range=0.05):
        """
        Special implementation of the stacked_plot from the figurefactory
        module, adapted for AssignmentSession. In this version, the
        assigned/u-lines are also indicated.

        This function will generate a Plotly figure that stacks up the
        spectra as subplots, with increasing frequencies going up the plot.
        It was written primarily to identify harmonically related lines,
        which in the absence of centrifugal distortion should line up
        perfectly in the center of each panel.

        Due to limitations with Plotly, there is a maximum number of panels
        that can be stacked; an Exception is raised if more than 5
        frequencies are provided.

        Parameters
        ----------
        frequencies : list of float
            Center frequencies, in MHz, for each panel.
        int_col : str, optional
            Name of the intensity column to plot. Defaults to the session's
            `int_col` attribute.
        freq_range : float, optional
            Percentage value of each center frequency to use as the cutoff
            on either side of a panel.

        Returns
        -------
        Plotly Figure object
        """
        if int_col is None:
            int_col = self.int_col
        # Update the peaks table
        ulines = self.line_lists["Peaks"].get_ulines()
        self.peaks = pd.DataFrame(
            data=[[uline.frequency, uline.intensity] for uline in ulines],
            columns=["Frequency", "Intensity"],
        )
        dataframe = self.data.copy()
        frequencies = np.asarray(frequencies)
        # Plot only frequencies that fall within the experimental band
        indices = np.where(
            np.logical_and(
                dataframe[self.freq_col].min() <= frequencies,
                frequencies <= dataframe[self.freq_col].max(),
            )
        )
        frequencies = frequencies[indices]
        # Sort frequencies such that plots are descending in frequency,
        # i.e. increasing frequency going up the figure
        frequencies = np.sort(frequencies)[::-1]
        delta = min(frequencies) * freq_range
        nplots = len(frequencies)
        if nplots > 5:
            raise Exception("Too many requested frequencies; I can't stack them all!")
        titles = tuple(f"{frequency:,.0f} MHz" for frequency in frequencies)
        fig = figurefactory.init_plotly_subplot(
            nrows=nplots,
            ncols=1,
            **{
                "subplot_titles": titles,
                "vertical_spacing": 0.15,
                "shared_xaxes": True,
            },
        )
        for index, frequency in enumerate(frequencies):
            # Calculate the offset frequency
            dataframe["Offset " + str(index)] = dataframe[self.freq_col] - frequency
            # Range as a fraction of the center frequency
            freq_cutoff = freq_range * frequency
            max_freq = frequency + freq_cutoff
            min_freq = frequency - freq_cutoff
            sliced_df = dataframe.loc[
                dataframe[f"Offset {index}"].between(-delta, delta)
            ]
            # Explicit copies avoid pandas SettingWithCopyWarning when the
            # offset columns are added below
            sliced_peaks = self.peaks.loc[
                self.peaks["Frequency"].between(min_freq, max_freq)
            ].copy()
            sliced_peaks["Offset Frequency"] = sliced_peaks["Frequency"] - frequency
            sliced_assignments = self.table.loc[
                self.table["frequency"].between(min_freq, max_freq)
            ].copy()
            sliced_assignments["offset_frequency"] = (
                sliced_assignments["frequency"] - frequency
            )
            # Plot the data
            traces = list()
            # Spectrum plot
            traces.append(
                go.Scattergl(
                    x=sliced_df["Offset " + str(index)],
                    y=sliced_df[int_col],
                    mode="lines",
                    opacity=0.6,
                    marker={"color": "rgb(5,113,176)"},
                )
            )
            traces.append(
                go.Bar(
                    x=sliced_assignments["offset_frequency"],
                    y=sliced_assignments["intensity"],
                    width=1.0,
                    hoverinfo="text",
                    text=sliced_assignments["name"]
                    + "-"
                    + sliced_assignments["r_qnos"].apply(str),
                    name="Assignments",
                    marker={"color": "rgb(253,174,97)"},
                )
            )
            traces.append(
                go.Bar(
                    x=sliced_peaks["Offset Frequency"],
                    y=sliced_peaks["Intensity"],
                    width=1.0,
                    name="U-lines",
                    marker={"color": "rgb(26,150,65)"},
                    hoverinfo="text",
                    text=sliced_peaks["Frequency"],
                )
            )
            # Plotly subplot rows and columns are 1-indexed
            fig.add_traces(traces, [index + 1] * 3, [1] * 3)
            fig["layout"]["xaxis1"].update(
                range=[-freq_cutoff, freq_cutoff],
                title="Offset frequency (MHz)",
                showgrid=True,
            )
            fig["layout"]["yaxis" + str(index + 1)].update(showgrid=False)
        fig["layout"].update(autosize=True, height=1000, width=900, showlegend=False)
        return fig
[docs]    def match_artifacts(self, artifact_exp: "AssignmentSession", threshold=0.05):
        """
        TODO: Need to update this method; `process_artifacts` is no longer
        a method.

        Remove artifacts based on another experiment run with a blank
        sample, i.e. containing only artifacts. The routine will simply
        match peaks found in the artifact experiment, and assign all
        coincidences in the present experiment as artifacts.

        Parameters
        ----------
        artifact_exp : AssignmentSession object
            Experiment with no sample present
        threshold : float, optional
            Threshold in absolute frequency units for matching
        """
        matches = analysis.match_artifacts(self, artifact_exp, threshold)
        self.process_artifacts([freq for index, freq in matches.items()])
        for index, freq in matches.items():
            self.logger.info(f"Removed {freq:,.4f} peak as artifact.")
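    # Usage sketch (illustrative). `blank` is assumed to be an
    # AssignmentSession built from a measurement with no sample present.
    # Note that, per the TODO above, the call to the removed
    # `process_artifacts` method means this will currently fail.
    #
    # >>> session.match_artifacts(blank, threshold=0.02)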
[docs]    def find_progressions(
        self,
        search=0.001,
        low_B=400.0,
        high_B=9000.0,
        refit=False,
        plot=True,
        preferences=None,
        **kwargs,
    ):
        """
        Performs a search for possible harmonically related U-lines. The
        first step loops over every possible U-line pair, and uses the
        difference to estimate an effective B value for predicting the next
        transition. If the search is successful, the U-line is added to the
        list. The search is repeated until there are no more U-lines within
        frequency range of the next predicted line.

        Once the possible series are identified, the frequencies are fit to
        an effective linear molecule model (B and D terms). An affinity
        propagation cluster model is then used to group similar progressions
        together, with either a systematic test of preference values or a
        user specified value.

        Parameters
        ----------
        search : float, optional
            Percentage value of the target frequency cutoff for excluding
            possible candidates in the harmonic search
        low_B, high_B : float, optional
            Minimum and maximum value of B in MHz to be considered. This
            constrains the size of the molecule you are looking for.
        refit : bool, optional
            If True, B and D are refit to the cluster frequencies.
        plot : bool, optional
            If True, creates a Plotly scatter plot of the clusters, as a
            function of the preference values.
        preferences : float or array_like of floats, optional
            A single value or an array of preference values for the AP
            cluster model. If None, the clustering will be performed on a
            default grid, where all of the results are returned.
        kwargs : optional
            Additional kwargs are passed to the AP model initialization.

        Returns
        -------
        fig : Plotly FigureWidget or None
            Scatter plot of the clustered progressions if `plot` is True,
            otherwise None.
        pref_test_data : dict
            Dictionary keyed by preference value, holding the cluster data
            and summary statistics for each preference tested.
        """
        ulines = self.line_lists["Peaks"].get_ulines()
        uline_frequencies = [uline.frequency for uline in ulines]
        self.logger.info("Performing harmonic progression search")
        progressions = analysis.harmonic_finder(
            uline_frequencies, search=search, low_B=low_B, high_B=high_B
        )
        self.logger.info("Fitting progressions.")
        fit_df, _ = fitting.harmonic_fitter(progressions, J_thres=search)
        self.logger.info(f"Found {len(fit_df)} possible progressions.")
        pref_test_data = dict()
        # Run cluster analysis on the results
        if preferences is None:
            preferences = np.array(
                [-8e4, -4e4, -2e4, -1e4, -5e3, -3e3, -1.5e3, -500.0, -100.0]
            )
        # Perform the AP cluster modelling over a grid of preference values
        if isinstance(preferences, (list, np.ndarray)):
            self.logger.info(f"Evaluating AP over grid values {preferences}.")
            for preference in preferences:
                try:
                    ap_settings = {"preference": preference}
                    ap_settings.update(**kwargs)
                    cluster_dict, progressions, _ = analysis.cluster_AP_analysis(
                        fit_df, sil_calc=True, refit=refit, **ap_settings
                    )
                    npoor = progressions.loc[progressions["Silhouette"] < 0.0].size
                    nclusters = len(cluster_dict)
                    pref_test_data[preference] = {
                        "cluster_data": cluster_dict,
                        "nclusters": nclusters,
                        "npoor": npoor,
                        "avg_silh": np.average(progressions["Silhouette"]),
                        "progression_df": progressions,
                    }
                    self.logger.info(
                        f"{preference} has {npoor} poor fits "
                        f"out of {nclusters} clusters ({npoor / nclusters})."
                    )
                except ValueError:
                    pass
        else:
            # Perform the AP clustering with a single preference value
            cluster_dict, progressions, _ = analysis.cluster_AP_analysis(
                fit_df, sil_calc=True, refit=refit, **{"preference": preferences}
            )
            npoor = progressions.loc[progressions["Silhouette"] < 0.0].size
            nclusters = len(cluster_dict)
            pref_test_data[preferences] = {
                "cluster_data": cluster_dict,
                "nclusters": nclusters,
                "npoor": npoor,
                "avg_silh": np.average(progressions["Silhouette"]),
                "progression_df": progressions,
            }
            self.logger.info(
                f"{preferences} has {npoor} poor fits "
                f"out of {nclusters} clusters ({npoor / nclusters})."
            )
        # Make a Plotly figure of how the clustering goes (with frequency)
        # as a function of preference
        if plot is True:
            fig = go.FigureWidget()
            fig.layout["xaxis"]["title"] = "Frequency (MHz)"
            fig.layout["yaxis"]["title"] = "Preference"
            fig.layout["xaxis"]["tickformat"] = ".,"
            # Loop over the preference values
            for preference, contents in pref_test_data.items():
                # Create the colors for the unique clusters
                colors = figurefactory.generate_colors(
                    len(contents["cluster_data"]), hex=True, cmap=plt.cm.Spectral
                )
                # Assign a color to each cluster
                cmap = {
                    index: color
                    for index, color in zip(
                        np.arange(len(contents["cluster_data"])), colors
                    )
                }
                for index, data in contents["cluster_data"].items():
                    frequencies = data["Frequencies"]
                    fig.add_scattergl(
                        x=frequencies,
                        y=[preference + index] * len(frequencies),
                        mode="markers",
                        marker={"color": cmap.get(index, "#ffffff")},
                        opacity=0.7,
                        name=f"{preference}-{index}",
                    )
            return fig, pref_test_data
        else:
            return None, pref_test_data
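    # Usage sketch (illustrative). A single preference value skips the
    # default grid; the returned dictionary is keyed by preference:
    #
    # >>> fig, results = session.find_progressions(
    # ...     search=0.005, low_B=2000.0, high_B=8000.0, preferences=-1e4
    # ... )
    # >>> results[-1e4]["nclusters"]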
[docs]    def search_species(self, formula=None, name=None, smiles=None):
        """
        Method for finding species in the assigned dataframe, with the
        intention of showing where the observed frequencies are. Only one
        of the three lookup keys should be provided.

        Parameters
        ----------
        formula : str, optional
            Chemical formula to look up
        name : str, optional
            Common name to look up
        smiles : str, optional
            Unique SMILES string to look up

        Returns
        -------
        pandas dataframe slice with corresponding lookup
        """
        if not hasattr(self, "table"):
            raise Exception("No assignment table created yet. Finalize assignments.")
        if formula:
            locator = "formula"
            check = formula
        elif name:
            locator = "name"
            check = name
        elif smiles:
            locator = "smiles"
            check = smiles
        else:
            raise ValueError("Please provide a formula, name, or SMILES string.")
        return self.table.loc[self.table[locator] == check]
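    # Usage sketch (illustrative; the formula is hypothetical). Requires the
    # assignment table to exist, i.e. assignments have been finalized:
    #
    # >>> hits = session.search_species(formula="C4H2")
    # >>> hits[["name", "frequency", "r_qnos"]]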
[docs]    def save_session(self, filepath=None):
        """
        Method to save an AssignmentSession to disk.

        The underlying mechanics are based on the joblib library, and so
        there can be cross-compatibility issues, particularly when loading
        from different versions of Python.

        Parameters
        ----------
        filepath : str, optional
            Path to save the file to. By default it will go into the
            sessions folder.
        """
        if filepath is None:
            filepath = f"./sessions/{self.session.experiment}.pkl"
        self.logger.info(f"Saving session to {filepath}")
        if hasattr(self, "log_handlers"):
            del self.log_handlers
        # Save to disk
        routines.save_obj(self, filepath)
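    # Usage sketch (illustrative):
    #
    # >>> session.save_session()              # -> ./sessions/<experiment>.pkl
    # >>> session.save_session("backup.pkl")  # explicit path
    #
    # Re-loading uses the matching deserialization helper in
    # pyspectools.routines (not shown in this section).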
[docs]@dataclass
class Molecule(LineList):
    """
    Special instance of the LineList class. The idea is to eventually use
    the high speed fitting/cataloguing routines by Brandon to provide quick
    simulations overlaid on chirp spectra.

    Attributes
    ----------
    A, B, C : float
        Rotational constants in MHz.
    var_file : str
        Path to the .var file associated with the molecule.
    """

    A: float = 20000.0
    B: float = 6000.0
    C: float = 3500.0
    var_file: str = ""

    def __post_init__(self, **kwargs):
        super().__init__(**kwargs)
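# Usage sketch (illustrative; the constants are made up, and the behavior of
# the inherited LineList fields is assumed, since LineList is defined
# elsewhere in this module):
#
# >>> mol = Molecule(B=1331.33, var_file="molecule.var")
# >>> mol.B
# 1331.33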