From ae35894968ce272d0d03c5b5e340e5f81c892734 Mon Sep 17 00:00:00 2001 From: Clair Mould <86794332+clmould@users.noreply.github.com> Date: Tue, 9 Jun 2026 13:22:55 +0100 Subject: [PATCH] Convert scan variables to dataclass --- process/core/init.py | 2 - process/core/input.py | 32 +-- process/core/model.py | 2 + process/core/scan.py | 102 +++++---- process/data_structure/scan_variables.py | 267 +++++++++++------------ 5 files changed, 191 insertions(+), 214 deletions(-) diff --git a/process/core/init.py b/process/core/init.py index 497654e1fb..7724f6f681 100644 --- a/process/core/init.py +++ b/process/core/init.py @@ -21,7 +21,6 @@ from process.data_structure.impurity_radiation_variables import N_IMPURITIES from process.data_structure.numerics import FiguresOfMerit, PROCESSRunMode from process.data_structure.physics_variables import DivertorNumberModels -from process.data_structure.scan_variables import init_scan_variables from process.models.pfcoil import PFLocationTypes from process.models.physics.profiles import DensityProfilePedestalType from process.models.stellarator.initialization import st_init @@ -233,7 +232,6 @@ def init_all_module_vars(): """ logging_model_handler.clear_logs() data_structure.numerics.init_numerics() - init_scan_variables() constants.init_constants() diff --git a/process/core/input.py b/process/core/input.py index 60a7ae6afe..97b524cfda 100644 --- a/process/core/input.py +++ b/process/core/input.py @@ -19,6 +19,7 @@ from process.data_structure.impurity_radiation_variables import N_IMPURITIES from process.data_structure.pfcoil_variables import N_PF_GROUPS_MAX from process.data_structure.physics_variables import N_CONFINEMENT_SCALINGS +from process.data_structure.scan_variables import IPNSCNS, IPNSCNV if TYPE_CHECKING: from collections.abc import Callable @@ -68,11 +69,12 @@ class InputVariable: been cast to the specified `type`. """ additional_actions: ( - Callable[[str, ValidInputTypes, int | None, InputVariable], None] | None + Callable[[str, ValidInputTypes, int | None, InputVariable, DataStructure], None] + | None ) = None - """A function that takes the input variable: name, value, array index, and config (this dataclass) - as input and performs some additional action in addition to the default actions prescribed by the variables - config. May raise a ProcessValidationError. + """A function that takes the input variable: name, value, array index, config (this dataclass), and + the data structure object as input and performs some additional action in addition to the default + actions prescribed by the variables config. May raise a ProcessValidationError. NOTE: The value passed in has already been cleaned in the default way and has been cast to the specified `type`. @@ -1049,7 +1051,7 @@ def __post_init__(self): "output_costs": InputVariable("costs", int, choices=[0, 1]), "i_p_coolant_pumping": InputVariable("fwbs", int, range=(0, 3)), "reinke_mode": InputVariable("reinke", int, choices=[0, 1]), - "scan_dim": InputVariable(data_structure.scan_variables, int, range=(1, 2)), + "scan_dim": InputVariable("scan", int, range=(1, 2)), "i_thermal_electric_conversion": InputVariable("fwbs", int, range=(0, 4)), "secondary_cycle_liq": InputVariable("fwbs", int, range=(2, 4)), "supercond_cost_model": InputVariable("costs", int, choices=[0, 1]), @@ -1091,27 +1093,27 @@ def __post_init__(self): "v2matf": InputVariable("ife", float, array=True), "v3matf": InputVariable("ife", float, array=True), "isweep": InputVariable( - data_structure.scan_variables, + "scan", int, - choices=range(data_structure.scan_variables.IPNSCNS + 1), + choices=range(IPNSCNS + 1), ), "nsweep": InputVariable( - data_structure.scan_variables, + "scan", int, - choices=range(1, data_structure.scan_variables.IPNSCNV + 1), + choices=range(1, IPNSCNV + 1), ), "isweep_2": InputVariable( - data_structure.scan_variables, + "scan", int, - choices=range(data_structure.scan_variables.IPNSCNS + 1), + choices=range(IPNSCNS + 1), ), "nsweep_2": InputVariable( - data_structure.scan_variables, + "scan", int, - choices=range(1, data_structure.scan_variables.IPNSCNV + 1), + choices=range(1, IPNSCNV + 1), ), - "sweep": InputVariable(data_structure.scan_variables, float, array=True), - "sweep_2": InputVariable(data_structure.scan_variables, float, array=True), + "sweep": InputVariable("scan", float, array=True), + "sweep_2": InputVariable("scan", float, array=True), "impvardiv": InputVariable( "reinke", int, diff --git a/process/core/model.py b/process/core/model.py index b8acd843d5..e4fa92fb05 100644 --- a/process/core/model.py +++ b/process/core/model.py @@ -27,6 +27,7 @@ from process.data_structure.pulse_variables import PulseData from process.data_structure.rebco_variables import RebcoData from process.data_structure.reinke_variables import ReinkeData +from process.data_structure.scan_variables import ScanData from process.data_structure.stellarator_configuration import StellaratorConfigData from process.data_structure.stellarator_variables import StellaratorData from process.data_structure.structure_variables import StructureData @@ -77,6 +78,7 @@ class DataStructure: tfcoil: TFData = initialise_later superconducting_tfcoil: SuperconductingTFData = initialise_later globals: GlobalData = initialise_later + scan: ScanData = initialise_later def __post_init__(self): for f in fields(self): diff --git a/process/core/scan.py b/process/core/scan.py index d4f7d3b3ad..51a6c11693 100644 --- a/process/core/scan.py +++ b/process/core/scan.py @@ -15,11 +15,9 @@ from process.core.log import logging_model_handler, show_errors from process.core.solver import constraints from process.core.solver.solver_handler import SolverHandler -from process.data_structure import ( - numerics, - scan_variables, -) +from process.data_structure import numerics from process.data_structure.numerics import FiguresOfMerit, PROCESSRunMode +from process.data_structure.scan_variables import IPNSCNS, NOUTVARS, ScanData if TYPE_CHECKING: from process.core.model import DataStructure, Model @@ -216,7 +214,7 @@ def run_scan(self): number of output variable values are written to the MFILE.DAT file at each scan point, for plotting or other post-processing purposes. """ - if scan_variables.isweep == 0: + if self.data.scan.isweep == 0: # Solve single problem, rather than an array of problems (scan) # doopt() can also run just an evaluation start_time = time.time() @@ -230,14 +228,14 @@ def run_scan(self): show_errors(constants.NOUT) return - if scan_variables.isweep > scan_variables.IPNSCNS: + if self.data.scan.isweep > IPNSCNS: raise ProcessValueError( "Illegal value of isweep", - isweep=scan_variables.isweep, - IPNSCNS=scan_variables.IPNSCNS, + isweep=self.data.scan.isweep, + IPNSCNS=IPNSCNS, ) - if scan_variables.scan_dim == 2: + if self.data.scan.scan_dim == 2: self.scan_2d() else: self.scan_1d() @@ -825,7 +823,7 @@ def scan_1d(self): # initialise dict which will contain ifail values for each scan point scan_1d_ifail_dict = {} - for iscan in range(1, scan_variables.isweep + 1): + for iscan in range(1, self.data.scan.isweep + 1): self.scan_1d_write_point_header(iscan) start_time = time.time() ifail = self.doopt() @@ -841,13 +839,13 @@ def scan_1d(self): logging_model_handler.clear_logs() # outvar now contains results - self.scan_1d_write_plot() + self.scan_1d_write_plot(self.data.scan) print( " ****************************************** Scan Convergence Summary ****************************************** \n" ) - sweep_values = scan_variables.sweep[: scan_variables.isweep] + sweep_values = self.data.scan.sweep[: self.data.scan.isweep] nsweep_var_name, _ = self.scan_select( - scan_variables.nsweep, scan_variables.sweep, scan_variables.isweep + self.data.scan.nsweep, self.data.scan.sweep, self.data.scan.isweep ) converged_count = 0 # offsets for aligning the converged/unconverged column @@ -856,7 +854,7 @@ def scan_1d(self): max_sweep_value_length - len(str(sweep_val).replace(".", "")) for sweep_val in sweep_values ] - for iscan in range(1, scan_variables.isweep + 1): + for iscan in range(1, self.data.scan.isweep + 1): if scan_1d_ifail_dict[iscan] == 1: converged_count += 1 print( @@ -870,23 +868,23 @@ def scan_1d(self): + " " * offsets[iscan - 1] + "\u001b[31mUNCONVERGED \u001b[0m" ) - converged_percentage = converged_count / scan_variables.isweep * 100 + converged_percentage = converged_count / self.data.scan.isweep * 100 print(f"\nConvergence Percentage: {converged_percentage:.2f}%") def scan_2d(self): """Run a 2-D scan.""" # Initialise intent(out) arrays - self.scan_2d_init() + self.scan_2d_init(self.data.scan) iscan = 1 # initialise array which will contain ifail values for each scan point scan_2d_ifail_list = np.zeros( - (scan_variables.NOUTVARS, scan_variables.IPNSCNS), + (NOUTVARS, IPNSCNS), dtype=np.float64, order="F", ) - for iscan_1 in range(1, scan_variables.isweep + 1): - for iscan_2 in range(1, scan_variables.isweep_2 + 1): + for iscan_1 in range(1, self.data.scan.isweep + 1): + for iscan_2 in range(1, self.data.scan.isweep_2 + 1): self.scan_2d_write_point_header(iscan, iscan_1, iscan_2) start_time = time.time() ifail = self.doopt() @@ -905,13 +903,13 @@ def scan_2d(self): print( " ****************************************** Scan Convergence Summary ****************************************** \n" ) - sweep_1_values = scan_variables.sweep[: scan_variables.isweep] - sweep_2_values = scan_variables.sweep_2[: scan_variables.isweep_2] + sweep_1_values = self.data.scan.sweep[: self.data.scan.isweep] + sweep_2_values = self.data.scan.sweep_2[: self.data.scan.isweep_2] nsweep_var_name, _ = self.scan_select( - scan_variables.nsweep, scan_variables.sweep, scan_variables.isweep + self.data.scan.nsweep, self.data.scan.sweep, self.data.scan.isweep ) nsweep_2_var_name, _ = self.scan_select( - scan_variables.nsweep_2, scan_variables.sweep_2, scan_variables.isweep_2 + self.data.scan.nsweep_2, self.data.scan.sweep_2, self.data.scan.isweep_2 ) converged_count = 0 scan_point = 1 @@ -919,7 +917,7 @@ def scan_2d(self): max_sweep1_value_length = len(str(np.max(sweep_1_values)).replace(".", "")) max_sweep2_value_length = len(str(np.max(sweep_2_values)).replace(".", "")) offsets = np.zeros( - (scan_variables.isweep, scan_variables.isweep_2), dtype=int, order="F" + (self.data.scan.isweep, self.data.scan.isweep_2), dtype=int, order="F" ) for count1, sweep1 in enumerate(sweep_1_values): for count2, sweep2 in enumerate(sweep_2_values): @@ -930,8 +928,8 @@ def scan_2d(self): - len(str(sweep2).replace(".", "")) ) - for iscan_1 in range(1, scan_variables.isweep + 1): - for iscan_2 in range(1, scan_variables.isweep_2 + 1): + for iscan_1 in range(1, self.data.scan.isweep + 1): + for iscan_2 in range(1, self.data.scan.isweep_2 + 1): if scan_2d_ifail_list[iscan_1][iscan_2] == 1: converged_count += 1 print( @@ -948,53 +946,53 @@ def scan_2d(self): ) scan_point += 1 converged_percentage = ( - converged_count / (scan_variables.isweep * scan_variables.isweep_2) * 100 + converged_count / (self.data.scan.isweep * self.data.scan.isweep_2) * 100 ) print(f"\nConvergence Percentage: {converged_percentage:.2f}%") @staticmethod - def scan_2d_init(): + def scan_2d_init(scan_data: ScanData): process_output.ovarin( constants.MFILE, "Number of first variable scan points", "(isweep)", - scan_variables.isweep, + scan_data.isweep, ) process_output.ovarin( constants.MFILE, "Number of second variable scan points", "(isweep_2)", - scan_variables.isweep_2, + scan_data.isweep_2, ) process_output.ovarin( constants.MFILE, "Scanning first variable number", "(nsweep)", - scan_variables.nsweep, + scan_data.nsweep, ) process_output.ovarin( constants.MFILE, "Scanning second variable number", "(nsweep_2)", - scan_variables.nsweep_2, + scan_data.nsweep_2, ) process_output.ovarin( constants.MFILE, "Scanning second variable number", "(nsweep_2)", - scan_variables.nsweep_2, + scan_data.nsweep_2, ) process_output.ovarin( constants.MFILE, "Scanning second variable number", "(nsweep_2)", - scan_variables.nsweep_2, + scan_data.nsweep_2, ) def scan_1d_write_point_header(self, iscan: int): self.data.globals.iscan_global = iscan self.data.globals.vlabel, self.data.globals.xlabel = self.scan_select( - scan_variables.nsweep, scan_variables.sweep, iscan + self.data.scan.nsweep, self.data.scan.sweep, iscan ) process_output.oblnkl(constants.NOUT) @@ -1002,8 +1000,8 @@ def scan_1d_write_point_header(self, iscan: int): process_output.write( constants.NOUT, - f"***** Scan point {iscan} of {scan_variables.isweep} : {self.data.globals.xlabel}" - f", {self.data.globals.vlabel} = {scan_variables.sweep[iscan - 1]} " + f"***** Scan point {iscan} of {self.data.scan.isweep} : {self.data.globals.xlabel}" + f", {self.data.globals.vlabel} = {self.data.scan.sweep[iscan - 1]} " "*****", ) process_output.ostars(constants.NOUT, 110) @@ -1011,22 +1009,22 @@ def scan_1d_write_point_header(self, iscan: int): process_output.ovarin(constants.MFILE, "Scan point number", "(iscan)", iscan) print( - f"Starting scan point {iscan} of {scan_variables.isweep} : " + f"Starting scan point {iscan} of {self.data.scan.isweep} : " f"{self.data.globals.xlabel} , {self.data.globals.vlabel}" - f" = {scan_variables.sweep[iscan - 1]}" + f" = {self.data.scan.sweep[iscan - 1]}" ) def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): - iscan_r = scan_variables.isweep_2 - iscan_2 + 1 if iscan_1 % 2 == 0 else iscan_2 + iscan_r = self.data.scan.isweep_2 - iscan_2 + 1 if iscan_1 % 2 == 0 else iscan_2 # Makes iscan available globally (read-only) self.data.globals.iscan_global = iscan self.data.globals.vlabel, self.data.globals.xlabel = self.scan_select( - scan_variables.nsweep, scan_variables.sweep, iscan_1 + self.data.scan.nsweep, self.data.scan.sweep, iscan_1 ) self.data.globals.vlabel_2, self.data.globals.xlabel_2 = self.scan_select( - scan_variables.nsweep_2, scan_variables.sweep_2, iscan_r + self.data.scan.nsweep_2, self.data.scan.sweep_2, iscan_r ) process_output.oblnkl(constants.NOUT) @@ -1034,9 +1032,9 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): process_output.write( constants.NOUT, - f"***** 2D Scan point {iscan} of {scan_variables.isweep * scan_variables.isweep_2} : " - f"{self.data.globals.vlabel} = {scan_variables.sweep[iscan_1 - 1]} and" - f" {self.data.globals.vlabel_2} = {scan_variables.sweep_2[iscan_r - 1]} " + f"***** 2D Scan point {iscan} of {self.data.scan.isweep * self.data.scan.isweep_2} : " + f"{self.data.globals.vlabel} = {self.data.scan.sweep[iscan_1 - 1]} and" + f" {self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} " "*****", ) process_output.ostars(constants.NOUT, 110) @@ -1045,30 +1043,30 @@ def scan_2d_write_point_header(self, iscan, iscan_1, iscan_2): print( f"Starting scan point {iscan}: {self.data.globals.xlabel}, " - f"{self.data.globals.vlabel} = {scan_variables.sweep[iscan_1 - 1]}" + f"{self.data.globals.vlabel} = {self.data.scan.sweep[iscan_1 - 1]}" f" and {self.data.globals.xlabel_2}, " - f"{self.data.globals.vlabel_2} = {scan_variables.sweep_2[iscan_r - 1]} " + f"{self.data.globals.vlabel_2} = {self.data.scan.sweep_2[iscan_r - 1]} " ) return iscan_r @staticmethod - def scan_1d_write_plot(): - if scan_variables.first_call_1d: + def scan_1d_write_plot(scan_data: ScanData): + if scan_data.first_call_1d: process_output.ovarin( constants.MFILE, "Number of scan points", "(isweep)", - scan_variables.isweep, + scan_data.isweep, ) process_output.ovarin( constants.MFILE, "Scanning variable number", "(nsweep)", - scan_variables.nsweep, + scan_data.nsweep, ) - scan_variables.first_call_1d = False + scan_data.first_call_1d = False def scan_select(self, nwp, swp, iscn): match nwp: diff --git a/process/data_structure/scan_variables.py b/process/data_structure/scan_variables.py index f4cdec5d5a..9e8703ffbe 100644 --- a/process/data_structure/scan_variables.py +++ b/process/data_structure/scan_variables.py @@ -5,156 +5,133 @@ over a range of values of a particular scanning variable. """ +from dataclasses import dataclass, field + import numpy as np -IPNSCNS: int = 1000 +IPNSCNS = 1000 """Maximum number of scan points""" -IPNSCNV: int = 81 +IPNSCNV = 81 """Number of available scan variables""" -NOUTVARS: int = 84 - - -WIDTH: int = 110 - - -scan_dim: int = None -"""1-D or 2-D scan switch (1=1D, 2=2D)""" - - -isweep: int = None -"""Number of scan points to calculate""" - - -isweep_2: int = None -"""Number of 2D scan points to calculate""" - - -nsweep: int = None -"""Switch denoting quantity to scan: