controller.py

# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025 SWGY, Inc
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
import csv
import logging
import model
import numpy as np

from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass
from datetime import datetime

@dataclass
class BlastResult:
    """Stores pressure readings for each armor configuration"""
    p0: float
    x: float
    y: float
    z: float
    full_armor: float
    helmet_only: float
    vest_only: float
    no_armor: float

@dataclass
class ControllerParams:
    """Global settings for this controller."""
    p0: float
    resolution_arcmin: int
    max_bounces: int
    max_length: float
    max_workers: int
    min_blast_dist: float


def compute_blast_result(blast_x: float, blast_y: float, blast_z: float,
                         res: int, mb: int, ml: float, P0_KPA: float,
                         helmet, vest, sensor):
    """
    Simulates the pressure exerted by a blast at a given location and calculates the total impulse
    for different armor configurations.

    The function simulates the blast pressure and impulse for the following configurations:
    1. Full armor (helmet and vest)
    2. Helmet only
    3. Vest only
    4. No armor

    The results are computed by tracing the blast rays and simulating the pressure based on the
    armor configurations and other parameters like resolution, max bounces, and max ray length.

    Args:
        blast_x (float): The x-coordinate of the blast point (in meters).
        blast_y (float): The y-coordinate of the blast point (in meters).
        blast_z (float): The z-coordinate of the blast point (in meters).
        res (int): The resolution of the blast ray tracing (in arc-minutes).
        mb (int): The maximum number of bounces for ray tracing.
        ml (float): The maximum length to trace the blast ray (in meters).
        P0_KPA (float): The initial peak pressure of the blast (in kilopascals).
        helmet (object): The helmet geometry object for armor simulation.
        vest (object): The vest geometry object for armor simulation.
        sensor (object): The sensor object used to simulate pressure and measure impulse.

    Returns:
        BlastResult: An object containing the computed total impulse for each armor configuration:
            - `full_armor`: Impulse with both helmet and vest.
            - `helmet_only`: Impulse with helmet only.
            - `vest_only`: Impulse with vest only.
            - `no_armor`: Impulse with no armor.

    Notes:
        The function uses the `simulate_pressure` method from the `model` to simulate the blast pressure
        and compute the total impulse for each armor configuration. The method uses ray tracing to compute
        the pressure at each configuration's point of impact.
    """
    bp_n = np.array([blast_x, blast_y, blast_z])

    full_armor = model.simulate_pressure(
        geometry=[helmet, vest], blast_point=bp_n, sensor=sensor,
        extents=model.compute_extents([], bp_n, sensor),
        resolution_arcmin=res, ray_callback=model.trace_ray, p0=P0_KPA,
        max_length=ml, max_bounces=mb,
    ).total_impulse

    helmet_only = model.simulate_pressure(
        geometry=[helmet], blast_point=bp_n, sensor=sensor,
        extents=model.compute_extents([], bp_n, sensor),
        resolution_arcmin=res, ray_callback=model.trace_ray, p0=P0_KPA,
        max_length=ml, max_bounces=mb,
    ).total_impulse

    vest_only = model.simulate_pressure(
        geometry=[vest], blast_point=bp_n, sensor=sensor,
        extents=model.compute_extents([], bp_n, sensor),
        resolution_arcmin=res, ray_callback=model.trace_ray, p0=P0_KPA,
        max_length=ml, max_bounces=mb,
    ).total_impulse

    no_armor = model.simulate_pressure(
        geometry=[], blast_point=bp_n, sensor=sensor,
        extents=model.compute_extents([], bp_n, sensor),
        resolution_arcmin=res, ray_callback=model.trace_ray, p0=P0_KPA,
        max_length=ml, max_bounces=mb,
    ).total_impulse

    return BlastResult(
        P0_KPA, bp_n[0], bp_n[1], bp_n[2],
        full_armor, helmet_only, vest_only, no_armor
    )

