Callbacks

Description

PyPlumIO has an extensive event driven system where each device property is an event that you can subscribe to.

When you subscribe callback to the event, your callback will be awaited with the property value each time the property is received from the device by PyPlumIO.

Note

Callbacks must be coroutines defined with async def.

Subscribing to events

pyplumio.devices.Device.subscribe(self, name: str, callback: CallbackT) CallbackT

Subscribe a callback to the event.

Parameters:
  • name (str) – Event name or ID

  • callback (Callback) – A coroutine callback function, that will be awaited on the with the event data as an argument.

Returns:

A reference to the callback, that can be used with EventManager.unsubscribe().

Return type:

Callback

pyplumio.devices.Device.subscribe_once(self, name: str, callback: Callable[[Any], Coroutine[Any, Any, Any]]) Callable[[Any], Coroutine[Any, Any, Any]]

Subscribe a callback to the event once.

Callback will be unsubscribed after single event.

Parameters:
  • name (str) – Event name or ID

  • callback (Callback) – A coroutine callback function, that will be awaited on the with the event data as an argument.

Returns:

A reference to the callback, that can be used with EventManager.unsubscribe().

Return type:

Callback

To remove the previously registered callback, use the unsubcribe() method.

pyplumio.devices.Device.unsubscribe(self, name: str, callback: Callable[[Any], Coroutine[Any, Any, Any]]) bool

Usubscribe a callback from the event.

Parameters:
  • name (str) – Event name or ID

  • callback (Callback) – A coroutine callback function, previously subscribed to an event using subscribe() or subscribe_once() methods.

Returns:

True if callback is found, False otherwise.

Return type:

bool

Filters

PyPlumIO’s callback system can be further improved upon by using built-in filters. Filters allow you to specify when you want your callback to be awaited.

All built-in filters are described below.

pyplumio.filters.aggregate(callback: Callable[[Any], Coroutine[Any, Any, Any]], seconds: float) _Aggregate

Return an aggregate filter.

A callback function will be called with a sum of values collected over a specified time period. Can only be used with numeric values.

Parameters:
  • callback (Callback) – A callback function to be awaited once filter conditions are fulfilled

  • seconds (float) – A callback will be awaited with a sum of values aggregated over this amount of seconds.

Returns:

A instance of callable filter

Return type:

_Aggregate

This filter aggregates value for specified amount of seconds and then calls the callback with the sum of values collected.

from pyplumio.filters import aggregate

# Await the callback with the fuel burned during 30 seconds.
ecomax.subscribe("fuel_burned", aggregate(my_callback, seconds=30))
pyplumio.filters.on_change(callback: Callable[[Any], Coroutine[Any, Any, Any]]) _OnChange

Return a value changed filter.

A callback function will only be called if value is changed from the previous call.

Parameters:

callback (Callback) – A callback function to be awaited on value change

Returns:

A instance of callable filter

Return type:

_OnChange

Normally callbacks are awaited each time the PyPlumIO receives data from the device, regardless of whether value is changed or not.

With this filter it’s possible to only await the callback when value is changed.

from pyplumio.filter import on_change

# Await the callback once heating_temp value is changed since
# last call.
ecomax.subscribe("heating_temp", on_change(my_callback))
pyplumio.filters.debounce(callback: Callable[[Any], Coroutine[Any, Any, Any]], min_calls: int) _Debounce

Return a debounce filter.

A callback function will only called once value is stabilized across multiple filter calls.

Parameters:
  • callback (Callback) – A callback function to be awaited on value change

  • min_calls (int) – Value shouldn’t change for this amount of filter calls

Returns:

A instance of callable filter

Return type:

_Debounce

This filter will only await the callback once value is settled across multiple calls, specified in min_calls argument.

from pyplumio.filter import debounce

# Await the callback once outside_temp stays the same for three
# consecutive times it's received by PyPlumIO.
ecomax.subscribe("outside_temp", debounce(my_callback, min_calls=3))
pyplumio.filters.throttle(callback: Callable[[Any], Coroutine[Any, Any, Any]], seconds: float) _Throttle

Return a throttle filter.

A callback function will only be called once a certain amount of seconds passed since the last call.

