How to feed ZeroMQ data into Simulink via S-functions
This example builds on our previous S-function example and shows how to feed data from a ZeroMQ publisher into Simulink via S-functions.
I recommend against using cppzmq
because Simulink tends to have issues with C++ exceptions. Instead, we use the C API of ZeroMQ.
This particular example receives a double value from a ZeroMQ publisher (as a string, such as "-10.2345"
) and outputs it to a Simulink output port.
Simulink setup
I recommend using simulation stop time = inf
here. This means, however, that the scopes don’t work properly. Use Display blocks to view the received values instead.
S-function implementation
Compile this using
mex CXXFLAGS="-fPIC -std=gnu++17" -lzmq zmq_value_input.cpp
#define S_FUNCTION_NAME zmq_value_input
#define S_FUNCTION_LEVEL 2
#include "simstruc.h"
#include <zmq.h> // ZeroMQ C API
#include <atomic>
#include <cmath> // std::isnan, nan
#include <cstdio> // printf
#include <cstdlib> // std::strtod
#include <cstring>
#include <mutex>
#include <optional>
#include <thread>
// Compile-time configuration
#define SAMPLING_RATE 10 // Block sample rate in Hz (sample time = 1 / SAMPLING_RATE seconds)
#define DEFAULT_VALUE 0.0 // Value mdlOutputs selects when no received value is available

