import struct
import yaml
import logging
import numpy as np
logger = logging.getLogger(__name__)
class Variable:
    """
    Single scalar quantity that is read from, or written to, the time step data.

    A variable is identified by its ``name`` (the attribute name in the time step info) and by one of
    three locations selected through ``var_type``: a structural node, an aerodynamic panel or a control
    surface. The names ``dt`` and ``nt`` are accepted without a ``var_type``.

    Args:
        name (str): Attribute name; should be the same as in the time step info.
        inout (str): One of ``in``, ``out`` or ``inout``.
        **kwargs: Optional settings:

            * ``var_type`` (str): ``node``, ``panel`` or ``control_surface``.
            * ``position``: ``int`` for a node, ``(i_surf, m, n[, idx])`` for a panel, control surface index
              for a control surface.
            * ``index`` (int): Component of the variable, if the variable is a vector.
            * ``xplane_name`` (str): Equivalent xplane name.

    Attributes:
        num_vars (int): Class-wide counter of the variables created so far.
        variable_index (int): Value of ``num_vars`` at the time this variable was created.
        dref_name (str): Name built by :func:`set_dref_name` from ``name``, location and ``index``.
        value: Last value retrieved or set (``None`` until then).

    Raises:
        ValueError: If ``var_type`` is not recognised and ``name`` is neither ``dt`` nor ``nt``.
    """
    num_vars = 0

    def __init__(self, name, inout, **kwargs):
        self.name = name  # str: should be the same as in timestep info
        self.xplane_name = kwargs.get('xplane_name', None)  # equivalent xplane name
        self.inout = inout  # str: (in, out, inout)
        self.index = kwargs.get('index', None)  # if variable is a vector
        position = kwargs.get('position', None)  # int for node, (i_surf, m, n, idx) for panel

        # only the attribute matching ``var_type`` receives the position
        self.node = None
        self.panel = None
        self.cs_index = None

        self.var_type = kwargs.get('var_type', None)
        if self.var_type == 'node':
            self.node = position
        elif self.var_type == 'panel':
            self.panel = position
        elif self.var_type == 'control_surface':
            self.cs_index = position
        elif self.name in ('dt', 'nt'):
            pass  # these variables do not need a location
        else:
            # ValueError is an Exception subclass, so callers catching Exception still work
            raise ValueError('Unknown variable type')

        self.dref_name = None
        self.set_dref_name()

        # update variable counter (only reached once the variable is known to be valid)
        self.variable_index = Variable.num_vars
        Variable.num_vars += 1

        self.value = None
        logger.info('Loaded variable {}'.format(self.dref_name))

    def get_variable_value(self, data, timestep_index=-1):
        """
        Get the variables value at the selected timestep (last one by default)

        The retrieved value is also stored in ``self.value``.

        Args:
            data (sharpy.presharpy.PreSharpy): the standard SHARPy class
            timestep_index (int (optional)): Integer representing the time step value. Defaults to ``-1`` i.e. the
              last one available.

        Returns:
            float: value of the variable

        Raises:
            IndexError: If the node, index or control surface index is out of range, or if a structural
              variable is stored in an array that is neither 2D nor 3D.
            AttributeError: If the aerodynamic time step has no ``control_surface_deflection``.
            NotImplementedError: If the variable has no location and is neither ``dt`` nor ``nt``.
        """
        if self.node is not None:
            # structural variables for now
            value = self._get_structural_value(data, timestep_index)
        elif self.name == 'dt':
            value = data.settings['DynamicCoupled']['dt']
        elif self.name == 'nt':
            # (-1) needed since first time step is idx 0
            # NOTE(review): the slice excludes ``timestep_index`` itself, so for the default ``-1`` this
            # evaluates to ``len(timestep_info) - 2`` - confirm this is the intended step number.
            value = len(data.structure.timestep_info[:timestep_index]) - 1
        elif self.panel is not None:
            value = self._get_panel_value(data, timestep_index)
        elif self.cs_index is not None:
            value = self._get_control_surface_value(data, timestep_index)
        else:
            raise NotImplementedError('Unable to get value for {} variable'.format(self.name))

        self.value = value
        logger.debug('Getting value {} for variable {}'.format(self.value, self.dref_name))
        return value

    def _get_structural_value(self, data, timestep_index):
        """Read a node variable from the structural time step (2D node arrays or 3D element arrays)."""
        variable = getattr(data.structure.timestep_info[timestep_index], self.name)
        n_dim = len(variable.shape)
        if n_dim not in (2, 3):
            msg = f'Variable {self.name} is neither a node variable nor an element variable. The ' \
                  f'variable {self.name} is stored as a {variable.shape} array.'
            logger.error(msg)
            raise IndexError(msg)

        try:
            if n_dim == 2:
                return variable[self.node, self.index]
            # 3D arrays are addressed through the (element, node in element) pair of the node
            ielem, inode_in_elem = data.structure.node_master_elem[self.node]
            return variable[ielem, inode_in_elem, self.index]
        except IndexError as err:
            msg = 'Node {} and/or Index {} are out of index of variable {}, ' \
                  'which is of size ({})'.format(self.node, self.index, self.dref_name,
                                                 variable.shape)
            logger.error(msg)
            raise IndexError(msg) from err

    def _get_panel_value(self, data, timestep_index):
        """Read a panel variable from the aerodynamic time step; the 4th panel entry is optional."""
        surface_variable = getattr(data.aero.timestep_info[timestep_index], self.name)[self.panel[0]]  # surface index
        i_m = self.panel[1]
        i_n = self.panel[2]
        try:
            i_idx = self.panel[3]
        except IndexError:
            # position given as (i_surf, m, n) only
            return surface_variable[i_m, i_n]
        return surface_variable[i_m, i_n, i_idx]

    def _get_control_surface_value(self, data, timestep_index):
        """Read the deflection of control surface ``cs_index`` from the aerodynamic time step."""
        try:
            return data.aero.timestep_info[timestep_index].control_surface_deflection[self.cs_index]
        except AttributeError as err:
            msg = 'Model not equipped with dynamic control surfaces'
            logger.error(msg)
            raise AttributeError(msg) from err
        except IndexError as err:
            msg = 'Requested index {} for control surface is out of range (size {})'.format(
                self.cs_index, len(data.aero.timestep_info[timestep_index].control_surface_deflection))
            logger.error(msg)
            # re-raise: falling through would leave no value to return
            raise IndexError(msg) from err

    def set_variable_value(self, value):
        """
        Set the value of input variables

        Output-only variables are left untouched and a warning is logged instead.

        Args:
            value: value of variable
        """
        if self.inout in ('in', 'inout'):
            self.value = value
        else:
            logger.warning('Trying to set the value of {} which is only an output variable'.format(self.dref_name))

    def set_in_timestep(self, data):
        """
        Set the variable value in the time step

        Node variables are written to the last structural time step and control surface variables to the
        last aerodynamic time step. Other variables are ignored.

        Args:
            data (sharpy.presharpy.PreSharpy): Simulation data object
        """
        if self.node is not None:  # structural variable then
            structural_tstep = data.structure.timestep_info[-1]
            variable = getattr(structural_tstep, self.name)
            try:
                variable[self.node, self.index] = self.value
            except IndexError:
                logger.warning('Unable to set node {}, index {} of variable {}'.format(
                    self.node, self.index, self.dref_name
                ))
            else:
                setattr(structural_tstep, self.name, variable)
                logger.debug('Updated timestep')

        if self.cs_index is not None:
            aero_tstep = data.aero.timestep_info[-1]
            variable = getattr(aero_tstep, self.name)
            if len(variable) == 0:
                # nothing stored yet: the value is appended regardless of ``cs_index``
                variable = np.hstack((variable, np.array([self.value])))
            else:
                variable[self.cs_index] = self.value

            setattr(aero_tstep, self.name, variable)
            logger.debug('Updated control surface deflection')

    def set_dref_name(self):
        """
        Build ``dref_name`` as ``name`` followed by the location and, when given, the vector index.

        Raises:
            ValueError: If the variable has no location and is neither ``dt`` nor ``nt``.
        """
        divider = '_'
        dref_name = self.name

        if self.node is not None:
            dref_name += divider + 'node{}'.format(self.node)
        elif self.panel is not None:
            # only the first three panel entries appear in the name
            dref_name += divider + 'paneli{}m{}n{}'.format(*self.panel)
        elif self.cs_index is not None:
            dref_name += divider + 'idx{}'.format(self.cs_index)
        elif self.name in ('dt', 'nt'):
            pass
        else:
            raise ValueError('Unknown variable')

        if self.index is not None:
            dref_name += divider + 'index{}'.format(self.index)

        self.dref_name = dref_name
