Another example writing a new automation
Let's design a simple LED automation for Light/Dark cycles. The basic idea is that the LED automation "wakes up" (specifically: it runs its execute
method) every hour, and on specific hours, turns on or turns off the white light LEDs.
The Code
- Imports at the top of the file!
# -*- coding: utf-8 -*-
from __future__ import annotations
from pioreactor.automations.led.base import LEDAutomationJobContrib
from pioreactor.whoami import get_unit_name, get_assigned_experiment_name
from pioreactor.automations import events
- Define a new class as a subclass of LEDAutomationJobContrib. We use LEDAutomationJobContrib since this is a 3rd party automation. We give the new class a descriptive name. The automation_name attribute is necessary, and is often just the snake case version of the class name.
class LightDarkCycle(LEDAutomationJobContrib):
    automation_name = "light_dark_cycle"
- Define our published_settings for this class. This is a dictionary of LightDarkCycle attributes that we can modify/inspect from MQTT (and hence from the UI, or from the leader Pioreactor). Not all attributes need to go in here, only the ones that users may want to modify mid-experiment.
Later in this post, we'll see what each of these attributes does.
    published_settings = {
        "duration": {"datatype": "float", "settable": False, "unit": "min"},
        "light_intensity": {"datatype": "float", "settable": True, "unit": "%"},
        "light_duration_hours": {"datatype": "float", "settable": True, "unit": "h"},
        "dark_duration_hours": {"datatype": "float", "settable": True, "unit": "h"},
    }
- Create the __init__, with the attributes we'll need.
    def __init__(self, light_intensity: float, light_duration_hours: int, dark_duration_hours: int, **kwargs):
        super().__init__(**kwargs)
        self.hours_online = -1
        self.light_active = False
        self.channels = ["B", "C"]
        self.set_light_intensity(light_intensity)
        self.light_duration_hours = float(light_duration_hours)
        self.dark_duration_hours = float(dark_duration_hours)
- hours_online keeps track of how many hours have elapsed.
- light_active keeps track of whether the LEDs are currently on or off.
- channels refers to which LED channels on the Pioreactor HAT we are using.
- light_duration_hours: the number of hours to keep the light on for, typically 16.
- dark_duration_hours: the number of hours to keep the light off for, typically 8.
- light_intensity: the level of intensity, as a percent, of the LEDs when turned on.
- We define the execute function, which is what runs every duration minutes. In the function, we increment hours_online by 1 (since it runs every 60 minutes), and ask a separate function, trigger_leds, to handle turning the LEDs on and off. The execute function can return an Event, which is useful for logging and testing purposes.
    def execute(self):
        self.hours_online += 1
        event = self.trigger_leds(self.hours_online)
        return event
The role of the other class function, trigger_leds, is to:
1. Determine if we should turn on LEDs, turn off LEDs, or do nothing.
2. If we are changing the LEDs' status (on to off, or off to on), perform that task.
3. Return an Event, with some description of what occurred (even if nothing changed).
To do 1., we think about the total hours passed modulo how long our cycle is. That is, if our cycle lasts 24 hours (which might be the result of choosing 16h light + 8h dark), then the hour 33 is really the same as hour 9, likewise the hour 123 is the same as hour 3: we take the hour modulo the duration.
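As a quick check of the arithmetic above, here is a minimal standalone sketch in plain Python, with no Pioreactor imports. The helper names position_in_cycle and is_light_period are hypothetical and exist only for this illustration; it assumes, as in the automation, that the light period comes first in each cycle.

```python
def position_in_cycle(hours_elapsed: int, light_hours: int, dark_hours: int) -> int:
    """Fold the total elapsed hours into a single light/dark cycle."""
    cycle_duration = light_hours + dark_hours
    return hours_elapsed % cycle_duration


def is_light_period(hours_elapsed: int, light_hours: int, dark_hours: int) -> bool:
    """True if the folded hour falls in the light part of the cycle (light comes first)."""
    return position_in_cycle(hours_elapsed, light_hours, dark_hours) < light_hours


# 16h light + 8h dark gives a 24h cycle.
print(position_in_cycle(33, 16, 8))   # 9
print(position_in_cycle(123, 16, 8))  # 3
print(is_light_period(33, 16, 8))     # True: hour 9 is within the first 16 hours
print(is_light_period(40, 16, 8))     # False: hour 16 of the cycle starts the dark period
```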
We then ask: is this hour in the light period or the dark period of the cycle? We also check whether light_active is currently True or False. If the two disagree, we change the status of the LEDs. For example, if we should be in the dark period but our LEDs are on, we turn them off and return a ChangedLedIntensity event. The function set_led_intensity is a helper function from the parent class.
    def trigger_leds(self, hours: int) -> events.Event:
        # both durations are stored as floats, so the cycle length is a float too
        cycle_duration: float = self.light_duration_hours + self.dark_duration_hours
        if ((hours % cycle_duration) < self.light_duration_hours) and (not self.light_active):
            self.light_active = True
            for channel in self.channels:
                self.set_led_intensity(channel, self.light_intensity)
            return events.ChangedLedIntensity(f"{hours}h: turned on LEDs")
        elif ((hours % cycle_duration) >= self.light_duration_hours) and (self.light_active):
            self.light_active = False
            for channel in self.channels:
                self.set_led_intensity(channel, 0)
            return events.ChangedLedIntensity(f"{hours}h: turned off LEDs")
        else:
            return events.NoEvent(f"{hours}h: no change")
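To see which hours actually trigger a change, the same branching can be exercised without any hardware. The sketch below is a standalone stand-in, not the Pioreactor class: FakeLightDarkCycle is a hypothetical name, it does not talk to any LEDs, and it returns plain strings instead of events objects.

```python
class FakeLightDarkCycle:
    """Hardware-free stand-in that mirrors the branching in trigger_leds."""

    def __init__(self, light_duration_hours: float, dark_duration_hours: float) -> None:
        self.light_duration_hours = light_duration_hours
        self.dark_duration_hours = dark_duration_hours
        self.light_active = False

    def trigger_leds(self, hours: int) -> str:
        cycle_duration = self.light_duration_hours + self.dark_duration_hours
        in_light_period = (hours % cycle_duration) < self.light_duration_hours
        if in_light_period and not self.light_active:
            self.light_active = True
            return "on"
        if not in_light_period and self.light_active:
            self.light_active = False
            return "off"
        return "no change"


automation = FakeLightDarkCycle(light_duration_hours=16, dark_duration_hours=8)

# Record only the hours where the LED state flips, over two full days.
transitions = []
for hour in range(48):
    outcome = automation.trigger_leds(hour)
    if outcome != "no change":
        transitions.append((hour, outcome))

print(transitions)  # [(0, 'on'), (16, 'off'), (24, 'on'), (40, 'off')]
```

With a 16h light + 8h dark cycle, only four of the 48 hourly calls change anything; every other call falls through to the "no change" branch.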
We also need to define the set_light_intensity function used above. This function is automatically called whenever we change light_intensity (see note below). We need additional logic to immediately change the LED intensity when asked; otherwise, the LEDs wouldn't actually update until the next execute is called, which could be an hour away.
    def set_light_intensity(self, intensity) -> None:
        self.light_intensity = float(intensity)
        if self.light_active:
            # update now!
            for channel in self.channels:
                self.set_led_intensity(channel, self.light_intensity)
        else:
            pass
Updates over MQTT either update the attribute directly, i.e. self.<attr> = new_value, or, if defined, the method set_<attr>(new_value) is called instead. This is what is happening with set_light_intensity. Using set_<attr> methods is useful when you have additional logic that needs to be accomplished when an attribute changes.
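The dispatch rule described in this note can be sketched in plain Python. This only illustrates the behaviour and is not Pioreactor's actual implementation; apply_update and Demo are hypothetical names introduced for the example.

```python
class Demo:
    def __init__(self) -> None:
        self.light_intensity = 0.0
        self.dark_duration_hours = 8.0
        self.calls = []

    def set_light_intensity(self, intensity) -> None:
        # Extra logic would run here, e.g. pushing the new value to the LEDs immediately.
        self.light_intensity = float(intensity)
        self.calls.append("set_light_intensity")


def apply_update(job, attr: str, new_value) -> None:
    """Call set_<attr>(new_value) if the job defines it, else assign the attribute directly."""
    setter = getattr(job, f"set_{attr}", None)
    if callable(setter):
        setter(new_value)
    else:
        setattr(job, attr, new_value)


job = Demo()
apply_update(job, "light_intensity", "45")    # routed through set_light_intensity
apply_update(job, "dark_duration_hours", 10)  # no setter defined, plain assignment
print(job.light_intensity, job.dark_duration_hours, job.calls)
# 45.0 10 ['set_light_intensity']
```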
That's the end of the class! In total, our file looks like:
# -*- coding: utf-8 -*-
from __future__ import annotations
from pioreactor.automations.led.base import LEDAutomationJobContrib
from pioreactor.automations import events


class LightDarkCycle(LEDAutomationJobContrib):
    """
    Follows a light / dark cycle, measured in hours. Starts dark.
    """

    automation_name = "light_dark_cycle"
    published_settings = {
        "duration": {"datatype": "float", "settable": False, "unit": "min"},  # doesn't make sense to change duration.
        "light_intensity": {"datatype": "float", "settable": True, "unit": "%"},
        "light_duration_hours": {"datatype": "int", "settable": True, "unit": "h"},
        "dark_duration_hours": {"datatype": "int", "settable": True, "unit": "h"},
    }

    def __init__(self, light_intensity: float, light_duration_hours: int, dark_duration_hours: int, **kwargs):
        super().__init__(**kwargs)
        self.hours_online: int = -1
        self.light_active: bool = False
        self.channels = ["B", "C"]
        self.set_light_intensity(light_intensity)
        self.light_duration_hours = float(light_duration_hours)
        self.dark_duration_hours = float(dark_duration_hours)

    def trigger_leds(self, hours: int) -> events.Event:
        # both durations are stored as floats, so the cycle length is a float too
        cycle_duration: float = self.light_duration_hours + self.dark_duration_hours
        if ((hours % cycle_duration) < self.light_duration_hours) and (not self.light_active):
            self.light_active = True
            for channel in self.channels:
                self.set_led_intensity(channel, self.light_intensity)
            return events.ChangedLedIntensity(f"{hours}h: turned on LEDs")
        elif ((hours % cycle_duration) >= self.light_duration_hours) and (self.light_active):
            self.light_active = False
            for channel in self.channels:
                self.set_led_intensity(channel, 0)
            return events.ChangedLedIntensity(f"{hours}h: turned off LEDs")
        else:
            return events.NoEvent(f"{hours}h: no change")

    def set_light_intensity(self, intensity) -> None:
        self.light_intensity = float(intensity)
        if self.light_active:
            # update now!
            for channel in self.channels:
                self.set_led_intensity(channel, self.light_intensity)
        else:
            pass

    def execute(self) -> events.Event:
        self.hours_online += 1
        event = self.trigger_leds(self.hours_online)
        return event
Setting up the Pioreactor
Setting up your Pioreactor is easy: attach LEDs to LED channels B and C, and stick them in pockets X2 and X3.
Running the automation
Let's save this file to our Pioreactor plugin folder by accessing the Pioreactor's command line, typing nano ~/.pioreactor/plugins/light_dark_cycle.py, and pasting in the code above.
You can test the automation from the Pioreactor's command line by executing:
pio run led_automation --automation-name light_dark_cycle --light_intensity 45 --light_duration_hours 16 --dark_duration_hours 8
Adding the automation to the UI
To add your automation to the UI so it appears in the automation drop-down, follow the steps here.