parallel_scan

Classes

ParallelScan

ParallelScan(
    scanning_parameter=None,
    start=None,
    end=None,
    step=None,
    working_dir="",
    distribution_file="generator.in",
    run_file="photo_track.in",
    n_parallel=3,
    log_dir="log",
    initial_run_number=None,
    console_log=True,
)

Bases: Core

Parameters:

Name Type Description Default
scanning_parameter str | list

Name of the parameter to be scanned. If multiple parameters are to be scanned, they must be passed as a list of strings. Scanning parameters can also be added later with the self.add_scanning_parameter method (recommended), in which case this argument can be omitted. Defaults to None.

None
start float | list

Starting value of the scan. If the scan is performed for a single parameter, the type is float, otherwise a list of floats. Scanning parameters can also be added later with the self.add_scanning_parameter method (recommended), in which case this argument can be omitted. Defaults to None.

None
end float | list

Final value of the scan. If the scan is performed for a single parameter, the type is float, otherwise a list of floats. Scanning parameters can also be added later with the self.add_scanning_parameter method (recommended), in which case this argument can be omitted. Defaults to None.

None
step float | list

Scanning step. If the scan is performed for a single parameter, the type is float, otherwise a list of floats. Scanning parameters can also be added later with the self.add_scanning_parameter method (recommended), in which case this argument can be omitted. Defaults to None.

None
working_dir str | Path

Working directory of the scan; the log directory is created inside it. Defaults to "".

''
distribution_file str | Path

Name of the ASTRA distribution file. Defaults to "generator.in".

'generator.in'
run_file str | Path

Name of the ASTRA run file. Defaults to "photo_track.in".

'photo_track.in'
n_parallel int

Number of parallel ASTRA simulations to be run. Defaults to 3.

3
log_dir str | Path

Name of the log directory, that will be created inside the working directory (can be relative path). Defaults to "log".

'log'
initial_run_number int

Starting number of run that will be used by ASTRA to identify different runs. If left to None (by default), the run number will be assigned automatically. Defaults to None.

None
console_log bool

If False, the log will not be written to the console. Defaults to True.

True
Source code in astra/parallel_scan.py
def __init__(self, scanning_parameter=None, start=None, end=None, step=None, working_dir="", distribution_file="generator.in", run_file="photo_track.in", n_parallel:int=3, log_dir="log", initial_run_number:int=None, console_log:bool=True) -> None:
    """Scanner allowing to perform scan of one or multiple parameters,
    ASTRA simulations are run in parallel to save time.

    Args:
        scanning_parameter (str | list, optional): Name of the parameter to be scanned. If multiple parameters are to be scanned, they must be passed as a list of strings. Scanning parameters can also be added later with the `self.add_scanning_parameter` method (recommended), in which case this argument can be omitted. Defaults to None.
        start (float | list, optional): Starting value of the scan. If the scan is performed for a single parameter, the type is `float`, otherwise a list of floats. Scanning parameters can also be added later with the `self.add_scanning_parameter` method (recommended), in which case this argument can be omitted. Defaults to None.
        end (float | list, optional): Final value of the scan. If the scan is performed for a single parameter, the type is `float`, otherwise a list of floats. Scanning parameters can also be added later with the `self.add_scanning_parameter` method (recommended), in which case this argument can be omitted. Defaults to None.
        step (float | list, optional): Scanning step. If the scan is performed for a single parameter, the type is `float`, otherwise a list of floats. Scanning parameters can also be added later with the `self.add_scanning_parameter` method (recommended), in which case this argument can be omitted. Defaults to None.
        working_dir (str | pathlib.Path, optional): Working directory of the scan; the log directory is created inside it. Defaults to "".
        distribution_file (str | pathlib.Path, optional): Name of the ASTRA distribution file. Defaults to "generator.in".
        run_file (str | pathlib.Path, optional): Name of the ASTRA run file. Defaults to "photo_track.in".
        n_parallel (int, optional): Number of parallel ASTRA simulations to be run. Defaults to 3.
        log_dir (str | pathlib.Path, optional): Name of the log directory, that will be created inside the working directory (can be relative path). Defaults to "log".
        initial_run_number (int, optional): Starting number of run that will be used by ASTRA to identify different runs. If left to None (by default), the run number will be assigned automatically. Defaults to None.
        console_log (bool, optional): If False, the log will not be written to the console. Defaults to True.
    """        
    super().__init__(working_dir, distribution_file, run_file, log_dir, "parallel_scan.log", console_log=console_log)

    self.initial_run_number = int(initial_run_number) if str(initial_run_number).strip().lower() != "none" else self.get_first_free_run_number()

    self.scanning_parameters = []

    self.output_handler = self.get_output_file_handler(self.working_dir)
    self.logger.info(f"Using output file: \n {self.output_handler.output_file}")
    self.verify_initial_run_number()

    self.n_paralel = int(n_parallel)
    self.n_steps = None

    self.mean_run_time = 0

    if isinstance(scanning_parameter, list):
        for i in range(len(scanning_parameter)):
            self.add_scanning_parameter(scanning_parameter[i], start[i], end[i], step[i])
    else:
        if scanning_parameter is not None:
            self.add_scanning_parameter(scanning_parameter, start=start, end=end, step=step)
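
The constructor accepts either a single parameter name with scalar `start`, `end` and `step`, or parallel lists of equal length. The following is a minimal standalone sketch of that pairing, not the package's own code; the helper name `pair_scan_arguments` and the explicit length check are assumptions added for illustration.

```python
def pair_scan_arguments(scanning_parameter=None, start=None, end=None, step=None):
    """Return a list of (name, start, end, step) tuples, one per scanned parameter.

    Hypothetical helper mirroring the branching at the end of ``__init__``.
    """
    # Nothing to scan yet: parameters can still be added later.
    if scanning_parameter is None:
        return []
    # Single parameter: scalars are used as they are.
    if not isinstance(scanning_parameter, list):
        return [(scanning_parameter, start, end, step)]
    # Several parameters: the lists are read index by index, so they must match in length.
    # (This check is an assumption; the original code indexes the lists directly.)
    lengths = {len(scanning_parameter), len(start), len(end), len(step)}
    if len(lengths) != 1:
        raise ValueError("scanning_parameter, start, end and step must have equal lengths.")
    return list(zip(scanning_parameter, start, end, step))
```

Each resulting tuple corresponds to one `add_scanning_parameter` call in the listing above.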

Functions

add_scanning_parameter
add_scanning_parameter(
    parameter,
    values=None,
    start=None,
    end=None,
    step=None,
    n_steps=None,
    **kwargs
)

Method to add a parameter to scan. Based on what parameters are specified, the method calculates the rest of them.

For the method not to raise an Exception, the parameter must be fully defined. This can be achieved in one of the following ways:

1. Full list of values (values)
2. Start and end values (start, end) + step size (step)
3. Start and end values (start, end) + number of steps (n_steps)

This list also corresponds to the order in which the definition of the parameter is checked.

Once the parameter is fully defined (one of the options above is completely specified), the remaining input values may be discarded or internally overwritten to match the values calculated from the full definition.
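
The order of precedence described above can be sketched as a small standalone function. This is only an illustration under stated assumptions: the internal `_parameter_values_initialization` method is not shown on this page, so the helper name `resolve_scan_values`, the inclusion of the end value, and the reading of `n_steps` as the number of scanned values are all assumptions. Only a constant step size is handled.

```python
import math


def resolve_scan_values(values=None, start=None, end=None, step=None, n_steps=None):
    """Return the list of values to scan (illustrative helper, not part of the package)."""
    # 1. An explicit list of values fully defines the parameter.
    if values is not None:
        return list(values)
    if start is None or end is None:
        raise ValueError("Either `values` or both `start` and `end` must be given.")
    # 2. Constant step size; the end value is assumed to be included.
    if step is not None:
        # The small offset guards against floating-point round-off in the division.
        count = math.floor((end - start) / step + 1e-9) + 1
        return [start + i * step for i in range(count)]
    # 3. Number of steps, assumed here to mean the number of scanned values.
    if n_steps is not None:
        if n_steps == 1:
            return [start]
        width = (end - start) / (n_steps - 1)
        return [start + i * width for i in range(n_steps)]
    raise ValueError("Neither `step` nor `n_steps` was given.")
```

In this sketch, `values` wins over everything else, and `step` wins over `n_steps`, matching the checking order of the list above.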

Parameters:

Name Type Description Default
parameter str

Name of the scanning parameter.

required
values list

Full list of values to be scanned. If specified, defines fully the parameter. Defaults to None.

None
start float

Starting value of the scan. Taken into account only if the values argument is not specified. For a full definition of the parameter, the end argument along with step or n_steps must be specified. Defaults to None.

None
end float

Final value of the scan. Will be taken into account only if values argument was not specified. For full definition of the parameter, the start argument along with step or n_steps must be specified. Defaults to None.

None
step float | list

Step size to be performed. For constant step size, use float type, for varying step sizes, use list of floats. Will be taken into account only if values argument was not specified. For full definition of the parameter, the start and end arguments must be specified. Defaults to None.

None
n_steps int

Number of constant steps to be performed. Taken into account only if the values and step arguments are not specified. For a full definition of the parameter, the start and end arguments must be specified. Defaults to None.

None

Raises:

Type Description
Exception

Raised if neither the number of steps (n_steps) nor the step size (step) is given, or if the resulting number of steps differs from that of the previously added parameters.

Source code in astra/parallel_scan.py
def add_scanning_parameter(self, parameter, values=None, start=None, end=None, step=None, n_steps=None, **kwargs):
    """Method to add a parameter to scan.
    Based on what parameters are specified, the method calculates the rest of them. 

    For the method not to raise an Exception, the parameter must be fully defined. This can be achieved in one of the following ways:
    1. Full values list (`values`)
    2. Start and end values (`start`, `end`) + step size (`step`)
    3. Start and end values (`start`, `end`) + number of steps (`n_steps`)

    This list also corresponds to the order in which the definition of the parameter is checked.

    Once the parameter is fully defined (one of the options above is completely specified), the remaining input values may be discarded or internally overwritten to match the values calculated from the full definition.


    Args:
        parameter (str): Name of the scanning parameter.
        values (list, optional): Full list of values to be scanned. If specified, defines fully the parameter. Defaults to None.
        start (float, optional): Starting value of the scan. Taken into account only if the `values` argument is not specified. For a full definition of the parameter, the `end` argument along with `step` or `n_steps` must be specified. Defaults to None.
        end (float, optional): Final value of the scan. Will be taken into account only if `values` argument was not specified. For full definition of the parameter, the `start` argument along with `step` or `n_steps` must be specified. Defaults to None.
        step (float | list, optional): Step size to be performed. For constant step size, use `float` type, for varying step sizes, use list of floats. Will be taken into account only if `values` argument was not specified. For full definition of the parameter, the `start` and `end` arguments must be specified. Defaults to None.
        n_steps (int, optional): Number of constant steps to be performed. Taken into account only if the `values` and `step` arguments are not specified. For a full definition of the parameter, the `start` and `end` arguments must be specified. Defaults to None.

    Raises:
        Exception: Raised if neither the number of steps (n_steps) nor the step size (step) is given, or if the resulting number of steps differs from that of the previously added parameters.
    """        
    parameter_dictionary = self._parameter_values_initialization(parameter, values, start, end, step, n_steps)
    n_steps = parameter_dictionary["n_steps"]

    if self.n_steps is None:
        self.n_steps = n_steps 
    if n_steps != self.n_steps:
        raise Exception(f"Parameter {parameter} was set with different number of steps ({n_steps}) than it was expected to ({self.n_steps}).")

    self.scanning_parameters += [parameter_dictionary]


    self.logger.info(f"Added scanning parameter {parameter_dictionary['parameter']}")
    self.logger.info(f"Parameter scan starting at: {parameter_dictionary['start']}")
    self.logger.info(f"Parameter scan ending at: {parameter_dictionary['end']}")
    if parameter_dictionary['step'] is not None: 
        self.logger.info(f"Scanning step set to: {parameter_dictionary['step']} (number of steps: {parameter_dictionary['n_steps']})")
    else:
        self.logger.info("Scan will be performed for values: " + ", ".join(str(value) for value in parameter_dictionary['values']))
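
Every added parameter must provide the same number of values, because run k of the scan uses the k-th value of each parameter. A minimal standalone sketch of that mapping follows. The helper names are hypothetical; the dictionary keys `parameter` and `values` are taken from the listings on this page, and `json.dumps` is used here as an assumed alternative to building the JSON text by string replacement.

```python
import json


def per_run_settings(scanning_parameters):
    """Return one {parameter name: value} dictionary per run (illustrative helper)."""
    lengths = {len(p["values"]) for p in scanning_parameters}
    # All parameters must be scanned over the same number of values.
    if len(lengths) > 1:
        raise ValueError(f"Parameters have different numbers of steps: {sorted(lengths)}")
    n_steps = lengths.pop() if lengths else 0
    return [
        {p["parameter"]: p["values"][k] for p in scanning_parameters}
        for k in range(n_steps)
    ]


def to_set_parameters_argument(settings):
    """Serialise the settings of one run as a JSON string."""
    return json.dumps(settings)
```

For two parameters with values `[1, 2]` and `[10, 20]`, the first run receives the pair `(1, 10)` and the second run the pair `(2, 20)`; the parameters are scanned together, not as a grid.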
check_lost_particles
check_lost_particles()

Checks for particles lost in the runs and logs them.

Source code in astra/parallel_scan.py
def check_lost_particles(self):
    """Checks for particles lost in the runs and logs them.
    """
    # load data from output file, where lost particles are listed
    df = self.get_output_file_handler(self.working_dir).output_content
    df = df.sort_values(by=["Run number"])
    n = self.initial_run_number

    self.logger.info("Checking for lost particles.")
    # iterate over the runs in this parallel scan
    while n < self.initial_run_number + self.n_steps:
        # select the active particle ratio of every output row with this run number
        ratios = df.loc[df["Run number"] == n, "Active ratio"]
        if ratios.empty:
            # trigger a warning if the run number cannot be found in the output file and continue
            self.logger.warning(f"No run with run number {n} in the output file! Skipping.")
            n += 1
            continue
        if len(ratios) > 1:
            # if multiple runs share the same run number, the mean is taken and a warning is triggered
            self.logger.warning(f"Multiple runs with run number {n}. Taking mean value of the ratio.")
        active_ratio = float(ratios.mean())
        if math.isnan(active_ratio):
            self.logger.critical(f"Active ratio for run number {n} is NaN. Please investigate.")
        elif active_ratio != 1:
            # trigger warning where there were particles lost
            self.logger.warning(f"Particles lost in run {n}: {100 - round(active_ratio*100)} % of particles lost.")
        else:
            self.logger.info(f"No particles lost in run {n}.")
        n += 1
    self.logger.info("Check complete.")
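
The per-run decision made in the loop above can be isolated into a small pure function, which makes the three outcomes easier to see. This is a sketch only; the function name `classify_active_ratio` and the returned (level, message) pair are assumptions for illustration.

```python
import math


def classify_active_ratio(active_ratio):
    """Return a (log level, message) pair for one run's active particle ratio."""
    if math.isnan(active_ratio):
        return "critical", "Active ratio is NaN. Please investigate."
    if active_ratio != 1:
        # Same rounding as in the listing: the lost share is reported in whole percent.
        lost = 100 - round(active_ratio * 100)
        return "warning", f"{lost} % of particles lost."
    return "info", "No particles lost."
```

Because the percentage is rounded to a whole number, a ratio very close to 1 (for example 0.999) still produces a warning but is reported as 0 % lost.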
get_estimated_time_to_finish
get_estimated_time_to_finish()

Calculates the estimated time to finish.

Returns:

Name Type Description
str str

XXh XXm format of time to finish.

Source code in astra/parallel_scan.py
def get_estimated_time_to_finish(self) -> str:
    """Calculates the estimated time to finish.

    Returns:
        str: XXh XXm format of time to finish.
    """
    # if no processes finished yet
    if self.mean_run_time == 0:
        return "(no estimate)"
    # estimate the run time as the mean run time times the number of parallel packs to be run
    estimated_run_time = self.mean_run_time*(self.n_steps//self.n_paralel)
    # if the last parallel pack is not full, it is not counted in the previous line and thus one more mean run time must be added
    estimated_run_time += self.mean_run_time if self.n_steps%self.n_paralel != 0 else 0
    return time_get_h_m(estimated_run_time - (time.time() - self.start_time))
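
The estimate above amounts to the mean run time multiplied by the number of parallel packs, rounded up, minus the time already elapsed. A standalone sketch of that arithmetic follows. The `time_get_h_m` helper is not shown on this page, so `format_h_m` below is a hypothetical stand-in whose exact output format is an assumption.

```python
import math


def estimate_remaining_seconds(mean_run_time, n_steps, n_parallel, elapsed):
    """Return the estimated remaining time in seconds, or None if no run has finished yet."""
    if mean_run_time == 0:
        return None
    # Integer division plus one extra pack for a remainder equals a ceiling division.
    n_packs = math.ceil(n_steps / n_parallel)
    return mean_run_time * n_packs - elapsed


def format_h_m(seconds):
    """Format seconds as 'XXh XXm' (assumed format; negative values are clamped to zero)."""
    total_minutes = max(0, int(seconds // 60))
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}h {minutes:02d}m"
```

With a mean run time of 60 s, 7 steps and 3 parallel runs, there are 3 packs, so after 30 s the estimate is 150 s. Clamping at zero is a choice of this sketch; the listing above subtracts the elapsed time without clamping.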
log_config_file
log_config_file(config_file)

Copies the config_file to the log directory

Parameters:

Name Type Description Default
config_file (str, path)

Config file to be logged.

required
Source code in astra/parallel_scan.py
def log_config_file(self, config_file) -> None:
    """Copies the config_file to the log directory

    Args:
        config_file (str, path): Config file to be logged.
    """
    # make sure it is a Path
    config_file = pathlib.Path(config_file)

    # copy the file
    config_file_log = self.log_dir.joinpath(config_file.name)
    shutil.copy(config_file, config_file_log)

    self.logger.info(f"Config file backed up to: {config_file_log}")
run_scan
run_scan()

Runs the scan.

Source code in astra/parallel_scan.py
def run_scan(self) -> None:
    """Runs the scan.
    """
    if self.n_steps is None:
        self.logger.info("No scanning parameters were set. The scan will not be performed.")
        return
    else:
        self.logger.info(f"Total number of {self.n_steps} simulations will be performed, number of parallel simulations is set to {self.n_paralel}.")
    self.logger.info(f"This scan will be performed in runs {self.initial_run_number} -- {self.initial_run_number + self.n_steps-1}.")

    # Setup of the scan properties
    now_running = 0
    processes = []
    self.n_finished = 0
    run_number = self.initial_run_number

    # Setup start time for the estimated finish time
    self.start_time = time.time()

    while self.n_finished < self.n_steps:
        # Iterate over a copy of the running processes and check whether they have ended,
        # so that finished entries can be removed from the original list safely
        for p,n,process_start_time in list(processes):
            if p.poll() is not None: # True if process finished
                processes.remove([p,n,process_start_time]) # remove from running processes
                self.update_mean_runtime(time.time() - process_start_time)# Must be called before the self.n_finished is updated
                # update scan properties
                now_running -= 1
                self.n_finished += 1
                self.logger.info(f"Process with run number {n} finished after {time_get_h_m(time.time() - process_start_time)}.")

        # submit new processes until the limit is reached    
        while now_running < self.n_paralel and self.n_steps > self.n_finished + now_running:
            # this is to allow to submit the process only if the astra lock is released
            with self.astraLock.acquire():
                pass
            p = subprocess.Popen(["python", pathlib.Path(self.config["Paths"]["base_dir"]).joinpath("astra/run.py"), 
                                "--working_dir", str(self.working_dir), 
                                "--distribution_file", self.distribution_file.name, 
                                "--run_file", self.run_file.name, 
                                "--run_number", str(run_number), 
                                "--set_parameters", str(
                                    {self.scanning_parameters[i]["parameter"]: self.scanning_parameters[i]["values"][run_number-self.initial_run_number]
                                        for i in range(len(self.scanning_parameters))
                                        }
                                    ).replace("'",'"'),
                                    ], 
                                shell=True, stdin=subprocess.PIPE)
            self.logger.info(f"Submitted new process with run number {run_number}.")
            processes = processes + [[p, run_number, time.time()]] # update list of running processes
            # update run properties
            run_number += 1
            now_running += 1
            # this is to prevent a warning when all processes try to write to the same NORAN file at once and to ensure correct astraLock acquisition
            time.sleep(1) 

        # Logging section
        run_time = time.time() - self.start_time
        self.logger.info(f"Total: {self.n_steps} :: Now running: {now_running}/{self.n_paralel} :: Finished: {self.n_finished} :: Waiting: {self.n_steps - self.n_finished - now_running} :: Time running: {time_get_h_m(run_time)} :: Estimated time to finish: {self.get_estimated_time_to_finish()}")

        # if there are processes running, wait until new check is made   
        if now_running != 0: 
            time.sleep(10)

    self.logger.info("Scan completed.")

    self.check_lost_particles()
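
The scheduling pattern used by `run_scan` (poll the running processes, collect the finished ones, then submit new ones up to the limit) can be sketched independently of ASTRA. The example below is an illustration under assumptions, not the package's implementation: the function name `run_bounded` is hypothetical, the commands are harmless `python -c pass` calls instead of simulation runs, and locking and logging are left out.

```python
import subprocess
import sys
import time


def run_bounded(commands, n_parallel, poll_interval=0.05):
    """Run the commands with at most n_parallel at a time; return (index, return code) pairs."""
    pending = list(enumerate(commands))
    running = []   # (Popen object, index) of the processes currently running
    finished = []  # (index, return code) of the completed processes
    while pending or running:
        # Rebuild the running list instead of removing items while iterating over it.
        still_running = []
        for process, index in running:
            if process.poll() is None:
                still_running.append((process, index))
            else:
                finished.append((index, process.returncode))
        running = still_running
        # Submit new processes until the limit is reached.
        while pending and len(running) < n_parallel:
            index, command = pending.pop(0)
            running.append((subprocess.Popen(command), index))
        # Wait before the next check only if something is still running.
        if running:
            time.sleep(poll_interval)
    return finished


if __name__ == "__main__":
    demo_commands = [[sys.executable, "-c", "pass"] for _ in range(4)]
    print(run_bounded(demo_commands, n_parallel=2))
```

The short polling interval is chosen for the demonstration only; the listing above waits 10 seconds between checks, which suits simulations that run for minutes or hours.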
update_mean_runtime
update_mean_runtime(time)

Updates the mean run time.

Parameters:

Name Type Description Default
time int

Run time (in seconds) of the process that has just finished.

required
Source code in astra/parallel_scan.py
def update_mean_runtime(self, time:int) -> None:
    """Updates the mean run time.

    Args:
        time (int): Run time (in seconds) of the process that has just finished.
    """
    # Must be called before the self.n_finished is updated
    self.mean_run_time = (self.mean_run_time*self.n_finished + time)/(self.n_finished+1)
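
The update above is an incremental mean: the previous mean is weighted by the number of runs already finished, and the new run time is added with weight one. A minimal standalone sketch, with the hypothetical function name `updated_mean`:

```python
def updated_mean(current_mean, n_finished, new_run_time):
    """Return the mean run time after one more run, given the mean of n_finished earlier runs."""
    return (current_mean * n_finished + new_run_time) / (n_finished + 1)


def mean_of_run_times(run_times):
    """Fold a sequence of run times into their mean, one update at a time."""
    mean = 0.0
    for n_finished, run_time in enumerate(run_times):
        mean = updated_mean(mean, n_finished, run_time)
    return mean
```

This is why the method must be called before `n_finished` is incremented: the count passed in has to be the number of runs that contributed to the current mean.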
verify_initial_run_number
verify_initial_run_number()

Checks if run number is free. If not, the run number is set to first free run number.

A run number is considered free if no run number equal to or higher than it is taken.

Source code in astra/parallel_scan.py
def verify_initial_run_number(self):
    """Checks if run number is free. If not, the run number is set to first free run number.

    A run number is considered free if no run number equal to or higher than it is taken.
    """
    # check if the initial run number is lower than the first free run number, i.e. already taken
    if self.initial_run_number < self.get_first_free_run_number():
        # if so, set it to the first free run number
        self.logger.warning(f"Run number {self.initial_run_number} has already been taken!")
        self.initial_run_number = self.get_first_free_run_number()
        self.logger.info(f"Initial run number was set to the first free run number, which is {self.initial_run_number}.")
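
The definition of a free run number given above implies that the first free run number is one above the highest run number already taken. The sketch below illustrates that rule in isolation. It is an assumption-based illustration: `get_first_free_run_number` is not shown on this page, both helper names are hypothetical, and the default of 1 for an empty output file is a guess.

```python
def first_free_run_number(taken_run_numbers, default=1):
    """Return the lowest run number with no taken run number at or above it."""
    # `default` is an assumed starting number for the case that nothing is taken yet.
    return max(taken_run_numbers) + 1 if taken_run_numbers else default


def resolve_initial_run_number(requested, taken_run_numbers):
    """Keep the requested run number if it is free, otherwise fall back to the first free one."""
    first_free = first_free_run_number(taken_run_numbers)
    if requested is None or requested < first_free:
        return first_free
    return requested
```

With run numbers 1, 2 and 7 taken, a request for run 5 is moved to 8, while a request for run 10 is kept.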

Last update: October 31, 2023
Created: October 31, 2023