Messaging Service Core Module
Background
The BioConnect message queue makes it possible for applications to communicate asynchronously, by sending messages to each other via a queue. A message queue provides temporary storage between the sender and the receiver so that the sender can keep operating without interruption when the destination program is busy or not connected. Asynchronous processing allows a task to call a service, and move on to the next task while the service processes the request at its own pace
A typical message queue system involves two parties: producer and consumer
- The producer creates the message and sends it to the message queue. In case the consumer is too busy to immediately process it, the queue stores it until the consumer is available.
- The consumer retrieves the message from the queue and starts processing it.
- The message queue then temporarily locks the message to prevent it from being read by another consumer.
- After the consumer completes the message processing, it deletes the message from the queue to prevent it from being read by other consumers.
The message system provides the following advantages:
- Message queues increase the reliability of systems as they persist data in case some part of the system goes offline.
- They improve performance as they enable asynchronous communication. This ensures that every system component is never idle waiting for a response or request.
- Message queues increase scalability as the system components are decoupled and thus can be independently scaled.
BioConnect Message Queue Library
The BioConnect message queue library is a wrapper to hide the underlying message queue implementation and provides simplified high-level functions to send and receive the message. The library provides the following functions:
- send message
- receive message
- consistent message schema
- data marshal/unmarshal
- auto retry for send/receive
- error checking
BioConnect Message System
The BioConnect message system consists of many FIFO message queues. Each application job can listen to a single queue, while any application can send a message to any queue. Some generic applications, email, SMS notification message, cloud storage bucket utilities such as zip file creation, will be added so any application can use them.
BioConnect Message Sequence Diagram
The calling sequence illustrates the interactions between these two applications by sending the message to each other's queue.
Message Schema
The following schema will be followed by BioConnect message systems. While the consistent schema makes it easier for the integration of the different applications, each application can use the data field to add more custom data unique to the application.
(note: feedback will be great for the schema, add/remove/change field )
{
"title":"Message Title",
"message_type":"Message Type",
"send_from":"l7",
"send_to":"bioconnect-id-service",
"create_time":1654627550.958133,
"env":"DEV",
"comments":"comments",
"data":{
"sample_id":111111,
"sample_name":"a sample name",
"project_id":"a project"
},
"original_message":"None"
}
Field | Type | Required | Description |
---|---|---|---|
title | string | true | message title |
message_type | string | true | message type |
send_from | string | true | the queue name the message from, constant from QueueName |
send_to | string | true | the queue name the message to, constant from QueueName |
create_time | string | true | timestamp when the message is send, generated automaticallyt |
env | string | false | application deployment environment, such as dev, sqa, uat, prd |
comments | string | false | free text comments |
data | dict | false | application specific dictionay data |
original_message | string | false | original message |
Messages in your code
Sending and receiving messages involves a message broker. The BioConnect message library hides the complexity of a handshake between the application and message broker and provides convenient ways to produce and consume messages. The Python library of BioConnect Message provides the high level "send_message" and "receive_message" methods to produce and consume messages directly.
Library installation
The BioConnect message library is available from PyPI, the Python packaging index.
https://test.pypi.org/project/bioconnect-lib
Installation can be done in the usual ways by adding the repository to your Poetry pyproject.toml file, or to requirements.py for pip. With pip from the command line do:
pip install -i https://test.pypi.org/simple/ bioconnect-lib
Send a message
Sending messages in your Python application requires that you import 'send_message' from 'bioconnect_lib.mq.message_queue'.
from bioconnect_lib.mq.message_queue import send_message, QueueName
try:
title = "Message Title"
message_type = "Message Type"
send_from = f'{QueueName.L7}'
send_to = f'{QueueName.BIOCONNECT_ID_SERVICE}'
env = "DEV"
comments = "comments"
data = {
"sample_id": 111111,
"sample_name": "a sample name",
"project_id": "a project"
}
response = send_message(
title=title,
message_type=message_type,
send_from=send_from,
send_to=send_to,
env=env,
comments=comments,
data=data)
logger.info(f'response: {response}')
except Exception as e:
logger.error("Failed to send message", exc_info=True)
# outuput
{
"title":"Message Title",
"message_type":"Message Type",
"send_from":"l7",
"send_to":"bioconnect-id-service",
"create_time":1654627550.958133,
"env":"DEV",
"comments":"comments",
"data":{
"sample_id":111111,
"sample_name":"a sample name",
"project_id":"a project"
},
"original_message":"None"
}
Receive a message
Sending messages in your Python application requires that you import 'receive_message' from 'bioconnect_lib.mq.message_queue'.
from bioconnect_lib.mq.message_queue import receive_message, QueueName
try:
queue_name = f'{QueueName.BIOCONNECT_DATA_PACKAGE}'
max_wait_time = 1 # max wait time in seconds, 0 means forever
message_list = receive_message(queue_name=queue_name,
max_wait_time=max_wait_time)
for m in message_list:
print(f'{m}')
except Exception as e:
logger.error("Failed to receive messages", exc_info=True)
# output
{
"title":"Message Title",
"message_type":"Message Type",
"send_from":"l7",
"send_to":"bioconnect-id-service",
"create_time":1654627486.0506082,
"env":"DEV",
"comments":"comments",
"data":{
"sample_id":111111,
"sample_name":"a sample name",
"project_id":"a project"
}
Receive a message with callback
Alternatively, a callback function can be passed into receive_message. Whenever a new message is received, the callback function will be called.
from bioconnect_lib.mq.message_queue import receive_message, QueueName
try:
def my_call_back(message):
print(f'call back: {message}')
queue_name = f'{QueueName.BIOCONNECT_ID_SERVICE}'
max_wait_time = 1 # max wait time in seconds
receive_message(queue_name=queue_name,
call_back_function=my_call_back,
max_wait_time=max_wait_time)
except Exception as e:
logger.error("Failed to receive messages", exc_info=True)
# output
{
"title":"Message Title",
"message_type":"Message Type",
"send_from":"l7",
"send_to":"bioconnect-id-service",
"create_time":1654628636.6351209,
"env":"DEV",
"comments":"comments",
"data":{
"sample_id":111111,
"sample_name":"a sample name",
"project_id":"a project"
},
"original_message":"None"
}