Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Pipeline Integration) Add starter factory #4295

Draft
wants to merge 3 commits into
base: dev-start-pipelines
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
class NextflowConfigFileCreator:

def __init__(
self, store: Store, platform: str, workflow_config_path: str, resources: str, account: str
self, account: str, platform: str, resources: str, store: Store, workflow_config_path: str
):
self.store = store
self.account = account
self.platform = platform
self.workflow_config_path = workflow_config_path
self.resources = resources
self.account = account
self.store = store
self.workflow_config_path = workflow_config_path

@staticmethod
def get_file_path(case_id: str, case_path: Path) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

class RarediseaseSampleSheetCreator(NextflowSampleSheetCreator):

def __init__(self, store: Store, housekeeper_api: HousekeeperAPI, lims: LimsAPI):
self.store = store
def __init__(self, housekeeper_api: HousekeeperAPI, lims: LimsAPI, store: Store):
self.housekeeper_api = housekeeper_api
self.lims = lims
self.store = store

@staticmethod
def get_file_path(case_id: str, case_path: Path) -> Path:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ class NextflowConfigurator(Configurator):

def __init__(
self,
pipeline_config: CommonAppConfig,
store: Store,
config_file_creator: NextflowConfigFileCreator,
housekeeper_api: HousekeeperAPI,
lims: LimsAPI,
config_file_creator: NextflowConfigFileCreator,
sample_sheet_creator: NextflowSampleSheetCreator,
params_file_creator: ParamsFileCreator,
pipeline_config: CommonAppConfig,
sample_sheet_creator: NextflowSampleSheetCreator,
store: Store,
pipeline_extension: PipelineExtension = PipelineExtension(),
):
self.root_dir: str = pipeline_config.root
Expand Down
18 changes: 18 additions & 0 deletions cg/services/analysis_starter/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from cg.constants import Workflow

# Workflows for which the analysis starter factory hands out a FastqFetcher
# as the input fetcher (membership is checked against the case's workflow).
FASTQ_WORKFLOWS = [
Workflow.BALSAMIC,
Workflow.BALSAMIC_PON,
Workflow.BALSAMIC_UMI,
Workflow.FLUFFY,
Workflow.MICROSALT,
Workflow.MIP_DNA,
Workflow.MIP_RNA,
Workflow.MUTANT,
Workflow.RAREDISEASE,
Workflow.RNAFUSION,
Workflow.TAXPROFILER,
Workflow.TOMTE,
]

# NOTE(review): presumably the workflows that start from BAM input; it is not
# referenced by the factory code shown alongside this file — confirm intended usage.
BAM_WORKFLOWS = [Workflow.NALLO]
125 changes: 125 additions & 0 deletions cg/services/analysis_starter/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from cg.constants import Workflow
from cg.constants.nextflow import NEXTFLOW_WORKFLOWS
from cg.meta.archive.archive import SpringArchiveAPI
from cg.models.cg_config import CGConfig, CommonAppConfig
from cg.services.analysis_starter.configurator.abstract_service import Configurator
from cg.services.analysis_starter.configurator.extensions.abstract import PipelineExtension
from cg.services.analysis_starter.configurator.extensions.raredisease import RarediseaseExtension
from cg.services.analysis_starter.configurator.file_creators.config_file import (
NextflowConfigFileCreator,
)
from cg.services.analysis_starter.configurator.file_creators.gene_panel import GenePanelFileCreator
from cg.services.analysis_starter.configurator.file_creators.managed_variants import (
ManagedVariantsFileCreator,
)
from cg.services.analysis_starter.configurator.file_creators.params_file.abstract import (
ParamsFileCreator,
)
from cg.services.analysis_starter.configurator.file_creators.params_file.raredisease import (
RarediseaseParamsFileCreator,
)
from cg.services.analysis_starter.configurator.file_creators.sample_sheet.raredisease import (
RarediseaseSampleSheetCreator,
)
from cg.services.analysis_starter.configurator.implementations.nextflow import NextflowConfigurator
from cg.services.analysis_starter.constants import FASTQ_WORKFLOWS
from cg.services.analysis_starter.input_fetcher.implementations.fastq_fetcher import FastqFetcher
from cg.services.analysis_starter.input_fetcher.input_fetcher import InputFetcher
from cg.services.analysis_starter.service import AnalysisStarter
from cg.services.analysis_starter.submitters.seqera_platform.client import SeqeraPlatformClient
from cg.services.analysis_starter.submitters.seqera_platform.submitter import (
SeqeraPlatformSubmitter,
)
from cg.services.analysis_starter.submitters.submitter import Submitter
from cg.store.store import Store


