Source code for sharpy.generators.turbvelocityfieldbts

import numpy as np
import scipy.interpolate as interpolate

import sharpy.utils.generator_interface as generator_interface
import sharpy.utils.settings as settings
import sharpy.utils.cout_utils as cout


def interp_rectgrid_vectorfield(points, grid, vector_field, out_value, regularGrid=False, num_cores=1):
    # check: https://en.wikipedia.org/wiki/Trilinear_interpolation
    npoints = points.shape[0]
    output = np.zeros((npoints, 3))
    if regularGrid:
        length = np.zeros((3))
        npoints_grid = np.zeros((3), dtype=int)
        delta = np.zeros((3))
        for idim in range(3):
            length[idim] = grid[idim][-1] - grid[idim][0]
            npoints_grid[idim] = len(grid[idim])
            delta[idim] = length[idim]/(npoints_grid[idim] - 1)

    for ipoint in range(npoints):
        # Check if the point is outside the box
        isout = False
        for idim in range(3):
            if (points[ipoint, idim] > grid[idim][-1]) or (points[ipoint, idim] < grid[idim][0]):
                isout = True
                output[ipoint, :] = out_value
                break

        # If the point is in the grid
        if not isout:
            # Compute the position
            igrid = np.zeros((3,), dtype=int)
            if regularGrid:
                for idim in range(3):
                    igrid[idim] = int(np.ceil((points[ipoint, idim] - grid[idim][0])/delta[idim]))
            else:
                for idim in range(3):
                    while points[ipoint, idim] >= grid[idim][igrid[idim]]:
                        igrid[idim] += 1

            xvec = np.array([grid[0][igrid[0] - 1],
                             grid[0][igrid[0]    ]])
            xvec = np.concatenate((xvec, xvec, xvec, xvec))
            yvec = np.array([grid[1][igrid[1] - 1],
                             grid[1][igrid[1] - 1],
                             grid[1][igrid[1]    ],
                             grid[1][igrid[1]    ]])
            yvec = np.concatenate((yvec, yvec))
            zvec = np.ones((8))
            zvec[0:4] *= grid[2][igrid[2] - 1]
            zvec[4:8] *= grid[2][igrid[2]    ]

            A = np.zeros((8,8))
            A[:, 0] = np.ones((8,))
            A[:, 1] = xvec
            A[:, 2] = yvec
            A[:, 3] = zvec
            A[:, 4] = xvec*yvec
            A[:, 5] = xvec*zvec
            A[:, 6] = yvec*zvec
            A[:, 7] = xvec*yvec*zvec

            Ainv = np.linalg.inv(A)
            x = points[ipoint, 0]
            y = points[ipoint, 1]
            z = points[ipoint, 2]
            for idim in range(3):
                b = np.array([vector_field[idim, igrid[0] - 1, igrid[1] - 1, igrid[2] - 1],
                              vector_field[idim, igrid[0]    , igrid[1] - 1, igrid[2] - 1],
                              vector_field[idim, igrid[0] - 1, igrid[1]    , igrid[2] - 1],
                              vector_field[idim, igrid[0]    , igrid[1]    , igrid[2] - 1],
                              vector_field[idim, igrid[0] - 1, igrid[1] - 1, igrid[2]    ],
                              vector_field[idim, igrid[0]    , igrid[1] - 1, igrid[2]    ],
                              vector_field[idim, igrid[0] - 1, igrid[1]    , igrid[2]    ],
                              vector_field[idim, igrid[0]    , igrid[1]    , igrid[2]    ]
                             ])
                f = np.dot(Ainv, b)
                output[ipoint, idim] = f[0] + f[1]*x + f[2]*y + f[3]*z + f[4]*x*y + f[5]*x*z + f[6]*y*z + f[7]*x*y*z

    return output


