import hashlib
from pydash import py_ as py__
import nadamq as nq
import nadamq.NadaMq
import numpy as np
import pydash as py_
from .dir_mixin import DirMixIn
from .address_of import get_attributes
from .member_header import get_functions
# Module-level instance created by calling the ``py_`` object imported from
# :mod:`pydash` (aliased here as ``py__``).
# NOTE(review): presumably a pydash chaining/wrapper helper; it is not
# referenced anywhere in the visible part of this module -- confirm whether it
# is still needed.
_fp = py__()
def get_np_dtype(type_name, default=False):
    '''
    Resolve a type name to the corresponding :class:`numpy.dtype`.

    The name is first tried verbatim and then with its last two characters
    removed (so that, e.g., ``"uint8_t"`` resolves as ``"uint8"``).

    Parameters
    ----------
    type_name : str
        Name of the type to resolve.
    default : object, optional
        Value to return if neither candidate name is understood by
        :mod:`numpy`.  The sentinel ``False`` (the default) means "no default
        value", in which case a :class:`TypeError` is raised instead.

    Returns
    -------
    numpy.dtype or object
        Matching data type, or :data:`default` if no match was found and a
        default was supplied.

    Raises
    ------
    TypeError
        If the type name is not understood and no default was supplied.
    '''
    for type_i in (type_name, type_name[:-2]):
        try:
            return np.dtype(type_i)
        except TypeError:
            # Not understood by numpy; fall through to the next candidate.
            pass

    # Identity check (rather than ``== False``) so that legitimate defaults
    # which merely compare equal to ``False`` (e.g., ``0``) are honoured.
    if default is False:
        raise TypeError('Type not understood: {}'.format(type_name))
    return default
def operation_code(v):
    '''
    Derive a 16-bit operation code from an operation name.

    The code is the first two bytes of the SHA-256 digest of the name,
    interpreted as an unsigned 16-bit integer in native byte order.

    Parameters
    ----------
    v : bytes or str
        Operation name (e.g., ``"mem_read"``).  Text is encoded as UTF-8
        before hashing, since :func:`hashlib.sha256` only accepts bytes.

    Returns
    -------
    numpy.uint16
        Operation code.
    '''
    if not isinstance(v, bytes):
        v = v.encode('utf-8')
    digest = hashlib.sha256(v).digest()
    # `np.frombuffer` replaces the deprecated binary mode of `np.fromstring`.
    return np.frombuffer(digest, dtype='uint8').view('uint16')[0]
def get_namespace_path(namespace_str):
    '''
    Convert a C++ namespace specifier into a dotted lookup path.

    Parameters
    ----------
    namespace_str : str
        Namespace specifier using ``::`` as separator (e.g., ``"foo::bar"``).
        Empty components (e.g., from a leading ``::``) are ignored.

    Returns
    -------
    str
        Path where each namespace component is prefixed by ``namespaces``
        (e.g., ``"namespaces.foo.namespaces.bar"``), or an empty string if the
        specifier contains no components.
    '''
    # Build a real list: a lazy `filter` object is always truthy on Python 3,
    # which would make the empty-specifier check below ineffective.
    parts = [part for part in namespace_str.split('::') if part]
    if not parts:
        return ''
    return 'namespaces.' + '.namespaces.'.join(parts)
class Context(object):
    '''
    View of a single namespace within a parsed C++ abstract syntax tree.

    Parameters
    ----------
    cpp_ast_json : dict
        C++ abstract syntax tree in dictionary form.
    namespace : str, optional
        Namespace specifier (e.g., ``"foo::bar"``) selecting the node to
        expose.  An empty string selects the root of the tree.

    Attributes
    ----------
    cpp_ast_json : dict
        The tree passed to the constructor.
    namespace_str : str
        The namespace specifier passed to the constructor.
    namespace : dict
        Node of the tree selected by :attr:`namespace_str`.
    '''
    def __init__(self, cpp_ast_json, namespace=''):
        self.namespace_str = namespace
        self.cpp_ast_json = cpp_ast_json

        # An empty specifier selects the root of the tree; otherwise, look up
        # the nested namespace node by its dotted path.
        self.namespace = (py_.get(cpp_ast_json, get_namespace_path(namespace))
                          if namespace else self.cpp_ast_json)

        # Index the members of the selected namespace using the project
        # helpers for attributes and functions, respectively.
        self._attributes = get_attributes(self.namespace['members'])
        self._functions = get_functions(self.namespace['members'])
