diff --git a/pyproject.toml b/pyproject.toml index e7e200657..610acf08e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -125,6 +125,7 @@ TomographyMetadataContext = "murfey.client.contexts.tomo_metadata:TomographyMeta "spa.ctf_estimated" = "murfey.workflows.spa.ctf_estimation:ctf_estimated" "spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess" "spa.motion_corrected" = "murfey.workflows.spa.motion_correction:motion_corrected" +"sxt.process_tilt_series" = "murfey.workflows.sxt.process_sxt_tilt_series:run" [tool.setuptools] package-dir = {"" = "src"} diff --git a/src/murfey/client/contexts/sxt.py b/src/murfey/client/contexts/sxt.py index 2e58993c0..b87dfbac0 100644 --- a/src/murfey/client/contexts/sxt.py +++ b/src/murfey/client/contexts/sxt.py @@ -52,7 +52,7 @@ def _find_reference(txrm_file: Path) -> Path | None: )[0] ) if mosaic_size == 0: - logger.info(f"Found reference {ref_option}") + logger.info(f"Found reference {ref_option.name}") return Path(ref_option.full_path) logger.warning(f"No reference found for {txrm_file}") return None @@ -125,9 +125,7 @@ def register_sxt_data_collection( data=dc_data, ) - recipes_to_assign_pjids = [ - "sxt-aretomo", - ] + recipes_to_assign_pjids = self._machine_config.get("recipes", {}).values() for recipe in recipes_to_assign_pjids: capture_post( base_url=str(environment.url.geturl()), diff --git a/src/murfey/server/api/workflow.py b/src/murfey/server/api/workflow.py index 467989a85..5de015dfa 100644 --- a/src/murfey/server/api/workflow.py +++ b/src/murfey/server/api/workflow.py @@ -80,7 +80,6 @@ from murfey.util.tomo import midpoint from murfey.workflows.sxt.process_sxt_tilt_series import ( SXTTiltSeriesInfo, - process_sxt_tilt_series_workflow, ) from murfey.workflows.tomo.tomo_metadata import register_search_map_in_database @@ -1086,9 +1085,16 @@ def process_sxt_tilt_series( tilt_series_info: SXTTiltSeriesInfo, db=murfey_db, ): - return process_sxt_tilt_series_workflow( - visit_name, session_id, tilt_series_info, db - ) + if _transport_object: + _transport_object.send( + _transport_object.feedback_queue, + { + "register": "sxt.process_tilt_series", + "session_id": session_id, + "visit_name": visit_name, + "tilt_series_info": tilt_series_info.model_dump(mode="json"), + }, + ) correlative_router = APIRouter( diff --git a/src/murfey/workflows/sxt/process_sxt_tilt_series.py b/src/murfey/workflows/sxt/process_sxt_tilt_series.py index a3e64b399..68b3b79a8 100644 --- a/src/murfey/workflows/sxt/process_sxt_tilt_series.py +++ b/src/murfey/workflows/sxt/process_sxt_tilt_series.py @@ -3,6 +3,7 @@ from pydantic import BaseModel from sqlmodel import select +from sqlmodel.orm.session import Session as SQLModelSession from werkzeug.utils import secure_filename from murfey.server import _transport_object @@ -31,12 +32,12 @@ class SXTTiltSeriesInfo(BaseModel): xrm_reference: str | None -def process_sxt_tilt_series_workflow( +def process_sxt_tilt_series( visit_name: str, session_id: MurfeySessionID, tilt_series_info: SXTTiltSeriesInfo, - murfey_db: Session, -): + murfey_db: SQLModelSession, +) -> dict[str, bool]: tilt_series_query = murfey_db.exec( select(TiltSeries) .where(TiltSeries.session_id == session_id) @@ -47,7 +48,7 @@ def process_sxt_tilt_series_workflow( tilt_series = tilt_series_query[0] if tilt_series.processing_requested: logger.info(f"Tilt series {tilt_series.tag} has already been processed") - return + return {"success": True} else: tilt_series = TiltSeries( session_id=session_id, @@ -59,6 +60,7 @@ def process_sxt_tilt_series_workflow( murfey_db.add(tilt_series) murfey_db.commit() + # Find all processing jobs registered for this tilt series collected_ids = murfey_db.exec( select(DataCollectionGroup, DataCollection, ProcessingJob, AutoProcProgram) .where(DataCollectionGroup.session_id == session_id) @@ -67,8 +69,11 @@ def process_sxt_tilt_series_workflow( .where(DataCollection.dcg_id == DataCollectionGroup.id) .where(ProcessingJob.dc_id == DataCollection.id) .where(AutoProcProgram.pj_id == ProcessingJob.id) - .where(ProcessingJob.recipe == "sxt-aretomo") - ).one() + ).all() + if len(collected_ids) == 0: + logger.warning(f"No processing recipes found for {tilt_series.tag}") + return {"success": False, "requeue": False} + instrument_name = ( murfey_db.exec(select(Session).where(Session.id == session_id)) .one() @@ -78,46 +83,66 @@ def process_sxt_tilt_series_workflow( instrument_name ] + # Find the visit folder and any subfolders needed parts = [secure_filename(p) for p in Path(tilt_series_info.txrm).parts] visit_idx = parts.index(visit_name) core = Path(*Path(tilt_series_info.txrm).parts[: visit_idx + 1]) ppath = Path( "/".join(secure_filename(p) for p in Path(tilt_series_info.txrm).parts) ) - sub_dataset = "/".join(ppath.relative_to(core).parts[:-1]) - extra_path = machine_config.processed_extra_directory - stack_file = ( - core - / machine_config.processed_directory_name - / sub_dataset - / extra_path - / "Tomograms" - / f"{tilt_series.tag}_stack.mrc" - ) - stack_file.parent.mkdir(parents=True, exist_ok=True) - zocalo_message = { - "recipes": ["sxt-aretomo"], - "parameters": { - "txrm_file": tilt_series_info.txrm, - "xrm_reference": tilt_series_info.xrm_reference or "", - "dcid": collected_ids[1].id, - "appid": collected_ids[3].id, - "stack_file": str(stack_file), - "tilt_axis": 0, - "pixel_size": tilt_series_info.pixel_size, - "manual_tilt_offset": -tilt_series_info.tilt_offset, - "node_creator_queue": machine_config.node_creator_queue, - }, - } - if _transport_object: - logger.info( - f"Sending Zocalo message for processing: {sanitise(str(zocalo_message))}" - ) - _transport_object.send("processing_recipe", zocalo_message, new_connection=True) - else: - logger.info( - f"No transport object found. Zocalo message would be {sanitise(str(zocalo_message))}" + sub_dataset = "/".join(ppath.relative_to(core).parts[1:-1]) + + # Loop over all processing jobs, and send the alignment recipe for it + for recipe_ids in collected_ids: + # Stack file path needs to contain both recipe name and tilt series name + stack_file = ( + core + / machine_config.processed_directory_name + / machine_config.processed_extra_directory + / sub_dataset + / tilt_series.tag + / recipe_ids[2].recipe + / "Tomograms" + / f"{tilt_series.tag}_stack.mrc" ) + stack_file.parent.mkdir(parents=True, exist_ok=True) + + # Send message to rabbitmq + zocalo_message = { + "recipes": [recipe_ids[2].recipe], + "parameters": { + "txrm_file": tilt_series_info.txrm, + "xrm_reference": tilt_series_info.xrm_reference or "", + "dcid": recipe_ids[1].id, + "appid": recipe_ids[3].id, + "stack_file": str(stack_file), + "tilt_axis": 0, + "pixel_size": tilt_series_info.pixel_size, + "manual_tilt_offset": -tilt_series_info.tilt_offset, + "node_creator_queue": machine_config.node_creator_queue, + }, + } + if _transport_object: + logger.info( + f"Sending Zocalo message for processing: {sanitise(str(zocalo_message))}" + ) + _transport_object.send( + "processing_recipe", zocalo_message, new_connection=True + ) + else: + logger.info( + f"No transport object found. Zocalo message would be {sanitise(str(zocalo_message))}" + ) tilt_series.processing_requested = True murfey_db.add(tilt_series) murfey_db.commit() + return {"success": True} + + +def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]: + return process_sxt_tilt_series( + message["visit_name"], + message["session_id"], + SXTTiltSeriesInfo(**message["tilt_series_info"]), + murfey_db, + ) diff --git a/tests/client/contexts/test_sxt.py b/tests/client/contexts/test_sxt.py index d1705ad29..4e620d2f5 100644 --- a/tests/client/contexts/test_sxt.py +++ b/tests/client/contexts/test_sxt.py @@ -69,7 +69,12 @@ def test_sxt_context_txrm(mock_ole_file, mock_post, tmp_path): visit="cm12345-6", murfey_session=1, ) - context = SXTContext("zeiss", tmp_path / "cm12345-6/grid1", {}, "") + context = SXTContext( + "zeiss", + tmp_path / "cm12345-6/grid1", + {"recipes": {"aretomo": "sxt-aretomo"}}, + "", + ) context.post_transfer( tmp_path / "cm12345-6/grid1/example.txrm", required_position_files=[], @@ -191,7 +196,12 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path): visit="cm12345-6", murfey_session=1, ) - context = SXTContext("zeiss", tmp_path / "cm12345-6/grid1", {}, "") + context = SXTContext( + "zeiss", + tmp_path / "cm12345-6/grid1", + {"recipes": {"aretomo": "sxt-aretomo", "imod": "sxt-imod-patch-wbp"}}, + "", + ) context.post_transfer( tmp_path / "cm12345-6/grid1/example_-60to60@0.5.txrm", required_position_files=[], @@ -204,7 +214,7 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path): ) mock_ole_file.assert_any_call(str(tmp_path / "cm12345-6/grid1/ref.xrm")) - assert mock_post.call_count == 5 + assert mock_post.call_count == 6 mock_post.assert_any_call( "http://localhost:8000/workflow/visits/cm12345-6/sessions/1/register_data_collection_group", json={ @@ -245,6 +255,16 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path): }, headers={"Authorization": "Bearer "}, ) + mock_post.assert_any_call( + "http://localhost:8000/workflow/visits/cm12345-6/sessions/1/register_processing_job", + json={ + "tag": "example", + "source": f"{tmp_path}/cm12345-6/grid1", + "recipe": "sxt-imod-patch-wbp", + "experiment_type": "sxt", + }, + headers={"Authorization": "Bearer "}, + ) mock_post.assert_any_call( "http://localhost:8000/workflow/sxt/visits/cm12345-6/sessions/1/sxt_tilt_series", json={ diff --git a/tests/workflows/sxt/test_process_sxt_tilt_series.py b/tests/workflows/sxt/test_process_sxt_tilt_series.py index 3a34ea73c..53ea80968 100644 --- a/tests/workflows/sxt/test_process_sxt_tilt_series.py +++ b/tests/workflows/sxt/test_process_sxt_tilt_series.py @@ -33,7 +33,7 @@ def set_up_db(murfey_db_session: Session): "dcg_id": dcg_entry.id, }, ) - processing_job_entry: ProcessingJob = get_or_create_db_entry( + aretomo_pj_entry: ProcessingJob = get_or_create_db_entry( murfey_db_session, ProcessingJob, lookup_kwargs={ @@ -42,15 +42,32 @@ def set_up_db(murfey_db_session: Session): "dc_id": dc_entry.id, }, ) - auto_proc_entry = get_or_create_db_entry( + imod_pj_entry: ProcessingJob = get_or_create_db_entry( + murfey_db_session, + ProcessingJob, + lookup_kwargs={ + "id": 2, + "recipe": "sxt-imod-patch-wbp", + "dc_id": dc_entry.id, + }, + ) + aretomo_autoproc_entry = get_or_create_db_entry( murfey_db_session, AutoProcProgram, lookup_kwargs={ "id": 0, - "pj_id": processing_job_entry.id, + "pj_id": aretomo_pj_entry.id, }, ) - return dcg_entry.id, dc_entry.id, processing_job_entry.id, auto_proc_entry.id + imod_autoproc_entry = get_or_create_db_entry( + murfey_db_session, + AutoProcProgram, + lookup_kwargs={ + "id": 1, + "pj_id": imod_pj_entry.id, + }, + ) + return dcg_entry.id, dc_entry.id, aretomo_autoproc_entry.id, imod_autoproc_entry.id @mock.patch("murfey.workflows.sxt.process_sxt_tilt_series._transport_object") @@ -58,7 +75,7 @@ def test_process_new_sxt_tilt_series( mock_transport, murfey_db_session: Session, tmp_path ): """Run the picker feedback with less particles than needed for classification""" - dcg_id, dc_id, pj_id, app_id = set_up_db(murfey_db_session) + dcg_id, dc_id, app_id_aretomo, app_id_imod = set_up_db(murfey_db_session) new_parameters = process_sxt_tilt_series.SXTTiltSeriesInfo( session_id=ExampleVisit.murfey_session_id, @@ -72,7 +89,7 @@ def test_process_new_sxt_tilt_series( ) # Run the registration - process_sxt_tilt_series.process_sxt_tilt_series_workflow( + process_sxt_tilt_series.process_sxt_tilt_series( "cm12345-6", ExampleVisit.murfey_session_id, new_parameters, @@ -83,18 +100,36 @@ def test_process_new_sxt_tilt_series( mock_transport.send.assert_any_call( "processing_recipe", { + "recipes": ["sxt-aretomo"], "parameters": { "txrm_file": f"{tmp_path}/cm12345-6/raw/tomogram_tag.txrm", "xrm_reference": f"{tmp_path}/cm12345-6/raw/ref.xrm", "dcid": dc_id, - "appid": app_id, - "stack_file": f"{tmp_path}/cm12345-6/processed/raw/Tomograms/tomogram_tag_stack.mrc", + "appid": app_id_aretomo, + "stack_file": f"{tmp_path}/cm12345-6/processed/tomogram_tag/sxt-aretomo/Tomograms/tomogram_tag_stack.mrc", + "tilt_axis": 0, + "pixel_size": 100, + "manual_tilt_offset": -1, + "node_creator_queue": "node_creator", + }, + }, + new_connection=True, + ) + mock_transport.send.assert_any_call( + "processing_recipe", + { + "recipes": ["sxt-imod-patch-wbp"], + "parameters": { + "txrm_file": f"{tmp_path}/cm12345-6/raw/tomogram_tag.txrm", + "xrm_reference": f"{tmp_path}/cm12345-6/raw/ref.xrm", + "dcid": dc_id, + "appid": app_id_imod, + "stack_file": f"{tmp_path}/cm12345-6/processed/tomogram_tag/sxt-imod-patch-wbp/Tomograms/tomogram_tag_stack.mrc", "tilt_axis": 0, "pixel_size": 100, "manual_tilt_offset": -1, "node_creator_queue": "node_creator", }, - "recipes": ["sxt-aretomo"], }, new_connection=True, )