from shutil import copyfile
import click
import duckdb
import pandas as pd
from loguru import logger
from ..._config import LevelContextIOConfig
from .._base import BaseParquetReader, BaseParquetWriter
from ..util import _ensure_pyarrow, get_parquet_column_names
pa, _, _ = _ensure_pyarrow()
[docs]
class ParquetReader(BaseParquetReader):
"""
Class for reading and processing data from OpenSWATH results stored in Parquet format.
The ParquetReader class provides methods to read different levels of data from the file and process it accordingly.
It supports reading data for semi-supervised learning, IPF analysis, context level analysis.
This assumes that the input file contains precursor and transition data.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
read(): Read data from the input file based on the alogorithm.
"""
[docs]
def __init__(self, config: LevelContextIOConfig):
super().__init__(config)
[docs]
def read(self) -> pd.DataFrame:
con = duckdb.connect()
try:
self._init_duckdb_views(con)
level = self.config.level
if level == "peptide":
return self._read_pyp_peptide(con)
elif level == "glycopeptide":
raise click.ClickException(
"Glycopeptide-level inference is not supported for split parquet files."
)
elif level == "protein":
return self._read_pyp_protein(con)
elif level == "gene":
return self._read_pyp_gene(con)
else:
raise click.ClickException(f"Unsupported level: {level}")
finally:
con.close()
def _read_pyp_peptide(self, con) -> pd.DataFrame:
cfg = self.config # LevelContextIOConfig instance
all_cols = get_parquet_column_names(self.infile)
if not any(c.startswith("SCORE_MS2_") for c in all_cols):
raise click.ClickException(
"Apply scoring to MS2-level data before running peptide-level scoring."
)
if cfg.context_fdr == "global":
run_id = "NULL"
group_id = "PEPTIDE_ID"
else:
run_id = "RUN_ID"
group_id = "RUN_ID || '_' || PEPTIDE_ID"
logger.info("Reading peptide-level data ...")
query = f"""
SELECT
{run_id} AS RUN_ID,
{group_id} AS GROUP_ID,
PEPTIDE_ID AS PEPTIDE_ID,
PRECURSOR_DECOY AS DECOY,
SCORE_MS2_SCORE AS SCORE,
'{cfg.context_fdr}' AS CONTEXT
FROM data p
WHERE SCORE_MS2_SCORE IS NOT NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY {group_id} ORDER BY SCORE_MS2_SCORE DESC) = 1
"""
df = con.execute(query).df()
return df.rename(columns=str.lower)
def _read_pyp_protein(self, con) -> pd.DataFrame:
cfg = self.config # LevelContextIOConfig instance
all_cols = get_parquet_column_names(self.infile)
if not any(c.startswith("SCORE_MS2_") for c in all_cols):
raise click.ClickException(
"Apply scoring to MS2-level data before running protein-level scoring."
)
if cfg.context_fdr == "global":
run_id = "NULL"
group_id = "PROTEIN_ID"
else:
run_id = "RUN_ID"
group_id = "RUN_ID || '_' || PROTEIN_ID"
logger.info("Reading protein-level data ...")
query = f"""
with one_peptide_proteins AS (
SELECT PEPTIDE_ID
FROM data
WHERE PROTEIN_ID IS NOT NULL
GROUP BY PEPTIDE_ID
HAVING COUNT(DISTINCT PROTEIN_ID) = 1
)
SELECT
{run_id} AS RUN_ID,
{group_id} AS GROUP_ID,
PROTEIN_ID AS PROTEIN_ID,
PRECURSOR_DECOY AS DECOY,
SCORE_MS2_SCORE AS SCORE,
'{cfg.context_fdr}' AS CONTEXT
FROM data p
JOIN one_peptide_proteins opp ON p.PEPTIDE_ID = opp.PEPTIDE_ID
WHERE SCORE_MS2_SCORE IS NOT NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY {group_id} ORDER BY SCORE_MS2_SCORE DESC) = 1
"""
df = con.execute(query).df()
df.columns = [col.lower() for col in df.columns]
return df
def _read_pyp_gene(self, con) -> pd.DataFrame:
cfg = self.config # LevelContextIOConfig instance
all_cols = get_parquet_column_names(self.infile)
if not any(c.startswith("SCORE_MS2_") for c in all_cols):
raise click.ClickException(
"Apply scoring to MS2-level data before running gene-level scoring."
)
if cfg.context_fdr == "global":
run_id = "NULL"
group_id = "GENE_ID"
else:
run_id = "RUN_ID"
group_id = "RUN_ID || '_' || GENE_ID"
logger.info("Reading gene-level data ...")
query = f"""
WITH one_gene_peptides AS (
SELECT PEPTIDE_ID
FROM data
WHERE GENE_ID IS NOT NULL
GROUP BY PEPTIDE_ID
HAVING COUNT(DISTINCT GENE_ID) = 1
)
SELECT
{run_id} AS RUN_ID,
{group_id} AS GROUP_ID,
GENE_ID AS GENE_ID,
PRECURSOR_DECOY AS DECOY,
SCORE_MS2_SCORE AS SCORE,
'{cfg.context_fdr}' AS CONTEXT
FROM data p
JOIN one_gene_peptides ogp ON p.PEPTIDE_ID = ogp.PEPTIDE_ID
WHERE SCORE_MS2_SCORE IS NOT NULL
QUALIFY ROW_NUMBER() OVER (PARTITION BY {group_id} ORDER BY SCORE_MS2_SCORE DESC) = 1
"""
df = con.execute(query).df()
df.columns = [col.lower() for col in df.columns]
return df
[docs]
class ParquetWriter(BaseParquetWriter):
"""
Class for writing OpenSWATH results to a Parquet file.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
save_results(result, pi0): Save the results to the output file based on the module using this class.
save_weights(weights): Save the weights to the output file.
"""
[docs]
def __init__(self, config: LevelContextIOConfig):
super().__init__(config)
[docs]
def save_results(self, result):
if self.infile != self.outfile:
copyfile(self.infile, self.outfile)
file_path = self.outfile
context = self.config.context_fdr
context_level_id = self.context_level_id_map.get(self.level)
col_prefix = f"SCORE_{self.level.upper()}_{context.upper().replace('-', '_')}"
result = result[
[
"context",
"run_id",
context_level_id,
"score",
"p_value",
"q_value",
"pep",
]
]
# drop context column
result = result.drop(columns=["context"])
result.columns = [col.upper() for col in result.columns]
result = result.rename(
columns=lambda x: (
f"{col_prefix}_{x}"
if x not in ("RUN_ID", context_level_id.upper())
else x
)
)
score_cols = [
col
for col in result.columns
if col.startswith(col_prefix) and col != "RUN_ID"
]
existing_cols = get_parquet_column_names(file_path)
exitsting_score_cols = [
col for col in existing_cols if col.startswith(col_prefix)
]
if exitsting_score_cols:
logger.warning(
f"Warn: There are existing {col_prefix}_ columns, these will be dropped."
)
existing_cols = [col for col in existing_cols if not col.startswith(col_prefix)]
select_old = ", ".join([f"p.{col}" for col in existing_cols])
new_score_sql = ", ".join([f"s.{col}" for col in score_cols])
con = duckdb.connect()
con.register("scores", pa.Table.from_pandas(result))
if context == "global":
# Validate input row entry count and joined entry count remain the same
self._validate_row_count_after_join(
con,
file_path,
f"p.{context_level_id.upper()}",
f"p.{context_level_id.upper()} = s.{context_level_id.upper()}",
"p",
)
con.execute(
f"""
COPY (
SELECT {select_old}, {new_score_sql}
FROM read_parquet('{file_path}') p
LEFT JOIN scores s
ON p.{context_level_id.upper()} = s.{context_level_id.upper()}
) TO '{file_path}'
(FORMAT 'parquet', COMPRESSION 'ZSTD', COMPRESSION_LEVEL 11)
"""
)
else:
# Validate input row entry count and joined entry count remain the same
self._validate_row_count_after_join(
con,
file_path,
f"p.RUN_ID, p.{context_level_id.upper()}",
f"p.RUN_ID = s.RUN_ID AND p.{context_level_id.upper()} = s.{context_level_id.upper()}",
"p",
)
con.execute(
f"""
COPY (
SELECT {select_old}, {new_score_sql}
FROM read_parquet('{file_path}') p
LEFT JOIN scores s
ON p.RUN_ID = s.RUN_ID AND p.{context_level_id.upper()} = s.{context_level_id.upper()}
) TO '{file_path}'
(FORMAT 'parquet', COMPRESSION 'ZSTD', COMPRESSION_LEVEL 11)
"""
)
logger.debug(
f"After appendings scores, {file_path} has {self._get_parquet_row_count(con, file_path)} entries"
)
con.close()
logger.success(f"Updated: {file_path}")