import os
import re
import sqlite3
from typing import Tuple
import click
import duckdb
import numpy as np
import pandas as pd
from loguru import logger
from ..._config import ExportIOConfig
from .._base import BaseOSWReader, BaseOSWWriter
from ..util import (
check_sqlite_table,
get_table_columns,
get_table_columns_with_types,
load_sqlite_scanner,
unimod_to_codename,
write_scores_sql_command,
)
[docs]
class OSWReader(BaseOSWReader):
"""
Class for reading and processing data from an OpenSWATH workflow OSW-sqlite based file.
Extended to support exporting functionality.
"""
[docs]
def __init__(self, config: ExportIOConfig):
    """Initialise the reader by forwarding ``config`` to the base class."""
    # This subclass adds no state of its own here; all setup is delegated.
    super().__init__(config)
[docs]
def read(self) -> pd.DataFrame:
    """
    Read data from the OpenSWATH workflow OSW-sqlite based file.

    Opens ``self.infile`` with SQLite and dispatches on
    ``self.config.context``: ``"export_scored_report"`` uses the lightweight
    scored-report reader, anything else uses the general SQLite reader.

    Returns:
        The DataFrame produced by the selected reader.
    """
    con = sqlite3.connect(self.infile)
    try:
        if self.config.context == "export_scored_report":
            return self._read_for_export_scored_report(con)
        return self._read_sqlite(con)
    finally:
        # The readers hand back fully materialised frames, so the connection
        # is no longer needed; close it even when a reader raises.
        con.close()
[docs]
def _create_indexes(self):
    """
    Create necessary indexes for export queries.

    Opens its own SQLite connection to ``self.infile``, issues
    ``CREATE INDEX IF NOT EXISTS`` statements for the core tables and for
    whichever optional tables are present, commits, and closes the
    connection. A statement that fails with ``sqlite3.OperationalError`` is
    logged as a warning and skipped.

    Raises:
        click.ClickException: if anything else goes wrong (the original
            exception is chained as the cause).
    """
    # Indexes on tables that are always queried by the export readers.
    index_statements = [
        "CREATE INDEX IF NOT EXISTS idx_precursor_precursor_id ON PRECURSOR (ID);",
        "CREATE INDEX IF NOT EXISTS idx_precursor_peptide_mapping_precursor_id ON PRECURSOR_PEPTIDE_MAPPING (PRECURSOR_ID);",
        "CREATE INDEX IF NOT EXISTS idx_feature_precursor_id ON FEATURE (PRECURSOR_ID);",
        "CREATE INDEX IF NOT EXISTS idx_precursor_peptide_mapping_peptide_id ON PRECURSOR_PEPTIDE_MAPPING (PEPTIDE_ID);",
        "CREATE INDEX IF NOT EXISTS idx_peptide_peptide_id ON PEPTIDE (ID);",
        "CREATE INDEX IF NOT EXISTS idx_run_run_id ON RUN (ID);",
        "CREATE INDEX IF NOT EXISTS idx_feature_run_id ON FEATURE (RUN_ID);",
        "CREATE INDEX IF NOT EXISTS idx_feature_feature_id ON FEATURE (ID);",
    ]
    # Optional table -> indexes to add when that table exists.
    conditional_indexes = (
        (
            "FEATURE_MS1",
            (
                "CREATE INDEX IF NOT EXISTS idx_feature_ms1_feature_id ON FEATURE_MS1 (FEATURE_ID);",
            ),
        ),
        (
            "FEATURE_MS2",
            (
                "CREATE INDEX IF NOT EXISTS idx_feature_ms2_feature_id ON FEATURE_MS2 (FEATURE_ID);",
            ),
        ),
        (
            "SCORE_MS1",
            (
                "CREATE INDEX IF NOT EXISTS idx_score_ms1_feature_id ON SCORE_MS1 (FEATURE_ID);",
            ),
        ),
        (
            "SCORE_MS2",
            (
                "CREATE INDEX IF NOT EXISTS idx_score_ms2_feature_id ON SCORE_MS2 (FEATURE_ID);",
            ),
        ),
        (
            "SCORE_IPF",
            (
                "CREATE INDEX IF NOT EXISTS idx_score_ipf_feature_id ON SCORE_IPF (FEATURE_ID);",
                "CREATE INDEX IF NOT EXISTS idx_score_ipf_peptide_id ON SCORE_IPF (PEPTIDE_ID);",
            ),
        ),
        (
            "SCORE_TRANSITION",
            (
                "CREATE INDEX IF NOT EXISTS idx_feature_transition_transition_id ON FEATURE_TRANSITION (TRANSITION_ID);",
                "CREATE INDEX IF NOT EXISTS idx_transition_transition_id ON TRANSITION (ID);",
                "CREATE INDEX IF NOT EXISTS idx_feature_transition_transition_id_feature_id ON FEATURE_TRANSITION (TRANSITION_ID, FEATURE_ID);",
                "CREATE INDEX IF NOT EXISTS idx_score_transition_transition_id_feature_id ON SCORE_TRANSITION (TRANSITION_ID, FEATURE_ID);",
                "CREATE INDEX IF NOT EXISTS idx_feature_transition_feature_id ON FEATURE_TRANSITION (FEATURE_ID);",
            ),
        ),
    )
    sqlite_con = None
    try:
        sqlite_con = sqlite3.connect(self.infile)
        # Add conditional indexes based on tables present
        for table, statements in conditional_indexes:
            if check_sqlite_table(sqlite_con, table):
                index_statements.extend(statements)
        for stmt in index_statements:
            try:
                sqlite_con.execute(stmt)
            except sqlite3.OperationalError as e:
                # A single failed index is not fatal; keep going with the rest.
                logger.warning(f"Warn: SQLite index creation failed: {e}")
        sqlite_con.commit()
    except Exception as e:
        raise click.ClickException(
            f"Failed to create indexes via SQLite fallback: {e}"
        ) from e
    finally:
        # Release the connection on every path, including failures.
        if sqlite_con is not None:
            sqlite_con.close()
def _read_duckdb(self, con):
cfg = self.config
return self._read_sqlite(con) # We will use SQLite as the main reader for now
[docs]
def _read_sqlite(self, con):
"""Main entry point for reading SQLite data, delegates to specific methods."""
cfg = self.config
if self.config.export_format == "library":
raise NotImplementedError(
"Library export from sqlite OSW files is not supported"
)
if self._is_unscored_file(con):
logger.info("Reading unscored data from OSW file.")
return self._read_unscored_data(con)
ipf_present = self._check_ipf_presence(con, cfg)
if ipf_present and cfg.ipf == "peptidoform":
logger.info("Reading peptidoform IPF data from OSW file.")
data = self._read_peptidoform_data(con, cfg)
elif ipf_present and cfg.ipf == "augmented":
logger.info("Reading augmented data with IPF from OSW file.")
data = self._read_augmented_data(con, cfg)
else:
logger.info("Reading standard OpenSWATH data from OSW file.")
data = self._read_standard_data(con, cfg)
logger.debug(f"Base data read with {len(data)} rows")
# Apply common augmentations to all scored data types
return self._augment_data(data, con, cfg)
[docs]
def _is_unscored_file(self, con):
    """Return True when none of the known score tables exist in the file."""
    score_tables = (
        "SCORE_MS1",
        "SCORE_MS2",
        "SCORE_IPF",
        "SCORE_PEPTIDE",
        "SCORE_PROTEIN",
    )
    for score_table in score_tables:
        # The first score table found is enough to call the file scored.
        if check_sqlite_table(con, score_table):
            return False
    return True
[docs]
def _check_ipf_presence(self, con, cfg):
"""Check if IPF data is present and should be used."""
return cfg.ipf != "disable" and check_sqlite_table(con, "SCORE_IPF")
[docs]
def _check_alignment_presence(self, con):
    """Tell whether both alignment tables (features and scores) exist."""
    features_found = check_sqlite_table(con, "FEATURE_MS2_ALIGNMENT")
    if not features_found:
        # Skip the second lookup when the first table is already missing.
        return features_found
    return check_sqlite_table(con, "SCORE_ALIGNMENT")
[docs]
def _has_im_boundaries(self, con) -> bool:
"""Return True if the FEATURE table contains IM boundary columns.
Older OSW files may not have these columns; this helper centralises the
PRAGMA check so callers don't duplicate the logic.
"""
try:
cols = [
r[1] for r in con.execute("PRAGMA table_info('FEATURE')").fetchall()
]
except Exception:
return False
return "EXP_IM_LEFTWIDTH" in cols and "EXP_IM_RIGHTWIDTH" in cols
[docs]
def _has_im(self, con) -> bool:
"""Return True if the FEATURE table contains the EXP_IM column.
Older OSW files may not have this column; centralise the PRAGMA
check so callers don't duplicate the logic.
"""
try:
cols = [
r[1] for r in con.execute("PRAGMA table_info('FEATURE')").fetchall()
]
except Exception:
return False
return "EXP_IM" in cols
[docs]
def _read_unscored_data(self, con):
"""Read data from unscored files."""
score_sql = self._build_score_sql(con)
# IM columns may or may not be present; centralised checks
has_im_boundaries = self._has_im_boundaries(con)
has_im = self._has_im(con)
# Compose EXP_IM (or NULL) plus IM boundary columns (or NULLs)
im_cols_sql = (
(
"""FEATURE.EXP_IM AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth"""
)
if has_im and has_im_boundaries
else (
"""FEATURE.EXP_IM AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth"""
)
if has_im and not has_im_boundaries
else (
"""NULL AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth"""
)
if (not has_im) and has_im_boundaries
else """NULL AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth"""
)
query = f"""
SELECT
RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
PRECURSOR.ID AS transition_group_id,
PRECURSOR.DECOY AS decoy,
RUN.ID AS run_id,
RUN.FILENAME AS filename,
FEATURE.EXP_RT AS RT,
FEATURE.EXP_RT - FEATURE.DELTA_RT AS assay_rt,
FEATURE.DELTA_RT AS delta_rt,
PRECURSOR.LIBRARY_RT AS assay_RT,
FEATURE.NORM_RT - PRECURSOR.LIBRARY_RT AS delta_RT,
FEATURE.ID AS id,
PRECURSOR.CHARGE AS Charge,
PRECURSOR.PRECURSOR_MZ AS mz,
FEATURE_MS2.AREA_INTENSITY AS Intensity,
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
{score_sql}
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN FEATURE ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN ON RUN.ID = FEATURE.RUN_ID
LEFT JOIN FEATURE_MS1 ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
ORDER BY transition_group_id
"""
return pd.read_sql_query(query, con)
[docs]
def _build_score_sql(self, con):
    """Assemble the extra SELECT columns for FEATURE_MS1/FEATURE_MS2 sub-scores.

    Returns an empty string when nothing was generated, otherwise the
    generated fragment prefixed with ``", "`` and with its last two
    characters removed.
    """
    fragment = ""
    sources = (("FEATURE_MS1", "var_ms1_"), ("FEATURE_MS2", "var_ms2_"))
    for table, prefix in sources:
        if check_sqlite_table(con, table):
            fragment = write_scores_sql_command(con, fragment, table, prefix)
    if not fragment:
        return ""
    # Remove last comma and space
    return ", " + fragment[:-2]
