Unbridge - for republishing msgs sent from your laptop to openpilot (#160)

* duplicate bridge

* unbridge!

* some fixes

* combine bridge

* one if

* rm unbridge

* add --unbridge arg

* ip and unbridge arg parsing. segfaults if not given any ip value

* this is better than a python cli interface for now

* where did those tabs come from?

* bridge --unbridge is a little cumbersome to type...

...and only republish test messages

* more clear

* update

* only check --ip

* assume reverse if anything specified as ip

* refactor

* better ordering

* order imports

* use a whitelist

* combine into get_services
This commit is contained in:
ShaneSmiskol 2021-06-07 18:11:25 -07:00 committed by GitHub
parent cde8667d3b
commit 127edd520f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 46 additions and 31 deletions

View File

@ -1,59 +1,74 @@
#include <iostream>
#include <string>
#include <algorithm>
#include <cassert>
#include <csignal>
#include <iostream>
#include <map>
#include <string>
typedef void (*sighandler_t)(int sig);
#include "services.h"
#include "impl_msgq.h"
#include "impl_zmq.h"
#include "services.h"
void sigpipe_handler(int sig) {
assert(sig == SIGPIPE);
std::cout << "SIGPIPE received" << std::endl;
}
static std::vector<std::string> get_services() {
std::vector<std::string> name_list;
static std::vector<std::string> get_services(std::string whitelist_str, bool zmq_to_msgq) {
std::vector<std::string> service_list;
for (const auto& it : services) {
std::string name = it.name;
if (name == "plusFrame" || name == "uiLayoutState") continue;
name_list.push_back(name);
bool in_whitelist = whitelist_str.find(name) != std::string::npos;
if (name == "plusFrame" || name == "uiLayoutState" || (zmq_to_msgq && !in_whitelist)) {
continue;
}
service_list.push_back(name);
}
return name_list;
return service_list;
}
int main(void){
int main(int argc, char** argv) {
signal(SIGPIPE, (sighandler_t)sigpipe_handler);
auto endpoints = get_services();
bool zmq_to_msgq = argc > 2;
std::string ip = zmq_to_msgq ? argv[1] : "127.0.0.1";
std::string whitelist_str = zmq_to_msgq ? std::string(argv[2]) : "";
std::map<SubSocket*, PubSocket*> sub2pub;
Context *zmq_context = new ZMQContext();
Context *msgq_context = new MSGQContext();
Poller *poller = new MSGQPoller();
for (auto endpoint: endpoints){
SubSocket * msgq_sock = new MSGQSubSocket();
msgq_sock->connect(msgq_context, endpoint, "127.0.0.1", false);
poller->registerSocket(msgq_sock);
PubSocket * zmq_sock = new ZMQPubSocket();
zmq_sock->connect(zmq_context, endpoint);
sub2pub[msgq_sock] = zmq_sock;
Poller *poller;
Context *pub_context;
Context *sub_context;
if (zmq_to_msgq) { // republishes zmq debugging messages as msgq
poller = new ZMQPoller();
pub_context = new MSGQContext();
sub_context = new ZMQContext();
} else {
poller = new MSGQPoller();
pub_context = new ZMQContext();
sub_context = new MSGQContext();
}
std::map<SubSocket*, PubSocket*> sub2pub;
for (auto endpoint: get_services(whitelist_str, zmq_to_msgq)) {
PubSocket * pub_sock;
SubSocket * sub_sock;
if (zmq_to_msgq) {
pub_sock = new MSGQPubSocket();
sub_sock = new ZMQSubSocket();
} else {
pub_sock = new ZMQPubSocket();
sub_sock = new MSGQSubSocket();
}
pub_sock->connect(pub_context, endpoint);
sub_sock->connect(sub_context, endpoint, ip, false);
while (true){
for (auto sub_sock : poller->poll(100)){
poller->registerSocket(sub_sock);
sub2pub[sub_sock] = pub_sock;
}
while (true) {
for (auto sub_sock : poller->poll(100)) {
Message * msg = sub_sock->receive();
if (msg == NULL) continue;
sub2pub[sub_sock]->sendMessage(msg);