MQTT and Mailbox

mailboxes

Mailboxes are used to communicate between tasks or external processes (using MQTT binding).

Warning

A mailbox is owned by the task that creates it.
When a task ends, the mailboxes it owns are destroyed and their content is flushed.
In multitask systems, it is therefore highly recommended to create mailboxes in a task that does not end before the box is needed!

Typical example code 'main task' :

// create the box here
MBoxCreate(mailbox_name) ;
// now child can use the box and die when required
var child1 = createTask('code1.js', arguments );
// do something
for( ; ; ) {
    if( MBoxCheck(mailbox_name) ) {
        // do something with the message
    }
    if( a_condition ) {
        break ;
    }
}

// now wait for the child to end
print('Waiting for the child to end');
waitTask( child1 );
print('Finished, ending.');

By default, non-MQTT mailboxes can hold 20 messages. When a new message is added:

  • If the box is not full, the new message is added to the queue.
  • If the box is full, the oldest message is deleted before the new message is added to the queue.

The mailbox capacity can be changed by calling the MBoxSetCapacity() function.
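
The drop-oldest policy described above can be pictured with a small in-memory model. This is only a sketch: `BoundedMailbox` and its methods are hypothetical names that do not belong to the SDRVM API, and the model ignores tasks and MQTT entirely.

```javascript
// Hypothetical in-memory model of the drop-oldest policy described above.
// It does not call the SDRVM API; it only mimics the documented behaviour.
function BoundedMailbox(capacity) {
    this.capacity = capacity || 20;   // documented default capacity
    this.messages = [];
}

// Add a message; when the box is full, discard the oldest one first.
BoundedMailbox.prototype.post = function (message) {
    if (this.messages.length >= this.capacity) {
        this.messages.shift();
    }
    this.messages.push(message);
};

// Remove and return the oldest message, or undefined when the box is empty.
BoundedMailbox.prototype.pop = function () {
    return this.messages.shift();
};

// Post 25 messages into a box that holds 20: messages 0..4 are dropped.
var box = new BoundedMailbox(20);
for (var i = 0; i < 25; i++) {
    box.post('message ' + i);
}
var oldest = box.pop();   // 'message 5'
```

A slow reader therefore loses the oldest messages first, not the newest ones.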

Warning

Special note for MQTT-connected mailboxes: delivery takes time and is asynchronous. The SDRVM starts a background thread to submit the messages to the broker. This thread has a queue of length 10.
Messages are sent to MQTT with QoS 1 (see the MQTT QoS documentation).

mailboxes

  • Display MQTT sent messages:
 mosquitto_sub -t 'demo/#' -u mqtt -P mqtt -h localhost
  • Send MQTT message to the SDRVM :
mosquitto_pub -h localhost -u mqtt -P mqtt -t 'demo' -m "message test" 

MBoxCreate

The mailbox name is unique and is used as an identifier: if a mailbox with the same name already exists, the call fails.

bool result = MBoxCreate(mailbox_name) ;
A mailbox can be connected to an MQTT topic by passing a JSON-type object as the second argument:

var config = {
   'host': 'localhost',
   'login': 'mqtt',
   'pass' : 'mqtt',
   'topic': 'demo',
   'mode' : 'read'    // or 'write'
} ;

bool result = MBoxCreate(mailbox_name, config) ;

Note

You cannot bind a mailbox for both READ and WRITE to the same MQTT topic. If the 'mode' parameter is not specified, the SDRVM assumes READ ONLY: posted messages are not sent to MQTT.

Messages will be sent or received from the specified MQTT broker.
If a message is received on the topic passed as parameter, then it is added to the pending messages list.

if( !MBoxCreate('pong')) {
    print('could not create the mailbox');  
    exit(0);
}
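
Since a box is bound in a single mode, one possible arrangement is one mailbox per direction, each on its own topic. The sketch below is an assumption-laden illustration: `makeMqttConfig` is a hypothetical helper, the topic names 'demo/in' and 'demo/out' are invented, and the mode check is done by the helper, not by the SDRVM. Only the field names come from the config shown above.

```javascript
// Hypothetical helper: builds the config object passed as the second
// argument of MBoxCreate. Field names are the ones shown above; the
// validation itself is an assumption, not SDRVM behaviour.
function makeMqttConfig(topic, mode) {
    if (mode !== 'read' && mode !== 'write') {
        throw new Error("mode must be 'read' or 'write'");
    }
    return {
        'host': 'localhost',
        'login': 'mqtt',
        'pass': 'mqtt',
        'topic': topic,
        'mode': mode
    };
}

// One mailbox per direction, each bound to its own (invented) topic.
var inboxConfig = makeMqttConfig('demo/in', 'read');
var outboxConfig = makeMqttConfig('demo/out', 'write');

// In an SDRVM task these would then be used as:
//   MBoxCreate('inbox', inboxConfig);
//   MBoxCreate('outbox', outboxConfig);
```

Setting 'mode' explicitly also avoids relying on the read-only default.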

MBoxPost

Posts a message to the 'mailbox_name' mailbox.

MBoxPost(mailbox_name, message)
  • Example, sending 10 messages:
if( !MBoxCreate('ping')) {
    print('could not create the mailbox');  
    exit(0);
}
for( var i=0 ; i < 10 ; i++ ) {
    // post a message to the 'ping' mailbox created above
    MBoxPost('ping', 'message test nr ' + i);
}

MBoxSetCapacity

MBoxSetCapacity(mailbox_name, new_capacity)

Updates the maximum number of messages a mailbox can hold.

MBoxIsFull

bool MBoxIsFull(mailbox_name)

Returns true if the maximum number of messages has been reached; in that case, each new message deletes the oldest one.

MBoxSendingMessages

Checks if messages are still in the 'sending box'.

Returns:

  • For non-MQTT mailboxes: always false.
  • For MQTT mailboxes: true while some messages are still waiting to be delivered to the MQTT broker, false once all pending messages have been successfully delivered.

It is recommended to check that an MQTT box is empty before leaving an SDRVM application, otherwise the pending messages might get lost (not delivered).

  • Example, sending many messages and waiting for them to be delivered before exiting:
// Create a box that is connected to MQTT
var config = {
   'host': 'localhost',
   'login': 'mqtt',
   'pass' : 'mqtt',
   'topic': 'test',
   'mode' : 'write'   // required so that posted messages are sent to MQTT
} ;

if( MBoxCreate('box', config) == false ) {
    if( MBoxExists('box') == false ) {
        print('could not connect');
        exit();
    }
}

var content = '123456789_' ;
for( var i=0 ; i < 20 ; i++ ) {
     print('Sending msg ' + i );
     MBoxPost('box',content+i ) ;
}

// We need to wait for the MBox to be empty...
while( MBoxSendingMessages('box') == true ) {
    print('we have unsent messages...');
    sleep(100);
}
print('ok, all sent');
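
The loop above waits indefinitely if the broker never accepts the messages. A bounded variant might look like the following sketch. `waitUntilSent` is a hypothetical helper, and the check and sleep functions are passed in as parameters so that the sketch can be run outside the SDRVM with stand-ins; the timeout and step values are arbitrary.

```javascript
// Hypothetical helper: polls `isSending` until it returns false or until
// `timeoutMs` has elapsed. `isSending` and `sleepFn` are injected so the
// sketch can run outside the SDRVM.
function waitUntilSent(isSending, sleepFn, timeoutMs, stepMs) {
    var waited = 0;
    while (isSending()) {
        if (waited >= timeoutMs) {
            return false;          // gave up: messages may still be pending
        }
        sleepFn(stepMs);
        waited += stepMs;
    }
    return true;                   // nothing left in the sending queue
}

// Stand-in for a box that needs three polls before its queue is empty.
var pendingPolls = 3;
function fakeIsSending() { return pendingPolls-- > 0; }
function fakeSleep(ms) { /* no real delay in this sketch */ }

var delivered = waitUntilSent(fakeIsSending, fakeSleep, 5000, 100);

// In an SDRVM task the call could be written as:
//   waitUntilSent(function () { return MBoxSendingMessages('box'); },
//                 sleep, 5000, 100);
```

When the helper returns false, the task can decide whether to log the loss, retry, or exit anyway.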

MBoxCheck

Checks for pending messages in the mailbox.

MBoxCheck(mailbox_name);
print('We have: ',MBoxCheck('test'), ' pending messages');

MBoxExists

Checks whether a mailbox exists.

MBoxExists(mailbox_name);
  • Example :
if (MBoxExists('test')) { print('Mailbox : test exists !'); }

MBoxPop

MBoxPop(mailbox_name);

Get a pending message on the mailbox (oldest one first if several).
This call returns a JSON structure :

{
   'msg_valid': boolean, /* true if a message has been received */
   'msg_from': task_id,  /* the Task ID of the sender */
   'msg_payload' : Object /* content */
}
Example :

print('MBox test message task starting');

if( !MBoxCreate('test')) {
    print('could not create the mailbox');  
    exit(0);
}

// Send 10 messages  
for( var i=0 ; i < 10 ; i++ ) {
    // post a message to the 'test' mailbox
    MBoxPost('test', 'message ' + i);
    sleep(200);
}
print("Waiting message: ",MBoxCheck('test'));
for (  ; ; ) {
    if (MBoxCheck('test') && MBoxExists('test')) {
        print(JSON.stringify(MBoxPop('test')));
    } else {
        break;
    }
}
print('Task testmsg ends');

  • Example, posting payloads of different types and reading them back:
MBoxCreate('box');
MBoxPost('box', 1 );
MBoxPost('box', 'test string' );
MBoxPost('box', {'x': 3} );

var x = MBoxPopWait( 'box', 10000 );
print( JSON.stringify( x ));
x = MBoxPopWait( 'box', 10000 );
print( JSON.stringify( x ));
x = MBoxPopWait( 'box', 10000 );
print( JSON.stringify( x ));
  • Output:
{"msg_valid":true,"msg_from":0,"msg_payload":"1.000000"}
{"msg_valid":true,"msg_from":0,"msg_payload":"test string"}
{"msg_valid":true,"msg_from":0,"msg_payload":"{\"x\":3}"}
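
In the output above, the number and the object both come back as strings in msg_payload. A receiver that needs the original types could decode them as in the sketch below. `decodePayload` is a hypothetical helper and is not part of the SDRVM API; it assumes that any payload that parses as JSON was meant to be JSON.

```javascript
// Hypothetical helper: the sample output above shows payloads coming back as
// strings, so try to parse them as JSON and fall back to the raw string.
function decodePayload(payload) {
    if (typeof payload !== 'string') {
        return payload;
    }
    try {
        return JSON.parse(payload);
    } catch (e) {
        return payload;            // plain text, not JSON
    }
}

var asNumber = decodePayload('1.000000');     // numeric value 1
var asText   = decodePayload('test string');  // unchanged string
var asObject = decodePayload('{"x":3}');      // object with x === 3
```

Note that a text message such as '42' or 'true' would also be converted by this approach, so it only fits when sender and receiver agree on the payload format.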

MBoxPopWait

Wait for incoming message on the 'boxname' mailbox for a 'timeout' duration (milliseconds).

Note

If timeout is <= 0, the task waits forever.

MBoxPopWait( boxname, timeout);
  • Example:
var msg = MBoxPopWait('ping', 10000);
print('Pong:: received message : ' + msg.msg_from + ' content :' + msg.msg_payload );
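
The example above uses the message without checking whether one actually arrived. The sketch below tests msg_valid first. `describeMessage` is a hypothetical helper, and the shape of the `timedOut` object (msg_valid false, msg_from -1, undefined payload) is an assumption about what an expired wait returns, not a documented guarantee.

```javascript
// Hypothetical helper: formats the result of MBoxPop / MBoxPopWait, using
// msg_valid to tell a real message from an empty result.
function describeMessage(msg) {
    if (!msg || !msg.msg_valid) {
        return 'no message received';
    }
    return 'from task ' + msg.msg_from + ': ' + msg.msg_payload;
}

// Objects shaped like the structure documented for MBoxPop.
var received = { 'msg_valid': true, 'msg_from': 1, 'msg_payload': 'ok' };
var timedOut = { 'msg_valid': false, 'msg_from': -1, 'msg_payload': undefined };

var line1 = describeMessage(received);   // 'from task 1: ok'
var line2 = describeMessage(timedOut);   // 'no message received'

// In an SDRVM task:
//   print(describeMessage(MBoxPopWait('ping', 10000)));
```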

MBox example : ping and pong

The 'ping.js' task creates a 'ping' mailbox and posts messages to it at regular intervals.
The 'pong.js' task receives these messages and reports back to the 'ping' task through a 'pong' mailbox.

  • ping.js
rename('ping');
createTask('pong.js');
print('Ping task starting');
// Create the message box for detections
if( !MBoxCreate('ping')) {
    print('could not create the mailbox');  
    exit(0);
}
for( var i=0 ; i < 100 ; i++ ) {
    // send a message to task pong
    MBoxPost('ping', 'message ' + i);
    // wait for a message from task pong
    var msg = MBoxPopWait('pong', 10000);
    print('Ping :: received message :' + msg.msg_from + ', content:' + msg.msg_payload );
    sleep(1000);
}

print('Task ping ends');
  • pong.js
//rename('pong');
if( !MBoxCreate('pong')) {
    print('could not create the mailbox');  
    exit(0);
}
print('Pong task starting');
for( var i=0 ; i < 50 ; i++ ) {
    var msg = MBoxPopWait('ping', 10000);
    print('Pong:: received message : ' + msg.msg_from + ' content :' + msg.msg_payload );
    MBoxPost('pong','ok');
}
print('Task pong ends');
 Loading boot task from file : [./ping.js]

(ping:0)> Ping task starting
(ping:0)> Ping :: received message :-1, content:undefined
(task:1)> Pong task starting
(task:1)> Pong:: received message : 0 content :message 0
(ping:0)> Ping :: received message :1, content:ok
(task:1)> Pong:: received message : 0 content :message 1
(ping:0)> Ping :: received message :1, content:ok
(task:1)> Pong:: received message : 0 content :message 2
(ping:0)> Ping :: received message :1, content:ok
(task:1)> Pong:: received message : 0 content :message 3
.....
Last update: March 12, 2023