-
Notifications
You must be signed in to change notification settings - Fork 1
Generate UDC strategy in zocalo #344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pblowey
wants to merge
13
commits into
main
Choose a base branch
from
udc-strategy-in-ispyb
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
3c9d420
Add strategy service and trigger
pblowey e47d6a4
Read in params from agamemnon recipes
pblowey 6172aed
Get beamline limits from filesystem
pblowey 1282763
Get resolution from query
pblowey bc865fe
Handle errors
pblowey ae639ff
Add check for beamlines
pblowey 8350807
Refactoring
pblowey bfcd35e
Remove bespoke failure function
pblowey cf448b6
Use constants for resolution calculation
pblowey 6d438df
Fix incorrectly set parameters
pblowey 52106f7
Add breadcrumbs for scaling functions
pblowey 02a95e8
Fix query and beamlines for trigger
pblowey 9482532
Put commands at correct level of nested loops
pblowey File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,330 @@ | ||
| from __future__ import annotations | ||
|
|
||
| from pathlib import Path | ||
|
|
||
| import workflows.recipe | ||
| import yaml | ||
| from pydantic import BaseModel, Field, ValidationError | ||
| from workflows.services.common_service import CommonService | ||
|
|
||
| from dlstbx.util import ChainMapWithReplacement | ||
|
|
||
|
|
||
def scale_parameter(
    value: float,
    scale_factor: float,
    limits: tuple[float | None, float | None] | None = None,
) -> tuple[float, float]:
    """
    Scale ``value`` by ``scale_factor``, optionally clamping the result.

    Parameters:
        value: The parameter value to scale.
        scale_factor: Multiplicative factor to apply.
        limits: Optional ``(lower, upper)`` bounds; either bound may be
            ``None`` to leave that side unbounded. (The original annotation
            said ``tuple[float, float]`` but the code explicitly handles
            ``None`` bounds, so the annotation is widened here.)

    Returns:
        ``(scaled_value, corrective_scale_factor)``: the (possibly clamped)
        scaled value, and the residual factor (unclamped / clamped) to apply
        to a complementary parameter so the combined scaling effect is
        preserved despite clamping. The corrective factor is 1.0 when no
        clamping occurred.

    Raises:
        ValueError: If the clamped scaled value is zero, which would make the
            corrective factor undefined.
    """

    def apply_limit(
        parameter: float, limits: tuple[float | None, float | None]
    ) -> float:
        # Clamp to whichever bounds are present; None means unbounded.
        lower_limit, upper_limit = limits
        if lower_limit is not None:
            parameter = max(lower_limit, parameter)
        if upper_limit is not None:
            parameter = min(upper_limit, parameter)
        return parameter

    ref_value = value * scale_factor
    if limits is not None:
        scaled_value = apply_limit(ref_value, limits)
    else:
        scaled_value = ref_value
    if scaled_value == 0:
        raise ValueError("Scaled value cannot be zero")
    # Scale factor to apply to opposite parameter to achieve the desired
    # scaling effect, accounting for limits
    corrective_scale_factor = ref_value / scaled_value
    return scaled_value, corrective_scale_factor
|
|
||
|
|
||
def get_resolution_scale(resolution: float) -> float:
    """
    Return the dose scaling factor for a given resolution.

    The factor is a quadratic in resolution, from a polynomial fit to
    empirical user and test data modelling dose limits vs resolution.
    Loosely based on: Atakisi, H., Conger, L., Moreau, D. W., & Thorne,
    R. E. (2019). Resolution and dose dependence of radiation damage in
    biomolecular systems. IUCrJ, 6(Pt 6), 1040-1053.
    https://doi.org/10.1107/S2052252519008777
    """
    # Evaluate the fitted quadratic term by term (same operation order as
    # the fit expression: r**2 - 0.4*r + 0.5).
    quadratic_term = resolution**2
    linear_term = 0.4 * resolution
    return quadratic_term - linear_term + 0.5
|
|
||
|
|
||
def get_wavelength_scale(wavelength: float, default_wavelength: float) -> float:
    """
    Return the dose scaling factor for a given wavelength.

    Dose is proportional to energy * X-ray absorption. Dominant absorption
    at typical wavelengths is by the photoelectric effect, proportional to
    wavelength^3, while energy is inversely proportional to wavelength, so
    overall dose is approximately proportional to wavelength^2.
    """
    # Ratio of the reference wavelength to the requested one, squared.
    ratio = default_wavelength / wavelength
    return ratio**2
