Raredisease configurator - Create services for file creation #4260

Merged Mar 10, 2025 (44 commits)

Changes from 16 commits

Commits
4c75f39
add code for the params file
diitaz93 Feb 28, 2025
683ea3c
add creation of file in create_cpnfig
diitaz93 Feb 28, 2025
4b6665d
fix 2 out of 3 failing tests
diitaz93 Feb 28, 2025
6c1c8aa
add also sample sheet code to configurator
diitaz93 Feb 28, 2025
9706056
fix fixture
diitaz93 Feb 28, 2025
c189d12
add skeleton of new classes
diitaz93 Mar 3, 2025
82e2898
Merge branch 'dev-start-pipelines' into config-separate-services
diitaz93 Mar 3, 2025
5854181
resolve conflicts
diitaz93 Mar 3, 2025
baf2220
stash commit
diitaz93 Mar 3, 2025
e8308fa
enhance sample sheet and config
diitaz93 Mar 3, 2025
8bc008d
unify classes
diitaz93 Mar 4, 2025
cc83172
stash commit
diitaz93 Mar 4, 2025
1cc90cf
remove out of scope change
diitaz93 Mar 4, 2025
1e4830a
created a nextflow level of inheritance in configurator
diitaz93 Mar 5, 2025
c5ec339
add constants for gene files
diitaz93 Mar 5, 2025
f447e20
removed duplicated utils method
diitaz93 Mar 5, 2025
215ced4
renamed write function
diitaz93 Mar 5, 2025
e6fd37c
rolled back small fix for scope reasons
diitaz93 Mar 5, 2025
4f82eac
inject content creators
diitaz93 Mar 6, 2025
edb4adb
add gene panel creator
diitaz93 Mar 6, 2025
77db7d1
some renaming
diitaz93 Mar 6, 2025
945733a
add sample sheet creator fixture
diitaz93 Mar 6, 2025
b06cd5a
add params file creator fixture
diitaz93 Mar 6, 2025
5caddaf
add other file creator fixtures
diitaz93 Mar 6, 2025
1dde474
add fixtures in plugins to conftest
diitaz93 Mar 6, 2025
9ccef27
fix constructors and fixtures
diitaz93 Mar 6, 2025
e92ab44
Add managed_variants support (#4265) (patch)
islean Mar 6, 2025
52878f3
add first tests for content creation
diitaz93 Mar 6, 2025
cedfd59
Rework raredisease extension (#4266)
islean Mar 6, 2025
2932e07
remove inheritance of nextflow configurator (#4267)
diitaz93 Mar 7, 2025
890e9ce
fi sample sheet fixture (#4270)
diitaz93 Mar 7, 2025
229d4df
Implement Raredisease Extension (#4271)
islean Mar 7, 2025
e4d88c4
move raredisease models to the analysis starter directory
diitaz93 Mar 7, 2025
5d72e1c
rename content creator files
diitaz93 Mar 7, 2025
aab06db
add tests for managed variants
diitaz93 Mar 7, 2025
be00ee0
add tests for gene panel
diitaz93 Mar 7, 2025
b7a4932
fix sample sheet creator
diitaz93 Mar 7, 2025
5a057e6
fix creator
diitaz93 Mar 7, 2025
2ceaa73
fix sample sheet test
diitaz93 Mar 7, 2025
7717213
Address comments
islean Mar 10, 2025
e8971a5
Remove TODO
islean Mar 10, 2025
4eedb27
Fix typo
islean Mar 10, 2025
a926e82
Make interface
islean Mar 10, 2025
cdab38c
Revert interface
islean Mar 10, 2025
8 changes: 8 additions & 0 deletions cg/constants/nf_analysis.py
@@ -15,6 +15,14 @@ class NfTowerStatus(StrEnum):
UNKNOWN: str = "UNKNOWN"


class NextflowFileType(StrEnum):
PARAMS = "params_file"
SAMPLE_SHEET = "sample_sheet"
CONFIG = "nextflow_config"
GENE_PANEL = "gene_panel"
MANAGED_VARIANTS = "managed_variants"


NALLO_METRIC_CONDITIONS: dict[str, dict[str, Any]] = {
"median_coverage": {"norm": "gt", "threshold": 25},
}
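The new enum is used downstream to locate case files. Below is a minimal, illustrative sketch of a path helper built on it, assuming the PR's `get_file_path` utility takes a case directory and a file type; the concrete file names are made up for this sketch:

```python
from pathlib import Path

from cg.constants.nf_analysis import NextflowFileType


def get_file_path(case_path: Path, file_type: NextflowFileType) -> Path:
    """Sketch: return where a workflow file would live inside the case directory."""
    case_id: str = case_path.name
    # The concrete file names are assumptions for illustration only.
    file_names: dict[NextflowFileType, str] = {
        NextflowFileType.PARAMS: f"{case_id}_params_file.yaml",
        NextflowFileType.SAMPLE_SHEET: f"{case_id}_samplesheet.csv",
        NextflowFileType.CONFIG: f"{case_id}_nextflow_config.json",
        NextflowFileType.GENE_PANEL: "gene_panels.bed",
        NextflowFileType.MANAGED_VARIANTS: "managed_variants.vcf",
    }
    return case_path / file_names[file_type]


get_file_path(Path("/analyses/case_1"), NextflowFileType.SAMPLE_SHEET)
# -> PosixPath('/analyses/case_1/case_1_samplesheet.csv')
```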
2 changes: 1 addition & 1 deletion cg/meta/workflow/nf_analysis.py
@@ -362,8 +362,8 @@ def create_params_file(self, case_id: str, dry_run: bool) -> None:
)

def replace_values_in_params_file(self, workflow_parameters: dict) -> dict:
replaced_workflow_parameters = copy.deepcopy(workflow_parameters)
"""Iterate through the dictionary until all placeholders are replaced with the corresponding value from the dictionary"""
replaced_workflow_parameters = copy.deepcopy(workflow_parameters)
while True:
resolved: bool = True
for key, value in replaced_workflow_parameters.items():
7 changes: 4 additions & 3 deletions cg/models/raredisease/raredisease.py
@@ -14,6 +14,7 @@ class RarediseaseQCMetrics(QCMetrics):
total_reads: int


# TODO: Move these to models folder in appropriate service
class RarediseaseSampleSheetEntry(NextflowSampleSheetEntry):
"""Raredisease sample model is used when building the sample sheet."""

@@ -31,15 +31,15 @@ def reformat_sample_content(self) -> list[list[str]]:
[
self.name,
lane + 1,
self.fastq_forward_read_paths,
self.fastq_reverse_read_paths,
forward_path,
reverse_path,
self.sex,
self.phenotype,
self.paternal_id,
self.maternal_id,
self.case_id,
]
for lane, (self.fastq_forward_read_paths, self.fastq_reverse_read_paths) in enumerate(
for lane, (forward_path, reverse_path) in enumerate(
zip(self.fastq_forward_read_paths, self.fastq_reverse_read_paths)
)
]
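For illustration, a standalone sketch of the per-lane expansion performed in the comprehension above, with made-up file names and made-up constant columns (sex, phenotype, parental ids, case id):

```python
forward_reads = ["s1_L001_R1.fastq.gz", "s1_L002_R1.fastq.gz"]
reverse_reads = ["s1_L001_R2.fastq.gz", "s1_L002_R2.fastq.gz"]

# One row per lane, pairing forward and reverse reads by position.
rows = [
    ["sample_1", lane + 1, forward_path, reverse_path, "1", "2", "0", "0", "case_1"]
    for lane, (forward_path, reverse_path) in enumerate(zip(forward_reads, reverse_reads))
]
# rows[0] -> ['sample_1', 1, 's1_L001_R1.fastq.gz', 's1_L001_R2.fastq.gz', '1', '2', '0', '0', 'case_1']
# rows[1] -> ['sample_1', 2, 's1_L002_R1.fastq.gz', 's1_L002_R2.fastq.gz', '1', '2', '0', '0', 'case_1']
```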
@@ -1,8 +1,10 @@
from abc import ABC

from cg.services.analysis_starter.configurator.abstract_model import CaseConfig


class Configurator(ABC):

def create_config(self, case_id: str, dry_run: bool = False):
def create_config(self, case_id: str) -> CaseConfig:
"""Abstract method to create a case config for a case."""
pass
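A minimal, standalone sketch of the pattern this abstract class establishes; the `CaseConfig` below is a toy dataclass stand-in, not the PR's model, and `ToyConfigurator` is hypothetical:

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class CaseConfig:  # toy stand-in for cg's CaseConfig model
    case_id: str


class Configurator(ABC):
    @abstractmethod
    def create_config(self, case_id: str) -> CaseConfig:
        """Create and return a case config for a case."""


class ToyConfigurator(Configurator):
    def create_config(self, case_id: str) -> CaseConfig:
        # A real configurator would create the params file, sample sheet,
        # nextflow config, gene panel and managed variants here.
        return CaseConfig(case_id=case_id)


print(ToyConfigurator().create_config("case_1"))  # CaseConfig(case_id='case_1')
```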
@@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from pathlib import Path


class FileContentCreator(ABC):

@abstractmethod
def create(self, case_path: Path) -> any:
pass
@@ -0,0 +1,35 @@
from cg.io.txt import concat_txt
from cg.services.analysis_starter.configurator.file_creators.abstract import FileContentCreator
from cg.store.models import Case
from cg.store.store import Store


class NextflowConfigFileContentCreator(FileContentCreator):

def __init__(
self, store: Store, platform: str, workflow_config_path: str, resources: str, account: str
):
self.store = store
self.platform = platform
self.workflow_config_path = workflow_config_path
self.resources = resources
self.account = account

def create(self, case_id: str) -> str:
"""Get the content of the nextflow config file."""
config_files_list: list[str] = [
self.platform,
self.workflow_config_path,
self.resources,
]
case_specific_params: list[str] = [
self._get_cluster_options(case_id=case_id),
]
return concat_txt(
file_paths=config_files_list,
str_content=case_specific_params,
)

def _get_cluster_options(self, case_id: str) -> str:
case: Case = self.store.get_case_by_internal_id(case_id)
return f'process.clusterOptions = "-A {self.account} --qos={case.slurm_priority}"\n'
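Illustrative usage of the creator above; the config file paths, account, QOS value and `status_db` instance are placeholders:

```python
# Placeholders throughout; `status_db` stands for an initialised Store instance.
creator = NextflowConfigFileContentCreator(
    store=status_db,
    platform="references/platform.config",
    workflow_config_path="references/raredisease.config",
    resources="references/resources.config",
    account="production",
)
content: str = creator.create(case_id="case_1")
# `content` is the three config files concatenated by concat_txt, followed by a
# case-specific line such as:
# process.clusterOptions = "-A production --qos=normal"
```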
@@ -0,0 +1,100 @@
from pathlib import Path

from cg.apps.lims import LimsAPI
from cg.constants import DEFAULT_CAPTURE_KIT
from cg.constants.nf_analysis import NextflowFileType
from cg.constants.scout import ScoutExportFileName
from cg.constants.tb import AnalysisType
from cg.exc import CgDataError
from cg.io.yaml import read_yaml
from cg.models.raredisease.raredisease import RarediseaseParameters
from cg.services.analysis_starter.configurator.file_creators.abstract import FileContentCreator
from cg.services.analysis_starter.configurator.file_creators.params.utils import (
replace_values_in_params_file,
)
from cg.services.analysis_starter.configurator.file_creators.utils import (
get_case_id_from_path,
get_file_path,
)
from cg.store.models import BedVersion, Case, Sample
from cg.store.store import Store


class RarediseaseParamsFileContentCreator(FileContentCreator):

def __init__(self, store: Store, lims: LimsAPI, params: str):
self.store = store
self.lims = lims
self.params = params

def create(self, case_path: Path) -> dict:
"""Create parameters file for a case."""
case_workflow_parameters: dict = self._get_case_parameters(case_path).model_dump()
workflow_parameters: any = read_yaml(self.params)
parameters: dict = case_workflow_parameters | workflow_parameters
curated_parameters: dict = replace_values_in_params_file(parameters)
return curated_parameters

def _get_case_parameters(self, case_path: Path) -> RarediseaseParameters:
"""Return case-specific parameters for the analysis."""
case_id: str = get_case_id_from_path(case_path=case_path)
sample_sheet_path: Path = get_file_path(
case_path=case_path, file_type=NextflowFileType.SAMPLE_SHEET
)
analysis_type: str = self._get_data_analysis_type(case_id=case_id)
target_bed_file: str = self._get_target_bed(case_id=case_id, analysis_type=analysis_type)
skip_germlinecnvcaller: bool = self._get_germlinecnvcaller_flag(analysis_type=analysis_type)
return RarediseaseParameters(
input=sample_sheet_path,
outdir=case_path,
analysis_type=analysis_type,
target_bed_file=target_bed_file,
save_mapped_as_cram=True,
skip_germlinecnvcaller=skip_germlinecnvcaller,
vcfanno_extra_resources=f"{case_path}/{ScoutExportFileName.MANAGED_VARIANTS}",
vep_filters_scout_fmt=f"{case_path}/{ScoutExportFileName.PANELS}",
)

def _get_data_analysis_type(self, case_id: str) -> str:
"""
Return case analysis type (WES, WGS, WTS or TGS). Assumes all case samples have the same
analysis type.
"""
sample: Sample = self.store.get_samples_by_case_id(case_id=case_id)[0]
return sample.application_version.application.analysis_type

def _get_target_bed(self, case_id: str, analysis_type: str) -> str:
"""
Return the target bed file from LIMS or use default capture kit for WHOLE_GENOME_SEQUENCING.
"""
target_bed_file: str = self._get_target_bed_from_lims(case_id=case_id)
if not target_bed_file:
if analysis_type == AnalysisType.WGS:
return DEFAULT_CAPTURE_KIT
raise ValueError("No capture kit was found in LIMS")
return target_bed_file

def _get_target_bed_from_lims(self, case_id: str) -> str | None:
"""
Get target bed filename from LIMS.
Raises:
CgDataError: if the bed target capture version is not found in StatusDB.
"""
case: Case = self.store.get_case_by_internal_id(internal_id=case_id)
sample: Sample = case.links[0].sample
if sample.from_sample:
sample: Sample = self.store.get_sample_by_internal_id(internal_id=sample.from_sample)
target_bed_shortname: str | None = self.lims.capture_kit(lims_id=sample.internal_id)
if not target_bed_shortname:
return None
bed_version: BedVersion | None = self.store.get_bed_version_by_short_name(
bed_version_short_name=target_bed_shortname
)
if not bed_version:
raise CgDataError(f"Bed-version {target_bed_shortname} does not exist")
return bed_version.filename

@staticmethod
def _get_germlinecnvcaller_flag(analysis_type: str) -> bool:
"""Return True if the germlinecnvcaller should be skipped."""
return True if analysis_type == AnalysisType.WGS else False
@@ -0,0 +1,32 @@
import copy
import re


def replace_values_in_params_file(workflow_parameters: dict) -> dict:
"""
Iterate through the dictionary until all placeholders are replaced with the corresponding value
from the dictionary
"""
replaced_workflow_parameters = copy.deepcopy(workflow_parameters)
while True:
resolved: bool = True
for key, value in replaced_workflow_parameters.items():
new_value: str | int = _replace_params_placeholders(value, workflow_parameters)
if new_value != value:
resolved = False
replaced_workflow_parameters[key] = new_value
if resolved:
break
return replaced_workflow_parameters


def _replace_params_placeholders(value: str | int, workflow_parameters: dict) -> str:
"""Replace values marked as placeholders with values from the given dictionary"""
if isinstance(value, str):
placeholders: list[str] = re.findall(r"{{\s*([^{}\s]+)\s*}}", value)
for placeholder in placeholders:
if placeholder in workflow_parameters:
value = value.replace(
f"{{{{{placeholder}}}}}", str(workflow_parameters[placeholder])
)
return value
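A small worked example of the placeholder resolution above (the keys and paths are made up):

```python
params = {
    "outdir": "/analyses/case_1",
    "input": "{{outdir}}/case_1_samplesheet.csv",
    "vcfanno_extra_resources": "{{outdir}}/managed_variants.vcf",
}
print(replace_values_in_params_file(params))
# {
#     "outdir": "/analyses/case_1",
#     "input": "/analyses/case_1/case_1_samplesheet.csv",
#     "vcfanno_extra_resources": "/analyses/case_1/managed_variants.vcf",
# }
```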
@@ -0,0 +1,137 @@
import logging
import re
from pathlib import Path

from cg.apps.housekeeper.hk import HousekeeperAPI
from cg.apps.lims import LimsAPI
from cg.constants import SequencingFileTag
from cg.constants.subject import PlinkPhenotypeStatus, PlinkSex
from cg.io.gzip import read_gzip_first_line
from cg.meta.workflow.fastq import _is_undetermined_in_path
from cg.models.fastq import FastqFileMeta, GetFastqFileMeta
from cg.models.raredisease.raredisease import (
RarediseaseSampleSheetEntry,
RarediseaseSampleSheetHeaders,
)
from cg.services.analysis_starter.configurator.file_creators.abstract import FileContentCreator
from cg.services.analysis_starter.configurator.file_creators.utils import get_case_id_from_path
from cg.store.models import Case, CaseSample, Sample
from cg.store.store import Store

LOG = logging.getLogger(__name__)


class RarediseaseSampleSheetContentCreator(FileContentCreator):

def __init__(self, store: Store, housekeeper_api: HousekeeperAPI, lims: LimsAPI):
self.store = store
self.housekeeper_api = housekeeper_api
self.lims = lims

def create(self, case_path: Path) -> any:
"""Return formatted information required to build a sample sheet for a case.
This contains information for all samples linked to the case."""
case_id: str = get_case_id_from_path(case_path=case_path)
case: Case = self.store.get_case_by_internal_id(internal_id=case_id)
sample_sheet_content: list[list[str]] = [RarediseaseSampleSheetHeaders.list()]
for link in case.links:
sample_sheet_content.extend(self._get_sample_sheet_content_per_sample(case_sample=link))
return sample_sheet_content

def _get_sample_sheet_content_per_sample(self, case_sample: CaseSample) -> list[list[str]]:
"""Collect and format information required to build a sample sheet for a single sample."""
fastq_forward_read_paths, fastq_reverse_read_paths = self._get_paired_read_paths(
sample=case_sample.sample
)
sample_sheet_entry = RarediseaseSampleSheetEntry(
name=case_sample.sample.internal_id,
fastq_forward_read_paths=fastq_forward_read_paths,
fastq_reverse_read_paths=fastq_reverse_read_paths,
sex=self._get_sex_code(case_sample.sample.sex),
phenotype=self._get_phenotype_code(case_sample.status),
paternal_id=case_sample.get_paternal_sample_id,
maternal_id=case_sample.get_maternal_sample_id,
case_id=case_sample.case.internal_id,
)
return sample_sheet_entry.reformat_sample_content

def _get_paired_read_paths(self, sample: Sample) -> tuple[list[str], list[str]]:
"""Returns a tuple of paired fastq file paths for the forward and reverse read."""
sample_metadata: list[FastqFileMeta] = self._get_fastq_metadata_for_sample(sample)
fastq_forward_read_paths: list[str] = self._extract_read_files(
metadata=sample_metadata, forward_read=True
)
fastq_reverse_read_paths: list[str] = self._extract_read_files(
metadata=sample_metadata, reverse_read=True
)
return fastq_forward_read_paths, fastq_reverse_read_paths

def _get_fastq_metadata_for_sample(self, sample: Sample) -> list[FastqFileMeta]:
"""Return FASTQ metadata objects for all fastq files linked to a sample."""
return [
self._parse_fastq_data(hk_file.full_path)
for hk_file in self.housekeeper_api.files(
bundle=sample.internal_id, tags={SequencingFileTag.FASTQ}
)
]

def _parse_fastq_data(self, fastq_path: Path) -> FastqFileMeta:
header_line: str = read_gzip_first_line(file_path=fastq_path)
fastq_file_meta: FastqFileMeta = self._parse_fastq_header(header_line)
fastq_file_meta.path = fastq_path
fastq_file_meta.undetermined = _is_undetermined_in_path(fastq_path)
matches = re.findall(r"-l[1-9]t([1-9]{2})_", str(fastq_path))
if len(matches) > 0:
fastq_file_meta.flow_cell_id = f"{fastq_file_meta.flow_cell_id}-{matches[0]}"
return fastq_file_meta

@staticmethod
def _parse_fastq_header(line: str) -> FastqFileMeta | None:
"""Parse and return fastq header metadata.
Handle Illumina's two different header formats
@see https://en.wikipedia.org/wiki/FASTQ_format
Raise:
TypeError if unable to split line into expected parts.
"""
parts = line.split(":")
try:
return GetFastqFileMeta.header_format.get(len(parts))(parts=parts)
except TypeError as exception:
LOG.error(f"Could not parse header format for header: {line}")
raise exception

@staticmethod
def _extract_read_files(
metadata: list[FastqFileMeta], forward_read: bool = False, reverse_read: bool = False
) -> list[str]:
"""Extract a list of fastq file paths for either forward or reverse reads."""
if forward_read and not reverse_read:
read_direction = 1
elif reverse_read and not forward_read:
read_direction = 2
else:
raise ValueError("Either forward or reverse needs to be specified")
sorted_metadata: list = sorted(metadata, key=lambda k: k.path)
return [
fastq_file.path
for fastq_file in sorted_metadata
if fastq_file.read_direction == read_direction
]

@staticmethod
def _get_phenotype_code(phenotype: str) -> int:
"""Return Raredisease phenotype code."""
try:
code = PlinkPhenotypeStatus[phenotype.upper()]
except KeyError:
raise ValueError(f"{phenotype} is not a valid phenotype")
return code

@staticmethod
def _get_sex_code(sex: str) -> PlinkSex:
"""Return Raredisease sex code."""
try:
code = PlinkSex[sex.upper()]
except KeyError:
raise ValueError(f"{sex} is not a valid sex")
return code
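Illustrative usage of the sample sheet content creator; `status_db`, `housekeeper` and `lims_api` are placeholders for initialised Store, HousekeeperAPI and LimsAPI instances:

```python
from pathlib import Path

creator = RarediseaseSampleSheetContentCreator(
    store=status_db,
    housekeeper_api=housekeeper,
    lims=lims_api,
)
content: list[list[str]] = creator.create(case_path=Path("/analyses/case_1"))
# content[0] is the header row from RarediseaseSampleSheetHeaders.list();
# every following row describes one lane of one sample, as built by
# RarediseaseSampleSheetEntry.reformat_sample_content.
```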