class SetOfVariables:
    """
    Iterable class containing the input and output variables

    Attributes:
        variables (list(Variable)): List of :class:`Variable`
        out_variables (list(int)): Positions in ``variables`` of the ``out`` and ``inout`` variables
        in_variables (list(int)): Positions in ``variables`` of the ``in`` and ``inout`` variables
        file_name (str): File where the received input values are logged (``None`` disables logging)
    """
    def __init__(self):
        self.variables = []  # list of Variables()
        self.out_variables = []  # indices
        self.in_variables = []

        self._byte_ordering = '<'

        self.file_name = None  # for input variables

    def set_byte_ordering(self, value):
        """Set the ``struct`` byte-order prefix used by :func:`encode` (``'<'`` by default)."""
        self._byte_ordering = value

    def load_variables_from_yaml(self, path_to_yaml):
        """
        Create one :class:`Variable` per entry of the YAML file and register it as input and/or output.

        Args:
            path_to_yaml (str): Path to a YAML file containing a list of keyword mappings for
              :class:`Variable`.
        """
        with open(path_to_yaml, 'r') as yaml_file:
            # NOTE(security): ``yaml.Loader`` can construct arbitrary Python objects. Only load trusted
            # files, or switch to ``yaml.safe_load`` if the variable files contain plain data only.
            variables_in_yaml = yaml.load(yaml_file, Loader=yaml.Loader)

        # an empty file is parsed as ``None``: treat it as "no variables"
        for var in variables_in_yaml or []:
            new_var = Variable(**var)
            self.variables.append(new_var)
            # Store the position within this set: the lists are used to index ``self.variables``, whereas
            # ``Variable.variable_index`` is a process-wide counter that only matches the position when
            # no other variable has been created before.
            position = len(self.variables) - 1
            if new_var.inout in ('out', 'inout'):
                self.out_variables.append(position)
            if new_var.inout in ('in', 'inout'):
                self.in_variables.append(position)

        logger.debug('Number of tracked variables {}'.format(Variable.num_vars))

    def set_input_file(self, filename):
        """
        Select (and overwrite) the file used to log input values and write its header line.

        Args:
            filename (str): Path of the log file. The header lists the ``dref_name`` of every input variable.
        """
        self.file_name = filename
        header = ''.join('{},\t'.format(self.variables[var_idx].dref_name) for var_idx in self.in_variables)
        with open(self.file_name, 'w') as f:
            f.write(header + '\n')

    @property
    def input_msg_len(self):
        """int: Length in bytes of an input message: 5 bytes header + 8 for each input variable."""
        msg_len = 5 + 8 * len(self.in_variables)  # 5 bytes header + 8 for each channel
        return msg_len

    def __iter__(self):
        return VariableIterator(self)

    def __getitem__(self, item):
        return self.variables[item]

    def __len__(self):
        return len(self.variables)

    def encode(self):
        """
        Encode output variables in binary format using the configured byte ordering (little-endian by default).

        The signal consists of a 5-byte header ``RREF0`` followed by 8 bytes per variable.

        Of those 8 bytes allocated to each variable, the first 4 are the integer value of the variable index
        and the last 4 are the single precision float value.

        Returns:
            bytes: Encoded message of length ``5 + num_var * 8``.

        Raises:
            struct.error: If the value of an output variable cannot be packed as a float, e.g. it is still
              ``None`` because :func:`get_value` has not been called.
        """
        channel = struct.Struct('{}if'.format(self._byte_ordering))
        chunks = [struct.pack('{}5s'.format(self._byte_ordering), b'RREF0')]
        for var_idx in self.out_variables:
            variable = self.variables[var_idx]
            logger.debug('Encoding variable {}'.format(variable.dref_name))
            chunks.append(channel.pack(variable.variable_index, variable.value))

        return b''.join(chunks)

    def get_value(self, data, timestep_index=-1):
        """
        Gets the value from the data structure for output variables

        Args:
            data (sharpy.presharpy.PreSharpy): the standard SHARPy class
            timestep_index (int (optional)): Integer representing the time step value. Defaults to ``-1`` i.e. the
              last one available.
        """
        for out_idx in self.out_variables:
            self.variables[out_idx].get_variable_value(data, timestep_index=timestep_index)

    def set_value(self, values):
        """
        Sets the values of the input variables.

        The received values are appended to the input log file, if one has been set.

        Args:
            values (list(tuple)): List of tuples containing the index and value of the respective input variables.
        """
        for idx, value in values:
            variable = self.variables[idx]
            variable.set_variable_value(value)
            # Output-only variables reject the value (and log a warning themselves); their stored value may
            # still be ``None``, which cannot be formatted as a float.
            if variable.inout in ('in', 'inout'):
                logger.info('Set the input variable {} to {:.4f}'.format(variable.dref_name, variable.value))

        # save to file:
        self.save_to_file(values)

    def update_timestep(self, data, values):
        """
        Store the received values and write every input variable into the last time step.

        Args:
            data (sharpy.presharpy.PreSharpy): Simulation data object
            values (list(tuple)): List of tuples containing the index and value of the respective input variables.
        """
        logger.debug('Update time step routine')
        self.set_value(values)
        for idx in self.in_variables:
            self.variables[idx].set_in_timestep(data)

    def save_to_file(self, input_variables):
        """
        Append one line with the received values to the input log file (no-op when no file is set).

        Args:
            input_variables (list(tuple)): List of ``(index, value)`` tuples; only the values are written.
        """
        if self.file_name is None:
            return

        out_msg = ''.join('{:10.6f},\t'.format(value) for _, value in input_variables)
        with open(self.file_name, 'a') as f:
            f.write(out_msg + '\n')
class VariableIterator:
    """
    Iterator that yields the items of a set of variables in index order.

    Args:
        set_of_variables: Container to iterate over. It must support ``len()`` and integer indexing.
    """
    def __init__(self, set_of_variables):
        self._set_variables = set_of_variables
        self._index = 0  # position of the next item to return

    def __iter__(self):
        # an iterator returns itself so that it can be used directly in ``for`` loops and ``list()``
        return self

    def __next__(self):
        if self._index < len(self._set_variables):
            # index the container: it is not callable
            res = self._set_variables[self._index]
            self._index += 1
            return res

        raise StopIteration