Connecting
Opening the connection
- pyplumio.open_tcp_connection(host: str, port: int, *, protocol: Protocol | None = None, reconnect_on_failure: bool = True, **kwargs: Any) → TcpConnection
Create a TCP connection.
- Parameters:
host (str) – IP address or host name of the remote RS-485 server
port (int) – Port that the remote RS-485 server is listening on
protocol (Protocol, optional) – Protocol that will be used for communication with the ecoMAX controller, defaults to AsyncProtocol
reconnect_on_failure (bool, optional) – True if PyPlumIO should try reconnecting on failure, otherwise False, defaults to True
**kwargs – Additional keyword arguments to be passed to asyncio.open_connection()
- Returns:
A TCP connection instance
- Return type:
TcpConnection
This function lets you connect to the controller remotely through an RS-485 to Ethernet/WiFi converter. Such converters are readily available online, or you can build one yourself using a wired connection and the ser2net software.
import pyplumio

async with pyplumio.open_tcp_connection("localhost", port=8899) as conn:
    ...
Note
Although the async with syntax is preferred, you can open a connection without it. See the Connection Examples section for more information.
- pyplumio.open_serial_connection(device: str, baudrate: int = 115200, *, protocol: Protocol | None = None, reconnect_on_failure: bool = True, **kwargs: Any) → SerialConnection
Create a serial connection.
- Parameters:
device (str) – Serial port device name, e.g. /dev/ttyUSB0
baudrate (int, optional) – Serial port baud rate, defaults to 115200
protocol (Protocol, optional) – Protocol that will be used for communication with the ecoMAX controller, defaults to AsyncProtocol
reconnect_on_failure (bool, optional) – True if PyPlumIO should try reconnecting on failure, otherwise False, defaults to True
**kwargs – Additional keyword arguments to be passed to serial_asyncio.open_serial_connection()
- Returns:
A serial connection instance
- Return type:
SerialConnection
You can connect to the ecoMAX controller over a wired connection using an RS-485 to USB or RS-485 to TTL adapter that is connected directly to the device running PyPlumIO.
You MUST NOT connect RS-485 lines directly to the UART outputs of your PC, or you risk damaging your PC/controller or the ecoMAX controller itself.
import pyplumio

async with pyplumio.open_serial_connection("/dev/ttyUSB0", baudrate=115200) as conn:
    ...
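Both helpers accept reconnect_on_failure. If it is set to False, retrying is presumably left to the calling code. The following is a minimal sketch of a generic retry loop, not PyPlumIO's own reconnect logic. The names connect_with_retry and flaky_connect are hypothetical stand-ins, and the sketch assumes a failed attempt raises ConnectionError.

```python
import asyncio


async def connect_with_retry(connect, *, retries=3, delay=0.01):
    """Await the hypothetical ``connect`` coroutine until it succeeds.

    Illustrative only: PyPlumIO's own reconnect behaviour may differ.
    """
    for attempt in range(1, retries + 1):
        try:
            return await connect()
        except ConnectionError:
            if attempt == retries:
                raise  # Out of attempts, let the caller handle it.
            await asyncio.sleep(delay)  # Wait before trying again.


async def demo():
    """Run the retry loop against a stand-in that fails twice."""
    attempts = 0

    async def flaky_connect():
        # Stand-in for a real connection attempt (assumption).
        nonlocal attempts
        attempts += 1
        if attempts < 3:
            raise ConnectionError("server not reachable")
        return "connected"

    result = await connect_with_retry(flaky_connect)
    return result, attempts


print(asyncio.run(demo()))  # ('connected', 3)
```

A fixed delay keeps the sketch short. A real deployment might prefer an increasing delay between attempts.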
Protocols
Connection helpers and classes allow you to specify a custom protocol that handles data once the connection is established.
All protocols should inherit from the following abstract base class:
- class pyplumio.Protocol
Represents a protocol.
Most of the documentation assumes that the protocol is left at its default, AsyncProtocol. However, setting a different protocol allows for more fine-grained control over data processing.
In the following example, we'll set the protocol to DummyProtocol, request and print alerts, and close the connection, all without the additional overhead of working with device classes and queues.
import asyncio

import pyplumio
from pyplumio.const import DeviceType
from pyplumio.frames import requests, responses


async def main():
    """Open a connection and request alerts."""
    async with pyplumio.open_tcp_connection(
        host="localhost", port=8899, protocol=pyplumio.DummyProtocol()
    ) as connection:
        await connection.writer.write(
            requests.AlertsRequest(recipient=DeviceType.ECOMAX, start=0, count=5)
        )

        while connection.connected:
            if isinstance(
                (frame := await connection.reader.read()), responses.AlertsResponse
            ):
                print(frame.data)
                break


asyncio.run(main())
All built-in protocols are listed below.
- class pyplumio.DummyProtocol
Represents a dummy protocol.
This protocol sets the frame reader and writer as attributes, then sets the connected event and does nothing else.
- class pyplumio.AsyncProtocol(ethernet_parameters: EthernetParameters | None = None, wireless_parameters: WirelessParameters | None = None, consumers_count: int = 3)
Represents an async protocol.
This protocol implements the producer-consumer pattern using asyncio queues.
The frame producer tries to read frames from the write queue. If any are available, it sends them to the device via the frame writer.
It then reads the stream via the frame reader and puts each received frame into the read queue.
Frame consumers read frames from the read queue, create a device entry if needed, and pass the frame to that entry for processing.
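The read side of this pattern can be sketched with the standard library alone. This is a simplified illustration, not the AsyncProtocol implementation: the frames are plain strings, the function names are hypothetical, and the write queue and device entries are left out.

```python
import asyncio


async def producer(incoming, read_queue):
    """Put each received frame into the read queue."""
    for frame in incoming:
        await read_queue.put(frame)


async def consumer(name, read_queue, handled):
    """Take frames from the read queue and record who handled them."""
    while True:
        frame = await read_queue.get()
        handled.append((name, frame))
        read_queue.task_done()  # Mark this frame as processed.


async def run(frames, consumers_count=3):
    """Start the consumers, feed them frames, and wait for completion."""
    read_queue = asyncio.Queue()
    handled = []
    workers = [
        asyncio.create_task(consumer(f"consumer-{i}", read_queue, handled))
        for i in range(consumers_count)
    ]
    await producer(frames, read_queue)
    await read_queue.join()  # Block until every frame has been processed.
    for worker in workers:
        worker.cancel()  # Consumers loop forever, so stop them explicitly.
    await asyncio.gather(*workers, return_exceptions=True)
    return handled


print(asyncio.run(run(["frame-1", "frame-2", "frame-3", "frame-4"])))
```

Several consumers share one queue, so a frame that is slow to process does not hold up the frames behind it. This is presumably the motivation for the consumers_count parameter.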
Network Information
When opening the connection, you can send Ethernet and wireless network information to the ecoMAX controller by passing one or both of the data classes below to the protocol of your choice.
- class pyplumio.EthernetParameters(ip: str = '0.0.0.0', netmask: str = '255.255.255.0', gateway: str = '0.0.0.0', status: bool = True)
Represents Ethernet parameters.
- gateway: str = '0.0.0.0'
Gateway IP address
- ip: str = '0.0.0.0'
IP address
- netmask: str = '255.255.255.0'
IP subnet mask
- status: bool = True
Connection status. Parameters will be ignored if set to False
- class pyplumio.WirelessParameters(ip: str = '0.0.0.0', netmask: str = '255.255.255.0', gateway: str = '0.0.0.0', status: bool = True, ssid: str = '', encryption: EncryptionType = EncryptionType.NONE, signal_quality: int = 100)
Represents wireless network parameters.
- status: bool = True
Connection status. Parameters will be ignored if set to False
- encryption: EncryptionType = 1
Wireless encryption standard (0 - unknown, 1 - no encryption, 2 - WEP, 3 - WPA, 4 - WPA2)
- signal_quality: int = 100
Wireless signal strength as a percentage
- ssid: str = ''
Wireless Service Set IDentifier
Once set, the network information will be shown in the information section on the ecoMAX panel.
In the example below, we'll set both Ethernet and wireless parameters.
import pyplumio
from pyplumio.const import EncryptionType


async def main():
    """Initialize a connection with network parameters."""
    async with pyplumio.open_tcp_connection(
        host="localhost",
        port=8899,
        protocol=pyplumio.AsyncProtocol(
            ethernet_parameters=pyplumio.EthernetParameters(
                ip="10.10.1.100",
                netmask="255.255.255.0",
                gateway="10.10.1.1",
            ),
            wireless_parameters=pyplumio.WirelessParameters(
                ip="10.10.2.100",
                netmask="255.255.255.0",
                gateway="10.10.2.1",
                ssid="My SSID",
                encryption=EncryptionType.WPA2,
                signal_quality=100,
            ),
        ),
    ) as connection:
        ...
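This documentation does not say whether PyPlumIO validates these values, so it may be worth checking them before passing them in. The sketch below uses only the standard ipaddress module to check that a gateway lies in the subnet described by ip and netmask. The helper name gateway_in_subnet is hypothetical and not part of PyPlumIO.

```python
import ipaddress


def gateway_in_subnet(ip: str, netmask: str, gateway: str) -> bool:
    """Return True if ``gateway`` belongs to the subnet of ``ip``/``netmask``."""
    # strict=False lets us pass a host address instead of the network address.
    network = ipaddress.ip_network(f"{ip}/{netmask}", strict=False)
    return ipaddress.ip_address(gateway) in network


# Values taken from the Ethernet parameters in the example above.
print(gateway_in_subnet("10.10.1.100", "255.255.255.0", "10.10.1.1"))  # True
# A gateway from a different /24 network fails the check.
print(gateway_in_subnet("10.10.1.100", "255.255.255.0", "10.10.2.1"))  # False
```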
Connection Examples
The following example illustrates opening a TCP connection using Python’s context manager.
import asyncio
import logging

import pyplumio

_LOGGER = logging.getLogger(__name__)


async def main():
    """Opens the connection and gets the ecoMAX device."""
    async with pyplumio.open_tcp_connection("localhost", port=8899) as conn:
        try:
            # Get the ecoMAX device within 10 seconds or timeout.
            ecomax = await conn.get("ecomax", timeout=10)
        except asyncio.TimeoutError:
            # If device times out, log the error.
            _LOGGER.error("Failed to get the device within 10 seconds")


# Run the coroutine in asyncio event loop.
asyncio.run(main())
The following example illustrates opening a TCP connection without using Python’s context manager.
import asyncio
import logging

import pyplumio

_LOGGER = logging.getLogger(__name__)


async def main():
    """Opens the connection and gets the ecoMAX device."""
    connection = pyplumio.open_tcp_connection("localhost", port=8899)

    # Connect to the device.
    await connection.connect()

    try:
        # Get the ecoMAX device within 10 seconds or timeout.
        ecomax = await connection.get("ecomax", timeout=10)
    except asyncio.TimeoutError:
        # If device times out, log the error.
        _LOGGER.error("Failed to get the device within 10 seconds")

    # Close the connection.
    await connection.close()


# Run the coroutine in asyncio event loop.
asyncio.run(main())
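Without a context manager, close() in the example above is reached after a timeout, but it would be skipped if any other exception were raised. A try/finally block guards against that. The sketch below shows the pattern with a hypothetical FakeConnection stand-in, so it runs without a controller. Only the connect(), get() and close() calls mirror the example above; everything else is an assumption.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a connection object; not part of PyPlumIO."""

    def __init__(self):
        self.closed = False

    async def connect(self):
        """Pretend to open the connection."""

    async def get(self, name, timeout=None):
        """Simulate a device that never becomes available."""
        await asyncio.wait_for(asyncio.Event().wait(), timeout=timeout)

    async def close(self):
        """Record that the connection was closed."""
        self.closed = True


async def main():
    """Open the stand-in connection and always close it afterwards."""
    connection = FakeConnection()
    await connection.connect()
    try:
        await connection.get("ecomax", timeout=0.01)
    except asyncio.TimeoutError:
        print("Failed to get the device in time")
    finally:
        # Runs on success, on timeout, and on any other exception.
        await connection.close()
    return connection.closed


print(asyncio.run(main()))
```

The async with form shown earlier is generally the simpler choice, since the context manager takes care of closing the connection.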