[docs]
def _read_augmented_data(self, con, cfg):
"""Read standard data augmented with IPF information."""
score_ms1_pep, link_ms1 = self._get_ms1_score_info(con)
# IM columns may or may not be present; centralised checks
has_im_boundaries = self._has_im_boundaries(con)
has_im = self._has_im(con)
im_cols_sql = (
(
"""FEATURE.EXP_IM AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if has_im and has_im_boundaries
else (
"""FEATURE.EXP_IM AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)
if has_im and not has_im_boundaries
else (
"""NULL AS EXP_IM,
FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
)
if (not has_im) and has_im_boundaries
else """NULL AS EXP_IM,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
)
query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
PRECURSOR.ID AS transition_group_id,
PRECURSOR.DECOY AS decoy,
RUN.ID AS run_id,
RUN.FILENAME AS filename,
FEATURE.EXP_RT AS RT,
FEATURE.EXP_RT - FEATURE.DELTA_RT AS assay_rt,
FEATURE.DELTA_RT AS delta_rt,
FEATURE.NORM_RT AS iRT,
PRECURSOR.LIBRARY_RT AS assay_iRT,
FEATURE.NORM_RT - PRECURSOR.LIBRARY_RT AS delta_iRT,
FEATURE.ID AS id,
PEPTIDE.UNMODIFIED_SEQUENCE AS Sequence,
PEPTIDE.MODIFIED_SEQUENCE AS FullPeptideName,
PRECURSOR.CHARGE AS Charge,
PRECURSOR.PRECURSOR_MZ AS mz,
FEATURE_MS2.AREA_INTENSITY AS Intensity,
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
{score_ms1_pep} AS ms1_pep,
SCORE_MS2.PEP AS ms2_pep
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN FEATURE ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN ON RUN.ID = FEATURE.RUN_ID
LEFT JOIN FEATURE_MS1 ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
{link_ms1}
LEFT JOIN SCORE_MS2 ON SCORE_MS2.FEATURE_ID = FEATURE.ID
WHERE SCORE_MS2.QVALUE < {cfg.max_rs_peakgroup_qvalue}
ORDER BY transition_group_id, peak_group_rank;
"""
data = pd.read_sql_query(query, con)
# Augment with IPF data
ipf_data = self._get_ipf_augmentation_data(con, cfg)
return pd.merge(data, ipf_data, how="left", on="id")
[docs]
def _read_standard_data(self, con, cfg):
    """Read standard OpenSWATH data without IPF, optionally including aligned features.

    The base result holds every peak group with
    ``SCORE_MS2.QVALUE < cfg.max_rs_peakgroup_qvalue``. When
    ``cfg.use_alignment`` is set and both alignment tables exist, alignment
    columns are merged onto those rows (``from_alignment = 0``) and features
    that only pass via alignment are appended (``from_alignment = 1``).

    Args:
        con: Open SQLite connection to the OSW file.
        cfg: Configuration providing ``use_alignment`` and
            ``max_rs_peakgroup_qvalue`` (further thresholds are read by
            ``_fetch_alignment_features``).

    Returns:
        DataFrame of peak groups; alignment columns are present only when
        aligned features were found.
    """
    # Check if we should attempt alignment integration
    use_alignment = cfg.use_alignment and self._check_alignment_presence(con)
    # IM boundary columns may or may not be present; centralised check
    has_im_boundaries = self._has_im_boundaries(con)
    im_cols_sql = (
        """FEATURE.EXP_IM_LEFTWIDTH AS IM_leftWidth,
FEATURE.EXP_IM_RIGHTWIDTH AS IM_rightWidth,"""
        if has_im_boundaries
        else """NULL AS IM_leftWidth,
NULL AS IM_rightWidth,"""
    )
    # First, get features that pass MS2 QVALUE threshold
    query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
PRECURSOR.ID AS transition_group_id,
PRECURSOR.DECOY AS decoy,
RUN.ID AS run_id,
RUN.FILENAME AS filename,
FEATURE.EXP_RT AS RT,
FEATURE.EXP_RT - FEATURE.DELTA_RT AS assay_rt,
FEATURE.DELTA_RT AS delta_rt,
FEATURE.NORM_RT AS iRT,
PRECURSOR.LIBRARY_RT AS assay_iRT,
FEATURE.NORM_RT - PRECURSOR.LIBRARY_RT AS delta_iRT,
CAST(FEATURE.ID AS INTEGER) AS id,
PEPTIDE.UNMODIFIED_SEQUENCE AS Sequence,
PEPTIDE.MODIFIED_SEQUENCE AS FullPeptideName,
PRECURSOR.CHARGE AS Charge,
PRECURSOR.PRECURSOR_MZ AS mz,
FEATURE_MS2.AREA_INTENSITY AS Intensity,
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
SCORE_MS2.PEP AS pep
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN FEATURE ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN ON RUN.ID = FEATURE.RUN_ID
LEFT JOIN FEATURE_MS1 ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
LEFT JOIN SCORE_MS2 ON SCORE_MS2.FEATURE_ID = FEATURE.ID
WHERE SCORE_MS2.QVALUE < {cfg.max_rs_peakgroup_qvalue}
ORDER BY transition_group_id, peak_group_rank;
"""
    data = pd.read_sql_query(query, con)
    logger.trace(
        f"Initial standard data read with {len(data)} rows and columns: {data.columns.tolist()}"
    )
    # Ensure id column is Int64 to preserve precision for large feature IDs
    if "id" in data.columns:
        data["id"] = data["id"].astype("Int64")
    # If alignment is enabled and alignment data is present, fetch and merge aligned features
    if use_alignment:
        aligned_features = self._fetch_alignment_features(con, cfg)
        if not aligned_features.empty:
            # Columns carried over from the alignment result onto feature rows.
            alignment_columns = [
                "id",
                "alignment_group_id",
                "alignment_reference_feature_id",
                "alignment_reference_rt",
                "alignment_pep",
                "alignment_qvalue",
            ]
            # Get full feature data for aligned features that are NOT already in base results
            # We only want to add features that didn't pass MS2 threshold but have good alignment
            aligned_ids = aligned_features["id"].unique()
            # A set gives constant-time membership tests instead of scanning
            # the whole id array once per aligned feature.
            existing_ids = set(data["id"].dropna().tolist())
            new_aligned_ids = [
                aid for aid in aligned_ids if aid not in existing_ids
            ]
            # First, merge alignment info into existing features (those that passed MS2)
            # Mark them with from_alignment=0
            data = pd.merge(
                data,
                aligned_features[alignment_columns],
                on="id",
                how="left",
            )
            data["from_alignment"] = 0
            # Now add features that didn't pass MS2 but have good alignment (recovered features)
            if new_aligned_ids:
                # Fetch full data for these new aligned features. The IM
                # boundary columns are selected here as well so recovered
                # rows are not left NaN after the concat below.
                aligned_ids_str = ",".join(map(str, new_aligned_ids))
                aligned_query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
PRECURSOR.ID AS transition_group_id,
PRECURSOR.DECOY AS decoy,
RUN.ID AS run_id,
RUN.FILENAME AS filename,
FEATURE.EXP_RT AS RT,
FEATURE.EXP_RT - FEATURE.DELTA_RT AS assay_rt,
FEATURE.DELTA_RT AS delta_rt,
FEATURE.NORM_RT AS iRT,
PRECURSOR.LIBRARY_RT AS assay_iRT,
FEATURE.NORM_RT - PRECURSOR.LIBRARY_RT AS delta_iRT,
CAST(FEATURE.ID AS INTEGER) AS id,
PEPTIDE.UNMODIFIED_SEQUENCE AS Sequence,
PEPTIDE.MODIFIED_SEQUENCE AS FullPeptideName,
PRECURSOR.CHARGE AS Charge,
PRECURSOR.PRECURSOR_MZ AS mz,
FEATURE_MS2.AREA_INTENSITY AS Intensity,
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
{im_cols_sql}
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
SCORE_MS2.PEP AS pep
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN FEATURE ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN ON RUN.ID = FEATURE.RUN_ID
LEFT JOIN FEATURE_MS1 ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
LEFT JOIN SCORE_MS2 ON SCORE_MS2.FEATURE_ID = FEATURE.ID
WHERE FEATURE.ID IN ({aligned_ids_str})
"""
                aligned_data = pd.read_sql_query(aligned_query, con)
                # Ensure id column is Int64 to preserve precision
                if "id" in aligned_data.columns:
                    aligned_data["id"] = aligned_data["id"].astype("Int64")
                # Merge alignment scores and reference info into the aligned data
                aligned_data = pd.merge(
                    aligned_data,
                    aligned_features[alignment_columns],
                    on="id",
                    how="left",
                )
                # Mark as recovered through alignment
                aligned_data["from_alignment"] = 1
                logger.info(
                    f"Adding {len(aligned_data)} features recovered through alignment"
                )
                # Combine with base data
                data = pd.concat([data, aligned_data], ignore_index=True)
            # Convert alignment_reference_feature_id to int64 to avoid scientific notation
            if "alignment_reference_feature_id" in data.columns:
                data["alignment_reference_feature_id"] = data[
                    "alignment_reference_feature_id"
                ].astype("Int64")
            if "alignment_group_id" in data.columns:
                data["alignment_group_id"] = data["alignment_group_id"].astype(
                    "Int64"
                )
            # Assign alignment_group_id to reference features: a row whose id
            # is some other row's alignment_reference_feature_id inherits that
            # row's alignment_group_id if it has none of its own.
            if (
                "alignment_reference_feature_id" in data.columns
                and "alignment_group_id" in data.columns
            ):
                # Get all reference feature IDs and their corresponding alignment_group_ids
                ref_mapping = data[data["alignment_reference_feature_id"].notna()][
                    ["alignment_reference_feature_id", "alignment_group_id"]
                ].drop_duplicates()
                if not ref_mapping.empty:
                    # Re-key the mapping as id -> ref_alignment_group_id so it
                    # can be merged onto the reference feature rows.
                    ref_group_mapping = ref_mapping.rename(
                        columns={
                            "alignment_reference_feature_id": "id",
                            "alignment_group_id": "ref_alignment_group_id",
                        }
                    )
                    data = pd.merge(data, ref_group_mapping, on="id", how="left")
                    # Fill in alignment_group_id for reference features (where it's currently null but ref_alignment_group_id is not)
                    mask = (
                        data["alignment_group_id"].isna()
                        & data["ref_alignment_group_id"].notna()
                    )
                    data.loc[mask, "alignment_group_id"] = data.loc[
                        mask, "ref_alignment_group_id"
                    ]
                    # Drop the temporary column
                    data = data.drop(columns=["ref_alignment_group_id"])
                    logger.debug(
                        f"Assigned alignment_group_id to {mask.sum()} reference features"
                    )
    logger.trace(
        f"Data after merging alignment features has {len(data)} rows and columns: {data.columns.tolist()}"
    )
    return data
[docs]
def _augment_data(self, data, con, cfg):
    """Run the shared augmentation steps over ``data`` and return the result.

    Protein identifiers are always added. Transition quantification, peptide
    error rates and protein error rates are added only when the matching
    ``cfg`` flag is truthy. Steps run in the order listed below.
    """
    # (cfg flag or None for "always", log message, step)
    pipeline = (
        (
            "transition_quantification",
            "Adding transition-level quantification data.",
            lambda frame: self._add_transition_data(frame, con, cfg),
        ),
        (
            None,
            "Adding protein information.",
            lambda frame: self._add_protein_data(frame, con),
        ),
        (
            "peptide",
            "Adding peptide error rate data.",
            lambda frame: self._add_peptide_data(frame, con, cfg),
        ),
        (
            "protein",
            "Adding protein error rate data.",
            lambda frame: self._add_protein_error_data(frame, con, cfg),
        ),
    )
    for flag, message, step in pipeline:
        # Flags are looked up lazily, right before their step would run.
        if flag is None or getattr(cfg, flag):
            logger.info(message)
            data = step(data)
    return data
[docs]
def _get_ms1_score_info(self, con):
    """Return ``(select_expression, join_clause)`` for the MS1 PEP column.

    Without a SCORE_MS1 table the expression is ``"NULL"`` and the join
    clause is empty.
    """
    if not check_sqlite_table(con, "SCORE_MS1"):
        return "NULL", ""
    ms1_pep_expr = "SCORE_MS1.PEP"
    ms1_join = "LEFT JOIN SCORE_MS1 ON SCORE_MS1.FEATURE_ID = FEATURE.ID"
    return (ms1_pep_expr, ms1_join)
[docs]
def _get_ipf_augmentation_data(self, con, cfg):
"""Get IPF data for augmentation."""
query = f"""
SELECT FEATURE_ID AS id,
MODIFIED_SEQUENCE AS ipf_FullUniModPeptideName,
PRECURSOR_PEAKGROUP_PEP AS ipf_precursor_peakgroup_pep,
PEP AS ipf_peptidoform_pep,
QVALUE AS ipf_peptidoform_m_score
FROM SCORE_IPF
INNER JOIN PEPTIDE ON SCORE_IPF.PEPTIDE_ID = PEPTIDE.ID
WHERE SCORE_IPF.PEP < {cfg.ipf_max_peptidoform_pep};
"""
data_augmented = pd.read_sql_query(query, con)
return (
data_augmented.groupby("id")
.apply(
lambda x: pd.Series(
{
"ipf_FullUniModPeptideName": ";".join(
x[
x["ipf_peptidoform_pep"]
== np.min(x["ipf_peptidoform_pep"])
]["ipf_FullUniModPeptideName"]
),
"ipf_precursor_peakgroup_pep": x[
x["ipf_peptidoform_pep"] == np.min(x["ipf_peptidoform_pep"])
]["ipf_precursor_peakgroup_pep"].values[0],
"ipf_peptidoform_pep": x[
x["ipf_peptidoform_pep"] == np.min(x["ipf_peptidoform_pep"])
]["ipf_peptidoform_pep"].values[0],
"ipf_peptidoform_m_score": x[
x["ipf_peptidoform_pep"] == np.min(x["ipf_peptidoform_pep"])
]["ipf_peptidoform_m_score"].values[0],
}
)
)
.reset_index(level="id")
)
[docs]
def _get_base_openswath_data(self, con, cfg):
"""Get base OpenSWATH data without augmentations."""
query = f"""
SELECT RUN.ID AS id_run,
PEPTIDE.ID AS id_peptide,
PRECURSOR.ID AS transition_group_id,
PRECURSOR.DECOY AS decoy,
RUN.ID AS run_id,
RUN.FILENAME AS filename,
FEATURE.EXP_RT AS RT,
FEATURE.EXP_RT - FEATURE.DELTA_RT AS assay_rt,
FEATURE.DELTA_RT AS delta_rt,
FEATURE.NORM_RT AS iRT,
PRECURSOR.LIBRARY_RT AS assay_iRT,
FEATURE.NORM_RT - PRECURSOR.LIBRARY_RT AS delta_iRT,
FEATURE.ID AS id,
PEPTIDE.UNMODIFIED_SEQUENCE AS Sequence,
PEPTIDE.MODIFIED_SEQUENCE AS FullPeptideName,
PRECURSOR.CHARGE AS Charge,
PRECURSOR.PRECURSOR_MZ AS mz,
FEATURE_MS2.AREA_INTENSITY AS Intensity,
FEATURE_MS1.AREA_INTENSITY AS aggr_prec_Peak_Area,
FEATURE_MS1.APEX_INTENSITY AS aggr_prec_Peak_Apex,
FEATURE.LEFT_WIDTH AS leftWidth,
FEATURE.RIGHT_WIDTH AS rightWidth,
SCORE_MS2.RANK AS peak_group_rank,
SCORE_MS2.SCORE AS d_score,
SCORE_MS2.QVALUE AS m_score,
SCORE_MS2.PEP AS pep
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN FEATURE ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN ON RUN.ID = FEATURE.RUN_ID
LEFT JOIN FEATURE_MS1 ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
LEFT JOIN SCORE_MS2 ON SCORE_MS2.FEATURE_ID = FEATURE.ID
WHERE SCORE_MS2.QVALUE < {cfg.max_rs_peakgroup_qvalue}
ORDER BY transition_group_id, peak_group_rank;
"""
return pd.read_sql_query(query, con)
[docs]
def _add_transition_data(self, data, con, cfg):
    """Add transition-level quantification data.

    Builds one row per feature with ``;``-joined transition peak areas, apex
    intensities and fragment annotations, then left-joins it onto ``data``
    by ``id``. When SCORE_TRANSITION exists, only non-decoy transitions with
    ``PEP < cfg.max_transition_pep`` contribute; otherwise every transition
    of the feature does.

    Args:
        data: Frame with an ``id`` column holding feature ids.
        con: Open SQLite connection to the OSW file.
        cfg: Configuration providing ``max_transition_pep``.

    Returns:
        ``data`` with ``aggr_Peak_Area``, ``aggr_Peak_Apex`` and
        ``aggr_Fragment_Annotation`` columns added.
    """
    # The '_' separators are single-quoted: SQLite only accepts double-quoted
    # text as a string literal through a legacy fallback that can be disabled.
    if check_sqlite_table(con, "SCORE_TRANSITION"):
        transition_query = f"""
SELECT FEATURE_TRANSITION.FEATURE_ID AS id,
GROUP_CONCAT(AREA_INTENSITY,';') AS aggr_Peak_Area,
GROUP_CONCAT(APEX_INTENSITY,';') AS aggr_Peak_Apex,
GROUP_CONCAT(TRANSITION.ID || '_' || TRANSITION.TYPE || TRANSITION.ORDINAL || '_' || TRANSITION.CHARGE,';') AS aggr_Fragment_Annotation
FROM FEATURE_TRANSITION
INNER JOIN TRANSITION ON FEATURE_TRANSITION.TRANSITION_ID = TRANSITION.ID
INNER JOIN SCORE_TRANSITION ON FEATURE_TRANSITION.TRANSITION_ID = SCORE_TRANSITION.TRANSITION_ID AND FEATURE_TRANSITION.FEATURE_ID = SCORE_TRANSITION.FEATURE_ID
WHERE TRANSITION.DECOY == 0 AND SCORE_TRANSITION.PEP < {cfg.max_transition_pep}
GROUP BY FEATURE_TRANSITION.FEATURE_ID
"""
    else:
        transition_query = """
SELECT FEATURE_ID AS id,
GROUP_CONCAT(AREA_INTENSITY,';') AS aggr_Peak_Area,
GROUP_CONCAT(APEX_INTENSITY,';') AS aggr_Peak_Apex,
GROUP_CONCAT(TRANSITION.ID || '_' || TRANSITION.TYPE || TRANSITION.ORDINAL || '_' || TRANSITION.CHARGE,';') AS aggr_Fragment_Annotation
FROM FEATURE_TRANSITION
INNER JOIN TRANSITION ON FEATURE_TRANSITION.TRANSITION_ID = TRANSITION.ID
GROUP BY FEATURE_ID
"""
    data_transition = pd.read_sql_query(transition_query, con)
    # Left join: features without any qualifying transition are kept.
    return pd.merge(data, data_transition, how="left", on=["id"])
[docs]
def _add_protein_data(self, data, con):
"""Add protein identifier data."""
data_protein = pd.read_sql_query(
"""
SELECT PEPTIDE_ID AS id_peptide,
GROUP_CONCAT(PROTEIN.PROTEIN_ACCESSION,';') AS ProteinName
FROM PEPTIDE_PROTEIN_MAPPING
INNER JOIN PROTEIN ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = PROTEIN.ID
GROUP BY PEPTIDE_ID;
""",
con,
)
return pd.merge(data, data_protein, how="inner", on=["id_peptide"])
[docs]
def _add_peptide_data(self, data, con, cfg):
    """Attach peptide-level q-values for each scoring context that has rows.

    Returns ``data`` untouched when SCORE_PEPTIDE is missing. Every merge is
    an inner join, so rows without a matching score are dropped; the global
    context additionally keeps only peptides with a q-value below
    ``cfg.max_global_peptide_qvalue``.
    """
    if not check_sqlite_table(con, "SCORE_PEPTIDE"):
        return data

    # Run-specific context, keyed by run and peptide.
    run_specific = pd.read_sql_query(
        """
SELECT RUN_ID AS id_run,
PEPTIDE_ID AS id_peptide,
QVALUE AS m_score_peptide_run_specific
FROM SCORE_PEPTIDE
WHERE CONTEXT == 'run-specific';
""",
        con,
    )
    if not run_specific.empty:
        data = data.merge(run_specific, how="inner", on=["id_run", "id_peptide"])

    # Experiment-wide context, keyed by run and peptide.
    experiment_wide = pd.read_sql_query(
        """
SELECT RUN_ID AS id_run,
PEPTIDE_ID AS id_peptide,
QVALUE AS m_score_peptide_experiment_wide
FROM SCORE_PEPTIDE
WHERE CONTEXT == 'experiment-wide';
""",
        con,
    )
    if not experiment_wide.empty:
        data = data.merge(experiment_wide, on=["id_run", "id_peptide"])

    # Global context, keyed by peptide only and filtered by the cutoff.
    global_scores = pd.read_sql_query(
        """
SELECT PEPTIDE_ID AS id_peptide,
QVALUE AS m_score_peptide_global
FROM SCORE_PEPTIDE
WHERE CONTEXT == 'global';
""",
        con,
    )
    if not global_scores.empty:
        below_cutoff = (
            global_scores["m_score_peptide_global"] < cfg.max_global_peptide_qvalue
        )
        data = data.merge(global_scores[below_cutoff], on=["id_peptide"])
    return data
[docs]
def _add_protein_error_data(self, data, con, cfg):
    """Attach protein-level q-values for each scoring context that has rows.

    Returns ``data`` untouched when SCORE_PROTEIN is missing. For every
    peptide the smallest q-value among its mapped proteins is used. Every
    merge is an inner join; the global context additionally keeps only
    peptides whose value is below ``cfg.max_global_protein_qvalue``.
    """
    if not check_sqlite_table(con, "SCORE_PROTEIN"):
        return data

    # Run-specific context, keyed by run and peptide.
    run_specific = pd.read_sql_query(
        """
SELECT RUN_ID AS id_run,
PEPTIDE_ID AS id_peptide,
MIN(QVALUE) AS m_score_protein_run_specific
FROM PEPTIDE_PROTEIN_MAPPING
INNER JOIN SCORE_PROTEIN ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = SCORE_PROTEIN.PROTEIN_ID
WHERE CONTEXT == 'run-specific'
GROUP BY RUN_ID, PEPTIDE_ID;
""",
        con,
    )
    if not run_specific.empty:
        data = data.merge(run_specific, how="inner", on=["id_run", "id_peptide"])

    # Experiment-wide context, keyed by run and peptide.
    experiment_wide = pd.read_sql_query(
        """
SELECT RUN_ID AS id_run,
PEPTIDE_ID AS id_peptide,
MIN(QVALUE) AS m_score_protein_experiment_wide
FROM PEPTIDE_PROTEIN_MAPPING
INNER JOIN SCORE_PROTEIN ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = SCORE_PROTEIN.PROTEIN_ID
WHERE CONTEXT == 'experiment-wide'
GROUP BY RUN_ID, PEPTIDE_ID;
""",
        con,
    )
    if not experiment_wide.empty:
        data = data.merge(
            experiment_wide, how="inner", on=["id_run", "id_peptide"]
        )

    # Global context, keyed by peptide only and filtered by the cutoff.
    global_scores = pd.read_sql_query(
        """
SELECT PEPTIDE_ID AS id_peptide,
MIN(QVALUE) AS m_score_protein_global
FROM PEPTIDE_PROTEIN_MAPPING
INNER JOIN SCORE_PROTEIN ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = SCORE_PROTEIN.PROTEIN_ID
WHERE CONTEXT == 'global'
GROUP BY PEPTIDE_ID;
""",
        con,
    )
    if not global_scores.empty:
        below_cutoff = (
            global_scores["m_score_protein_global"] < cfg.max_global_protein_qvalue
        )
        data = data.merge(global_scores[below_cutoff], how="inner", on=["id_peptide"])
    return data
[docs]
def _fetch_alignment_features(self, con, cfg):
    """
    Collect aligned features that pass the alignment quality threshold.

    A row is returned for each FEATURE_MS2_ALIGNMENT entry with ``LABEL = 1``
    whose alignment PEP is below ``cfg.max_alignment_pep`` and whose
    reference feature has an MS2 q-value below
    ``cfg.max_rs_peakgroup_qvalue``.

    Args:
        con: Database connection
        cfg: Configuration object providing ``max_alignment_pep`` and
            ``max_rs_peakgroup_qvalue``

    Returns:
        DataFrame with one row per qualifying aligned feature; the
        ``alignment_reference_feature_id`` and ``alignment_group_id`` columns
        use the nullable ``Int64`` dtype.
    """
    pep_cutoff = cfg.max_alignment_pep
    qvalue_cutoff = cfg.max_rs_peakgroup_qvalue
    query = f"""
SELECT
DENSE_RANK() OVER (ORDER BY FEATURE_MS2_ALIGNMENT.PRECURSOR_ID, FEATURE_MS2_ALIGNMENT.ALIGNMENT_ID) AS alignment_group_id,
FEATURE_MS2_ALIGNMENT.ALIGNED_FEATURE_ID AS id,
FEATURE_MS2_ALIGNMENT.PRECURSOR_ID AS transition_group_id,
FEATURE_MS2_ALIGNMENT.RUN_ID AS run_id,
CAST(FEATURE_MS2_ALIGNMENT.REFERENCE_FEATURE_ID AS INTEGER) AS alignment_reference_feature_id,
FEATURE_MS2_ALIGNMENT.REFERENCE_RT AS alignment_reference_rt,
SCORE_ALIGNMENT.PEP AS alignment_pep,
SCORE_ALIGNMENT.QVALUE AS alignment_qvalue
FROM (
SELECT DISTINCT * FROM FEATURE_MS2_ALIGNMENT
) AS FEATURE_MS2_ALIGNMENT
INNER JOIN (
SELECT DISTINCT *, MIN(QVALUE)
FROM SCORE_ALIGNMENT
GROUP BY FEATURE_ID
) AS SCORE_ALIGNMENT
ON SCORE_ALIGNMENT.FEATURE_ID = FEATURE_MS2_ALIGNMENT.ALIGNED_FEATURE_ID
INNER JOIN (
SELECT FEATURE_ID, QVALUE
FROM SCORE_MS2
) AS REF_SCORE_MS2
ON REF_SCORE_MS2.FEATURE_ID = FEATURE_MS2_ALIGNMENT.REFERENCE_FEATURE_ID
WHERE FEATURE_MS2_ALIGNMENT.LABEL = 1
AND SCORE_ALIGNMENT.PEP < {pep_cutoff}
AND REF_SCORE_MS2.QVALUE < {qvalue_cutoff}
"""
    aligned = pd.read_sql_query(query, con)
    # Nullable Int64 keeps large integer ids exact even when values are missing.
    for id_column in ("alignment_reference_feature_id", "alignment_group_id"):
        if id_column in aligned.columns:
            aligned[id_column] = aligned[id_column].astype("Int64")
    logger.info(
        f"Found {len(aligned)} aligned features passing alignment PEP < {pep_cutoff} with reference features passing MS2 QVALUE < {qvalue_cutoff}"
    )
    return aligned
##################################
# Export-specific readers below
##################################
[docs]
def _read_for_export_scored_report(self, con) -> pd.DataFrame:
    """
    Lightweight reader that returns the minimal scored-report columns from an OSW SQLite.

    It tolerates missing tables by emitting NULL columns for absent sources.
    No q-value or rank filtering is applied: every feature row is returned.

    Args:
        con: Open SQLite connection to the OSW file.

    Returns:
        DataFrame with run/protein/peptide/precursor ids, the precursor decoy
        flag, the MS2 area, and MS2, peptide, protein and IPF score columns.
    """
    # Indexes only speed up the joins below, so a failure must not abort the
    # export - but it is reported instead of being silently discarded.
    try:
        self._create_indexes()
    except Exception as e:
        logger.warning(f"Could not create indexes; continuing without them: {e}")

    # Which optional tables are available?
    has_ms2 = check_sqlite_table(con, "SCORE_MS2")
    has_ms2_f = check_sqlite_table(con, "FEATURE_MS2")
    has_ipf = check_sqlite_table(con, "SCORE_IPF")
    has_pep = check_sqlite_table(con, "SCORE_PEPTIDE")
    has_prot = check_sqlite_table(con, "SCORE_PROTEIN")

    # Build WITH-views for the peptide/protein score contexts.
    with_views = []
    if has_pep:
        with_views.append(
            self._get_peptide_protein_score_table_sqlite(con, "peptide")
        )
    if has_prot:
        with_views.append(
            self._get_peptide_protein_score_table_sqlite(con, "protein")
        )
    with_clause = f"WITH {', '.join(with_views)}" if with_views else ""

    def _view_columns(view_name, column_names, present):
        # Select the columns from the view, or NULL placeholders of the same
        # names when the backing score table does not exist.
        if present:
            return ", ".join(f"{view_name}.{name}" for name in column_names)
        return ", ".join(f"NULL AS {name}" for name in column_names)

    # Pieces that depend on table presence
    ms2_feature_area_sel = (
        "FEATURE_MS2.AREA_INTENSITY AS FEATURE_MS2_AREA_INTENSITY"
        if has_ms2_f
        else "NULL AS FEATURE_MS2_AREA_INTENSITY"
    )
    ms2_feature_join = (
        "LEFT JOIN FEATURE_MS2 ON FEATURE_MS2.FEATURE_ID = FEATURE.ID"
        if has_ms2_f
        else ""
    )
    ms2_score_sel = (
        "SCORE_MS2.SCORE AS SCORE_MS2_SCORE, "
        "SCORE_MS2.RANK AS SCORE_MS2_PEAK_GROUP_RANK, "
        "SCORE_MS2.QVALUE AS SCORE_MS2_Q_VALUE"
        if has_ms2
        else "NULL AS SCORE_MS2_SCORE, NULL AS SCORE_MS2_PEAK_GROUP_RANK, NULL AS SCORE_MS2_Q_VALUE"
    )
    ms2_score_join = (
        "LEFT JOIN SCORE_MS2 ON SCORE_MS2.FEATURE_ID = FEATURE.ID"
        if has_ms2
        else ""
    )
    ipf_sel = (
        "SCORE_IPF.QVALUE AS SCORE_IPF_QVALUE"
        if has_ipf
        else "NULL AS SCORE_IPF_QVALUE"
    )
    ipf_join = (
        "LEFT JOIN SCORE_IPF ON SCORE_IPF.FEATURE_ID = FEATURE.ID"
        if has_ipf
        else ""
    )
    # Peptide/protein context score columns (names match the report columns)
    pep_cols = _view_columns(
        "score_peptide_view",
        (
            "SCORE_PEPTIDE_GLOBAL_SCORE",
            "SCORE_PEPTIDE_GLOBAL_Q_VALUE",
            "SCORE_PEPTIDE_EXPERIMENT_WIDE_SCORE",
            "SCORE_PEPTIDE_EXPERIMENT_WIDE_Q_VALUE",
            "SCORE_PEPTIDE_RUN_SPECIFIC_SCORE",
            "SCORE_PEPTIDE_RUN_SPECIFIC_Q_VALUE",
        ),
        has_pep,
    )
    pep_join = (
        "LEFT JOIN score_peptide_view "
        " ON score_peptide_view.PEPTIDE_ID = PEPTIDE.ID "
        " AND score_peptide_view.RUN_ID = RUN.ID"
        if has_pep
        else ""
    )
    prot_cols = _view_columns(
        "score_protein_view",
        (
            "SCORE_PROTEIN_GLOBAL_SCORE",
            "SCORE_PROTEIN_GLOBAL_Q_VALUE",
            "SCORE_PROTEIN_EXPERIMENT_WIDE_SCORE",
            "SCORE_PROTEIN_EXPERIMENT_WIDE_Q_VALUE",
        ),
        has_prot,
    )
    prot_join = (
        "LEFT JOIN score_protein_view "
        " ON score_protein_view.PROTEIN_ID = PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID "
        " AND score_protein_view.RUN_ID = RUN.ID"
        if has_prot
        else ""
    )
    # Assemble query. To mimic the q-value gating of the other readers, a
    # WHERE clause on SCORE_MS2.QVALUE (only valid when SCORE_MS2 exists)
    # would go just before the terminating ';'.
    query = f"""
{with_clause}
SELECT
RUN.ID AS RUN_ID,
PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID AS PROTEIN_ID,
PEPTIDE.ID AS PEPTIDE_ID,
PRECURSOR.ID AS PRECURSOR_ID,
PRECURSOR.DECOY AS PRECURSOR_DECOY,
{ms2_feature_area_sel},
{ms2_score_sel},
{pep_cols},
{prot_cols},
{ipf_sel}
FROM PRECURSOR
INNER JOIN PRECURSOR_PEPTIDE_MAPPING
ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN PEPTIDE
ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN PEPTIDE_PROTEIN_MAPPING
ON PEPTIDE.ID = PEPTIDE_PROTEIN_MAPPING.PEPTIDE_ID
INNER JOIN FEATURE
ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN RUN
ON RUN.ID = FEATURE.RUN_ID
{ms2_feature_join}
{ms2_score_join}
{pep_join}
{prot_join}
{ipf_join}
;
"""
    return pd.read_sql_query(query, con)
[docs]
def _get_peptide_protein_score_table_sqlite(self, con, level: str) -> str:
"""
SQLite-compatible view builder for peptide/protein score tables.
Always exposes the same columns:
- *_EXPERIMENT_WIDE_(SCORE|PVALUE|QVALUE|PEP) (per RUN_ID)
- *_RUN_SPECIFIC_(SCORE|PVALUE|QVALUE|PEP) (per RUN_ID)
- *_GLOBAL_(SCORE|PVALUE|QVALUE|PEP) (no RUN_ID)
"""
assert level in ("peptide", "protein")
if level == "peptide":
id_col = "PEPTIDE_ID"
score_table = "SCORE_PEPTIDE"
view_name = "score_peptide_view"
tag = "SCORE_PEPTIDE"
else:
id_col = "PROTEIN_ID"
score_table = "SCORE_PROTEIN"
view_name = "score_protein_view"
tag = "SCORE_PROTEIN"
# Non-global (per RUN_ID): expose BOTH experiment-wide and run-specific columns unconditionally.
non_global_query = f"""
SELECT
{id_col},
RUN_ID,
MIN(CASE WHEN context='experiment-wide' THEN SCORE END) AS {tag}_EXPERIMENT_WIDE_SCORE,
MIN(CASE WHEN context='experiment-wide' THEN PVALUE END) AS {tag}_EXPERIMENT_WIDE_P_VALUE,
MIN(CASE WHEN context='experiment-wide' THEN QVALUE END) AS {tag}_EXPERIMENT_WIDE_Q_VALUE,
MIN(CASE WHEN context='experiment-wide' THEN PEP END) AS {tag}_EXPERIMENT_WIDE_PEP,
MIN(CASE WHEN context='run-specific' THEN SCORE END) AS {tag}_RUN_SPECIFIC_SCORE,
MIN(CASE WHEN context='run-specific' THEN PVALUE END) AS {tag}_RUN_SPECIFIC_P_VALUE,
MIN(CASE WHEN context='run-specific' THEN QVALUE END) AS {tag}_RUN_SPECIFIC_Q_VALUE,
MIN(CASE WHEN context='run-specific' THEN PEP END) AS {tag}_RUN_SPECIFIC_PEP
FROM {score_table}
WHERE context <> 'global'
GROUP BY {id_col}, RUN_ID
"""
# Global: one row per {id_col}, no RUN_ID
global_query = f"""
SELECT
{id_col},
SCORE AS {tag}_GLOBAL_SCORE,
PVALUE AS {tag}_GLOBAL_P_VALUE,
QVALUE AS {tag}_GLOBAL_Q_VALUE,
PEP AS {tag}_GLOBAL_PEP
FROM {score_table}
WHERE context='global'
"""
# Merge global onto non-global by {id_col}; keeps RUN_ID from the non-global side.
merged = f"""
SELECT
ng.{id_col},
ng.RUN_ID,
ng.{tag}_EXPERIMENT_WIDE_SCORE,
ng.{tag}_EXPERIMENT_WIDE_P_VALUE,
ng.{tag}_EXPERIMENT_WIDE_Q_VALUE,
ng.{tag}_EXPERIMENT_WIDE_PEP,
ng.{tag}_RUN_SPECIFIC_SCORE,
ng.{tag}_RUN_SPECIFIC_P_VALUE,
ng.{tag}_RUN_SPECIFIC_Q_VALUE,
ng.{tag}_RUN_SPECIFIC_PEP,
g.{tag}_GLOBAL_SCORE,
g.{tag}_GLOBAL_P_VALUE,
g.{tag}_GLOBAL_Q_VALUE,
g.{tag}_GLOBAL_PEP
FROM ({non_global_query}) AS ng
LEFT JOIN ({global_query}) AS g
ON ng.{id_col} = g.{id_col}
"""
return f"{view_name} AS ({merged})"
[docs]
def export_feature_scores(self, outfile: str, plot_callback):
    """
    Export feature scores from an OSW file for plotting.

    Every feature level whose table is present (FEATURE_MS1, FEATURE_MS2,
    FEATURE_TRANSITION, FEATURE_MS2_ALIGNMENT) is queried and, if the result
    is non-empty, handed to ``plot_callback``. Per level:

    - MS1 / MS2 with a SCORE table: rows are restricted to RANK == 1 and all
      joined columns (SCORE + VAR_) are returned.
    - Transition with SCORE_TRANSITION: features are joined to their scores
      (the query applies no rank filter).
    - No SCORE table for the level, and always for alignment: only the
      VAR_ columns plus a DECOY column are returned.

    The first level that is actually plotted is passed ``append=False``;
    every later plotted level is passed ``append=True``.

    Parameters
    ----------
    outfile : str
        Path to the output PDF file.
    plot_callback : callable
        Function to call for plotting each level's data.
        Signature: plot_callback(df, outfile, level, append)
    """
    con = sqlite3.connect(self.infile)
    # Becomes True once a level has actually been passed to plot_callback.
    # Deriving ``append`` from mere table existence would request an append
    # onto output that an empty or VAR_-less earlier level never produced.
    plotted = False

    def var_only_query(table, decoy_sql, join_sql="", qualify=True):
        """Return a SELECT of ``table``'s VAR_ columns plus DECOY, or None."""
        cursor = con.cursor()
        cursor.execute(f"PRAGMA table_info({table})")
        var_cols = [
            row[1] for row in cursor.fetchall() if "VAR_" in row[1].upper()
        ]
        if not var_cols:
            logger.warning(f"No VAR_ columns found in {table} table")
            return None
        prefix = f"{table}." if qualify else ""
        var_cols_sql = ", ".join(f"{prefix}{col}" for col in var_cols)
        return f"""
SELECT
{var_cols_sql},
{decoy_sql}
FROM {table}
{join_sql}
"""

    def plot_level(query, level):
        """Run ``query`` and plot its rows; skip missing queries and empty results."""
        nonlocal plotted
        if not query:
            return
        df = pd.read_sql_query(query, con)
        if df.empty:
            return
        plot_callback(df, outfile, level, append=plotted)
        plotted = True

    try:
        # Check for SCORE tables
        has_score_ms1 = check_sqlite_table(con, "SCORE_MS1")
        has_score_ms2 = check_sqlite_table(con, "SCORE_MS2")
        has_score_transition = check_sqlite_table(con, "SCORE_TRANSITION")
        if has_score_ms1 or has_score_ms2 or has_score_transition:
            logger.info(
                "SCORE tables detected - applying RANK==1 filter and plotting SCORE + VAR_ columns"
            )
        else:
            logger.info("No SCORE tables detected - plotting only VAR_ columns")

        # Process MS1 level if available
        if check_sqlite_table(con, "FEATURE_MS1"):
            logger.info("Processing MS1 level feature scores")
            if has_score_ms1:
                # Scored mode: Include SCORE columns and apply RANK==1 filter
                ms1_query = """
SELECT *,
RUN_ID || '_' || PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS1
INNER JOIN
(SELECT RUN_ID,
ID,
PRECURSOR_ID,
EXP_RT
FROM FEATURE) AS FEATURE ON FEATURE_MS1.FEATURE_ID = FEATURE.ID
INNER JOIN
(SELECT ID,
CHARGE AS VAR_PRECURSOR_CHARGE,
DECOY
FROM PRECURSOR) AS PRECURSOR ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN SCORE_MS1 ON FEATURE.ID = SCORE_MS1.FEATURE_ID
WHERE RANK == 1
ORDER BY RUN_ID,
PRECURSOR.ID ASC,
FEATURE.EXP_RT ASC
"""
            else:
                # Unscored mode: Only VAR_ columns
                ms1_query = var_only_query(
                    "FEATURE_MS1",
                    "PRECURSOR.DECOY",
                    "INNER JOIN FEATURE ON FEATURE_MS1.FEATURE_ID = FEATURE.ID\n"
                    "INNER JOIN PRECURSOR ON FEATURE.PRECURSOR_ID = PRECURSOR.ID",
                )
            plot_level(ms1_query, "ms1")

        # Process MS2 level if available
        if check_sqlite_table(con, "FEATURE_MS2"):
            logger.info("Processing MS2 level feature scores")
            if has_score_ms2:
                # Scored mode: Include SCORE columns and apply RANK==1 filter
                ms2_query = """
SELECT *,
RUN_ID || '_' || PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS2
INNER JOIN
(SELECT RUN_ID,
ID,
PRECURSOR_ID,
EXP_RT
FROM FEATURE) AS FEATURE ON FEATURE_MS2.FEATURE_ID = FEATURE.ID
INNER JOIN
(SELECT ID,
CHARGE AS VAR_PRECURSOR_CHARGE,
DECOY
FROM PRECURSOR) AS PRECURSOR ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN
(SELECT PRECURSOR_ID AS ID,
COUNT(*) AS VAR_TRANSITION_NUM_SCORE
FROM TRANSITION_PRECURSOR_MAPPING
INNER JOIN TRANSITION ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID
WHERE DETECTING==1
GROUP BY PRECURSOR_ID) AS VAR_TRANSITION_SCORE ON FEATURE.PRECURSOR_ID = VAR_TRANSITION_SCORE.ID
INNER JOIN SCORE_MS2 ON FEATURE.ID = SCORE_MS2.FEATURE_ID
WHERE RANK == 1
ORDER BY RUN_ID,
PRECURSOR.ID ASC,
FEATURE.EXP_RT ASC
"""
            else:
                # Unscored mode: Only VAR_ columns
                ms2_query = var_only_query(
                    "FEATURE_MS2",
                    "PRECURSOR.DECOY",
                    "INNER JOIN FEATURE ON FEATURE_MS2.FEATURE_ID = FEATURE.ID\n"
                    "INNER JOIN PRECURSOR ON FEATURE.PRECURSOR_ID = PRECURSOR.ID",
                )
            plot_level(ms2_query, "ms2")

        # Process transition level if available
        if check_sqlite_table(con, "FEATURE_TRANSITION"):
            logger.info("Processing transition level feature scores")
            if has_score_transition:
                # Scored mode: include SCORE_TRANSITION columns (this query
                # does not filter on rank).
                transition_query = """
SELECT TRANSITION.DECOY AS DECOY,
FEATURE_TRANSITION.*,
PRECURSOR.CHARGE AS VAR_PRECURSOR_CHARGE,
TRANSITION.VAR_PRODUCT_CHARGE AS VAR_PRODUCT_CHARGE,
SCORE_TRANSITION.*,
RUN_ID || '_' || FEATURE_TRANSITION.FEATURE_ID || '_' || PRECURSOR_ID || '_' || FEATURE_TRANSITION.TRANSITION_ID AS GROUP_ID
FROM FEATURE_TRANSITION
INNER JOIN
(SELECT RUN_ID,
ID,
PRECURSOR_ID,
EXP_RT
FROM FEATURE) AS FEATURE ON FEATURE_TRANSITION.FEATURE_ID = FEATURE.ID
INNER JOIN PRECURSOR ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN SCORE_TRANSITION ON FEATURE_TRANSITION.FEATURE_ID = SCORE_TRANSITION.FEATURE_ID
AND FEATURE_TRANSITION.TRANSITION_ID = SCORE_TRANSITION.TRANSITION_ID
INNER JOIN
(SELECT ID,
CHARGE AS VAR_PRODUCT_CHARGE,
DECOY
FROM TRANSITION) AS TRANSITION ON FEATURE_TRANSITION.TRANSITION_ID = TRANSITION.ID
ORDER BY RUN_ID,
PRECURSOR.ID,
FEATURE.EXP_RT,
TRANSITION.ID
"""
            else:
                # Unscored mode: Only VAR_ columns
                transition_query = var_only_query(
                    "FEATURE_TRANSITION",
                    "TRANSITION.DECOY",
                    "INNER JOIN FEATURE ON FEATURE_TRANSITION.FEATURE_ID = FEATURE.ID\n"
                    "INNER JOIN TRANSITION ON FEATURE_TRANSITION.TRANSITION_ID = TRANSITION.ID",
                )
            plot_level(transition_query, "transition")

        # Process alignment level if available (no SCORE tables for alignment)
        if check_sqlite_table(con, "FEATURE_MS2_ALIGNMENT"):
            logger.info("Processing alignment level feature scores")
            # Get only VAR_ columns to reduce memory usage; the table's LABEL
            # column is exposed under the name DECOY.
            alignment_query = var_only_query(
                "FEATURE_MS2_ALIGNMENT", "LABEL AS DECOY", qualify=False
            )
            plot_level(alignment_query, "alignment")
    finally:
        con.close()
[docs]
class OSWWriter(BaseOSWWriter):
"""
Class for writing OpenSWATH results to various formats.
"""
[docs]
def __init__(self, config: ExportIOConfig) -> None:
    """
    Initialize the writer with an export configuration.

    Parameters
    ----------
    config : ExportIOConfig
        Export configuration; it is passed unchanged to the base-class
        initializer, which does all of the setup.
    """
    super().__init__(config)
[docs]
def export(self) -> None:
    """
    Write the data in the format named by ``config.export_format``.

    Both ``"parquet"`` and ``"parquet_split"`` are handled by
    ``_write_parquet``.

    Raises
    ------
    ValueError
        If the configured export format is anything else.
    """
    requested = self.config.export_format
    # Guard clause: reject unknown formats before doing any work.
    if requested not in ("parquet", "parquet_split"):
        raise ValueError(
            f"Unsupported OSW export format: {requested}. "
            "Supported formats are 'parquet' and 'parquet_split'."
        )
    self._write_parquet()
[docs]
def _write_parquet(self) -> None:
"""Handle parquet export based on configuration"""
if self.config.file_type != "osw":
raise ValueError("Parquet export only supported from OSW files")
if self.config.export_format == "parquet_split":
self._convert_to_split_parquet()
else:
self._convert_to_single_parquet()
[docs]
def _convert_to_split_parquet(self) -> None:
    """
    Convert OSW to split parquet format.

    Opens an in-memory DuckDB connection, loads the SQLite scanner into it,
    gathers column information and then exports either one directory per
    run (``config.split_runs`` truthy) or one combined directory. The
    connection is closed on every exit path.
    """
    conn = duckdb.connect(":memory:")
    try:
        # Inside the try so the connection is also closed when loading the
        # scanner extension fails.
        load_sqlite_scanner(conn)
        # Prepare column information
        column_info = self._prepare_column_info(conn)
        if self.config.split_runs:
            self._export_split_by_run(conn, column_info)
        else:
            self._export_combined(conn, column_info)
    finally:
        conn.close()
[docs]
def _convert_to_single_parquet(self) -> None:
    """
    Convert OSW to a single parquet file.

    Opens an in-memory DuckDB connection, loads the SQLite scanner into it,
    gathers column information and runs the single-file export. The
    connection is closed on every exit path.
    """
    conn = duckdb.connect(":memory:")
    try:
        # Inside the try so the connection is also closed when loading the
        # scanner extension fails.
        load_sqlite_scanner(conn)
        # Prepare column information
        column_info = self._prepare_column_info(conn)
        self._export_single_file(conn, column_info)
    finally:
        conn.close()
[docs]
def _prepare_column_info(self, conn) -> dict:
    """
    Collect table-existence flags and column listings for the input file.

    Parameters
    ----------
    conn
        Not used by this method; kept for signature compatibility.

    Returns
    -------
    dict
        Keys: ``gene_tables_exist``, ``precursor_columns``,
        ``transition_columns``, ``feature_columns``,
        ``feature_ms2_alignment_exists``, ``has_library_drift_time``,
        ``has_annotation``, ``has_im``, ``has_im_boundaries``,
        ``feature_ms1_cols``, ``feature_ms2_cols``,
        ``feature_transition_cols``, one ``score_*_exists`` flag per score
        table, and ``score_protein_contexts`` / ``score_peptide_contexts``
        when the corresponding score table exists.
    """
    infile = self.config.infile
    # ``with sqlite3.connect(...)`` only manages the transaction and leaves
    # the connection open, so the connection is closed explicitly instead.
    sql_conn = sqlite3.connect(infile)
    try:
        table_names = {
            row[0]
            for row in sql_conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        }
        # Each column list is fetched once and reused for the membership
        # checks below (assumes get_table_columns is a read-only lookup).
        precursor_columns = get_table_columns(infile, "PRECURSOR")
        transition_columns = get_table_columns(infile, "TRANSITION")
        feature_columns = get_table_columns(infile, "FEATURE")
        column_info = {
            "gene_tables_exist": {"PEPTIDE_GENE_MAPPING", "GENE"}.issubset(
                table_names
            ),
            "precursor_columns": precursor_columns,
            "transition_columns": transition_columns,
            "feature_columns": feature_columns,
            "feature_ms2_alignment_exists": check_sqlite_table(
                sql_conn, "FEATURE_MS2_ALIGNMENT"
            ),
            "has_library_drift_time": "LIBRARY_DRIFT_TIME" in precursor_columns,
            "has_annotation": "ANNOTATION" in transition_columns,
            "has_im": "EXP_IM" in feature_columns,
            "has_im_boundaries": all(
                col in feature_columns
                for col in ["EXP_IM_LEFTWIDTH", "EXP_IM_RIGHTWIDTH"]
            ),
            "feature_ms1_cols": [
                col
                for col in get_table_columns_with_types(infile, "FEATURE_MS1")
                if col[0] != "FEATURE_ID" and col[1]  # Ensure column has a type
            ],
            "feature_ms2_cols": [
                col
                for col in get_table_columns_with_types(infile, "FEATURE_MS2")
                if col[0] != "FEATURE_ID" and col[1]  # Ensure column has a type
            ],
            "feature_transition_cols": [
                col
                for col in get_table_columns_with_types(
                    infile, "FEATURE_TRANSITION"
                )
                if col[0] not in ["FEATURE_ID", "TRANSITION_ID"]
                and col[1]  # Ensure column has a type
            ],
            "score_ms1_exists": "SCORE_MS1" in table_names,
            "score_ms2_exists": "SCORE_MS2" in table_names,
            "score_ipf_exists": "SCORE_IPF" in table_names,
            "score_peptide_exists": "SCORE_PEPTIDE" in table_names,
            "score_protein_exists": "SCORE_PROTEIN" in table_names,
            "score_gene_exists": "SCORE_GENE" in table_names,
            "score_transition_exists": "SCORE_TRANSITION" in table_names,
        }
        if column_info["score_protein_exists"]:
            logger.debug("Checking SCORE_PROTEIN contexts")
            column_info["score_protein_contexts"] = self._check_contexts(
                sql_conn, "SCORE_PROTEIN"
            )
        if column_info["score_peptide_exists"]:
            logger.debug("Checking SCORE_PEPTIDE contexts")
            column_info["score_peptide_contexts"] = self._check_contexts(
                sql_conn, "SCORE_PEPTIDE"
            )
        return column_info
    finally:
        sql_conn.close()
[docs]
def _export_split_by_run(self, conn, column_info: dict) -> None:
    """
    Export data split by run into separate directories.

    For every row of the RUN table a ``<basename>.oswpq`` directory is
    created under ``config.outfile`` and filled with
    ``precursors_features.parquet`` and, when
    ``config.include_transition_data`` is set,
    ``transition_features.parquet``. The transition file holds the rows of
    that run plus the rows whose ``FEATURE.RUN_ID`` is NULL. Alignment data,
    if present, is exported once after all runs.

    Parameters
    ----------
    conn
        DuckDB connection used to run the queries.
    column_info : dict
        Column/table information as returned by ``_prepare_column_info``.
    """
    os.makedirs(self.config.outfile, exist_ok=True)
    # Get run information
    run_df = conn.execute(
        f"SELECT ID, FILENAME FROM sqlite_scan('{self.config.infile}', 'RUN')"
    ).fetchdf()
    # Strip the directory and every extension: "/x/a.b.mzML" -> "a".
    all_extensions = re.compile(r"(\.[^.]*)*$")
    run_df["BASENAME"] = run_df["FILENAME"].apply(
        lambda x: all_extensions.sub("", os.path.basename(x))
    )
    logger.info(f"Found {len(run_df)} runs to export.")
    # The query text does not depend on the run, so it is built once and only
    # the WHERE clause varies. Building it per run repeated the work of
    # _build_precursor_query, which rewrites the peptide mapping table in the
    # SQLite file on every call. Built lazily so nothing is built (and the
    # input file is not touched) when there are no runs.
    precursor_base_query = None
    transition_base_query = None
    # Export each run
    for _, row in run_df.iterrows():
        run_id = row["ID"]
        run_name = row["BASENAME"]
        run_dir = os.path.join(self.config.outfile, f"{run_name}.oswpq")
        os.makedirs(run_dir, exist_ok=True)
        logger.info(f"Exporting run: {run_name} to {run_dir}")
        # Export precursor data
        precursor_path = os.path.join(run_dir, "precursors_features.parquet")
        if precursor_base_query is None:
            precursor_base_query = self._build_precursor_query(conn, column_info)
        precursor_query = (
            precursor_base_query + f"\nWHERE FEATURE.RUN_ID = {run_id}"
        )
        logger.info(f"Exporting precursor data to {precursor_path}")
        self._execute_copy_query(conn, precursor_query, precursor_path)
        # Export transition data if requested
        if self.config.include_transition_data:
            transition_path = os.path.join(run_dir, "transition_features.parquet")
            if transition_base_query is None:
                transition_base_query = self._build_transition_query(column_info)
            transition_query_run = (
                transition_base_query + f"\nWHERE FEATURE.RUN_ID = {run_id}"
            )
            transition_query_null = (
                transition_base_query + "\nWHERE FEATURE.RUN_ID IS NULL"
            )
            combined_transition_query = (
                f"{transition_query_run}\nUNION ALL\n{transition_query_null}"
            )
            logger.info(f"Exporting transition data to {transition_path}")
            self._execute_copy_query(
                conn, combined_transition_query, transition_path
            )
        else:
            logger.info(
                "Skipping transition data export (include_transition_data=False)"
            )
    # Export alignment data if exists
    if column_info["feature_ms2_alignment_exists"]:
        logger.info("Exporting alignment data for all runs")
        self._export_alignment_data(conn)
[docs]
def _export_combined(self, conn, column_info: dict) -> None:
    """
    Export all runs together into one output directory.

    Writes ``precursors_features.parquet`` under ``config.outfile``, adds
    ``transition_features.parquet`` when ``config.include_transition_data``
    is set, and exports alignment data when the alignment table exists.

    Parameters
    ----------
    conn
        DuckDB connection used to run the queries.
    column_info : dict
        Column/table information as returned by ``_prepare_column_info``.
    """
    out_dir = self.config.outfile
    os.makedirs(out_dir, exist_ok=True)

    # Precursor-level data is always written.
    precursor_path = os.path.join(out_dir, "precursors_features.parquet")
    logger.info(f"Exporting precursor data to {precursor_path}")
    self._execute_copy_query(
        conn, self._build_precursor_query(conn, column_info), precursor_path
    )

    # Transition-level data is opt-in.
    if not self.config.include_transition_data:
        logger.info(
            "Skipping transition data export (include_transition_data=False)"
        )
    else:
        transition_path = os.path.join(out_dir, "transition_features.parquet")
        logger.info(f"Exporting transition data to {transition_path}")
        self._execute_copy_query(
            conn, self._build_transition_query(column_info), transition_path
        )

    # Alignment data, when the table is present.
    if column_info["feature_ms2_alignment_exists"]:
        logger.info("Exporting alignment data")
        self._export_alignment_data(conn)
[docs]
def _export_single_file(self, conn, column_info: dict) -> None:
    """
    Export precursor (and optionally transition) data to one parquet file.

    Rows are staged in the DuckDB table ``temp_table`` and then copied to
    ``config.outfile``. When the alignment table exists, alignment data is
    written next to it as ``<outfile stem>_feature_alignment.parquet``.

    Parameters
    ----------
    conn
        DuckDB connection used to run the queries.
    column_info : dict
        Column/table information as returned by ``_prepare_column_info``.
    """
    # Stage 1: staging table with the combined schema.
    logger.debug("Creating temporary table for combined export")
    self._create_temp_table(conn, column_info)

    # Stage 2: precursor rows.
    logger.debug("Inserting precursor data into temp table")
    precursor_select = self._build_combined_precursor_query(conn, column_info)
    conn.execute(f"INSERT INTO temp_table {precursor_select}")

    # Stage 3: transition rows, only on request.
    if not self.config.include_transition_data:
        logger.info(
            "Skipping transition data export (include_transition_data=False)"
        )
    else:
        logger.debug("Inserting transition data into temp table")
        transition_select = self._build_combined_transition_query(column_info)
        conn.execute(f"INSERT INTO temp_table {transition_select}")

    # Stage 4: write the staged rows out.
    logger.info(f"Exporting combined data to {self.config.outfile}")
    self._execute_copy_query(conn, "SELECT * FROM temp_table", self.config.outfile)

    # Stage 5: alignment data goes to a sibling file.
    if not column_info["feature_ms2_alignment_exists"]:
        return
    stem = os.path.splitext(self.config.outfile)[0]
    alignment_path = stem + "_feature_alignment.parquet"
    logger.info(f"Exporting alignment data to {alignment_path}")
    self._export_alignment_data(conn, alignment_path)
[docs]
def _register_peptide_ipf_map(self, conn: duckdb.DuckDBPyConnection) -> None:
    """
    Build the peptide <-> IPF peptide mapping as a DuckDB temp table.

    Registers ``unimod_to_codename`` as a DuckDB function on ``conn`` and
    then (re)creates the temp table ``UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING``
    from the PEPTIDE table of the input file. Peptides whose
    MODIFIED_SEQUENCE contains "UniMod" are paired, by shared codename, with
    peptides whose sequence does not; unpaired peptides map to themselves.

    Parameters
    ----------
    conn : duckdb.DuckDBPyConnection
        Connection that receives the function and the temp table.
    """
    logger.info("Preparing peptide unimod to codename mapping view")
    conn.create_function("unimod_to_codename", unimod_to_codename, [str], str)
    source_db = self.config.infile
    # NOTE: the doubled percent signs are sent to DuckDB as written; in a
    # LIKE pattern "%%" matches the same strings as a single "%".
    mapping_sql = f"""
CREATE OR REPLACE TEMP TABLE UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING AS
WITH peptides AS (
SELECT
ID,
MODIFIED_SEQUENCE,
unimod_to_codename(MODIFIED_SEQUENCE) AS CODENAME,
MODIFIED_SEQUENCE LIKE '%%UniMod%%' AS HAS_UNIMOD
FROM sqlite_scan('{source_db}', 'PEPTIDE')
),
unimod_peptides AS (
SELECT CODENAME, ID AS PEPTIDE_ID
FROM peptides
WHERE HAS_UNIMOD
),
codename_peptides AS (
SELECT CODENAME, ID AS IPF_PEPTIDE_ID
FROM peptides
WHERE NOT HAS_UNIMOD
)
SELECT DISTINCT
COALESCE(unimod_peptides.PEPTIDE_ID, codename_peptides.IPF_PEPTIDE_ID) AS PEPTIDE_ID,
COALESCE(codename_peptides.IPF_PEPTIDE_ID, unimod_peptides.PEPTIDE_ID) AS IPF_PEPTIDE_ID,
COALESCE(unimod_peptides.CODENAME, codename_peptides.CODENAME) AS CODENAME
FROM unimod_peptides
FULL OUTER JOIN codename_peptides USING (CODENAME)
"""
    conn.execute(mapping_sql)
[docs]
def _create_unimod_to_codename_peptide_id_mapping_table(self) -> None:
    """
    Create peptide unimod to codename mapping table in SQLite database.

    Reads ``ID`` and ``MODIFIED_SEQUENCE`` from PEPTIDE, derives a codename
    for every sequence with ``unimod_to_codename`` and pairs the peptides
    whose sequence contains "UniMod" with those whose sequence does not, by
    shared codename. Peptides without a partner map to themselves. The
    result replaces the contents of ``UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING``
    in the input file, and indices on its three columns are ensured.
    """
    logger.info(
        "Generating peptide unimod to codename mapping and storing in SQLite"
    )
    sql_conn = sqlite3.connect(self.config.infile)
    try:
        # ``with sql_conn`` commits on success and rolls back on error, but
        # does not close the connection; the ``finally`` below does that so
        # no handle on the input file outlives this call.
        with sql_conn:
            # First get the peptide table and derive the codenames
            peptide_df = pd.read_sql_query(
                "SELECT ID, MODIFIED_SEQUENCE FROM PEPTIDE", sql_conn
            )
            peptide_df["codename"] = peptide_df["MODIFIED_SEQUENCE"].apply(
                unimod_to_codename
            )
            # Create the merged mapping. ``na=False`` keeps the mask strictly
            # boolean (a NULL sequence counts as "no UniMod"); a mask that
            # contains NaN cannot be used for indexing.
            unimod_mask = peptide_df["MODIFIED_SEQUENCE"].str.contains(
                "UniMod", na=False
            )
            merged_df = pd.merge(
                peptide_df[unimod_mask][["codename", "ID"]],
                peptide_df[~unimod_mask][["codename", "ID"]],
                on="codename",
                suffixes=("_unimod", "_codename"),
                how="outer",
            )
            # Fill NaN values in the 'ID_codename' column with the 'ID_unimod' values
            merged_df["ID_codename"] = merged_df["ID_codename"].fillna(
                merged_df["ID_unimod"]
            )
            # Fill NaN values in the 'ID_unimod' column with the 'ID_codename' values
            merged_df["ID_unimod"] = merged_df["ID_unimod"].fillna(
                merged_df["ID_codename"]
            )
            # The outer merge introduces NaN for unmatched rows, which turns
            # the ID columns into floats; cast back once the gaps are filled.
            merged_df["ID_unimod"] = merged_df["ID_unimod"].astype(int)
            merged_df["ID_codename"] = merged_df["ID_codename"].astype(int)
            # Create the UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING table in SQLite
            sql_conn.execute(
                """
CREATE TABLE IF NOT EXISTS UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING (
ID_unimod INTEGER,
ID_codename INTEGER,
codename TEXT,
PRIMARY KEY (ID_unimod, ID_codename)
)
"""
            )
            # Start from an empty table so repeated calls do not collide on
            # the primary key.
            sql_conn.execute("DELETE FROM UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING")
            # Insert the data into SQLite table
            merged_df[["ID_unimod", "ID_codename", "codename"]].to_sql(
                "UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING",
                sql_conn,
                if_exists="append",
                index=False,
            )
            # Create indices for better performance
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_peptide_ipf_unimod ON UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING(ID_unimod)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_peptide_ipf_codename ON UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING(ID_codename)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_peptide_ipf_codename_text ON UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING(codename)"
            )
    finally:
        sql_conn.close()
    logger.info(
        f"Successfully created UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING table with {len(merged_df)} mappings"
    )
