API documentation¶
Ammoo leverages Python 3.5’s new syntax to make working with AMQP easier:
import ammoo
async with await ammoo.connect('amqp://broker/') as connection:
async with connection.channel() as channel:
await channel.publish('my_exchange', 'routing_key', 'text body')
async with channel.consume('my_queue') as consumer:
async for message in consumer:
print('Received message: {}'.format(message.body)
await message.ack()
Get a message from queue, decode it’s body as text and reply with some JSON:
message = await channel.get('my_queue', no_ack=True)
await message.reply(json={'original message': message.decode()})
Connect¶
-
coroutine
ammoo.
connect
(url=None, *, host, port, virtualhost)¶ Parameters: - url (str) – URL for the connection. Any component in the URL can be overridden with a keyword argument, or omitted enitrely.
- host (str) – Server hostname or IP address. Defaults to localhost.
- port (int) – Server port. Defaults to 5671 for unencrypted connections and 5672 for SSL.
- virtualhost (str) – AMQP virtualhost to open on connection initialization. Defaults to “/”.
- ssl (bool) – Force encryption on/off. By default the connection is unencrypted. Using the amqps schema in the URL turns encryption on.
- ssl_context (ssl.SSLContext) – Explicit SSL context object. Use this argument for eg. client certificates or validate the server certificate against a particular certificate chain.
- heartbeat_interval (int) – Expected number of seconds between frames to consider the connection alive. Both the server and client will send a heartbeat frame with this interval if no other frames have been sent. Defaults to whatever the server suggests. 0 turns heartbeats off.
- auth – Authentication mechanism/chooser to use. Defaults to password authentication with the AMQPLAIN or PLAIN mechanism.
- login (str) – Username to use for default
auth
. Defaults to “guest”. - password (str) – Password to use for default
auth
. Defaults to “guest”. - frame_max (int) – Maximum AMQP frame size. Must be at least 4096 bytes. Defaults to 128kB, until the server demands a smaller one.
- loop (asyncio.AbstractEventLoop) – Event loop. Defaults to
asyncio.get_event_loop()
- connection_factory – Class (or class returning function) to use instead of default
Connection
.
Connects to an AMQP server and returns a
Connection
instance:await ammoo.connect('amqps://myserver/myvhost') await ammoo.connect(host='myserver', virtualhost='myvhost', ssl=True)
Connecting to unencrypted AMQP on localhost to virtualhost / is simply:
await ammoo.connect()
Connection¶
-
class
ammoo.
Connection
¶ An AMQP connection. It’s an asynchronous context manager, which does the server handshake while entering, and closes the connection in exit. Most of the class’s methods are unusable until the connection has been entered, so use
connect()
to get one, and use it within async for:async with await connect() as connection: # correct! async with connect() as connection: # Fails: connect() is a coroutine that needs to be awaited connection = await connect() # works, but... connection.channel() # raises an exception because connection isn't entered
-
channel
(*, prefetch_size=None, prefetch_count=None)¶ Parameters: - prefetch_size (int) – Passed to
Channel.qos()
if used. - prefetch_count (int) – Passed to
Channel.qos()
if used.
Return type: Open a new channel. Should be used in a context manager:
async with connection.channel() as channel: ...
If prefetch_size or prefetch_count are given,
Channel.qos()
is called after opening the channel. The two async with blocks achieve the same:async with connection.channel(prefetch_count=5) as channel: ... async with connection.channel() as channel: await channel.qos(prefetch_count=5, prefetch_size=0) ...
- prefetch_size (int) – Passed to
-
Channel¶
-
class
ammoo.
Channel
¶ An AMQP channel.
-
consume
(queue_name, *, ...)¶ Start a new consumer on a queue.
Parameters: - queue_name (str) – Queue name
- no_ack (bool) – Optional: If True, server does not expect messages delivered to consumer to be acknowledged or rejected.
- no_local (bool) – Optional: If True, the server will not deliver messages to the connection that published them.
- exclusive (bool) – Optional: If True, only this consumer can access the queue.
- priority (int) – Optional: Set consumer priority. Lower priority consumers will receive messages only when higher priority ones are busy (Sets x-priority on the consumer).
Return type: Note
priority is a RabbitMQ extension
-
coroutine
get
(queue_name, *, ...)¶ Get a message from queue.
Parameters: Raises: EmptyQueue – If there are no messages in queue, EmptyQueue is raised
Return type:
-
coroutine
ack
(delivery_tag)¶ Acknowledge a message.
Parameters: delivery_tag (int) – Delivery tag of message to acknowledge ( ExpectedMessage.delivery_tag
)See also
-
coroutine
reject
(delivery_tag, requeue)¶ Reject a message. Opposite of
ack()
.Parameters: - delivery_tag (int) – Delivery tag of message to reject (
ExpectedMessage.delivery_tag
) - requeue (bool) – If True, the server will try to requeue the message. False means the message is discarded or dead-lettered.
See also
- delivery_tag (int) – Delivery tag of message to reject (
-
coroutine
qos
(prefetch_size, prefetch_count, global_)¶ Limit how many unacknowledged messages (or message data) will be delivered to consumers. Without qos, the server will deliver all of the queue’s messages to the consumer, possibly causing the consumer to run out of memory or starving other consumers of messages.
Parameters: - prefetch_size (int) – Maximum number of unacknowledged messages server will deliver to a consumer. Zero turns the limit off.
- prefetch_count (int) – Maximum combined size of unacknowledged messages server will deliver to a consumer. Zero turns the limit off.
- global (bool) – For standard AMQP: True applies the qos to the whole connection, and False to the channel only. For RabbitMQ: True applies the setting to both the channel’s current consumers and future ones, while False only applies to the latter.
-
coroutine
recover
(requeue)¶ Ask server to redeliver unacknowledged messages
Parameters: requeue (bool) – If False, redeliver messages to the original recipient. If True, the message may be delivered to another recipient.
-
coroutine
publish
(exchange_name, route, [body, ]*, ...)¶ Publish a message body to exchange_name with route. body and json are mutually exclusive, but one of them has to be used.
Publish a binary body with
bytes
orbytearray
:await channel.publish(exchange_name, routing_key, b'binary bytes') await channel.publish(exchange_name, routing_key, bytearray(b'binary bytarray'))
Publish a
str
:# encoded to bytes with channel's default encoding await channel.publish(exchange_name, routing_key, 'text string') # use non-default encoding await channel.publish(exchange_name, routing_key, 'text string', encoding='iso-8859-1')
Serialize JSON into body:
await channel.publish(exchange_name, routing_key, json={'key': 123})
Set the content-encoding property:
# body will also be encoded with iso-8859-1 instead of channel's default encoding await channel.publish(exchange_name, routing_key, 'some text', content_encoding='iso-8859-1'}) # content-encoding property is set to channel's default encoding await channel.publish(exchange_name, routing_key, 'some text', content_encoding=True}) # can be used for json too await channel.publish(exchange_name, routing_key, json={'key': 123}, content_encoding='iso-8859-1'})
Set the content-type property:
await channel.publish(exchange_name, routing_key, b'binary data', content_type=True}) # bytes body -> content-type is set to application/octet-stream await channel.publish(exchange_name, routing_key, 'some text', content_type=True}) # str body -> content-type is set to text/plain await channel.publish(exchange_name, routing_key, json={'key': 123}, content_type=True}) # json -> content-type is set to application/json await channel.publish(exchange_name, routing_key, body, content_type='application/acme-2000'}) # or a regular str
Parameters: - exchange_name (str) – Exchange name. Use an empty string for the default exchange.
- route – A
str
is used as a literal routing key, while aMapping
is used for the headers exchange type. - body – Message body.
str
,bytes
orbytearray
. If the json keyword argument is used, body may be omitted. - json – Optional: Object to serialize as JSON into body. Cannot be used at the same time as the body argument.
- mandatory (bool) – Optional: When True, messages the cannot be routed to a queue are returned back to the client.
- immediate (bool) – Optional: When True, messages that are not routed to a consumer immediately are returned back to the client.
- encoding (str) – Optional: Encode
str
body to bytes with this encoding. - correlation_id (str) – Optional: Correlation-id property.
- reply_to (str) – Optional: Reply-to property.
- expiration – Optional: Message expiration property, usually in milliseconds. Messages die if they are not consumed from queue within this TTL.
int
orstr
. - cc – Optional: Additional routing keys to use when routing message to queues.
- bcc – Optional: Like cc, but bcc will be removed from message before delivery.
- priority (int) – Optional: priority property.
- delivery_mode (int) – Optional: delivery-mode property 1 for non-persistent or 2 for persistent.
- timestamp (datetime) – Optional: Message timestamp property. If time zone is not set, UTC is assumed.
- content_encoding – Optional: content-encoding property. A
str
, or if abool
True, the value of the encoding argument or the channel’s default. - content_type – Optional: content-type property. A
str
, or if abool
True, set application/octet-stream if body isbytes
orbytearray
, text/plain if it’sstr
, and application/json if the json argument was used. - message_id (str) – Optional: message-id property.
- type (str) – Optional: type property.
- user_id (str) – Optional: user-id property.
- app_id (str) – Optional: app-id property.
Note
cc and bcc are RabbitMQ extensions. Not supported by standard AMQP.
-
coroutine
select_confirm
()¶ Turns publisher confirms on for the channel.
Note
RabbitMQ extension. Not supported by standard AMQP.
-
coroutine
declare_exchange
(exchange_name, exchange_type, *, ...)¶ Declare exchange.
Parameters: - exchange_name (str) – Exchange name
- exchange_type (str) – Exchange type: direct, fanout, topic, or headers.
- durable (bool) – Optional: If True, exchange will survive server restart.
- auto_delete (bool) – Optional: If True, the exchange is deleted (after a delay) when the last queue is unbound from it.
- alternate_exchange_name – Optional: Alternate exchange name. Messages that can’t be routed to any queue are instead published on the alternate exchange (Sets alternate-exchange on the exchange).
-
coroutine
delete_exchange
(exchange_name, *, ...)¶ Delete exchange exchange_name.
Parameters:
-
coroutine
assert_exchange_exists
(exchange_name)¶ Asserts exchange_name exists*. Channel will be closed if it does not!
Parameters: exchange_name – Exchange name
-
coroutine
bind_exchange
(destination, source, routing_key)¶ Bind source exchange to destination exchange for routing_key.
Parameters: Note
RabbitMQ extension. Not supported by standard AMQP.
-
coroutine
declare_queue
(queue_name, *, ...)¶ Declare a queue named queue_name.
Parameters: - queue_name (str) – Queue name
- exclusive (bool) – Optional: If True, the queue can be accessed only by this connection and is deleted when connection is closed.
- durable (bool) – Optional: If True, the queue will be marked as durable, surviving server restarts.
- auto_delete (bool) – Optional: If True, the queue is deleted when all consumers are finished using it.
- ttl (int) – Optional: Messages die after this number of milliseconds, if no one has consumed them first (Sets x-message-ttl on the queue).
- expires (int) – Optional: Milliseconds queue is unused before it is deleted (Sets x-expires on the queue).
- max_length (int) – Optional: Maximum number of messages in the queue before the oldest will die (Sets x-max-length on the queue).
- max_length_bytes (int) – Optional: Maximum number of message bytes in the queue before the oldest will die (Sets x-max-length on the queue).
- dead_letter_exchange (str) – Optional: Name of dead letter exchange. The queue’s dead messages are routed here (Sets x-dead-letter-exchange on the queue).
- dead_letter_routing_key – Optional: Override dead messages’ routing key when routing them to dead_letter_exchange (Sets x-dead-letter-routing-key on the queue).
- max_priority (int) – Optional: Queue’s maximum priority (Sets x-max-priority).
Return type:
-
coroutine
delete_queue
(queue_name)¶ Delete a queue named queue_name. If the queue does not exist, the method merely asserts it is not there.
Parameters: Returns: Number of messages in queue before it was deleted
-
coroutine
purge_queue
(queue_name)¶ Purges a queue of messages, emptying it.
Parameters: queue_name (str) – Queue name Returns: Number of messages in queue before it was purged
-
coroutine
bind_queue
(queue_name, exchange_name, routing_key)¶ Bind queue_name to exchange_name for routing_key.
Parameters:
-
coroutine
unbind_queue
(queue_name, exchange_name, routing_key)¶ Unbind queue_name from exchange_name for routing_key. Undoes
bind_queue()
.Parameters: - queue_name (str) – Queue name
- exchange_name (str) – Exchange name
- routing_key – Same as for
bind_queue()
.
-
coroutine
assert_queue_exists
(queue_name)¶ Asserts queue_name exists*. Channel is closed by the server if it does not!
Parameters: queue_name – Queue name Return type: QueueDeclareOkParameters
-
Consumer¶
-
class
ammoo.
Consumer
¶ An AMQP consumer. Asynchronous iterable that returns
DeliverMessage
instances. Must be used in an async for block:async with channel.consume('my_queue') as consumer: async for message in consumer: ...
Message¶
-
class
ammoo.
Message
¶ Base class for messages. Not instantiated directly:
Message: body, decode(), json(), exchange_name, routing_key, properties -> ReturnMessage: reply_code, reply_text -> ExpectedMessage: ack(), reject(), reply(), delivery_tag, redelivered -> DeliverMessage: consumer_tag -> GetMessage: message_count
-
exchange_name
¶ Exchange message was published to
-
routing_key
¶ Routing key message was published with
-
properties
¶ Message’s
BasicHeaderProperties
-
decode
(encoding=None)¶ Decode body into a
str
. When the encoding argument is not passed, the encoding defaults to the message’s content-encoding property (if defined), or the channel’s default encoding.>>> message.decode() 'text body' >>> message.decode('iso-8859-1') 'sí'
Parameters: encoding (str) – Optional: Encoding to use to decode body instead of content-encoding property/channel’s default encoding Return type: str
-
-
class
ammoo.
ExpectedMessage
¶ Base class for
DeliverMessage
andGetMessage
; not instantiated directly. Subclass ofMessage
.-
delivery_tag
¶ Message’s delivery tag, used for acknowledging or rejecting message to server
-
coroutine
ack
()¶ Acknowledge message to server. Calls
Channel.ack()
with the message’s delivery tag.
-
coroutine
reject
(requeue)¶ Reject message to server. Calls
Channel.reject()
with the message’s delivery tag.
-
coroutine
reply
([body, ]*, ...)¶ Publish a reply to a message that has the reply-to property set. If the message has the correlation-id property, it’s also set on the published message.
The method accepts the same keyword arguments as
Channel.publish()
.Note
Direct reply to is a RabbitMQ extension
-
-
class
ammoo.
DeliverMessage
¶ Message delivered to a
Consumer
. Subclass ofExpectedMessage
.
-
class
ammoo.
GetMessage
¶ A message from queue returned by calling
Channel.get()
. Subclass ofExpectedMessage
.-
message_count
¶ Number of messages still in queue after getting this message.
-
-
class
ammoo.
ReturnMessage
¶ Message returned by server as a consequence of using the mandatory or immediate flags of
Channel.publish()
. Subclass ofMessage
.
Message properties¶
-
class
ammoo.wire.frames.header.
BasicHeaderProperties
(content_type, content_encoding, headers, delivery_mode, priority, correlation_id, reply_to, expiration, message_id, timestamp, type_, user_id, app_id, cluster_id)¶ -
app_id
¶ Alias for field number 12
-
cluster_id
¶ Alias for field number 13
-
content_encoding
¶ Alias for field number 1
-
content_type
¶ Alias for field number 0
-
correlation_id
¶ Alias for field number 5
-
delivery_mode
¶ Alias for field number 3
-
expiration
¶ Alias for field number 7
-
headers
¶ Alias for field number 2
-
message_id
¶ Alias for field number 8
-
priority
¶ Alias for field number 4
-
reply_to
¶ Alias for field number 6
-
timestamp
¶ Alias for field number 9
-
type_
¶ Alias for field number 10
-
user_id
¶ Alias for field number 11
-