From 39613f273042378a34ef36379475ab8bebd5aac0 Mon Sep 17 00:00:00 2001 From: islean Date: Mon, 17 Mar 2025 16:32:08 +0100 Subject: [PATCH 1/3] Write factory --- .../configurator/file_creators/config_file.py | 8 +- .../file_creators/sample_sheet/raredisease.py | 4 +- .../configurator/implementations/nextflow.py | 8 +- cg/services/analysis_starter/constants.py | 18 +++ cg/services/analysis_starter/factory.py | 125 ++++++++++++++++++ .../implementations/fastq_fetcher.py | 4 +- 6 files changed, 155 insertions(+), 12 deletions(-) create mode 100644 cg/services/analysis_starter/constants.py create mode 100644 cg/services/analysis_starter/factory.py diff --git a/cg/services/analysis_starter/configurator/file_creators/config_file.py b/cg/services/analysis_starter/configurator/file_creators/config_file.py index 41d142f823..9bc8437922 100644 --- a/cg/services/analysis_starter/configurator/file_creators/config_file.py +++ b/cg/services/analysis_starter/configurator/file_creators/config_file.py @@ -10,13 +10,13 @@ class NextflowConfigFileCreator: def __init__( - self, store: Store, platform: str, workflow_config_path: str, resources: str, account: str + self, account: str, platform: str, resources: str, store: Store, workflow_config_path: str ): - self.store = store + self.account = account self.platform = platform - self.workflow_config_path = workflow_config_path self.resources = resources - self.account = account + self.store = store + self.workflow_config_path = workflow_config_path @staticmethod def get_file_path(case_id: str, case_path: Path) -> Path: diff --git a/cg/services/analysis_starter/configurator/file_creators/sample_sheet/raredisease.py b/cg/services/analysis_starter/configurator/file_creators/sample_sheet/raredisease.py index 513675c7c4..43263ffcc3 100644 --- a/cg/services/analysis_starter/configurator/file_creators/sample_sheet/raredisease.py +++ b/cg/services/analysis_starter/configurator/file_creators/sample_sheet/raredisease.py @@ -25,10 +25,10 @@ class RarediseaseSampleSheetCreator(NextflowSampleSheetCreator): - def __init__(self, store: Store, housekeeper_api: HousekeeperAPI, lims: LimsAPI): - self.store = store + def __init__(self, housekeeper_api: HousekeeperAPI, lims: LimsAPI, store: Store): self.housekeeper_api = housekeeper_api self.lims = lims + self.store = store @staticmethod def get_file_path(case_id: str, case_path: Path) -> Path: diff --git a/cg/services/analysis_starter/configurator/implementations/nextflow.py b/cg/services/analysis_starter/configurator/implementations/nextflow.py index 46cb461111..c6102f7872 100644 --- a/cg/services/analysis_starter/configurator/implementations/nextflow.py +++ b/cg/services/analysis_starter/configurator/implementations/nextflow.py @@ -23,13 +23,13 @@ class NextflowConfigurator(Configurator): def __init__( self, - pipeline_config: CommonAppConfig, - store: Store, + config_file_creator: NextflowConfigFileCreator, housekeeper_api: HousekeeperAPI, lims: LimsAPI, - config_file_creator: NextflowConfigFileCreator, - sample_sheet_creator: NextflowSampleSheetCreator, params_file_creator: ParamsFileCreator, + pipeline_config: CommonAppConfig, + sample_sheet_creator: NextflowSampleSheetCreator, + store: Store, pipeline_extension: PipelineExtension = PipelineExtension(), ): self.root_dir: str = pipeline_config.root diff --git a/cg/services/analysis_starter/constants.py b/cg/services/analysis_starter/constants.py new file mode 100644 index 0000000000..c50106ad56 --- /dev/null +++ b/cg/services/analysis_starter/constants.py @@ -0,0 +1,18 @@ +from cg.constants import Workflow + +FASTQ_WORKFLOWS = [ + Workflow.BALSAMIC, + Workflow.BALSAMIC_PON, + Workflow.BALSAMIC_UMI, + Workflow.FLUFFY, + Workflow.MICROSALT, + Workflow.MIP_DNA, + Workflow.MIP_RNA, + Workflow.MUTANT, + Workflow.RAREDISEASE, + Workflow.RNAFUSION, + Workflow.TAXPROFILER, + Workflow.TOMTE, +] + +BAM_WORKFLOWS = [Workflow.NALLO] diff --git a/cg/services/analysis_starter/factory.py b/cg/services/analysis_starter/factory.py new file mode 100644 index 0000000000..a47645261c --- /dev/null +++ b/cg/services/analysis_starter/factory.py @@ -0,0 +1,125 @@ +from cg.constants import Workflow +from cg.constants.nextflow import NEXTFLOW_WORKFLOWS +from cg.meta.archive.archive import SpringArchiveAPI +from cg.models.cg_config import CGConfig, CommonAppConfig +from cg.services.analysis_starter.configurator.abstract_service import Configurator +from cg.services.analysis_starter.configurator.extensions.abstract import PipelineExtension +from cg.services.analysis_starter.configurator.extensions.raredisease import RarediseaseExtension +from cg.services.analysis_starter.configurator.file_creators.config_file import ( + NextflowConfigFileCreator, +) +from cg.services.analysis_starter.configurator.file_creators.gene_panel import GenePanelFileCreator +from cg.services.analysis_starter.configurator.file_creators.managed_variants import ( + ManagedVariantsFileCreator, +) +from cg.services.analysis_starter.configurator.file_creators.params_file.abstract import ( + ParamsFileCreator, +) +from cg.services.analysis_starter.configurator.file_creators.params_file.raredisease import ( + RarediseaseParamsFileCreator, +) +from cg.services.analysis_starter.configurator.file_creators.sample_sheet.raredisease import ( + RarediseaseSampleSheetCreator, +) +from cg.services.analysis_starter.configurator.implementations.nextflow import NextflowConfigurator +from cg.services.analysis_starter.constants import FASTQ_WORKFLOWS +from cg.services.analysis_starter.input_fetcher.implementations.fastq_fetcher import FastqFetcher +from cg.services.analysis_starter.input_fetcher.input_fetcher import InputFetcher +from cg.services.analysis_starter.service import AnalysisStarter +from cg.services.analysis_starter.submitters.seqera_platform.client import SeqeraPlatformClient +from cg.services.analysis_starter.submitters.seqera_platform.submitter import ( + SeqeraPlatformSubmitter, +) +from cg.services.analysis_starter.submitters.submitter import Submitter +from cg.store.store import Store + + +class AnalysisStarterFactory: + def __init__(self, cg_config: CGConfig): + self.cg_config = cg_config + self.store: Store = cg_config.status_db + + def get_analysis_starter(self, case_id: str) -> AnalysisStarter: + configurator: Configurator = self._get_configurator(case_id) + input_fetcher: InputFetcher = self._get_input_fetcher(case_id) + submitter: Submitter = self._get_submitter(case_id) + return AnalysisStarter( + configurator=configurator, input_fetcher=input_fetcher, submitter=submitter + ) + + def _get_configurator(self, case_id: str) -> Configurator: + workflow: Workflow = self.store.get_case_workflow(case_id) + if workflow in NEXTFLOW_WORKFLOWS: + pipeline_config: CommonAppConfig = self._get_pipeline_config(workflow) + config_file_creator = NextflowConfigFileCreator( + account=pipeline_config.account, + platform=pipeline_config.platform, + resources=pipeline_config.resources, + store=self.store, + workflow_config_path=pipeline_config.workflow_config_path, + ) + params_file_creator: ParamsFileCreator = RarediseaseParamsFileCreator( + lims=self.cg_config.lims_api, store=self.store, params=pipeline_config.params + ) + sample_sheet_creator: RarediseaseSampleSheetCreator = RarediseaseSampleSheetCreator( + housekeeper_api=self.cg_config.housekeeper_api, + lims=self.cg_config.lims_api, + store=self.store, + ) + return NextflowConfigurator( + config_file_creator=config_file_creator, + housekeeper_api=self.cg_config.housekeeper_api, + lims=self.cg_config.lims_api, + params_file_creator=params_file_creator, + pipeline_config=pipeline_config, + sample_sheet_creator=sample_sheet_creator, + store=self.store, + pipeline_extension=self._get_pipeline_extension(workflow), + ) + + def _get_pipeline_config(self, workflow: Workflow) -> CommonAppConfig: + return getattr(self.cg_config, workflow) + + def _get_pipeline_extension(self, workflow: Workflow) -> PipelineExtension: + gene_panel_creator: GenePanelFileCreator = self._get_gene_panel_file_creator() + managed_variants_creator: ManagedVariantsFileCreator = ( + self._get_managed_variants_file_creator() + ) + return RarediseaseExtension( + gene_panel_file_creator=gene_panel_creator, + managed_variants_file_creator=managed_variants_creator, + ) + + def _get_gene_panel_file_creator(self) -> GenePanelFileCreator: + return GenePanelFileCreator(scout_api=self.cg_config.scout_api, store=self.store) + + def _get_managed_variants_file_creator(self) -> ManagedVariantsFileCreator: + return ManagedVariantsFileCreator(scout_api=self.cg_config.scout_api, store=self.store) + + def _get_input_fetcher(self, case_id: str) -> InputFetcher: + workflow: Workflow = self.store.get_case_workflow(case_id) + if workflow in FASTQ_WORKFLOWS: + spring_archive_api = SpringArchiveAPI( + status_db=self.cg_config.status_db, + housekeeper_api=self.cg_config.housekeeper_api, + data_flow_config=self.cg_config.data_flow, + ) + return FastqFetcher( + compress_api=self.cg_config.meta_apis["compress_api"], + housekeeper_api=self.cg_config.housekeeper_api, + spring_archive_api=spring_archive_api, + status_db=self.store, + ) + return InputFetcher() + + def _get_submitter(self, case_id: str) -> Submitter: + workflow: Workflow = self.store.get_case_workflow(case_id) + if workflow in NEXTFLOW_WORKFLOWS: + seqera_platform_client: SeqeraPlatformClient = self._get_seqera_platform_client() + return SeqeraPlatformSubmitter( + client=seqera_platform_client, + compute_environment_ids=self.cg_config.seqera_platform.compute_environmentsCan, + ) + + def _get_seqera_platform_client(self) -> SeqeraPlatformClient: + return SeqeraPlatformClient(self.cg_config.seqera_platform) diff --git a/cg/services/analysis_starter/input_fetcher/implementations/fastq_fetcher.py b/cg/services/analysis_starter/input_fetcher/implementations/fastq_fetcher.py index 96e5c1b9e0..419901dc5a 100644 --- a/cg/services/analysis_starter/input_fetcher/implementations/fastq_fetcher.py +++ b/cg/services/analysis_starter/input_fetcher/implementations/fastq_fetcher.py @@ -21,10 +21,10 @@ class FastqFetcher(InputFetcher): def __init__( self, - status_db: Store, - housekeeper_api: HousekeeperAPI, compress_api: CompressAPI, + housekeeper_api: HousekeeperAPI, spring_archive_api: SpringArchiveAPI, + status_db: Store, ): self.status_db = status_db self.housekeeper_api = housekeeper_api From 2ff5758a7119583151763846d00012a8797e5fe3 Mon Sep 17 00:00:00 2001 From: islean Date: Mon, 17 Mar 2025 16:36:00 +0100 Subject: [PATCH 2/3] Fix typo --- cg/services/analysis_starter/factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cg/services/analysis_starter/factory.py b/cg/services/analysis_starter/factory.py index a47645261c..76dea2f968 100644 --- a/cg/services/analysis_starter/factory.py +++ b/cg/services/analysis_starter/factory.py @@ -118,7 +118,7 @@ def _get_submitter(self, case_id: str) -> Submitter: seqera_platform_client: SeqeraPlatformClient = self._get_seqera_platform_client() return SeqeraPlatformSubmitter( client=seqera_platform_client, - compute_environment_ids=self.cg_config.seqera_platform.compute_environmentsCan, + compute_environment_ids=self.cg_config.seqera_platform.compute_environments, ) def _get_seqera_platform_client(self) -> SeqeraPlatformClient: From fd772e0f89d5d4307c5f768952ab6618f8ced52a Mon Sep 17 00:00:00 2001 From: islean Date: Tue, 18 Mar 2025 14:51:43 +0100 Subject: [PATCH 3/3] Initial refactoring --- cg/services/analysis_starter/factory.py | 102 ++++++++++++++---------- 1 file changed, 62 insertions(+), 40 deletions(-) diff --git a/cg/services/analysis_starter/factory.py b/cg/services/analysis_starter/factory.py index 76dea2f968..d0afeeb7f7 100644 --- a/cg/services/analysis_starter/factory.py +++ b/cg/services/analysis_starter/factory.py @@ -1,3 +1,6 @@ +from cg.apps.housekeeper.hk import HousekeeperAPI +from cg.apps.lims import LimsAPI +from cg.apps.scout.scoutapi import ScoutAPI from cg.constants import Workflow from cg.constants.nextflow import NEXTFLOW_WORKFLOWS from cg.meta.archive.archive import SpringArchiveAPI @@ -18,6 +21,9 @@ from cg.services.analysis_starter.configurator.file_creators.params_file.raredisease import ( RarediseaseParamsFileCreator, ) +from cg.services.analysis_starter.configurator.file_creators.sample_sheet.abstract import ( + NextflowSampleSheetCreator, +) from cg.services.analysis_starter.configurator.file_creators.sample_sheet.raredisease import ( RarediseaseSampleSheetCreator, ) @@ -37,83 +43,99 @@ class AnalysisStarterFactory: def __init__(self, cg_config: CGConfig): self.cg_config = cg_config + self.housekeeper_api: HousekeeperAPI = cg_config.housekeeper_api + self.lims_api: LimsAPI = cg_config.lims_api + self.scout_api: ScoutAPI = cg_config.scout_api self.store: Store = cg_config.status_db def get_analysis_starter(self, case_id: str) -> AnalysisStarter: - configurator: Configurator = self._get_configurator(case_id) - input_fetcher: InputFetcher = self._get_input_fetcher(case_id) - submitter: Submitter = self._get_submitter(case_id) + workflow: Workflow = self.store.get_case_workflow(case_id) + configurator: Configurator = self._get_configurator(workflow) + input_fetcher: InputFetcher = self._get_input_fetcher(workflow) + submitter: Submitter = self._get_submitter(workflow) return AnalysisStarter( configurator=configurator, input_fetcher=input_fetcher, submitter=submitter ) - def _get_configurator(self, case_id: str) -> Configurator: - workflow: Workflow = self.store.get_case_workflow(case_id) + def _get_configurator(self, workflow: Workflow) -> Configurator: if workflow in NEXTFLOW_WORKFLOWS: - pipeline_config: CommonAppConfig = self._get_pipeline_config(workflow) - config_file_creator = NextflowConfigFileCreator( - account=pipeline_config.account, - platform=pipeline_config.platform, - resources=pipeline_config.resources, - store=self.store, - workflow_config_path=pipeline_config.workflow_config_path, - ) - params_file_creator: ParamsFileCreator = RarediseaseParamsFileCreator( - lims=self.cg_config.lims_api, store=self.store, params=pipeline_config.params - ) - sample_sheet_creator: RarediseaseSampleSheetCreator = RarediseaseSampleSheetCreator( - housekeeper_api=self.cg_config.housekeeper_api, - lims=self.cg_config.lims_api, - store=self.store, + config_file_creator = self._get_config_file_creator(workflow) + params_file_creator: ParamsFileCreator = self._get_params_file_creator(workflow) + sample_sheet_creator: NextflowSampleSheetCreator = self._get_sample_sheet_creator( + workflow ) return NextflowConfigurator( config_file_creator=config_file_creator, - housekeeper_api=self.cg_config.housekeeper_api, - lims=self.cg_config.lims_api, + housekeeper_api=self.housekeeper_api, + lims=self.lims_api, params_file_creator=params_file_creator, - pipeline_config=pipeline_config, + pipeline_config=self._get_pipeline_config(workflow), sample_sheet_creator=sample_sheet_creator, store=self.store, pipeline_extension=self._get_pipeline_extension(workflow), ) + def _get_config_file_creator(self, workflow: Workflow) -> NextflowConfigFileCreator: + pipeline_config: CommonAppConfig = self._get_pipeline_config(workflow) + return NextflowConfigFileCreator( + account=pipeline_config.account, + platform=pipeline_config.platform, + resources=pipeline_config.resources, + store=self.store, + workflow_config_path=pipeline_config.workflow_config_path, + ) + + def _get_params_file_creator(self, workflow: Workflow) -> ParamsFileCreator: + if workflow == Workflow.RAREDISEASE: + pipeline_config: CommonAppConfig = self._get_pipeline_config(workflow) + return RarediseaseParamsFileCreator( + lims=self.lims_api, store=self.store, params=pipeline_config.params + ) + + def _get_sample_sheet_creator(self, workflow: Workflow) -> NextflowSampleSheetCreator: + if workflow == Workflow.RAREDISEASE: + return RarediseaseSampleSheetCreator( + housekeeper_api=self.cg_config.housekeeper_api, + lims=self.cg_config.lims_api, + store=self.store, + ) + def _get_pipeline_config(self, workflow: Workflow) -> CommonAppConfig: return getattr(self.cg_config, workflow) def _get_pipeline_extension(self, workflow: Workflow) -> PipelineExtension: - gene_panel_creator: GenePanelFileCreator = self._get_gene_panel_file_creator() - managed_variants_creator: ManagedVariantsFileCreator = ( - self._get_managed_variants_file_creator() - ) - return RarediseaseExtension( - gene_panel_file_creator=gene_panel_creator, - managed_variants_file_creator=managed_variants_creator, - ) + if workflow == Workflow.RAREDISEASE: + gene_panel_creator: GenePanelFileCreator = self._get_gene_panel_file_creator() + managed_variants_creator: ManagedVariantsFileCreator = ( + self._get_managed_variants_file_creator() + ) + return RarediseaseExtension( + gene_panel_file_creator=gene_panel_creator, + managed_variants_file_creator=managed_variants_creator, + ) def _get_gene_panel_file_creator(self) -> GenePanelFileCreator: - return GenePanelFileCreator(scout_api=self.cg_config.scout_api, store=self.store) + return GenePanelFileCreator(scout_api=self.scout_api, store=self.store) def _get_managed_variants_file_creator(self) -> ManagedVariantsFileCreator: - return ManagedVariantsFileCreator(scout_api=self.cg_config.scout_api, store=self.store) + return ManagedVariantsFileCreator(scout_api=self.scout_api, store=self.store) - def _get_input_fetcher(self, case_id: str) -> InputFetcher: - workflow: Workflow = self.store.get_case_workflow(case_id) + def _get_input_fetcher(self, workflow: Workflow) -> InputFetcher: if workflow in FASTQ_WORKFLOWS: spring_archive_api = SpringArchiveAPI( - status_db=self.cg_config.status_db, - housekeeper_api=self.cg_config.housekeeper_api, + status_db=self.store, + housekeeper_api=self.housekeeper_api, data_flow_config=self.cg_config.data_flow, ) return FastqFetcher( compress_api=self.cg_config.meta_apis["compress_api"], - housekeeper_api=self.cg_config.housekeeper_api, + housekeeper_api=self.housekeeper_api, spring_archive_api=spring_archive_api, status_db=self.store, ) return InputFetcher() - def _get_submitter(self, case_id: str) -> Submitter: - workflow: Workflow = self.store.get_case_workflow(case_id) + def _get_submitter(self, workflow: Workflow) -> Submitter: if workflow in NEXTFLOW_WORKFLOWS: seqera_platform_client: SeqeraPlatformClient = self._get_seqera_platform_client() return SeqeraPlatformSubmitter(