class AnalysisStarterFactory:
def __init__(self, cg_config: CGConfig):
    """Keep the application config and the status database it exposes.

    Args:
        cg_config: Application configuration. Its `status_db` attribute is
            stored on the factory as `self.store`.
    """
    status_db: Store = cg_config.status_db
    self.store = status_db
    self.cg_config = cg_config

def get_analysis_starter(self, case_id: str) -> AnalysisStarter:
configurator: Configurator = self._get_configurator(case_id)
input_fetcher: InputFetcher = self._get_input_fetcher(case_id)
submitter: Submitter = self._get_submitter(case_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These all take the case_id as input but they all just depend on the workflow. And some of the smaller functions take workflow as input. Do you think it makes sense to just pass the workflow here as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking initially was that the input fetcher depends on the delivery type, but then I remembered that most delivery types are not only fastq or bam, so at this point, yeah - we should probably pass the workflow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless more information than just the workflow is required, it makes sense. If it turns out later that you need more, keeping case_id will of course allow you to fetch it. I'd wait with changing it until the design is more final.

return AnalysisStarter(
configurator=configurator, input_fetcher=input_fetcher, submitter=submitter
)

def _get_configurator(self, case_id: str) -> Configurator:
workflow: Workflow = self.store.get_case_workflow(case_id)
if workflow in NEXTFLOW_WORKFLOWS:
pipeline_config: CommonAppConfig = self._get_pipeline_config(workflow)
config_file_creator = NextflowConfigFileCreator(
account=pipeline_config.account,
platform=pipeline_config.platform,
resources=pipeline_config.resources,
store=self.store,
workflow_config_path=pipeline_config.workflow_config_path,
)
params_file_creator: ParamsFileCreator = RarediseaseParamsFileCreator(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This likely needs its own function as soon as there are more creators than the "RareDiseaseParamsFileCreator".

lims=self.cg_config.lims_api, store=self.store, params=pipeline_config.params
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are the params here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is what the devbio have named the pipeline parameters file path that you can find in the cg.yaml

)
sample_sheet_creator: RarediseaseSampleSheetCreator = RarediseaseSampleSheetCreator(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clearly these should be instantiated dynamically and not only be Raredisease. Do we know if the inits are the same? What do we need to pass to fetch the correct one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm 99% sure the signatures of other NF pipelines will be the same

Copy link
Contributor

@ChrOertlin ChrOertlin Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make a function that pulls the correct one from a map for example. workflow_to_samplesheet_creator (so pass the workflow)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that might be the nicest approach but then we need to fix the init in the parent NextflowSampleSheetCreator class.

housekeeper_api=self.cg_config.housekeeper_api,
lims=self.cg_config.lims_api,
store=self.store,
)
return NextflowConfigurator(
config_file_creator=config_file_creator,
housekeeper_api=self.cg_config.housekeeper_api,
lims=self.cg_config.lims_api,
params_file_creator=params_file_creator,
pipeline_config=pipeline_config,
sample_sheet_creator=sample_sheet_creator,
store=self.store,
pipeline_extension=self._get_pipeline_extension(workflow),
)

def _get_pipeline_config(self, workflow: Workflow) -> CommonAppConfig:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that all the nextflow configs inherit only from CommonAppConfig leads to our code being riddled with warnings because of unknown attributes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to create a separate basemodel to inherit from IMO

return getattr(self.cg_config, workflow)

def _get_pipeline_extension(self, workflow: Workflow) -> PipelineExtension:
    """Return the pipeline extension that belongs to the given workflow.

    Previously the `workflow` argument was ignored and every workflow received
    a RarediseaseExtension. Now only raredisease gets it; all other workflows
    get a plain PipelineExtension, which is the same value NextflowConfigurator
    uses as its default `pipeline_extension`.

    Args:
        workflow: The workflow the configurator is being built for.

    Returns:
        A RarediseaseExtension wired with the gene panel and managed variants
        file creators for raredisease, otherwise a base PipelineExtension.
    """
    if workflow != Workflow.RAREDISEASE:
        # Avoid building the Scout-backed file creators for workflows that do not use them.
        return PipelineExtension()
    return RarediseaseExtension(
        gene_panel_file_creator=self._get_gene_panel_file_creator(),
        managed_variants_file_creator=self._get_managed_variants_file_creator(),
    )

def _get_gene_panel_file_creator(self) -> GenePanelFileCreator:
    """Build a GenePanelFileCreator from the configured Scout API and the store."""
    scout_api = self.cg_config.scout_api
    return GenePanelFileCreator(scout_api=scout_api, store=self.store)

def _get_managed_variants_file_creator(self) -> ManagedVariantsFileCreator:
    """Build a ManagedVariantsFileCreator from the configured Scout API and the store."""
    scout_api = self.cg_config.scout_api
    return ManagedVariantsFileCreator(scout_api=scout_api, store=self.store)

def _get_input_fetcher(self, case_id: str) -> InputFetcher:
    """Return the input fetcher for the workflow of the given case.

    Cases whose workflow is listed in FASTQ_WORKFLOWS get a FastqFetcher;
    every other case gets a plain InputFetcher.

    Args:
        case_id: Identifier used to look up the case's workflow in the store.
    """
    case_workflow: Workflow = self.store.get_case_workflow(case_id)
    if case_workflow not in FASTQ_WORKFLOWS:
        return InputFetcher()

    config: CGConfig = self.cg_config
    archive_api = SpringArchiveAPI(
        status_db=config.status_db,
        housekeeper_api=config.housekeeper_api,
        data_flow_config=config.data_flow,
    )
    return FastqFetcher(
        compress_api=config.meta_apis["compress_api"],
        housekeeper_api=config.housekeeper_api,
        spring_archive_api=archive_api,
        status_db=self.store,
    )

def _get_submitter(self, case_id: str) -> Submitter:
workflow: Workflow = self.store.get_case_workflow(case_id)
if workflow in NEXTFLOW_WORKFLOWS:
seqera_platform_client: SeqeraPlatformClient = self._get_seqera_platform_client()
return SeqeraPlatformSubmitter(
client=seqera_platform_client,
compute_environment_ids=self.cg_config.seqera_platform.compute_environments,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This reaching through the SeqeraPlatformConfig indicates that our server config is not designed properly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we have a method like this?

def _get_seqera_platform_submitter(config: SeqeraPlatformConfig):
    client = SeqeraPlatformClient(config)
    return SeqeraPlatformSubmitter(client=client, compute_environment_ids=config.compute_environments)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is more the fact that the computeEnvIds are not needed by the client at all anymore, but they are set in the config we designed for the client. I guess we should have one config for the submitter and one for the client?

)

def _get_seqera_platform_client(self) -> SeqeraPlatformClient:
    """Build a SeqeraPlatformClient from the `seqera_platform` section of the config."""
    seqera_config = self.cg_config.seqera_platform
    return SeqeraPlatformClient(seqera_config)
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ class FastqFetcher(InputFetcher):

def __init__(
self,
status_db: Store,
housekeeper_api: HousekeeperAPI,
compress_api: CompressAPI,
housekeeper_api: HousekeeperAPI,
spring_archive_api: SpringArchiveAPI,
status_db: Store,
):
self.status_db = status_db
self.housekeeper_api = housekeeper_api
Expand Down
Loading