[docs]
def _insert_precursor_peptide_ipf_map(self) -> None:
    """
    Insert precursor-peptide-IPF table into the input sqlite OSW file.

    (Re)fills ``PRECURSOR_PEPTIDE_IPF_MAPPING`` by joining PEPTIDE,
    ``UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING``, PRECURSOR_PEPTIDE_MAPPING and
    SCORE_IPF, then ensures the lookup indices exist. The joined tables must
    already be present in the input file.
    """
    logger.info("Inserting precursor-peptide-IPF mapping into OSW file")
    sql_conn = sqlite3.connect(self.config.infile)
    try:
        # ``with sql_conn`` commits on success and rolls back on error, but
        # does not close the connection; the ``finally`` below does that so
        # no handle on the input file outlives this call.
        with sql_conn:
            # Create the main mapping table
            sql_conn.execute(
                """
CREATE TABLE IF NOT EXISTS PRECURSOR_PEPTIDE_IPF_MAPPING (
PRECURSOR_ID INTEGER,
ID_unimod INTEGER,
ID_codename INTEGER,
MODIFIED_SEQUENCE TEXT,
CODENAME TEXT,
FEATURE_ID INTEGER,
PRECURSOR_PEAKGROUP_PEP REAL,
QVALUE REAL,
PEP REAL
)
"""
            )
            # Start from an empty table so repeated calls do not duplicate rows.
            sql_conn.execute("DELETE FROM PRECURSOR_PEPTIDE_IPF_MAPPING")
            # Fill the table: each peptide is mapped to its codename peptide,
            # whose SCORE_IPF rows are attached to the peptide's precursors.
            sql_conn.execute(
                """
INSERT INTO PRECURSOR_PEPTIDE_IPF_MAPPING (
PRECURSOR_ID, ID_unimod, ID_codename, MODIFIED_SEQUENCE,
CODENAME, FEATURE_ID, PRECURSOR_PEAKGROUP_PEP, QVALUE, PEP
)
SELECT
ppm.PRECURSOR_ID,
pim.ID_unimod,
pim.ID_codename,
p.MODIFIED_SEQUENCE,
pim.codename,
si.FEATURE_ID,
si.PRECURSOR_PEAKGROUP_PEP,
si.QVALUE,
si.PEP
FROM PEPTIDE p
INNER JOIN UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING pim ON pim.ID_unimod = p.ID
INNER JOIN PRECURSOR_PEPTIDE_MAPPING ppm ON ppm.PEPTIDE_ID = p.ID
INNER JOIN SCORE_IPF si ON si.PEPTIDE_ID = pim.ID_codename
"""
            )
            # Create indices for better query performance
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_ppim_precursor_id ON PRECURSOR_PEPTIDE_IPF_MAPPING(PRECURSOR_ID)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_ppim_feature_id ON PRECURSOR_PEPTIDE_IPF_MAPPING(FEATURE_ID)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_ppim_id_unimod ON PRECURSOR_PEPTIDE_IPF_MAPPING(ID_unimod)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_ppim_id_codename ON PRECURSOR_PEPTIDE_IPF_MAPPING(ID_codename)"
            )
            sql_conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_ppim_precursor_feature ON PRECURSOR_PEPTIDE_IPF_MAPPING(PRECURSOR_ID, FEATURE_ID)"
            )
    finally:
        sql_conn.close()
    logger.info(
        "Successfully created PRECURSOR_PEPTIDE_IPF_MAPPING table with indices"
    )