class RemoteContext(Context, DirMixIn):
    '''
    This class provides access to public variables and fields within a remote
    context (i.e., namespace).

    Variables and fields are accessible as Python attributes, allowing, for
    example, tab completion in IPython.

    Parameters
    ----------
    stream : serial.Serial
        A serial connection to the remote device.
    cpp_ast_json : dict
        A JSON-serializable C++ abstract syntax tree, as parsed by
        `clang_helpers.clang_ast.parse_cpp_ast(..., format='json')`.
    namespace : str, optional
        A namespace specifier (e.g., ``"foo::bar"``) indicating the namespace
        to expose.

        A value of ``""`` corresponds to the top-level namespace.

    Attributes
    ----------
    .<remote variable/field>
        An attribute corresponding to each public variable or field within the
        remote context :attr:`namespace`.
    '''
    def __init__(self, stream, cpp_ast_json, namespace=''):
        self.stream = stream
        super(RemoteContext, self).__init__(cpp_ast_json, namespace=namespace)
        # Query the remote address of every attribute once, up front, in a
        # deterministic (sorted) order.
        self._addresses = {k: self._address_of(str(k))
                           for k in sorted(self._attributes)}

    def __dir__(self):
        '''
        Add remote attribute keys to :func:`dir` result.

        Allows, for example, tab completion for remote attributes in IPython.
        '''
        # Convert both operands to lists; a dictionary keys view cannot be
        # concatenated to a list on Python 3.
        remote_names = self.__dict__.get('_attributes', ())
        return list(super(RemoteContext, self).__dir__()) + list(remote_names)

    def __getattr__(self, attr):
        '''
        If :data:`attr` matches the name of a variable or field in the remote
        context, return the corresponding value.

        Returns
        -------
        type of attr
            Value of specified attribute in remote context.

            If type is not supported (i.e., not a plain old data type), return
            ``None``.

        Raises
        ------
        AttributeError
            If :data:`attr` is not the name of a remote attribute.

        See also
        --------
        :meth:`_read_attribute`
        '''
        # Look in the instance dictionary directly.  Using `self._attributes`
        # here would re-enter `__getattr__` (unbounded recursion) whenever
        # `_attributes` has not been assigned yet, e.g., during `__init__`.
        attributes = self.__dict__.get('_attributes')
        if attributes is not None and attr in attributes:
            return self._read_attribute(attr, None)
        raise AttributeError("'{}' object has no attribute '{}'"
                             .format(type(self).__name__, attr))

    def __setattr__(self, attr, value):
        '''
        If :data:`attr` matches the name of a variable or field in the remote
        context, set the corresponding value.

        Otherwise, set a regular attribute on the local Python object.

        Parameters
        ----------
        attr : str
            Name of attribute in remote context.
        value : type of attr
            Value to set for specified attribute in remote context.

        Raises
        ------
        TypeError
            If attribute type is not supported (i.e., not a plain old data
            type).

        See also
        --------
        :meth:`__getattr__`, :meth:`_read_attribute`
        '''
        # See `__getattr__` for why the instance dictionary is used directly.
        attributes = self.__dict__.get('_attributes')
        if attributes is not None and attr in attributes:
            self._write_attribute(attr, value)
        else:
            super(RemoteContext, self).__setattr__(attr, value)

    def _read_response(self, size):
        '''
        Collect a response from the stream.

        Parameters
        ----------
        size : int
            Minimum number of bytes expected in the response.

        Returns
        -------
        bytes
            All bytes received up to the point where at least :data:`size`
            bytes had been collected.
        '''
        # Keep polling until the whole response has arrived; reading only what
        # is available on the first non-empty poll may yield a partial
        # response.
        # NOTE: like the rest of this class, this polls without a timeout.
        received = bytearray()
        while len(received) < size:
            available = self.stream.in_waiting
            if available:
                received.extend(self.stream.read(available))
        return bytes(received)

    def _address_of(self, label):
        '''
        Parameters
        ----------
        label : str
            Name/label of variable or field in remote context.

        Returns
        -------
        int
            Address in memory of specified variable or field in remote context.
        '''
        op_code = operation_code(b'address_of')
        # The label is packed into a fixed-width bytes field, so encode text
        # and size the field by the encoded length.
        label_bytes = (label if isinstance(label, bytes)
                       else label.encode('utf-8'))
        rec = np.rec.array([op_code, label_bytes],
                           dtype=[('op_code', 'uint16'),
                                  ('address', 'S{}'.format(len(label_bytes)))])
        packet = nq.NadaMq.cPacket(data=rec.tobytes(),
                                   type_=nq.NadaMq.PACKET_TYPES.DATA)
        self.stream.write(packet.tostring())

        # The address is decoded from the first four bytes of the response.
        response = self._read_response(np.dtype('uint32').itemsize)
        return np.frombuffer(response, dtype='uint32', count=1)[0]

    def _mem_read(self, address, size):
        '''
        Parameters
        ----------
        address : int
            Memory address in remote context.
        size : int
            Number of bytes to read.

        Returns
        -------
        np.array(dtype='uint8')
            Array of data read from remote context.

        See also
        --------
        :meth:`_read_attribute`
        '''
        op_code = operation_code(b'mem_read')
        rec = np.rec.array([op_code, address, size],
                           dtype=[('op_code', 'uint16'), ('address', 'uint32'),
                                  ('size', 'uint16')])
        packet = nq.NadaMq.cPacket(data=rec.tobytes(),
                                   type_=nq.NadaMq.PACKET_TYPES.DATA)
        self.stream.write(packet.tostring())

        response = self._read_response(size)
        # Copy so that callers get a writable array that owns its data
        # (`np.frombuffer` alone returns a read-only view of `response`).
        return np.frombuffer(response, dtype='uint8').copy()

    def _mem_write(self, address, data):
        '''
        Write data to specified address in remote context.

        Parameters
        ----------
        address : int
            Memory address in remote context.
        data : numpy.array-like
            Array or :module:`numpy` data type.

        See also
        --------
        :meth:`_write_attribute`
        '''
        op_code = operation_code(b'mem_write')
        bytes_ = data.tobytes()
        rec = np.rec.array([op_code, address, len(bytes_), bytes_],
                           dtype=[('op_code', 'uint16'), ('address', 'uint32'),
                                  ('size', 'uint16'),
                                  ('bytes', 'S{}'.format(len(bytes_)))])
        packet = nq.NadaMq.cPacket(data=rec.tobytes(),
                                   type_=nq.NadaMq.PACKET_TYPES.DATA)
        self.stream.write(packet.tostring())

    def _read_attribute(self, attr, *args):
        '''
        Parameters
        ----------
        attr : str
            Name of attribute in remote context.
        default : object, optional
            Default return value (passed as the second positional argument).

        Returns
        -------
        type of attr
            Value of specified attribute in remote context.

            If type is not supported (i.e., not a plain old data type),
            :data:`default` is returned (if specified).

        Raises
        ------
        TypeError
            If attribute type is not supported and no default was specified.
        '''
        address = self._addresses[attr]
        try:
            np_dtype = get_np_dtype(self._attributes[attr]['type'])
        except TypeError:
            # Unsupported type: fall back to the default, if one was given.
            if args:
                return args[0]
            raise
        data = self._mem_read(address, np_dtype.itemsize)
        # Reinterpret the raw bytes as the attribute's type.
        return data.view(np_dtype)[0]

    def _read_attributes(self):
        '''
        Returns
        -------
        dict
            Value of each attribute in remote context.

            For each attribute, if type is not supported (i.e., not a plain old
            data type), value is set to ``None``.

        See also
        --------
        :meth:`_write_attribute`
        '''
        return {name: self._read_attribute(name, None)
                for name in self._attributes}

    def _write_attribute(self, attr, value):
        '''
        Parameters
        ----------
        attr : str
            Name of attribute in remote context.
        value : type of attr
            Value to set for specified attribute in remote context.

        Raises
        ------
        AttributeError
            If attribute is declared ``const`` in the remote context.
        TypeError
            If attribute type is not supported (i.e., not a plain old data
            type).

        See also
        --------
        :meth:`_read_attribute`
        '''
        address = self._addresses[attr]
        attr_node = self._attributes[attr]
        if attr_node['const']:
            location = attr_node['location']
            raise AttributeError('Attribute "{}" is read-only (declared as '
                                 '"const" at `{} (line: {}, col: {})`)'
                                 .format(attr, location['file'],
                                         location['start']['line'],
                                         location['start']['column']))
        np_dtype = get_np_dtype(attr_node['type'])
        # Coerce to the attribute's numpy scalar type to get the exact byte
        # representation expected by the remote context.
        value = np_dtype.type(value)
        self._mem_write(address, value)