How to feed ZeroMQ data into Simulink via S-functions

This example builds on our previous S-function example and shows how to feed data from a ZeroMQ publisher into Simulink via S-functions.

I recommend against using cppzmq because Simulink tends to have issues with C++ exceptions. Instead, we use the C API of ZeroMQ.

This particular example receives a double value from a ZeroMQ publisher (as a string, such as "-10.2345") and outputs it to a Simulink output port.

I recommend setting the simulation stop time to inf here. This means, however, that scopes don’t work properly; use a Display block to view the received value instead.

Simulink ZMQ example setup

S-function implementation

Compile this using

mex CXXFLAGS="-fPIC -std=gnu++17" -lzmq zmq_value_input.cpp
#define S_FUNCTION_NAME  zmq_value_input
#define S_FUNCTION_LEVEL 2

#include "simstruc.h"

#include <zmq.h>   // ZeroMQ library (C API)

#include <atomic>
#include <cmath>   // nan(), std::isnan()
#include <cstdio>  // printf
#include <cstdlib> // std::strtod
#include <cstring> // For memcpy
#include <mutex>
#include <optional>
#include <thread>

// Block configuration constants
// Block sample rate in Hz; the sample time is set to 1.0 / SAMPLING_RATE seconds
#define SAMPLING_RATE 10 // 10 Hz => 0.1 s sample time
// Fallback output value, intended for use while no value has been received yet
#define DEFAULT_VALUE 0.0

// State shared between the Simulink callbacks and the ZeroMQ receive thread.
// Guards last_received_value
std::mutex zmq_mutex;
// Most recently received value; starts out as NaN (no value received yet)
double last_received_value = nan("");
// Loop flag of the receive thread; mdlTerminate clears it to stop the thread
std::atomic<bool> zmq_running(true);

void zmq_receive_thread(void *context)
{
    void *receiver = zmq_socket(context, ZMQ_SUB);
    // Connect to the publisher
    if(zmq_connect(receiver, "tcp://127.0.0.1:5699") != 0) {
        printf("Error connecting to ZeroMQ publisher: %s\n", zmq_strerror(zmq_errno()));
        return;
    }
    // Subscribe to all messages
    if (zmq_setsockopt(receiver, ZMQ_SUBSCRIBE, "", 0) != 0) {
        printf("Error setting ZMQ_SUBSCRIBE: %s\n", zmq_strerror(zmq_errno()));
        return;
    }

    while (zmq_running)
    {
        char buffer[256];
        int bytes_received = zmq_recv(receiver, buffer, 256, 0);
        if (bytes_received != -1)
        {
            std::string received(buffer, bytes_received);
            // You can enable this line for debugging
            //printf("Received: %s\n", received.c_str());
            std::lock_guard<std::mutex> lock(zmq_mutex);
            last_received_value = atof(buffer);
        }
    }
    zmq_close(receiver);
}

// Function: mdlInitializeSizes ===============================================
// Abstract:
//   Setup sizes of the various vectors: no parameters, no states, no input
//   ports, one scalar real double output port, one sample time and one
//   pointer work element (used to store the ZeroMQ context).
static void mdlInitializeSizes(SimStruct *S)
{
    ssSetNumSFcnParams(S, 0);  // No parameters
    if (ssGetNumSFcnParams(S) != ssGetSFcnParamsCount(S)) {
        return;  // Parameter count mismatch
    }

    ssSetNumContStates(S, 0);  // No continuous states
    ssSetNumDiscStates(S, 0);  // No discrete states

    // 1 output port with 1 element
    if (!ssSetNumOutputPorts(S, 1)) return;
    ssSetOutputPortWidth(S, 0, 1);  // Output port width = 1 (the received value)
    ssSetOutputPortDataType(S, 0, SS_DOUBLE);
    ssSetOutputPortComplexSignal(S, 0, COMPLEX_NO);

    // Sample times: exactly one, configured in mdlInitializeSampleTimes
    ssSetNumSampleTimes(S, 1);

    ssSetNumPWork(S, 1);  // Pointer work vector to store ZeroMQ context
    ssSetNumModes(S, 0);  // Mode vector
    ssSetNumNonsampledZCs(S, 0);  // Zero crossings

    // No special S-function options
    ssSetOptions(S, 0);
}