[docs]
def _build_precursor_query(self, conn, column_info: dict) -> str:
"""Build SQL query for precursor data"""
feature_ms1_cols_sql = ", ".join(
f"FEATURE_MS1.{col[0]} AS FEATURE_MS1_{col[0]}"
for col in column_info["feature_ms1_cols"]
)
feature_ms2_cols_sql = ", ".join(
f"FEATURE_MS2.{col[0]} AS FEATURE_MS2_{col[0]}"
for col in column_info["feature_ms2_cols"]
)
# First get the peptide table and process it with pyopenms
# self._register_peptide_ipf_map(conn)
self._create_unimod_to_codename_peptide_id_mapping_table()
# Check if score tables exist and build score SQLs
score_cols_selct, score_table_joins, score_column_views = (
self._build_score_column_selection_and_joins(column_info)
)
return f"""
-- Need to map the unimod peptide ids to the ipf codename peptide ids. The section below is commented out, since it's limited to only the 4 common modifications. Have replaced it above with a more general approach that handles all modifications using pyopenms
--WITH normalized_peptides AS (
-- SELECT
-- ID AS PEPTIDE_ID,
-- REPLACE(
-- REPLACE(
-- REPLACE(
-- REPLACE(MODIFIED_SEQUENCE, '(UniMod:1)', '(Acetyl)'),
-- '(UniMod:35)', '(Oxidation)'),
-- '(UniMod:21)', '(Phospho)'),
-- '(UniMod:4)', '(Carbamidomethyl)') AS NORMALIZED_SEQUENCE
-- FROM sqlite_scan('{self.config.infile}', 'PEPTIDE')
--),
--ipf_groups AS (
-- SELECT
-- NORMALIZED_SEQUENCE,
-- MIN(PEPTIDE_ID) AS IPF_PEPTIDE_ID
-- FROM normalized_peptides
-- GROUP BY NORMALIZED_SEQUENCE
--),
--UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING AS (
-- SELECT
-- np.PEPTIDE_ID,
-- g.IPF_PEPTIDE_ID
-- FROM normalized_peptides np
-- JOIN ipf_groups g USING (NORMALIZED_SEQUENCE)
--)
{score_column_views}
SELECT
PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID AS PROTEIN_ID,
{"SCORE_IPF.ID_unimod AS PEPTIDE_ID," if column_info["score_ipf_exists"] else "PEPTIDE.ID AS PEPTIDE_ID,"}
{"SCORE_IPF.ID_codename AS IPF_PEPTIDE_ID," if column_info["score_ipf_exists"] else "pipf.ID_codename AS IPF_PEPTIDE_ID,"}
{"SCORE_IPF.PRECURSOR_ID AS PRECURSOR_ID," if column_info["score_ipf_exists"] else "PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID AS PRECURSOR_ID,"}
PROTEIN.PROTEIN_ACCESSION AS PROTEIN_ACCESSION,
PEPTIDE.UNMODIFIED_SEQUENCE,
{"SCORE_IPF.MODIFIED_SEQUENCE AS MODIFIED_SEQUENCE," if column_info["score_ipf_exists"] else "PEPTIDE.MODIFIED_SEQUENCE,"}
PRECURSOR.TRAML_ID AS PRECURSOR_TRAML_ID,
PRECURSOR.GROUP_LABEL AS PRECURSOR_GROUP_LABEL,
PRECURSOR.PRECURSOR_MZ AS PRECURSOR_MZ,
PRECURSOR.CHARGE AS PRECURSOR_CHARGE,
PRECURSOR.LIBRARY_INTENSITY AS PRECURSOR_LIBRARY_INTENSITY,
PRECURSOR.LIBRARY_RT AS PRECURSOR_LIBRARY_RT,
{"PRECURSOR.LIBRARY_DRIFT_TIME" if column_info["has_library_drift_time"] else "NULL"} AS PRECURSOR_LIBRARY_DRIFT_TIME,
{"PEPTIDE_GENE_MAPPING.GENE_ID" if column_info["gene_tables_exist"] else "NULL"} AS GENE_ID,
{"GENE.GENE_NAME" if column_info["gene_tables_exist"] else "NULL"} AS GENE_NAME,
{"GENE.DECOY" if column_info["gene_tables_exist"] else "NULL"} AS GENE_DECOY,
PROTEIN.DECOY AS PROTEIN_DECOY,
PEPTIDE.DECOY AS PEPTIDE_DECOY,
PRECURSOR.DECOY AS PRECURSOR_DECOY,
FEATURE.RUN_ID AS RUN_ID,
RUN.FILENAME,
FEATURE.ID AS FEATURE_ID,
FEATURE.EXP_RT,
{"FEATURE.EXP_IM" if column_info["has_im"] else "NULL"} AS EXP_IM,
FEATURE.NORM_RT,
FEATURE.DELTA_RT,
FEATURE.LEFT_WIDTH,
FEATURE.RIGHT_WIDTH,
{"FEATURE.EXP_IM_LEFTWIDTH" if column_info.get("has_im_boundaries", False) else "NULL"} AS IM_leftWidth,
{"FEATURE.EXP_IM_RIGHTWIDTH" if column_info.get("has_im_boundaries", False) else "NULL"} AS IM_rightWidth,
{feature_ms1_cols_sql},
{feature_ms2_cols_sql},
{score_cols_selct}
FROM sqlite_scan('{self.config.infile}', 'PRECURSOR') AS PRECURSOR
INNER JOIN sqlite_scan('{self.config.infile}', 'PRECURSOR_PEPTIDE_MAPPING') AS PRECURSOR_PEPTIDE_MAPPING
ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'PEPTIDE') AS PEPTIDE
ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN sqlite_scan('{self.config.infile}', 'UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING') AS pipf
ON PEPTIDE.ID = pipf.ID_unimod
INNER JOIN sqlite_scan('{self.config.infile}', 'PEPTIDE_PROTEIN_MAPPING') AS PEPTIDE_PROTEIN_MAPPING
ON PEPTIDE.ID = PEPTIDE_PROTEIN_MAPPING.PEPTIDE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'PROTEIN') AS PROTEIN
ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = PROTEIN.ID
{self._build_gene_joins(column_info)}
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE') AS FEATURE
ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE_MS1') AS FEATURE_MS1
ON FEATURE.ID = FEATURE_MS1.FEATURE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE_MS2') AS FEATURE_MS2
ON FEATURE.ID = FEATURE_MS2.FEATURE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'RUN') AS RUN
ON FEATURE.RUN_ID = RUN.ID
{score_table_joins}
"""
[docs]
def _build_transition_score_columns_and_join(
self, column_info: dict
) -> Tuple[str, str]:
"""Build score columns and join clause for transition scores"""
score_transition_cols = ""
score_transition_join = ""
if column_info.get("score_transition_exists", False):
logger.debug(
"SCORE_TRANSITION table exists, adding score columns to transition query"
)
score_cols = [
"SCORE_TRANSITION.SCORE AS SCORE_TRANSITION_SCORE",
"SCORE_TRANSITION.RANK AS SCORE_TRANSITION_RANK",
"SCORE_TRANSITION.PVALUE AS SCORE_TRANSITION_P_VALUE",
"SCORE_TRANSITION.QVALUE AS SCORE_TRANSITION_Q_VALUE",
"SCORE_TRANSITION.PEP AS SCORE_TRANSITION_PEP",
]
score_transition_cols = ", " + ", ".join(score_cols)
score_transition_join = (
f"LEFT JOIN sqlite_scan('{self.config.infile}', 'SCORE_TRANSITION') AS SCORE_TRANSITION "
f"ON FEATURE_TRANSITION.TRANSITION_ID = SCORE_TRANSITION.TRANSITION_ID "
f"AND FEATURE_TRANSITION.FEATURE_ID = SCORE_TRANSITION.FEATURE_ID"
)
return score_transition_cols, score_transition_join
[docs]
def _build_transition_query(self, column_info: dict) -> str:
"""Build SQL query for transition data"""
feature_transition_cols_sql = ", ".join(
f"FEATURE_TRANSITION.{col[0]} AS FEATURE_TRANSITION_{col[0]}"
for col in column_info["feature_transition_cols"]
)
annotation = (
"TRANSITION.ANNOTATION"
if column_info["has_annotation"]
else "TRANSITION.TYPE || CAST(TRANSITION.ORDINAL AS VARCHAR) || '^' || CAST(TRANSITION.CHARGE AS VARCHAR)"
)
# Add transition score columns if they exist
score_transition_cols, score_transition_join = (
self._build_transition_score_columns_and_join(column_info)
)
return f"""
SELECT
FEATURE.RUN_ID AS RUN_ID,
TRANSITION_PEPTIDE_MAPPING.PEPTIDE_ID AS IPF_PEPTIDE_ID,
TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID AS PRECURSOR_ID,
TRANSITION.ID AS TRANSITION_ID,
TRANSITION.TRAML_ID AS TRANSITION_TRAML_ID,
TRANSITION.PRODUCT_MZ,
TRANSITION.CHARGE AS TRANSITION_CHARGE,
TRANSITION.TYPE AS TRANSITION_TYPE,
TRANSITION.ORDINAL AS TRANSITION_ORDINAL,
{annotation} AS ANNOTATION,
TRANSITION.DETECTING AS TRANSITION_DETECTING,
TRANSITION.LIBRARY_INTENSITY AS TRANSITION_LIBRARY_INTENSITY,
TRANSITION.DECOY AS TRANSITION_DECOY,
FEATURE.ID AS FEATURE_ID,
{feature_transition_cols_sql}
{score_transition_cols}
FROM sqlite_scan('{self.config.infile}', 'TRANSITION') AS TRANSITION
FULL JOIN sqlite_scan('{self.config.infile}', 'TRANSITION_PRECURSOR_MAPPING') AS TRANSITION_PRECURSOR_MAPPING
ON TRANSITION.ID = TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID
FULL JOIN sqlite_scan('{self.config.infile}', 'TRANSITION_PEPTIDE_MAPPING') AS TRANSITION_PEPTIDE_MAPPING
ON TRANSITION.ID = TRANSITION_PEPTIDE_MAPPING.TRANSITION_ID
FULL JOIN sqlite_scan('{self.config.infile}', 'FEATURE_TRANSITION') AS FEATURE_TRANSITION
ON TRANSITION.ID = FEATURE_TRANSITION.TRANSITION_ID
FULL JOIN (
SELECT ID, RUN_ID
FROM sqlite_scan('{self.config.infile}', 'FEATURE')
) AS FEATURE
ON FEATURE_TRANSITION.FEATURE_ID = FEATURE.ID
{score_transition_join}
"""
[docs]
def _build_combined_precursor_query(self, conn, column_info: dict) -> str:
    """
    Build the precursor-level SELECT used for the combined single-file export.

    The statement emits protein, peptide, precursor, run and feature columns
    plus the FEATURE_MS1 / FEATURE_MS2 columns, and fills every
    transition-level column with NULL.

    Side effects: ``self._create_unimod_to_codename_peptide_id_mapping_table()``
    is called before the SQL is assembled, because the query joins the
    ``UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING`` table.

    Parameters
    ----------
    conn
        Not used by this method; kept so existing callers keep working.
    column_info : dict
        Required keys: ``feature_ms1_cols``, ``feature_ms2_cols``,
        ``feature_transition_cols`` (items whose first element is a column
        name), ``score_ipf_exists``, ``has_library_drift_time``,
        ``gene_tables_exist`` and ``has_im``. Optional keys:
        ``has_im_boundaries`` and ``score_transition_exists`` (both default
        to False). The dict is also forwarded to
        ``_build_score_column_selection_and_joins`` and ``_build_gene_joins``.

    Returns
    -------
    str
        The query text, starting with the optional ``WITH`` clause returned
        by ``_build_score_column_selection_and_joins``.
    """
    feature_ms1_cols_sql = ", ".join(
        f"FEATURE_MS1.{col[0]} AS FEATURE_MS1_{col[0]}"
        for col in column_info["feature_ms1_cols"]
    )
    feature_ms2_cols_sql = ", ".join(
        f"FEATURE_MS2.{col[0]} AS FEATURE_MS2_{col[0]}"
        for col in column_info["feature_ms2_cols"]
    )
    as_null_feature_transition_cols_sql = ", ".join(
        f"NULL AS FEATURE_TRANSITION_{col[0]}"
        for col in column_info["feature_transition_cols"]
    )

    # The UniMod -> codename peptide-id mapping is materialised as a table.
    # An earlier in-query REPLACE() chain only covered four common
    # modifications; it used to travel along as commented-out SQL inside the
    # query text and has been dropped from the string sent to the engine.
    self._create_unimod_to_codename_peptide_id_mapping_table()

    # Score projections, their joins and the optional WITH clause.
    score_cols_select, score_table_joins, score_column_views = (
        self._build_score_column_selection_and_joins(column_info)
    )
    gene_joins_sql = self._build_gene_joins(column_info)

    # Transition scores do not exist at precursor level: emit NULL placeholders.
    as_null_transition_score_cols = ""
    if column_info.get("score_transition_exists", False):
        as_null_transition_score_cols = "NULL AS SCORE_TRANSITION_SCORE, NULL AS SCORE_TRANSITION_RANK, NULL AS SCORE_TRANSITION_P_VALUE, NULL AS SCORE_TRANSITION_Q_VALUE, NULL AS SCORE_TRANSITION_PEP"

    # Only non-empty fragments get a leading comma, so an empty score
    # selection can no longer yield ", ," or a dangling comma before FROM.
    trailing_cols_sql = "".join(
        f",\n{fragment}"
        for fragment in (
            as_null_feature_transition_cols_sql,
            score_cols_select,
            as_null_transition_score_cols,
        )
        if fragment.strip()
    )

    # With IPF results the identifiers come from the table aliased SCORE_IPF.
    ipf = column_info["score_ipf_exists"]
    peptide_id_expr = "SCORE_IPF.ID_unimod" if ipf else "PEPTIDE.ID"
    ipf_peptide_id_expr = "SCORE_IPF.ID_codename" if ipf else "pipf.ID_codename"
    precursor_id_expr = (
        "SCORE_IPF.PRECURSOR_ID" if ipf else "PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID"
    )
    modified_sequence_expr = (
        "SCORE_IPF.MODIFIED_SEQUENCE" if ipf else "PEPTIDE.MODIFIED_SEQUENCE"
    )

    # Columns that may be absent from the file are exported as NULL.
    drift_time_expr = (
        "PRECURSOR.LIBRARY_DRIFT_TIME" if column_info["has_library_drift_time"] else "NULL"
    )
    has_genes = column_info["gene_tables_exist"]
    gene_id_expr = "PEPTIDE_GENE_MAPPING.GENE_ID" if has_genes else "NULL"
    gene_name_expr = "GENE.GENE_NAME" if has_genes else "NULL"
    gene_decoy_expr = "GENE.DECOY" if has_genes else "NULL"
    exp_im_expr = "FEATURE.EXP_IM" if column_info["has_im"] else "NULL"
    has_im_bounds = column_info.get("has_im_boundaries", False)
    im_left_expr = "FEATURE.EXP_IM_LEFTWIDTH" if has_im_bounds else "NULL"
    im_right_expr = "FEATURE.EXP_IM_RIGHTWIDTH" if has_im_bounds else "NULL"

    return f"""
{score_column_views}
SELECT
PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID AS PROTEIN_ID,
{peptide_id_expr} AS PEPTIDE_ID,
{ipf_peptide_id_expr} AS IPF_PEPTIDE_ID,
{precursor_id_expr} AS PRECURSOR_ID,
PROTEIN.PROTEIN_ACCESSION AS PROTEIN_ACCESSION,
PEPTIDE.UNMODIFIED_SEQUENCE,
{modified_sequence_expr} AS MODIFIED_SEQUENCE,
PRECURSOR.TRAML_ID AS PRECURSOR_TRAML_ID,
PRECURSOR.GROUP_LABEL AS PRECURSOR_GROUP_LABEL,
PRECURSOR.PRECURSOR_MZ AS PRECURSOR_MZ,
PRECURSOR.CHARGE AS PRECURSOR_CHARGE,
PRECURSOR.LIBRARY_INTENSITY AS PRECURSOR_LIBRARY_INTENSITY,
PRECURSOR.LIBRARY_RT AS PRECURSOR_LIBRARY_RT,
{drift_time_expr} AS PRECURSOR_LIBRARY_DRIFT_TIME,
{gene_id_expr} AS GENE_ID,
{gene_name_expr} AS GENE_NAME,
{gene_decoy_expr} AS GENE_DECOY,
PROTEIN.DECOY AS PROTEIN_DECOY,
PEPTIDE.DECOY AS PEPTIDE_DECOY,
PRECURSOR.DECOY AS PRECURSOR_DECOY,
FEATURE.RUN_ID AS RUN_ID,
RUN.FILENAME,
FEATURE.ID AS FEATURE_ID,
FEATURE.EXP_RT,
{exp_im_expr} AS EXP_IM,
FEATURE.NORM_RT,
FEATURE.DELTA_RT,
FEATURE.LEFT_WIDTH,
FEATURE.RIGHT_WIDTH,
{im_left_expr} AS IM_leftWidth,
{im_right_expr} AS IM_rightWidth,
{feature_ms1_cols_sql},
{feature_ms2_cols_sql},
NULL AS TRANSITION_ID,
NULL AS TRANSITION_TRAML_ID,
NULL AS PRODUCT_MZ,
NULL AS TRANSITION_CHARGE,
NULL AS TRANSITION_TYPE,
NULL AS TRANSITION_ORDINAL,
NULL AS ANNOTATION,
NULL AS TRANSITION_DETECTING,
NULL AS TRANSITION_LIBRARY_INTENSITY,
NULL AS TRANSITION_DECOY{trailing_cols_sql}
FROM sqlite_scan('{self.config.infile}', 'PRECURSOR') AS PRECURSOR
INNER JOIN sqlite_scan('{self.config.infile}', 'PRECURSOR_PEPTIDE_MAPPING') AS PRECURSOR_PEPTIDE_MAPPING
ON PRECURSOR.ID = PRECURSOR_PEPTIDE_MAPPING.PRECURSOR_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'PEPTIDE') AS PEPTIDE
ON PRECURSOR_PEPTIDE_MAPPING.PEPTIDE_ID = PEPTIDE.ID
INNER JOIN sqlite_scan('{self.config.infile}', 'UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING') AS pipf
ON PEPTIDE.ID = pipf.ID_unimod
INNER JOIN sqlite_scan('{self.config.infile}', 'PEPTIDE_PROTEIN_MAPPING') AS PEPTIDE_PROTEIN_MAPPING
ON PEPTIDE.ID = PEPTIDE_PROTEIN_MAPPING.PEPTIDE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'PROTEIN') AS PROTEIN
ON PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID = PROTEIN.ID
{gene_joins_sql}
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE') AS FEATURE
ON FEATURE.PRECURSOR_ID = PRECURSOR.ID
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE_MS1') AS FEATURE_MS1
ON FEATURE.ID = FEATURE_MS1.FEATURE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'FEATURE_MS2') AS FEATURE_MS2
ON FEATURE.ID = FEATURE_MS2.FEATURE_ID
INNER JOIN sqlite_scan('{self.config.infile}', 'RUN') AS RUN
ON FEATURE.RUN_ID = RUN.ID
{score_table_joins}
"""
[docs]
def _build_combined_transition_query(self, column_info: dict) -> str:
    """
    Build the transition-level SELECT used for the combined single-file export.

    Protein, peptide, precursor, run and feature-level columns (apart from
    IPF_PEPTIDE_ID, PRECURSOR_ID and FEATURE_ID) are emitted as NULL, as are
    the FEATURE_MS1 / FEATURE_MS2 columns and all non-transition score
    columns. Transition columns and FEATURE_TRANSITION columns carry data.

    Parameters
    ----------
    column_info : dict
        Required keys: ``feature_ms1_cols``, ``feature_ms2_cols``,
        ``feature_transition_cols`` (items whose first element is a column
        name) and ``has_annotation``. Optional keys (default False / empty):
        ``score_ms1_exists``, ``score_ms2_exists``, ``score_ipf_exists``,
        ``score_peptide_exists``, ``score_protein_exists``,
        ``score_peptide_contexts`` and ``score_protein_contexts``. The dict
        is also forwarded to ``_build_transition_score_columns_and_join``.

    Returns
    -------
    str
        The query text.
    """
    null_ms1_sql = ", ".join(
        [f"NULL AS FEATURE_MS1_{col[0]}" for col in column_info["feature_ms1_cols"]]
    )
    null_ms2_sql = ", ".join(
        [f"NULL AS FEATURE_MS2_{col[0]}" for col in column_info["feature_ms2_cols"]]
    )
    ft_select_sql = ", ".join(
        [
            f"FEATURE_TRANSITION.{col[0]} AS FEATURE_TRANSITION_{col[0]}"
            for col in column_info["feature_transition_cols"]
        ]
    )

    # Without an ANNOTATION column, synthesise a label as <TYPE><ORDINAL>^<CHARGE>.
    if column_info["has_annotation"]:
        annotation_expr = "TRANSITION.ANNOTATION"
    else:
        annotation_expr = "TRANSITION.TYPE || CAST(TRANSITION.ORDINAL AS VARCHAR) || '^' || CAST(TRANSITION.CHARGE AS VARCHAR)"

    # Optional transition score columns and the join that supplies them.
    transition_score_sql, transition_score_join = (
        self._build_transition_score_columns_and_join(column_info)
    )

    # NULL placeholders for the score columns of the precursor-level query.
    # Every fragment starts with ", " because it follows the
    # FEATURE_TRANSITION columns directly in the SELECT list.
    null_score_fragments = []
    if column_info.get("score_ms1_exists", False):
        null_score_fragments.append(
            ", NULL AS SCORE_MS1_SCORE, NULL AS SCORE_MS1_RANK, NULL AS SCORE_MS1_P_VALUE, NULL AS SCORE_MS1_Q_VALUE, NULL AS SCORE_MS1_PEP"
        )
    if column_info.get("score_ms2_exists", False):
        null_score_fragments.append(
            ", NULL AS SCORE_MS2_SCORE, NULL AS SCORE_MS2_PEAK_GROUP_RANK, NULL AS SCORE_MS2_P_VALUE, NULL AS SCORE_MS2_Q_VALUE, NULL AS SCORE_MS2_PEP"
        )
    if column_info.get("score_ipf_exists", False):
        null_score_fragments.append(
            ", NULL AS SCORE_IPF_PRECURSOR_PEAKGROUP_PEP, NULL AS SCORE_IPF_PEP, NULL AS SCORE_IPF_QVALUE"
        )
    # Peptide and protein scores contribute four columns per context.
    for level in ("peptide", "protein"):
        if not column_info.get(f"score_{level}_exists", False):
            continue
        prefix = f"SCORE_{level.upper()}"
        for context in column_info.get(f"score_{level}_contexts", []):
            tag = context.upper().replace("-", "_")
            null_score_fragments.append(
                f", NULL AS {prefix}_{tag}_SCORE, NULL AS {prefix}_{tag}_P_VALUE, NULL AS {prefix}_{tag}_Q_VALUE, NULL AS {prefix}_{tag}_PEP"
            )
    null_score_sql = "".join(null_score_fragments)

    osw_path = self.config.infile
    return f"""
SELECT
NULL AS PROTEIN_ID,
NULL AS PEPTIDE_ID,
TRANSITION_PEPTIDE_MAPPING.PEPTIDE_ID AS IPF_PEPTIDE_ID,
TRANSITION_PRECURSOR_MAPPING.PRECURSOR_ID AS PRECURSOR_ID,
NULL AS PROTEIN_ACCESSION,
NULL AS UNMODIFIED_SEQUENCE,
NULL AS MODIFIED_SEQUENCE,
NULL AS PRECURSOR_TRAML_ID,
NULL AS PRECURSOR_GROUP_LABEL,
NULL AS PRECURSOR_MZ,
NULL AS PRECURSOR_CHARGE,
NULL AS PRECURSOR_LIBRARY_INTENSITY,
NULL AS PRECURSOR_LIBRARY_RT,
NULL AS PRECURSOR_LIBRARY_DRIFT_TIME,
NULL AS GENE_ID,
NULL AS GENE_NAME,
NULL AS GENE_DECOY,
NULL AS PROTEIN_DECOY,
NULL AS PEPTIDE_DECOY,
NULL AS PRECURSOR_DECOY,
NULL AS RUN_ID,
NULL AS FILENAME,
FEATURE_TRANSITION.FEATURE_ID AS FEATURE_ID,
NULL AS EXP_RT,
NULL AS EXP_IM,
NULL as NORM_RT,
NULL AS DELTA_RT,
NULL AS LEFT_WIDTH,
NULL AS RIGHT_WIDTH,
NULL AS IM_leftWidth,
NULL AS IM_rightWidth,
{null_ms1_sql},
{null_ms2_sql},
TRANSITION.ID AS TRANSITION_ID,
TRANSITION.TRAML_ID AS TRANSITION_TRAML_ID,
TRANSITION.PRODUCT_MZ,
TRANSITION.CHARGE AS TRANSITION_CHARGE,
TRANSITION.TYPE AS TRANSITION_TYPE,
TRANSITION.ORDINAL AS TRANSITION_ORDINAL,
{annotation_expr} AS ANNOTATION,
TRANSITION.DETECTING AS TRANSITION_DETECTING,
TRANSITION.LIBRARY_INTENSITY AS TRANSITION_LIBRARY_INTENSITY,
TRANSITION.DECOY AS TRANSITION_DECOY,
{ft_select_sql}
{null_score_sql}
{transition_score_sql}
FROM sqlite_scan('{osw_path}', 'TRANSITION_PRECURSOR_MAPPING') AS TRANSITION_PRECURSOR_MAPPING
INNER JOIN sqlite_scan('{osw_path}', 'TRANSITION') AS TRANSITION
ON TRANSITION_PRECURSOR_MAPPING.TRANSITION_ID = TRANSITION.ID
FULL JOIN sqlite_scan('{osw_path}', 'TRANSITION_PEPTIDE_MAPPING') AS TRANSITION_PEPTIDE_MAPPING
ON TRANSITION.ID = TRANSITION_PEPTIDE_MAPPING.TRANSITION_ID
FULL JOIN sqlite_scan('{osw_path}', 'FEATURE_TRANSITION') AS FEATURE_TRANSITION
ON TRANSITION.ID = FEATURE_TRANSITION.TRANSITION_ID
{transition_score_join}
"""
[docs]
def _create_temp_table(self, conn, column_info: dict) -> None:
    """
    Execute a ``CREATE TABLE temp_table`` statement with the combined schema.

    The fixed protein/peptide/precursor/feature/transition columns are
    followed by the prefixed FEATURE_MS1, FEATURE_MS2 and FEATURE_TRANSITION
    columns and by whichever score columns ``column_info`` enables.

    Parameters
    ----------
    conn
        Object whose ``execute`` method receives the DDL text.
    column_info : dict
        Required keys: ``feature_ms1_cols``, ``feature_ms2_cols`` and
        ``feature_transition_cols``; each item supplies a column name at
        index 0 and its SQL type at index 1. Optional keys (default False /
        empty): ``score_ms1_exists``, ``score_ms2_exists``,
        ``score_ipf_exists``, ``score_peptide_exists``,
        ``score_protein_exists``, ``score_peptide_contexts``,
        ``score_protein_contexts`` and ``score_transition_exists``.
    """
    ms1_defs_sql = ", ".join(
        [f"FEATURE_MS1_{col[0]} {col[1]}" for col in column_info["feature_ms1_cols"]]
    )
    ms2_defs_sql = ", ".join(
        [f"FEATURE_MS2_{col[0]} {col[1]}" for col in column_info["feature_ms2_cols"]]
    )
    ft_defs_sql = ", ".join(
        [
            f"FEATURE_TRANSITION_{col[0]} {col[1]}"
            for col in column_info["feature_transition_cols"]
        ]
    )

    # Score tables with a fixed column set, in the order they enter the schema.
    fixed_score_groups = (
        (
            "score_ms1_exists",
            (
                "SCORE_MS1_SCORE DOUBLE",
                "SCORE_MS1_RANK INTEGER",
                "SCORE_MS1_P_VALUE DOUBLE",
                "SCORE_MS1_Q_VALUE DOUBLE",
                "SCORE_MS1_PEP DOUBLE",
            ),
        ),
        (
            "score_ms2_exists",
            (
                "SCORE_MS2_SCORE DOUBLE",
                "SCORE_MS2_PEAK_GROUP_RANK INTEGER",
                "SCORE_MS2_P_VALUE DOUBLE",
                "SCORE_MS2_Q_VALUE DOUBLE",
                "SCORE_MS2_PEP DOUBLE",
            ),
        ),
        (
            "score_ipf_exists",
            (
                "SCORE_IPF_PRECURSOR_PEAKGROUP_PEP DOUBLE",
                "SCORE_IPF_PEP DOUBLE",
                "SCORE_IPF_QVALUE DOUBLE",
            ),
        ),
    )
    score_defs = []
    for flag, defs in fixed_score_groups:
        if column_info.get(flag, False):
            score_defs.extend(defs)

    # Peptide and protein scores add four DOUBLE columns per context.
    for level in ("peptide", "protein"):
        if not column_info.get(f"score_{level}_exists", False):
            continue
        prefix = f"SCORE_{level.upper()}"
        for context in column_info.get(f"score_{level}_contexts", []):
            tag = context.upper().replace("-", "_")
            for suffix in ("SCORE", "P_VALUE", "Q_VALUE", "PEP"):
                score_defs.append(f"{prefix}_{tag}_{suffix} DOUBLE")

    # Transition scores come last.
    if column_info.get("score_transition_exists", False):
        score_defs.extend(
            (
                "SCORE_TRANSITION_SCORE DOUBLE",
                "SCORE_TRANSITION_RANK INTEGER",
                "SCORE_TRANSITION_P_VALUE DOUBLE",
                "SCORE_TRANSITION_Q_VALUE DOUBLE",
                "SCORE_TRANSITION_PEP DOUBLE",
            )
        )

    # Each definition carries its own leading ", " so that an empty list
    # contributes nothing after the FEATURE_TRANSITION columns.
    score_defs_sql = "".join(f", {definition}" for definition in score_defs)

    ddl = f"""
CREATE TABLE temp_table (
PROTEIN_ID BIGINT,
PEPTIDE_ID BIGINT,
IPF_PEPTIDE_ID BIGINT,
PRECURSOR_ID BIGINT,
PROTEIN_ACCESSION TEXT,
UNMODIFIED_SEQUENCE TEXT,
MODIFIED_SEQUENCE TEXT,
PRECURSOR_TRAML_ID TEXT,
PRECURSOR_GROUP_LABEL TEXT,
PRECURSOR_MZ DOUBLE,
PRECURSOR_CHARGE INTEGER,
PRECURSOR_LIBRARY_INTENSITY DOUBLE,
PRECURSOR_LIBRARY_RT DOUBLE,
PRECURSOR_LIBRARY_DRIFT_TIME DOUBLE,
GENE_ID BIGINT,
GENE_NAME TEXT,
GENE_DECOY BOOLEAN,
PROTEIN_DECOY BOOLEAN,
PEPTIDE_DECOY BOOLEAN,
PRECURSOR_DECOY BOOLEAN,
RUN_ID BIGINT,
FILENAME TEXT,
FEATURE_ID BIGINT,
EXP_RT DOUBLE,
EXP_IM DOUBLE,
NORM_RT DOUBLE,
DELTA_RT DOUBLE,
LEFT_WIDTH DOUBLE,
RIGHT_WIDTH DOUBLE,
IM_leftWidth DOUBLE,
IM_rightWidth DOUBLE,
{ms1_defs_sql},
{ms2_defs_sql},
TRANSITION_ID BIGINT,
TRANSITION_TRAML_ID TEXT,
PRODUCT_MZ DOUBLE,
TRANSITION_CHARGE INTEGER,
TRANSITION_TYPE TEXT,
TRANSITION_ORDINAL INTEGER,
ANNOTATION TEXT,
TRANSITION_DETECTING BOOLEAN,
TRANSITION_LIBRARY_INTENSITY DOUBLE,
TRANSITION_DECOY BOOLEAN,
{ft_defs_sql}
{score_defs_sql}
);
"""
    conn.execute(ddl)