[docs]@generator_interface.generator class TurbVelocityFieldBts(generator_interface.BaseGenerator): r""" Turbulent Velocity Field Generator from TurbSim bts files ``TurbVelocitityFieldBts`` is a class inherited from ``BaseGenerator`` The ``TurbVelocitityFieldBts`` class generates a velocity field based on the input from a bts file generated by TurbSim. https://nwtc.nrel.gov/TurbSim To call this generator, the ``generator_id = TurbVelocityField`` shall be used. This is parsed as the value for the ``velocity_field_generator`` key in the desired aerodynamic solver's settings. """ generator_id = 'TurbVelocityFieldBts' generator_classification = 'TurbVelocityFieldBts' settings_types = dict() settings_default = dict() settings_description = dict() settings_types['print_info'] = 'bool' settings_default['print_info'] = True settings_description['print_info'] = 'Output solver-specific information in runtime' settings_types['turbulent_field'] = 'str' settings_default['turbulent_field'] = None settings_description['turbulent_field'] = 'BTS file path of the velocity file' settings_types['new_orientation'] = 'str' settings_default['new_orientation'] = 'xyz' settings_description['new_orientation'] = 'New orientation of the axes' settings_types['u_fed'] = 'list(float)' settings_default['u_fed'] = np.zeros((3,)) settings_description['u_fed'] = 'Velocity at which the turbulence field is fed into the solid' settings_types['u_out'] = 'list(float)' settings_default['u_out'] = np.zeros((3,)) settings_description['u_out'] = 'Velocity to set for points outside the interpolating box' settings_types['case_with_tower'] = 'bool' settings_default['case_with_tower'] = False settings_description['case_with_tower'] = 'Does the SHARPy case will include the tower in the simulation?' settings_types['interpolate_wake'] = 'bool' settings_default['interpolate_wake'] = True settings_description['interpolate_wake'] = 'If False, u_out will be assigned to all the points in the wake' settings_types['num_cores'] = 'int' settings_default['num_cores'] = 1 settings_description['num_cores'] = 'Number of cores to be used in parallel computation' settings_types['extra_offset'] = 'float' settings_default['extra_offset'] = 0. settings_description['extra_offset'] = 'Distance [m] to displace the turbulence box' settings_types['use_3_4_interpolation'] = 'bool' settings_default['use_3_4_interpolation'] = False settings_description['use_3_4_interpolation'] = 'Use the farfield velocity at 3/4 chord for all the points along the chord' setting_table = settings.SettingsTable() __doc__ += setting_table.generate(settings_types, settings_default, settings_description) def __init__(self): self.interpolator = [] self.bbox = None self.x_grid = None self.y_grid = None self.z_grid = None self.vel = None self.dist_to_recirculate = None self.gird_size_vec = None self.grid_size_ufed_dir = None def initialise(self, in_dict): self.in_dict = in_dict settings.to_custom_types(self.in_dict, self.settings_types, self.settings_default, no_ctype=True) self.settings = self.in_dict self.x_grid, self.y_grid, self.z_grid, self.vel = self.read_turbsim_bts(self.settings['turbulent_field'], self.settings['case_with_tower']) if not self.settings['new_orientation'] == 'xyz': # self.settings['new_orientation'] = 'zyx' self.x_grid, self.y_grid, self.z_grid, self.vel = self.change_orientation(self.x_grid, self.y_grid, self.z_grid, self.vel, self.settings['new_orientation']) self.bbox = self.get_field_bbox(self.x_grid, self.y_grid, self.z_grid) if self.settings['print_info']: cout.cout_wrap('The domain bbox is:', 1) cout.cout_wrap(' x = [' + str(self.bbox[0, 0]) + ', ' + str(self.bbox[0, 1]) + ']', 1) cout.cout_wrap(' y = [' + str(self.bbox[1, 0]) + ', ' + str(self.bbox[1, 1]) + ']', 1) cout.cout_wrap(' z = [' + str(self.bbox[2, 0]) + ', ' + str(self.bbox[2, 1]) + ']', 1) self.dist_to_recirculate = 0. self.grid_size_vec = np.array([np.max(self.x_grid) - np.min(self.x_grid), np.max(self.y_grid) - np.min(self.y_grid), np.max(self.z_grid) - np.min(self.z_grid)]) self.grid_size_ufed_dir = np.dot(self.grid_size_vec, self.settings['u_fed']/np.linalg.norm(self.settings['u_fed'])) # self.init_interpolator(x_grid, y_grid, z_grid, vel) def init_interpolator(self, x_grid, y_grid, z_grid, vel): pass # for ivel in range(3): # self.interpolator.append(interpolate.RegularGridInterpolator((x_grid, y_grid, z_grid), # vel[ivel,:,:,:], # bounds_error=False, # fill_value=self.settings['u_out'][ivel])) def generate(self, params, uext): zeta = params['zeta'] for_pos = params['for_pos'] t = params['t'] try: is_wake = params['is_wake'] except KeyError: is_wake = False if is_wake and not self.settings['interpolate_wake']: # The generator has received a wake and it will not be interpolated for isurf in range(len(uext)): _, n_m, n_n = uext[isurf].shape for i_m in range(n_m): for i_n in range(n_n): uext[isurf][:, i_m, i_n] = self.settings['u_out'] else: offset_mod = np.linalg.norm(self.settings['u_fed'])*t + self.settings['extra_offset'] while ((offset_mod - self.dist_to_recirculate) > self.grid_size_ufed_dir): cout.cout_wrap("Recirculate inflow", 2) self.dist_to_recirculate += self.grid_size_ufed_dir # Through "offstet" zeta can be modified to simulate the turbulence being fed to the solid # Usual method for wind turbines offset = (-1.*offset_mod + self.dist_to_recirculate)*self.settings['u_fed']/np.linalg.norm(self.settings['u_fed']) if ((not is_wake) and (self.settings['use_3_4_interpolation'])): nsurf = len(zeta) zeta_3_4_chord = [None]*nsurf uext_3_4_chord = [None]*nsurf for isurf in range(nsurf): N = zeta[isurf].shape[2] zeta_3_4_chord[isurf] = np.zeros((3, 1, N)) uext_3_4_chord[isurf] = np.zeros((3, 1, N)) # Compute the 3/4 chord position for i_n in range(N): zeta_3_4_chord[isurf][:, 0, i_n] = (zeta[isurf][:, 0, i_n] + 3.*zeta[isurf][:, -1, i_n])/4. # Interpolate at the 3/4 chord point self.interpolate_zeta(zeta_3_4_chord, for_pos, uext_3_4_chord, offset = offset) # Assign the values to all chord points for isurf in range(nsurf): _, M, N = zeta[isurf].shape for i_n in range(N): for i_m in range(M): uext[isurf][:, i_m, i_n] = uext_3_4_chord[isurf][:, 0, i_n] else: self.interpolate_zeta(zeta, for_pos, uext, offset = offset) def interpolate_zeta(self, zeta, for_pos, u_ext, interpolator=None, offset=np.zeros((3))): # if interpolator is None: # interpolator = self.interpolator for isurf in range(len(zeta)): _, n_m, n_n = zeta[isurf].shape # Reorder the coordinates points_list = np.zeros((n_m*n_n, 3)) ipoint = 0 for i_m in range(n_m): for i_n in range(n_n): points_list[ipoint, :] = zeta[isurf][:, i_m, i_n] + for_pos[0:3] + offset ipoint += 1 # Interpolate list_uext = interp_rectgrid_vectorfield(points_list, (self.x_grid, self.y_grid, self.z_grid), self.vel, self.settings['u_out'], regularGrid=True, num_cores=self.settings['num_cores']) # Reorder the values ipoint = 0 for i_m in range(n_m): for i_n in range(n_n): u_ext[isurf][:, i_m, i_n] = list_uext[ipoint] ipoint += 1 @staticmethod def read_turbsim_bts(fname, case_with_tower=False): # This post may be useful to understand the function: # https://wind.nrel.gov/forum/wind/viewtopic.php?t=1384 dtype = np.dtype([ ("id", np.int16), ("nz", np.int32), ("ny", np.int32), ("tower_points", np.int32), ("ntime_steps", np.int32), ("dz", np.float32), ("dy", np.float32), ("dt", np.float32), ("u_mean", np.float32), ("HubHt", np.float32), ("Zbottom", np.float32), ("u_slope_scaling", np.float32), ("u_offset_scaling", np.float32), ("v_slope_scaling", np.float32), ("v_offset_scaling", np.float32), ("w_slope_scaling", np.float32), ("w_offset_scaling", np.float32), ("n_char_description", np.int32), ("description", np.dtype((bytes, 73))), # ("data", np.dtype((bytes, 12865632))) ("data", np.dtype((bytes, 2))) ]) fileContent = np.fromfile(fname, dtype=dtype) n_char_description = fileContent[0][17] nbytes_data = 2*3*fileContent[0][4]*fileContent[0][1]*fileContent[0][2] dtype = np.dtype([ ("id", np.int16), ("nz", np.int32), ("ny", np.int32), ("tower_points", np.int32), ("ntime_steps", np.int32), ("dz", np.float32), ("dy", np.float32), ("dt", np.float32), ("u_mean", np.float32), ("HubHt", np.float32), ("Zbottom", np.float32), ("u_slope_scaling", np.float32), ("u_offset_scaling", np.float32), ("v_slope_scaling", np.float32), ("v_offset_scaling", np.float32), ("w_slope_scaling", np.float32), ("w_offset_scaling", np.float32), ("n_char_description", np.int32), ("description", np.dtype((bytes, n_char_description))), ("data", np.dtype((bytes, nbytes_data))) ]) fileContent = np.fromfile(fname, dtype=dtype) dictionary = {} for i in range(len(fileContent.dtype.names)): dictionary[fileContent.dtype.names[i]] = fileContent[0][i] scaling = np.array([dictionary['u_slope_scaling'], dictionary['v_slope_scaling'], dictionary['w_slope_scaling']]) offset = np.array([dictionary['u_offset_scaling'], dictionary['v_offset_scaling'], dictionary['w_offset_scaling']]) # Checks # print("Case description: ", dictionary['description']) if dictionary['description'][-1] == ".": cout.cout_wrap(("WARNING: I think there is something wrong with the case description. The length is not %d characters" % n_char_description), 3) # print("Input", dictionary['n_char_description'], "as the number of characters of the case description") # vel_aux = np.fromstring(dictionary['data'], dtype='>i2') vel_aux = np.fromstring(dictionary['data'], dtype=np.int16) vel = np.zeros((3,dictionary['ntime_steps'],dictionary['ny'],dictionary['nz'])) counter = -1 for ix in range(dictionary['ntime_steps']): for iz in range(dictionary['nz']): for iy in range(dictionary['ny']): for ivel in range(3): counter += 1 vel[ivel,-ix,iy,iz] = (vel_aux[counter] - offset[ivel])/scaling[ivel] # Generate the grid height = dictionary['dz']*(dictionary['nz'] - 1) width = dictionary['dy']*(dictionary['ny'] - 1) x_grid = np.linspace(-dictionary['ntime_steps'] + 1, 0, dictionary['ntime_steps'])*dictionary['dt']*dictionary['u_mean'] y_grid = np.linspace(-width/2, width/2, dictionary['ny']) if case_with_tower: z_grid = np.linspace(dictionary['Zbottom'], dictionary['Zbottom'] + height, dictionary['nz']) else: z_grid = np.linspace(-height/2, height/2, dictionary['nz']) return x_grid, y_grid, z_grid, vel @staticmethod def change_orientation(old_xgrid, old_ygrid, old_zgrid, old_vel, new_orientation_input): old_grid = [] old_grid.append(old_xgrid.copy()) old_grid.append(old_ygrid.copy()) old_grid.append(old_zgrid.copy()) new_orientation = ("%s." % new_orientation_input)[:-1] # Generate information for new_orientation if not old_vel.shape[0] == 3: print("ERROR: velocity must have three dimension") if (not (len(old_vel[0,:,0,0]) == len(old_xgrid))) or (not (len(old_vel[0,0,:,0]) == len(old_ygrid))) or (not (len(old_vel[0,0,0,:]) == len(old_zgrid))): print("ERROR: dimensions mismatch") return old_dim = np.array([len(old_xgrid),len(old_ygrid),len(old_zgrid)]) position_in_old = np.zeros((3), dtype=int) sign = np.array([1,1,1], dtype=int) for ivel in range(3): if new_orientation[0] == "-": sign[ivel] = -1 new_orientation = new_orientation[1:] if new_orientation[0] == "x": position_in_old[ivel] = 0 elif new_orientation[0] == "y": position_in_old[ivel] = 1 elif new_orientation[0] == "z": position_in_old[ivel] = 2 new_orientation = new_orientation[1:] # print("position_in_old: ", position_in_old) # Check the new orientation system new_ux = np.zeros((3), dtype=int) new_uy = np.zeros((3), dtype=int) new_uz = np.zeros((3), dtype=int) new_ux[position_in_old[0]] = sign[0] new_uy[position_in_old[1]] = sign[1] new_uz[position_in_old[2]] = sign[2] aux_ux = np.cross(new_uy, new_uz) aux_uy = np.cross(new_uz, new_ux) aux_uz = np.cross(new_ux, new_uy) if (not (np.abs(aux_ux - new_ux) < 1e-9).all()) or (not (np.abs(aux_uy - new_uy) < 1e-9).all()) or (not (np.abs(aux_uz - new_uz) < 1e-9).all()): print("ERROR: The new coordinate system is not coherent") print("ux error: ", aux_ux - new_ux) print("uy error: ", aux_uy - new_uy) print("uz error: ", aux_uz - new_uz) # Output variables new_grid = [None]*3 new_dim = old_dim[position_in_old] for ivel in range(3): new_grid[ivel] = old_grid[position_in_old[ivel]]*sign[ivel] if sign[ivel] == -1: new_grid[ivel] = new_grid[ivel][::-1] new_vel = np.zeros((3,new_dim[0],new_dim[1],new_dim[2])) # print("old_dim:", old_dim) # print("new_dim:", new_dim) # print("old_vel.shape:", old_vel.shape) # print("new_vel.shape:", new_vel.shape) # These loops will index variables associated with the new grid for ix in range(new_dim[0]): for iy in range(new_dim[1]): for iz in range(new_dim[2]): new_i = np.array([ix, iy, iz]) # old_i = np.array([new_i[position_in_old[0]]*sign[0], new_i[position_in_old[1]]*sign[1], new_i[position_in_old[2]]*sign[2]]) old_i = np.array([new_i[position_in_old[0]], new_i[position_in_old[1]], new_i[position_in_old[2]]]) for icoord in range(3): if sign[icoord] == -1: old_i[icoord] = -1*old_i[icoord] - 1 for ivel in range(3): # print("moving: ", position_in_old[ivel], ix, iy, iz, "to: ", ivel,new_i[0],new_i[1],new_i[2]) new_vel[ivel,new_i[0],new_i[1],new_i[2]] = old_vel[position_in_old[ivel], old_i[0], old_i[1], old_i[2]]*sign[ivel] return new_grid[0], new_grid[1], new_grid[2], new_vel def get_field_bbox(self, x_grid, y_grid, z_grid): bbox = np.zeros((3, 2)) bbox[0, :] = [np.min(x_grid), np.max(x_grid)] bbox[1, :] = [np.min(y_grid), np.max(y_grid)] bbox[2, :] = [np.min(z_grid), np.max(z_grid)] return bbox