Skip to main content

Introduction to background jobs

The core unit of "work" in the Pioreactor software is a background job (called activities in the web interface). Background jobs include od_reading, monitor, automation controllers, all the automations themselves, etc. Often, a community plugin is a background job (or multiple jobs) that gives your bioreactor new abilities. There are a few important features of background jobs to be highlighted if you intend on working with them.

Inheritance and file structure

All background jobs inherit from the base class pioreactor.background_jobs.base.BackgroundJob. This class controls most of the behind-the-scenes behaviour of the class.

Typically, we structure background job code such that the Python file has a single background job class and a command-line interface. See examples here.

State of a job

A background job can be in one of five different states:

  • init: the job is initializing. The job starts in this state.
  • ready: the job is prepared to do "work", or currently doing "work"
  • sleeping: the job's work is paused (ex: when the stirring job is in state sleeping, it turns off its power.) This state is called "paused" in the web interface.
  • disconnected: the job has disconnected from services (MQTT, databases, etc.) and cleaned itself up successfully.
  • lost: the job did not disconnect gracefully. Something may be wrong.

Here's how the states can transition to each other:

                                        ┌──────────┐
│ │
┌──────────────────────┬───────► lost ◄────────┐
│ │ │ │ │
│ │ └─────▲────┘ │
│ │ │ │
┌────┴─────┐ ┌─────┴──────┐ │ ┌───────┴──────┐
│ │ │ │ │ │ │
│ init ├──────────► ready ├──────┼─────► disconnected │
│ │ │ │ │ │ │
└──────────┘ └────┬──▲────┘ │ └──────▲───────┘
│ │ │ │
│ │ │ │
│ │ │ │
┌────▼──┴────┬──────┘ │
│ │ │
│ sleeping ├───────────────────┘
│ │
└────────────┘

The author can optionally define hooks when a job moves between states, and when it enters a new state. For example, the method .on_ready_to_sleeping() is called when the job moves from ready to sleeping. Similarly, the method .on_sleeping() is called when the jobs enters state sleeping. By default, these methods are empty.

The recommended way to move a job between states is with job.set_state(job.READY). This will invoke any hooks that exist between the states. State can also be changed remotely over MQTT - we'll get to that point later.

Publish & subscribe, also known as pub/sub

On job creation, the job will connect to MQTT to allow for publishing and subscribing. The attribute sub_client is for subscribing from MQTT, and pub_client is for publishing to MQTT. Internally, sub_client and pub_client are Paho client objects.

Since publishing is so common, we also expose a .publish method:

job = SomeBackgroundJob(unit, experiment)
job.publish(f"pioreactor/{job.unit}/{job.experiment}/...", payload)

Another common pattern is to subscribe to a MQTT topic, and execute a threaded callback whenever a message comes in from that topic. This is done with the .subscribe_and_callback method. By convention, this is most often used in a .start_passive_listeners method:

from pioreactor.background_jobs.base import BackgroundJob

class SomeBackgroundJob(BackgroundJob):

def __init__(self, ...):
...

self.start_passive_listeners()


def response_to_dosing_event(self, msg):
# change self in response, etc.
...

def start_passive_listeners(self):
self.subscribe_and_callback(
self.response_to_dosing_event,
f"pioreactor/{self.unit}/{self.experiment}/dosing_events",
)

Publishing attributes to MQTT

A common task is when we have a job running, say stirring job, and we want to dynamically update an attribute, like the target RPM, without restarting the job.

We also want to know the value of a job's attribute when it changes. For example, in the stirring job, we'd like to know what the actual RPM is. This can't be edited externally (it's a measured value...), but we want the value available for other jobs to use, for the web interface to display, or to sink it into a database.

These two tasks, updating attributes and reading attributes in real-time, are so common that we've wrapped the logic into the parent BackgroundJob class, and allow job authors to tell us what attributes they wish to update and/or read.

In the class definition, the attribute published_settings defines which class attributes they would like to track. For example, the job responsible for stirring looks like:

class Stirrer(BackgroundJob):

published_settings = {
"target_rpm": {"datatype": "json", "settable": True, "unit": "RPM"},
"measured_rpm": {"datatype": "json", "settable": False, "unit": "RPM"},
"duty_cycle": {"datatype": "float", "settable": True, "unit": "%"},
}

...

Thus the attributes target_rpm, measured_rpm, duty_cycle are all published to MQTT when they change, but only duty_cycle and target_rpm are able to be updated over MQTT (as defined by settable).

The "datatype" field is normally one of: "string", "float", "integer", "json", "boolean" (full list in pioreactor/types.py).

info

The unit key is optional, and can be omitted. Also, there is an optional persist key (not shown above), which contains a boolean describing if the value should be kept in MQTT after the job exits, default False.

When a class' attribute that's present in published_settings changes, a MQTT message is published under the topic pioreactor/{self.unit}/{self.experiment}/{self.job_name}/{attr} with payload equal to the new value of the attribute. This is how the web interface is provided real-time data.

For updating an attribute, the BackgroundJob parent class listens to the MQTT topic:

pioreactor/{self.unit}/{self.experiment}/{self.job_name}/+/set

(the + is a MQTT wildcard), and when a message comes in, the class will check if the attribute (defined by +) is settable. If so, a lookup is done to see if a class method called set_<attr> is defined, and if present, calls that, with the only argument the message's payload. Otherwise, a simple assignment is done: self.<attr> = payload. The utility of a set_<attr> method is when changing the attribute requires more logic (ex: changing a PID controller's set point, see example here).

info

state is special attribute that is automatically appended to published_settings (with settable: True and persist: True) so the state of the job can always be updated and read from MQTT. Over MQTT, the field is called referenced as pioreactor/{unit}/{experiment}/{job_name}/$state - note the $.

When the job disconnects and cleans up, the published settings in MQTT are removed by sending an empty payload to the respective topics (unless persist: True, see above.)

Uniqueness

Only a single instance of a background job, modulo the job's name, can be running on a Raspberry Pi. For example, only a single Monitor background job can run, likewise only a single ODReading can run. (It's not clear what running multiple ODReadings means, or how they will interact with the hardware - poorly we expect).

The uniqueness is across processes, too. So if a script is running ODReading, then pio run od_reading will fail.

note

This uniqueness constraint is currently enforced. It's possible we will see a reason to remove the uniqueness constraint in a future version - let us know if you want to see this, too!

Entry & exit

It's important to treat background jobs as objects that need to be cleaned up properly. There are two traditional ways to use a background job:

job = SomeBackgroundJob(unit, experiment)
...
job.clean_up()
# all cleaned up - state is set to disconnected

alternatively, with a context manager:

with SomeBackgroundJob(unit, experiment) as job:
...

# all cleaned up - state is set to disconnected

The job is also cleaned up when the Python process exits.

caution

The following will cause problems:

#❌ don't do this
def function_that_doesnt_clean_up():
job = SomeBackgroundJob(unit, experiment)
# job not cleaned up
return True

value = function_that_doesnt_clean_up()
...

In the above, the job SomeBackgroundJob isn't disconnected, and state hasn't changed - nor can you access the job anymore (no accessible references). Do this instead:

# ✅ this is okay
def function_that_does_clean_up():
with SomeBackgroundJob(unit, experiment) as job:
...
return True

value = function_that_does_clean_up()

Blocking

Often we want the job to pause (or a script to pause), and wait for commands or MQTT messages. We can accomplish this with

job.block_until_disconnected()

This blocks the execution until the job is disconnected (either a MQTT state change, or a signal from the OS).