Python Implementation of yaq Daemons

Installation

Writing a daemon

Loading and saving state

Logging

The core library provides a wrapper around the standard library logging module. The wrapper ensures consistent log formatting and introduces additional logging levels (based on sd-daemon(3)).

To use the logging system in your daemon, add the following code:


from yaqd_core import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
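
The exact set of extra levels is defined by yaqd_core itself. As a rough illustration of how a wrapper can add an sd-daemon style level on top of the standard library, here is a minimal sketch using only the stdlib logging module. The NOTICE name, its numeric value of 25, the format string, and the make_logger helper are all assumptions made for this example, not the values or API that yaqd_core uses.

```python
import io
import logging

# Hypothetical extra level, placed between INFO (20) and WARNING (30).
NOTICE = 25
logging.addLevelName(NOTICE, "NOTICE")


def make_logger(name, stream):
    """Return a logger with a fixed format that writes to `stream`."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the example output confined to `stream`
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s : %(name)s : %(message)s"))
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()
logger = make_logger("example-daemon", buffer)
logger.log(NOTICE, "moved to position")
logger.debug("below INFO, so this record is dropped")
```

Because the logger level is INFO, the NOTICE record is emitted and the DEBUG record is filtered out.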

Notes on async/await and yaq daemons

The _busy property

The update_state async function

Using yaqd-cookiecutter-python

yaqd-core-python base classes

There are four base classes provided by the core library: Base, Hardware, ContinuousHardware, and Sensor.

Implementing is-daemon

Everything in the is-daemon trait is implemented by the Base daemon class. Simply subclassing Base gives you all of its methods and configuration parsing. All you need to do is add the functions and implement the traits unique to your daemon.

Implementing has-position

The has-position trait defines the Hardware class. Implementing this trait usually involves subclassing and writing two functions:


import asyncio

from yaqd_core import Hardware

class ExampleHasPosition(Hardware):
    _kind = "example-has-position"

    def __init__(self, name, config, config_filepath):
        super().__init__(name, config, config_filepath)
        self._units = "mm"

    def _set_position(self, position):
        # The super class handles exposing set_position externally,
        # as well as setting `_busy`, and keeping track of the destination
        ...
        # Actually communicate with your device here
        self.device.set_position(position)

    async def update_state(self):
        # See above for more context on the asynchronous nature of this function
        # For a `Hardware`, the important things to update include the position
        # and the busy state.
        # Each device will have a unique variant of this method, so a simple
        # example of a device that exposes these in single python calls is shown.
        while True:
            self._position = self.device.get_position()
            self._busy = not self.device.is_ready()
            if self._busy:
                await asyncio.sleep(0.01)
            else:
                await self._busy_sig.wait()

Note that _set_position does not wait for the position to be attained. The _units attribute can be defined in a number of ways, ranging from allowing only one value, to being user configurable, to being read from the device itself. All other parts of this trait are handled by the Hardware class.
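
To make the non-blocking behavior concrete, here is a self-contained sketch that runs without yaqd_core or real hardware. FakeDevice and Mover are hypothetical stand-ins written for this example, and the use of an asyncio.Event for _busy_sig is an assumption about how such a signal might work; in a real daemon the Hardware base class manages that machinery for you.

```python
import asyncio


class FakeDevice:
    """Stand-in for real hardware: reaches its target after a few status polls."""

    def __init__(self):
        self.position = 0.0
        self._target = 0.0
        self._polls_left = 0

    def set_position(self, position):
        self._target = position
        self._polls_left = 3  # pretend the move takes three polls to finish

    def is_ready(self):
        if self._polls_left > 0:
            self._polls_left -= 1
            if self._polls_left == 0:
                self.position = self._target
        return self._polls_left == 0


class Mover:
    """Hypothetical daemon-like object using the polling pattern shown above."""

    def __init__(self, device):
        self.device = device
        self._busy = False
        self._busy_sig = asyncio.Event()
        self._position = device.position

    def set_position(self, position):
        # Returns immediately; update_state notices when the move finishes.
        self.device.set_position(position)
        self._busy = True
        self._busy_sig.set()

    async def update_state(self):
        while True:
            self._busy = not self.device.is_ready()
            self._position = self.device.position
            if self._busy:
                await asyncio.sleep(0.01)  # poll quickly while moving
            else:
                self._busy_sig.clear()
                await self._busy_sig.wait()  # idle until the next move


async def main():
    mover = Mover(FakeDevice())
    task = asyncio.create_task(mover.update_state())
    mover.set_position(5.0)
    busy_right_after_call = mover._busy
    while mover._busy:
        await asyncio.sleep(0.01)
    task.cancel()
    return busy_right_after_call, mover._position


result = asyncio.run(main())
```

The call to set_position returns while the daemon is still busy, and the position is only reported as attained once the polling loop sees the device become ready.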

Implementing has-limits

The has-limits trait defines the ContinuousHardware class. The class implements everything needed to comply with the trait, and introduces an attribute hw_limits. hw_limits is a 2-tuple which can represent programmatically defined limits (e.g. firmware limits from the device itself). The ContinuousHardware class takes care of intersecting these with the user-defined configuration limits. Implementing has-position as above is still required; here is an example demonstrating the new behavior:


from yaqd_core import ContinuousHardware

class ExampleHasLimits(ContinuousHardware):
    _kind = "example-has-limits"

    def __init__(self, name, config, config_filepath):
        super().__init__(name, config, config_filepath)
        self.hw_limits = (0., 50.)
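
As an illustration of what intersecting hardware limits with configured limits means, here is a small standalone sketch. The function names intersect_limits and in_limits are hypothetical, and ContinuousHardware may implement this differently internally.

```python
def intersect_limits(hw_limits, config_limits):
    """Return the overlap of two (low, high) ranges."""
    low = max(hw_limits[0], config_limits[0])
    high = min(hw_limits[1], config_limits[1])
    if low > high:
        raise ValueError("hardware and configured limits do not overlap")
    return (low, high)


def in_limits(position, limits):
    """Return True if `position` lies within the inclusive (low, high) range."""
    low, high = limits
    return low <= position <= high


# Firmware allows 0 to 50; the user configuration allows -10 to 25.
limits = intersect_limits((0.0, 50.0), (-10.0, 25.0))
```

With these inputs the effective range is the tighter bound on each side, so a request for 30 would be outside the limits even though the firmware would accept it.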

Implementing is-homeable

Homeable hardware has a procedure which resets it to a known position. Once homed, the device is returned to its destination. This trait must be applied to a Hardware (or a subclass like ContinuousHardware) daemon, and introduces only one additional method:


import asyncio

from yaqd_core import Hardware

class ExampleHomeable(Hardware):
    _kind = "example-homeable"
    traits = ["is-homeable"]

    def home(self):
        # Since homing is typically a long process, start a new asynchronous task
        # This may not be necessary, depending on how your device behaves,
        # but remember that home is defined as returning to the current destination
        # This method should return quickly, not wait for the homing to complete.
        loop = asyncio.get_event_loop()
        loop.create_task(self._home())

    async def _home(self):
        self._busy = True
        # Initiate the home
        ...
        await self._not_busy_sig.wait()
        self.set_position(self._destination)
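
The following standalone sketch shows the same schedule-and-return pattern without yaqd_core. HomeableSketch, its _device_homing coroutine, and the moves list are hypothetical stand-ins; here home returns the task purely so the example can await it, which the daemon method above does not do.

```python
import asyncio


class HomeableSketch:
    """Hypothetical object mimicking the home/_home split shown above."""

    def __init__(self):
        self._destination = 12.5
        self._position = 12.5
        self._busy = False
        self._not_busy_sig = asyncio.Event()
        self.moves = []  # records every position passed to set_position

    def set_position(self, position):
        self._destination = position
        self._position = position  # pretend the move is instantaneous
        self.moves.append(position)

    def home(self):
        # Schedule the slow work and return right away.
        return asyncio.get_running_loop().create_task(self._home())

    async def _device_homing(self):
        # Stand-in for update_state noticing that the device finished homing.
        await asyncio.sleep(0.01)
        self._position = 0.0
        self._busy = False
        self._not_busy_sig.set()

    async def _home(self):
        self._busy = True
        self._not_busy_sig.clear()
        watcher = asyncio.create_task(self._device_homing())
        await self._not_busy_sig.wait()
        await watcher
        # Homing is defined as ending back at the current destination.
        self.set_position(self._destination)


async def main():
    daemon = HomeableSketch()
    task = daemon.home()
    returned_before_done = not task.done()
    await task
    return returned_before_done, daemon.moves, daemon._position


outcome = asyncio.run(main())
```

The home call returns before any homing has happened, and once the task completes the device has been sent back to its original destination.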

Implementing has-turret

Implementing has-turret involves writing two short methods, and storing one state variable:


from yaqd_core import Base

class ExampleTurret(Base): # Often daemons implementing has-turret will be some kind of Hardware, but it is not required
    _kind = "example-turret"
    traits = ["has-turret"]

    def get_state(self):
        state = super().get_state()
        state["turret"] = self._turret
        return state

    def _load_state(self, state):
        super()._load_state(state)
        self._turret = state.get("turret", 0)

    def set_turret(self, index):
        self._busy = True
        # Perform the actual setting of the turret for your device

    def get_turret(self):
        return self._turret

Implementing is-sensor

The is-sensor trait defines the Sensor class. A lot of the machinery for making sensors work, including handling of looping and exposed methods, is implemented as part of the Sensor class. That said, each sensor will have its own configuration, and you must implement the function that actually performs a measurement.

As the implementor, you are responsible for filling out three attributes: channel_names, channel_units, and channel_shapes. channel_names is a simple list of strings with names of each recorded value. channel_units is a dictionary mapping the names to strings representing the units. channel_shapes may be omitted if all channels are scalar values, otherwise it is a dictionary mapping names to tuples of integers representing the shapes.

A typical is-sensor daemon will look something like:


from yaqd_core import Sensor

class ExampleSensor(Sensor):
    _kind = "example-sensor"

    def __init__(self, name, config, config_filepath):
        super().__init__(name, config, config_filepath)
        self.channel_names = ["channel0", "channel1"]
        self.channel_units = {"channel0": "V", "channel1": "A"}
        # If shaped, you would also include self.channel_shapes

    async def _measure(self):
        # Do whatever needs to be done to fill a dictionary mapping names to values
        # (or arrays for shaped data)
        return {"channel0": 1.234, "channel1": 3.14}
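
For a sensor with a mix of scalar and array channels, the three attributes might look like the sketch below. The channel names are invented, the convention of using an empty tuple for scalar channels is an assumption, and check_measurement is a hypothetical helper for sanity-checking what a _measure implementation returns; it is not part of the Sensor class.

```python
channel_names = ["voltage", "spectrum"]
channel_units = {"voltage": "V", "spectrum": "counts"}
# Assumed convention: scalar channels use an empty tuple.
# `spectrum` is a 1D array of 4 points.
channel_shapes = {"voltage": (), "spectrum": (4,)}


def shape_of(value):
    """Return the shape of a scalar or a rectangular nested list."""
    shape = []
    while isinstance(value, list):
        shape.append(len(value))
        value = value[0] if value else None
    return tuple(shape)


def check_measurement(measured):
    """Raise ValueError if `measured` does not match the declared channels."""
    if set(measured) != set(channel_names):
        raise ValueError("measured keys do not match channel_names")
    for name, value in measured.items():
        if shape_of(value) != channel_shapes.get(name, ()):
            raise ValueError(f"unexpected shape for {name}")
    return True


measured = {"voltage": 1.234, "spectrum": [10, 12, 9, 11]}
ok = check_measurement(measured)
```

A check like this can catch a mismatch between the declared metadata and the measured dictionary early, before a client tries to interpret the data.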

Implementing uses-serial

The uses-serial trait is not typically going to be a terminal trait. More specific configuration will be provided by traits which depend on it. These must also implement the direct_serial_write method. See below for more in-depth implementation details for the cases of UART and I2C.

Implementing uses-uart

UART is the serial communication scheme used for RS-232 and similar protocols. UART serial communication is characterized by a baud rate. In Python, the standard way of communicating with UART devices is the pyserial library. The yaqd-core-python implementation includes a subclass of the pyserial implementation that has some asynchronous functions available.

A typical uses-uart daemon will look something like:


__all__ = ["ExampleUsesUart"]

import asyncio

from yaqd_core import Base, aserial

class ExampleUsesUart(Base):
    _kind = "example-uses-uart"
    traits = ["uses-uart", "uses-serial"]
    defaults = {"baud_rate": 9600}  # Check your device for appropriate default

    def __init__(self, name, config, config_filepath):
        super().__init__(name, config, config_filepath)
        self._serial_port = aserial.ASerial(config["serial_port"], config["baud_rate"])
        ...
        # perform other setup, possibly including reads and writes

    def close(self):
        self._serial_port.close()

    def direct_serial_write(self, message):
        self._busy = True
        self._serial_port.write(message.encode())

    async def update_state(self):
        while True:
            self._serial_port.write(b"get_status")
            line = await self._serial_port.areadline()
            # Strip any line terminator before comparing
            self._busy = line.strip() != b"ready"
            if self._busy:
                await asyncio.sleep(0.1)
            else:
                await self._busy_sig.wait()
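
Line-oriented replies normally arrive with their terminator attached, so the status comparison needs to account for it. The sketch below demonstrates this with the standard library's asyncio.StreamReader standing in for the serial port; the reply strings and the read_status helper are hypothetical, and a real device will have its own protocol.

```python
import asyncio


async def read_status(reader):
    """Read one newline-terminated reply and report whether the device is busy."""
    line = await reader.readline()
    # readline keeps the trailing line terminator, so strip before comparing.
    return line.strip() != b"ready"


async def main():
    reader = asyncio.StreamReader()
    # Pretend the device sent two replies, each terminated with CR LF.
    reader.feed_data(b"moving\r\nready\r\n")
    reader.feed_eof()
    first = await read_status(reader)
    second = await read_status(reader)
    return first, second


states = asyncio.run(main())
```

The first reply is reported as busy and the second as ready, which only works because the terminator is removed before the comparison.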

Implementing uses-i2c

There are many libraries in Python that can manage low-level I2C communication. We have typically used smbus.

A typical uses-i2c daemon will look something like:


__all__ = ["ExampleUsesI2c"]

import asyncio

import smbus
from yaqd_core import Base

class ExampleUsesI2c(Base):
    _kind = "example-uses-i2c"
    traits = ["uses-i2c", "uses-serial"]
    defaults = {"i2c_addr": 0x60}  # Check your device for appropriate default

    def __init__(self, name, config, config_filepath):
        super().__init__(name, config, config_filepath)
        self.address = config["i2c_addr"]
        self.bus = smbus.SMBus(1)
        ...
        # perform other setup, possibly including reads and writes

    def direct_serial_write(self, message):
        self._busy = True
        for byte in bytes(message, encoding="utf-8"):
            self.bus.write_byte(self.address, byte)

    async def update_state(self):
        while True:
            self.bus.write_byte(self.address, 0x12)
            data = self.bus.read_i2c_block_data(self.address, 0x00, 3)
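            # read_i2c_block_data returns a list of ints; which byte holds the
            # status (assumed here to be the first) depends on your device
            self._busy = data[0] == 0x01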
            if self._busy:
                await asyncio.sleep(0.1)
            else:
                await self._busy_sig.wait()
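
When no hardware is attached, it can help to exercise the write and status logic against an in-memory stand-in. The sketch below assumes a hypothetical FakeBus class that mimics only the two smbus.SMBus calls used above (write_byte and read_i2c_block_data); the helper functions and the meaning of the status byte are likewise assumptions for illustration.

```python
class FakeBus:
    """In-memory stand-in mimicking the two smbus.SMBus calls used above."""

    def __init__(self, status_block):
        self.written = []  # (address, value) pairs, in the order written
        self._status_block = list(status_block)

    def write_byte(self, address, value):
        self.written.append((address, value))

    def read_i2c_block_data(self, address, register, length):
        # Like smbus, return a list of ints rather than a single value.
        return self._status_block[:length]


def write_message(bus, address, message):
    """Send a text message one byte at a time."""
    for byte in message.encode("utf-8"):
        bus.write_byte(address, byte)


def is_busy(bus, address):
    """Assume the first byte of the status block is 0x01 while busy."""
    data = bus.read_i2c_block_data(address, 0x00, 3)
    return data[0] == 0x01


bus = FakeBus([0x01, 0x00, 0x00])
write_message(bus, 0x60, "go")
busy = is_busy(bus, 0x60)
```

Because the block read returns a list, the status has to be picked out by index; comparing the whole list to an integer would never be true.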