Source code for ppModule.binFiles.read_grid

"""
    Module to read grid files from Musicaa. The grid files are in binary file format.

    The grid files are read from the repository and a tuple containing the grid data is returned.
    The grid data is stored in dictionnaries containing x, y, z coordinates.

    The grid data is read from the binary file using the numpy.fromfile() function.
    The grid data is reshaped to the correct dimensions using the numpy.reshape() function.

    Classes:
    --------
    ReadGrid:  class to read grid from binary files, this clss also handles 
               old classes without ghost points.

"""
import os
import logging
import numpy as np
# Import enecessary classes:
from ppModule.iniFiles.read_ini import InfoReader

# Set up logging
logger = logging.getLogger(__name__)

[docs] class ReadGrid(): """ Abstract class to implement some common methods and attributes for reading grid files. Attributes: ----------- dir (str): Directory containing the grid files. nbloc (int): Number of blocks in the domain. info (dict): Dictionary containing information about the grid. ngh (int): Number of ghost cells. """ def __init__(self, directory: str, config: dict) -> None: self._directory = directory # Info about blocks to read data: self.ngh = config.get("ngh", -1) self.endianess = config.get("endianess", "little") self.full_3d = config.get("full_3d", False) self.new_grid = config.get("new_grid", True) self.info = self._get_info() # =============== Public methods: def _get_info(self): """ Method to read the info.ini file """ reader = InfoReader(self._directory+"/info.ini") info_block = reader.info # if self.ngh == -1 and self.new_grid: logger.debug("No number of ghost points provided, for a new grid. " \ "Trying to read from info.ini") if reader.get_value(key="ngh") is not None: self.ngh = int(reader.get_value(key="ngh")) else: logger.debug("No number of ghost points provided in info.ini, " \ "using default value 5") self.ngh = 5 if not self.new_grid: logger.debug("Old grid files without ghost points") self.ngh = 0 return info_block
[docs] def read_grid(self): """ Method to read grid over whole domain and returns a grid without ghost points. The method takes care of old grid as well for old simulations using Musicaa. The grid is stored in a dictionnary containing: x, y, & z coordinates. Returns: tuple: Tuple containing grid data (x, y, z). """ def extract_grid(data, ngh, is_curv, full_3d): if is_curv == "F": return data[ngh:-ngh] if is_curv == "T" and not full_3d: return data[ngh:-ngh, ngh:-ngh] return data[ngh:-ngh, ngh:-ngh, ngh:-ngh] ngh = self.ngh if isinstance(self.ngh, (float, int)) else 0 if self.new_grid: logger.debug("Number of ghost points to be substracted form grid: %s", ngh) else: logger.debug("Old grid files without ghost points") end_filename = f"_ngh{ngh}.bin" if self.new_grid else ".bin" x, y, z = {}, {}, {} x_ex, y_ex, z_ex = {}, {}, {} for nbl in range(1, self.info["nbloc"] + 1): file = f"grid_bl{nbl}{end_filename}" nx = self.info[f"block {nbl}"]["nx"] + 2 * ngh ny = self.info[f"block {nbl}"]["ny"] + 2 * ngh nz = self.info[f"block {nbl}"]["nz"] + 2 * ngh \ if self.info[f"block {nbl}"]["nz"] > 1 else 1 x_ex[nbl], y_ex[nbl], z_ex[nbl] = self._read_one_block(file, nx, ny, nz) if self.new_grid and not self.full_3d: x[nbl] = extract_grid(x_ex[nbl], ngh, self.info["is_curv"], self.full_3d) y[nbl] = extract_grid(y_ex[nbl], ngh, self.info["is_curv"], self.full_3d) if nz > 1: z[nbl] = extract_grid(z_ex[nbl], ngh, "F", self.full_3d) else: z[nbl] = np.zeros(1) elif self.new_grid and self.full_3d: x[nbl] = extract_grid(x_ex[nbl], ngh, self.info["is_curv"], self.full_3d) y[nbl] = extract_grid(y_ex[nbl], ngh, self.info["is_curv"], self.full_3d) z[nbl] = extract_grid(z_ex[nbl], ngh, self.info["is_curv"], self.full_3d) else: x[nbl], y[nbl], z[nbl] = self._read_old_grid_block(file, nx, ny, nz) logger.info("Done reading grid from binary files") return x, y, z
# ============== Private methods: # ========= # Read grid # ========= def _read_one_block(self, filename, ngi, ngj, ngk): """ Reads the grid from one block given the filename and other parameters. Args: filename (str): Name of the binary file containing the grid data. ngi (int): Number of points in the i-dimension. ngj (int): Number of points in the j-dimension. ngk (int): Number of points in the k-dimension. Returns: tuple: Tuple containing grid data (x, y, z). """ sens = '<' if self.endianess == "little" else '>' # Reading of the grid # ------------------- try: with open(self._directory + "/" + filename, "rb") as f: if self.info["is_curv"] == "F": x = np.fromfile(f, dtype=(sens + 'f8'), count=ngi).reshape((ngi), order='F') y = np.fromfile(f, dtype=(sens + 'f8'), count=ngj).reshape((ngj), order='F') z = np.fromfile(f, dtype=(sens + 'f8'), count=ngk).reshape((ngk), order='F') elif self.info["is_curv"] == "T" and not self.full_3d: x = np.fromfile(f, dtype=(sens + 'f8'), count=ngi * ngj).reshape((ngi, ngj), order='F') y = np.fromfile(f, dtype=(sens + 'f8'), count=ngi * ngj).reshape((ngi, ngj), order='F') z = np.fromfile(f, dtype=(sens + 'f8'), count=ngk).reshape((ngk), order='F') if ngk > 1 else np.zeros(1) elif self.info["is_curv"] == "T" and self.full_3d: x = np.fromfile(f, dtype=(sens + 'f8'), count=ngi * ngj * ngk ).reshape((ngi, ngj, ngk), order='F') y = np.fromfile(f, dtype=(sens + 'f8'), count=ngi * ngj * ngk ).reshape((ngi, ngj, ngk), order='F') z = np.fromfile(f, dtype=(sens + 'f8'), count=ngi * ngj * ngk ).reshape((ngi, ngj, ngk), order='F') else: raise ValueError("Invalid value for is_curv or is_3D_curv.") except FileNotFoundError as f: raise SystemExit(f"File {filename} not found...") from f except Exception as e: raise SystemExit(f"An error occurred while reading the file {filename}: {e}") from e return x, y, z def _read_old_grid_block(self, filename, nx, ny, nz): """Reads the grid from one block given the filename and other parameters. Args: filename (str): Name of the binary file containing the grid data. verbose (bool, optional): Whether to print verbose output. Defaults to False. is_sw_rv (bool, optional): TBD. Defaults to False. Returns: tuple: Tuple containing grid data. """ # Reading of the grid # ------------------- sens = '<' if self.endianess == "little" else '>' try: f = open(self.directory+"/"+filename, "r") except FileNotFoundError as exc: raise SystemExit(f"File {filename} not found...") from exc arg = np.fromfile(f, dtype=(sens+'i4'), count=1) nx_ = np.fromfile(f, dtype=(sens+'i4'), count=1)[0] #=============================================================================== arg = np.fromfile(f, dtype=(sens+'i8'), count=1) ny_ = np.fromfile(f, dtype=(sens+'i4'), count=1)[0] #=============================================================================== arg = np.fromfile(f, dtype=(sens+'i8'), count=1) nz_ = np.fromfile(f, dtype=(sens+'i4'), count=1)[0] if self.info["is_curv"] == "T": x = np.zeros((nx, ny),dtype='float',order='F') for j in range(ny): x[:,j] = np.fromfile(f, dtype=(sens+'f8'), count=nx) #=============================================================================== arg = np.fromfile(f, dtype=(sens+'i8'), count=1) y = np.zeros((nx, ny),dtype='float',order='F') for j in range(ny): y[:,j] = np.fromfile(f, dtype=(sens+'f8'), count=nx) #=============================================================================== if nz>1: arg = np.fromfile(f, dtype=(sens+'i8'), count=1) z = np.fromfile(f, dtype=(sens+'f8'), count=nz) else: z = np.zeros(1) else: x = np.fromfile(f, dtype=(sens+'f8'), count=nx) #=============================================================================== arg = np.fromfile(f, dtype=(sens+'i8'), count=1) y = np.fromfile(f, dtype=(sens+'f8'), count=ny) #=============================================================================== if nz > 1: arg = np.fromfile(f, dtype=(sens+'i8'), count=1) z = np.fromfile(f, dtype=(sens+'f8'), count=nz) #=============================================================================== else: z = np.zeros(1) f.close() return x, y, z # ============ # Check if the directory exists: @property def directory(self): """ Property to get the directory where the grid can be found. """ return self._directory @directory.setter def directory(self, value): """ Property setter to check if the directory exists """ if not os.path.exists(value): raise ValueError(f"Directory {value} does not exist.") self._directory = value