"""
This module provides abstract base classes and utility classes for reading and writing
data in various formats (e.g., OSW, Parquet, Split Parquet) used in the OpenSWATH workflow.
The module defines the following:
- Abstract base classes (`BaseReader`, `BaseWriter`) for implementing custom readers and writers.
- Concrete base classes for specific file formats, such as OSW and Parquet.
- Utility methods for handling data validation, column preparation, and file merging.
Classes:
- RowCountMismatchError: Custom exception for row count mismatches during data processing.
- BaseReader: Abstract base class for implementing readers for different data formats.
- BaseWriter: Abstract base class for implementing writers for different data formats.
- BaseOSWReader: Reader class for OSW (SQLite-based) files.
- BaseOSWWriter: Writer class for OSW (SQLite-based) files.
- BaseParquetReader: Reader class for Parquet files.
- BaseParquetWriter: Writer class for Parquet files.
- BaseSplitParquetReader: Reader class for split Parquet files (multi-run support).
- BaseSplitParquetWriter: Writer class for split Parquet files (multi-run support).
Key Features:
- Abstract methods (`read`, `save_results`) enforce implementation in subclasses.
- Support for semi-supervised learning, IPF analysis, and context-level inference.
- Validation of row counts to ensure data consistency during processing.
- Utility methods for handling Parquet-specific operations, such as collapsing protein IDs.
Usage:
This module is intended to be extended by specific implementations for reading and writing
data in the OpenSWATH workflow. Subclasses must implement the abstract methods to handle
specific file formats and processing requirements.
Dependencies:
- pandas
- polars
- duckdb
- click
- loguru
"""
import glob
import math
import os
import pickle
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
import click
import duckdb
import pandas as pd
import polars as pl
import sklearn.preprocessing as preprocessing # For MinMaxScaler
from loguru import logger
from .._base import BaseIOConfig
from .._config import ExportIOConfig
from ..report import save_report
from .util import get_parquet_column_names, print_parquet_tree
class RowCountMismatchError(Exception):
"""
Exception to raise when the new output with joined scores has a different amount of rows than the input.
"""
pass
[docs]
@dataclass
class BaseReader(ABC):
"""
Abstract base class for implementing readers that load data from different sources (OSW, Parquet, etc.).
"""
[docs]
def __init__(self, config: BaseIOConfig):
"""
Initialize the reader with a given configuration.
Args:
config (BaseIOConfig): Configuration object containing input details, and module specific config for params for reading.
"""
self.config = config
@property
def context(self):
return self.config.context
@property
def infile(self):
return self.config.infile
@property
def outfile(self):
return self.config.outfile
@property
def file_type(self):
return self.config.file_type
@property
def subsample_ratio(self):
return self.config.subsample_ratio
@property
def classifier(self):
return self.config.runner.classifier
@property
def level(self):
return self.config.level
@property
def glyco(self):
return self.config.runner.glyco
[docs]
@abstractmethod
def read(self):
"""
Abstract method to be implemented by subclasses to read data from a specific format.
"""
raise NotImplementedError("Subclasses must implement 'read'.")
# ----------------------------
# Shared Scoring Methods
# ----------------------------
[docs]
def _finalize_feature_table(self, df, ss_main_score):
"""
Finalize the feature table for semi-supervised scoring.
"""
df.columns = [c.lower() for c in df.columns]
main_score = ss_main_score.lower()
if main_score in df.columns:
df = df.rename(columns={main_score: "main_" + main_score})
elif (
main_score == "swath_pretrained"
): # TODO: Should we deprecate this? This is probably not optimal for newer data from different instruments
# Add a pretrained main score corresponding to the original implementation in OpenSWATH
# This is optimized for 32-windows SCIEX TripleTOF 5600 data
df["main_var_pretrained"] = -(
-0.19011762 * df["var_library_corr"]
+ 2.47298914 * df["var_library_rmsd"]
+ 5.63906731 * df["var_norm_rt_score"]
+ -0.62640133 * df["var_isotope_correlation_score"]
+ 0.36006925 * df["var_isotope_overlap_score"]
+ 0.08814003 * df["var_massdev_score"]
+ 0.13978311 * df["var_xcorr_coelution"]
+ -1.16475032 * df["var_xcorr_shape"]
+ -0.19267813 * df["var_yseries_score"]
+ -0.61712054 * df["var_log_sn_score"]
)
else:
raise click.ClickException(
f"Main score ({main_score}) not found in input columns: {df.columns}"
)
if self.classifier == "XGBoost" and self.level != "alignment":
logger.info(
"Enable number of transitions & precursor / product charge scores for XGBoost-based classifier"
)
df = df.rename(
columns={
"precursor_charge": "var_precursor_charge",
"product_charge": "var_product_charge",
"transition_count": "var_transition_count",
}
)
# print(df.info())
return df
[docs]
@dataclass
class BaseWriter(ABC):
"""
Abstract base class for implementing writers that save results to various output formats.
"""
[docs]
def __init__(self, config: BaseIOConfig):
"""
Initialize the writer with a given configuration.
Args:
config (BaseIOConfig): Configuration object containing output details.
"""
self.config = config
self.__post_init__()
[docs]
def __post_init__(self):
"""
Post-initialization method to set up variables for IO specific config
"""
if isinstance(self.config, ExportIOConfig):
# Quantification matrix normalization methods
self.normalization_methods = {
"median": self._median_normalize,
"medianmedian": self._median_median_normalize,
"quantile": self._quantile_normalize,
"none": lambda x: x,
}
@property
def context(self):
return self.config.context
@property
def infile(self):
return self.config.infile
@property
def outfile(self):
return self.config.outfile
@property
def file_type(self):
return self.config.file_type
@property
def classifier(self):
return self.config.runner.classifier
@property
def level(self):
return self.config.level
@property
def glyco(self):
return self.config.runner.glyco
# ----------------------------
# Shared Common Methods
# ----------------------------
[docs]
@abstractmethod
def save_results(self, result, pi0):
"""
Abstract method to save scoring results and statistical outputs.
Args:
result: The result object containing scoring tables.
pi0: Estimated pi0 value from FDR statistics.
"""
raise NotImplementedError("Subclasses must implement 'save_results'.")
[docs]
def _get_columns_to_keep(
self, existing_cols: list[str], score_prefix: str
) -> list[str]:
"""
Get the columns to keep in the DataFrame by removing existing score columns. Mainly for Parquet files.
Note: this method itself does not remove the columns, it just returns a list of columns to keep that does not include the existing score columns.
"""
drop_cols = [col for col in existing_cols if col.startswith(score_prefix)]
if drop_cols:
logger.warning(
f"Warn: Dropping existing {score_prefix} columns.",
)
return [col for col in existing_cols if not col.startswith(score_prefix)]
[docs]
def _get_parquet_row_count(self, con, target_file: str) -> int:
"""
Get the row count of a Parquet file.
Parameters:
con: DuckDB connection object
target_file: Path to the Parquet file
Returns:
int: Row count of the Parquet file
"""
query = f"SELECT COUNT(*) FROM read_parquet('{target_file}')"
row_count = con.execute(query).fetchone()[0]
return row_count
[docs]
def _validate_row_count_after_join(
self, con, target_file: str, key_cols: str, join_on: str, prefix: str
):
"""
Validates the row count after performing a join operation on a Parquet file.
This is important, because we would not expect the appending of scores to change the number of rows in the input Parquet file.
Parameters:
con: DuckDB connection object
target_file: Path to the Parquet file
key_cols: The key columns for the join operation
join_on: The condition for the join operation
prefix: The prefix (table alias) used for the Parquet file in the query
Raises:
RowCountMismatchError: If the row count of the resulting join doesn't match the original row count.
"""
original_row_count = self._get_parquet_row_count(con, target_file)
logger.debug(
f"Prior to appending scores, {target_file} has {original_row_count} entries"
)
validate_query = f"""
SELECT
COUNT(*) AS row_count
FROM (
SELECT {key_cols}
FROM read_parquet('{target_file}') {prefix}
LEFT JOIN scores s
ON {join_on}
) AS subquery
"""
logger.trace(f"Row Entry validation query:\n{validate_query}")
result = con.execute(validate_query).fetchone()
resulting_row_count = result[0]
# If row count mismatch occurs, raise an error and log it
if resulting_row_count != original_row_count:
error_message = (
f"There was an issue with appending scores to {target_file}.\n"
f"Row count mismatch: Original rows {original_row_count} ≠ Resulting rows {resulting_row_count}.\n"
"Appending scores resulted in a different number of entries than the original input file. This is unexpected behaviour."
)
try:
raise RowCountMismatchError(error_message)
except RowCountMismatchError as e:
logger.opt(exception=e, depth=1).critical(f"Critical error: {str(e)}")
sys.exit(1)
# ----------------------------
# Shared Scoring Methods
# ----------------------------
[docs]
def save_weights(self, weights):
"""
Abstract method to save model weights (e.g., LDA coefficients, XGBoost model).
Args:
weights: Model weights or trained object.
"""
if self.classifier == "LDA":
self._save_tsv_weights(weights)
elif self.classifier == "SVM":
self._save_tsv_weights(weights)
elif self.classifier == "XGBoost" or self.classifier == "HistGradientBoosting":
self._save_bin_weights(weights)
else:
raise NotImplementedError(
f"Classifier {self.classifier} not supported for saving weights."
)
[docs]
def _prepare_score_dataframe(
self, df: pd.DataFrame, level: str, prefix: str
) -> pd.DataFrame:
"""
Prepare the score DataFrame
"""
score_cols = [
"feature_id",
"d_score",
"peak_group_rank",
"p_value",
"q_value",
"pep",
]
if "h_score" in df.columns:
score_cols.insert(2, "h_score")
score_cols.insert(3, "h0_score")
if level in ("ms1", "ms2", "ms1ms2") and self.config.file_type in (
"parquet",
"parquet_split",
"parquet_split_multi",
):
# For parquet files, there may be multiple proteins that map to the same peptide
score_cols.insert(2, "protein_id")
# # Check if protein_id column is collapsed, if it is, we need to explode it
# if (
# df["protein_id"]
# .apply(lambda x: isinstance(x, (list, np.ndarray)) and len(x) > 1)
# .any()
# ):
# logger.info(
# "Exploding protein_id column to handle multiple proteins per feature."
# )
pl_df = pl.from_pandas(df)
pl_df = pl_df.explode("protein_id")
df = pl_df.to_pandas()
if level == "transition":
if self.file_type in ("parquet", "parquet_split", "parquet_split_multi"):
score_cols.insert(1, "ipf_peptide_id")
score_cols.insert(2, "transition_id")
# For parquet files, there may be multiple ipf_peptide_ids that map to the same transition feature
df = df.explode("ipf_peptide_id")
else:
score_cols.insert(1, "transition_id")
if level == "alignment":
score_cols.insert(2, "alignment_id")
score_cols.insert(2, "decoy")
# Reverse map DECOY 0 to 1 and 1 to -1
# arycal saves a label column to indicate 1 as target aligned peaks and -1 as the random/shuffled decoy aligned peak
df["decoy"] = df["decoy"].map({0: 1, 1: -1})
df = df[score_cols].rename(columns=str.upper)
df = df.rename(columns={"D_SCORE": "SCORE"})
if self.config.file_type == "osw":
# Consistent renaming with original implementation for OSW files
df = df.rename(columns={"P_VALUE": "PVALUE", "Q_VALUE": "QVALUE"})
if self.config.file_type == "osw" or self.level not in ("ms1ms2", "ms2"):
df = df.rename(columns={"PEAK_GROUP_RANK": "RANK"})
if level == "transition":
if self.file_type in ("parquet", "parquet_split", "parquet_split_multi"):
key_cols = {"FEATURE_ID", "IPF_PEPTIDE_ID", "TRANSITION_ID"}
else:
key_cols = {"FEATURE_ID", "TRANSITION_ID"}
elif level == "alignment":
key_cols = {"ALIGNMENT_ID", "FEATURE_ID", "DECOY"}
elif level in ("ms1", "ms2", "ms1ms2") and self.config.file_type in (
"parquet",
"parquet_split",
"parquet_split_multi",
):
key_cols = {"FEATURE_ID", "PROTEIN_ID"}
else:
key_cols = {"FEATURE_ID"}
if self.config.file_type == "osw":
return df
rename_map = {
col: f"{prefix}{col}" for col in df.columns if col not in key_cols
}
return df.rename(columns=rename_map)
[docs]
def _write_pdf_report(self, result, pi0):
"""
Write a PDF report if the scoring results contain final statistics.
"""
if result.final_statistics is None:
return
df = result.scored_tables
if self.level == "alignment":
# Map Decoy 1 to 0 and -1 to 1
df["decoy"] = df["decoy"].map({1: 0, -1: 1})
# print(df)
# print(df.columns)
prefix = self.config.prefix
level = self.level
cutoffs = result.final_statistics["cutoff"].values
svalues = result.final_statistics["svalue"].values
qvalues = result.final_statistics["qvalue"].values
pvalues = df[(df.peak_group_rank == 1) & (df.decoy == 0)]["p_value"].values
top_targets = df[(df.peak_group_rank == 1) & (df.decoy == 0)]["d_score"].values
top_decoys = df[(df.peak_group_rank == 1) & (df.decoy == 1)]["d_score"].values
# Check if any of the values are empty, can't create a report if they are
if not all(
[
len(top_targets),
len(top_decoys),
len(cutoffs),
len(svalues),
len(qvalues),
len(pvalues),
]
):
logger.error("Not enough values to create a report.")
logger.error(f"top_targets: {len(top_targets)}")
logger.error(f"top_decoys: {len(top_decoys)}")
logger.error(f"cutoffs: {len(cutoffs)}")
logger.error(f"svalues: {len(svalues)}")
logger.error(f"qvalues: {len(qvalues)}")
logger.error(f"pvalues: {len(pvalues)}")
logger.error(f"pi0: {len(pi0)}")
return
pdf_path = os.path.join(prefix + f"_{level}_report.pdf")
save_report(
pdf_path,
self.outfile,
top_decoys,
top_targets,
cutoffs,
svalues,
qvalues,
pvalues,
pi0,
self.config.runner.color_palette,
self.level,
df=df,
)
logger.success(f"{pdf_path} written.")
[docs]
def _save_tsv_weights(self, weights):
"""
Save the model weights to a TSV file, ensuring no duplicate levels.
If weights for the current level already exist, they are removed before saving the new ones.
"""
weights["level"] = self.level
trained_weights_path = self.config.extra_writes.get("trained_weights_path")
if trained_weights_path is not None:
if os.path.exists(trained_weights_path):
existing_df = pd.read_csv(trained_weights_path, sep=",")
existing_df = existing_df[existing_df["level"] != self.level]
updated_df = pd.concat([existing_df, weights], ignore_index=True)
else:
updated_df = weights
# Always overwrite with a single header
updated_df.to_csv(trained_weights_path, sep=",", index=False)
logger.success(f"{trained_weights_path} written.")
[docs]
def _save_bin_weights(self, weights):
"""
Save the model weights to a binary file.
Args:
weights: Model weights or trained object.
"""
trained_weights_path = self.config.extra_writes.get(
f"trained_model_path_{self.level}"
)
if trained_weights_path is not None:
with open(trained_weights_path, "wb") as file:
self.persisted_weights = pickle.dump(weights, file)
logger.success("%s written." % trained_weights_path)
else:
logger.error(f"Trained model path {trained_weights_path} not found. ")
# ----------------------------
# Shared Levels Context Methods
# ----------------------------
[docs]
def _write_levels_context_pdf_report(self, data, stat_table, pi0):
"""
Write a PDF report for levels context.
"""
context = self.config.context_fdr
prefix = self.config.prefix
analyte = self.config.level
if context == "run-specific":
prefix = prefix + "_" + str(data["run_id"].unique()[0])
pdf_path = os.path.join(prefix + "_" + context + "_" + analyte + "_report.pdf")
title = prefix + "_" + context + "_" + analyte + "-level error-rate control"
save_report(
pdf_path,
title,
data[data.decoy == 1]["score"].values,
data[data.decoy == 0]["score"].values,
stat_table["cutoff"].values,
stat_table["svalue"].values,
stat_table["qvalue"].values,
data[data.decoy == 0]["p_value"].values,
pi0,
self.config.color_palette,
analyte,
data,
)
logger.success(f"{pdf_path} written.")
# ----------------------------
# Shared Export Methods
# ----------------------------
[docs]
def export_results(self, data: pd.DataFrame):
"""
Save the results to the output file based on the export format.
Args:
data: DataFrame containing the data to be exported
"""
cfg = self.config
# Filter out decoys if exclude_decoys is True
if cfg.exclude_decoys and "decoy" in data.columns:
initial_count = len(data)
data = data.loc[data["decoy"].eq(0)].copy()
decoy_count = initial_count - len(data)
if decoy_count > 0:
logger.info(f"Excluded {decoy_count} decoy entries from export.")
sep = "," if cfg.out_type == "csv" else "\t"
if cfg.export_format == "legacy_split":
logger.info("Exporting results in legacy split format.")
data = data.drop(["id_run", "id_peptide"], axis=1)
# filename might contain archive extensions, so we need to remove these
data["filename"] = data["filename"].apply(
lambda x: os.path.splitext(os.path.basename(x))[0]
)
data.groupby("filename").apply(
lambda x: (
x.to_csv(
os.path.basename(x["filename"].values[0]) + f".{cfg.out_type}",
sep=sep,
index=False,
),
logger.success(
f"Exported results to {os.path.basename(x['filename'].values[0])}.{cfg.out_type}."
),
)
)
elif cfg.export_format == "legacy_merged":
logger.info(
f"Exporting results ({data.shape} | {sys.getsizeof(data) / (1024**2):.2f}MB ) in legacy merged format."
)
data.drop(["id_run", "id_peptide"], axis=1).to_csv(
cfg.outfile, sep=sep, index=False
)
logger.success(f"Exported results to {cfg.outfile}.")
else:
raise ValueError(f"Unsupported export format: {cfg.export_format}")
[docs]
def clean_and_export_library(self, data: pd.DataFrame) -> pd.DataFrame:
"""
This function cleans the original dataframe and exports the library
Args:
data: Input DataFrame with library data
"""
cfg = self.config
# For precursors found in more than one run, select the run with the smallest q value
# If q values are the same, select the first run
data = (
data.sort_values(by=["Q_Value", "Intensity", "RunId"])
.groupby("TransitionId")
.head(1)
)
assert len(data["TransitionId"].drop_duplicates()) == len(data), (
"After filtering by Q_Value Intensity and RunId, duplicate transition IDs found."
)
# Remove Annotation Column if all NAN
if data["Annotation"].isnull().all() or data["Annotation"].eq("NA").all():
logger.debug("Annotation column is empty, so computing it manually.")
data.drop(columns=["Annotation"], inplace=True)
data["Annotation"] = (
data["FragmentType"]
+ data["FragmentSeriesNumber"].astype(str)
+ "^"
+ data["FragmentCharge"].astype(str)
)
if cfg.rt_calibration and cfg.rt_unit == "iRT":
data["NormalizedRetentionTime"] = (
preprocessing.MinMaxScaler().fit_transform(
data[["NormalizedRetentionTime"]]
)
* 100
)
if cfg.intensity_calibration:
data["LibraryIntensity"] = (
data["LibraryIntensity"]
/ data.groupby("Precursor")["LibraryIntensity"].transform("max")
* 10000
)
logger.debug(
"Removing {} rows with zero intensity.".format(
len(data[data["LibraryIntensity"] <= 0])
)
)
# Remove rows with zero intensity
data = data[data["LibraryIntensity"] > 0]
## Print Library statistics
logger.info(
f"Library Contains {len(data['Precursor'].drop_duplicates())} Precursors"
)
logger.info(f"Precursor Fragment Distribution (Before Filtering)")
num_frags_per_prec = (
data[["Precursor", "TransitionId"]]
.groupby("Precursor")
.count()
.reset_index(names="Precursor")
.groupby("TransitionId")
.count()
)
for frag, count in num_frags_per_prec.iterrows():
logger.info(
f"There are {count['Precursor']} precursors with {frag} fragment(s)"
)
logger.info(
f"Filter library to precursors containing {cfg.min_fragments} or more fragments"
)
ids_to_keep = data[["Precursor", "Annotation"]].groupby("Precursor").count()
ids_to_keep = ids_to_keep[ids_to_keep["Annotation"] >= cfg.min_fragments].index
data = data[data["Precursor"].isin(ids_to_keep)]
logger.info(
f"After filtering, library contains {len(data['Precursor'].drop_duplicates())} Precursors"
)
if cfg.keep_decoys:
logger.info(
"Of Which {} are decoys".format(
len(data[data["Decoy"] == 1]["Precursor"].drop_duplicates())
)
)
data.drop(
columns=["TransitionId", "Q_Value", "RunId", "Intensity"], inplace=True
)
if cfg.test:
data = data.sort_values(
by=[
"Precursor",
"FragmentType",
"FragmentSeriesNumber",
"FragmentCharge",
"ProductMz",
]
)
logger.info("Exporting library to file.")
data.to_csv(cfg.outfile, sep="\t", index=False)
[docs]
def export_quant_matrix(self, data: pd.DataFrame) -> pd.DataFrame:
"""
Export quantification matrix at specified level with optional normalization.
Args:
data: Input DataFrame with quantification data
"""
cfg = self.config
# Filter out decoys if exclude_decoys is True
if cfg.exclude_decoys and "decoy" in data.columns:
initial_count = len(data)
data = data[data["decoy"] == 0]
decoy_count = initial_count - len(data)
if decoy_count > 0:
logger.info(f"Excluded {decoy_count} decoy entries from quantification matrix.")
# Check if data is empty
if data.empty:
raise ValueError(
"No identification results passed the filtering criteria. "
"The filtered dataset is empty. Please check your filter settings: "
f"max_rs_peakgroup_qvalue={cfg.max_rs_peakgroup_qvalue}, "
f"max_global_peptide_qvalue={cfg.max_global_peptide_qvalue}, "
f"max_global_protein_qvalue={cfg.max_global_protein_qvalue}"
)
sep = "," if cfg.out_type == "csv" else "\t"
level = self.level
normalization = self.config.normalization
# Validate input
if level not in ["precursor", "peptide", "protein", "gene"]:
raise ValueError(
"Invalid level. Choose from: precursor, peptide, protein, gene"
)
if normalization not in self.normalization_methods:
raise ValueError(
f"Invalid normalization. Choose from: {list(self.normalization_methods.keys())}"
)
# Get the appropriate summarization method
summarizer = getattr(self, f"_summarize_{level}_level")
matrix = summarizer(data, self.config.top_n, self.config.consistent_top)
# Apply normalization
if normalization != "none":
# Set non-numeric columns as index
non_numeric_cols = list(matrix.select_dtypes(exclude="number").columns)
if len(non_numeric_cols) > 0:
matrix = matrix.set_index(non_numeric_cols)
matrix = self.normalization_methods[normalization](matrix)
matrix = matrix.reset_index()
matrix.to_csv(self.config.outfile, sep=sep, index=False)
[docs]
def _summarize_precursor_level(
self, data: pd.DataFrame, _top_n: int, _consistent_top: bool
) -> pd.DataFrame:
"""
Create precursor-level matrix (no summarization needed).
Just select top peak group per precursor.
"""
# Select top ranking peak group only
idx = data.groupby(["run_id", "transition_group_id"])["m_score"].idxmin()
data = data.loc[idx]
if data.empty:
raise ValueError(
"No data available for precursor-level summarization. "
"This typically occurs when no peak groups pass the q-value thresholds."
)
logger.info("Summarizing to precursor level.")
# Create matrix
matrix = data.pivot_table(
index=[
"transition_group_id",
"Sequence",
"FullPeptideName",
"Charge",
"ProteinName",
],
columns="filename",
values="Intensity",
).reset_index()
return matrix
[docs]
def _summarize_peptide_level(
self, data: pd.DataFrame, top_n: int, consistent_top: bool
) -> pd.DataFrame:
"""
Summarize to peptide level using top N precursors.
"""
# First get top peak group per precursor
idx = data.groupby(["run_id", "transition_group_id"])["m_score"].idxmin()
data = data.loc[idx]
if data.empty:
raise ValueError(
"No data available after filtering for peptide-level summarization. "
"This typically occurs when no peak groups pass the q-value thresholds."
)
logger.info("Summarizing to peptide level.")
# Get top precursors for each peptide
if consistent_top:
logger.info("Using consistent top precursors across all runs.")
# Use precursors with highest median intensity across all runs
median_intensity = (
data.groupby(["transition_group_id", "Sequence", "FullPeptideName"])[
"Intensity"
]
.median()
.reset_index()
)
top_precursors = (
median_intensity.groupby(["Sequence", "FullPeptideName"])
.apply(lambda x: x.nlargest(top_n, "Intensity")["transition_group_id"])
.reset_index()["transition_group_id"]
)
data = data[data["transition_group_id"].isin(top_precursors)]
else:
logger.info("Using top precursors per run individually.")
# Select top precursors per run individually
data = (
data.groupby(["run_id", "Sequence", "FullPeptideName"])
.apply(lambda x: x.nlargest(top_n, "Intensity"))
.reset_index(drop=True)
)
if data.empty:
raise ValueError(
"No data available after selecting top precursors for peptide-level summarization. "
"Check that top_n is not too large for your dataset."
)
# Summarize by peptide (mean of top precursors)
# Group and aggregate
grouped = data.groupby(["Sequence", "FullPeptideName", "filename"])["Intensity"].mean()
# Unstack and reset index carefully to avoid column name conflicts
try:
peptide_matrix = grouped.unstack(fill_value=None)
# Reset index to convert index columns to regular columns
peptide_matrix = peptide_matrix.reset_index()
except ValueError as e:
if "cannot insert" in str(e):
raise ValueError(
"Failed to create quantification matrix because a run filename "
"conflicts with reserved column names ('Sequence' or 'FullPeptideName'). "
"Please rename the conflicting input file or adjust the data before summarization."
) from e
raise
return peptide_matrix
[docs]
def _summarize_protein_level(
self, data: pd.DataFrame, top_n: int, consistent_top: bool
) -> pd.DataFrame:
"""
Summarize to protein level using top N peptides.
"""
# First summarize to peptide level
peptide_matrix = self._summarize_peptide_level(
data, top_n=top_n, consistent_top=consistent_top
)
logger.info("Summarizing to protein level.")
# Need to get protein annotations - get from original data
protein_map = data.drop_duplicates(
["FullPeptideName", "ProteinName"]
).set_index("FullPeptideName")["ProteinName"]
# Split protein groups and explode (one row per protein)
protein_matrix = peptide_matrix.copy()
protein_matrix = protein_matrix.merge(
protein_map.reset_index(),
left_on="FullPeptideName",
right_on="FullPeptideName",
how="left",
)
if protein_map.empty or protein_matrix["ProteinName"].isna().all():
raise ValueError(
"No protein data available after mapping peptides to proteins. "
"Check that protein annotations are present in the data."
)
protein_matrix = protein_matrix.explode("ProteinName")
if consistent_top:
# Calculate median intensity for each peptide-protein combination
protein_matrix["median_intensity"] = protein_matrix.select_dtypes(
include="number"
).median(axis=1)
# Get top N peptides per protein based on median intensity
top_peptides = protein_matrix.groupby("ProteinName").apply(
lambda x: x.nlargest(top_n, "median_intensity")
)
# Flatten the multi-index and select the top peptides
top_peptides = top_peptides.droplevel(0)
protein_matrix = protein_matrix.loc[top_peptides.index]
# Drop the median_intensity column as it is no longer needed
protein_matrix = protein_matrix.drop(columns=["median_intensity"])
# Summarize by protein (mean of top peptides)
protein_matrix = (
protein_matrix.groupby("ProteinName").mean(numeric_only=True).reset_index()
)
if protein_matrix.empty:
raise ValueError(
"No data available after protein-level summarization. "
"This may indicate all proteins were filtered out."
)
return protein_matrix
[docs]
def _summarize_gene_level(
self, data: pd.DataFrame, top_n: int, consistent_top: bool
) -> pd.DataFrame:
"""
Summarize to gene level using top N peptides.
"""
# First summarize to peptide level
peptide_matrix = self._summarize_peptide_level(
data, top_n=top_n, consistent_top=consistent_top
)
logger.info("Summarizing to gene level.")
# Need to get gene annotations - get from original data
gene_map = data.drop_duplicates(["FullPeptideName", "Gene"]).set_index(
"FullPeptideName"
)["Gene"]
# Split gene groups and explode (one row per gene)
gene_matrix = peptide_matrix.copy()
gene_matrix = gene_matrix.merge(
gene_map.reset_index(),
left_on="FullPeptideName",
right_on="FullPeptideName",
how="left",
)
if gene_map.empty or gene_matrix["Gene"].isna().all():
raise ValueError(
"No gene data available after mapping peptides to genes. "
"Check that gene annotations are present in the data."
)
gene_matrix = gene_matrix.explode("Gene")
if consistent_top:
# Calculate median intensity for each peptide-gene combination
gene_matrix["median_intensity"] = gene_matrix.select_dtypes(
include="number"
).median(axis=1)
# Get top N peptides per gene based on median intensity
top_peptides = gene_matrix.groupby("Gene").apply(
lambda x: x.nlargest(top_n, "median_intensity")
)
# Flatten the multi-index and select the top peptides
top_peptides = top_peptides.droplevel(0)
gene_matrix = gene_matrix.loc[top_peptides.index]
# Drop the median_intensity column as it is no longer needed
gene_matrix = gene_matrix.drop(columns=["median_intensity"])
# Summarize by gene (mean of top peptides)
gene_matrix = gene_matrix.groupby("Gene").mean(numeric_only=True).reset_index()
if gene_matrix.empty:
raise ValueError(
"No data available after gene-level summarization. "
"This may indicate all genes were filtered out."
)
return gene_matrix
[docs]
def _quantile_normalize(self, matrix: pd.DataFrame) -> pd.DataFrame:
"""Quantile normalization"""
try:
from sklearn.preprocessing import quantile_transform
logger.info("Applying quantile normalization.")
# Transpose to normalize samples (columns) together
normalized = quantile_transform(matrix.T, copy=True).T
return pd.DataFrame(normalized, index=matrix.index, columns=matrix.columns)
except ImportError as exc:
raise ImportError(
"scikit-learn is required for quantile normalization"
) from exc
[docs]
def _execute_copy_query(self, conn, query: str, path: str) -> None:
"""Execute COPY query with configured compression settings"""
conn.execute(
f"COPY ({query}) TO '{path}' "
f"(FORMAT 'parquet', COMPRESSION '{self.config.compression_method}', "
f"COMPRESSION_LEVEL {self.config.compression_level})"
)
[docs]
@dataclass
class BaseOSWReader(BaseReader):
"""
Class for reading and processing data from an OpenSWATH workflow OSW-sqlite based file.
The OSWReader class provides methods to read different levels of data from the file and process it accordingly.
It supports reading data for semi-supervised learning, IPF analysis, context level analysis.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
read(): Read data from the input file based on the alogorithm.
_init_duckdb_views(): Initialize DuckDB views with optional subsampling.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
[docs]
def read(self) -> pd.DataFrame:
"""
Abstract method to be implemented by subclasses to read data from OSW format for a specific algorithm.
"""
raise NotImplementedError(
"The read method must be implemented in subclasses of BaseOSWReader."
)
[docs]
def _init_duckdb_views(self, con):
"""
Initialize DuckDB views for the OSW file with optional subsampling support.
Creates a TEMP table of sampled precursor IDs if subsample_ratio < 1.0,
which can be used by subclasses to filter feature queries.
Subclasses should call this method before creating feature views and then
filter views with: WHERE PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)
when self.subsample_ratio < 1.0.
Args:
con: DuckDB connection with OSW database attached as 'osw'
"""
# Create TEMP table of sampled precursor IDs (if needed)
if self.subsample_ratio < 1.0:
logger.info(
f"Subsampling data for semi-supervised learning. Ratio: {self.subsample_ratio:.2f}"
)
precursor_count = con.execute(
"SELECT COUNT(DISTINCT ID) FROM osw.PRECURSOR"
).fetchone()[0]
sample_size = max(1, math.ceil(precursor_count * self.subsample_ratio))
con.execute(
f"""
CREATE TEMP TABLE sampled_precursor_ids AS
SELECT DISTINCT ID AS PRECURSOR_ID
FROM osw.PRECURSOR
ORDER BY hash(ID)
LIMIT {sample_size}
"""
)
n = con.execute("SELECT COUNT(*) FROM sampled_precursor_ids").fetchone()[0]
logger.info(f"Sampled {n} precursor IDs")
[docs]
@dataclass
class BaseOSWWriter(BaseWriter):
"""
Class for writing OpenSWATH results to an OSW-sqlite based file.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
save_results(result, pi0): Save the results to the output file based on the module using this class.
save_weights(weights): Save the weights to the output file.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
[docs]
def save_results(self, result, pi0):
"""
Abstract method to save scoring results and statistical outputs.
"""
raise NotImplementedError(
"The save_results method must be implemented in subclasses of BaseOSWWriter."
)
[docs]
@dataclass
class BaseParquetReader(BaseReader):
"""
Class for reading and processing data from OpenSWATH results stored in Parquet format.
The ParquetReader class provides methods to read different levels of data from the file and process it accordingly.
It supports reading data for semi-supervised learning, IPF analysis, context level analysis.
This assumes that the input file contains precursor and transition data.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
read(): Read data from the input file based on the alogorithm.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
if self.config.context == "ipf" and config.propagate_signal_across_runs:
# We make the assumption that the alignment file is in the same directory as the input file
self.alignment_file = os.path.join(
os.path.dirname(self.infile), "feature_alignment.parquet"
)
if not os.path.exists(self.alignment_file):
raise click.ClickException(
f"To use the --propagate-signal-across-runs option, "
f"the alignment file {self.alignment_file} must exist."
)
[docs]
def read(self) -> pd.DataFrame:
"""
Abstract method to be implemented by subclasses to read data from Parquet format for a specific algorithm.
"""
raise NotImplementedError(
"The read method must be implemented in subclasses of BaseParquetReader."
)
def _init_duckdb_views(self, con):
# Create TEMP table of sampled precursors IDs (if needed)
if self.subsample_ratio < 1.0:
logger.info(
f"Subsampling {self.subsample_ratio * 100}% data for semi-supervised learning."
)
con.execute(
f"""
CREATE TEMP TABLE sampled_precursor_ids AS
SELECT DISTINCT PRECURSOR_ID
FROM read_parquet('{self.infile}')
USING SAMPLE {self.subsample_ratio * 100}%
"""
)
n = con.execute("SELECT COUNT(*) FROM sampled_precursor_ids").fetchone()[0]
logger.info(f"Sampled {n} precursor IDs")
con.execute(
f"CREATE VIEW data AS SELECT * FROM read_parquet('{self.infile}') WHERE PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)"
)
else:
con.execute(
f"CREATE VIEW data AS SELECT * FROM read_parquet('{self.infile}')"
)
if self.context == "ipf" and self.config.propagate_signal_across_runs:
con.execute(
f"CREATE VIEW alignment AS SELECT * FROM read_parquet('{self.alignment_file}')"
)
def _get_columns_by_prefix(self, parquet_file, prefix):
cols = get_parquet_column_names(parquet_file)
return [c for c in cols if c.startswith(prefix)]
[docs]
def _collapse_protein_ids(self, df):
"""
Collapse protein IDs to avoid duplicating feature data in the DataFrame.
Parameters:
- df (DataFrame): Input DataFrame containing feature data.
Returns:
- DataFrame: DataFrame with collapsed protein IDs.
Example:
df = _collapse_protein_ids(df)
"""
org_shape = df.shape
# Handle cases where there may be multiple proteins mapping to a peptide with the same feature ID.
# We collapse the protein IDs to avoid duplicating feature data
df = (
df.group_by(["GROUP_ID", "FEATURE_ID"])
.agg(
[
pl.col("PROTEIN_ID").unique().alias("PROTEIN_ID"),
*[
pl.col(c).first()
for c in df.columns
if c not in {"GROUP_ID", "FEATURE_ID", "PROTEIN_ID"}
],
]
)
.sort(["RUN_ID", "PRECURSOR_ID", "EXP_RT"])
)
collapsed_shape = df.shape
if org_shape != collapsed_shape:
logger.info(
f"Collapsed {org_shape[0] - collapsed_shape[0]} of {org_shape[0]} rows due to multiple proteins mapping to the same feature ID."
)
# print(
# df.filter(
# pl.col("PRECURSOR_ID").is_in([7012])
# & pl.col("FEATURE_ID").is_in([33731659783088])
# )
# )
return df
[docs]
def _collapse_ipf_peptide_ids(self, df):
"""
Collapse IPF peptide IDs to avoid duplicating feature data in the DataFrame.
Parameters:
- df (DataFrame): Input DataFrame containing feature data.
Returns:
- DataFrame: DataFrame with collapsed IPF peptide IDs.
"""
org_shape = df.shape
# Handle cases where there may be multiple peptides mapping to a precursor with the same feature ID.
# We collapse the IPF peptide IDs to avoid duplicating feature data
df = (
df.group_by(["GROUP_ID", "FEATURE_ID"])
.agg(
[
pl.col("IPF_PEPTIDE_ID").unique().alias("IPF_PEPTIDE_ID"),
*[
pl.col(c).first()
for c in df.columns
if c not in {"GROUP_ID", "FEATURE_ID", "IPF_PEPTIDE_ID"}
],
]
)
.sort(["RUN_ID", "TRANSITION_ID"])
)
collapsed_shape = df.shape
if org_shape != collapsed_shape:
logger.info(
f"Collapsed {org_shape[0] - collapsed_shape[0]} of {org_shape[0]} rows due to multiple transitions features mapping to different peptidoforms."
)
return df
[docs]
@dataclass
class BaseParquetWriter(BaseWriter):
"""
Class for writing OpenSWATH results to a Parquet file.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
save_results(result, pi0): Save the results to the output file based on the module using this class.
save_weights(weights): Save the weights to the output file.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
if self.context == "levels_context":
self.context_level_id_map = {
"peptide": "peptide_id",
"protein": "protein_id",
"gene": "gene_id",
}
[docs]
def save_results(self, result, pi0):
"""
Abstract method to save scoring results and statistical outputs.
"""
raise NotImplementedError(
"The save_results method must be implemented in subclasses of ParquetWriter."
)
[docs]
@dataclass
class BaseSplitParquetReader(BaseParquetReader):
"""
Class for reading and processing data from OpenSWATH results stored in a directoy containing split Parquet files.
The ParquetReader class provides methods to read different levels of data from the split parquet files and process it accordingly.
It supports reading data for semi-supervised learning, IPF analysis, context level analysis.
This assumes that the input infile path is a directory containing the following files:
- precursors_features.parquet
- transition_features.parquet
- feature_alignment.parquet (optional)
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
read(): Read data from the input file based on the alogorithm.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
self.config = config
if config.file_type not in ("parquet_split", "parquet_split_multi"):
raise click.ClickException(
f"SplitParquetReader requires 'parquet_split' or 'parquet_split_multi' input, got '{config.file_type}' instead."
)
# Flag to indicate whether the input is a multi-run directory
self._is_multi_run = config.file_type == "parquet_split_multi"
[docs]
def read(self) -> pd.DataFrame:
"""
Abstract method to be implemented by subclasses to read data from splti parquet format for a specific algorithm.
"""
raise NotImplementedError(
"The read method must be implemented in subclasses of BaseSplitParquetReader."
)
[docs]
def _init_duckdb_views(self, con):
"""
Initialize DuckDB views for the split Parquet files.
"""
base_dir = self.infile
# Gather files from multiple runs
precursor_files = glob.glob(
os.path.join(base_dir, "*.oswpq", "precursors_features.parquet")
)
transition_files = glob.glob(
os.path.join(base_dir, "*.oswpq", "transition_features.parquet")
)
alignment_file = os.path.join(base_dir, "feature_alignment.parquet")
print_alignment_file = (
alignment_file if os.path.exists(alignment_file) else None
)
# If no multi-run structure, check single run input directory
if not precursor_files:
precursor_path = os.path.join(base_dir, "precursors_features.parquet")
if os.path.exists(precursor_path):
precursor_files = [precursor_path]
if not transition_files:
transition_path = os.path.join(base_dir, "transition_features.parquet")
if os.path.exists(transition_path):
transition_files = [transition_path]
if self.context in ("score_learn", "ipf", "levels_context", "export"):
print_parquet_tree(
base_dir, precursor_files, transition_files, print_alignment_file, 5
)
if not precursor_files:
raise click.ClickException("Error: No precursor Parquet files found.")
# Create TEMP table of sampled precursor IDs (if needed)
if self.subsample_ratio < 1.0:
logger.info(
f"Subsampling data for semi-supervised learning. Ratio: {self.subsample_ratio:.2f}"
)
con.execute(
f"""
CREATE TEMP TABLE sampled_precursor_ids AS
SELECT DISTINCT PRECURSOR_ID
FROM read_parquet({precursor_files})
USING SAMPLE {self.subsample_ratio * 100}%
"""
)
n = con.execute("SELECT COUNT(*) FROM sampled_precursor_ids").fetchone()[0]
logger.info(f"Sampled {n} precursor IDs")
# Create view: precursors
if self.subsample_ratio < 1.0:
logger.trace("Creating 'precursors' view with sampled precursor IDs")
con.execute(
f"""
CREATE VIEW precursors AS
SELECT *
FROM read_parquet({precursor_files})
WHERE PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)
"""
)
else:
logger.trace("Creating 'precursors' view with full input")
con.execute(
f"CREATE VIEW precursors AS SELECT * FROM read_parquet({precursor_files})"
)
# Create view: transition
if transition_files and self.config.context in (
"score_learn",
"score_apply",
"ipf",
"export",
):
if self.subsample_ratio < 1.0:
logger.trace("Creating 'transition' view with sampled precursor IDs")
con.execute(
f"""
CREATE VIEW transition AS
SELECT *
FROM read_parquet({transition_files})
WHERE PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)
"""
)
else:
logger.trace("Creating 'transition' view with full input")
con.execute(
f"CREATE VIEW transition AS SELECT * FROM read_parquet({transition_files})"
)
# Create view: feature_alignment
if os.path.exists(alignment_file) and self.config.context in (
"score_learn",
"score_apply",
"ipf",
):
if self.subsample_ratio < 1.0:
logger.trace(
"Creating 'feature_alignment' view with sampled precursor IDs"
)
con.execute(
f"""
CREATE VIEW feature_alignment AS
SELECT *
FROM read_parquet('{alignment_file}')
WHERE PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)
"""
)
else:
logger.trace("Creating 'feature_alignment' view with full input")
con.execute(
f"CREATE VIEW feature_alignment AS SELECT * FROM read_parquet('{alignment_file}')"
)
[docs]
def _get_columns_by_prefix(self, parquet_file, prefix):
"""
Returns columns that start with `prefix` from one of the parquet files.
In multi-run mode, uses the first run's file as a representative.
"""
if self._is_multi_run:
candidate = glob.glob(
os.path.join(self.config.infile, "*.oswpq", parquet_file)
)
if not candidate:
raise click.ClickException(
f"Could not find '{parquet_file}' in any '.oswpq' subdirectory of '{self.config.infile}'."
)
path = candidate[0]
else:
path = os.path.join(self.config.infile, parquet_file)
if not os.path.exists(path):
raise click.ClickException(f"File '{path}' does not exist.")
cols = get_parquet_column_names(path)
logger.trace(f"Columns in {path}: {cols}")
if cols is None:
raise click.ClickException(f"Failed to read schema or columns from: {path}")
return [c for c in cols if c.startswith(prefix)]
[docs]
@dataclass
class BaseSplitParquetWriter(BaseParquetWriter):
"""
Class for writing OpenSWATH results to a directory containing split Parquet files.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
save_results(result, pi0): Save the results to the output file based on the module using this class.
save_weights(weights): Save the weights to the output file.
"""
[docs]
def __init__(self, config: BaseIOConfig):
super().__init__(config)
if self.file_type not in ("parquet_split", "parquet_split_multi"):
raise click.ClickException(
f"SplitParquetWriter requires 'parquet_split' or 'parquet_split_multi' input, got '{self.file_type}' instead."
)
if self.context == "levels_context":
if self.level not in ("peptide", "protein", "gene"):
raise click.ClickException(
f"SplitParquetWriter levels_context only supports peptide, protein, or gene levels, got '{self.level}' instead."
)
self.context_level_id_map = {
"peptide": "peptide_id",
"protein": "protein_id",
"gene": "gene_id",
}
[docs]
def save_results(self, result, pi0=None):
"""
Abstract method to be implemented by subclasses to save scoring results and statistical outputs.
"""
raise NotImplementedError(
"The save_results method must be implemented in subclasses of BaseSplitParquetWriter."
)
[docs]
def merge_files(
self,
merge_transitions: bool = False,
):
"""
Merges the precursors_features.parquet and transition_features.parquet files from all subdirectories into one single file each.
"""
base_dir = self.infile
if merge_transitions:
out_dir = self.outfile
os.makedirs(out_dir, exist_ok=True)
output_file = os.path.join(out_dir, "precursors_features.parquet")
output_file_transitions = os.path.join(
out_dir, "transition_features.parquet"
)
else:
output_file = self.outfile
con = duckdb.connect()
# Use glob to find all precursors and transition features files
precursors_files = glob.glob(
os.path.join(base_dir, "**", "precursors_features.parquet"), recursive=True
)
transition_files = glob.glob(
os.path.join(base_dir, "**", "transition_features.parquet"), recursive=True
)
# Merge precursors features into one parquet file
if precursors_files:
logger.info(
f"Merging {len(precursors_files)} precursors features files into {output_file}..."
)
precursors_query = f"""
COPY (
SELECT * FROM read_parquet([{",".join([f"'{file}'" for file in precursors_files])}])
) TO '{output_file}' (FORMAT 'parquet')
"""
logger.trace(f"Precursors merge query:\n{precursors_query}")
con.execute(precursors_query)
# Merge transition features into one parquet file (optional)
if merge_transitions:
logger.info(
f"Merging {len(transition_files)} transition features files into {output_file_transitions}..."
)
transitions_query = f"""
COPY (
SELECT * FROM read_parquet([{",".join([f"'{file}'" for file in transition_files])}])
) TO '{output_file_transitions}' (FORMAT 'parquet')
"""
con.execute(transitions_query)
con.close()
logger.success("Merging complete.")