import os
import pickle
from shutil import copyfile
import sqlite3
import duckdb
import pandas as pd
import click
from loguru import logger
from ..util import check_sqlite_table, check_duckdb_table
from .._base import BaseOSWReader, BaseOSWWriter
from ..._config import RunnerIOConfig
[docs]
class OSWReader(BaseOSWReader):
"""
Class for reading and processing data from an OpenSWATH workflow OSW-sqlite based file.
The OSWReader class provides methods to read different levels of data from the file and process it accordingly.
It supports reading data for semi-supervised learning, IPF analysis, context level analysis.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
read(): Read data from the input file based on the alogorithm.
"""
[docs]
def __init__(self, config: RunnerIOConfig):
super().__init__(config)
[docs]
def read(self) -> pd.DataFrame:
"""
Reads the data for scoring from the specified file using DuckDB if available,
falling back to SQLite if DuckDB is not available.
Returns:
pd.DataFrame: The data read from the file.
"""
self._create_indexes()
try:
con = duckdb.connect()
con.execute("INSTALL sqlite_scanner;")
con.execute("LOAD sqlite_scanner;")
con.execute(f"ATTACH DATABASE '{self.infile}' AS osw (TYPE sqlite);")
self._init_duckdb_views(con)
return self._read_using_duckdb(con)
except ModuleNotFoundError as e:
logger.warning(
f"Warn: DuckDB sqlite_scanner failed, falling back to SQLite. Reason: {e}"
)
con = sqlite3.connect(self.infile)
return self._read_using_sqlite(con)
[docs]
def _create_indexes(self):
"""
Always use a temporary SQLite connection to create indexes directly on the .osw file,
since DuckDB doesn't seem to currently support creating indexes on attached SQLite databases.
"""
try:
sqlite_con = sqlite3.connect(self.infile)
index_statements = [
"CREATE INDEX IF NOT EXISTS idx_precursor_id ON PRECURSOR (ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_precursor_id ON FEATURE (PRECURSOR_ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_feature_id ON FEATURE (ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_ms1_feature_id ON FEATURE_MS1 (FEATURE_ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_ms2_feature_id ON FEATURE_MS2 (FEATURE_ID);",
"CREATE INDEX IF NOT EXISTS idx_score_ms2_feature_id ON SCORE_MS2 (FEATURE_ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_transition_feature_id ON FEATURE_TRANSITION (FEATURE_ID);",
"CREATE INDEX IF NOT EXISTS idx_feature_transition_transition_id ON FEATURE_TRANSITION (TRANSITION_ID);",
"CREATE INDEX IF NOT EXISTS idx_transition_id ON TRANSITION (ID);",
]
for stmt in index_statements:
try:
sqlite_con.execute(stmt)
except sqlite3.OperationalError as e:
logger.warning(f"Warn: SQLite index creation failed: {e}")
sqlite_con.commit()
sqlite_con.close()
except Exception as e:
raise click.ClickException(
f"Failed to create indexes via SQLite fallback: {e}"
)
[docs]
def _read_using_duckdb(self, con):
"""
Read features from SQLite using DuckDB based on the specified level.
Parameters:
- con: Connection to the DuckDB database.
Returns:
- Features based on the specified level.
Raises:
- click.ClickException: If the specified level is unsupported.
"""
level = self.level
if level in ("ms2", "ms1ms2"):
return self._fetch_ms2_features_duckdb(con)
elif level == "ms1":
return self._fetch_ms1_features_duckdb(con)
elif level == "transition":
return self._fetch_transition_features_duckdb(con)
elif level == "alignment":
return self._fetch_alignment_features_duckdb(con)
else:
raise click.ClickException(f"Unsupported level: {level}")
[docs]
def _read_using_sqlite(self, con):
"""
Read features from SQLite database based on the specified level.
Parameters:
- con: SQLite connection object
Returns:
- Features based on the specified level
Raises:
- click.ClickException: If the specified level is unsupported
"""
level = self.level
if level in ("ms2", "ms1ms2"):
return self._fetch_ms2_features_sqlite(con)
elif level == "ms1":
return self._fetch_ms1_features_sqlite(con)
elif level == "transition":
return self._fetch_transition_features_sqlite(con)
elif level == "alignment":
return self._fetch_alignment_features_sqlite(con)
else:
raise click.ClickException(f"Unsupported level: {level}")
# ----------------------------
# DuckDB Queries
# ----------------------------
def _fetch_tables_duckdb(self, con):
tables = con.execute(
"SELECT table_schema, table_name FROM information_schema.tables"
).fetchdf()
return tables
[docs]
def _get_precursor_filter_clause(self):
"""
Return a WHERE/AND clause fragment for filtering by sampled precursor IDs when subsampling is enabled.
Returns empty string if no subsampling, otherwise returns a clause like:
" AND f.PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)"
"""
if self.subsample_ratio < 1.0:
return " AND f.PRECURSOR_ID IN (SELECT PRECURSOR_ID FROM sampled_precursor_ids)"
return ""
def _fetch_ms2_features_duckdb(self, con):
if not check_duckdb_table(con, "main", "FEATURE_MS2"):
raise click.ClickException(
f"MS2-level feature table not present in file.\nTable Info:\n{self._fetch_tables_duckdb(con)}"
)
filter_clause = self._get_precursor_filter_clause()
if self.glyco:
con.execute(
f"""
CREATE OR REPLACE VIEW ms2_table AS
SELECT
fm.*,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
p.DECOY,
g.DECOY_PEPTIDE,
g.DECOY_GLYCAN,
COALESCE(ts.TRANSITION_COUNT, 0) AS TRANSITION_COUNT,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS2 fm
INNER JOIN osw.FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN osw.PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN (
SELECT
pgm.PRECURSOR_ID,
gp.DECOY_PEPTIDE,
gp.DECOY_GLYCAN
FROM osw.PRECURSOR_GLYCOPEPTIDE_MAPPING pgm
INNER JOIN osw.GLYCOPEPTIDE gp ON pgm.GLYCOPEPTIDE_ID = gp.ID
) g ON f.PRECURSOR_ID = g.PRECURSOR_ID
LEFT JOIN (
SELECT
tpm.PRECURSOR_ID,
COUNT(*) AS TRANSITION_COUNT
FROM osw.TRANSITION_PRECURSOR_MAPPING tpm
INNER JOIN osw.TRANSITION t ON tpm.TRANSITION_ID = t.ID
WHERE t.DETECTING = 1
GROUP BY tpm.PRECURSOR_ID
) ts ON f.PRECURSOR_ID = ts.PRECURSOR_ID
WHERE 1=1{filter_clause}
"""
)
else:
con.execute(
f"""
CREATE OR REPLACE VIEW ms2_table AS
SELECT
fm.*,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
p.DECOY,
COALESCE(ts.TRANSITION_COUNT, 0) AS TRANSITION_COUNT,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS2 fm
INNER JOIN osw.FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN osw.PRECURSOR p ON f.PRECURSOR_ID = p.ID
LEFT JOIN (
SELECT
tpm.PRECURSOR_ID,
COUNT(*) AS TRANSITION_COUNT
FROM osw.TRANSITION_PRECURSOR_MAPPING tpm
INNER JOIN osw.TRANSITION t ON tpm.TRANSITION_ID = t.ID
WHERE t.DETECTING = 1
GROUP BY tpm.PRECURSOR_ID
) ts ON f.PRECURSOR_ID = ts.PRECURSOR_ID
WHERE 1=1{filter_clause}
"""
)
df = con.execute(
"SELECT * FROM ms2_table ORDER BY RUN_ID, PRECURSOR_ID, EXP_RT"
).fetchdf()
if self.level == "ms1ms2":
ms1_df = con.execute("SELECT * FROM osw.FEATURE_MS1").fetchdf()
ms1_scores = [c for c in ms1_df.columns if c.startswith("VAR_")]
ms1_df = ms1_df[["FEATURE_ID"] + ms1_scores]
ms1_df.columns = ["FEATURE_ID"] + [
"VAR_MS1_" + s.split("VAR_")[1] for s in ms1_scores
]
df = pd.merge(df, ms1_df, how="left", on="FEATURE_ID")
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
def _fetch_ms1_features_duckdb(self, con):
if not check_duckdb_table(con, "main", "FEATURE_MS1"):
raise click.ClickException(
f"MS1-level feature table not present in file.\nTable Info:\n{self._fetch_tables_duckdb(con)}"
)
rc = self.config.runner
glyco = rc.glyco
ipf_max_rank = rc.ipf_max_peakgroup_rank
filter_clause = self._get_precursor_filter_clause()
if not glyco:
con.execute(
f"""
CREATE OR REPLACE VIEW ms1_table AS
SELECT fm.*, f.RUN_ID, f.PRECURSOR_ID, f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE, p.DECOY,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS1 fm
INNER JOIN osw.FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN osw.PRECURSOR p ON f.PRECURSOR_ID = p.ID
WHERE 1=1{filter_clause}
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
)
else:
if not check_duckdb_table(con, "main", "SCORE_MS2"):
raise click.ClickException(
f"""MS1-level scoring for glycoform inference requires prior MS2 or MS1MS2-level scoring.\n
Please run 'pyprophet score --level=ms2' or 'pyprophet score --level=ms1ms2' on this file first.level\nTable Info:\n{self._fetch_tables_duckdb(con)}"""
)
con.execute(
f"""
CREATE OR REPLACE VIEW ms1_table AS
SELECT g.DECOY_PEPTIDE, g.DECOY_GLYCAN,
fm.*, f.*, p.*,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS1 fm
INNER JOIN osw.FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN osw.SCORE_MS2 s ON f.ID = s.FEATURE_ID
INNER JOIN osw.PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN (
SELECT pgm.PRECURSOR_ID,
gp.DECOY_PEPTIDE,
gp.DECOY_GLYCAN
FROM osw.PRECURSOR_GLYCOPEPTIDE_MAPPING pgm
INNER JOIN osw.GLYCOPEPTIDE gp ON pgm.GLYCOPEPTIDE_ID = gp.ID
) g ON f.PRECURSOR_ID = g.PRECURSOR_ID
WHERE s.RANK <= {ipf_max_rank}{filter_clause}
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
)
df = con.execute("SELECT * FROM ms1_table").fetchdf()
return self._finalize_feature_table(df, rc.ss_main_score)
def _fetch_transition_features_duckdb(self, con):
if not check_duckdb_table(con, "main", "SCORE_MS2"):
raise click.ClickException(
f"""Transition-level scoring for IPF requires prior MS2 or MS1MS2-level scoring.\n
Please run 'pyprophet score --level=ms2' or 'pyprophet score --level=ms1ms2' first.\nTable Info:\n{self._fetch_tables_duckdb(con)}"""
)
if not check_duckdb_table(con, "main", "FEATURE_TRANSITION"):
raise click.ClickException(
f"Transition-level feature table not present in file.\nTable Info:\n{self._fetch_tables_duckdb(con)}"
)
rc = self.config.runner
filter_clause = self._get_precursor_filter_clause()
con.execute(
f"""
CREATE OR REPLACE VIEW transition_table AS
SELECT
ft.*,
t.DECOY AS DECOY,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
t.CHARGE AS PRODUCT_CHARGE,
f.RUN_ID || '_' || ft.FEATURE_ID || '_' || f.PRECURSOR_ID || '_' || ft.TRANSITION_ID AS GROUP_ID
FROM osw.FEATURE_TRANSITION ft
INNER JOIN osw.FEATURE f ON ft.FEATURE_ID = f.ID
INNER JOIN osw.SCORE_MS2 s ON f.ID = s.FEATURE_ID
INNER JOIN osw.PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN osw.TRANSITION t ON ft.TRANSITION_ID = t.ID
WHERE s.RANK <= {rc.ipf_max_peakgroup_rank}
AND s.PEP <= {rc.ipf_max_peakgroup_pep}
AND ft.VAR_ISOTOPE_OVERLAP_SCORE <= {rc.ipf_max_transition_isotope_overlap}
AND ft.VAR_LOG_SN_SCORE > {rc.ipf_min_transition_sn}
AND p.DECOY = 0{filter_clause}
"""
)
df = con.execute(
"""
SELECT *
FROM transition_table
ORDER BY RUN_ID, FEATURE_ID, PRECURSOR_ID, EXP_RT, TRANSITION_ID
"""
).fetchdf()
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
def _fetch_alignment_features_duckdb(self, con):
if not check_duckdb_table(con, "main", "FEATURE_MS2_ALIGNMENT"):
raise click.ClickException(
f"MS2-level feature alignment table not present in file.\nTable Info:\n{self._fetch_tables_duckdb(con)}"
)
filter_clause = self._get_precursor_filter_clause()
con.execute(
f"""
CREATE OR REPLACE VIEW alignment_table AS
SELECT
fa.ALIGNMENT_ID AS ALIGNMENT_ID, fa.RUN_ID,
fa.PRECURSOR_ID, fa.ALIGNED_FEATURE_ID AS FEATURE_ID,
fa.ALIGNED_RT, fa.LABEL AS DECOY,
fa.XCORR_COELUTION_TO_REFERENCE AS VAR_XCORR_COELUTION_TO_REFERENCE,
fa.XCORR_SHAPE_TO_REFERENCE AS VAR_XCORR_SHAPE,
fa.MI_TO_REFERENCE AS VAR_MI_TO_REFERENCE,
fa.XCORR_COELUTION_TO_ALL AS VAR_XCORR_COELUTION_TO_ALL,
fa.XCORR_SHAPE_TO_ALL AS VAR_XCORR_SHAPE_TO_ALL,
fa.MI_TO_ALL AS VAR_MI_TO_ALL,
fa.RETENTION_TIME_DEVIATION AS VAR_RETENTION_TIME_DEVIATION,
fa.PEAK_INTENSITY_RATIO AS VAR_PEAK_INTENSITY_RATIO,
fa.ALIGNED_FEATURE_ID || '_' || fa.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS2_ALIGNMENT fa
WHERE 1=1{filter_clause}
ORDER BY fa.RUN_ID, fa.PRECURSOR_ID, fa.REFERENCE_RT
"""
)
df = con.execute("SELECT * FROM alignment_table").fetchdf()
df["DECOY"] = df["DECOY"].map({1: 0, -1: 1})
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
# ----------------------------
# SQLite fallback
# ----------------------------
def _fetch_ms2_features_sqlite(self, con):
if not check_sqlite_table(con, "FEATURE_MS2"):
raise click.ClickException("MS2-level feature table not present in file.")
if not self.glyco:
query = """
SELECT fm.*,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
p.DECOY,
COALESCE(ts.TRANSITION_COUNT, 0) AS TRANSITION_COUNT,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS2 fm
INNER JOIN FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN PRECURSOR p ON f.PRECURSOR_ID = p.ID
LEFT JOIN (
SELECT tpm.PRECURSOR_ID,
COUNT(*) AS TRANSITION_COUNT
FROM TRANSITION_PRECURSOR_MAPPING tpm
INNER JOIN TRANSITION t ON tpm.TRANSITION_ID = t.ID
WHERE t.DETECTING = 1
GROUP BY tpm.PRECURSOR_ID
) ts ON f.PRECURSOR_ID = ts.PRECURSOR_ID
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
else:
query = """
SELECT fm.*,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
p.DECOY,
g.DECOY_PEPTIDE,
g.DECOY_GLYCAN,
COALESCE(ts.TRANSITION_COUNT, 0) AS TRANSITION_COUNT,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS2 fm
INNER JOIN FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN (
SELECT pgm.PRECURSOR_ID,
gp.DECOY_PEPTIDE,
gp.DECOY_GLYCAN
FROM PRECURSOR_GLYCOPEPTIDE_MAPPING pgm
INNER JOIN GLYCOPEPTIDE gp ON pgm.GLYCOPEPTIDE_ID = gp.ID
) g ON f.PRECURSOR_ID = g.PRECURSOR_ID
LEFT JOIN (
SELECT tpm.PRECURSOR_ID,
COUNT(*) AS TRANSITION_COUNT
FROM TRANSITION_PRECURSOR_MAPPING tpm
INNER JOIN TRANSITION t ON tpm.TRANSITION_ID = t.ID
WHERE t.DETECTING = 1
GROUP BY tpm.PRECURSOR_ID
) ts ON f.PRECURSOR_ID = ts.PRECURSOR_ID
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
df = pd.read_sql_query(query, con)
if self.level == "ms1ms2":
ms1_df = pd.read_sql_query("SELECT * FROM FEATURE_MS1", con)
ms1_scores = [c for c in ms1_df.columns if c.startswith("VAR_")]
ms1_df = ms1_df[["FEATURE_ID"] + ms1_scores]
ms1_df.columns = ["FEATURE_ID"] + [
"VAR_MS1_" + s.split("VAR_")[1] for s in ms1_scores
]
df = pd.merge(df, ms1_df, how="left", on="FEATURE_ID")
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
def _fetch_ms1_features_sqlite(self, con):
rc = self.config.runner
glyco = rc.glyco
ipf_max_rank = rc.ipf_max_peakgroup_rank
if not check_sqlite_table(con, "FEATURE_MS1"):
raise click.ClickException("MS1-level feature table not present in file.")
if not glyco:
query = """
SELECT fm.*, f.RUN_ID, f.PRECURSOR_ID, f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE, p.DECOY,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS1 fm
INNER JOIN FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN PRECURSOR p ON f.PRECURSOR_ID = p.ID
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
else:
if not check_sqlite_table(con, "SCORE_MS2"):
raise click.ClickException(
"MS1-level scoring for glycoform inference requires prior MS2 or MS1MS2-level scoring. "
"Please run 'pyprophet score --level=ms2' or 'pyprophet score --level=ms1ms2' on this file first."
)
query = f"""
SELECT g.DECOY_PEPTIDE, g.DECOY_GLYCAN,
fm.*, f.*, p.*,
f.RUN_ID || '_' || f.PRECURSOR_ID AS GROUP_ID
FROM FEATURE_MS1 fm
INNER JOIN FEATURE f ON fm.FEATURE_ID = f.ID
INNER JOIN SCORE_MS2 s ON f.ID = s.FEATURE_ID
INNER JOIN PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN (
SELECT pgm.PRECURSOR_ID,
gp.DECOY_PEPTIDE,
gp.DECOY_GLYCAN
FROM PRECURSOR_GLYCOPEPTIDE_MAPPING pgm
INNER JOIN GLYCOPEPTIDE gp ON pgm.GLYCOPEPTIDE_ID = gp.ID
) g ON f.PRECURSOR_ID = g.PRECURSOR_ID
WHERE s.RANK <= {ipf_max_rank}
ORDER BY f.RUN_ID, p.ID, f.EXP_RT
"""
df = pd.read_sql_query(query, con)
return self._finalize_feature_table(df, rc.ss_main_score)
def _fetch_transition_features_sqlite(self, con):
rc = self.config.runner
if not check_sqlite_table(con, "SCORE_MS2"):
raise click.ClickException(
"Transition-level scoring for IPF requires prior MS2 or MS1MS2-level scoring. "
"Please run 'pyprophet score --level=ms2' or 'pyprophet score --level=ms1ms2' first."
)
if not check_sqlite_table(con, "FEATURE_TRANSITION"):
raise click.ClickException(
"Transition-level feature table not present in file."
)
query = f"""
SELECT
ft.*,
t.DECOY AS DECOY,
f.RUN_ID,
f.PRECURSOR_ID,
f.EXP_RT,
p.CHARGE AS PRECURSOR_CHARGE,
t.CHARGE AS PRODUCT_CHARGE,
f.RUN_ID || '_' || ft.FEATURE_ID || '_' || f.PRECURSOR_ID || '_' || ft.TRANSITION_ID AS GROUP_ID
FROM FEATURE_TRANSITION ft
INNER JOIN FEATURE f ON ft.FEATURE_ID = f.ID
INNER JOIN SCORE_MS2 s ON f.ID = s.FEATURE_ID
INNER JOIN PRECURSOR p ON f.PRECURSOR_ID = p.ID
INNER JOIN TRANSITION t ON ft.TRANSITION_ID = t.ID
WHERE s.RANK <= {rc.ipf_max_peakgroup_rank}
AND s.PEP <= {rc.ipf_max_peakgroup_pep}
AND ft.VAR_ISOTOPE_OVERLAP_SCORE <= {rc.ipf_max_transition_isotope_overlap}
AND ft.VAR_LOG_SN_SCORE > {rc.ipf_min_transition_sn}
AND p.DECOY = 0
ORDER BY f.RUN_ID, f.PRECURSOR_ID, f.EXP_RT, ft.TRANSITION_ID
"""
df = pd.read_sql_query(query, con)
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
def _fetch_alignment_features_sqlite(self, con):
if not check_sqlite_table(con, "FEATURE_MS2_ALIGNMENT"):
raise click.ClickException(
"MS2-level feature alignment table not present in file."
)
query = """
SELECT
fa.ALIGNMENT_ID AS ALIGNMENT_ID, fa.RUN_ID,
fa.PRECURSOR_ID, fa.ALIGNED_FEATURE_ID AS FEATURE_ID,
fa.ALIGNED_RT, fa.LABEL AS DECOY,
fa.XCORR_COELUTION_TO_REFERENCE AS VAR_XCORR_COELUTION_TO_REFERENCE,
fa.XCORR_SHAPE_TO_REFERENCE AS VAR_XCORR_SHAPE,
fa.MI_TO_REFERENCE AS VAR_MI_TO_REFERENCE,
fa.XCORR_COELUTION_TO_ALL AS VAR_XCORR_COELUTION_TO_ALL,
fa.XCORR_SHAPE_TO_ALL AS VAR_XCORR_SHAPE_TO_ALL,
fa.MI_TO_ALL AS VAR_MI_TO_ALL,
fa.RETENTION_TIME_DEVIATION AS VAR_RETENTION_TIME_DEVIATION,
fa.PEAK_INTENSITY_RATIO AS VAR_PEAK_INTENSITY_RATIO,
fa.ALIGNED_FEATURE_ID || '_' || fa.PRECURSOR_ID AS GROUP_ID
FROM osw.FEATURE_MS2_ALIGNMENT fa
ORDER BY fa.RUN_ID, fa.PRECURSOR_ID, fa.REFERENCE_RT
"""
df = pd.read_sql_query(query, con)
df["DECOY"] = df["DECOY"].map({1: 0, -1: 1})
return self._finalize_feature_table(df, self.config.runner.ss_main_score)
[docs]
class OSWWriter(BaseOSWWriter):
"""
Class for writing OpenSWATH results to an OSW-sqlite based file.
Attributes:
infile (str): Input file path.
outfile (str): Output file path.
classifier (str): Classifier used for semi-supervised learning.
level (str): Level used in semi-supervised learning (e.g., 'ms1', 'ms2', 'ms1ms2', 'transition', 'alignment'), or context level used peptide/protein/gene inference (e.g., 'global', 'experiment-wide', 'run-specific').
glyco (bool): Flag indicating whether analysis is glycoform-specific.
Methods:
save_results(result, pi0): Save the results to the output file based on the module using this class.
save_weights(weights): Save the weights to the output file.
"""
[docs]
def __init__(self, config: RunnerIOConfig):
super().__init__(config)
[docs]
def save_results(self, result, pi0):
"""
Save the results to the output file based on the specified level and glyco flag.
Parameters:
- result: The result object containing scored tables.
- pi0: The pi0 value.
Returns:
None
"""
if self.infile != self.outfile:
copyfile(self.infile, self.outfile)
df = result.scored_tables
level = self.level
glyco = self.glyco
# Determine output table(s)
if glyco and level in ("ms2", "ms1ms2"):
tables = ["SCORE_MS2", "SCORE_MS2_PART_PEPTIDE", "SCORE_MS2_PART_GLYCAN"]
elif glyco and level == "ms1":
tables = ["SCORE_MS1", "SCORE_MS1_PART_PEPTIDE", "SCORE_MS1_PART_GLYCAN"]
else:
tables = {
"ms2": "SCORE_MS2",
"ms1ms2": "SCORE_MS2",
"ms1": "SCORE_MS1",
"transition": "SCORE_TRANSITION",
"alignment": "SCORE_ALIGNMENT",
}[level]
if isinstance(tables, str):
tables = [tables]
# Drop existing tables
with sqlite3.connect(self.config.outfile) as con:
cur = con.cursor()
for tbl in tables:
cur.execute(f"DROP TABLE IF EXISTS {tbl}")
con.commit()
# Prepare data for writing
if glyco and level in ("ms2", "ms1ms2"):
df_main = df[
[
"feature_id",
"d_score_combined",
"peak_group_rank",
"q_value",
"pep",
]
].copy()
if "h_score" in df.columns:
df_main["h_score"] = df["h_score"]
df_main["h0_score"] = df["h0_score"]
df_main.columns = [c.upper() for c in df_main.columns]
df_main = df_main.rename(
columns={"PEAK_GROUP_RANK": "RANK", "D_SCORE_COMBINED": "SCORE"}
)
df_main.to_sql("SCORE_MS2", con, index=False)
# Write peptide/glycan part scores
for part in ["peptide", "glycan"]:
df_part = df[
["feature_id", f"d_score_{part}", f"pep_{part}"]
].copy()
df_part.columns = ["FEATURE_ID", "SCORE", "PEP"]
df_part.to_sql(f"SCORE_MS2_PART_{part.upper()}", con, index=False)
elif glyco and level == "ms1":
df_main = df[
[
"feature_id",
"d_score_combined",
"peak_group_rank",
"q_value",
"pep",
]
].copy()
if "h_score" in df.columns:
df_main["h_score"] = df["h_score"]
df_main["h0_score"] = df["h0_score"]
df_main.columns = [c.upper() for c in df_main.columns]
df_main = df_main.rename(
columns={
"PEAK_GROUP_RANK": "RANK",
"D_SCORE_COMBINED": "SCORE",
"QVALUE": "Q_VALUE",
}
)
df_main.to_sql("SCORE_MS1", con, index=False)
for part in ["peptide", "glycan"]:
df_part = df[
["feature_id", f"d_score_{part}", f"pep_{part}"]
].copy()
df_part.columns = ["FEATURE_ID", "SCORE", "PEP"]
df_part.to_sql(f"SCORE_MS1_PART_{part.upper()}", con, index=False)
else:
# Regular MS1, MS2, transition, or alignment
table_name = tables[0]
score_df = self._prepare_score_dataframe(df, level, table_name + "_")
score_df.to_sql(table_name, con, index=False)
logger.success(f"{self.outfile} written.")
# Save report if statistics are present
self._write_pdf_report(result, pi0)
[docs]
def save_weights(self, weights):
"""
Save the weights to a SQLite database based on the classifier type.
If classifier is "LDA" or "SVM", weights are saved to PYPROPHET_WEIGHTS table.
If classifier is "XGBoost", weights are saved to PYPROPHET_XGB or GLYCOPEPTIDEPROPHET_XGB table based on glyco and level.
"""
if self.classifier in ("LDA", "SVM"):
weights["level"] = self.level
con = sqlite3.connect(self.outfile)
c = con.cursor()
if self.glyco and self.level in ["ms2", "ms1ms2"]:
c.execute(
'SELECT count(name) FROM sqlite_master WHERE type="table" AND name="GLYCOPEPTIDEPROPHET_WEIGHTS";'
)
if c.fetchone()[0] == 1:
c.execute(
'DELETE FROM GLYCOPEPTIDEPROPHET_WEIGHTS WHERE LEVEL =="%s"'
% self.level
)
else:
c.execute(
'SELECT count(name) FROM sqlite_master WHERE type="table" AND name="PYPROPHET_WEIGHTS";'
)
if c.fetchone()[0] == 1:
c.execute(
'DELETE FROM PYPROPHET_WEIGHTS WHERE LEVEL =="%s"' % self.level
)
c.close()
# print(weights)
weights.to_sql("PYPROPHET_WEIGHTS", con, index=False, if_exists="append")
con.commit()
elif self.classifier == "XGBoost":
con = sqlite3.connect(self.outfile)
c = con.cursor()
if self.glyco and self.level in ["ms2", "ms1ms2"]:
c.execute(
'SELECT count(name) FROM sqlite_master WHERE type="table" AND name="GLYCOPEPTIDEPROPHET_XGB";'
)
if c.fetchone()[0] == 1:
c.execute(
'DELETE FROM GLYCOPEPTIDEPROPHET_XGB WHERE LEVEL =="%s"'
% self.level
)
else:
c.execute(
"CREATE TABLE GLYCOPEPTIDEPROPHET_XGB (level TEXT, xgb BLOB)"
)
c.execute(
"INSERT INTO GLYCOPEPTIDEPROPHET_XGB VALUES(?, ?)",
[self.level, pickle.dumps(weights)],
)
else:
c.execute(
'SELECT count(name) FROM sqlite_master WHERE type="table" AND name="PYPROPHET_XGB";'
)
if c.fetchone()[0] == 1:
c.execute(
'DELETE FROM PYPROPHET_XGB WHERE LEVEL =="%s"' % self.level
)
else:
c.execute("CREATE TABLE PYPROPHET_XGB (level TEXT, xgb BLOB)")
c.execute(
"INSERT INTO PYPROPHET_XGB VALUES(?, ?)",
[self.level, pickle.dumps(weights)],
)
con.commit()
c.close()