From ea17ba41febb7c1288a6494fa75ed5cb2f2529a6 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Thu, 7 Jul 2022 00:34:42 -0400 Subject: [PATCH] #74 supervisor-coordinator comms establish --- coordinator/apisessions.lua | 9 ++ coordinator/coordinator.lua | 71 +++++++--- coordinator/startup.lua | 2 +- graphics/element.lua | 5 - scada-common/comms.lua | 40 +++--- scada-common/log.lua | 2 +- scada-common/mqueue.lua | 2 +- supervisor/session/coordinator.lua | 207 +++++++++++++++++++++++++++++ supervisor/session/svsessions.lua | 28 ++++ supervisor/startup.lua | 5 +- supervisor/supervisor.lua | 72 ++++++++-- 11 files changed, 379 insertions(+), 64 deletions(-) diff --git a/coordinator/apisessions.lua b/coordinator/apisessions.lua index 2141e77..3c14c08 100644 --- a/coordinator/apisessions.lua +++ b/coordinator/apisessions.lua @@ -4,4 +4,13 @@ local apisessions = {} function apisessions.handle_packet(packet) end +function apisessions.check_all_watchdogs() +end + +function apisessions.close_all() +end + +function apisessions.free_all_closed() +end + return apisessions diff --git a/coordinator/coordinator.lua b/coordinator/coordinator.lua index 4954323..78e6a1d 100644 --- a/coordinator/coordinator.lua +++ b/coordinator/coordinator.lua @@ -17,7 +17,7 @@ local println_ts = util.println_ts local PROTOCOLS = comms.PROTOCOLS local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES -local COORD_TYPES = comms.COORD_TYPES +local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES -- request the user to select a monitor ---@param names table available monitors @@ -209,16 +209,16 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa _open_channels() -- send a packet to the supervisor - ---@param msg_type SCADA_MGMT_TYPES|COORD_TYPES + ---@param msg_type SCADA_MGMT_TYPES|SCADA_CRDN_TYPES ---@param msg table local function _send_sv(protocol, msg_type, msg) local s_pkt = comms.scada_packet() - local pkt = nil ---@type mgmt_packet|coord_packet + local pkt = nil ---@type mgmt_packet|crdn_packet if protocol == PROTOCOLS.SCADA_MGMT then pkt = comms.mgmt_packet() - elseif protocol == PROTOCOLS.COORD_DATA then - pkt = comms.coord_packet() + elseif protocol == PROTOCOLS.SCADA_CRDN then + pkt = comms.crdn_packet() else return end @@ -230,6 +230,17 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa self.sv_seq_num = self.sv_seq_num + 1 end + -- attempt connection establishment + local function _send_establish() + _send_sv(PROTOCOLS.SCADA_CRDN, SCADA_CRDN_TYPES.ESTABLISH, { version }) + end + + -- keep alive ack + ---@param srv_time integer + local function _send_keep_alive_ack(srv_time) + _send_sv(PROTOCOLS.SCADA_MGMT, SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() }) + end + -- PUBLIC FUNCTIONS -- -- reconnect a newly connected modem @@ -251,7 +262,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa local start = util.time_s() local terminated = false - _send_sv(PROTOCOLS.COORD_DATA, COORD_TYPES.ESTABLISH, {}) + _send_establish() clock.start() @@ -262,12 +273,12 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa if event == "timer" and clock.is_clock(p1) then -- timed out attempt, try again tick_dmesg_waiting(math.max(0, timeout_s - (util.time_s() - start))) - _send_sv(PROTOCOLS.COORD_DATA, COORD_TYPES.ESTABLISH, {}) + _send_establish() clock.start() elseif event == "modem_message" then -- handle message local packet = public.parse_packet(p1, p2, p3, p4, p5) - if packet ~= nil and packet.type == COORD_TYPES.ESTABLISH then + if packet ~= nil and packet.type == SCADA_CRDN_TYPES.ESTABLISH then public.handle_packet(packet) end elseif event == "terminate" then @@ -291,7 +302,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa ---@param reply_to integer ---@param message any ---@param distance integer - ---@return mgmt_frame|coord_frame|capi_frame|nil packet + ---@return mgmt_frame|crdn_frame|capi_frame|nil packet function public.parse_packet(side, sender, reply_to, message, distance) local pkt = nil local s_pkt = comms.scada_packet() @@ -307,10 +318,10 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa pkt = mgmt_pkt.get() end -- get as coordinator packet - elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then - local coord_pkt = comms.coord_packet() - if coord_pkt.decode(s_pkt) then - pkt = coord_pkt.get() + elseif s_pkt.protocol() == PROTOCOLS.SCADA_CRDN then + local crdn_pkt = comms.crdn_packet() + if crdn_pkt.decode(s_pkt) then + pkt = crdn_pkt.get() end -- get as coordinator API packet elseif s_pkt.protocol() == PROTOCOLS.COORD_API then @@ -327,7 +338,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa end -- handle a packet - ---@param packet mgmt_frame|coord_frame|capi_frame + ---@param packet mgmt_frame|crdn_frame|capi_frame function public.handle_packet(packet) if packet ~= nil then local protocol = packet.scada_frame.protocol() @@ -349,8 +360,8 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa sv_watchdog.feed() -- handle packet - if protocol == PROTOCOLS.COORD_DATA then - if packet.type == COORD_TYPES.ESTABLISH then + if protocol == PROTOCOLS.SCADA_CRDN then + if packet.type == SCADA_CRDN_TYPES.ESTABLISH then -- connection with supervisor established if packet.length > 1 then -- get configuration @@ -369,22 +380,38 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa -- init database structure database.init(conf) + + self.sv_linked = true else log.debug("supervisor conn establish packet length mismatch") end else log.debug("supervisor conn establish packet length mismatch") end - elseif packet.type == COORD_TYPES.QUERY_UNIT then - elseif packet.type == COORD_TYPES.QUERY_FACILITY then - elseif packet.type == COORD_TYPES.COMMAND_UNIT then - elseif packet.type == COORD_TYPES.ALARM then + elseif packet.type == SCADA_CRDN_TYPES.QUERY_UNIT then + elseif packet.type == SCADA_CRDN_TYPES.QUERY_FACILITY then + elseif packet.type == SCADA_CRDN_TYPES.COMMAND_UNIT then + elseif packet.type == SCADA_CRDN_TYPES.ALARM then else - log.warning("received unknown COORD_DATA packet type " .. packet.type) + log.warning("received unknown SCADA_CRDN packet type " .. packet.type) end elseif protocol == PROTOCOLS.SCADA_MGMT then if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then - -- keep alive response received + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 500 then + log.warning("coord KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)") + end + + -- log.debug("coord RTT = " .. trip_time .. "ms") + + _send_keep_alive_ack(timestamp) + else + log.debug("SCADA keep alive packet length mismatch") + end elseif packet.type == SCADA_MGMT_TYPES.CLOSE then -- handle session close sv_watchdog.cancel() diff --git a/coordinator/startup.lua b/coordinator/startup.lua index de36d5b..e6cadc9 100644 --- a/coordinator/startup.lua +++ b/coordinator/startup.lua @@ -13,7 +13,7 @@ local config = require("coordinator.config") local coordinator = require("coordinator.coordinator") local renderer = require("coordinator.renderer") -local COORDINATOR_VERSION = "alpha-v0.3.0" +local COORDINATOR_VERSION = "alpha-v0.3.2" local print = util.print local println = util.println diff --git a/graphics/element.lua b/graphics/element.lua index 268dcb7..5e58ec1 100644 --- a/graphics/element.lua +++ b/graphics/element.lua @@ -3,8 +3,6 @@ -- local core = require("graphics.core") -local log = require("scada-common.log") -local util = require("scada-common.util") local element = {} @@ -31,9 +29,6 @@ function element.new(args) bounds = { x1 = 1, y1 = 1, x2 = 1, y2 = 1} } - ---@fixme remove debug - log.dmesg("new " .. self.elem_type) - local protected = { window = nil, ---@type table fg_bg = core.graphics.cpair(colors.white, colors.black), diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 1943baa..3d18d7f 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -16,7 +16,7 @@ local PROTOCOLS = { MODBUS_TCP = 0, -- our "MODBUS TCP"-esque protocol RPLC = 1, -- reactor PLC protocol SCADA_MGMT = 2, -- SCADA supervisor management, device advertisements, etc - COORD_DATA = 3, -- data/control packets for coordinators to/from supervisory controllers + SCADA_CRDN = 3, -- data/control packets for coordinators to/from supervisory controllers COORD_API = 4 -- data/control packets for pocket computers to/from coordinators } @@ -48,8 +48,8 @@ local SCADA_MGMT_TYPES = { REMOTE_LINKED = 3 -- remote device linked } ----@alias COORD_TYPES integer -local COORD_TYPES = { +---@alias SCADA_CRDN_TYPES integer +local SCADA_CRDN_TYPES = { ESTABLISH = 0, -- initial greeting QUERY_UNIT = 1, -- query the state of a unit QUERY_FACILITY = 2, -- query general facility status @@ -80,7 +80,7 @@ comms.PROTOCOLS = PROTOCOLS comms.RPLC_TYPES = RPLC_TYPES comms.RPLC_LINKING = RPLC_LINKING comms.SCADA_MGMT_TYPES = SCADA_MGMT_TYPES -comms.COORD_TYPES = COORD_TYPES +comms.SCADA_CRDN_TYPES = SCADA_CRDN_TYPES comms.RTU_UNIT_TYPES = RTU_UNIT_TYPES -- generic SCADA packet object @@ -438,7 +438,7 @@ function comms.mgmt_packet() end -- SCADA coordinator packet -function comms.coord_packet() +function comms.crdn_packet() local self = { frame = nil, raw = nil, @@ -447,20 +447,20 @@ function comms.coord_packet() data = nil } - ---@class coord_packet + ---@class crdn_packet local public = {} -- check that type is known - local function _coord_type_valid() - return self.type == COORD_TYPES.ESTABLISH or - self.type == COORD_TYPES.QUERY_UNIT or - self.type == COORD_TYPES.QUERY_FACILITY or - self.type == COORD_TYPES.COMMAND_UNIT or - self.type == COORD_TYPES.ALARM + local function _crdn_type_valid() + return self.type == SCADA_CRDN_TYPES.ESTABLISH or + self.type == SCADA_CRDN_TYPES.QUERY_UNIT or + self.type == SCADA_CRDN_TYPES.QUERY_FACILITY or + self.type == SCADA_CRDN_TYPES.COMMAND_UNIT or + self.type == SCADA_CRDN_TYPES.ALARM end -- make a coordinator packet - ---@param packet_type COORD_TYPES + ---@param packet_type SCADA_CRDN_TYPES ---@param data table function public.make(packet_type, data) if type(data) == "table" then @@ -475,7 +475,7 @@ function comms.coord_packet() insert(self.raw, data[i]) end else - log.error("comms.coord_packet.make(): data not table") + log.error("comms.crdn_packet.make(): data not table") end end @@ -486,18 +486,18 @@ function comms.coord_packet() if frame then self.frame = frame - if frame.protocol() == PROTOCOLS.COORD_DATA then + if frame.protocol() == PROTOCOLS.SCADA_CRDN then local ok = frame.length() >= 1 if ok then local data = frame.data() public.make(data[1], { table.unpack(data, 2, #data) }) - ok = _coord_type_valid() + ok = _crdn_type_valid() end return ok else - log.debug("attempted COORD_DATA parse of incorrect protocol " .. frame.protocol(), true) + log.debug("attempted SCADA_CRDN parse of incorrect protocol " .. frame.protocol(), true) return false end else @@ -511,7 +511,7 @@ function comms.coord_packet() -- get this packet as a frame with an immutable relation to this object function public.get() - ---@class coord_frame + ---@class crdn_frame local frame = { scada_frame = self.frame, type = self.type, @@ -539,7 +539,7 @@ function comms.capi_packet() ---@class capi_packet local public = {} - local function _coord_type_valid() + local function _capi_type_valid() -- @todo return false end @@ -577,7 +577,7 @@ function comms.capi_packet() if ok then local data = frame.data() public.make(data[1], { table.unpack(data, 2, #data) }) - ok = _coord_type_valid() + ok = _capi_type_valid() end return ok diff --git a/scada-common/log.lua b/scada-common/log.lua index 1e91bb6..08aa8c8 100644 --- a/scada-common/log.lua +++ b/scada-common/log.lua @@ -183,7 +183,7 @@ function log.dmesg(msg, tag, tag_color) out.write(lines[i]) end - _log(util.c("[", t_stamp, "] ", tag, " ", msg)) + _log(util.c("[", t_stamp, "] [", tag, "] ", msg)) return ts_coord end diff --git a/scada-common/mqueue.lua b/scada-common/mqueue.lua index fd80cfa..ed22535 100644 --- a/scada-common/mqueue.lua +++ b/scada-common/mqueue.lua @@ -62,7 +62,7 @@ function mqueue.new() end -- push a packet onto the queue - ---@param packet scada_packet|modbus_packet|rplc_packet|coord_packet|capi_packet + ---@param packet scada_packet|modbus_packet|rplc_packet|crdn_packet|capi_packet function public.push_packet(packet) _push(TYPE.PACKET, packet) end diff --git a/supervisor/session/coordinator.lua b/supervisor/session/coordinator.lua index afa28c0..4920b55 100644 --- a/supervisor/session/coordinator.lua +++ b/supervisor/session/coordinator.lua @@ -1,3 +1,210 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local mqueue = require("scada-common.mqueue") +local util = require("scada-common.util") + local coordinator = {} +local PROTOCOLS = comms.PROTOCOLS +local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES +local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES + +local print = util.print +local println = util.println +local print_ts = util.print_ts +local println_ts = util.println_ts + +local PERIODICS = { + KEEP_ALIVE = 2.0 +} + +-- coordinator supervisor session +---@param id integer +---@param in_queue mqueue +---@param out_queue mqueue +function coordinator.new_session(id, in_queue, out_queue) + local log_header = "crdn_session(" .. id .. "): " + + local self = { + id = id, + in_q = in_queue, + out_q = out_queue, + -- connection properties + seq_num = 0, + r_seq_num = nil, + connected = true, + conn_watchdog = util.new_watchdog(3), + last_rtt = 0, + -- periodic messages + periodics = { + last_update = 0, + keep_alive = 0 + } + } + + -- mark this coordinator session as closed, stop watchdog + local function _close() + self.conn_watchdog.cancel() + self.connected = false + end + + -- send a CRDN packet + ---@param msg_type SCADA_CRDN_TYPES + ---@param msg table + local function _send(msg_type, msg) + local s_pkt = comms.scada_packet() + local c_pkt = comms.crdn_packet() + + c_pkt.make(msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOLS.SCADA_CRDN, c_pkt.raw_sendable()) + + self.out_q.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- send a SCADA management packet + ---@param msg_type SCADA_MGMT_TYPES + ---@param msg table + local function _send_mgmt(msg_type, msg) + local s_pkt = comms.scada_packet() + local m_pkt = comms.mgmt_packet() + + m_pkt.make(msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOLS.SCADA_MGMT, m_pkt.raw_sendable()) + + self.out_q.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- handle a packet + ---@param pkt crdn_frame + local function _handle_packet(pkt) + -- check sequence number + if self.r_seq_num == nil then + self.r_seq_num = pkt.scada_frame.seq_num() + elseif self.r_seq_num >= pkt.scada_frame.seq_num() then + log.warning(log_header .. "sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. pkt.scada_frame.seq_num()) + return + else + self.r_seq_num = pkt.scada_frame.seq_num() + end + + -- feed watchdog + self.conn_watchdog.feed() + + -- process packet + if pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then + if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + -- keep alive reply + if pkt.length == 2 then + local srv_start = pkt.data[1] + local coord_send = pkt.data[2] + local srv_now = util.time() + self.last_rtt = srv_now - srv_start + + if self.last_rtt > 500 then + log.warning(log_header .. "COORD KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)") + end + + log.debug(log_header .. "COORD RTT = " .. self.last_rtt .. "ms") + log.debug(log_header .. "COORD TT = " .. (srv_now - coord_send) .. "ms") + else + log.debug(log_header .. "SCADA keep alive packet length mismatch") + end + elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then + -- close the session + _close() + else + log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type) + end + elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_CRDN then + if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then + else + end + end + end + + ---@class coord_session + local public = {} + + -- get the session ID + function public.get_id() return self.id end + + -- check if a timer matches this session's watchdog + function public.check_wd(timer) + return self.conn_watchdog.is_timer(timer) and self.connected + end + + -- close the connection + function public.close() + _close() + _send_mgmt(SCADA_MGMT_TYPES.CLOSE, {}) + println("connection to coordinator #" .. self.id .. " closed by server") + log.info(log_header .. "session closed by server") + end + + -- iterate the session + ---@return boolean connected + function public.iterate() + if self.connected then + ------------------ + -- handle queue -- + ------------------ + + local handle_start = util.time() + + while self.in_q.ready() and self.connected do + -- get a new message to process + local message = self.in_q.pop() + + if message ~= nil then + if message.qtype == mqueue.TYPE.PACKET then + -- handle a packet + _handle_packet(message.message) + elseif message.qtype == mqueue.TYPE.COMMAND then + -- handle instruction + elseif message.qtype == mqueue.TYPE.DATA then + -- instruction with body + end + end + + -- max 100ms spent processing queue + if util.time() - handle_start > 100 then + log.warning(log_header .. "exceeded 100ms queue process limit") + break + end + end + + -- exit if connection was closed + if not self.connected then + println("connection to coordinator " .. self.id .. " closed by remote host") + log.info(log_header .. "session closed by remote host") + return self.connected + end + + ---------------------- + -- update periodics -- + ---------------------- + + local elapsed = util.time() - self.periodics.last_update + + local periodics = self.periodics + + -- keep alive + + periodics.keep_alive = periodics.keep_alive + elapsed + if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then + _send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() }) + periodics.keep_alive = 0 + end + + self.periodics.last_update = util.time() + end + + return self.connected + end + + return public +end + return coordinator diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index 1077661..3a1ac4b 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -243,6 +243,34 @@ function svsessions.establish_rtu_session(local_port, remote_port, advertisement return rtu_s.instance.get_id() end +-- establish a new coordinator session +---@param local_port integer +---@param remote_port integer +---@param version string +---@return integer|false session_id +function svsessions.establish_coord_session(local_port, remote_port, version) + ---@class coord_session_struct + local coord_s = { + open = true, + version = version, + l_port = local_port, + r_port = remote_port, + in_queue = mqueue.new(), + out_queue = mqueue.new(), + instance = nil + } + + coord_s.instance = coordinator.new_session(self.next_coord_id, coord_s.in_queue, coord_s.out_queue) + table.insert(self.coord_sessions, coord_s) + + log.debug("established new coordinator session to " .. remote_port .. " with ID " .. self.next_coord_id) + + self.next_coord_id = self.next_coord_id + 1 + + -- success + return coord_s.instance.get_id() +end + -- attempt to identify which session's watchdog timer fired ---@param timer_event number function svsessions.check_all_watchdogs(timer_event) diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 54f4f54..cdf3426 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -13,7 +13,7 @@ local svsessions = require("supervisor.session.svsessions") local config = require("supervisor.config") local supervisor = require("supervisor.supervisor") -local SUPERVISOR_VERSION = "beta-v0.4.14" +local SUPERVISOR_VERSION = "beta-v0.5.1" local print = util.print local println = util.println @@ -72,7 +72,8 @@ if modem == nil then end -- start comms, open all channels -local superv_comms = supervisor.comms(SUPERVISOR_VERSION, config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_SV_LISTEN) +local superv_comms = supervisor.comms(SUPERVISOR_VERSION, config.NUM_REACTORS, config.REACTOR_COOLING, modem, + config.SCADA_DEV_LISTEN, config.SCADA_SV_LISTEN) -- base loop clock (6.67Hz, 3 ticks) local MAIN_CLOCK = 0.15 diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index 1256dc6..617e996 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -9,8 +9,9 @@ local supervisor = {} local PROTOCOLS = comms.PROTOCOLS local RPLC_TYPES = comms.RPLC_TYPES local RPLC_LINKING = comms.RPLC_LINKING -local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES +local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES +local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES local SESSION_TYPE = svsessions.SESSION_TYPE @@ -22,10 +23,11 @@ local println_ts = util.println_ts -- supervisory controller communications ---@param version string ---@param num_reactors integer +---@param cooling_conf table ---@param modem table ---@param dev_listen integer ---@param coord_listen integer -function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen) +function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen, coord_listen) local self = { version = version, num_reactors = num_reactors, @@ -57,7 +59,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen -- link modem to svsessions svsessions.link_modem(self.modem) - -- send PLC link request responses + -- send PLC link request response ---@param dest integer ---@param msg table local function _send_plc_linking(seq_id, dest, msg) @@ -70,7 +72,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) end - -- send RTU advertisement responses + -- send RTU advertisement response ---@param seq_id integer ---@param dest integer local function _send_remote_linked(seq_id, dest) @@ -83,6 +85,26 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) end + -- send coordinator connection establish response + ---@param seq_id integer + ---@param dest integer + local function _send_crdn_establish(seq_id, dest) + local s_pkt = comms.scada_packet() + local c_pkt = comms.crdn_packet() + + local config = { self.num_reactors } + + for i = 1, #cooling_conf do + table.insert(config, cooling_conf[i].BOILERS) + table.insert(config, cooling_conf[i].TURBINES) + end + + c_pkt.make(SCADA_CRDN_TYPES.ESTABLISH, config) + s_pkt.make(seq_id, PROTOCOLS.SCADA_CRDN, c_pkt.raw_sendable()) + + self.modem.transmit(dest, self.coord_listen, s_pkt.raw_sendable()) + end + -- PUBLIC FUNCTIONS -- -- reconnect a newly connected modem @@ -100,7 +122,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen ---@param reply_to integer ---@param message any ---@param distance integer - ---@return modbus_frame|rplc_frame|mgmt_frame|coord_frame|nil packet + ---@return modbus_frame|rplc_frame|mgmt_frame|crdn_frame|nil packet function public.parse_packet(side, sender, reply_to, message, distance) local pkt = nil local s_pkt = comms.scada_packet() @@ -128,10 +150,10 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen pkt = mgmt_pkt.get() end -- get as coordinator packet - elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then - local coord_pkt = comms.coord_packet() - if coord_pkt.decode(s_pkt) then - pkt = coord_pkt.get() + elseif s_pkt.protocol() == PROTOCOLS.SCADA_CRDN then + local crdn_pkt = comms.crdn_packet() + if crdn_pkt.decode(s_pkt) then + pkt = crdn_pkt.get() end else log.debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true) @@ -142,7 +164,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen end -- handle a packet - ---@param packet modbus_frame|rplc_frame|mgmt_frame|coord_frame + ---@param packet modbus_frame|rplc_frame|mgmt_frame|crdn_frame function public.handle_packet(packet) if packet ~= nil then local l_port = packet.scada_frame.local_port() @@ -226,7 +248,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen end else -- any other packet should be session related, discard it - log.debug("discarding SCADA_MGMT packet without a known session") + log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session")) end else log.debug("illegal packet type " .. protocol .. " on device listening channel") @@ -238,8 +260,34 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen if protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet - elseif protocol == PROTOCOLS.COORD_DATA then + if session ~= nil then + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + else + -- any other packet should be session related, discard it + log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session")) + end + elseif protocol == PROTOCOLS.SCADA_CRDN then -- coordinator packet + if session ~= nil then + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + elseif packet.type == SCADA_CRDN_TYPES.ESTABLISH then + if packet.length == 1 then + -- this is an attempt to establish a new session + println(util.c("connected to coordinator [:", r_port, "]")) + + svsessions.establish_coord_session(l_port, r_port, packet.data[1]) + + log.debug("CRDN_ESTABLISH: connected to " .. r_port) + _send_crdn_establish(packet.scada_frame.seq_num() + 1, r_port) + else + log.debug("CRDN_ESTABLISH: establish packet length mismatch") + end + else + -- any other packet should be session related, discard it + log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_CRDN packet without a known session")) + end else log.debug("illegal packet type " .. protocol .. " on coordinator listening channel") end