[docs]
def _export_alignment_data(self, conn, path: str = None) -> None:
    """
    Export the FEATURE_MS2_ALIGNMENT table, joined with alignment scores when present.

    When the input file has a SCORE_ALIGNMENT table, each aligned feature is
    left-joined with the SCORE / PEP / QVALUE of its lowest-QVALUE score row.
    Otherwise only the alignment columns are exported.

    Parameters
    ----------
    conn
        Connection handed to ``self._execute_copy_query``. The query relies
        on ``sqlite_scan``, so this is presumably a DuckDB connection —
        confirm against the caller.
    path : str, optional
        Destination file. Defaults to ``feature_alignment.parquet`` inside
        ``self.config.outfile``.
    """
    if path is None:
        path = os.path.join(self.config.outfile, "feature_alignment.parquet")

    # sqlite3's connection context manager only commits or rolls back; it
    # does not close the handle, so close it explicitly.
    sql_conn = sqlite3.connect(self.config.infile)
    try:
        has_score_alignment = check_sqlite_table(sql_conn, "SCORE_ALIGNMENT")
    finally:
        sql_conn.close()

    # NOTE(review): XCORR_SHAPE_TO_ALL is exported as VAR_XCORR_SHAPE while
    # its siblings keep the _TO_ALL suffix. Left unchanged because consumers
    # may depend on the current column name.
    if has_score_alignment:
        # One score row per feature: the one with the smallest QVALUE.
        # Bare non-aggregated columns next to GROUP BY are a SQLite-only
        # leniency, so the pick is made explicit with arg_min().
        query = f"""
SELECT
FEATURE_MS2_ALIGNMENT.ALIGNMENT_ID,
FEATURE_MS2_ALIGNMENT.RUN_ID,
FEATURE_MS2_ALIGNMENT.PRECURSOR_ID,
FEATURE_MS2_ALIGNMENT.ALIGNED_FEATURE_ID AS FEATURE_ID,
FEATURE_MS2_ALIGNMENT.REFERENCE_FEATURE_ID,
FEATURE_MS2_ALIGNMENT.ALIGNED_RT,
FEATURE_MS2_ALIGNMENT.REFERENCE_RT,
FEATURE_MS2_ALIGNMENT.XCORR_COELUTION_TO_REFERENCE AS VAR_XCORR_COELUTION_TO_REFERENCE,
FEATURE_MS2_ALIGNMENT.XCORR_SHAPE_TO_REFERENCE AS VAR_XCORR_SHAPE_TO_REFERENCE,
FEATURE_MS2_ALIGNMENT.MI_TO_REFERENCE AS VAR_MI_TO_REFERENCE,
FEATURE_MS2_ALIGNMENT.XCORR_COELUTION_TO_ALL AS VAR_XCORR_COELUTION_TO_ALL,
FEATURE_MS2_ALIGNMENT.XCORR_SHAPE_TO_ALL AS VAR_XCORR_SHAPE,
FEATURE_MS2_ALIGNMENT.MI_TO_ALL AS VAR_MI_TO_ALL,
FEATURE_MS2_ALIGNMENT.RETENTION_TIME_DEVIATION AS VAR_RETENTION_TIME_DEVIATION,
FEATURE_MS2_ALIGNMENT.PEAK_INTENSITY_RATIO AS VAR_PEAK_INTENSITY_RATIO,
FEATURE_MS2_ALIGNMENT.LABEL AS DECOY,
SCORE_ALIGNMENT.SCORE AS SCORE,
SCORE_ALIGNMENT.PEP AS PEP,
SCORE_ALIGNMENT.QVALUE AS QVALUE
FROM sqlite_scan('{self.config.infile}', 'FEATURE_MS2_ALIGNMENT') AS FEATURE_MS2_ALIGNMENT
LEFT JOIN (
SELECT
FEATURE_ID,
arg_min(SCORE, QVALUE) AS SCORE,
arg_min(PEP, QVALUE) AS PEP,
MIN(QVALUE) AS QVALUE
FROM sqlite_scan('{self.config.infile}', 'SCORE_ALIGNMENT')
GROUP BY FEATURE_ID
) AS SCORE_ALIGNMENT
ON FEATURE_MS2_ALIGNMENT.ALIGNED_FEATURE_ID = SCORE_ALIGNMENT.FEATURE_ID
"""
    else:
        # No SCORE_ALIGNMENT table: export the alignment columns only.
        query = f"""
SELECT
ALIGNMENT_ID,
RUN_ID,
PRECURSOR_ID,
ALIGNED_FEATURE_ID AS FEATURE_ID,
REFERENCE_FEATURE_ID,
ALIGNED_RT,
REFERENCE_RT,
XCORR_COELUTION_TO_REFERENCE AS VAR_XCORR_COELUTION_TO_REFERENCE,
XCORR_SHAPE_TO_REFERENCE AS VAR_XCORR_SHAPE_TO_REFERENCE,
MI_TO_REFERENCE AS VAR_MI_TO_REFERENCE,
XCORR_COELUTION_TO_ALL AS VAR_XCORR_COELUTION_TO_ALL,
XCORR_SHAPE_TO_ALL AS VAR_XCORR_SHAPE,
MI_TO_ALL AS VAR_MI_TO_ALL,
RETENTION_TIME_DEVIATION AS VAR_RETENTION_TIME_DEVIATION,
PEAK_INTENSITY_RATIO AS VAR_PEAK_INTENSITY_RATIO,
LABEL AS DECOY
FROM sqlite_scan('{self.config.infile}', 'FEATURE_MS2_ALIGNMENT')
"""
    self._execute_copy_query(conn, query, path)
