Source code for sharpy.generators.turbvelocityfield

import numpy as np
import scipy.interpolate as interpolate
import h5py as h5
import os
from lxml import objectify, etree

import sharpy.utils.generator_interface as generator_interface
import sharpy.utils.settings as settings
import sharpy.utils.cout_utils as cout


[docs]@generator_interface.generator class TurbVelocityField(generator_interface.BaseGenerator): r""" Turbulent Velocity Field Generator ``TurbVelocitityField`` is a class inherited from ``BaseGenerator`` The ``TurbVelocitityField`` class generates a velocity field based on the input from an [XDMF](http://www.xdmf.org) file. It supports time-dependant fields as well as frozen turbulence. To call this generator, the ``generator_id = TurbVelocityField`` shall be used. This is parsed as the value for the ``velocity_field_generator`` key in the desired aerodynamic solver's settings. Supported files: - `field_id.xdmf`: Steady or Unsteady XDMF file This generator also performs time interpolation between two different time steps. For now, only linear interpolation is possible. Space interpolation is done through `scipy.interpolate` trilinear interpolation. However, turbulent fields are read directly from the binary file and not copied into memory. This is performed using `np.memmap`. The overhead of this procedure is ~18% for the interpolation stage, however, initially reading the binary velocity field (which will be much more common with time-domain simulations) is faster by a factor of 1e4. Also, memory savings are quite substantial: from 6Gb for a typical field to a handful of megabytes for the whole program. Args: in_dict (dict): Input data in the form of dictionary. See acceptable entries below: Attributes: See Also: .. py:class:: sharpy.utils.generator_interface.BaseGenerator """ generator_id = 'TurbVelocityField' settings_types = dict() settings_default = dict() settings_description = dict() settings_types['print_info'] = 'bool' settings_default['print_info'] = True settings_description['print_info'] = 'Output solver-specific information in runtime.' settings_types['turbulent_field'] = 'str' settings_default['turbulent_field'] = None settings_description['turbulent_field'] = 'XDMF file path of the velocity field' settings_types['offset'] = 'list(float)' settings_default['offset'] = np.zeros((3,)) settings_description['offset'] = 'Spatial offset in the 3 dimensions' settings_types['centre_y'] = 'bool' settings_default['centre_y'] = True settings_description['centre_y'] = 'Flat for changing the domain to [``-y_max/2``, ``y_max/2``]' settings_types['periodicity'] = 'str' settings_default['periodicity'] = 'xy' settings_description['periodicity'] = 'Axes in which periodicity is enforced' settings_types['frozen'] = 'bool' settings_default['frozen'] = True settings_description['frozen'] = 'If ``True``, the turbulent field will not be updated in time' settings_types['store_field'] = 'bool' settings_default['store_field'] = False settings_description['store_field'] = 'If ``True``, the xdmf snapshots are stored in memory. Only two at a time for the linear interpolation' settings_table = settings.SettingsTable() __doc__ += settings_table.generate(settings_types, settings_default, settings_description) def __init__(self): self.in_dict = dict() self.settings = dict() self.file = None self.extension = None self.grid_data = dict() self.interpolator = 3*[None] self.x_periodicity = False self.y_periodicity = False # variables for interpolator wrapper self._t0 = -1 self._t1 = -1 self._it0 = -1 self._it1 = -1 self._interpolator0 = None self._interpolator1 = None self.coeff = 0. self.double_initialisation = True self.vel_holder0 = 3*[None] self.vel_holder1 = 3*[None] def initialise(self, in_dict): self.in_dict = in_dict settings.to_custom_types(self.in_dict, self.settings_types, self.settings_default) self.settings = self.in_dict _, self.extension = os.path.splitext(self.settings['turbulent_field']) if self.extension is '.h5': self.read_btl(self.settings['turbulent_field']) if self.extension in '.xdmf': self.read_xdmf(self.settings['turbulent_field']) if 'z' in self.settings['periodicity']: raise ValueError('Periodicitiy setting in TurbVelocityField cannot be z.\n A turbulent boundary layer is not periodic in the z direction!') if 'x' in self.settings['periodicity']: self.x_periodicity = True if 'y' in self.settings['periodicity']: self.y_periodicity = True # ADC: VERY VERY UGLY. NEED A BETTER WAY def interpolator_wrapper0(self, coords, i_dim=0): coeff = self.get_coeff() return (1.0 - self.coeff)*self._interpolator0[i_dim](coords) + self.coeff*self._interpolator1[i_dim](coords) def interpolator_wrapper1(self, coords, i_dim=1): coeff = self.get_coeff() return (1.0 - self.coeff)*self._interpolator0[i_dim](coords) + self.coeff*self._interpolator1[i_dim](coords) def interpolator_wrapper2(self, coords, i_dim=2): coeff = self.get_coeff() return (1.0 - self.coeff)*self._interpolator0[i_dim](coords) + self.coeff*self._interpolator1[i_dim](coords) def get_coeff(self): return self.coeff def init_interpolator(self): if self.settings['frozen']: self.interpolator = self._interpolator0 return # continuing the ugliness self.interpolator[0] = self.interpolator_wrapper0 self.interpolator[1] = self.interpolator_wrapper1 self.interpolator[2] = self.interpolator_wrapper2 # these functions need to define the interpolators
[docs] def read_btl(self, in_file): """ Legacy function, not using the custom format based on HDF5 anymore. """ raise NotImplementedError('The BTL reader is not up to date!')
[docs] def read_xdmf(self, in_file): """ Reads the xml file `<case_name>.xdmf`. Writes the self.grid_data data structure with all the information necessary. Note: this function does not load any turbulence data (such as ux000, ...), it only reads the header information contained in the xdmf file. """ # store route of file for the other files self.route = os.path.dirname(os.path.abspath(in_file)) # file to string with open(in_file, 'r') as self.file: data = self.file.read().replace('\n', '') # parse data # this next line is necessary to avoid problems with parsing in the Time part: # <!--Start.... # 0.0, 1.0 ... # see https://stackoverflow.com/a/18313932 parser = objectify.makeparser(remove_comments=True) tree = objectify.fromstring(data, parser=parser) # mesh dimensions self.grid_data['dimensions'] = np.fromstring(tree.Domain.Topology.attrib['Dimensions'], sep=' ', count=3, dtype=int) # origin self.grid_data['origin'] = np.fromstring(tree.Domain.Geometry.DataItem[0].text, sep=' ', count=int(tree.Domain.Geometry.DataItem[0].attrib['Dimensions']), dtype=float) # dxdydz # because of how XDMF does it, it is actually dzdydx self.grid_data['dxdydz'] = ( np.fromstring(tree.Domain.Geometry.DataItem[1].text, sep=' ', count=int(tree.Domain.Geometry.DataItem[1].attrib['Dimensions']), dtype=float)) # now onto the grid # time information # [0] is start, [1] is stride self.grid_data['time'] = np.fromstring(tree.Domain.Grid.Time.DataItem.text, sep=' ', count=2, dtype=float) self.grid_data['n_grid'] = len(tree.Domain.Grid.Grid) # self.grid_data['grid'] = [dict()]*self.grid_data['n_grid'] self.grid_data['grid'] = [] for i, i_grid in enumerate(tree.Domain.Grid.Grid): self.grid_data['grid'].append(dict()) # cycle through attributes for k_attrib, v_attrib in i_grid.attrib.items(): self.grid_data['grid'][i][k_attrib] = v_attrib # get Attributes (upper case A is not a mistake) for i_attrib, attrib in enumerate(i_grid.Attribute): self.grid_data['grid'][i][attrib.attrib['Name']] = dict() self.grid_data['grid'][i][attrib.attrib['Name']]['file'] = ( attrib.DataItem.text.replace(' ', '')) if attrib.DataItem.attrib['Precision'].strip() == '4': self.grid_data['grid'][i][attrib.attrib['Name']]['Precision'] = np.float32 elif attrib.DataItem.attrib['Precision'].strip() == '8': self.grid_data['grid'][i][attrib.attrib['Name']]['Precision'] = np.float64 # now we have the file names and the dimensions self.grid_data['initial_x_grid'] = np.array(np.arange(0, self.grid_data['dimensions'][2]))*self.grid_data['dxdydz'][2] # z in the file is -y for us in sharpy (y_sharpy = right) self.grid_data['initial_y_grid'] = np.array(np.arange(0, self.grid_data['dimensions'][1]))*self.grid_data['dxdydz'][1] # y in the file is z for us in sharpy (up) self.grid_data['initial_z_grid'] = np.array(np.arange(0, self.grid_data['dimensions'][0]))*self.grid_data['dxdydz'][0] # the domain now goes: # x \in [0, dimensions[0]*dx] # y \in [-dimensions[2]*dz, 0] # z \in [0, dimensions[1]*dy] centre_z_offset = 0. if self.settings['centre_y']: centre_z_offset = -0.5*(self.grid_data['initial_z_grid'][-1] - self.grid_data['initial_z_grid'][0]) self.grid_data['initial_x_grid'] += self.settings['offset'][0] + self.grid_data['origin'][0] self.grid_data['initial_x_grid'] -= np.max(self.grid_data['initial_x_grid']) self.grid_data['initial_y_grid'] += self.settings['offset'][1] + self.grid_data['origin'][1] self.grid_data['initial_z_grid'] += self.settings['offset'][2] + self.grid_data['origin'][2] + centre_z_offset self.bbox = self.get_field_bbox(self.grid_data['initial_x_grid'], self.grid_data['initial_y_grid'], self.grid_data['initial_z_grid'], frame='G') if self.settings['print_info']: cout.cout_wrap('The domain bbox is:', 1) cout.cout_wrap(' x = [' + str(self.bbox[0, 0]) + ', ' + str(self.bbox[0, 1]) + ']', 1) cout.cout_wrap(' y = [' + str(self.bbox[1, 0]) + ', ' + str(self.bbox[1, 1]) + ']', 1) cout.cout_wrap(' z = [' + str(self.bbox[2, 0]) + ', ' + str(self.bbox[2, 1]) + ']', 1)
def generate(self, params, uext): zeta = params['zeta'] for_pos = params['for_pos'] t = params['t'] self.update_cache(t) self.update_coeff(t) self.init_interpolator() self.interpolate_zeta(zeta, for_pos, uext) def update_cache(self, t): self.double_initialisation = False if self.settings['frozen']: if self._interpolator0 is None: self._t0 = self.timestep_2_time(0) self._it0 = 0 self._interpolator0 = self.read_grid(self._it0, i_cache=0) return # most common case: t already in the [t0, t1] interval if self._t0 <= t <= self._t1: return # t < t0, something weird (time going backwards) if t < self._t0: raise ValueError('Please make sure everything is ok. Your time is going backwards.') # t > t1, need initialisation if t > self._t1: new_it = self.time_2_timestep(t) # new timestep requires initialising the two of them (not likely at all) # this means that the simulation timestep > LES timestep if new_it > self._it1: self.double_initialisation = True else: # t1 goes to t0 self._t0 = self._t1 self._it0 = self._it1 self._interpolator0 = self._interpolator1.copy() # t1 updates to the next (new_it + 1) self._it1 = new_it + 1 self._t1 = self.timestep_2_time(self._it1) self._interpolator1 = self.read_grid(self._it1, i_cache=1) return # last case, both interp need to be initialised if (self._t0 is None or self.double_initialisation): self._t0 = self.timestep_2_time(new_it) self._it0 = new_it self._interpolator0 = self.read_grid(self._it0, i_cache=0) self._it1 = new_it + 1 self._t1 = self.timestep_2_time(self._it1) self._interpolator1 = self.read_grid(self._it1, i_cache=1) def update_coeff(self, t): if self.settings['frozen']: self.coeff = 0.0 return self.coeff = self.linear_coeff([self._t0, self._t1], t) return def time_2_timestep(self, t): return int(max(0, np.floor((t - self.grid_data['time'][0])/self.grid_data['time'][1]))) def timestep_2_time(self, it): return it*self.grid_data['time'][1] + self.grid_data['time'][0] def get_field_bbox(self, x_grid, y_grid, z_grid, frame='G'): bbox = np.zeros((3, 2)) bbox[0, :] = [np.min(x_grid), np.max(x_grid)] bbox[1, :] = [np.min(y_grid), np.max(y_grid)] bbox[2, :] = [np.min(z_grid), np.max(z_grid)] if frame == 'G': bbox[:, 0] = self.gstar_2_g(bbox[:, 0]) bbox[:, 1] = self.gstar_2_g(bbox[:, 1]) return bbox def create_interpolator(self, data, x_grid, y_grid, z_grid, i_dim): interpolator = interpolate.RegularGridInterpolator((x_grid, y_grid, z_grid), data, bounds_error=False, fill_value=0.0) return interpolator def interpolate_zeta(self, zeta, for_pos, u_ext, interpolator=None, offset=np.zeros((3))): if interpolator is None: interpolator = self.interpolator for isurf in range(len(zeta)): _, n_m, n_n = zeta[isurf].shape for i_m in range(n_m): for i_n in range(n_n): coord = self.g_2_gstar(self.apply_periodicity(zeta[isurf][:, i_m, i_n] + for_pos[0:3] + offset)) for i_dim in range(3): try: u_ext[isurf][i_dim, i_m, i_n] = self.interpolator[i_dim](coord) except ValueError: print(coord) raise ValueError() u_ext[isurf][:, i_m, i_n] = self.gstar_2_g(u_ext[isurf][:, i_m, i_n]) @staticmethod def periodicity(x, bbox): try: new_x = bbox[0] + divmod(x - bbox[0], bbox[1] - bbox[0])[1] except ZeroDivisionError: new_x = x return new_x def apply_periodicity(self, coord): new_coord = coord.copy() if self.x_periodicity: i = 0 new_coord[i] = self.periodicity(new_coord[i], self.bbox[i, :]) if self.y_periodicity: i = 1 new_coord[i] = self.periodicity(new_coord[i], self.bbox[i, :]) # if self.x_periodicity: #TODO I think this does not work when bbox is not ordered (bbox[i, 0] is not < bbox[i, 1]) # i = 0 # # x in interval: # if self.bbox[i, 0] <= new_coord[i] <= self.bbox[i, 1]: # pass # # lower than min bbox # elif new_coord[i] < self.bbox[i, 0]: # temp = divmod(new_coord[i], self.bbox[i, 0])[1] # if np.isnan(temp): # pass # else: # new_coord[i] = temp # # greater than max bbox # elif new_coord[i] > self.bbox[i, 1]: # temp = divmod(new_coord[i], self.bbox[i, 1])[1] # if np.isnan(temp): # pass # else: # new_coord[i] = temp # if self.y_periodicity: # i = 1 # # y in interval: # if self.bbox[i, 0] <= new_coord[i] <= self.bbox[i, 1]: # pass # # lower than min bbox # elif new_coord[i] < self.bbox[i, 0]: # try: # temp = divmod(new_coord[i], self.bbox[i, 0])[1] # except ZeroDivisionError: # temp = new_coord[i] # if np.isnan(temp): # pass # else: # new_coord[i] = temp # if new_coord[i] < 0.0: # new_coord[i] = self.bbox[i, 1] + new_coord[i] # # greater than max bbox # elif new_coord[i] > self.bbox[i, 1]: # temp = divmod(new_coord[i], self.bbox[i, 1])[1] # if np.isnan(temp): # pass # else: # new_coord[i] = temp # if new_coord[i] < 0.0: # new_coord[i] = self.bbox[i, 1] + new_coord[i] return new_coord @staticmethod def linear_coeff(t_vec, t): # this is 0 when t == t_vec[0] # 1 when t == t_vec[1] return (t - t_vec[0])/(t_vec[1] - t_vec[0])
[docs] def read_grid(self, i_grid, i_cache=0): """ This function returns an interpolator list of size 3 made of `scipy.interpolate.RegularGridInterpolator` objects. """ velocities = ['ux', 'uy', 'uz'] interpolator = list() for i_dim in range(3): file_name = self.grid_data['grid'][i_grid][velocities[i_dim]]['file'] if i_cache == 0: if not self.settings['store_field']: # load file, but dont copy it self.vel_holder0[i_dim] = np.memmap(self.route + '/' + file_name, # dtype='float64', dtype=self.grid_data['grid'][i_grid][velocities[i_dim]]['Precision'], shape=(self.grid_data['dimensions'][2], self.grid_data['dimensions'][1], self.grid_data['dimensions'][0]), order='F') else: # load and store file self.vel_holder0[i_dim] = (np.fromfile(open(self.route + '/' + file_name, 'rb'), dtype=self.grid_data['grid'][i_grid][velocities[i_dim]]['Precision']).\ reshape((self.grid_data['dimensions'][2], self.grid_data['dimensions'][1], self.grid_data['dimensions'][0]), order='F')) interpolator.append(self.create_interpolator(self.vel_holder0[i_dim], self.grid_data['initial_x_grid'], self.grid_data['initial_y_grid'], self.grid_data['initial_z_grid'], i_dim=i_dim)) elif i_cache == 1: if not self.settings['store_field']: # load file, but dont copy it self.vel_holder1[i_dim] = np.memmap(self.route + '/' + file_name, # dtype='float64', dtype=self.grid_data['grid'][i_grid][velocities[i_dim]]['Precision'], shape=(self.grid_data['dimensions'][2], self.grid_data['dimensions'][1], self.grid_data['dimensions'][0]), order='F') else: # load and store file self.vel_holder1[i_dim] = (np.fromfile(open(self.route + '/' + file_name, 'rb'), dtype=self.grid_data['grid'][i_grid][velocities[i_dim]]['Precision']).\ reshape((self.grid_data['dimensions'][2], self.grid_data['dimensions'][1], self.grid_data['dimensions'][0]), order='F')) interpolator.append(self.create_interpolator(self.vel_holder1[i_dim], self.grid_data['initial_x_grid'], self.grid_data['initial_y_grid'], self.grid_data['initial_z_grid'], i_dim=i_dim)) else: raise Error('i_cache has to be 0 or 1') return interpolator
@staticmethod def g_2_gstar(coord_g): return np.array([coord_g[0], coord_g[2], -coord_g[1]]) @staticmethod def gstar_2_g(coord_star): return np.array([coord_star[0], -coord_star[2], coord_star[1]])