// State shared between the Simulink callbacks and the ZeroMQ receive thread
std::mutex zmq_mutex; // Guards last_received_value
double last_received_value = std::nan(""); // NaN until the first value has been received
std::atomic<bool> zmq_running{true}; // Cleared by mdlTerminate to stop the receive loop
void zmq_receive_thread(void *context)
{
void *receiver = zmq_socket(context, ZMQ_SUB);
// Connect to the publisher
if(zmq_connect(receiver, "tcp://127.0.0.1:5699") != 0) {
printf("Error connecting to ZeroMQ publisher: %s\n", zmq_strerror(zmq_errno()));
return;
}
// Subscribe to all messages
if (zmq_setsockopt(receiver, ZMQ_SUBSCRIBE, "", 0) != 0) {
printf("Error setting ZMQ_SUBSCRIBE: %s\n", zmq_strerror(zmq_errno()));
return;
}
while (zmq_running)
{
char buffer[256];
int bytes_received = zmq_recv(receiver, buffer, 256, 0);
if (bytes_received != -1)
{
std::string received(buffer, bytes_received);
// You can enable this line for debugging
//printf("Received: %s\n", received.c_str());
std::lock_guard<std::mutex> lock(zmq_mutex);
last_received_value = atof(buffer);
}
}
zmq_close(receiver);
}
// Function: mdlInitializeSizes ===============================================
// Abstract:
// Setup sizes of the various vectors: no parameters, no states, no input
// ports, one scalar double output port, one sample time and one pointer
// work vector element (used for the ZeroMQ context).
static void mdlInitializeSizes(SimStruct *S)
{
    ssSetNumSFcnParams(S, 0); // No parameters
    if (ssGetNumSFcnParams(S) != ssGetSFcnParamsCount(S)) {
        return; // Parameter count mismatch; leave the error report to Simulink
    }
    ssSetNumContStates(S, 0); // No continuous states
    ssSetNumDiscStates(S, 0); // No discrete states
    // 1 output port with 1 element
    if (!ssSetNumOutputPorts(S, 1)) return;
    ssSetOutputPortWidth(S, 0, 1); // Output port width = 1 (last received value)
    ssSetOutputPortDataType(S, 0, SS_DOUBLE);
    ssSetOutputPortComplexSignal(S, 0, COMPLEX_NO);
    // Sample times: one block-based sample time, configured in mdlInitializeSampleTimes
    ssSetNumSampleTimes(S, 1);
    ssSetNumPWork(S, 1); // Pointer work vector to store ZeroMQ context
    ssSetNumModes(S, 0); // Mode vector
    ssSetNumNonsampledZCs(S, 0); // Zero crossings
    // No special S-function options
    ssSetOptions(S, 0);
}
// Function: mdlInitializeSampleTimes =========================================
// Abstract:
// Register the block's sample time at index 0: a period of
// 1 / SAMPLING_RATE seconds with an offset of zero.
static void mdlInitializeSampleTimes(SimStruct *S)
{
    const real_T sample_period = 1.0 / SAMPLING_RATE; // Seconds per sample
    const real_T sample_offset = 0.0;
    ssSetSampleTime(S, 0, sample_period);
    ssSetOffsetTime(S, 0, sample_offset);
}
// Function: mdlStart =========================================================
// Abstract:
// Create the ZeroMQ context, store it in PWork[0] and start the detached
// receive thread.
// The shared globals are re-initialized here because they keep their values
// between simulation runs: mdlTerminate clears zmq_running, and without
// resetting it the receive thread of the next run would exit immediately.
#define MDL_START
static void mdlStart(SimStruct *S)
{
    void *context = zmq_ctx_new();
    ssSetPWorkValue(S, 0, context);
    if (context == nullptr)
    {
        // String literal: the error message must outlive this function
        ssSetErrorStatus(S, "Failed to create ZeroMQ context");
        return;
    }
    {
        // Forget the value of a previous run until fresh data arrives
        std::lock_guard<std::mutex> lock(zmq_mutex);
        last_received_value = std::nan("");
    }
    zmq_running = true;
    std::thread(zmq_receive_thread, context).detach();
}
// Function: mdlOutputs =======================================================
// Abstract:
// Output the last value received from ZeroMQ, or DEFAULT_VALUE while no
// value is available (last_received_value is NaN).
static void mdlOutputs(SimStruct *S, int_T tid)
{
    (void)tid; // Unused
    real_T *y = ssGetOutputPortRealSignal(S, 0); // Output pointer
    double latest;
    {
        // Hold the lock only while copying the shared value
        std::lock_guard<std::mutex> lock(zmq_mutex);
        latest = last_received_value;
    }
    // NaN never compares equal to anything (not even to itself), so the
    // "no value yet" state must be detected with std::isnan, not with ==.
    y[0] = std::isnan(latest) ? DEFAULT_VALUE : latest;
}
// Function: mdlTerminate =====================================================
// Abstract:
// End-of-simulation cleanup: signal the receive loop to stop, then shut
// down and terminate the ZeroMQ context stored in PWork[0], if any.
static void mdlTerminate(SimStruct *S)
{
    // Ask the receive loop to exit at its next iteration
    zmq_running.store(false);

    void *const zmq_context = ssGetPWorkValue(S, 0);
    if (!zmq_context)
    {
        return; // No context stored, nothing to release
    }

    zmq_ctx_shutdown(zmq_context);
    zmq_ctx_term(zmq_context);
}
// Required S-function trailer
#ifdef MATLAB_MEX_FILE
#include "simulink.c" // MEX-file interface mechanism
#else
#include "cg_sfun.h" // Code generation interface
#endif
Test data producer
This simple C++ program acts as the ZeroMQ publisher: it publishes a new test value every 100 ms. Compile it using
g++ -o zmq_push_sender zmq_push_sender.cpp -lzmq -std=gnu++17
and run it in the background.
#include <zmq.hpp>
#include <iostream>
#include <thread>
#include <chrono>
#include <sstream>
#include <zmq_addon.hpp>
int main() {
// Prepare ZeroMQ context and socket
zmq::context_t context(4);
zmq::socket_t socket(context, zmq::socket_type::pub);
socket.bind("tcp://*:5699");
double value = 0.0;
while (true) {
// Create the message with the current value
std::ostringstream oss;
oss << value;
std::string value_str = oss.str();
zmq::message_t message(value_str.c_str(), value_str.size());
// Send the message
// socket.send(zmq::str_buffer("AAA"), zmq::send_flags::none);
socket.send(message, zmq::send_flags::none);
printf("Sent: %s\n", value_str.c_str());
// Increment the value and reset if necessary
value += 1.0;
if (value >= 10.0) {
value = -10.0;
}
// Wait for 100ms
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
return 0;
}