cereal/messaging/impl_zmq.cc

163 lines
3.4 KiB
C++
Raw Permalink Normal View History

2019-11-02 00:31:59 +08:00
#include <cassert>
#include <cstring>
#include <iostream>
#include <cstdlib>
#include <cerrno>
#include <unistd.h>
2019-11-02 00:31:59 +08:00
2023-01-01 06:19:29 +08:00
#include "cereal/services.h"
#include "cereal/messaging/impl_zmq.h"
2019-11-02 00:31:59 +08:00
static int get_port(std::string endpoint) {
2023-08-24 04:26:00 +08:00
return services.at(endpoint).port;
2019-11-02 00:31:59 +08:00
}
ZMQContext::ZMQContext() {
context = zmq_ctx_new();
}
ZMQContext::~ZMQContext() {
zmq_ctx_term(context);
2019-11-02 00:31:59 +08:00
}
void ZMQMessage::init(size_t sz) {
size = sz;
data = new char[size];
}
void ZMQMessage::init(char * d, size_t sz) {
size = sz;
data = new char[size];
memcpy(data, d, size);
}
void ZMQMessage::close() {
if (size > 0){
delete[] data;
}
size = 0;
}
ZMQMessage::~ZMQMessage() {
this->close();
}
Visionipc v2.0 (#101) * add visionbuf make static ignore that * Needs decleration * add test binary * c++ * create some structure * some impl * socket stuff * Accept socket connection * Alloc some buffers * Create pub sockets and send buffer id * make listener private * Implement receive * use error check macros in cl_helpers * constructors to pass in opencl context * add some convenience values in struct * refactor creating buffers * rgb is not so simple * add fake stride and expose buffers * add comment * add extra data struct * support conflate * init opencl on all buffers * make ion compile * fix qcom2 * correctly setup yuv pointers when importing buffer * also included from c * Remove send print statements * send metadata * reveive metadata * also used in c code * dont start listener automatically * Was started in 2 places * set 100ms timeout on socket * verify server id to detect reconnects * handle reconnect * buffer cleanup * let user handle opencl creation * add default values * Add support for aligned rgb buffers * add align macro * dont use namespace * use poller * apple ifdef in ipc.cc * VisionBuf is C++ class * Install opencl headers * cppcheck c++ * remove c header guard * fix mac build * simplify constructors * Update visionipc/visionipc.h Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update visionipc/visionbuf_ion.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add brackets * s/VIPCBufExtra/VisionIpcBufExtra/g * Add unittesting harness * remove vipc demo * very basic tests * add conflate test * Install opencl * suppress msgq warnings * Make it work using zmq * cl in qcom replay * run unittests in zmq mode as well * non blocking connect * always larger frame queues Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2021-01-08 21:54:41 +08:00
int ZMQSubSocket::connect(Context *context, std::string endpoint, std::string address, bool conflate, bool check_endpoint){
2019-11-02 00:31:59 +08:00
sock = zmq_socket(context->getRawContext(), ZMQ_SUB);
if (sock == NULL){
return -1;
}
2019-11-02 00:31:59 +08:00
zmq_setsockopt(sock, ZMQ_SUBSCRIBE, "", 0);
if (conflate){
int arg = 1;
zmq_setsockopt(sock, ZMQ_CONFLATE, &arg, sizeof(int));
}
int reconnect_ivl = 500;
zmq_setsockopt(sock, ZMQ_RECONNECT_IVL_MAX, &reconnect_ivl, sizeof(reconnect_ivl));
2019-11-05 05:08:11 +08:00
full_endpoint = "tcp://" + address + ":";
Visionipc v2.0 (#101) * add visionbuf make static ignore that * Needs decleration * add test binary * c++ * create some structure * some impl * socket stuff * Accept socket connection * Alloc some buffers * Create pub sockets and send buffer id * make listener private * Implement receive * use error check macros in cl_helpers * constructors to pass in opencl context * add some convenience values in struct * refactor creating buffers * rgb is not so simple * add fake stride and expose buffers * add comment * add extra data struct * support conflate * init opencl on all buffers * make ion compile * fix qcom2 * correctly setup yuv pointers when importing buffer * also included from c * Remove send print statements * send metadata * reveive metadata * also used in c code * dont start listener automatically * Was started in 2 places * set 100ms timeout on socket * verify server id to detect reconnects * handle reconnect * buffer cleanup * let user handle opencl creation * add default values * Add support for aligned rgb buffers * add align macro * dont use namespace * use poller * apple ifdef in ipc.cc * VisionBuf is C++ class * Install opencl headers * cppcheck c++ * remove c header guard * fix mac build * simplify constructors * Update visionipc/visionipc.h Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update visionipc/visionbuf_ion.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add brackets * s/VIPCBufExtra/VisionIpcBufExtra/g * Add unittesting harness * remove vipc demo * very basic tests * add conflate test * Install opencl * suppress msgq warnings * Make it work using zmq * cl in qcom replay * run unittests in zmq mode as well * non blocking connect * always larger frame queues Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2021-01-08 21:54:41 +08:00
if (check_endpoint){
full_endpoint += std::to_string(get_port(endpoint));
} else {
full_endpoint += endpoint;
}
2019-11-02 00:31:59 +08:00
return zmq_connect(sock, full_endpoint.c_str());
2019-11-02 00:31:59 +08:00
}
Message * ZMQSubSocket::receive(bool non_blocking){
zmq_msg_t msg;
assert(zmq_msg_init(&msg) == 0);
int flags = non_blocking ? ZMQ_DONTWAIT : 0;
int rc = zmq_msg_recv(&msg, sock, flags);
Message *r = NULL;
if (rc >= 0){
// Make a copy to ensure the data is aligned
r = new ZMQMessage;
r->init((char*)zmq_msg_data(&msg), zmq_msg_size(&msg));
2019-11-22 05:14:27 +08:00
}
2019-11-02 00:31:59 +08:00
zmq_msg_close(&msg);
return r;
}
void ZMQSubSocket::setTimeout(int timeout){
zmq_setsockopt(sock, ZMQ_RCVTIMEO, &timeout, sizeof(int));
}
ZMQSubSocket::~ZMQSubSocket(){
zmq_close(sock);
}
Visionipc v2.0 (#101) * add visionbuf make static ignore that * Needs decleration * add test binary * c++ * create some structure * some impl * socket stuff * Accept socket connection * Alloc some buffers * Create pub sockets and send buffer id * make listener private * Implement receive * use error check macros in cl_helpers * constructors to pass in opencl context * add some convenience values in struct * refactor creating buffers * rgb is not so simple * add fake stride and expose buffers * add comment * add extra data struct * support conflate * init opencl on all buffers * make ion compile * fix qcom2 * correctly setup yuv pointers when importing buffer * also included from c * Remove send print statements * send metadata * reveive metadata * also used in c code * dont start listener automatically * Was started in 2 places * set 100ms timeout on socket * verify server id to detect reconnects * handle reconnect * buffer cleanup * let user handle opencl creation * add default values * Add support for aligned rgb buffers * add align macro * dont use namespace * use poller * apple ifdef in ipc.cc * VisionBuf is C++ class * Install opencl headers * cppcheck c++ * remove c header guard * fix mac build * simplify constructors * Update visionipc/visionipc.h Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update visionipc/visionbuf_ion.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add brackets * s/VIPCBufExtra/VisionIpcBufExtra/g * Add unittesting harness * remove vipc demo * very basic tests * add conflate test * Install opencl * suppress msgq warnings * Make it work using zmq * cl in qcom replay * run unittests in zmq mode as well * non blocking connect * always larger frame queues Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2021-01-08 21:54:41 +08:00
int ZMQPubSocket::connect(Context *context, std::string endpoint, bool check_endpoint){
2019-11-02 00:31:59 +08:00
sock = zmq_socket(context->getRawContext(), ZMQ_PUB);
if (sock == NULL){
return -1;
}
2019-11-02 00:31:59 +08:00
full_endpoint = "tcp://*:";
Visionipc v2.0 (#101) * add visionbuf make static ignore that * Needs decleration * add test binary * c++ * create some structure * some impl * socket stuff * Accept socket connection * Alloc some buffers * Create pub sockets and send buffer id * make listener private * Implement receive * use error check macros in cl_helpers * constructors to pass in opencl context * add some convenience values in struct * refactor creating buffers * rgb is not so simple * add fake stride and expose buffers * add comment * add extra data struct * support conflate * init opencl on all buffers * make ion compile * fix qcom2 * correctly setup yuv pointers when importing buffer * also included from c * Remove send print statements * send metadata * reveive metadata * also used in c code * dont start listener automatically * Was started in 2 places * set 100ms timeout on socket * verify server id to detect reconnects * handle reconnect * buffer cleanup * let user handle opencl creation * add default values * Add support for aligned rgb buffers * add align macro * dont use namespace * use poller * apple ifdef in ipc.cc * VisionBuf is C++ class * Install opencl headers * cppcheck c++ * remove c header guard * fix mac build * simplify constructors * Update visionipc/visionipc.h Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * Update visionipc/visionbuf_ion.cc Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com> * add brackets * s/VIPCBufExtra/VisionIpcBufExtra/g * Add unittesting harness * remove vipc demo * very basic tests * add conflate test * Install opencl * suppress msgq warnings * Make it work using zmq * cl in qcom replay * run unittests in zmq mode as well * non blocking connect * always larger frame queues Co-authored-by: Adeeb Shihadeh <adeebshihadeh@gmail.com>
2021-01-08 21:54:41 +08:00
if (check_endpoint){
full_endpoint += std::to_string(get_port(endpoint));
} else {
full_endpoint += endpoint;
}
2019-11-02 00:31:59 +08:00
// ZMQ pub sockets cannot be shared between processes, so we need to ensure pid stays the same
pid = getpid();
return zmq_bind(sock, full_endpoint.c_str());
2019-11-02 00:31:59 +08:00
}
int ZMQPubSocket::sendMessage(Message *message) {
assert(pid == getpid());
2019-11-02 00:31:59 +08:00
return zmq_send(sock, message->getData(), message->getSize(), ZMQ_DONTWAIT);
}
int ZMQPubSocket::send(char *data, size_t size) {
assert(pid == getpid());
2019-11-02 00:31:59 +08:00
return zmq_send(sock, data, size, ZMQ_DONTWAIT);
}
bool ZMQPubSocket::all_readers_updated() {
assert(false); // TODO not implemented
return false;
}
2019-11-02 00:31:59 +08:00
ZMQPubSocket::~ZMQPubSocket(){
zmq_close(sock);
}
void ZMQPoller::registerSocket(SubSocket * socket){
assert(num_polls + 1 < MAX_POLLERS);
polls[num_polls].socket = socket->getRawSocket();
polls[num_polls].events = ZMQ_POLLIN;
sockets.push_back(socket);
num_polls++;
}
std::vector<SubSocket*> ZMQPoller::poll(int timeout){
std::vector<SubSocket*> r;
int rc = zmq_poll(polls, num_polls, timeout);
if (rc < 0){
return r;
}
for (size_t i = 0; i < num_polls; i++){
if (polls[i].revents){
r.push_back(sockets[i]);
}
}
return r;
}