diff --git a/reactor-plc/plc.lua b/reactor-plc/plc.lua index f76bfb6..c1ffbf0 100644 --- a/reactor-plc/plc.lua +++ b/reactor-plc/plc.lua @@ -457,11 +457,9 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) send_status(plc_state.degraded) log._debug("re-sent initial status data") elseif link_ack == RPLC_LINKING.DENY then - -- @todo: make sure this doesn't become a MITM security risk println_ts("received unsolicited link denial, unlinking") log._debug("unsolicited RPLC link request denied") elseif link_ack == RPLC_LINKING.COLLISION then - -- @todo: make sure this doesn't become a MITM security risk println_ts("received unsolicited link collision, unlinking") log._warning("unsolicited RPLC link request collision") else @@ -562,7 +560,11 @@ function comms_init(id, modem, local_port, server_port, reactor, iss) local is_scrammed = function () return self.scrammed end local is_linked = function () return self.linked end - local unlink = function () self.linked = false end + + local unlink = function () + self.linked = false + self.r_seq_num = nil + end return { reconnect_modem = reconnect_modem, diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 811f4f2..37ac6de 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -12,7 +12,7 @@ os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.4.9" +local R_PLC_VERSION = "alpha-v0.4.10" local print = util.print local println = util.println diff --git a/rtu/rtu.lua b/rtu/rtu.lua index 2d7306d..27f18c6 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -120,6 +120,7 @@ end function rtu_comms(modem, local_port, server_port) local self = { seq_num = 0, + r_seq_num = nil, txn_id = 0, modem = modem, s_port = server_port, @@ -193,8 +194,23 @@ function rtu_comms(modem, local_port, server_port) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, rtu_state) + local handle_packet = function(packet, units, rtu_state, conn_watchdog) if packet ~= nil then + local seq_ok = true + + -- check sequence number + if self.r_seq_num == nil then + self.r_seq_num = packet.scada_frame.seq_num() + elseif rtu_state.linked and self.r_seq_num >= packet.scada_frame.seq_num() then + log._warning("sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) + return + else + self.r_seq_num = packet.scada_frame.seq_num() + end + + -- feed watchdog on valid sequence number + conn_watchdog.feed() + local protocol = packet.scada_frame.protocol() if protocol == PROTOCOLS.MODBUS_TCP then @@ -220,6 +236,7 @@ function rtu_comms(modem, local_port, server_port) if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then -- acknowledgement rtu_state.linked = true + self.r_seq_num = nil elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- request for capabilities again send_advertisement(units) @@ -279,11 +296,17 @@ function rtu_comms(modem, local_port, server_port) _send(SCADA_MGMT_TYPES.RTU_HEARTBEAT, {}) end + local unlink = function (rtu_state) + rtu_state.linked = false + self.r_seq_num = nil + end + return { reconnect_modem = reconnect_modem, parse_packet = parse_packet, handle_packet = handle_packet, send_advertisement = send_advertisement, - send_heartbeat = send_heartbeat + send_heartbeat = send_heartbeat, + unlink = unlink } end diff --git a/rtu/startup.lua b/rtu/startup.lua index 9219bff..d4360b5 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -19,7 +19,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.4.6" +local RTU_VERSION = "alpha-v0.4.7" local print = util.print local println = util.println @@ -53,6 +53,7 @@ local __shared_memory = { -- system objects rtu_sys = { rtu_comms = nil, + conn_watchdog = nil, units = {} }, @@ -203,6 +204,10 @@ end local main_thread = threads.thread__main(__shared_memory) local comms_thread = threads.thread__comms(__shared_memory) +-- start connection watchdog +smem_sys.conn_watchdog = util.new_watchdog(5) +log._debug("init> conn watchdog started") + -- run threads parallel.waitForAll(main_thread.exec, comms_thread.exec) diff --git a/rtu/threads.lua b/rtu/threads.lua index 049ca89..b02aafb 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -23,16 +23,38 @@ function thread__main(smem) local loop_clock = os.startTimer(MAIN_CLOCK) -- load in from shared memory - local rtu_state = smem.rtu_state - local rtu_dev = smem.rtu_dev - local rtu_comms = smem.rtu_sys.rtu_comms - local units = smem.rtu_sys.units + local rtu_state = smem.rtu_state + local rtu_dev = smem.rtu_dev + local rtu_comms = smem.rtu_sys.rtu_comms + local conn_watchdog = smem.rtu_sys.conn_watchdog + local units = smem.rtu_sys.units -- event loop while true do local event, param1, param2, param3, param4, param5 = os.pullEventRaw() - if event == "peripheral_detach" then + if event == "timer" and param1 == loop_clock then + -- start next clock timer + loop_clock = os.startTimer(MAIN_CLOCK) + + -- period tick, if we are linked send heartbeat, if not send advertisement + if rtu_state.linked then + rtu_comms.send_heartbeat() + else + -- advertise units + rtu_comms.send_advertisement(units) + end + elseif event == "modem_message" then + -- got a packet + local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + -- pass the packet onto the comms message queue + smem.q.mq_comms.push_packet(packet) + end + elseif event == "timer" and param1 == conn_watchdog.get_timer() then + -- haven't heard from server recently? unlink + rtu_comms.unlink(rtu_state) + elseif event == "peripheral_detach" then -- handle loss of a device local device = ppm.handle_unmount(param1) @@ -94,23 +116,6 @@ function thread__main(smem) end end end - elseif event == "timer" and param1 == loop_clock then - -- start next clock timer - loop_clock = os.startTimer(MAIN_CLOCK) - - -- period tick, if we are linked send heartbeat, if not send advertisement - if rtu_state.linked then - rtu_comms.send_heartbeat() - else - -- advertise units - rtu_comms.send_advertisement(units) - end - elseif event == "modem_message" then - -- got a packet - local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) - if packet ~= nil then - smem.q.mq_comms.push_packet(packet) - end end -- check for termination request @@ -132,13 +137,14 @@ function thread__comms(smem) log._debug("comms thread start") -- load in from shared memory - local rtu_state = smem.rtu_state - local rtu_comms = smem.rtu_sys.rtu_comms - local units = smem.rtu_sys.units + local rtu_state = smem.rtu_state + local rtu_comms = smem.rtu_sys.rtu_comms + local conn_watchdog = smem.rtu_sys.conn_watchdog + local units = smem.rtu_sys.units - local comms_queue = smem.q.mq_comms + local comms_queue = smem.q.mq_comms - local last_update = util.time() + local last_update = util.time() -- thread loop while true do @@ -153,7 +159,8 @@ function thread__comms(smem) elseif msg.qtype == mqueue.TYPE.PACKET then -- received a packet -- handle the packet (rtu_state passed to allow setting link flag) - rtu_comms.handle_packet(msg.message, units, rtu_state) + -- (conn_watchdog passed to allow feeding watchdog) + rtu_comms.handle_packet(msg.message, units, rtu_state, conn_watchdog) end -- quick yield