Source code for pyspectools.mmw.mmw_analysis


from typing import List
from pathlib import Path

import numpy as np
import lmfit
import pandas as pd
import os
import peakutils
from matplotlib import pyplot as plt

from . import fft_routines
from . import interpolation
from ..lineshapes import sec_deriv_lorentzian

"""
    Miscellaneous routines for parsing and batch processing
    the millimeter-wave data.
"""

def parse_data(filepath):
    """
    Function for parsing data from a millimeter-wave data file.

    Reads a legacy scan file line by line, using marker strings to switch
    between three reading states: scan header ("Scan"), scan parameters
    (the line immediately after the header), and intensity values. Field ON
    (Zeeman) intensities are kept separate from Field OFF intensities.

    Parameters
    ----------
    filepath : str
        Path to the millimeter-wave data file.

    Returns
    -------
    frequency : np.ndarray
        Frequency grid reconstructed from the scan center, step, and
        number of points.
    fieldoff_intensities : np.ndarray
        Intensities recorded with the magnetic field off.
    fieldon_intensities : np.ndarray
        Intensities recorded with the field on; empty array if the scan
        has no Field ON section.
    settings : dict
        Scan metadata: ID, Frequency, Frequency step, Multiplier, Center,
        Points.
    """
    settings = dict()
    intensity = list()
    # Boolean flags to check when to start/stop
    # reading parameters
    read_params = False
    read_int = False
    read_zeeman = False
    finished = False
    fieldoff_intensities = list()
    fieldon_intensities = list()
    with open(filepath) as read_file:
        for line in read_file:
            # A row of asterisks delimits sections; it ends any intensity
            # block, and once the Field ON data has been read we are done.
            if "*****" in line:
                read_int = False
                if finished is True:
                    break
            if "Scan" in line:
                # A second "Scan" header marked [Field ON] means the
                # Zeeman-on data follows.
                if "[Field ON]" in line:
                    read_zeeman = True
                scan_details = line.split()
                settings["ID"] = int(scan_details[1])
                # settings["Date"] = str(scan_details[4])
                read_params = True
                read_int = False
                continue
            if read_int is True:
                if read_zeeman is False:
                    fieldoff_intensities += [float(value) for value in line.split()]
                else:
                    fieldon_intensities += [float(value) for value in line.split()]
                    # Any Field ON data means the next "*****" terminates
                    # the parse loop.
                    finished = True
            if read_params is True and len(line.split()) > 1:
                # Read in the frequency step, frequency, and other info
                # needed to reconstruct the frequency data
                scan_params = line.split()
                shift = 1
                settings["Frequency"] = float(scan_params[0])
                settings["Frequency step"] = float(scan_params[1])
                if len(scan_params) == 4:
                    settings["Multiplier"] = 1.
                    shift = 0
                # If the multiplier data is there, we don't shift the read
                # index over by one
                else:
                    settings["Multiplier"] = float(scan_params[2])
                settings["Center"] = float(scan_params[2 + shift])
                settings["Points"] = int(scan_params[3 + shift])
                read_params = False
                # Start reading intensities immediately afterwards
                read_int = True
                continue
    fieldoff_intensities = np.array(fieldoff_intensities)
    fieldon_intensities = np.array(fieldon_intensities)
    # Generate the frequency grid
    settings["Frequency step"] = settings["Frequency step"] * settings["Multiplier"]
    # This calculates the length of either side
    side_length = settings["Frequency step"] * (settings["Points"] // 2)
    start_freq = settings["Frequency"] - side_length
    end_freq = settings["Frequency"] + side_length
    frequency = np.linspace(start_freq, end_freq, settings["Points"])
    return frequency, fieldoff_intensities, fieldon_intensities, settings
[docs]def open_mmw(filepath, **kwargs): """ Function for opening and processing a single millimeter wave data file. Returns a pandas dataframe containing Frequency and Intensity information. """ frequency, fieldoff_intensities, fieldon_intensities, settings = parse_data(filepath) # Sometimes there are spurious zeros at the end of intensity data; # this will trim the padding npoints = settings.get("Points") fieldoff_intensities = fieldoff_intensities[:npoints] if fieldon_intensities.size > 1: fieldon_intensities = fieldon_intensities[:npoints] # Calculate sample rate in Hz for FFT step sample_rate = 1. / settings["Frequency step"] * 1e6 param_dict = { "window_function": None, "cutoff": [50, 690], "sample_rate": sample_rate } param_dict.update(**kwargs) fieldoff_intensities = fft_routines.fft_filter(fieldoff_intensities, **param_dict) # field on is not always there, but if it we use it to calculate Off - On if fieldon_intensities.size > 1: fieldon_intensities = fft_routines.fft_filter(fieldon_intensities, **param_dict) intensity = fieldoff_intensities - fieldon_intensities else: # if field on is missing, just use zeros instead fieldon_intensities = np.zeros(fieldoff_intensities.size) intensity = fieldoff_intensities # Format the data as a Pandas dataframe mmw_df = pd.DataFrame( data={"Frequency": frequency, "Field OFF": fieldoff_intensities, "Field ON": fieldon_intensities, "OFF - ON": intensity }, ) return mmw_df
[docs]def test_filtering(path: str, **kwargs): """ Convenience function to open a legacy data file and process it; kwargs are passed to the `fft_filter` function call. The function then returns a Matplotlib figure and axis object, showing the FFT filtered spectrum and the time-domain signal chunk that it corresponds to. Parameters ---------- path : str Filepath to the data file. Returns ------- fig, ax Matplotlib figure/axis objects. """ frequency, off, on, settings = parse_data(path) filtered = fft_routines.fft_filter(off, **kwargs) fig, axarray = plt.subplots(2, 1, figsize=(10, 5)) ax = axarray[0] ax.set_title("Frequency domain") ax.plot(frequency, filtered) ax.set_xlabel("Frequency (MHz)") ax = axarray[1] ax.set_title("Time domain") cutoff = kwargs.get("cutoff", np.arange(30, 500)) ax.plot(np.fft.fft(filtered)[np.arange(*cutoff)]) return fig, ax
def sec_deriv_peak_detection(df_group, threshold=5, window_size=25, magnet_thres=0.5, **kwargs):
    """
    Function designed to take advantage of split-combine-apply techniques to
    analyze concatenated spectra, or a single spectrum.

    The spectrum is cross-correlated with a window corresponding to a second-
    derivative Lorentzian line shape, with the parameters corresponding to the
    actual downward facing peak such that the cross-correlation is upwards
    facing for a match. This X-correlation analysis is only done for the
    Field OFF data; every peak should appear in the Field OFF, and should
    disappear under the presence of a magnetic field.

    If the Field ON spectrum is non-zero, the peak finding is followed up by
    calculating the ratio of the intensity at the same position for ON/OFF;
    if the ratio is below a threshold, we consider it a magnetic line.

    Parameters
    ----------
    df_group : pandas DataFrame
        Dataframe containing the millimeter-wave spectra.
    threshold : int, optional
        Absolute intensity units threshold for peak detection. This value
        corresponds to the value used in the X-correlation spectrum,
        by default 5
    window_size : int, optional
        Size of the second derivative Lorentzian window function,
        by default 25
    magnet_thres : float, optional
        Threshold for determining if a peak is magnetic, given as the ratio
        of ON/OFF. A value of 1 means the line is nominally unaffected by a
        magnetic field, and less than 1 corresponds to larger responses to
        magnetic fields. By default 0.5

    Returns
    -------
    pandas DataFrame
        DataFrame holding the detected peaks and their associated magnet
        tests, if applicable.
    """
    # peakutils does not play nicely with Series objects, so hand it a
    # plain NumPy array.
    signal = df_group["Field OFF"].to_numpy()
    # Improve SNR with the matched-filter cross-correlation
    corr_signal = cross_correlate_lorentzian(signal, window_size)
    # Find the peaks that exceed the absolute threshold
    indices = peakutils.indexes(corr_signal, thres=threshold, **kwargs, thres_abs=True)
    # Copy the slice: assigning new columns to a bare .iloc view raises
    # SettingWithCopyWarning and the assignment may silently not stick.
    peak_subset = df_group.iloc[indices].copy()
    # Only evaluate the magnet test if the spectrum has a Field ON component
    if peak_subset["Field ON"].sum() != 0.:
        peak_subset["Ratio"] = peak_subset["Field ON"] / peak_subset["Field OFF"]
        peak_subset["Magnetic"] = peak_subset["Ratio"] < magnet_thres
    return peak_subset
[docs]def cross_correlate_lorentzian(signal: np.ndarray, window_size=25, **kwargs): """ Calculate the cross-correlation spectrum between an input signal and the second derivative lineshape of a Lorentzian function. This is a matched filter analysis to extract optimal signal to noise for millimeter-wave data. Parameters ---------- signal : np.ndarray NumPy 1D array containing the raw signal. window_size : int Size of the window function. Returns ------- corr_signal NumPy 1D array containing the cross-correlation spectrum. """ params = { "x0": 0, "gamma": 0.25, "I": 1 } if kwargs: params.update(**kwargs) # Create a template of the second derivative Lorentzian profile for # x-correlating with the spectrum temp_x = np.linspace(-5, 5, window_size) temp_y = sec_deriv_lorentzian(temp_x, **params) # Cross-correlate with the Lorentzian profile; "same" mode ensures # it is the same length as the original signal for easy indexing corr_signal = np.correlate(signal, temp_y, mode="same") return corr_signal
[docs]def interp_mmw(files, nelements=10000, window_function=None, cutoff=None): """ Function that will process archival millimeter-wave data by providing a list of filepaths. The function will parse the data out of each file, and produce a plot that is stitched together. The first step is to read in all the data, and append the parsed data into full_freq and full_int, corresponding to the experimental frequency and intensities. This is used to then determine the limits to the x axis for plotting. """ full_freq = list() full_int = list() print("Parsing files.") for index, file in enumerate(files): frequency, intensity, settings = parse_data(file) intensity = fft_routines.fft_filter(intensity, window_function, cutoff) if index == 0: minx = np.min(frequency) maxx = np.max(frequency) else: curr_minx = np.min(frequency) curr_maxx = np.max(frequency) if curr_minx < minx: minx = curr_minx if curr_maxx > maxx: maxx = curr_maxx full_freq.append(frequency) full_int.append(intensity) """ Stitching the spectrum together using a weighted Shepard interpolation. """ print("Lowest frequency: " + str(minx)) print("Highest frequency: " + str(maxx)) print("Performing Shepard interpolation") frequency = np.linspace(minx, maxx, nelements) interp_y = np.zeros(nelements) # Loop over each frequency bin for index, interp_freq in enumerate(frequency): if (index / len(frequency)) > 0.5: print("50% done.") # Calculate the Shepard interpolation at the given frequency point interp_y[index] = interpolation.eval_shep_interp(full_freq, full_int, interp_freq, p=16.) df = pd.DataFrame(data=list(zip(frequency, interp_y)), columns=["Frequency", "Intensity"]) return df
[docs]def batch_interpolate(files, datapoints=50000, zeeman_on=None, window_function=None, pass_filter=None): """ Function for batch processing multiple scan files, and concetanates them together to generate a single, interactive spectrum. Takes input argument files as a list of filepaths corresponding to scans that are (generally) without Zeeman. The optional argument, zeeman_on, is a list of filepaths corresponding to scans with Zeeman on. All of the scans will be processed with the same window functions and cut offs, so be sure to optimize them before batch processing. Returns a dataframe with up to three columns: Zeeman off/on, and off - on. """ spectrum_df = interp_mmw(files, datapoints, window_function, pass_filter) # If Zeeman on spectra are provided, we process those too, and perform subtraction if zeeman_on is not None: zeeman_on_df = interp_mmw(zeeman_on, datapoints, window_function, pass_filter) spectrum_df.columns = ["Frequency", "Zeeman OFF"] spectrum_df["Zeeman ON"] = zeeman_on_df["Intensity"].values() spectrum_df["OFF - ON"] = spectrum_df["Zeeman OFF"] - spectrum_df["Zeeman ON"] fig = plot_spectrum(spectrum_df) return fig, df
[docs]def batch_shepard(files, stepsize=0.1, xrange=10., p=4., threshold=0.1, npoints=15, window_function=None, pass_filter=None): """ Routine for performing mass interpolation using Shepard's interpolation method on mmw data. Input is a list of files, and parameters used to interpolate and to filter the data. Stepsize is given in units of MHz. """ print("Loading data...") datalist = [ open_mmw(file, window_function, pass_filter) for file in files ] print("Load complete.") min_freq = 1e9 max_freq = 0. print("Finding frequency range") for data in datalist: current_min = data["Frequency"].min() current_max = data["Frequency"].max() if current_min < min_freq: min_freq = current_min if current_max > max_freq: max_freq = current_max print("Max freq: " + str(max_freq)) print("Min freq: " + str(min_freq)) # Initialize the new frequency range in one MHz steps xnew = np.arange(min_freq, max_freq, stepsize) new_df = pd.DataFrame(data=xnew, columns=["Frequency"]) print("Performing interpolation") for column in ["Zeeman OFF", "Zeeman ON"]: print(column) new_df[column] = interpolation.calc_shep_interp( datalist, xnew, column, p, threshold, npoints ) print("Interpolation done") new_df["OFF - ON"] = new_df["Zeeman OFF"] - new_df["Zeeman ON"] return new_df
[docs]def batch_plot(files, window_function=None, pass_filter=None): for index, file in enumerate(files): if index == 0: full_df = open_mmw(file, window_function, pass_filter) else: full_df = pd.concat([ full_df, open_mmw(file, window_function, pass_filter) ]) full_df = full_df.sort_values(["Frequency"], ascending=True) fig = plot_spectrum(full_df) return full_df, fig
[docs]def fit_spectrum_chunk(dataframe: pd.DataFrame, freq_range: List[float], frequencies: np.ndarray, **kwargs): """ Automates the fitting of multiple frequencies to a single spectrum. Parameters ---------- dataframe : pd.DataFrame Pandas DataFrame containing the millimeter-wave spectrum. freq_range : 2-tuple Iterable with two floats, corresponding to the limits of the spectrum (in frequency) we want to consider for the fit. frequencies : np.ndarray NumPy 1D array containing center frequencies to fit. Returns ------- [type] [description] """ freq_range = sorted(freq_range) sliced_df = dataframe.loc[dataframe["Frequency"].between(*freq_range)] sliced_df["XCorrelation"] = cross_correlate_lorentzian( sliced_df["Field OFF"], **kwargs ) param_dict = dict() # Build fitting objective function from sum of second derivative # profiles for index, frequency in enumerate(frequencies): curr_model = lmfit.models.Model( sec_deriv_lorentzian, prefix=f"Line{index}_" ) curr_params = curr_model.make_params() curr_params[f"Line{index}_x0"].set(value=frequency - 0.3, min=frequency - 15., max=frequency + 15.) curr_params[f"Line{index}_I"].set(value=-1., min=-20., max=-0.05) curr_params[f"Line{index}_gamma"].set(value=0.25, min=0.1, max=0.4) if index == 0: model = curr_model parameters = curr_params else: model += curr_model parameters += curr_params # Fit the cumulative spectrum fit_results = model.fit( sliced_df["XCorrelation"], parameters, x=sliced_df["Frequency"] ) sliced_df["Fit"] = fit_results.eval() # print("Results of the fit:") # for param, covar in zip(fit_results.best_values, np.sqrt(np.diag(fit_results.covar))): # print(param + ": " + str(fit_results.best_values[param]) + " +/- " + str(covar)) return fit_results, sliced_df
[docs]def fit_lines(dataframe, frequencies, name, window_length=10., ycol="Field off", off_dataframe=None): """ Wrapper for the fit_chunk function, for when you want to try and fit many lines in a catalog. The input arguments are the survey dataframe, a list of center frequencies you want to try and fit, the name of the species. Optional argument is the name of column you want to use for the intensities. The function will iterate through each center frequency and try to fit it. """ foldername = Path(f"../data/clean/mmw_fits/{name}") try: os.mkdir(foldername) except FileExistsError: pass successes = list() fit_freq = list() amplitudes = list() uncertainties = list() # Loop over each catalog frequency for index, frequency in enumerate(frequencies): # Slice out a 20 MHz window - this should be sufficiently # large to capture uncertainty in lines min_freq = frequency - window_length max_freq = frequency + window_length # Attempt to fit the window fit, spec_df = fit_spectrum_chunk( dataframe, min_freq, max_freq, [frequency], ) fit_freq.append( np.round(fit.best_values["center0"], decimals=4) ) amplitudes.append( np.round(fit.best_values["A0"], decimals=4) ) uncertainties.append( np.round(np.sqrt(np.diag(fit.covar))[-1], decimals=4) ) successes.append(True) package = list(zip(frequencies, successes, fit_freq, uncertainties, amplitudes)) fit_df = pd.DataFrame( data=package, columns=["Catalog frequency", "Success", "Fitted frequency", "Uncertainty", "Amplitude"] ) fit_df["Calc. vs Obs."] = fit_df["Catalog frequency"] - fit_df["Fitted frequency"] fit_df.to_csv(foldername + "/fit_results.csv") # Make the static comparison if off_dataframe: fig, axarray = static_comparison( fit_df["Fitted frequency"].values, dataframe, off_dataframe, ) fig.savefig(foldername + "/on-off-comparison.pdf", dpi=300, format="pdf") return fit_df