|
|
||
|
|
||
class AgamemnonParameters(BaseModel):
    """
    One data-collection step parsed from an agamemnon recipe YAML file.

    Fields declared with ``Field(gt=0)`` reject zero/negative values at
    parse time; a violating recipe step raises ``pydantic.ValidationError``
    in ``parse_agamemnon_recipe``.
    """

    chi: float
    comment: str
    exposure_time: float = Field(gt=0)  # must be positive
    dose: float = Field(gt=0)  # must be positive
    kappa: float
    number_of_images: int = Field(gt=0)  # must be positive
    omega_increment: float = Field(gt=0)  # must be positive
    omega_overlap: float
    omega_start: float
    phi_increment: float
    phi_overlap: float
    phi_start: float
    # Names the rotation axis; downstream code reads "<scan_axis>_start" and
    # "<scan_axis>_increment" attributes, so it is expected to be "omega" or
    # "phi" — TODO confirm against recipe files.
    scan_axis: str
    transmission: float = Field(gt=0)  # must be positive
    two_theta: float
    wavelength: float = Field(gt=0)  # must be positive
|
|
||
|
|
||
def parse_agamemnon_recipe(recipe_path: Path) -> list[AgamemnonParameters]:
    """
    Load an agamemnon recipe YAML file and validate each step.

    Each entry of the YAML document is validated against
    ``AgamemnonParameters``; an invalid step raises
    ``pydantic.ValidationError``.
    """
    with open(recipe_path, "r") as recipe_file:
        raw_steps = yaml.safe_load(recipe_file)
    return [AgamemnonParameters(**raw_step) for raw_step in raw_steps]
|
|
||
|
|
||
def parse_config_file(config_file: Path) -> dict:
    """
    Parse a GDA-style ``key = value`` properties file into a dict.

    Rules:
      - ``#`` starts a comment (whole-line or inline).
      - Blank lines and lines without ``=`` are ignored.
      - ``include = ../relative/path`` recursively parses the included file
        and merges its entries under the prefix ``<included-stem>.``; only
        values starting with ``..`` are followed.
      - Values of the form ``${other_key}`` are resolved against keys already
        parsed; unresolvable references are left unchanged.

    Parameters:
        config_file: Path to the properties file.

    Returns:
        Mapping of property name to string value (included entries prefixed).
    """
    config: dict = {}

    # errors="ignore" tolerates stray non-UTF-8 bytes in hand-edited files.
    # Use a context manager so the handle is closed deterministically
    # (the original iterated an unclosed open() result).
    with open(config_file, errors="ignore") as f:
        for record in f:
            # Strip inline comments before parsing.
            if "#" in record:
                record = record.split("#")[0]
            record = record.strip()
            if not record:
                continue
            if "=" not in record:
                continue

            # Split on the FIRST "=" only, so values may themselves contain
            # "=" (the original split("=") raised ValueError on such lines).
            key, value = record.split("=", 1)
            key = key.strip()
            value = value.strip()

            if key == "include":
                if value.startswith(".."):
                    include = config_file.parent / value
                    name = Path(value).name.split(".")[0]
                    included = parse_config_file(include)
                    for k in included:
                        config[f"{name}.{k}"] = included[k]
                continue

            config[key] = value

    # Resolve references to other variables, e.g. foo = ${bar}
    for key, val in config.items():
        if isinstance(val, str) and val[:2] == "${" and val[-1] == "}":
            try:
                config[key] = config[val[2:-1]]
            except KeyError:
                continue
    return config
|
|
||
|
|
||
def get_beamline_param(
    config: dict, param_names: tuple[str, ...], default: float
) -> float:
    """
    Look up a beamline parameter under several candidate names.

    Returns the value of the first name (in ``param_names`` order) present
    in ``config``, converted to float, or ``default`` when none are found.
    """
    candidates = (float(config[name]) for name in param_names if name in config)
    return next(candidates, default)
