Connecting

Opening the connection

pyplumio.open_tcp_connection(host: str, port: int, *, protocol: Protocol | None = None, reconnect_on_failure: bool = True, **kwargs: Any) → TcpConnection

Create a TCP connection.

Parameters:
  • host (str) – IP address or host name of the remote RS-485 server

  • port (int) – Port that the remote RS-485 server is listening on

  • protocol (Protocol, optional) – Protocol that will be used for communication with the ecoMAX controller, defaults to AsyncProtocol

  • reconnect_on_failure (bool, optional) – True if PyPlumIO should try reconnecting on failure, otherwise False, defaults to True

  • **kwargs – Additional keyword arguments to be passed to asyncio.open_connection()

Returns:

An instance of TCP connection

Return type:

TcpConnection

This function lets you connect to the controller remotely through an RS-485 to Ethernet/WiFi converter. Such converters are readily available online, or you can build one yourself from a wired RS-485 connection and the ser2net software.

import pyplumio

async with pyplumio.open_tcp_connection("localhost", port=8899) as conn:
    ...

Note

Although the async with syntax is preferred, you can open the connection without it. See the Connection Examples section for more information.
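Before passing a host and port to open_tcp_connection(), it can help to confirm that the converter accepts TCP connections at all. The sketch below uses only asyncio.open_connection(), the same call that receives the extra keyword arguments. The helper name is_port_reachable and its timeout default are assumptions made for illustration; they are not part of PyPlumIO.

```python
import asyncio


async def is_port_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened.

    Hypothetical helper for illustration; not part of PyPlumIO.
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout
        )
    except (OSError, asyncio.TimeoutError):
        # Refused, unreachable, unresolved, or too slow to answer.
        return False

    # Close the probe connection so the port is free again.
    writer.close()
    await writer.wait_closed()
    return True
```

A call such as await is_port_reachable("localhost", 8899) can then run ahead of the real connection. The probe closes its socket before returning, which matters if the converter only serves one client at a time.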

pyplumio.open_serial_connection(device: str, baudrate: int = 115200, *, protocol: Protocol | None = None, reconnect_on_failure: bool = True, **kwargs: Any) → SerialConnection

Create a serial connection.

Parameters:
  • device (str) – Serial port device name, e.g. /dev/ttyUSB0

  • baudrate (int, optional) – Serial port baud rate, defaults to 115200

  • protocol (Protocol, optional) – Protocol that will be used for communication with the ecoMAX controller, defaults to AsyncProtocol

  • reconnect_on_failure (bool, optional) – True if PyPlumIO should try reconnecting on failure, otherwise False, defaults to True

  • **kwargs – Additional keyword arguments to be passed to serial_asyncio.open_serial_connection()

Returns:

An instance of serial connection

Return type:

SerialConnection

You can connect to the ecoMAX controller over a wired connection using an RS-485 to USB or RS-485 to TTL adapter that is connected directly to the device running PyPlumIO.

You MUST NOT connect the RS-485 lines directly to the UART outputs of your PC, or you risk damaging your PC or the ecoMAX controller itself.

import pyplumio

async with pyplumio.open_serial_connection("/dev/ttyUSB0", baudrate=115200) as conn:
    ...

Protocols

Connection helpers and classes let you specify a custom protocol that handles the data once the connection is established.

All protocols should inherit from the following abstract base class:

class pyplumio.Protocol

Represents a protocol.

Most of the documentation assumes that the protocol is left at its default, AsyncProtocol. However, setting a different protocol allows more fine-grained control over data processing.

In the following example, we'll set the protocol to DummyProtocol, request and print alerts, and then close the connection, all without the additional overhead of working with device classes and queues.

import asyncio

import pyplumio
from pyplumio.const import DeviceType
from pyplumio.frames import requests, responses


async def main():
    """Open a connection and request alerts."""
    async with pyplumio.open_tcp_connection(
        host="localhost", port=8899, protocol=pyplumio.DummyProtocol()
    ) as connection:
        await connection.writer.write(
            requests.AlertsRequest(recipient=DeviceType.ECOMAX, start=0, count=5)
        )

        while connection.connected:
            if isinstance(
                (frame := await connection.reader.read()), responses.AlertsResponse
            ):
                print(frame.data)
                break


asyncio.run(main())

All built-in protocols are listed below.

class pyplumio.DummyProtocol

Represents a dummy protocol.

This protocol sets the frame reader and writer as attributes, sets the connected event, and does nothing else.

class pyplumio.AsyncProtocol(ethernet_parameters: EthernetParameters | None = None, wireless_parameters: WirelessParameters | None = None, consumers_count: int = 3)

Represents an async protocol.

This protocol implements the producer-consumer pattern using asyncio queues.

The frame producer first tries to read frames from the write queue. If any are available, it sends them to the device via the frame writer.

It then reads the stream via the frame reader and puts each received frame into the read queue.

Frame consumers read frames from the read queue, create a device entry if needed, and send the frame to that entry for processing.
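The queue-based flow described above can be sketched with plain asyncio. This is an illustration only, not PyPlumIO's implementation: the names producer, consumer, and run_pipeline are hypothetical, strings stand in for frames, and a list stands in for the frame writer.

```python
import asyncio


async def producer(write_queue, incoming, read_queue, sent):
    """Send pending outgoing frames, then queue each received frame."""
    for frame in incoming:
        # Drain the write queue first; `sent` stands in for the frame writer.
        while not write_queue.empty():
            sent.append(write_queue.get_nowait())
        # Hand the received frame over to the consumers.
        await read_queue.put(frame)


async def consumer(name, read_queue, handled):
    """Take frames from the read queue and record which consumer handled them."""
    while True:
        frame = await read_queue.get()
        handled.append((name, frame))
        read_queue.task_done()


async def run_pipeline(outgoing, incoming, consumers_count=3):
    """Run one producer and several consumers until every frame is handled."""
    write_queue = asyncio.Queue()
    read_queue = asyncio.Queue()
    for frame in outgoing:
        write_queue.put_nowait(frame)

    sent, handled = [], []
    consumers = [
        asyncio.create_task(consumer(f"consumer-{i}", read_queue, handled))
        for i in range(consumers_count)
    ]

    await producer(write_queue, incoming, read_queue, sent)
    # Wait until the consumers have processed everything in the read queue.
    await read_queue.join()

    # The consumers loop forever, so cancel them once the work is done.
    for task in consumers:
        task.cancel()
    await asyncio.gather(*consumers, return_exceptions=True)
    return sent, handled


sent, handled = asyncio.run(
    run_pipeline(["request-1"], ["frame-a", "frame-b", "frame-c"])
)
```

Several consumers share a single read queue, so a slow frame handler does not block the others. The consumers_count argument here only mirrors the parameter of the same name on AsyncProtocol.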

Network Information

When opening the connection, you can send Ethernet and wireless network information to the ecoMAX controller by passing one or both of the data classes below to the protocol of your choice.

class pyplumio.EthernetParameters(ip: str = '0.0.0.0', netmask: str = '255.255.255.0', gateway: str = '0.0.0.0', status: bool = True)

Represents Ethernet parameters.

gateway: str = '0.0.0.0'

Gateway IP address

ip: str = '0.0.0.0'

IP address

netmask: str = '255.255.255.0'

IP subnet mask

status: bool = True

Connection status. Parameters will be ignored if set to False

class pyplumio.WirelessParameters(ip: str = '0.0.0.0', netmask: str = '255.255.255.0', gateway: str = '0.0.0.0', status: bool = True, ssid: str = '', encryption: EncryptionType = EncryptionType.NONE, signal_quality: int = 100)

Represents wireless network parameters.

status: bool = True

Connection status. Parameters will be ignored if set to False

encryption: EncryptionType = 1

Wireless encryption standard (0 - unknown, 1 - no encryption, 2 - WEP, 3 - WPA, 4 - WPA2)

signal_quality: int = 100

Wireless signal strength as a percentage

ssid: str = ''

Wireless Service Set IDentifier

Once set, the network information will be shown in the information section on the ecoMAX panel.

In the example below, we'll set both Ethernet and wireless parameters.

import pyplumio
from pyplumio.const import EncryptionType


async def main():
    """Initialize a connection with network parameters."""
    async with pyplumio.open_tcp_connection(
        host="localhost",
        port=8899,
        protocol=pyplumio.AsyncProtocol(
            ethernet_parameters=pyplumio.EthernetParameters(
                ip="10.10.1.100",
                netmask="255.255.255.0",
                gateway="10.10.1.1",
            ),
            wireless_parameters=pyplumio.WirelessParameters(
                ip="10.10.2.100",
                netmask="255.255.255.0",
                gateway="10.10.2.1",
                ssid="My SSID",
                encryption=EncryptionType.WPA2,
                signal_quality=100,
            ),
        ),
    ) as connection:
        ...
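The numeric encryption codes listed for WirelessParameters can be modelled as an integer enumeration. The sketch below is a stand-in built only from the values documented above (0 to 4). EncryptionCode and describe_encryption are hypothetical names used for illustration; in real code, import EncryptionType from pyplumio.const instead.

```python
from enum import IntEnum


class EncryptionCode(IntEnum):
    """Illustrative stand-in mirroring the documented encryption values."""

    UNKNOWN = 0
    NONE = 1
    WEP = 2
    WPA = 3
    WPA2 = 4


def describe_encryption(code: int) -> str:
    """Map a numeric code to its name, treating unlisted codes as UNKNOWN."""
    try:
        return EncryptionCode(code).name
    except ValueError:
        return EncryptionCode.UNKNOWN.name
```

With this stand-in, describe_encryption(4) yields "WPA2", and a value outside the documented range falls back to "UNKNOWN".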

Connection Examples

The following example illustrates opening a TCP connection using Python’s context manager.

import asyncio
import logging

import pyplumio


_LOGGER = logging.getLogger(__name__)


async def main():
    """Opens the connection and gets the ecoMAX device."""
    async with pyplumio.open_tcp_connection("localhost", port=8899) as conn:
        try:
            # Get the ecoMAX device within 10 seconds or timeout.
            ecomax = await conn.get("ecomax", timeout=10)
        except asyncio.TimeoutError:
            # If device times out, log the error.
            _LOGGER.error("Failed to get the device within 10 seconds")


# Run the coroutine in asyncio event loop.
asyncio.run(main())

The following example illustrates opening a TCP connection without using Python’s context manager.

import asyncio
import logging

import pyplumio


_LOGGER = logging.getLogger(__name__)


async def main():
    """Opens the connection and gets the ecoMAX device."""
    connection = pyplumio.open_tcp_connection("localhost", port=8899)

    # Connect to the device.
    await connection.connect()

    try:
        # Get the ecoMAX device within 10 seconds or timeout.
        ecomax = await connection.get("ecomax", timeout=10)
    except asyncio.TimeoutError:
        # If device times out, log the error.
        _LOGGER.error("Failed to get the device within 10 seconds")

    # Close the connection.
    await connection.close()


# Run the coroutine in asyncio event loop.
asyncio.run(main())