Parameters:
  • callback (Callback) – A callback function that will be awaited once filter conditions are fulfilled

  • seconds (float) – A callback will be awaited at most once per this amount of seconds

Returns:

A instance of callable filter

Return type:

_Throttle

This filter limits how often your callback will be awaited.

from pyplumio.filter import throttle

# Await the callback once per 5 seconds, regardless of
# how often outside_temp value is being processed by PyPlumIO.
ecomax.subscribe("outside_temp", throttle(my_callback, seconds=5))
pyplumio.filters.delta(callback: Callable[[Any], Coroutine[Any, Any, Any]]) _Delta

Return a difference filter.

A callback function will be called with a difference between two subsequent value.

Parameters:

callback (Callback) – A callback function that will be awaited with difference between values in two subsequent calls

Returns:

A instance of callable filter

Return type:

_Delta

Instead of raw value, this filter awaits callback with value change.

It can be used with numeric values, dictionaries, tuples or lists.

from pyplumio.filter import delta

# Await the callback with difference between values in current
# and last await.
ecomax.subscribe("outside_temp", delta(my_callback))
pyplumio.filters.custom(callback: Callable[[Any], Coroutine[Any, Any, Any]], filter_fn: Callable[[Any], bool]) _Custom

Return a custom filter.

A callback function will be called when user-defined filter function, that’s being called with the value as an argument, returns true.

Parameters:
  • callback (Callback) – A callback function to be awaited when filter function return true

  • filter_fn (Callable[[Any], bool]) – Filter function, that will be called with a value and should return True to await filter’s callback

Returns:

A instance of callable filter

Return type:

_Custom

This filter allows to specify filter function that will be called every time the value is received from the controller.

A callback is awaited once the filter function returns true.

from pyplumio.filter import custom

# Await the callback when temperature is higher that 10 degrees
# Celsius.
ecomax.subscribe("outside_temp", custom(my_callback, lambda x: x > 10))

Callbacks Examples

In this example we’ll use filter chaining to achieve more complex event processing.

from pyplumio.filter import throttle, on_change


async def my_callback(value) -> None:
    """Prints current heating temperature."""
    print(f"Heating Temperature: {value}")


async def main():
    """Subscribes callback to the current heating temperature."""
    async with pyplumio.open_tcp_connection("localhost", 8899) as conn:

        # Get the ecoMAX device.
        ecomax = await conn.get("ecomax")

        # Await the callback on value change but no faster than
        # once per 5 seconds.
        ecomax.subscribe("heating_temp", throttle(on_change(my_callback), seconds=5))

        # Wait until disconnected (forever)
        await conn.wait_until_done()


asyncio.run(main())

In the example below, my_callback with be called with current heating temperature on every SensorDataMessage.

import asyncio

import pyplumio


async def my_callback(value) -> None:
    """Prints current heating temperature."""
    print(f"Heating Temperature: {value}")


async def main():
    """Subscribes callback to the current heating temperature."""
    async with pyplumio.open_tcp_connection("localhost", 8899) as conn:

        # Get the ecoMAX device.
        ecomax = await conn.get("ecomax")

        # Subscribe my_callback to heating_temp event.
        ecomax.subscribe("heating_temp", my_callback)

        # Wait until disconnected (forever)
        await conn.wait_until_done()


asyncio.run(main())

In the example below, my_callback with be called with current target temperature once and my_callback2 will be called when heating temperature will change more the 0.1 degrees Celsius.

import asyncio

import pyplumio
from pyplumio.filters import on_change


async def my_callback(value) -> None:
    """Prints heating target temperature."""
    print(f"Target Temperature: {value}")

async def my_callback2(value) -> None:
    """Prints current heating temperature."""
    print(f"Current Temperature: {value}")

async def main():
    """Subscribes callback to the current heating temperature."""
    async with pyplumio.open_tcp_connection("localhost", 8899) as conn:
        # Get the ecoMAX device.
        ecomax = await conn.get("ecomax")

        # Subscribe my_callback to heating_target_temp event.
        ecomax.subscribe_once("heating_target_temp", my_callback)

        # Subscribe my_callback2 to heating_temp changes.
        ecomax.subscribe("heating_temp", on_change(my_callback2))

        # Wait until disconnected (forever)
        await conn.wait_until_done()


asyncio.run(main())