rx and tx threads for PLC comms to maintain quick comms and #36 only feed watchdog on valid sequence numbers

This commit is contained in:
Mikayla Fischler 2022-04-28 22:36:45 -04:00
parent aff166e27d
commit 67ec8fbd91
3 changed files with 71 additions and 22 deletions

View File

@ -411,7 +411,7 @@ function comms_init(id, modem, local_port, server_port, reactor, iss)
end
-- handle an RPLC packet
local handle_packet = function (packet, plc_state)
local handle_packet = function (packet, plc_state, conn_watchdog)
if packet ~= nil then
-- check sequence number
if self.r_seq_num == nil then
@ -423,6 +423,9 @@ function comms_init(id, modem, local_port, server_port, reactor, iss)
self.r_seq_num = packet.scada_frame.seq_num()
end
-- feed the watchdog first so it doesn't uhh...eat our packets
conn_watchdog.feed()
-- handle packet
if packet.scada_frame.protocol() == PROTOCOLS.RPLC then
if self.linked then
@ -431,10 +434,8 @@ function comms_init(id, modem, local_port, server_port, reactor, iss)
local timestamp = packet.data[1]
local trip_time = util.time() - timestamp
if trip_time < 0 then
log._warning("PLC KEEP_ALIVE trip time less than 0 (" .. trip_time .. ")")
elseif trip_time > 1200 then
log._warning("PLC KEEP_ALIVE trip time > 1.2s (" .. trip_time .. ")")
if trip_time > 500 then
log._warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. ")")
end
-- log._debug("RPLC RTT = ".. trip_time .. "ms")

View File

@ -12,7 +12,7 @@ os.loadAPI("config.lua")
os.loadAPI("plc.lua")
os.loadAPI("threads.lua")
local R_PLC_VERSION = "alpha-v0.4.6"
local R_PLC_VERSION = "alpha-v0.4.7"
local print = util.print
local println = util.println
@ -58,7 +58,8 @@ local __shared_memory = {
-- message queues
q = {
mq_iss = mqueue.new(),
mq_comms = mqueue.new()
mq_comms_tx = mqueue.new(),
mq_comms_rx = mqueue.new()
}
}
@ -126,12 +127,16 @@ init()
-- init threads
local main_thread = threads.thread__main(__shared_memory, init)
local iss_thread = threads.thread__iss(__shared_memory)
local comms_thread = threads.thread__comms(__shared_memory)
-- run threads
if __shared_memory.networked then
parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec)
-- init comms threads
local comms_thread_tx = threads.thread__comms_tx(__shared_memory)
local comms_thread_rx = threads.thread__comms_rx(__shared_memory)
-- run threads
parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread_tx.exec, comms_thread_rx.exec)
else
-- run threads, excluding comms
parallel.waitForAll(main_thread.exec, iss_thread.exec)
end

View File

@ -58,7 +58,7 @@ function thread__main(smem, init)
-- send updated data
if not plc_state.no_modem then
if plc_comms.is_linked() then
smem.q.mq_comms.push_command(MQ__COMM_CMD.SEND_STATUS)
smem.q.mq_comms_tx.push_command(MQ__COMM_CMD.SEND_STATUS)
else
if ticks_to_update == 0 then
plc_comms.send_link_req()
@ -71,13 +71,10 @@ function thread__main(smem, init)
end
elseif event == "modem_message" and networked and not plc_state.no_modem then
-- got a packet
-- feed the watchdog first so it doesn't uhh...eat our packets
conn_watchdog.feed()
-- handle the packet
local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5)
if packet ~= nil then
smem.q.mq_comms.push_packet(packet)
-- pass the packet onto the comms message queue
smem.q.mq_comms_rx.push_packet(packet)
end
elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then
-- haven't heard from server recently? shutdown reactor
@ -296,16 +293,16 @@ function thread__iss(smem)
end
-- communications handler thread
function thread__comms(smem)
function thread__comms_tx(smem)
-- execute thread
local exec = function ()
log._debug("comms thread start")
log._debug("comms tx thread start")
-- load in from shared memory
local plc_state = smem.plc_state
local plc_comms = smem.plc_sys.plc_comms
local comms_queue = smem.q.mq_comms
local comms_queue = smem.q.mq_comms_tx
local last_update = util.time()
@ -326,8 +323,6 @@ function thread__comms(smem)
-- received data
elseif msg.qtype == mqueue.TYPE.PACKET then
-- received a packet
-- handle the packet (plc_state passed to allow clearing SCRAM flag)
plc_comms.handle_packet(msg.message, plc_state)
end
-- quick yield
@ -336,7 +331,55 @@ function thread__comms(smem)
-- check for termination request
if plc_state.shutdown then
log._warning("comms thread exiting")
log._warning("comms tx thread exiting")
break
end
-- delay before next check
last_update = util.adaptive_delay(COMMS_SLEEP, last_update)
end
end
return { exec = exec }
end
function thread__comms_rx(smem)
-- execute thread
local exec = function ()
log._debug("comms rx thread start")
-- load in from shared memory
local plc_state = smem.plc_state
local plc_comms = smem.plc_sys.plc_comms
local conn_watchdog = smem.plc_sys.conn_watchdog
local comms_queue = smem.q.mq_comms_rx
local last_update = util.time()
-- thread loop
while true do
-- check for messages in the message queue
while comms_queue.ready() and not plc_state.shutdown do
local msg = comms_queue.pop()
if msg.qtype == mqueue.TYPE.COMMAND then
-- received a command
elseif msg.qtype == mqueue.TYPE.DATA then
-- received data
elseif msg.qtype == mqueue.TYPE.PACKET then
-- received a packet
-- handle the packet (plc_state passed to allow clearing SCRAM flag)
plc_comms.handle_packet(msg.message, plc_state, conn_watchdog)
end
-- quick yield
util.nop()
end
-- check for termination request
if plc_state.shutdown then
log._warning("comms rx thread exiting")
break
end