def parallel_calculation(params: ControllerParams, blast_cube_center,
                         x_offsets, y_offsets, z_offsets,
                         helmet, vest, sensor):
    """
    Perform a parallelized calculation of blast results at multiple locations
    within a cube of blast points.

    This function divides the cube into discrete segments and performs parallel
    calculations to simulate the blast pressure and impulse at each point,
    using different armor configurations (full armor, helmet only, vest only,
    and no armor). The results are computed using multiple processes to speed
    up the simulation.

    Args:
        params (ControllerParams): An object containing simulation parameters,
            including resolution, maximum bounces, maximum ray length, and initial
            peak pressure.
        blast_cube_center (tuple): A tuple representing the center of the cube
            (x, y, z) where the blast points are located.
        x_offsets (list of float): A list of x-offset values for the cube grid.
        y_offsets (list of float): A list of y-offset values for the cube grid.
        z_offsets (list of float): A list of z-offset values for the cube grid.
        helmet (object): The MICH helmet geometry object for simulation.
        vest (object): The SAPI plate geometry object for simulation.
        sensor (object): The sensor object for simulating the blast pressure
            and impulse.

    Returns:
        list: A list of `BlastResult` objects, each containing the calculated
            total impulse for different armor configurations (full armor, helmet
            only, vest only, and no armor) for each blast point.

    Notes:
        - The cube is divided into grid points using the provided offsets for
          x, y, and z dimensions.
        - The calculation of the blast result is done using the
          `compute_blast_result` function, which simulates 
          pressure and impulse using ray tracing.
        - Progress updates are logged every 5% of the total tasks completed
          during the parallel processing.
        - The results are sorted by the x, y, and z coordinates before being
          returned.

    Example:
        results = parallel_calculation(params, (0, 0, 0), x_offsets, y_offsets,
                                       z_offsets, helmet, vest, sensor)
    """

    blast_locs = [(x + blast_cube_center[0],
                   y + blast_cube_center[1],
                   z + blast_cube_center[2])
                  for x in x_offsets
                  for y in y_offsets
                  for z in z_offsets]

    # Generate filename based on current date and time
    filename = datetime.now().strftime("blast-results-%Y-%m-%d-%H%M.csv")

    # Collect results in a list for final sorting and CSV writing
    results = []

    total_blasts = len(blast_locs)
    # Emit an INFO update every so often
    progress_interval = max(1, total_blasts // 50)

    logging.info(f"Running {total_blasts} blast simulations.")

    # Use parallel processing with up to 4 workers
    with ProcessPoolExecutor(max_workers=params.max_workers) as executor:
        # Submit all blast point jobs
        future_to_offset = {
            executor.submit(
                compute_blast_result,
                x, y, z,
                params.resolution_arcmin,
                params.max_bounces,
                params.max_length,
                params.p0,
                helmet, vest, sensor
            ): (x, y, z)
            for (x, y, z) in blast_locs
            if model.calculate_distance((x, y, z), sensor.origin) >= params.min_blast_dist
        }

        completed_tasks = 0
        for future in as_completed(future_to_offset):
            res = future.result()
            results.append(res)
            completed_tasks += 1

            if completed_tasks % progress_interval == 0:
                percent_complete = (completed_tasks / total_blasts) * 100
                logging.info(f"Progress: {percent_complete:.1f}% complete")

    # Sort the results by (x, y, z) so they proceed spatially
    # in a consistent order
    results.sort(key=lambda r: (r.x, r.y, r.z))

    logging.info("Finished parallel simulation")
    return results

def sequential_calculation(params: ControllerParams, blast_cube_center,
                           x_offsets, y_offsets, z_offsets,
                           helmet, vest, sensor):
    """
    Perform a sequential calculation of blast results at multiple locations
    within a cube of blast points.

    This function iterates over all the blast locations in the cube and
    calculates the blast result for each location using the
    `compute_blast_result` function. It simulates the pressure and impulse for
    different armor configurations (full armor, helmet only, vest only, and no
    armor). The results are computed sequentially, one by one, and progress is
    logged.

    Args:
        params (ControllerParams): An object containing simulation parameters,
            including resolution, maximum bounces, maximum ray length, and initial
            peak pressure.
        blast_cube_center (tuple): A tuple representing the center of the cube
            (x, y, z) where the blast points are located.
        x_offsets (list of float): A list of x-offset values for the cube grid.
        y_offsets (list of float): A list of y-offset values for the cube grid.
        z_offsets (list of float): A list of z-offset values for the cube grid.
        helmet (object): The MICH helmet geometry object for simulation.
        vest (object): The SAPI plate geometry object for simulation.
        sensor (object): The sensor object for simulating the blast pressure
            and impulse.

    Returns:
        list: A list of `BlastResult` objects, each containing the calculated
            total impulse for different armor configurations (full armor, helmet
            only, vest only, and no armor) for each blast point.

    Notes:
        - The cube is divided into grid points using the provided offsets for
          x, y, and z dimensions.
        - The calculation of the blast result is done sequentially by iterating
          over each point in the grid.
        - Progress updates are logged every 5% of the total tasks completed
          during the sequential processing.
        - The results are sorted by the x, y, and z coordinates before being
          returned.

    Example:
        results = sequential_calculation(params, (0, 0, 0),
                                        x_offsets, y_offsets, z_offsets,
                                        helmet, vest, sensor)
    """

    blast_locs = [(x + blast_cube_center[0],
                   y + blast_cube_center[1],
                   z + blast_cube_center[2])
                  for x in x_offsets
                  for y in y_offsets
                  for z in z_offsets]

    # Generate filename based on current date and time
    filename = datetime.now().strftime("blast-results-%Y-%m-%d-%H%M.csv")

    # Collect results in a list for final sorting and CSV writing
    results = []

    total_blasts = len(blast_locs)
    # Emit an INFO update every so often
    progress_interval = max(1, total_blasts // 50)

    logging.info(f"Running {total_blasts} blast simulations.")

    # Perform calculations sequentially by iterating over all blast locations
    for idx, (x, y, z) in enumerate(blast_locs, 1):
        if model.calculate_distance((x, y, z), sensor.origin) >= params.min_blast_dist:
            continue
        # Compute the blast result for each location
        res = compute_blast_result(
                x, y, z,
                params.resolution_arcmin,
                params.max_bounces,
                params.max_length,
                params.p0,
                helmet, vest, sensor
        )
        results.append(res)

        if idx % progress_interval == 0:
            percent_complete = (idx / total_blasts) * 100
            logging.info(f"Progress: {percent_complete:.1f}% complete")

    # Sort the results by (x, y, z) so they proceed spatially in a consistent order
    results.sort(key=lambda r: (r.x, r.y, r.z))
    logging.info("Finished sequential simulation")
    return results