diff --git a/drivers/windows/pandaJ2534DLL Test/j2534_tests.cpp b/drivers/windows/pandaJ2534DLL Test/j2534_tests.cpp index 0b8059f7..dee1a203 100644 --- a/drivers/windows/pandaJ2534DLL Test/j2534_tests.cpp +++ b/drivers/windows/pandaJ2534DLL Test/j2534_tests.cpp @@ -563,9 +563,27 @@ namespace pandaJ2534DLLTest check_J2534_can_msg(j2534_msg_recv[0], ISO15765, CAN_29BIT_ID, 0, 4 + 0x13, 4 + 0x13, "\x18\xda\xf1\xef""nineteen bytes here", LINE_INFO()); } - //Check tx passes with filter. 29 bit. Good Filter. NoPadding. STD address. Multi Frame. - /*TEST_METHOD(J2534_ISO15765_SuccessTx_29b_Filter_NoPad_STD_FFCF) - { //TODO when TX works with flow control}*/ + //Check multi frame tx passes with filter. 29 bit. Good Filter. NoPadding. STD address. Multi Frame. + TEST_METHOD(J2534_ISO15765_SuccessTx_29b_Filter_NoPad_STD_FFCF) + { + unsigned long chanid; + Assert::AreEqual(STATUS_NOERROR, open_dev(""), _T("Failed to open device."), LINE_INFO()); + Assert::AreEqual(STATUS_NOERROR, PassThruConnect(devid, ISO15765, CAN_29BIT_ID, 500000, &chanid), _T("Failed to open channel."), LINE_INFO()); + J2534_set_flowctrl_filter(chanid, CAN_29BIT_ID, 4, "\xff\xff\xff\xff", "\x18\xda\xf1\xef", "\x18\xda\xef\xf1", LINE_INFO()); + write_ioctl(chanid, LOOPBACK, 0, LINE_INFO()); + auto p = getPanda(500); + + J2534_send_msg_checked(chanid, ISO15765, 0, CAN_29BIT_ID, 0, 14, 0, "\x18\xda\xef\xf1""\xAA\xBB\xCC\xDD\xEE\xFF\x11\x22\x33\x44", LINE_INFO()); + auto j2534_msg_recv = j2534_recv_loop(chanid, 1); + check_J2534_can_msg(j2534_msg_recv[0], ISO15765, CAN_29BIT_ID | TX_MSG_TYPE | TX_INDICATION, 0, 4, 0, "\x18\xda\xef\xf1", LINE_INFO()); + + auto panda_msg_recv = panda_recv_loop(p, 1); + check_panda_can_msg(panda_msg_recv[0], 0, 0x18DAEFF1, TRUE, FALSE, "\x10\x0A""\xAA\xBB\xCC\xDD\xEE\xFF", LINE_INFO()); + + checked_panda_send(p, 0x18DAF1EF, TRUE, "\x30\x0\x0", 3, 0, LINE_INFO()); + panda_msg_recv = panda_recv_loop(p, 1); + check_panda_can_msg(panda_msg_recv[0], 0, 0x18DAEFF1, TRUE, FALSE, "\x21""\x11\x22\x33\x44", LINE_INFO()); + } //Check rx passes with filter. 11 bit. Good Filter. NoPadding. STD address. Single Frame. TEST_METHOD(J2534_ISO15765_SuccessRx_11b_Filter_NoPad_STD_SF) diff --git a/drivers/windows/pandaJ2534DLL/FrameSet.cpp b/drivers/windows/pandaJ2534DLL/FrameSet.cpp index 15c632ef..5839e6e6 100644 --- a/drivers/windows/pandaJ2534DLL/FrameSet.cpp +++ b/drivers/windows/pandaJ2534DLL/FrameSet.cpp @@ -13,6 +13,11 @@ std::shared_ptr FrameSet::init_tx(std::string& payload, std::weak_ptr< return res; } +void FrameSet::tx_flowcontrol(uint8_t block_size, std::chrono::microseconds separation_time) { + this->block_size = block_size; + this->separation_time = separation_time; +} + std::shared_ptr FrameSet::init_rx_first_frame(uint16_t final_size, const std::string& piece, unsigned long rxFlags) { auto res = std::make_shared(); res->expected_size = final_size & 0xFFF; @@ -24,6 +29,16 @@ std::shared_ptr FrameSet::init_rx_first_frame(uint16_t final_size, con return res; } +std::string FrameSet::consumeTxBuff(unsigned int numbytes) { + synchronized(access_lock) { + if (!this->istx) return std::string(); + auto outstr = this->msg.substr(this->consumed_count, numbytes); + this->consumed_count += outstr.size(); + return outstr; + } + return std::string(); //Suppress Warning +} + bool FrameSet::rx_add_frame(uint8_t pcibyte, unsigned int max_packet_size, const std::string piece) { synchronized(access_lock) { if ((pcibyte & 0x0F) != this->next_part) { @@ -62,6 +77,13 @@ unsigned int FrameSet::bytes_remaining() { return 0; //suppress warning } +bool FrameSet::is_ready() { + synchronized(access_lock) { + return this->msg.size() == this->expected_size; + } + return 0; //suppress warning +} + bool FrameSet::flush_result(std::string& final_msg) { synchronized(access_lock) { if (this->msg.size() == this->expected_size) { @@ -71,3 +93,9 @@ bool FrameSet::flush_result(std::string& final_msg) { } return FALSE; } + +uint8_t FrameSet::getNextConsecutiveFrameId() { + synchronized(access_lock) { + return this->next_part++; + } +} diff --git a/drivers/windows/pandaJ2534DLL/FrameSet.h b/drivers/windows/pandaJ2534DLL/FrameSet.h index 17470e06..be52975f 100644 --- a/drivers/windows/pandaJ2534DLL/FrameSet.h +++ b/drivers/windows/pandaJ2534DLL/FrameSet.h @@ -10,9 +10,14 @@ class FrameSet public: //Critical section will be required if accessed outside of processMessage FrameSet(); + ~FrameSet() { + printf("FUUUUUUCK\n"); + }; static std::shared_ptr init_tx(std::string& payload, std::weak_ptr filter); + void tx_flowcontrol(uint8_t block_size, std::chrono::microseconds separation_time); + static std::shared_ptr init_rx_first_frame(uint16_t final_size, const std::string& piece, unsigned long rxFlags); bool rx_add_frame(uint8_t pci_byte, unsigned int max_packet_size, const std::string piece); @@ -21,8 +26,14 @@ public: unsigned int bytes_remaining(); + bool is_ready(); + bool FrameSet::flush_result(std::string& final_msg); + std::string consumeTxBuff(unsigned int numbytes); + + uint8_t getNextConsecutiveFrameId(); + void lock() { access_lock.lock(); } @@ -31,8 +42,11 @@ public: access_lock.unlock(); } + std::chrono::microseconds separation_time; std::weak_ptr filter; + std::string lastTxMsg; + private: Mutex access_lock; bool istx; @@ -43,4 +57,5 @@ private: unsigned long flags; unsigned long consumed_count; + uint8_t block_size; }; diff --git a/drivers/windows/pandaJ2534DLL/J2534Connection.cpp b/drivers/windows/pandaJ2534DLL/J2534Connection.cpp index e382fb91..a07db88b 100644 --- a/drivers/windows/pandaJ2534DLL/J2534Connection.cpp +++ b/drivers/windows/pandaJ2534DLL/J2534Connection.cpp @@ -59,7 +59,7 @@ long J2534Connection::PassThruStartMsgFilter(unsigned long FilterType, PASSTHRU_ for (int i = 0; i < this->filters.size(); i++) { if (filters[i] == nullptr) { try { - auto newfilter = std::make_shared(J2534MessageFilter(this, FilterType, pMaskMsg, pPatternMsg, pFlowControlMsg)); + auto newfilter = std::make_shared(this, FilterType, pMaskMsg, pPatternMsg, pFlowControlMsg); for (int check_idx = 0; check_idx < filters.size(); check_idx++) { if (filters[check_idx] == nullptr) continue; if (filters[check_idx] == newfilter) { @@ -107,6 +107,8 @@ long J2534Connection::setBaud(unsigned long baud) { return STATUS_NOERROR; } +void J2534Connection::sendConsecutiveFrame(std::shared_ptr frame, std::shared_ptr filter) { } + void J2534Connection::processMessageReceipt(const PASSTHRU_MSG_INTERNAL& msg) { if (this->loopback) { synchronized(message_access_lock) { diff --git a/drivers/windows/pandaJ2534DLL/J2534Connection.h b/drivers/windows/pandaJ2534DLL/J2534Connection.h index f3191860..9f85a8e1 100644 --- a/drivers/windows/pandaJ2534DLL/J2534Connection.h +++ b/drivers/windows/pandaJ2534DLL/J2534Connection.h @@ -4,6 +4,7 @@ #include "synchronize.h" #include "PandaJ2534Device.h" #include "J2534MessageFilter.h" +#include "FrameSet.h" class J2534MessageFilter; class PandaJ2534Device; @@ -57,6 +58,8 @@ public: virtual void processMessageReceipt(const PASSTHRU_MSG_INTERNAL& msg); virtual void processMessage(const PASSTHRU_MSG_INTERNAL& msg); + virtual void sendConsecutiveFrame(std::shared_ptr frame, std::shared_ptr filter); + virtual unsigned long getMinMsgLen() { return 1; } diff --git a/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.cpp b/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.cpp index 7df86b3b..be04a537 100644 --- a/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.cpp +++ b/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.cpp @@ -1,6 +1,7 @@ #include "stdafx.h" #include "J2534Connection_ISO15765.h" #include "Timer.h" +#include #define msg_is_extaddr(msg) check_bmask(msg->TxFlags, ISO15765_ADDR_TYPE) #define msg_is_padded(msg) check_bmask(msg->TxFlags, ISO15765_FRAME_PAD) @@ -10,6 +11,10 @@ #define FRAME_CONSEC 0x20 #define FRAME_FLOWCTRL 0x30 +#define FLOWCTRL_CONTINUE 0 +#define FLOWCTRL_WAIT 1 +#define FLOWCTRL_ABORT 2 + #define msg_get_type(msg, addrlen) ((msg).Data[addrlen] & 0xF0) #define is_single(msg, addrlen) (msg_get_type(msg, addrlen) == FRAME_SINGLE) @@ -52,24 +57,32 @@ long J2534Connection_ISO15765::PassThruWriteMsgs(PASSTHRU_MSG *pMsg, unsigned lo uint8_t snd_buff[8] = { 0 }; uint8_t addrlen = msg_is_extaddr(msg) ? 5 : 4; unsigned long payload_len = msg->DataSize - addrlen; + unsigned long this_payload_len; unsigned int idx = 0; if (msg_is_extaddr(msg)) snd_buff[idx++] = msg->Data[4]; //EXT ADDR byte if (payload_len <= (msg_is_extaddr(msg) ? 6 : 7)) { + this_payload_len = payload_len; snd_buff[idx++] = payload_len; } else { - printf("LONG MSG\n"); - //TODO Make work will full TX sequence. Currently only sends first frame. snd_buff[idx++] = 0x10 | ((payload_len >> 8) & 0xF); snd_buff[idx++] = payload_len & 0xFF; + this_payload_len = 8 - idx; // 5 or 6 } - memcpy_s(&snd_buff[idx], sizeof(snd_buff) - idx, &msg->Data[addrlen], payload_len); + memcpy_s(&snd_buff[idx], sizeof(snd_buff) - idx, &msg->Data[addrlen], this_payload_len); if (auto panda_dev_sp = this->panda_dev.lock()) { + + synchronized(staged_writes_lock) { + this->staged_writes[fid].dispatched_msg = std::string((char*)msg->Data, 4) + + std::string((char*)snd_buff, (msg_is_padded(msg) ? sizeof(snd_buff) : (this_payload_len + idx))); + this->staged_writes[fid].remaining_payload = std::string((char*)&msg->Data[addrlen + this_payload_len], payload_len - this_payload_len); + } + if (panda_dev_sp->panda->can_send(addr, val_is_29bit(msg->TxFlags), snd_buff, - (msg_is_padded(msg) ? sizeof(snd_buff) : (payload_len + idx)), panda::PANDA_CAN1) == FALSE) { + (msg_is_padded(msg) ? sizeof(snd_buff) : (this_payload_len + idx)), panda::PANDA_CAN1) == FALSE) { *pNumMsgs = msgnum; return ERR_INVALID_MSG; } @@ -85,26 +98,59 @@ void J2534Connection_ISO15765::processMessageReceipt(const PASSTHRU_MSG_INTERNAL //TX_MSG_TYPE should be set in RxStatus if (!check_bmask(msg.RxStatus, TX_MSG_TYPE)) return; - int fid = get_matching_out_fc_filter_id(msg.Data, msg.RxStatus, CAN_29BIT_ID); - if (fid == -1) return; + synchronized(staged_writes_lock) { + uint8_t addrlen; + unsigned long filterFlags; + bool did_msg_finish_tx = FALSE; - uint8_t addrlen = check_bmask(this->filters[fid]->flags, ISO15765_ADDR_TYPE) ? 5 : 4; + { + int fid = get_matching_out_fc_filter_id(msg.Data, msg.RxStatus, CAN_29BIT_ID); + if (fid == -1) return; + auto filter = this->filters[fid]; + if (filter == nullptr) return; //Avoid having to lock, shared pointer keeps filter alive for us. Maybe? + filterFlags = filter->flags; - if (msg.Data.size() >= addrlen + 1 && (msg.Data[addrlen] & 0xF0) == FRAME_FLOWCTRL) return; + addrlen = check_bmask(filterFlags, ISO15765_ADDR_TYPE) ? 5 : 4; - this->conversations[fid].reset(); + if (msg.Data.size() >= addrlen + 1 && (msg.Data[addrlen] & 0xF0) == FRAME_FLOWCTRL) return; - outframe.ProtocolID = ISO15765; - outframe.Timestamp = msg.Timestamp; - outframe.RxStatus = msg.RxStatus | TX_MSG_TYPE | TX_INDICATION; - if (check_bmask(this->filters[fid]->flags, ISO15765_ADDR_TYPE)) - outframe.RxStatus |= ISO15765_ADDR_TYPE; - outframe.ExtraDataIndex = 0; - outframe.TxFlags = 0; - outframe.Data = msg.Data.substr(0, addrlen); + if (this->conversations[fid] != nullptr && + this->conversations[fid]->lastTxMsg == msg.Data) { + ///////////////////////////////////////////////////////////////////////////////////// + if (this->conversations[fid]->bytes_remaining() == 0) { + did_msg_finish_tx = TRUE; + } else if (auto panda_dev_sp = this->panda_dev.lock()) { + panda_dev_sp->registerMultiPartTx(this->conversations[fid]); //Should this function be used here? + return; // Don't send a message to the client for this in message. + } + } else { + this->conversations[fid] = nullptr; - synchronized(message_access_lock) { - this->messages.push(outframe); + auto& staged_write = this->staged_writes[fid]; + if (staged_write.dispatched_msg == msg.Data) { + if (this->staged_writes[fid].remaining_payload.size() == 0) + did_msg_finish_tx = TRUE; + else + this->conversations[fid] = FrameSet::init_tx(this->staged_writes[fid].remaining_payload, filter); + } + } + + } + + outframe.ProtocolID = ISO15765; + outframe.Timestamp = msg.Timestamp; + outframe.RxStatus = msg.RxStatus | TX_MSG_TYPE; + if (did_msg_finish_tx) + outframe.RxStatus |= TX_INDICATION; + if (check_bmask(filterFlags, ISO15765_ADDR_TYPE)) + outframe.RxStatus |= ISO15765_ADDR_TYPE; + outframe.ExtraDataIndex = 0; + outframe.TxFlags = 0; + outframe.Data = msg.Data.substr(0, addrlen); + + synchronized(message_access_lock) { + this->messages.push(outframe); + } } } @@ -121,6 +167,33 @@ void J2534Connection_ISO15765::processMessage(const PASSTHRU_MSG_INTERNAL& msg) uint8_t addrlen = is_ext_addr ? 5 : 4; switch (msg_get_type(msg, addrlen)) { + case FRAME_FLOWCTRL: { + if (msg.Data.size() < addrlen + 3) return; + uint8_t flow_status = msg.Data[addrlen] & 0x0F; + uint8_t block_size = msg.Data[addrlen + 1]; + uint8_t st_min = msg.Data[addrlen + 2]; + switch (flow_status) { + case FLOWCTRL_CONTINUE: { + if (st_min > 0xF9) break; + if (st_min >= 0xf1 && st_min <= 0xf9) { + this->conversations[fid]->tx_flowcontrol(block_size, std::chrono::microseconds((st_min & 0x0F) * 100)); + } else { + this->conversations[fid]->tx_flowcontrol(block_size, std::chrono::microseconds(st_min * 1000)); + } + if (auto panda_dev_sp = this->panda_dev.lock()) { + panda_dev_sp->registerMultiPartTx(this->conversations[fid]); + } + break; + } + case FLOWCTRL_WAIT: + this->conversations[fid]->tx_flowcontrol(0, std::chrono::microseconds(0)); + break; + case FLOWCTRL_ABORT: + this->conversations[fid] = nullptr; + break; + } + break; + } case FRAME_SINGLE: this->conversations[fid] = nullptr; //Reset any current transaction. @@ -163,6 +236,7 @@ void J2534Connection_ISO15765::processMessage(const PASSTHRU_MSG_INTERNAL& msg) outframe.RxStatus |= ISO15765_ADDR_TYPE; outframe.ExtraDataIndex = 0; outframe.TxFlags = 0; + synchronized(message_access_lock) { this->messages.push(outframe); } @@ -243,3 +317,33 @@ int J2534Connection_ISO15765::get_matching_in_fc_filter_id(const PASSTHRU_MSG_IN } return -1; } + +void J2534Connection_ISO15765::sendConsecutiveFrame(std::shared_ptr frame, std::shared_ptr filter) { + auto filter_addr_str = filter->get_flowctrl(); + uint32_t addr = ((uint8_t)filter_addr_str[0]) << 24 | ((uint8_t)filter_addr_str[1]) << 16 | + ((uint8_t)filter_addr_str[2]) << 8 | ((uint8_t)filter_addr_str[3]); + uint8_t snd_buff[8] = { 0 }; + unsigned int idx = 0; + + if (check_bmask(filter->flags, ISO15765_ADDR_TYPE)) + snd_buff[idx++] = filter_addr_str[4]; //EXT ADDR byte + + snd_buff[idx++] = 0x20 | frame->getNextConsecutiveFrameId(); + + std::string payload_piece = frame->consumeTxBuff(check_bmask(filter->flags, ISO15765_ADDR_TYPE) ? 6 : 7); + memcpy_s(&snd_buff[idx], sizeof(snd_buff) - idx, payload_piece.c_str(), payload_piece.size()); + if (auto panda_dev_sp = this->panda_dev.lock()) { + + /*synchronized(staged_writes_lock) { + this->staged_writes[fid].dispatched_msg = std::string((char*)msg->Data, 4) + + std::string((char*)snd_buff, (msg_is_padded(msg) ? sizeof(snd_buff) : (this_payload_len + idx))); + this->staged_writes[fid].remaining_payload = std::string((char*)&msg->Data[addrlen + this_payload_len], payload_len - this_payload_len); + }*/ + frame->lastTxMsg = filter_addr_str.substr(0,4) + std::string((char*)snd_buff, (check_bmask(filter->flags, ISO15765_FRAME_PAD) ? sizeof(snd_buff) : (payload_piece.size() + idx))); + + if (panda_dev_sp->panda->can_send(addr, val_is_29bit(filter->flags), snd_buff, + (check_bmask(filter->flags, ISO15765_FRAME_PAD) ? sizeof(snd_buff) : (payload_piece.size() + idx)), panda::PANDA_CAN1) == FALSE) { + return; + } + } +} diff --git a/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.h b/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.h index 52caa294..1b147189 100644 --- a/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.h +++ b/drivers/windows/pandaJ2534DLL/J2534Connection_ISO15765.h @@ -4,6 +4,11 @@ #include "J2534Connection_CAN.h" #include "FrameSet.h" +typedef struct { + std::string dispatched_msg; + std::string remaining_payload; +} PRESTAGED_WRITE; + class J2534Connection_ISO15765 : public J2534Connection { //J2534Connection_CAN { public: J2534Connection_ISO15765( @@ -23,6 +28,7 @@ public: virtual void processMessageReceipt(const PASSTHRU_MSG_INTERNAL& msg); virtual void processMessage(const PASSTHRU_MSG_INTERNAL& msg); + void sendConsecutiveFrame(std::shared_ptr frame, std::shared_ptr filter); virtual unsigned long getMinMsgLen() { return 4; @@ -41,6 +47,7 @@ public: } private: + Mutex staged_writes_lock; + std::array staged_writes; std::array, 10> conversations; }; - diff --git a/drivers/windows/pandaJ2534DLL/PandaJ2534Device.cpp b/drivers/windows/pandaJ2534DLL/PandaJ2534Device.cpp index e8421e18..cc271076 100644 --- a/drivers/windows/pandaJ2534DLL/PandaJ2534Device.cpp +++ b/drivers/windows/pandaJ2534DLL/PandaJ2534Device.cpp @@ -1,6 +1,17 @@ #include "stdafx.h" #include "PandaJ2534Device.h" +FLOW_CONTROL_WRITE::FLOW_CONTROL_WRITE(std::shared_ptr framein) : frame(framein) { + expire = std::chrono::steady_clock::now() + framein->separation_time; +} + +void FLOW_CONTROL_WRITE::refreshExpiration() { + if (auto& frameptr = this->frame.lock()) { + expire += frameptr->separation_time; + } +} + + PandaJ2534Device::PandaJ2534Device(std::unique_ptr new_panda) { this->panda = std::move(new_panda); @@ -12,6 +23,10 @@ PandaJ2534Device::PandaJ2534Device(std::unique_ptr new_panda) { DWORD canListenThreadID; this->thread_kill_event = CreateEvent(NULL, TRUE, FALSE, NULL); this->can_thread_handle = CreateThread(NULL, 0, _can_recv_threadBootstrap, (LPVOID)this, 0, &canListenThreadID); + + DWORD flowControlSendThreadID; + this->flow_control_wakeup_event = CreateEvent(NULL, TRUE, FALSE, NULL); + this->flow_control_thread_handle = CreateThread(NULL, 0, _flow_control_write_threadBootstrap, (LPVOID)this, 0, &flowControlSendThreadID); }; PandaJ2534Device::~PandaJ2534Device() { @@ -19,6 +34,7 @@ PandaJ2534Device::~PandaJ2534Device() { DWORD res = WaitForSingleObject(this->can_thread_handle, INFINITE); CloseHandle(this->can_thread_handle); + res = WaitForSingleObject(this->flow_control_thread_handle, INFINITE); CloseHandle(this->flow_control_thread_handle); CloseHandle(this->flow_control_wakeup_event); @@ -70,7 +86,6 @@ DWORD PandaJ2534Device::can_recv_thread() { std::vector msg_recv; err = this->panda->can_recv_async(this->thread_kill_event, msg_recv); for (auto msg_in : msg_recv) { - //if (this->_is_29bit() != msg_in.addr_29b) {} PASSTHRU_MSG_INTERNAL msg_out; msg_out.ProtocolID = CAN; msg_out.ExtraDataIndex = 0; @@ -105,5 +120,63 @@ DWORD PandaJ2534Device::_flow_control_write_threadBootstrap(LPVOID This) { } DWORD PandaJ2534Device::flow_control_write_thread() { + const HANDLE subscriptions[] = { this->flow_control_wakeup_event, this->thread_kill_event }; + DWORD sleepDuration = INFINITE; + while (TRUE) { + DWORD res = WaitForMultipleObjects(2, subscriptions, FALSE, sleepDuration); + if (res == WAIT_OBJECT_0 + 1) return 0; + if (res != WAIT_OBJECT_0 && res != WAIT_TIMEOUT) { + printf("Got an unexpected wait result in flow_control_write_thread. Res: %d; GetLastError: %d\n. Terminating thread.", res, GetLastError()); + return 0; + } + ResetEvent(this->flow_control_wakeup_event); + + while (TRUE) { + synchronized(active_flow_control_txs_lock) { //implemented with for loop. Consumes breaks. + if (this->active_flow_control_txs.size() == 0) { + sleepDuration = INFINITE; + goto break_flow_ctrl_loop; + } + auto& fcontrol_write = this->active_flow_control_txs.front(); + if (auto& frame = fcontrol_write->frame.lock()) { + if (std::chrono::steady_clock::now() >= fcontrol_write->expire) { + this->active_flow_control_txs.pop_front(); + if (auto& filter = frame->filter.lock()) { + //Write message + filter->conn->sendConsecutiveFrame(frame, filter); + } else { + this->active_flow_control_txs.pop_front(); + } + //fcontrol_write->refreshExpiration(); + //insertMultiPartTxInQueue(std::move(fcontrol_write)); + } else { //Ran out of things that need to be sent now. Sleep! + auto time_diff = std::chrono::duration_cast(fcontrol_write->expire - std::chrono::steady_clock::now()); + sleepDuration = max(1, time_diff.count()); + goto break_flow_ctrl_loop;// doloop = FALSE; + } + } else { // This frame has been aborted. + this->active_flow_control_txs.pop_front(); + } + } + } + break_flow_ctrl_loop: + continue; + } return 0; } + +void PandaJ2534Device::insertMultiPartTxInQueue(std::unique_ptr fcwrite) { + synchronized(active_flow_control_txs_lock) { + auto iter = this->active_flow_control_txs.begin(); + for (; iter != this->active_flow_control_txs.end(); iter++) { + if (fcwrite->expire < (*iter)->expire) break; + } + this->active_flow_control_txs.insert(iter, std::move(fcwrite)); + } +} + +void PandaJ2534Device::registerMultiPartTx(std::shared_ptr frame) { + auto fcwrite = std::make_unique(frame); + insertMultiPartTxInQueue(std::move(fcwrite)); + SetEvent(this->flow_control_wakeup_event); +} diff --git a/drivers/windows/pandaJ2534DLL/PandaJ2534Device.h b/drivers/windows/pandaJ2534DLL/PandaJ2534Device.h index a5f51a2c..da92f27a 100644 --- a/drivers/windows/pandaJ2534DLL/PandaJ2534Device.h +++ b/drivers/windows/pandaJ2534DLL/PandaJ2534Device.h @@ -1,10 +1,25 @@ #pragma once #include +#include +#include #include "J2534_v0404.h" #include "panda/panda.h" +#include "synchronize.h" #include "J2534Connection.h" +#include "FrameSet.h" class J2534Connection; +class FrameSet; + +typedef struct FLOW_CONTROL_WRITE { + FLOW_CONTROL_WRITE(std::shared_ptr framein); + ~FLOW_CONTROL_WRITE() {}; + + void refreshExpiration(); + + std::weak_ptr frame; + std::chrono::time_point expire; +} FLOW_CONTROL_WRITE; class PandaJ2534Device { public: @@ -20,6 +35,9 @@ public: std::unique_ptr panda; std::vector> connections; + void insertMultiPartTxInQueue(std::unique_ptr fcwrite); + void registerMultiPartTx(std::shared_ptr frame); + private: HANDLE thread_kill_event; @@ -31,4 +49,6 @@ private: HANDLE flow_control_thread_handle; static DWORD WINAPI _flow_control_write_threadBootstrap(LPVOID This); DWORD flow_control_write_thread(); + std::list> active_flow_control_txs; + Mutex active_flow_control_txs_lock; };