|
|
||
|
|
||
class DLSStrategy(CommonService):
    """Service for creating data collection strategies.

    Listens on the "strategy" queue. For each request it reads agamemnon
    recipe files for the beamline, scales dose/transmission/exposure-time to
    the sample's resolution and wavelength within beamline limits, and sends
    a list of ISPyB insert commands describing the resulting screening
    strategy to the "ispyb" channel.
    """

    # Human readable service name
    _service_name = "Strategy"

    # Logger name
    _logger_name = "dlstbx.services.strategy"

    def initializing(self):
        """Subscribe to channel."""
        self.log.info("Strategy service starting")
        workflows.recipe.wrap_subscribe(
            self._transport,
            "strategy",
            self.generate_strategy,
            acknowledgement=True,
            log_extender=self.extend_log,
        )

    def generate_strategy(
        self, rw: workflows.recipe.RecipeWrapper, header: dict, message: dict
    ):
        """Generate a strategy from the results of an upstream pipeline

        Parameters:
            rw: Recipe wrapper for the current recipe step; must provide
                "beamline", "wavelength" and "resolution" parameters (each
                either scalar or a list whose first element is used).
            header: Transport message header; "subscription" is used to open
                the transaction.
            message: Message payload; an optional "parameters" dict overrides
                recipe-step ispyb_parameters.

        Raises:
            FileNotFoundError: If the beamline config or a recipe file is
                missing.
        """
        self.log.info("Received strategy request, generating strategy")

        # Message-level parameters take precedence over the recipe step's
        # ispyb_parameters; environment substitutions apply to both.
        parameters = ChainMapWithReplacement(
            message.get("parameters", {}) if isinstance(message, dict) else {},
            rw.recipe_step["parameters"].get("ispyb_parameters", {}),
            substitutions=rw.environment,
        )
        self.log.info(f"Received parameters for strategy generation:\n{parameters}")
        # Conditionally acknowledge receipt of the message
        txn = self._transport.transaction_begin(subscription_id=header["subscription"])
        self._transport.ack(header, transaction=txn)

        # Parameters may arrive as single-element lists; unwrap as needed.
        beamline = (
            parameters["beamline"][0]
            if isinstance(parameters["beamline"], list)
            else parameters["beamline"]
        )
        wavelength = (
            float(parameters["wavelength"][0])
            if isinstance(parameters["wavelength"], list)
            else float(parameters["wavelength"])
        )
        resolution_estimate = (
            float(parameters["resolution"][0])
            if isinstance(parameters["resolution"], list)
            else float(parameters["resolution"])
        )
        # Target a slightly higher resolution than the estimate, floored at
        # min_resolution.
        resolution_offset = 0.5
        min_resolution = 0.9
        resolution = max((resolution_estimate) - resolution_offset, min_resolution)

        beamline_config_file = Path(
            f"/dls_sw/{beamline}/software/daq_configuration/domain/domain.properties"
        )
        if not beamline_config_file.is_file():
            self.log.error(
                f"Beamline configuration file {beamline_config_file} not found, terminating strategy generation"
            )
            raise FileNotFoundError(
                f"Beamline configuration file {beamline_config_file} not found"
            )
        beamline_config = parse_config_file(beamline_config_file)

        # (lower, upper) limits from beamline config, with fallbacks.
        transmission_limits = (
            get_beamline_param(beamline_config, ("gda.mx.udc.minTransmission",), 0.0),
            get_beamline_param(beamline_config, ("gda.mx.udc.maxTransmission",), 1.0),
        )
        exposure_time_limits = (
            get_beamline_param(
                beamline_config,
                ("gda.mx.udc.minExposureTime", "gda.exptTableModel.minExposureTime"),
                0.002,
            ),
            get_beamline_param(
                beamline_config,
                ("gda.mx.udc.maxExposureTime", "gda.exptTableModel.maxExposureTime"),
                float("inf"),
            ),
        )

        # Recipe filename -> short alias used in ISPyB program/comment fields.
        recipes = {"OSC.yaml": "OSC", "Ligand binding.yaml": "Ligand"}
        ispyb_command_list = []

        for recipe, recipe_alias in recipes.items():
            recipe_path = Path(f"/dls_sw/{beamline}/etc/agamemnon-recipes/{recipe}")
            if not recipe_path.is_file():
                self.log.error(
                    f"Recipe file {recipe_path} not found, terminating strategy generation"
                )
                raise FileNotFoundError(f"Recipe file {recipe_path} not found")
            try:
                recipe_steps = parse_agamemnon_recipe(recipe_path)
            except ValidationError as e:
                self.log.error(f"Invalid recipe step in {recipe_path}: {e}")
                # NOTE(review): pydantic's ValidationError cannot be
                # constructed from a plain string, so this raise will itself
                # fail (TypeError); consider a bare `raise` instead — TODO
                # confirm.
                raise ValidationError(f"Invalid recipe step in {recipe_path}: {e}")

            # Step 1: Create screeningOutput record for recipe, linked to the screeningId
            # Keep the screeningOutputId
            d = {
                "program": "udc-strategy",
                "strategysuccess": 1,
                "ispyb_command": "insert_screening_output",
                "screening_id": "$ispyb_screening_id",
                "store_result": "ispyb_screening_output_id",
            }
            ispyb_command_list.append(d)

            # Step 2: Store screeningStrategy results, linked to the screeningOutputId
            # Keep the screeningStrategyId
            d = {
                "program": f"udc-strategy: {recipe_alias}",
                "ispyb_command": "insert_screening_strategy",
                "screening_output_id": "$ispyb_screening_output_id",
                "store_result": "ispyb_screening_strategy_id",
            }
            ispyb_command_list.append(d)

            for n_step, recipe_step in enumerate(recipe_steps, start=1):
                # Combined dose scale from the wavelength and resolution
                # adjustments relative to the recipe defaults.
                scale = 1.0
                default_wavelength = recipe_step.wavelength
                scale *= get_wavelength_scale(wavelength, default_wavelength)
                scale *= get_resolution_scale(resolution)

                dose, _ = scale_parameter(recipe_step.dose, scale)

                # The recipe names its rotation axis; read the matching
                # "<axis>_start" / "<axis>_increment" fields dynamically.
                rotation_axis = recipe_step.scan_axis
                rotation_start = recipe_step.__getattribute__(f"{rotation_axis}_start")
                rotation_increment = recipe_step.__getattribute__(
                    f"{rotation_axis}_increment"
                )
                transmission = recipe_step.transmission
                exposure_time = recipe_step.exposure_time

                # Apply the scale preferentially to transmission when scaling
                # up, and to exposure time when scaling down; each
                # scale_parameter call returns the residual (corrective)
                # factor still to be applied to the other parameter.
                # Runs twice to ensure that limits are applied correctly to both parameters, as they are interdependent - is this necessary?
                for _ in range(2):
                    if scale > 1.0:
                        transmission, scale = scale_parameter(
                            transmission, scale, limits=transmission_limits
                        )
                        exposure_time, scale = scale_parameter(
                            exposure_time, scale, limits=exposure_time_limits
                        )
                    else:
                        exposure_time, scale = scale_parameter(
                            exposure_time, scale, limits=exposure_time_limits
                        )
                        transmission, scale = scale_parameter(
                            transmission, scale, limits=transmission_limits
                        )
                    self.log.debug(
                        f"Exposure time scaled to {exposure_time:.3f} s, transmission scaled to {transmission:.3f}, scale factor now {scale:.3f}"
                    )

                # Step 3: Store screeningStrategyWedge results, linked to the screeningStrategyId
                # Keep the screeningStrategyWedgeId
                d = {
                    "wedgenumber": n_step,
                    "resolution": resolution,
                    "phi": recipe_step.phi_start,
                    "chi": recipe_step.chi,
                    "kappa": recipe_step.kappa,
                    "wavelength": wavelength,
                    "dosetotal": dose,
                    "comments": recipe_alias,
                    "ispyb_command": "insert_screening_strategy_wedge",
                    "screening_strategy_id": "$ispyb_screening_strategy_id",
                    "store_result": f"ispyb_screening_strategy_wedge_id_{n_step}",
                }
                ispyb_command_list.append(d)

                # Step 4: Store second screeningStrategySubWedge results, linked to the screeningStrategyWedgeId
                # Keep the screeningStrategyWedgeId
                d = {
                    "subwedgenumber": 1,
                    "rotationaxis": recipe_step.scan_axis,
                    "axisstart": rotation_start,
                    "axisend": rotation_start
                    + rotation_increment * recipe_step.number_of_images,
                    "exposuretime": exposure_time,
                    "transmission": transmission,
                    "oscillationrange": rotation_increment,
                    "numberOfImages": recipe_step.number_of_images,
                    "resolution": resolution,
                    "ispyb_command": "insert_screening_strategy_sub_wedge",
                    "screening_strategy_wedge_id": f"$ispyb_screening_strategy_wedge_id_{n_step}",
                    "store_result": f"ispyb_screening_strategy_sub_wedge_id_{n_step}",
                }
                ispyb_command_list.append(d)

        # Send results onwards
        rw.set_default_channel("ispyb")
        rw.send_to("ispyb", {"ispyb_command_list": ispyb_command_list}, transaction=txn)
        self.log.info(f"Sent {len(ispyb_command_list)} commands to ISPyB")
        self.log.debug(f"Commands sent to ISPyB:\n{ispyb_command_list}")

        # Commit transaction
        self._transport.transaction_commit(txn)
        self.log.info("Strategy generation complete")
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.