From e679b5a25aafa4676af4801c1ce323caa24edd81 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Sun, 13 Nov 2022 14:13:30 -0500 Subject: [PATCH] #122 versioned comms protocol with unified establish protocol --- coordinator/coordinator.lua | 188 +++++++++++++++------------ coordinator/startup.lua | 2 +- reactor-plc/plc.lua | 146 ++++++++++----------- reactor-plc/startup.lua | 2 +- rtu/rtu.lua | 206 +++++++++++++++++------------ rtu/startup.lua | 2 +- rtu/threads.lua | 10 +- scada-common/comms.lua | 76 ++++++----- supervisor/session/svsessions.lua | 6 +- supervisor/startup.lua | 2 +- supervisor/supervisor.lua | 207 ++++++++++++++++-------------- 11 files changed, 463 insertions(+), 384 deletions(-) diff --git a/coordinator/coordinator.lua b/coordinator/coordinator.lua index 05b8443..2a211cb 100644 --- a/coordinator/coordinator.lua +++ b/coordinator/coordinator.lua @@ -16,6 +16,8 @@ local print_ts = util.print_ts local println_ts = util.println_ts local PROTOCOLS = comms.PROTOCOLS +local DEVICE_TYPES = comms.DEVICE_TYPES +local ESTABLISH_ACK = comms.ESTABLISH_ACK local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES local CRDN_COMMANDS = comms.CRDN_COMMANDS @@ -236,7 +238,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa -- attempt connection establishment local function _send_establish() - _send_sv(PROTOCOLS.SCADA_CRDN, SCADA_CRDN_TYPES.ESTABLISH, { version }) + _send_sv(PROTOCOLS.SCADA_MGMT, SCADA_MGMT_TYPES.ESTABLISH, { comms.version, version, DEVICE_TYPES.CRDN }) end -- keep alive ack @@ -288,7 +290,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa elseif event == "modem_message" then -- handle message local packet = public.parse_packet(p1, p2, p3, p4, p5) - if packet ~= nil and packet.type == SCADA_CRDN_TYPES.ESTABLISH then + if packet ~= nil and packet.type == SCADA_MGMT_TYPES.ESTABLISH then public.handle_packet(packet) end elseif event == "terminate" then @@ -385,106 +387,124 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa -- handle packet if protocol == PROTOCOLS.SCADA_CRDN then - if packet.type == SCADA_CRDN_TYPES.ESTABLISH then - -- connection with supervisor established - if packet.length > 1 then - -- get configuration - - ---@class facility_conf - local conf = { - num_units = packet.data[1], - defs = {} -- boilers and turbines - } - - if (packet.length - 1) == (conf.num_units * 2) then - -- record sequence of pairs of [#boilers, #turbines] per unit - for i = 2, packet.length do - table.insert(conf.defs, packet.data[i]) - end - - -- init io controller - iocontrol.init(conf, public) - - self.sv_linked = true + if self.sv_linked then + if packet.type == SCADA_CRDN_TYPES.STRUCT_BUILDS then + -- record builds + if iocontrol.record_builds(packet.data) then + -- acknowledge receipt of builds + _send_sv(PROTOCOLS.SCADA_CRDN, SCADA_CRDN_TYPES.STRUCT_BUILDS, {}) else - log.debug("supervisor conn establish packet length mismatch") + log.error("received invalid SCADA_CRDN build packet") end - elseif packet.length == 1 and packet.data[1] == false then - log.debug("supervisor connection denied") - else - log.debug("supervisor conn establish packet length mismatch") - end - elseif packet.type == SCADA_CRDN_TYPES.STRUCT_BUILDS then - -- record builds - if iocontrol.record_builds(packet.data) then - -- acknowledge receipt of builds - _send_sv(PROTOCOLS.SCADA_CRDN, SCADA_CRDN_TYPES.STRUCT_BUILDS, {}) - else - log.error("received invalid build packet") - end - elseif packet.type == SCADA_CRDN_TYPES.UNIT_STATUSES then - -- update statuses - if not iocontrol.update_statuses(packet.data) then - log.error("received invalid unit statuses packet") - end - elseif packet.type == SCADA_CRDN_TYPES.COMMAND_UNIT then - -- unit command acknowledgement - if packet.length == 3 then - local cmd = packet.data[1] - local unit_id = packet.data[2] - local ack = packet.data[3] + elseif packet.type == SCADA_CRDN_TYPES.UNIT_STATUSES then + -- update statuses + if not iocontrol.update_statuses(packet.data) then + log.error("received invalid SCADA_CRDN unit statuses packet") + end + elseif packet.type == SCADA_CRDN_TYPES.COMMAND_UNIT then + -- unit command acknowledgement + if packet.length == 3 then + local cmd = packet.data[1] + local unit_id = packet.data[2] + local ack = packet.data[3] - local unit = iocontrol.get_db().units[unit_id] ---@type ioctl_entry + local unit = iocontrol.get_db().units[unit_id] ---@type ioctl_entry - if unit ~= nil then - if cmd == CRDN_COMMANDS.SCRAM then - unit.scram_ack(ack) - elseif cmd == CRDN_COMMANDS.START then - unit.start_ack(ack) - elseif cmd == CRDN_COMMANDS.RESET_RPS then - unit.reset_rps_ack(ack) - elseif cmd == CRDN_COMMANDS.SET_BURN then - unit.set_burn_ack(ack) - elseif cmd == CRDN_COMMANDS.SET_WASTE then - unit.set_waste_ack(ack) + if unit ~= nil then + if cmd == CRDN_COMMANDS.SCRAM then + unit.scram_ack(ack) + elseif cmd == CRDN_COMMANDS.START then + unit.start_ack(ack) + elseif cmd == CRDN_COMMANDS.RESET_RPS then + unit.reset_rps_ack(ack) + elseif cmd == CRDN_COMMANDS.SET_BURN then + unit.set_burn_ack(ack) + elseif cmd == CRDN_COMMANDS.SET_WASTE then + unit.set_waste_ack(ack) + else + log.debug(util.c("received command ack with unknown command ", cmd)) + end else - log.debug(util.c("received command ack with unknown command ", cmd)) + log.debug(util.c("received command ack with unknown unit ", unit_id)) end else - log.debug(util.c("received command ack with unknown unit ", unit_id)) + log.debug("SCADA_CRDN unit command ack packet length mismatch") end + elseif packet.type == SCADA_CRDN_TYPES.ALARM then + ---@todo alarm/architecture handling else - log.debug("unit command ack packet length mismatch") + log.warning("received unknown SCADA_CRDN packet type " .. packet.type) end - elseif packet.type == SCADA_CRDN_TYPES.ALARM then else - log.warning("received unknown SCADA_CRDN packet type " .. packet.type) + log.debug("discarding SCADA_CRDN packet before linked") end elseif protocol == PROTOCOLS.SCADA_MGMT then - if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then - -- keep alive request received, echo back - if packet.length == 1 then - local timestamp = packet.data[1] - local trip_time = util.time() - timestamp + if packet.type == SCADA_MGMT_TYPES.ESTABLISH then + -- connection with supervisor established + if packet.length == 2 then + local est_ack = packet.data[1] + local config = packet.data[2] - if trip_time > 500 then - log.warning("coord KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + if est_ack == ESTABLISH_ACK.ALLOW then + if type(config) == "table" and #config > 1 then + -- get configuration + + ---@class facility_conf + local conf = { + num_units = config[1], + defs = {} -- boilers and turbines + } + + if (#config - 1) == (conf.num_units * 2) then + -- record sequence of pairs of [#boilers, #turbines] per unit + for i = 2, #config do + table.insert(conf.defs, config[i]) + end + + -- init io controller + iocontrol.init(conf, public) + + self.sv_linked = true + else + log.error("invalid supervisor configuration definitions received, establish failed") + end + else + log.error("invalid supervisor configuration table received, establish failed") + end + else + log.debug("supervisor connection denied") end - - -- log.debug("coord RTT = " .. trip_time .. "ms") - - _send_keep_alive_ack(timestamp) else - log.debug("SCADA keep alive packet length mismatch") + log.debug("SCADA_MGMT establish packet length mismatch") + end + elseif self.sv_linked then + if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("coord KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("coord RTT = " .. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA keep alive packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.CLOSE then + -- handle session close + sv_watchdog.cancel() + self.sv_linked = false + println_ts("server connection closed by remote host") + log.warning("server connection closed by remote host") + else + log.warning("received unknown SCADA_MGMT packet type " .. packet.type) end - elseif packet.type == SCADA_MGMT_TYPES.CLOSE then - -- handle session close - sv_watchdog.cancel() - self.sv_linked = false - println_ts("server connection closed by remote host") - log.warning("server connection closed by remote host") else - log.warning("received unknown SCADA_MGMT packet type " .. packet.type) + log.debug("discarding non-link SCADA_MGMT packet before linked") end else log.debug("illegal packet type " .. protocol .. " on supervisor listening channel", true) diff --git a/coordinator/startup.lua b/coordinator/startup.lua index fa3576f..e7028e7 100644 --- a/coordinator/startup.lua +++ b/coordinator/startup.lua @@ -16,7 +16,7 @@ local config = require("coordinator.config") local coordinator = require("coordinator.coordinator") local renderer = require("coordinator.renderer") -local COORDINATOR_VERSION = "alpha-v0.6.10" +local COORDINATOR_VERSION = "alpha-v0.6.11" local print = util.print local println = util.println diff --git a/reactor-plc/plc.lua b/reactor-plc/plc.lua index c59de20..70afdfd 100644 --- a/reactor-plc/plc.lua +++ b/reactor-plc/plc.lua @@ -9,8 +9,9 @@ local plc = {} local rps_status_t = types.rps_status_t local PROTOCOLS = comms.PROTOCOLS +local DEVICE_TYPES = comms.DEVICE_TYPES +local ESTABLISH_ACK = comms.ESTABLISH_ACK local RPLC_TYPES = comms.RPLC_TYPES -local RPLC_LINKING = comms.RPLC_LINKING local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local print = util.print @@ -600,7 +601,7 @@ function plc.comms(id, version, modem, local_port, server_port, reactor, rps, co -- attempt to establish link with supervisor function public.send_link_req() - _send(RPLC_TYPES.LINK_REQ, { id, version }) + _send_mgmt(SCADA_MGMT_TYPES.ESTABLISH, { comms.version, version, DEVICE_TYPES.PLC, id }) end -- send live status information @@ -716,34 +717,7 @@ function plc.comms(id, version, modem, local_port, server_port, reactor, rps, co -- handle packet if protocol == PROTOCOLS.RPLC then if self.linked then - if packet.type == RPLC_TYPES.LINK_REQ then - -- link request confirmation - if packet.length == 1 then - log.debug("received unsolicited link request response") - - local link_ack = packet.data[1] - - if link_ack == RPLC_LINKING.ALLOW then - self.status_cache = nil - _send_struct() - public.send_status(plc_state.no_reactor, plc_state.reactor_formed) - log.debug("re-sent initial status data") - elseif link_ack == RPLC_LINKING.DENY then - println_ts("received unsolicited link denial, unlinking") - log.debug("unsolicited RPLC link request denied") - elseif link_ack == RPLC_LINKING.COLLISION then - println_ts("received unsolicited link collision, unlinking") - log.warning("unsolicited RPLC link request collision") - else - println_ts("invalid unsolicited link response") - log.error("unsolicited unknown RPLC link request response") - end - - self.linked = link_ack == RPLC_LINKING.ALLOW - else - log.debug("RPLC link req packet length mismatch") - end - elseif packet.type == RPLC_TYPES.STATUS then + if packet.type == RPLC_TYPES.STATUS then -- request of full status, clear cache first self.status_cache = nil public.send_status(plc_state.no_reactor, plc_state.reactor_formed) @@ -805,69 +779,97 @@ function plc.comms(id, version, modem, local_port, server_port, reactor, rps, co else log.warning("received unknown RPLC packet type " .. packet.type) end - elseif packet.type == RPLC_TYPES.LINK_REQ then + else + log.debug("discarding RPLC packet before linked") + end + elseif protocol == PROTOCOLS.SCADA_MGMT then + if self.linked then + if packet.type == SCADA_MGMT_TYPES.ESTABLISH then + -- link request confirmation + if packet.length == 1 then + log.debug("received unsolicited establish response") + + local est_ack = packet.data[1] + + if est_ack == ESTABLISH_ACK.ALLOW then + self.status_cache = nil + _send_struct() + public.send_status(plc_state.no_reactor, plc_state.reactor_formed) + log.debug("re-sent initial status data") + elseif est_ack == ESTABLISH_ACK.DENY then + println_ts("received unsolicited link denial, unlinking") + log.info("unsolicited establish request denied") + elseif est_ack == ESTABLISH_ACK.COLLISION then + println_ts("received unsolicited link collision, unlinking") + log.warning("unsolicited establish request collision") + else + println_ts("invalid unsolicited link response") + log.error("unsolicited unknown establish request response") + end + + self.linked = est_ack == ESTABLISH_ACK.ALLOW + else + log.debug("SCADA_MGMT establish packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 and type(packet.data[1]) == "number" then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("RPLC RTT = " .. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA_MGMT keep alive packet length/type mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.CLOSE then + -- handle session close + conn_watchdog.cancel() + public.unlink() + println_ts("server connection closed by remote host") + log.warning("server connection closed by remote host") + else + log.warning("received unsupported SCADA_MGMT packet type " .. packet.type) + end + elseif packet.type == SCADA_MGMT_TYPES.ESTABLISH then -- link request confirmation if packet.length == 1 then - local link_ack = packet.data[1] + local est_ack = packet.data[1] - if link_ack == RPLC_LINKING.ALLOW then + if est_ack == ESTABLISH_ACK.ALLOW then println_ts("linked!") - log.debug("RPLC link request approved") + log.debug("supervisor establish request approved") -- reset remote sequence number and cache self.r_seq_num = nil self.status_cache = nil - if plc_state.reactor_formed then - _send_struct() - end - + if plc_state.reactor_formed then _send_struct() end public.send_status(plc_state.no_reactor, plc_state.reactor_formed) log.debug("sent initial status data") - elseif link_ack == RPLC_LINKING.DENY then + elseif est_ack == ESTABLISH_ACK.DENY then println_ts("link request denied, retrying...") - log.debug("RPLC link request denied") - elseif link_ack == RPLC_LINKING.COLLISION then + log.debug("establish request denied") + elseif est_ack == ESTABLISH_ACK.COLLISION then println_ts("reactor PLC ID collision (check config), retrying...") - log.warning("RPLC link request collision") + log.warning("establish request collision") else println_ts("invalid link response, bad channel? retrying...") - log.error("unknown RPLC link request response") + log.error("unknown establish request response") end - self.linked = link_ack == RPLC_LINKING.ALLOW + self.linked = est_ack == ESTABLISH_ACK.ALLOW else - log.debug("RPLC link req packet length mismatch") + log.debug("SCADA_MGMT establish packet length mismatch") end else - log.debug("discarding non-link packet before linked") - end - elseif protocol == PROTOCOLS.SCADA_MGMT then - if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then - -- keep alive request received, echo back - if packet.length == 1 then - local timestamp = packet.data[1] - local trip_time = util.time() - timestamp - - if trip_time > 500 then - log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") - end - - -- log.debug("RPLC RTT = " .. trip_time .. "ms") - - _send_keep_alive_ack(timestamp) - else - log.debug("SCADA keep alive packet length mismatch") - end - elseif packet.type == SCADA_MGMT_TYPES.CLOSE then - -- handle session close - conn_watchdog.cancel() - public.unlink() - println_ts("server connection closed by remote host") - log.warning("server connection closed by remote host") - else - log.warning("received unknown SCADA_MGMT packet type " .. packet.type) + log.debug("discarding non-link SCADA_MGMT packet before linked") end else -- should be unreachable assuming packet is from parse_packet() diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index 785d668..84a653d 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -13,7 +13,7 @@ local config = require("reactor-plc.config") local plc = require("reactor-plc.plc") local threads = require("reactor-plc.threads") -local R_PLC_VERSION = "beta-v0.9.5" +local R_PLC_VERSION = "beta-v0.9.6" local print = util.print local println = util.println diff --git a/rtu/rtu.lua b/rtu/rtu.lua index 229d58b..88516d1 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -8,6 +8,8 @@ local modbus = require("rtu.modbus") local rtu = {} local PROTOCOLS = comms.PROTOCOLS +local DEVICE_TYPES = comms.DEVICE_TYPES +local ESTABLISH_ACK = comms.ESTABLISH_ACK local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES @@ -210,6 +212,34 @@ function rtu.comms(version, modem, local_port, server_port, conn_watchdog) _send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() }) end + -- generate device advertisement table + ---@param units table + ---@return table advertisement + local function _generate_advertisement(units) + local advertisement = {} + + for i = 1, #units do + local unit = units[i] --@type rtu_unit_registry_entry + local type = comms.rtu_t_to_unit_type(unit.type) + + if type ~= nil then + local advert = { + type, + unit.index, + unit.reactor + } + + if type == RTU_UNIT_TYPES.REDSTONE then + insert(advert, unit.device) + end + + insert(advertisement, advert) + end + end + + return advertisement + end + -- PUBLIC FUNCTIONS -- -- send a MODBUS TCP packet @@ -244,31 +274,16 @@ function rtu.comms(version, modem, local_port, server_port, conn_watchdog) _send(SCADA_MGMT_TYPES.CLOSE, {}) end + -- send establish request (includes advertisement) + ---@param units table + function public.send_establish(units) + _send(SCADA_MGMT_TYPES.ESTABLISH, { comms.version, self.version, DEVICE_TYPES.RTU, _generate_advertisement(units) }) + end + -- send capability advertisement ---@param units table function public.send_advertisement(units) - local advertisement = { self.version } - - for i = 1, #units do - local unit = units[i] --@type rtu_unit_registry_entry - local type = comms.rtu_t_to_unit_type(unit.type) - - if type ~= nil then - local advert = { - type, - unit.index, - unit.reactor - } - - if type == RTU_UNIT_TYPES.REDSTONE then - insert(advert, unit.device) - end - - insert(advertisement, advert) - end - end - - _send(SCADA_MGMT_TYPES.RTU_ADVERT, advertisement) + _send(SCADA_MGMT_TYPES.RTU_ADVERT, _generate_advertisement(units)) end -- notify that a peripheral was remounted @@ -334,86 +349,107 @@ function rtu.comms(version, modem, local_port, server_port, conn_watchdog) local protocol = packet.scada_frame.protocol() if protocol == PROTOCOLS.MODBUS_TCP then - local return_code = false + if rtu_state.linked then + local return_code = false ---@diagnostic disable-next-line: param-type-mismatch - local reply = modbus.reply__neg_ack(packet) + local reply = modbus.reply__neg_ack(packet) - -- handle MODBUS instruction - if packet.unit_id <= #units then - local unit = units[packet.unit_id] ---@type rtu_unit_registry_entry - local unit_dbg_tag = " (unit " .. packet.unit_id .. ")" + -- handle MODBUS instruction + if packet.unit_id <= #units then + local unit = units[packet.unit_id] ---@type rtu_unit_registry_entry + local unit_dbg_tag = " (unit " .. packet.unit_id .. ")" - if unit.name == "redstone_io" then - -- immediately execute redstone RTU requests + if unit.name == "redstone_io" then + -- immediately execute redstone RTU requests ---@diagnostic disable-next-line: param-type-mismatch - return_code, reply = unit.modbus_io.handle_packet(packet) - if not return_code then - log.warning("requested MODBUS operation failed" .. unit_dbg_tag) + return_code, reply = unit.modbus_io.handle_packet(packet) + if not return_code then + log.warning("requested MODBUS operation failed" .. unit_dbg_tag) + end + else + -- check validity then pass off to unit comms thread +---@diagnostic disable-next-line: param-type-mismatch + return_code, reply = unit.modbus_io.check_request(packet) + if return_code then + -- check if there are more than 3 active transactions + -- still queue the packet, but this may indicate a problem + if unit.pkt_queue.length() > 3 then +---@diagnostic disable-next-line: param-type-mismatch + reply = modbus.reply__srv_device_busy(packet) + log.debug("queueing new request with " .. unit.pkt_queue.length() .. + " transactions already in the queue" .. unit_dbg_tag) + end + + -- always queue the command even if busy + unit.pkt_queue.push_packet(packet) + else + log.warning("cannot perform requested MODBUS operation" .. unit_dbg_tag) + end end else - -- check validity then pass off to unit comms thread + -- unit ID out of range? ---@diagnostic disable-next-line: param-type-mismatch - return_code, reply = unit.modbus_io.check_request(packet) - if return_code then - -- check if there are more than 3 active transactions - -- still queue the packet, but this may indicate a problem - if unit.pkt_queue.length() > 3 then ----@diagnostic disable-next-line: param-type-mismatch - reply = modbus.reply__srv_device_busy(packet) - log.debug("queueing new request with " .. unit.pkt_queue.length() .. - " transactions already in the queue" .. unit_dbg_tag) - end - - -- always queue the command even if busy - unit.pkt_queue.push_packet(packet) - else - log.warning("cannot perform requested MODBUS operation" .. unit_dbg_tag) - end + reply = modbus.reply__gw_unavailable(packet) + log.error("received MODBUS packet for non-existent unit") end - else - -- unit ID out of range? ----@diagnostic disable-next-line: param-type-mismatch - reply = modbus.reply__gw_unavailable(packet) - log.error("received MODBUS packet for non-existent unit") - end - public.send_modbus(reply) + public.send_modbus(reply) + else + log.debug("discarding MODBUS packet before linked") + end elseif protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet - if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then - -- keep alive request received, echo back + if packet.type == SCADA_MGMT_TYPES.ESTABLISH then if packet.length == 1 then - local timestamp = packet.data[1] - local trip_time = util.time() - timestamp + local est_ack = packet.data[1] - if trip_time > 500 then - log.warning("RTU KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + if est_ack == ESTABLISH_ACK.ALLOW then + -- establish allowed + rtu_state.linked = true + self.r_seq_num = nil + println_ts("supervisor connection established") + log.info("supervisor connection established") + else + -- establish denied + public.unlink(rtu_state) + println_ts("supervisor connection") + log.warning("supervisor connection denied by remote host") end - - -- log.debug("RTU RTT = " .. trip_time .. "ms") - - _send_keep_alive_ack(timestamp) else - log.debug("SCADA keep alive packet length mismatch") + log.debug("SCADA_MGMT establish packet length mismatch") + end + elseif rtu_state.linked then + if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 and type(packet.data[1]) == "number" then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("RTU KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("RTU RTT = " .. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA_MGMT keep alive packet length/type mismatch") + end + elseif packet.type == SCADA_MGMT_TYPES.CLOSE then + -- close connection + self.conn_watchdog.cancel() + public.unlink(rtu_state) + println_ts("server connection closed by remote host") + log.warning("server connection closed by remote host") + elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then + -- request for capabilities again + public.send_advertisement(units) + else + -- not supported + log.warning("received unsupported SCADA_MGMT message type " .. packet.type) end - elseif packet.type == SCADA_MGMT_TYPES.CLOSE then - -- close connection - self.conn_watchdog.cancel() - public.unlink(rtu_state) - println_ts("server connection closed by remote host") - log.warning("server connection closed by remote host") - elseif packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then - -- acknowledgement - rtu_state.linked = true - self.r_seq_num = nil - println_ts("supervisor connection established") - log.info("supervisor connection established") - elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then - -- request for capabilities again - public.send_advertisement(units) else - -- not supported - log.warning("RTU got unexpected SCADA message type " .. packet.type) + log.debug("discarding non-link SCADA_MGMT packet before linked") end else -- should be unreachable assuming packet is from parse_packet() diff --git a/rtu/startup.lua b/rtu/startup.lua index e50c93f..03dfbaa 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -24,7 +24,7 @@ local sna_rtu = require("rtu.dev.sna_rtu") local sps_rtu = require("rtu.dev.sps_rtu") local turbinev_rtu = require("rtu.dev.turbinev_rtu") -local RTU_VERSION = "beta-v0.9.2" +local RTU_VERSION = "beta-v0.9.3" local rtu_t = types.rtu_t diff --git a/rtu/threads.lua b/rtu/threads.lua index fd08759..3d5b690 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -11,7 +11,7 @@ local sna_rtu = require("rtu.dev.sna_rtu") local sps_rtu = require("rtu.dev.sps_rtu") local turbinev_rtu = require("rtu.dev.turbinev_rtu") -local modbus = require("rtu.modbus") +local modbus = require("rtu.modbus") local threads = {} @@ -58,10 +58,10 @@ function threads.thread__main(smem) -- start next clock timer loop_clock.start() - -- period tick, if we are not linked send advertisement + -- period tick, if we are not linked send establish request if not rtu_state.linked then -- advertise units - rtu_comms.send_advertisement(units) + rtu_comms.send_establish(units) end elseif event == "modem_message" then -- got a packet @@ -93,7 +93,9 @@ function threads.thread__main(smem) -- we are going to let the PPM prevent crashes -- return fault flags/codes to MODBUS queries local unit = units[i] - println_ts("lost the " .. unit.type .. " on interface " .. unit.name) + println_ts(util.c("lost the ", unit.type, " on interface ", unit.name)) + log.warning(util.c("lost the ", unit.type, " unit peripheral on interface ", unit.name)) + break end end end diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 16092ee..adf58c4 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -12,6 +12,8 @@ local rtu_t = types.rtu_t local insert = table.insert +comms.version = "1.0.0" + ---@alias PROTOCOLS integer local PROTOCOLS = { MODBUS_TCP = 0, -- our "MODBUS TCP"-esque protocol @@ -21,43 +23,49 @@ local PROTOCOLS = { COORD_API = 4 -- data/control packets for pocket computers to/from coordinators } ----@alias RPLC_TYPES integer -local RPLC_TYPES = { - LINK_REQ = 0, -- linking requests - STATUS = 1, -- reactor/system status - MEK_STRUCT = 2, -- mekanism build structure - MEK_BURN_RATE = 3, -- set burn rate - RPS_ENABLE = 4, -- enable reactor - RPS_SCRAM = 5, -- SCRAM reactor (manual request) - RPS_ASCRAM = 6, -- SCRAM reactor (automatic request) - RPS_STATUS = 7, -- RPS status - RPS_ALARM = 8, -- RPS alarm broadcast - RPS_RESET = 9 -- clear RPS trip (if in bad state, will trip immediately) +---@alias DEVICE_TYPES integer +local DEVICE_TYPES = { + PLC = 0, -- PLC device type for establish + RTU = 1, -- RTU device type for establish + SV = 2, -- supervisor device type for establish + CRDN = 3 -- coordinator device type for establish } ----@alias RPLC_LINKING integer -local RPLC_LINKING = { +---@alias RPLC_TYPES integer +local RPLC_TYPES = { + STATUS = 0, -- reactor/system status + MEK_STRUCT = 1, -- mekanism build structure + MEK_BURN_RATE = 2, -- set burn rate + RPS_ENABLE = 3, -- enable reactor + RPS_SCRAM = 4, -- SCRAM reactor (manual request) + RPS_ASCRAM = 5, -- SCRAM reactor (automatic request) + RPS_STATUS = 6, -- RPS status + RPS_ALARM = 7, -- RPS alarm broadcast + RPS_RESET = 8 -- clear RPS trip (if in bad state, will trip immediately) +} + +---@alias SCADA_MGMT_TYPES integer +local SCADA_MGMT_TYPES = { + ESTABLISH = 0, -- establish new connection + KEEP_ALIVE = 1, -- keep alive packet w/ RTT + CLOSE = 2, -- close a connection + RTU_ADVERT = 3, -- RTU capability advertisement + RTU_DEV_REMOUNT = 4 -- RTU multiblock possbily changed (formed, unformed) due to PPM remount +} + +---@alias ESTABLISH_ACK integer +local ESTABLISH_ACK = { ALLOW = 0, -- link approved DENY = 1, -- link denied COLLISION = 2 -- link denied due to existing active link } ----@alias SCADA_MGMT_TYPES integer -local SCADA_MGMT_TYPES = { - KEEP_ALIVE = 0, -- keep alive packet w/ RTT - CLOSE = 1, -- close a connection - REMOTE_LINKED = 2, -- remote device linked - RTU_ADVERT = 3, -- RTU capability advertisement - RTU_DEV_REMOUNT = 4 -- RTU multiblock possbily changed (formed, unformed) due to PPM remount -} - ---@alias SCADA_CRDN_TYPES integer local SCADA_CRDN_TYPES = { - ESTABLISH = 0, -- initial greeting - STRUCT_BUILDS = 1, -- mekanism structure builds - UNIT_STATUSES = 2, -- state of reactor units - COMMAND_UNIT = 3, -- command a reactor unit - ALARM = 4 -- alarm signaling + STRUCT_BUILDS = 0, -- mekanism structure builds + UNIT_STATUSES = 1, -- state of reactor units + COMMAND_UNIT = 2, -- command a reactor unit + ALARM = 3 -- alarm signaling } ---@alias CRDN_COMMANDS integer @@ -86,8 +94,9 @@ local RTU_UNIT_TYPES = { } comms.PROTOCOLS = PROTOCOLS +comms.DEVICE_TYPES = DEVICE_TYPES comms.RPLC_TYPES = RPLC_TYPES -comms.RPLC_LINKING = RPLC_LINKING +comms.ESTABLISH_ACK = ESTABLISH_ACK comms.SCADA_MGMT_TYPES = SCADA_MGMT_TYPES comms.SCADA_CRDN_TYPES = SCADA_CRDN_TYPES comms.CRDN_COMMANDS = CRDN_COMMANDS @@ -286,8 +295,7 @@ function comms.rplc_packet() -- check that type is known local function _rplc_type_valid() - return self.type == RPLC_TYPES.LINK_REQ or - self.type == RPLC_TYPES.STATUS or + return self.type == RPLC_TYPES.STATUS or self.type == RPLC_TYPES.MEK_STRUCT or self.type == RPLC_TYPES.MEK_BURN_RATE or self.type == RPLC_TYPES.RPS_ENABLE or @@ -384,7 +392,8 @@ function comms.mgmt_packet() -- check that type is known local function _scada_type_valid() - return self.type == SCADA_MGMT_TYPES.KEEP_ALIVE or + return self.type == SCADA_MGMT_TYPES.ESTABLISH or + self.type == SCADA_MGMT_TYPES.KEEP_ALIVE or self.type == SCADA_MGMT_TYPES.CLOSE or self.type == SCADA_MGMT_TYPES.REMOTE_LINKED or self.type == SCADA_MGMT_TYPES.RTU_ADVERT or @@ -472,8 +481,7 @@ function comms.crdn_packet() -- check that type is known local function _crdn_type_valid() - return self.type == SCADA_CRDN_TYPES.ESTABLISH or - self.type == SCADA_CRDN_TYPES.STRUCT_BUILDS or + return self.type == SCADA_CRDN_TYPES.STRUCT_BUILDS or self.type == SCADA_CRDN_TYPES.UNIT_STATUSES or self.type == SCADA_CRDN_TYPES.COMMAND_UNIT or self.type == SCADA_CRDN_TYPES.ALARM diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index 29a3e07..c4a6c8f 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -317,11 +317,9 @@ end ---@param local_port integer ---@param remote_port integer ---@param advertisement table +---@param version string ---@return integer session_id -function svsessions.establish_rtu_session(local_port, remote_port, advertisement) - -- pull version from advertisement - local version = table.remove(advertisement, 1) - +function svsessions.establish_rtu_session(local_port, remote_port, advertisement, version) ---@class rtu_session_struct local rtu_s = { s_type = "rtu", diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 6a3abea..f213501 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -13,7 +13,7 @@ local svsessions = require("supervisor.session.svsessions") local config = require("supervisor.config") local supervisor = require("supervisor.supervisor") -local SUPERVISOR_VERSION = "beta-v0.7.4" +local SUPERVISOR_VERSION = "beta-v0.7.5" local print = util.print local println = util.println diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index ec85623..b86e0e6 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -7,8 +7,9 @@ local svsessions = require("supervisor.session.svsessions") local supervisor = {} local PROTOCOLS = comms.PROTOCOLS +local DEVICE_TYPES = comms.DEVICE_TYPES +local ESTABLISH_ACK = comms.ESTABLISH_ACK local RPLC_TYPES = comms.RPLC_TYPES -local RPLC_LINKING = comms.RPLC_LINKING local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES @@ -51,27 +52,14 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen -- link modem to svsessions svsessions.init(self.modem, num_reactors, cooling_conf) - -- send PLC link request response + -- send an establish request response to a PLC/RTU ---@param dest integer ---@param msg table - local function _send_plc_linking(seq_id, dest, msg) - local s_pkt = comms.scada_packet() - local r_pkt = comms.rplc_packet() - - r_pkt.make(0, RPLC_TYPES.LINK_REQ, msg) - s_pkt.make(seq_id, PROTOCOLS.RPLC, r_pkt.raw_sendable()) - - self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) - end - - -- send RTU advertisement response - ---@param seq_id integer - ---@param dest integer - local function _send_remote_linked(seq_id, dest) + local function _send_dev_establish(seq_id, dest, msg) local s_pkt = comms.scada_packet() local m_pkt = comms.mgmt_packet() - m_pkt.make(SCADA_MGMT_TYPES.REMOTE_LINKED, {}) + m_pkt.make(SCADA_MGMT_TYPES.ESTABLISH, msg) s_pkt.make(seq_id, PROTOCOLS.SCADA_MGMT, m_pkt.raw_sendable()) self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) @@ -80,23 +68,13 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen -- send coordinator connection establish response ---@param seq_id integer ---@param dest integer - ---@param allow boolean - local function _send_crdn_establish(seq_id, dest, allow) + ---@param msg table + local function _send_crdn_establish(seq_id, dest, msg) local s_pkt = comms.scada_packet() - local c_pkt = comms.crdn_packet() + local c_pkt = comms.mgmt_packet() - local config = { false } - - if allow then - config = { self.num_reactors } - for i = 1, #cooling_conf do - table.insert(config, cooling_conf[i].BOILERS) - table.insert(config, cooling_conf[i].TURBINES) - end - end - - c_pkt.make(SCADA_CRDN_TYPES.ESTABLISH, config) - s_pkt.make(seq_id, PROTOCOLS.SCADA_CRDN, c_pkt.raw_sendable()) + c_pkt.make(SCADA_MGMT_TYPES.ESTABLISH, msg) + s_pkt.make(seq_id, PROTOCOLS.SCADA_MGMT, c_pkt.raw_sendable()) self.modem.transmit(dest, self.coord_listen, s_pkt.raw_sendable()) end @@ -187,46 +165,12 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen -- reactor PLC packet if session ~= nil then - if packet.type == RPLC_TYPES.LINK_REQ then - -- new device on this port? that's a collision - log.debug("PLC_LNK: request from existing connection received on " .. r_port .. ", responding with collision") - _send_plc_linking(packet.scada_frame.seq_num() + 1, r_port, { RPLC_LINKING.COLLISION }) - else - -- pass the packet onto the session handler - session.in_queue.push_packet(packet) - end + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) else - local next_seq_id = packet.scada_frame.seq_num() + 1 - - -- unknown session, is this a linking request? - if packet.type == RPLC_TYPES.LINK_REQ then - if packet.length == 2 then - -- this is a linking request - if type(packet.data[1]) == "number" and type(packet.data[2]) == "string" then - local plc_id = svsessions.establish_plc_session(l_port, r_port, packet.data[1], packet.data[2]) - if plc_id == false then - -- reactor already has a PLC assigned - log.debug(util.c("PLC_LNK: assignment collision with reactor ", packet.data[1])) - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.COLLISION }) - else - -- got an ID; assigned to a reactor successfully - println(util.c("connected to reactor ", packet.data[1], " PLC (", packet.data[2], ") [:", r_port, "]")) - log.debug("PLC_LNK: allowed for device at " .. r_port) - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.ALLOW }) - end - else - log.debug("PLC_LNK: bad parameter types") - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) - end - else - log.debug("PLC_LNK: new linking packet length mismatch") - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) - end - else - -- force a re-link - log.debug("PLC_LNK: no session but not a link, force relink") - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) - end + -- unknown session, force a re-link + log.debug("PLC_EST: no session but not an establish, force relink") + _send_dev_establish((packet.scada_frame.seq_num() + 1), r_port, { ESTABLISH_ACK.DENY }) end elseif protocol == PROTOCOLS.SCADA_MGMT then -- look for an associated session @@ -236,20 +180,63 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) - elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then - if packet.length >= 1 then - -- this is an RTU advertisement for a new session - local rtu_version = packet.data[1] + elseif packet.type == SCADA_MGMT_TYPES.ESTABLISH then + -- establish a new session + local next_seq_id = packet.scada_frame.seq_num() + 1 - -- note: this function mutates packet.data - svsessions.establish_rtu_session(l_port, r_port, packet.data) + -- validate packet and continue + if packet.length >= 3 and type(packet.data[1]) == "string" and type(packet.data[2]) == "string" then + local comms_v = packet.data[1] + local firmware_v = packet.data[2] + local dev_type = packet.data[3] - println(util.c("connected to RTU (", rtu_version, ") [:", r_port, "]")) - log.debug("RTU_ADVERT: linked " .. r_port) + if comms_v ~= comms.version then + log.debug(util.c("dropping establish packet with incorrect comms version v", comms_v, + " (expected v", comms.version, ")")) + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + return + end - _send_remote_linked(packet.scada_frame.seq_num() + 1, r_port) + if dev_type == DEVICE_TYPES.PLC then + -- PLC linking request + if packet.length == 4 and type(packet.data[4]) == "number" then + local reactor_id = packet.data[4] + local plc_id = svsessions.establish_plc_session(l_port, r_port, reactor_id, firmware_v) + + if plc_id == false then + -- reactor already has a PLC assigned + log.debug(util.c("PLC_ESTABLISH: assignment collision with reactor ", reactor_id)) + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.COLLISION }) + else + -- got an ID; assigned to a reactor successfully + println(util.c("reactor ", reactor_id, " PLC (", firmware_v, ") [:", r_port, "] \xbb connected")) + log.debug(util.c("PLC_ESTABLISH: link accepted for PLC (", firmware_v, ") [:", r_port, "] connected with session ID ", plc_id)) + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.ALLOW }) + end + else + log.debug("PLC_ESTABLISH: packet length mismatch/bad parameter type") + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end + elseif dev_type == DEVICE_TYPES.RTU then + if packet.length == 4 then + -- this is an RTU advertisement for a new session + local rtu_advert = packet.data[4] + local s_id = svsessions.establish_rtu_session(l_port, r_port, rtu_advert, firmware_v) + + println(util.c("RTU (", firmware_v, ") [:", r_port, "] \xbb connected")) + log.debug(util.c("RTU_ESTABLISH: RTU (",firmware_v, ") [:", r_port, "] connected with session ID ", s_id)) + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.ALLOW }) + else + log.debug("RTU_ESTABLISH: packet length mismatch") + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end + else + log.debug(util.c("illegal establish packet for device ", dev_type, " on PLC/RTU listening channel")) + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end else - log.debug("RTU_ADVERT: advertisement packet empty") + log.debug("invalid establish packet (on PLC/RTU listening channel)") + _send_dev_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) end else -- any other packet should be session related, discard it @@ -268,6 +255,48 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) + elseif packet.type == SCADA_MGMT_TYPES.ESTABLISH then + -- establish a new session + local next_seq_id = packet.scada_frame.seq_num() + 1 + + -- validate packet and continue + if packet.length >= 3 and type(packet.data[1]) == "string" and type(packet.data[2]) == "string" then + local comms_v = packet.data[1] + local firmware_v = packet.data[2] + local dev_type = packet.data[3] + + if comms_v ~= comms.version then + log.debug(util.c("dropping establish packet with incorrect comms version v", comms_v, + " (expected v", comms.version, ")")) + _send_crdn_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + return + elseif dev_type ~= DEVICE_TYPES.CRDN then + log.debug(util.c("illegal establish packet for device ", dev_type, " on CRDN listening channel")) + _send_crdn_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + return + end + + -- this is an attempt to establish a new session + local s_id = svsessions.establish_coord_session(l_port, r_port, firmware_v) + + if s_id ~= false then + local config = { self.num_reactors } + for i = 1, #cooling_conf do + table.insert(config, cooling_conf[i].BOILERS) + table.insert(config, cooling_conf[i].TURBINES) + end + + println(util.c("coordinator (",firmware_v, ") [:", r_port, "] \xbb connected")) + log.debug(util.c("CRDN_ESTABLISH: coordinator (",firmware_v, ") [:", r_port, "] connected with session ID ", s_id)) + _send_crdn_establish(next_seq_id, r_port, { ESTABLISH_ACK.ALLOW, config }) + else + log.debug("CRDN_ESTABLISH: denied new coordinator due to already being connected to another coordinator") + _send_crdn_establish(next_seq_id, r_port, { ESTABLISH_ACK.COLLISION }) + end + else + log.debug("CRDN_ESTABLISH: establish packet length mismatch") + _send_crdn_establish(next_seq_id, r_port, { ESTABLISH_ACK.DENY }) + end else -- any other packet should be session related, discard it log.debug(r_port .. "->" .. l_port .. ": discarding SCADA_MGMT packet without a known session") @@ -277,22 +306,6 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) - elseif packet.type == SCADA_CRDN_TYPES.ESTABLISH then - if packet.length == 1 then - -- this is an attempt to establish a new session - local s_id = svsessions.establish_coord_session(l_port, r_port, packet.data[1]) - - if s_id ~= false then - println(util.c("connected to coordinator (", packet.data[1], ") [:", r_port, "]")) - log.debug("CRDN_ESTABLISH: connected to " .. r_port) - else - log.debug("CRDN_ESTABLISH: denied new coordinator due to already being connected to another coordinator") - end - - _send_crdn_establish(packet.scada_frame.seq_num() + 1, r_port, (s_id ~= false)) - else - log.debug("CRDN_ESTABLISH: establish packet length mismatch") - end else -- any other packet should be session related, discard it log.debug(r_port .. "->" .. l_port .. ": discarding SCADA_CRDN packet without a known session")