[docs]
def _build_gene_joins(self, column_info: dict) -> str:
    """
    Return the LEFT JOIN clauses for PEPTIDE_GENE_MAPPING and GENE.

    Parameters
    ----------
    column_info : dict
        Must contain ``gene_tables_exist``.

    Returns
    -------
    str
        The two join clauses reading from ``self.config.infile``, or an
        empty string when ``gene_tables_exist`` is falsy.
    """
    if not column_info["gene_tables_exist"]:
        return ""
    osw_path = self.config.infile
    return f"""
LEFT JOIN sqlite_scan('{osw_path}', 'PEPTIDE_GENE_MAPPING') AS PEPTIDE_GENE_MAPPING
ON PEPTIDE.ID = PEPTIDE_GENE_MAPPING.PEPTIDE_ID
LEFT JOIN sqlite_scan('{osw_path}', 'GENE') AS GENE
ON PEPTIDE_GENE_MAPPING.GENE_ID = GENE.ID
"""
[docs]
def _check_contexts(self, conn, score_table) -> list:
    """
    Return the distinct ``context`` values stored in ``score_table``.

    Parameters
    ----------
    conn
        Connection whose ``execute(...).fetchall()`` runs the query.
    score_table
        Table expression interpolated verbatim after ``FROM``; it must come
        from trusted code, not user input.

    Returns
    -------
    list
        First column of every fetched row.
    """
    contexts_query = f""" SELECT DISTINCT context FROM {score_table} """
    result = conn.execute(contexts_query).fetchall()
    # loguru formats messages with str.format-style "{}" placeholders; with
    # no placeholder the extra argument was silently dropped from the log.
    logger.debug("result of contexts query: {}", result)
    return [row[0] for row in result]
