Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ TomographyMetadataContext = "murfey.client.contexts.tomo_metadata:TomographyMeta
"spa.ctf_estimated" = "murfey.workflows.spa.ctf_estimation:ctf_estimated"
"spa.flush_spa_preprocess" = "murfey.workflows.spa.flush_spa_preprocess:flush_spa_preprocess"
"spa.motion_corrected" = "murfey.workflows.spa.motion_correction:motion_corrected"
"sxt.process_tilt_series" = "murfey.workflows.sxt.process_sxt_tilt_series:run"

[tool.setuptools]
package-dir = {"" = "src"}
Expand Down
6 changes: 2 additions & 4 deletions src/murfey/client/contexts/sxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _find_reference(txrm_file: Path) -> Path | None:
)[0]
)
if mosaic_size == 0:
logger.info(f"Found reference {ref_option}")
logger.info(f"Found reference {ref_option.name}")
return Path(ref_option.full_path)
logger.warning(f"No reference found for {txrm_file}")
return None
Expand Down Expand Up @@ -125,9 +125,7 @@ def register_sxt_data_collection(
data=dc_data,
)

recipes_to_assign_pjids = [
"sxt-aretomo",
]
recipes_to_assign_pjids = self._machine_config.get("recipes", {}).values()
for recipe in recipes_to_assign_pjids:
capture_post(
base_url=str(environment.url.geturl()),
Expand Down
14 changes: 10 additions & 4 deletions src/murfey/server/api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
from murfey.util.tomo import midpoint
from murfey.workflows.sxt.process_sxt_tilt_series import (
SXTTiltSeriesInfo,
process_sxt_tilt_series_workflow,
)
from murfey.workflows.tomo.tomo_metadata import register_search_map_in_database

Expand Down Expand Up @@ -1086,9 +1085,16 @@ def process_sxt_tilt_series(
tilt_series_info: SXTTiltSeriesInfo,
db=murfey_db,
):
return process_sxt_tilt_series_workflow(
visit_name, session_id, tilt_series_info, db
)
if _transport_object:
_transport_object.send(
_transport_object.feedback_queue,
{
"register": "sxt.process_tilt_series",
"session_id": session_id,
"visit_name": visit_name,
"tilt_series_info": tilt_series_info.model_dump(mode="json"),
},
)


correlative_router = APIRouter(
Expand Down
103 changes: 64 additions & 39 deletions src/murfey/workflows/sxt/process_sxt_tilt_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from pydantic import BaseModel
from sqlmodel import select
from sqlmodel.orm.session import Session as SQLModelSession
from werkzeug.utils import secure_filename

from murfey.server import _transport_object
Expand Down Expand Up @@ -31,12 +32,12 @@ class SXTTiltSeriesInfo(BaseModel):
xrm_reference: str | None


def process_sxt_tilt_series_workflow(
def process_sxt_tilt_series(
visit_name: str,
session_id: MurfeySessionID,
tilt_series_info: SXTTiltSeriesInfo,
murfey_db: Session,
):
murfey_db: SQLModelSession,
) -> dict[str, bool]:
tilt_series_query = murfey_db.exec(
select(TiltSeries)
.where(TiltSeries.session_id == session_id)
Expand All @@ -47,7 +48,7 @@ def process_sxt_tilt_series_workflow(
tilt_series = tilt_series_query[0]
if tilt_series.processing_requested:
logger.info(f"Tilt series {tilt_series.tag} has already been processed")
return
return {"success": True}
else:
tilt_series = TiltSeries(
session_id=session_id,
Expand All @@ -59,6 +60,7 @@ def process_sxt_tilt_series_workflow(
murfey_db.add(tilt_series)
murfey_db.commit()

# Find all processing jobs registered for this tilt series
collected_ids = murfey_db.exec(
select(DataCollectionGroup, DataCollection, ProcessingJob, AutoProcProgram)
.where(DataCollectionGroup.session_id == session_id)
Expand All @@ -67,8 +69,11 @@ def process_sxt_tilt_series_workflow(
.where(DataCollection.dcg_id == DataCollectionGroup.id)
.where(ProcessingJob.dc_id == DataCollection.id)
.where(AutoProcProgram.pj_id == ProcessingJob.id)
.where(ProcessingJob.recipe == "sxt-aretomo")
).one()
).all()
if len(collected_ids) == 0:
logger.warning(f"No processing recipes found for {tilt_series.tag}")
return {"success": False, "requeue": False}

instrument_name = (
murfey_db.exec(select(Session).where(Session.id == session_id))
.one()
Expand All @@ -78,46 +83,66 @@ def process_sxt_tilt_series_workflow(
instrument_name
]

# Find the visit folder and any subfolders needed
parts = [secure_filename(p) for p in Path(tilt_series_info.txrm).parts]
visit_idx = parts.index(visit_name)
core = Path(*Path(tilt_series_info.txrm).parts[: visit_idx + 1])
ppath = Path(
"/".join(secure_filename(p) for p in Path(tilt_series_info.txrm).parts)
)
sub_dataset = "/".join(ppath.relative_to(core).parts[:-1])
extra_path = machine_config.processed_extra_directory
stack_file = (
core
/ machine_config.processed_directory_name
/ sub_dataset
/ extra_path
/ "Tomograms"
/ f"{tilt_series.tag}_stack.mrc"
)
stack_file.parent.mkdir(parents=True, exist_ok=True)
zocalo_message = {
"recipes": ["sxt-aretomo"],
"parameters": {
"txrm_file": tilt_series_info.txrm,
"xrm_reference": tilt_series_info.xrm_reference or "",
"dcid": collected_ids[1].id,
"appid": collected_ids[3].id,
"stack_file": str(stack_file),
"tilt_axis": 0,
"pixel_size": tilt_series_info.pixel_size,
"manual_tilt_offset": -tilt_series_info.tilt_offset,
"node_creator_queue": machine_config.node_creator_queue,
},
}
if _transport_object:
logger.info(
f"Sending Zocalo message for processing: {sanitise(str(zocalo_message))}"
)
_transport_object.send("processing_recipe", zocalo_message, new_connection=True)
else:
logger.info(
f"No transport object found. Zocalo message would be {sanitise(str(zocalo_message))}"
sub_dataset = "/".join(ppath.relative_to(core).parts[1:-1])

# Loop over all processing jobs, and send the alignment recipe for it
for recipe_ids in collected_ids:
# Stack file path needs to contain both recipe name and tilt series name
stack_file = (
core
/ machine_config.processed_directory_name
/ machine_config.processed_extra_directory
/ sub_dataset
/ tilt_series.tag
/ recipe_ids[2].recipe
/ "Tomograms"
/ f"{tilt_series.tag}_stack.mrc"
)
stack_file.parent.mkdir(parents=True, exist_ok=True)

# Send message to rabbitmq
zocalo_message = {
"recipes": [recipe_ids[2].recipe],
"parameters": {
"txrm_file": tilt_series_info.txrm,
"xrm_reference": tilt_series_info.xrm_reference or "",
"dcid": recipe_ids[1].id,
"appid": recipe_ids[3].id,
"stack_file": str(stack_file),
"tilt_axis": 0,
"pixel_size": tilt_series_info.pixel_size,
"manual_tilt_offset": -tilt_series_info.tilt_offset,
"node_creator_queue": machine_config.node_creator_queue,
},
}
if _transport_object:
logger.info(
f"Sending Zocalo message for processing: {sanitise(str(zocalo_message))}"
)
_transport_object.send(
"processing_recipe", zocalo_message, new_connection=True
)
else:
logger.info(
f"No transport object found. Zocalo message would be {sanitise(str(zocalo_message))}"
)
tilt_series.processing_requested = True
murfey_db.add(tilt_series)
murfey_db.commit()
return {"success": True}


def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]:
return process_sxt_tilt_series(
message["visit_name"],
message["session_id"],
SXTTiltSeriesInfo(**message["tilt_series_info"]),
murfey_db,
)
26 changes: 23 additions & 3 deletions tests/client/contexts/test_sxt.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ def test_sxt_context_txrm(mock_ole_file, mock_post, tmp_path):
visit="cm12345-6",
murfey_session=1,
)
context = SXTContext("zeiss", tmp_path / "cm12345-6/grid1", {}, "")
context = SXTContext(
"zeiss",
tmp_path / "cm12345-6/grid1",
{"recipes": {"aretomo": "sxt-aretomo"}},
"",
)
context.post_transfer(
tmp_path / "cm12345-6/grid1/example.txrm",
required_position_files=[],
Expand Down Expand Up @@ -191,7 +196,12 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path):
visit="cm12345-6",
murfey_session=1,
)
context = SXTContext("zeiss", tmp_path / "cm12345-6/grid1", {}, "")
context = SXTContext(
"zeiss",
tmp_path / "cm12345-6/grid1",
{"recipes": {"aretomo": "sxt-aretomo", "imod": "sxt-imod-patch-wbp"}},
"",
)
context.post_transfer(
tmp_path / "cm12345-6/grid1/example_-60to60@0.5.txrm",
required_position_files=[],
Expand All @@ -204,7 +214,7 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path):
)
mock_ole_file.assert_any_call(str(tmp_path / "cm12345-6/grid1/ref.xrm"))

assert mock_post.call_count == 5
assert mock_post.call_count == 6
mock_post.assert_any_call(
"http://localhost:8000/workflow/visits/cm12345-6/sessions/1/register_data_collection_group",
json={
Expand Down Expand Up @@ -245,6 +255,16 @@ def test_sxt_context_txrm_external_ref(mock_ole_file, mock_post, tmp_path):
},
headers={"Authorization": "Bearer "},
)
mock_post.assert_any_call(
"http://localhost:8000/workflow/visits/cm12345-6/sessions/1/register_processing_job",
json={
"tag": "example",
"source": f"{tmp_path}/cm12345-6/grid1",
"recipe": "sxt-imod-patch-wbp",
"experiment_type": "sxt",
},
headers={"Authorization": "Bearer "},
)
mock_post.assert_any_call(
"http://localhost:8000/workflow/sxt/visits/cm12345-6/sessions/1/sxt_tilt_series",
json={
Expand Down
53 changes: 44 additions & 9 deletions tests/workflows/sxt/test_process_sxt_tilt_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def set_up_db(murfey_db_session: Session):
"dcg_id": dcg_entry.id,
},
)
processing_job_entry: ProcessingJob = get_or_create_db_entry(
aretomo_pj_entry: ProcessingJob = get_or_create_db_entry(
murfey_db_session,
ProcessingJob,
lookup_kwargs={
Expand All @@ -42,23 +42,40 @@ def set_up_db(murfey_db_session: Session):
"dc_id": dc_entry.id,
},
)
auto_proc_entry = get_or_create_db_entry(
imod_pj_entry: ProcessingJob = get_or_create_db_entry(
murfey_db_session,
ProcessingJob,
lookup_kwargs={
"id": 2,
"recipe": "sxt-imod-patch-wbp",
"dc_id": dc_entry.id,
},
)
aretomo_autoproc_entry = get_or_create_db_entry(
murfey_db_session,
AutoProcProgram,
lookup_kwargs={
"id": 0,
"pj_id": processing_job_entry.id,
"pj_id": aretomo_pj_entry.id,
},
)
return dcg_entry.id, dc_entry.id, processing_job_entry.id, auto_proc_entry.id
imod_autoproc_entry = get_or_create_db_entry(
murfey_db_session,
AutoProcProgram,
lookup_kwargs={
"id": 1,
"pj_id": imod_pj_entry.id,
},
)
return dcg_entry.id, dc_entry.id, aretomo_autoproc_entry.id, imod_autoproc_entry.id


@mock.patch("murfey.workflows.sxt.process_sxt_tilt_series._transport_object")
def test_process_new_sxt_tilt_series(
mock_transport, murfey_db_session: Session, tmp_path
):
"""Run the picker feedback with less particles than needed for classification"""
dcg_id, dc_id, pj_id, app_id = set_up_db(murfey_db_session)
dcg_id, dc_id, app_id_aretomo, app_id_imod = set_up_db(murfey_db_session)

new_parameters = process_sxt_tilt_series.SXTTiltSeriesInfo(
session_id=ExampleVisit.murfey_session_id,
Expand All @@ -72,7 +89,7 @@ def test_process_new_sxt_tilt_series(
)

# Run the registration
process_sxt_tilt_series.process_sxt_tilt_series_workflow(
process_sxt_tilt_series.process_sxt_tilt_series(
"cm12345-6",
ExampleVisit.murfey_session_id,
new_parameters,
Expand All @@ -83,18 +100,36 @@ def test_process_new_sxt_tilt_series(
mock_transport.send.assert_any_call(
"processing_recipe",
{
"recipes": ["sxt-aretomo"],
"parameters": {
"txrm_file": f"{tmp_path}/cm12345-6/raw/tomogram_tag.txrm",
"xrm_reference": f"{tmp_path}/cm12345-6/raw/ref.xrm",
"dcid": dc_id,
"appid": app_id,
"stack_file": f"{tmp_path}/cm12345-6/processed/raw/Tomograms/tomogram_tag_stack.mrc",
"appid": app_id_aretomo,
"stack_file": f"{tmp_path}/cm12345-6/processed/tomogram_tag/sxt-aretomo/Tomograms/tomogram_tag_stack.mrc",
"tilt_axis": 0,
"pixel_size": 100,
"manual_tilt_offset": -1,
"node_creator_queue": "node_creator",
},
},
new_connection=True,
)
mock_transport.send.assert_any_call(
"processing_recipe",
{
"recipes": ["sxt-imod-patch-wbp"],
"parameters": {
"txrm_file": f"{tmp_path}/cm12345-6/raw/tomogram_tag.txrm",
"xrm_reference": f"{tmp_path}/cm12345-6/raw/ref.xrm",
"dcid": dc_id,
"appid": app_id_imod,
"stack_file": f"{tmp_path}/cm12345-6/processed/tomogram_tag/sxt-imod-patch-wbp/Tomograms/tomogram_tag_stack.mrc",
"tilt_axis": 0,
"pixel_size": 100,
"manual_tilt_offset": -1,
"node_creator_queue": "node_creator",
},
"recipes": ["sxt-aretomo"],
},
new_connection=True,
)
Expand Down