Migration guide: v2.0.0#
Version 2.0.0 introduces some breaking changes. This page aims to help you migrate to this new major version. The relevant changes are:
The deprecated
connect
anddisconnect
methods have been removedThe deprecated
filtered_messages
andunfiltered_messages
methods have been removedUser-managed queues for incoming messages have been replaced with a single client-wide queue
Some arguments to the
Client
have been renamed or removed
Changes to the client lifecycle#
The deprecated connect
and disconnect
methods have been removed. The best way to connect and disconnect from the broker is through the client’s context manager:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.publish("temperature/outside", payload=28.4)
asyncio.run(main())
If your use case does not allow you to use a context manager, you can use the client’s __aenter__
and __aexit__
methods almost interchangeably in place of the removed connect
and disconnect
methods.
The __aenter__
and __aexit__
methods are designed to be called by the async with
statement when the execution enters and exits the context manager. However, we can also execute them manually:
import asyncio
import aiomqtt
async def main():
client = aiomqtt.Client("test.mosquitto.org")
await client.__aenter__()
try:
await client.publish("temperature/outside", payload=28.4)
finally:
await client.__aexit__(None, None, None)
asyncio.run(main())
__aenter__
is equivalent to connect
. __aexit__
is equivalent to disconnect
except that it forces disconnection instead of throwing an exception in case the client cannot disconnect cleanly.
Note
__aexit__
expects three arguments: exc_type
, exc
, and tb
. These arguments describe the exception that caused the context manager to exit, if any. You can pass None
to all of these arguments in a manual call to __aexit__
.
Changes to the message queue#
The filtered_messages
, unfiltered_messages
, and messages
methods have been removed and replaced with a single client-wide message queue.
For previous versions, a minimal example of printing all messages (unfiltered) looked like this:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async with client.messages() as messages:
async for message in messages:
print(message.payload)
asyncio.run(main())
We now no longer need the line async with client.messages() as messages:
, but instead access the message generator directly with client.messages
:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
async for message in client.messages:
print(message.payload)
asyncio.run(main())
To handle messages from different topics differently, we can use Topic.matches()
:
import asyncio
import aiomqtt
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
async for message in client.messages:
if message.topic.matches("humidity/inside"):
print(f"[humidity/inside] {message.payload}")
if message.topic.matches("+/outside"):
print(f"[+/outside] {message.payload}")
if message.topic.matches("temperature/#"):
print(f"[temperature/#] {message.payload}")
asyncio.run(main())
Note
In our example, messages to temperature/outside
are handled twice!
The filtered_messages
, unfiltered_messages
, and messages
methods created isolated message queues underneath, such that you could invoke them multiple times. From Version 2.0.0 on, the client maintains a single queue that holds all incoming messages, accessible via Client.messages
.
If you continue to need multiple queues (e.g. because you have special concurrency requirements), you can build a “distributor” on top:
import asyncio
import aiomqtt
async def temperature_consumer():
while True:
message = await temperature_queue.get()
print(f"[temperature/#] {message.payload}")
async def humidity_consumer():
while True:
message = await humidity_queue.get()
print(f"[humidity/#] {message.payload}")
temperature_queue = asyncio.Queue()
humidity_queue = asyncio.Queue()
async def distributor(client):
# Sort messages into the appropriate queues
async for message in client.messages:
if message.topic.matches("temperature/#"):
temperature_queue.put_nowait(message)
elif message.topic.matches("humidity/#"):
humidity_queue.put_nowait(message)
async def main():
async with aiomqtt.Client("test.mosquitto.org") as client:
await client.subscribe("temperature/#")
await client.subscribe("humidity/#")
# Use a task group to manage and await all tasks
async with asyncio.TaskGroup() as tg:
tg.create_task(distributor(client))
tg.create_task(temperature_consumer())
tg.create_task(humidity_consumer())
asyncio.run(main())
Changes to client arguments#
The
queue_class
andqueue_maxsize
arguments tofiltered_messages
,unfiltered_messages
, andmessages
have been moved to theClient
and have been renamed toqueue_type
andmax_queued_incoming_messages
The
max_queued_messages
client argument has been renamed tomax_queued_outgoing_messages
The deprecated
message_retry_set
client argument has been removed