From 67ec8fbd910bdacade32ffd7ceb625b4e1d03f09 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Thu, 28 Apr 2022 22:36:45 -0400 Subject: [PATCH] rx and tx threads for PLC comms to maintain quick comms and #36 only feed watchdog on valid sequence numbers --- reactor-plc/plc.lua | 11 ++++--- reactor-plc/startup.lua | 15 ++++++--- reactor-plc/threads.lua | 67 +++++++++++++++++++++++++++++++++-------- 3 files changed, 71 insertions(+), 22 deletions(-) diff --git a/reactor-plc/plc.lua b/reactor-plc/plc.lua index 361c174..127820f 100644 --- a/reactor-plc/plc.lua +++ b/reactor-plc/plc.lua @@ -411,7 +411,7 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) end -- handle an RPLC packet - local handle_packet = function (packet, plc_state) + local handle_packet = function (packet, plc_state, conn_watchdog) if packet ~= nil then -- check sequence number if self.r_seq_num == nil then @@ -423,6 +423,9 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) self.r_seq_num = packet.scada_frame.seq_num() end + -- feed the watchdog first so it doesn't uhh...eat our packets + conn_watchdog.feed() + -- handle packet if packet.scada_frame.protocol() == PROTOCOLS.RPLC then if self.linked then @@ -431,10 +434,8 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) local timestamp = packet.data[1] local trip_time = util.time() - timestamp - if trip_time < 0 then - log._warning("PLC KEEP_ALIVE trip time less than 0 (" .. trip_time .. ")") - elseif trip_time > 1200 then - log._warning("PLC KEEP_ALIVE trip time > 1.2s (" .. trip_time .. ")") + if trip_time > 500 then + log._warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. ")") end -- log._debug("RPLC RTT = ".. trip_time .. "ms") diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 4709a9c..3bf458f 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -12,7 +12,7 @@ os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.4.6" +local R_PLC_VERSION = "alpha-v0.4.7" local print = util.print local println = util.println @@ -58,7 +58,8 @@ local __shared_memory = { -- message queues q = { mq_iss = mqueue.new(), - mq_comms = mqueue.new() + mq_comms_tx = mqueue.new(), + mq_comms_rx = mqueue.new() } } @@ -126,12 +127,16 @@ init() -- init threads local main_thread = threads.thread__main(__shared_memory, init) local iss_thread = threads.thread__iss(__shared_memory) -local comms_thread = threads.thread__comms(__shared_memory) --- run threads if __shared_memory.networked then - parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) + -- init comms threads + local comms_thread_tx = threads.thread__comms_tx(__shared_memory) + local comms_thread_rx = threads.thread__comms_rx(__shared_memory) + + -- run threads + parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread_tx.exec, comms_thread_rx.exec) else + -- run threads, excluding comms parallel.waitForAll(main_thread.exec, iss_thread.exec) end diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index a9b95d7..c946ed5 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -58,7 +58,7 @@ function thread__main(smem, init) -- send updated data if not plc_state.no_modem then if plc_comms.is_linked() then - smem.q.mq_comms.push_command(MQ__COMM_CMD.SEND_STATUS) + smem.q.mq_comms_tx.push_command(MQ__COMM_CMD.SEND_STATUS) else if ticks_to_update == 0 then plc_comms.send_link_req() @@ -71,13 +71,10 @@ function thread__main(smem, init) end elseif event == "modem_message" and networked and not plc_state.no_modem then -- got a packet - -- feed the watchdog first so it doesn't uhh...eat our packets - conn_watchdog.feed() - - -- handle the packet local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) if packet ~= nil then - smem.q.mq_comms.push_packet(packet) + -- pass the packet onto the comms message queue + smem.q.mq_comms_rx.push_packet(packet) end elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor @@ -296,16 +293,16 @@ function thread__iss(smem) end -- communications handler thread -function thread__comms(smem) +function thread__comms_tx(smem) -- execute thread local exec = function () - log._debug("comms thread start") + log._debug("comms tx thread start") -- load in from shared memory local plc_state = smem.plc_state local plc_comms = smem.plc_sys.plc_comms - local comms_queue = smem.q.mq_comms + local comms_queue = smem.q.mq_comms_tx local last_update = util.time() @@ -326,8 +323,6 @@ function thread__comms(smem) -- received data elseif msg.qtype == mqueue.TYPE.PACKET then -- received a packet - -- handle the packet (plc_state passed to allow clearing SCRAM flag) - plc_comms.handle_packet(msg.message, plc_state) end -- quick yield @@ -336,7 +331,55 @@ function thread__comms(smem) -- check for termination request if plc_state.shutdown then - log._warning("comms thread exiting") + log._warning("comms tx thread exiting") + break + end + + -- delay before next check + last_update = util.adaptive_delay(COMMS_SLEEP, last_update) + end + end + + return { exec = exec } +end + +function thread__comms_rx(smem) + -- execute thread + local exec = function () + log._debug("comms rx thread start") + + -- load in from shared memory + local plc_state = smem.plc_state + local plc_comms = smem.plc_sys.plc_comms + local conn_watchdog = smem.plc_sys.conn_watchdog + + local comms_queue = smem.q.mq_comms_rx + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while comms_queue.ready() and not plc_state.shutdown do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (plc_state passed to allow clearing SCRAM flag) + plc_comms.handle_packet(msg.message, plc_state, conn_watchdog) + end + + -- quick yield + util.nop() + end + + -- check for termination request + if plc_state.shutdown then + log._warning("comms rx thread exiting") break end