[docs]
def _get_peptide_protein_score_table(self, level, contexts: list) -> str:
    """
    Build a CTE definition that pivots SCORE_PEPTIDE / SCORE_PROTEIN by context.

    Nothing is executed here: the return value is the text
    ``<view_name> AS (<query>)``, meant to be placed inside a ``WITH`` clause.
    The view name is ``score_peptide_view`` or ``score_protein_view``.

    For every non-global context the query exposes
    ``<TABLE>_<CONTEXT>_SCORE|PVALUE|QVALUE|PEP`` grouped by (ID, RUN_ID).
    The ``global`` context is grouped by ID only and exposed as
    ``<TABLE>_GLOBAL_*``. When both kinds are present they are combined with
    a FULL OUTER JOIN on the ID column.

    Parameters
    ----------
    level
        ``"peptide"`` selects SCORE_PEPTIDE / PEPTIDE_ID; any other value
        selects SCORE_PROTEIN / PROTEIN_ID.
    contexts : list
        Context names present in the score table.

    Returns
    -------
    str
        The CTE definition text.
    """
    metrics = ("SCORE", "PVALUE", "QVALUE", "PEP")
    if level == "peptide":
        id_col, score_table = "PEPTIDE_ID", "SCORE_PEPTIDE"
    else:  # level == 'protein'
        id_col, score_table = "PROTEIN_ID", "SCORE_PROTEIN"

    # One ANY_VALUE(CASE ...) per (non-global context, metric) pair.
    ng_aliases = []
    ng_exprs = []
    for context in contexts:
        if context == "global":
            continue
        tag = context.upper().replace("-", "_")
        for metric in metrics:
            alias = f"{score_table}_{tag}_{metric}"
            ng_aliases.append(alias)
            ng_exprs.append(
                f"ANY_VALUE(CASE WHEN context = '{context}' THEN {metric} END) as {alias}"
            )

    scan = f"sqlite_scan('{self.config.infile}', '{score_table}')"

    # Non-global contexts are keyed by (ID, RUN_ID).
    ng_query = None
    if ng_exprs:
        ng_exprs_sql = ", ".join(ng_exprs)
        ng_query = f"""
SELECT {id_col}, RUN_ID, {ng_exprs_sql}
FROM {scan}
WHERE context != 'global'
GROUP BY {id_col}, RUN_ID
"""

    # Global scores are keyed only by the entity ID. Some files store RUN_ID as NULL.
    global_aliases = [f"{score_table}_GLOBAL_{metric}" for metric in metrics]
    g_query = None
    if "global" in contexts:
        global_aggs_sql = ",\n".join(
            f"ANY_VALUE({metric}) as {alias}"
            for metric, alias in zip(metrics, global_aliases)
        )
        g_query = f"""
SELECT {id_col},
{global_aggs_sql}
FROM {scan}
WHERE context = 'global'
GROUP BY {id_col}
"""

    # Choose the final shape from what is available.
    if ng_query and g_query:
        ng_cols_sql = ",\n ".join([f"ng.{alias}" for alias in ng_aliases])
        g_cols_sql = ",\n ".join([f"g.{alias}" for alias in global_aliases])
        body = f"""
SELECT
COALESCE(ng.{id_col}, g.{id_col}) AS {id_col},
ng.RUN_ID,
{ng_cols_sql},
{g_cols_sql}
FROM ({ng_query}) ng
FULL OUTER JOIN ({g_query}) g ON ng.{id_col} = g.{id_col}
"""
    elif ng_query:
        body = ng_query
    elif g_query:
        g_cols_sql = ",\n ".join([f"g.{alias}" for alias in global_aliases])
        body = f"""
SELECT
g.{id_col},
NULL AS RUN_ID,
{g_cols_sql}
FROM ({g_query}) g
"""
    else:
        # No contexts at all: an empty relation with the two key columns.
        body = f"""
SELECT {id_col}, RUN_ID
FROM {scan}
WHERE 1=0
"""

    return f"{score_table.lower()}_view AS ({body})"
