Introduction to AzureQstor

Azure queue storage

Azure queue storage is a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS. A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account. Queue storage is often used to create a backlog of work to process asynchronously.

AzureQstor is an R interface to queue storage, building on the functionality provided by the AzureStor package. You can easily create and delete queues, read messages from queues, and write messages to them.

Queue endpoint

AzureQstor uses a combination of S3 and R6 classes. The queue endpoint is an S3 object for compatibility with AzureStor. It has methods for creating, retrieving and deleting queues that mirror those in AzureStor for ADLS2, blob and file storage.

library(AzureQstor)

endp <- storage_endpoint("https://mystorage.queue.core.windows.net", key="access_key")

# creating, retrieving and deleting queues
qu <- create_storage_queue(endp, "myqueue")
qu2 <- storage_queue(endp, "myotherqueue")
delete_storage_queue(qu2, confirm=FALSE)

# list all storage queues in this account
list_storage_queues(endp)

Queues and messages

Queues and messages are represented using R6 classes. The storage_queue and create_storage_queue calls above return objects of class StorageQueue, and list_storage_queues returns a list of such objects. The StorageQueue class has methods for reading, writing, updating and deleting messages.

qu <- storage_queue(endp, "myqueue")

# write a message to the back of the queue
qu$put_message("New message")

# read a message from the front of the queue
msg <- qu$get_message()

Once we have read a message, we have a time window (by default 30 seconds) in which to process it. During this window, the message still exists in the queue, but is invisible: further requests for messages will skip over it. If we need more time to process a message, we can update it on the queue to extend the invisibility window.

qu$update_message(msg, visibility_timeout=60)

Once we are done with the message, we delete it from the queue:

qu$delete_message(msg)
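
The read, extend, process and delete steps above can be combined into a single helper. The following is a minimal sketch rather than part of AzureQstor: process_message and its handler and timeout arguments are hypothetical names, and the code assumes only the update_message and delete_message methods and the text field described in this document.

```r
# Hypothetical helper (not part of AzureQstor): run handler() on a message's
# text, and delete the message only if the handler succeeds.
process_message <- function(qu, msg, handler, timeout=NULL)
{
    # optionally extend the invisibility window before starting long-running work
    if(!is.null(timeout))
        qu$update_message(msg, visibility_timeout=timeout)

    ok <- tryCatch(
    {
        handler(msg$text)
        TRUE
    },
    error=function(e)
    {
        # on failure, leave the message on the queue so that it becomes
        # visible again once the invisibility window expires
        message("Processing failed: ", conditionMessage(e))
        FALSE
    })

    if(ok)
        qu$delete_message(msg)
    invisible(ok)
}

# usage, with the queue object from above:
# process_message(qu, qu$get_message(), handler=print, timeout=60)
```

Deleting only after the handler succeeds means a failed message is not lost: it reappears for another reader when its invisibility window ends.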

To retrieve a message from a queue without affecting its visibility, we can use the peek_message method. This can be useful if we only want to examine a message’s contents without any further processing.

msg <- qu$peek_message()

The StorageQueue class also provides methods to retrieve multiple messages at once, up to a maximum of 32 per call.

# read a batch of 30 messages; returns a list of message objects
qu$get_messages(n=30)

# peek at the next 30 messages
qu$peek_messages(n=30)
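
Because peeking does not change visibility, it is a convenient way to inspect the front of a queue. The following is a minimal sketch, assuming only the peek_messages method and the text field described in this document. The helper name peek_texts is hypothetical, and the cap of 32 is applied on the client side to match the limit stated above.

```r
# Hypothetical helper (not part of AzureQstor): return the text of up to n
# messages at the front of the queue, without changing their visibility.
peek_texts <- function(qu, n=32)
{
    # a single call can retrieve at most 32 messages, so cap the request
    n <- min(n, 32)
    msgs <- qu$peek_messages(n=n)

    # message text is usually a single string; coerce it in case it is not
    # already of type character
    vapply(msgs, function(m) as.character(m$text), character(1))
}

# usage, with the queue object from above:
# peek_texts(qu, n=5)
```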

Messages themselves are objects of class QueueMessage, which has methods for updates and deletes. In fact, the above delete_message and update_message queue methods simply call the corresponding method in the message object.

msg <- qu$get_message()
msg$update(visibility_timeout=60)
msg$delete()

The content of a message is in its text field, which will (usually) be a text string.

msg$text
## [1] "New message"

Metadata

You can get and set metadata for a queue object with the get/set_metadata R6 methods. If you prefer S3, you can also use the AzureStor get/set_storage_metadata S3 generics, which have methods for queue objects.

set_storage_metadata(qu, name1="value1", name2="value2")
get_storage_metadata(qu)
## $name1
## [1] "value1"
##
## $name2
## [1] "value2"

# same as above
qu$set_metadata(name1="value1", name2="value2")
qu$get_metadata()
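
Metadata can serve as simple tags that are checked before working with a queue. The following is a minimal sketch, assuming get_metadata returns a named list as in the output above. The helper name has_metadata is hypothetical and not part of AzureQstor.

```r
# Hypothetical helper (not part of AzureQstor): check whether a queue's
# metadata contains all of the given name=value pairs.
has_metadata <- function(qu, ...)
{
    expected <- list(...)
    actual <- qu$get_metadata()

    # every expected name must be present with an identical value
    all(vapply(names(expected),
        function(nm) identical(actual[[nm]], expected[[nm]]),
        logical(1)))
}

# usage, with the queue object from above:
# has_metadata(qu, name1="value1")
```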