// Function: mdlInitializeSampleTimes =========================================
// Abstract:
//    Configure sample time 0 as 1 / SAMPLING_RATE seconds with zero offset.
static void mdlInitializeSampleTimes(SimStruct *S)
{
    const real_T sample_period = 1.0 / SAMPLING_RATE;  // Seconds per sample
    const real_T sample_offset = 0.0;                  // No offset

    ssSetSampleTime(S, 0, sample_period);
    ssSetOffsetTime(S, 0, sample_offset);
}

// Function: mdlStart =========================================================
// Abstract:
//    Reset the shared receive state, create the ZeroMQ context, store it in
//    PWork[0] and start the (detached) receive thread.
#define MDL_START
static void mdlStart(SimStruct *S)
{
    // The globals keep their values for as long as the MEX file stays loaded.
    // mdlTerminate clears zmq_running, so without re-arming it here the
    // receive thread of a second simulation run would exit immediately.
    zmq_running = true;
    {
        // Forget the value left over from a previous run
        std::lock_guard<std::mutex> lock(zmq_mutex);
        last_received_value = nan("");
    }

    void *context = zmq_ctx_new();
    // Store the pointer even if it is NULL so mdlTerminate sees a defined value
    ssSetPWorkValue(S, 0, context);
    if (context == nullptr)
    {
        ssSetErrorStatus(S, "Failed to create ZeroMQ context");
        return;
    }

    // The thread is detached; it is stopped via zmq_running and the context
    // shutdown performed in mdlTerminate.
    std::thread(zmq_receive_thread, context).detach();
}

// Function: mdlOutputs =======================================================
// Abstract:
//      Output the last value received from ZeroMQ, or DEFAULT_VALUE while no
//      value has been received yet (last_received_value is still NaN).
static void mdlOutputs(SimStruct *S, int_T tid)
{
    real_T *y = ssGetOutputPortRealSignal(S, 0);  // Output pointer

    // Copy the shared value under the lock and release the lock right away
    double latest;
    {
        std::lock_guard<std::mutex> lock(zmq_mutex);
        latest = last_received_value;
    }

    // NaN never compares equal to anything (not even to NaN), so the
    // "no value yet" state has to be detected with isnan() instead of ==.
    y[0] = std::isnan(latest) ? DEFAULT_VALUE : latest;
}

// Function: mdlTerminate =====================================================
// Abstract:
//    End-of-simulation cleanup: signal the receive thread to stop, then shut
//    down and terminate the ZeroMQ context stored in PWork[0] (if any).
static void mdlTerminate(SimStruct *S)
{
    // Ask the receive loop to stop before tearing down the context
    zmq_running = false;

    void *zmq_context = ssGetPWorkValue(S, 0);
    if (!zmq_context)
    {
        return;  // No context stored: nothing to clean up
    }

    zmq_ctx_shutdown(zmq_context);
    zmq_ctx_term(zmq_context);
}

// Required S-function trailer
#ifdef MATLAB_MEX_FILE
#include "simulink.c"   // MEX-file interface mechanism
#else
#include "cg_sfun.h"    // Code generation interface
#endif

Test data producer

This simple C++ program acts as the ZeroMQ publisher: it binds to port 5699 and publishes a new value every 100 ms. Compile it using

g++ -o zmq_push_sender zmq_push_sender.cpp -lzmq -std=gnu++17

and run it in the background.

#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
#include <sstream>
#include <zmq_addon.hpp>

int main() {
    // Prepare ZeroMQ context and socket
    zmq::context_t context(4);
    zmq::socket_t socket(context, zmq::socket_type::pub);
    socket.bind("tcp://*:5699");

    double value = 0.0;

    while (true) {
        // Create the message with the current value
        std::ostringstream oss;
        oss << value;
        std::string value_str = oss.str();

        zmq::message_t message(value_str.c_str(), value_str.size());

        // Send the message
         // socket.send(zmq::str_buffer("AAA"), zmq::send_flags::none);

        socket.send(message, zmq::send_flags::none);

        printf("Sent: %s\n", value_str.c_str());

        // Increment the value and reset if necessary
        value += 1.0;
        if (value >= 10.0) {
            value = -10.0;
        }

        // Wait for 100ms
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    return 0;
}