[docs]
def _build_score_column_selection_and_joins(
    self, column_info: dict
) -> Tuple[str, str, str]:
    """
    Assemble the score-related pieces of the precursor-level export query.

    Side effects: when ``score_ipf_exists`` is truthy,
    ``self._insert_precursor_peptide_ipf_map()`` is called before the
    PRECURSOR_PEPTIDE_IPF_MAPPING join is added.

    Parameters
    ----------
    column_info : dict
        Required keys: ``score_ms1_exists``, ``score_ms2_exists``,
        ``score_ipf_exists``, ``score_peptide_exists`` and
        ``score_protein_exists``. ``score_peptide_contexts`` /
        ``score_protein_contexts`` are read when the matching flag is truthy.

    Returns
    -------
    Tuple[str, str, str]
        ``(select_sql, join_sql, with_clause)``: the score projections joined
        with ``", "``, the JOIN clauses joined with a single space, and a
        ``WITH ...`` clause defining the peptide/protein score CTEs (an empty
        string when there are none).
    """
    select_parts = []
    join_parts = []
    cte_parts = []

    # Feature-level score tables share one layout; only the RANK alias differs.
    feature_score_tables = (
        ("SCORE_MS1", "SCORE_MS1_RANK"),
        ("SCORE_MS2", "SCORE_MS2_PEAK_GROUP_RANK"),
    )
    for table, rank_alias in feature_score_tables:
        if not column_info[f"{table.lower()}_exists"]:
            continue
        logger.debug(f"{table} table exists, adding score columns to selection")
        select_parts.append(
            f"{table}.SCORE AS {table}_SCORE, {table}.RANK AS {rank_alias}, {table}.PVALUE AS {table}_P_VALUE, {table}.QVALUE AS {table}_Q_VALUE, {table}.PEP AS {table}_PEP"
        )
        join_parts.append(
            f"INNER JOIN sqlite_scan('{self.config.infile}', '{table}') AS {table} ON FEATURE.ID = {table}.FEATURE_ID"
        )

    if column_info["score_ipf_exists"]:
        logger.debug("SCORE_IPF table exists, adding score columns to selection")
        select_parts.append(
            "SCORE_IPF.PRECURSOR_PEAKGROUP_PEP AS SCORE_IPF_PRECURSOR_PEAKGROUP_PEP, SCORE_IPF.PEP AS SCORE_IPF_PEP, SCORE_IPF.QVALUE AS SCORE_IPF_QVALUE"
        )
        # The join below reads PRECURSOR_PEPTIDE_IPF_MAPPING under the alias
        # SCORE_IPF; _insert_precursor_peptide_ipf_map() runs first
        # (presumably it creates/populates that table — confirm).
        # TODO: We should maybe add the UNIMOD_TO_CODENAME_PEPTIDE_ID_MAPPING during OpenSwathWorkflow execution to avoid doing it here?
        self._insert_precursor_peptide_ipf_map()
        join_parts.append(
            f"\nLEFT JOIN sqlite_scan('{self.config.infile}', 'PRECURSOR_PEPTIDE_IPF_MAPPING') AS SCORE_IPF ON SCORE_IPF.FEATURE_ID = FEATURE.ID\n"
        )

    # Peptide / protein scores arrive through CTEs joined on the entity ID and
    # on RUN_ID (a NULL RUN_ID in the CTE matches any run).
    entity_levels = (
        ("peptide", "PEPTIDE.ID"),
        ("protein", "PEPTIDE_PROTEIN_MAPPING.PROTEIN_ID"),
    )
    for level, entity_key in entity_levels:
        if not column_info[f"score_{level}_exists"]:
            continue
        logger.debug(
            f"SCORE_{level.upper()} table exists, adding score table view to query"
        )
        cte_parts.append(
            self._get_peptide_protein_score_table(
                level, column_info[f"score_{level}_contexts"]
            )
        )
        view = f"score_{level}_view"
        join_parts.append(
            f"LEFT JOIN {view} ON {entity_key} = {view}.{level.upper()}_ID "
            f"AND (FEATURE.RUN_ID = {view}.RUN_ID OR {view}.RUN_ID IS NULL)"
        )

    # Four projections per context, renaming PVALUE/QVALUE to P_VALUE/Q_VALUE.
    for level in ("peptide", "protein"):
        if not column_info[f"score_{level}_exists"]:
            continue
        logger.debug(
            f"SCORE_{level.upper()} table exists, adding score columns to selection"
        )
        view = f"score_{level.lower()}_view"
        for context in column_info[f"score_{level}_contexts"]:
            tag = context.upper().replace("-", "_")
            stem = f"SCORE_{level.upper()}_{tag}"
            select_parts.append(
                f"{view}.{stem}_SCORE AS {stem}_SCORE, "
                f"{view}.{stem}_PVALUE AS {stem}_P_VALUE, "
                f"{view}.{stem}_QVALUE AS {stem}_Q_VALUE, "
                f"{view}.{stem}_PEP AS {stem}_PEP"
            )

    # Only emit WITH when at least one CTE was produced.
    with_clause = "WITH " + ", ".join(cte_parts) if cte_parts else ""

    select_sql = ", ".join(select_parts)
    join_sql = " ".join(join_parts)
    return (select_sql, join_sql, with_clause)