Source code for pyprophet.io.scoring.split_parquet

import os

import click
import duckdb
import pandas as pd
from loguru import logger

from ..._config import RunnerIOConfig
from .._base import BaseSplitParquetReader, BaseSplitParquetWriter
from ..util import _ensure_pyarrow, get_parquet_column_names

pa, _, _ = _ensure_pyarrow()


[docs] class SplitParquetReader(BaseSplitParquetReader): """ Class for reading and processing data from OpenSWATH results stored in a directoy containing split Parquet files. The ParquetReader class provides methods to read different levels of data from the split parquet files and process it accordingly. It supports reading data for semi-supervised learning, IPF analysis, context level analysis. This assumes that the input infile path is a directory containing the following files: - precursors_features.parquet - transition_features.parquet - feature_alignment.parquet (optional) Attributes: infile (str): Input file path. outfile (str): Output file path. classifier (str): Classifier used for semi-supervised learning. level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific'). glyco (bool): Flag indicating whether analysis is glycoform-specific. Methods: read(): Read data from the input file based on the alogorithm. """
[docs] def __init__(self, config: RunnerIOConfig): super().__init__(config)
[docs] def read(self) -> pd.DataFrame: """ Reads and processes data from a DuckDB connection to generate a final feature table based on the specified level and main score. Returns: pd.DataFrame: Final feature table with the specified main score. """ con = duckdb.connect() self._init_duckdb_views(con) ss_main_score = self.config.runner.ss_main_score feature_table = self._fetch_feature_table(con) if self.level == "ms1ms2": ms1_cols = self._get_columns_by_prefix( "precursors_features.parquet", "FEATURE_MS1_VAR" ) feature_table = self._merge_ms1ms2_features(con, feature_table, ms1_cols) con.close() return self._finalize_feature_table(feature_table, ss_main_score)
def _fetch_feature_table(self, con): if self.level in ("ms2", "ms1ms2"): return self._fetch_ms2_features( con, self._get_columns_by_prefix( "precursors_features.parquet", "FEATURE_MS2_" ), ) elif self.level == "ms1": return self._fetch_ms1_features( con, self._get_columns_by_prefix( "precursors_features.parquet", "FEATURE_MS1_" ), ) elif self.level == "transition": if not self._get_columns_by_prefix( "precursors_features.parquet", "SCORE_MS2_" ): raise click.ClickException( "Transition-level scoring for IPF requires prior MS2 or MS1MS2-level scoring. Please run 'pyprophet score --level=ms2' or 'pyprophet score --level=ms1ms2' on this file first." ) # if not os.path.exists( # os.path.join(self.infile, "transition_features.parquet") # ): # raise click.ClickException("Transition-level feature table not found.") return self._fetch_transition_features( con, self._get_columns_by_prefix( "transition_features.parquet", "FEATURE_TRANSITION_VAR" ), ) elif self.level == "alignment": return self._fetch_alignment_features( con, self._get_columns_by_prefix("feature_alignment.parquet", "VAR_") ) else: raise click.ClickException( "Unsupported level for reading semi-supervised input." ) def _fetch_ms2_features(self, con, feature_cols): cols_sql = ", ".join([f"p.{col}" for col in feature_cols]) query = f"""SELECT p.RUN_ID, p.PRECURSOR_ID, p.PROTEIN_ID, p.PRECURSOR_CHARGE, p.FEATURE_ID, p.EXP_RT, p.PRECURSOR_DECOY AS DECOY, {cols_sql}, COALESCE(t.TRANSITION_COUNT, 0) AS TRANSITION_COUNT, p.RUN_ID || '_' || p.PRECURSOR_ID AS GROUP_ID FROM precursors p LEFT JOIN ( SELECT PRECURSOR_ID, COUNT(*) AS TRANSITION_COUNT FROM ( SELECT DISTINCT PRECURSOR_ID, TRANSITION_ID FROM transition WHERE TRANSITION_DETECTING = 1 ) sub GROUP BY PRECURSOR_ID ) t ON p.PRECURSOR_ID = t.PRECURSOR_ID ORDER BY p.RUN_ID, p.PRECURSOR_ID, p.EXP_RT""" df = ( con.execute(query) .pl() .rename({col: col.replace("FEATURE_MS2_", "") for col in feature_cols}) ) df = self._collapse_protein_ids(df) return df.to_pandas() def _fetch_ms1_features(self, con, feature_cols): cols_sql = ", ".join([f"p.{col}" for col in feature_cols]) query = f"""SELECT DISTINCT p.RUN_ID, p.PRECURSOR_ID, p.PROTEIN_ID, p.PRECURSOR_CHARGE, p.FEATURE_ID, p.EXP_RT, p.PRECURSOR_DECOY AS DECOY, {cols_sql}, p.RUN_ID || '_' || p.PRECURSOR_ID AS GROUP_ID FROM precursors p WHERE p.RUN_ID IS NOT NULL ORDER BY p.RUN_ID, p.PRECURSOR_ID, p.EXP_RT""" df = ( con.execute(query) .pl() .rename({col: col.replace("FEATURE_MS1_", "") for col in feature_cols}) ) df = self._collapse_protein_ids(df) return df.to_pandas() def _fetch_transition_features(self, con, feature_cols): cols_sql = ", ".join([f"t.{col}" for col in feature_cols]) rc = self.config.runner query = f"""SELECT t.TRANSITION_DECOY AS DECOY, t.RUN_ID, t.FEATURE_ID, t.IPF_PEPTIDE_ID, t.TRANSITION_ID, t.FEATURE_TRANSITION_AREA_INTENSITY AS AREA_INTENSITY, {cols_sql}, p.PRECURSOR_CHARGE, t.TRANSITION_CHARGE, p.RUN_ID || '_' || t.FEATURE_ID || '_' || t.PRECURSOR_ID || '_' || t.TRANSITION_ID AS GROUP_ID FROM transition t INNER JOIN precursors p ON t.PRECURSOR_ID = p.PRECURSOR_ID AND t.FEATURE_ID = p.FEATURE_ID WHERE p.SCORE_MS2_PEAK_GROUP_RANK <= {rc.ipf_max_peakgroup_rank} AND p.SCORE_MS2_PEP <= {rc.ipf_max_peakgroup_pep} AND p.PRECURSOR_DECOY = 0 AND t.FEATURE_TRANSITION_VAR_ISOTOPE_OVERLAP_SCORE <= {rc.ipf_max_transition_isotope_overlap} AND t.FEATURE_TRANSITION_VAR_LOG_SN_SCORE > {rc.ipf_min_transition_sn} ORDER BY p.RUN_ID, p.PRECURSOR_ID, p.EXP_RT, t.TRANSITION_ID""" logger.trace(f"Transition features SQL query: {query}") df = ( con.execute(query) .pl() .rename( {col: col.replace("FEATURE_TRANSITION_", "") for col in feature_cols} ) ) df = self._collapse_ipf_peptide_ids(df) return df.to_pandas() def _fetch_alignment_features(self, con, feature_cols): cols_sql = ", ".join([f"a.{col}" for col in feature_cols]) query = f"""SELECT a.ALIGNMENT_ID, a.RUN_ID, a.PRECURSOR_ID, a.FEATURE_ID, a.ALIGNED_RT, a.DECOY, {cols_sql}, a.FEATURE_ID || '_' || a.PRECURSOR_ID AS GROUP_ID FROM feature_alignment a ORDER BY a.RUN_ID, a.PRECURSOR_ID, a.REFERENCE_RT""" df = con.execute(query).df() # Map DECOY to 1 and -1 to 0 and 1 # arycal saves a label column to indicate 1 as target aligned peaks and -1 as the random/shuffled decoy aligned peak df["DECOY"] = df["DECOY"].map({1: 0, -1: 1}) return df def _merge_ms1ms2_features(self, con, df, feature_cols): cols_sql = ", ".join([f"p.{col}" for col in feature_cols]) query = f"SELECT DISTINCT p.FEATURE_ID, {cols_sql} FROM precursors p" ms1_df = con.execute(query).df() feature_cols = ms1_df.columns.tolist() feature_cols.remove("FEATURE_ID") ms1_df = ms1_df.rename( columns={ col: col.replace("FEATURE_MS1_VAR_", "VAR_MS1_") for col in feature_cols } ) return pd.merge(df, ms1_df, how="left", on="FEATURE_ID")
[docs] class SplitParquetWriter(BaseSplitParquetWriter): """ Class for writing OpenSWATH results to a directory containing split Parquet files. Attributes: infile (str): Input file path. outfile (str): Output file path. classifier (str): Classifier used for semi-supervised learning. level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific'). glyco (bool): Flag indicating whether analysis is glycoform-specific. Methods: save_results(result, pi0): Save the results to the output file based on the module using this class. save_weights(weights): Save the weights to the output file. """
[docs] def __init__(self, config: RunnerIOConfig): super().__init__(config)
[docs] def save_results(self, result, pi0): """ Save the results of scoring to parquet files based on the specified level. Parameters: - result: The result object containing scored tables. - pi0: The pi0 value used in scoring. """ df = result.scored_tables level = self.level file_map = { "ms2": "precursors_features.parquet", "ms1ms2": "precursors_features.parquet", "ms1": "precursors_features.parquet", "transition": "transition_features.parquet", "alignment": "feature_alignment.parquet", } prefix_map = { "ms2": "SCORE_MS2_", "ms1ms2": "SCORE_MS2_", "ms1": "SCORE_MS1_", "transition": "SCORE_TRANSITION_", "alignment": "SCORE_ALIGNMENT_", } file_key = file_map.get(level) prefix = prefix_map.get(level) score_df = self._prepare_score_dataframe(df, level, prefix) if self.file_type == "parquet_split_multi" and self.level != "alignment": run_dirs = [ os.path.join(self.outfile, d) for d in os.listdir(self.outfile) if (d.endswith(".oswpq") or d.endswith(".oswpqd")) and os.path.isdir(os.path.join(self.outfile, d)) ] for run_dir in run_dirs: file_path = os.path.join(run_dir, file_key) if not os.path.exists(file_path): logger.warning( f"Warning: {file_path} does not exist. Skipping.", ) continue try: con = duckdb.connect() feature_ids = con.execute( f"SELECT FEATURE_ID FROM read_parquet('{file_path}')" ).fetchall() feature_ids = set(f[0] for f in feature_ids) con.close() except duckdb.Error as e: logger.error( f"Error reading FEATURE_IDs from {file_path}: {e}", ) continue subset = score_df[score_df["FEATURE_ID"].isin(feature_ids)] if subset.empty: logger.warning(f"No scores for {run_dir}, skipping write.") continue existing_cols = get_parquet_column_names(file_path) columns_to_keep = self._get_columns_to_keep(existing_cols, prefix) self._write_parquet_with_scores(file_path, subset, columns_to_keep) else: file_path = os.path.join(self.outfile, file_key) if not os.path.exists(file_path): raise click.ClickException(f"Error: {file_path} not found for writing.") existing_cols = get_parquet_column_names(file_path) columns_to_keep = self._get_columns_to_keep(existing_cols, prefix) self._write_parquet_with_scores(file_path, score_df, columns_to_keep) self._write_pdf_report(result, pi0)
def _write_parquet_with_scores( self, target_file: str, df: pd.DataFrame, keep_columns: list[str] ): # Define columns for scoring new_score_cols = [ col for col in df.columns if col not in ( "FEATURE_ID", "PROTEIN_ID", "TRANSITION_ID", "IPF_PEPTIDE_ID", "ALIGNMENT_ID", "DECOY", ) ] new_score_sql = ", ".join([f"s.{col}" for col in new_score_cols]) prefix = "p" join_on = "p.FEATURE_ID = s.FEATURE_ID AND p.PROTEIN_ID = s.PROTEIN_ID" key_cols = "p.FEATURE_ID" if self.level == "transition": prefix = "t" join_on = "t.FEATURE_ID = s.FEATURE_ID AND t.TRANSITION_ID = s.TRANSITION_ID AND t.IPF_PEPTIDE_ID = s.IPF_PEPTIDE_ID" key_cols = "t.FEATURE_ID, t.TRANSITION_ID, t.IPF_PEPTIDE_ID" if self.level == "alignment": prefix = "a" join_on = "a.ALIGNMENT_ID = s.ALIGNMENT_ID AND a.FEATURE_ID = s.FEATURE_ID AND a.DECOY = s.DECOY" key_cols = "a.ALIGNMENT_ID, a.FEATURE_ID, a.DECOY" table_alias = prefix select_old = ", ".join([f"{table_alias}.{col}" for col in keep_columns]) # Register the DataFrame as a table in DuckDB con = duckdb.connect() con.register("scores", pa.Table.from_pandas(df)) # Validate input row entry count and joined entry count remain the same self._validate_row_count_after_join(con, target_file, key_cols, join_on, prefix) con.execute( f""" COPY ( SELECT {select_old}, {new_score_sql} FROM read_parquet('{target_file}') {table_alias} LEFT JOIN scores s ON {join_on} ) TO '{target_file}' (FORMAT 'parquet', COMPRESSION 'ZSTD', COMPRESSION_LEVEL 11); """ ) logger.debug( f"After appendings scores, {target_file} has {self._get_parquet_row_count(con, target_file)} entries" ) con.close() logger.success(f"{target_file} written successfully.")