From fc39588b2e1e5470bec3a2bef07cdfb4c171ea7c Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Fri, 13 May 2022 09:45:11 -0400 Subject: [PATCH] #8 RTU session for emachine and turbine, RTU session creation, adjusted sequence number logic in svsessions --- supervisor/session/rtu.lua | 124 +++++++++++++----- supervisor/session/rtu/boiler.lua | 42 +++--- supervisor/session/rtu/emachine.lua | 149 +++++++++++++++++++++ supervisor/session/rtu/turbine.lua | 195 ++++++++++++++++++++++++++++ supervisor/session/svsessions.lua | 31 ++++- supervisor/startup.lua | 2 +- supervisor/supervisor.lua | 48 +++++-- 7 files changed, 525 insertions(+), 66 deletions(-) create mode 100644 supervisor/session/rtu/emachine.lua create mode 100644 supervisor/session/rtu/turbine.lua diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua index bb86b05..30844dd 100644 --- a/supervisor/session/rtu.lua +++ b/supervisor/session/rtu.lua @@ -5,12 +5,14 @@ local util = require("scada-common.util") -- supervisor rtu sessions (svrs) local svrs_boiler = require("supervisor.session.rtu.boiler") +local svrs_emachine = require("supervisor.session.rtu.emachine") +local svrs_turbine = require("supervisor.session.rtu.turbine") local rtu = {} local PROTOCOLS = comms.PROTOCOLS -local RPLC_TYPES = comms.RPLC_TYPES local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES +local RTU_ADVERT_TYPES = comms.RTU_ADVERT_TYPES local print = util.print local println = util.println @@ -21,27 +23,76 @@ local PERIODICS = { KEEP_ALIVE = 2.0 } -rtu.new_session = function (id, in_queue, out_queue) +-- create a new RTU session +---@param id integer +---@param in_queue mqueue +---@param out_queue mqueue +---@param advertisement table +rtu.new_session = function (id, in_queue, out_queue, advertisement) local log_header = "rtu_session(" .. id .. "): " local self = { id = id, in_q = in_queue, out_q = out_queue, - commanded_state = false, - commanded_burn_rate = 0.0, - ramping_rate = false, + advert = advertisement, -- connection properties seq_num = 0, r_seq_num = nil, connected = true, - received_struct = false, - received_status_cache = false, rtu_conn_watchdog = util.new_watchdog(3), - last_rtt = 0 + last_rtt = 0, + units = {} } + ---@class rtu_session + local public = {} + + -- parse the recorded advertisement + local _parse_advertisement = function () + self.units = {} + for i = 1, #self.advert do + local unit = nil + + ---@type rtu_advertisement + local unit_advert = { + type = self.advert[i][0], + index = self.advert[i][1], + reactor = self.advert[i][2], + rsio = self.advert[i][3] + } + + local u_type = unit_advert.type + + -- create unit by type + if u_type == RTU_ADVERT_TYPES.REDSTONE then + + elseif u_type == RTU_ADVERT_TYPES.BOILER then + unit = svrs_boiler.new(self.id, unit_advert, self.out_q) + elseif u_type == RTU_ADVERT_TYPES.BOILER_VALVE then + -- @todo Mekanism 10.1+ + elseif u_type == RTU_ADVERT_TYPES.TURBINE then + unit = svrs_turbine.new(self.id, unit_advert, self.out_q) + elseif u_type == RTU_ADVERT_TYPES.TURBINE_VALVE then + -- @todo Mekanism 10.1+ + elseif u_type == RTU_ADVERT_TYPES.EMACHINE then + unit = svrs_emachine.new(self.id, unit_advert, self.out_q) + elseif u_type == RTU_ADVERT_TYPES.IMATRIX then + -- @todo Mekanism 10.1+ + end + + if unit ~= nil then + table.insert(self.units, unit) + else + self.units = {} + log.error(log_header .. "bad advertisement; encountered unsupported RTU type") + break + end + end + end + -- send a MODBUS TCP packet + ---@param m_pkt modbus_packet local _send_modbus = function (m_pkt) local s_pkt = comms.scada_packet() s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) @@ -50,6 +101,8 @@ rtu.new_session = function (id, in_queue, out_queue) end -- send a SCADA management packet + ---@param msg_type SCADA_MGMT_TYPES + ---@param msg table local _send_mgmt = function (msg_type, msg) local s_pkt = comms.scada_packet() local m_pkt = comms.mgmt_packet() @@ -62,6 +115,7 @@ rtu.new_session = function (id, in_queue, out_queue) end -- handle a packet + ---@param pkt modbus_frame|mgmt_frame local _handle_packet = function (pkt) -- check sequence number if self.r_seq_num == nil then @@ -78,6 +132,10 @@ rtu.new_session = function (id, in_queue, out_queue) -- process packet if pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then + if self.units[pkt.unit_id] ~= nil then + local unit = self.units[pkt.unit_id] ---@type rtu_session_unit + unit.handle_packet(pkt) + end elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then @@ -102,9 +160,8 @@ rtu.new_session = function (id, in_queue, out_queue) self.connected = false elseif pkt.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- RTU unit advertisement - for i = 1, pkt.length do - local unit = pkt.data[i] ---@type rtu_advertisement - end + self.advert = pkt.data + _parse_advertisement() else log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type) end @@ -114,15 +171,16 @@ rtu.new_session = function (id, in_queue, out_queue) -- PUBLIC FUNCTIONS -- -- get the session ID - local get_id = function () return self.id end + public.get_id = function () return self.id end -- check if a timer matches this session's watchdog - local check_wd = function (timer) + ---@param timer number + public.check_wd = function (timer) return self.rtu_conn_watchdog.is_timer(timer) end -- close the connection - local close = function () + public.close = function () self.rtu_conn_watchdog.cancel() self.connected = false _send_mgmt(SCADA_MGMT_TYPES.CLOSE, {}) @@ -131,8 +189,17 @@ rtu.new_session = function (id, in_queue, out_queue) end -- iterate the session - local iterate = function () + ---@return boolean connected + public.iterate = function () if self.connected then + ------------------ + -- update units -- + ------------------ + + for i = 1, #self.units do + self.units[i].update() + end + ------------------ -- handle queue -- ------------------ @@ -141,17 +208,17 @@ rtu.new_session = function (id, in_queue, out_queue) while self.in_q.ready() and self.connected do -- get a new message to process - local message = self.in_q.pop() + local msg = self.in_q.pop() - if message.qtype == mqueue.TYPE.PACKET then - -- handle a packet - _handle_packet(message.message) - elseif message.qtype == mqueue.TYPE.COMMAND then - -- handle instruction - local cmd = message.message - elseif message.qtype == mqueue.TYPE.DATA then - -- instruction with body - local cmd = message.message + if msg ~= nil then + if msg.qtype == mqueue.TYPE.PACKET then + -- handle a packet + _handle_packet(msg.message) + elseif msg.qtype == mqueue.TYPE.COMMAND then + -- handle instruction + elseif msg.qtype == mqueue.TYPE.DATA then + -- instruction with body + end end -- max 100ms spent processing queue @@ -191,12 +258,7 @@ rtu.new_session = function (id, in_queue, out_queue) return self.connected end - return { - get_id = get_id, - check_wd = check_wd, - close = close, - iterate = iterate - } + return public end return rtu diff --git a/supervisor/session/rtu/boiler.lua b/supervisor/session/rtu/boiler.lua index 83ecd8b..8804bda 100644 --- a/supervisor/session/rtu/boiler.lua +++ b/supervisor/session/rtu/boiler.lua @@ -1,7 +1,6 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") local types = require("scada-common.types") -local util = require("scada-common.util") local txnctrl = require("supervisor.session.rtu.txnctrl") @@ -25,16 +24,17 @@ local PERIODICS = { } -- create a new boiler rtu session runner +---@param session_id integer ---@param advert rtu_advertisement ---@param out_queue mqueue -boiler.new = function (advert, out_queue) +boiler.new = function (session_id, advert, out_queue) -- type check if advert.type ~= rtu_t.boiler then - log.error("attempt to instantiate boiler RTU for non boiler type '" .. advert.type .. "'. this is a bug.") + log.error("attempt to instantiate boiler RTU for type '" .. advert.type .. "'. this is a bug.") return nil end - local log_tag = "session.rtu.boiler(" .. advert.index .. "): " + local log_tag = "session.rtu(" .. session_id .. ").boiler(" .. advert.index .. "): " local self = { uid = advert.index, @@ -69,52 +69,46 @@ boiler.new = function (advert, out_queue) water = 0, water_need = 0, water_fill = 0.0, - hcool = 0, + hcool = {}, ---@type tank_fluid hcool_need = 0, hcool_fill = 0.0, - ccool = 0, + ccool = {}, ---@type tank_fluid ccool_need = 0, ccool_fill = 0.0 } } } - ---@class rtu_session__boiler + ---@class rtu_session_unit local public = {} -- PRIVATE FUNCTIONS -- - -- query the build of the device - local _request_build = function () + local _send_request = function (txn_type, f_code, register_range) local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(TXN_TYPES.BUILD) + local txn_id = self.transaction_controller.create(txn_type) - -- read input registers 1 through 7 (start = 1, count = 7) - m_pkt.make(txn_id, self.uid, MODBUS_FCODE.READ_INPUT_REGS, { 1, 7 }) + m_pkt.make(txn_id, self.uid, f_code, register_range) self.out_q.push_packet(m_pkt) end + -- query the build of the device + local _request_build = function () + -- read input registers 1 through 7 (start = 1, count = 7) + _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 7 }) + end + -- query the state of the device local _request_state = function () - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(TXN_TYPES.STATE) - -- read input registers 8 through 9 (start = 8, count = 2) - m_pkt.make(txn_id, self.uid, MODBUS_FCODE.READ_INPUT_REGS, { 8, 2 }) - - self.out_q.push_packet(m_pkt) + _send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 8, 2 }) end -- query the tanks of the device local _request_tanks = function () - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(TXN_TYPES.TANKS) - -- read input registers 10 through 21 (start = 10, count = 12) - m_pkt.make(txn_id, self.uid, MODBUS_FCODE.READ_INPUT_REGS, { 10, 12 }) - - self.out_q.push_packet(m_pkt) + _send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 10, 12 }) end -- PUBLIC FUNCTIONS -- diff --git a/supervisor/session/rtu/emachine.lua b/supervisor/session/rtu/emachine.lua new file mode 100644 index 0000000..340315e --- /dev/null +++ b/supervisor/session/rtu/emachine.lua @@ -0,0 +1,149 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local types = require("scada-common.types") + +local txnctrl = require("supervisor.session.rtu.txnctrl") + +local emachine = {} + +local PROTOCOLS = comms.PROTOCOLS +local MODBUS_FCODE = types.MODBUS_FCODE + +local rtu_t = types.rtu_t + +local TXN_TYPES = { + BUILD = 0, + STORAGE = 1 +} + +local PERIODICS = { + BUILD = 1000, + STORAGE = 500 +} + +-- create a new energy machine rtu session runner +---@param session_id integer +---@param advert rtu_advertisement +---@param out_queue mqueue +emachine.new = function (session_id, advert, out_queue) + -- type check + if advert.type ~= rtu_t.energy_machine then + log.error("attempt to instantiate emachine RTU for type '" .. advert.type .. "'. this is a bug.") + return nil + end + + local log_tag = "session.rtu(" .. session_id .. ").emachine(" .. advert.index .. "): " + + local self = { + uid = advert.index, + -- reactor = advert.reactor, + reactor = 0, + out_q = out_queue, + transaction_controller = txnctrl.new(), + has_build = false, + periodics = { + next_build_req = 0, + next_storage_req = 0, + }, + ---@class emachine_session_db + db = { + build = { + max_energy = 0 + }, + storage = { + energy = 0, + energy_need = 0, + energy_fill = 0.0 + } + } + } + + ---@class rtu_session_unit + local public = {} + + -- PRIVATE FUNCTIONS -- + + local _send_request = function (txn_type, f_code, register_range) + local m_pkt = comms.modbus_packet() + local txn_id = self.transaction_controller.create(txn_type) + + m_pkt.make(txn_id, self.uid, f_code, register_range) + + self.out_q.push_packet(m_pkt) + end + + -- query the build of the device + local _request_build = function () + -- read input register 1 (start = 1, count = 1) + _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 1 }) + end + + -- query the state of the energy storage + local _request_storage = function () + -- read input registers 2 through 4 (start = 2, count = 3) + _send_request(TXN_TYPES.STORAGE, MODBUS_FCODE.READ_INPUT_REGS, { 2, 3 }) + end + + -- PUBLIC FUNCTIONS -- + + -- handle a packet + ---@param m_pkt modbus_frame + public.handle_packet = function (m_pkt) + local success = false + + if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then + if m_pkt.unit_id == self.uid then + local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) + if txn_type == TXN_TYPES.BUILD then + -- build response + if m_pkt.length == 1 then + self.db.build.max_energy = m_pkt.data[1] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.build)") + end + elseif txn_type == TXN_TYPES.STORAGE then + -- storage response + if m_pkt.length == 3 then + self.db.storage.energy = m_pkt.data[1] + self.db.storage.energy_need = m_pkt.data[2] + self.db.storage.energy_fill = m_pkt.data[3] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.storage)") + end + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") + else + log.error(log_tag .. "unknown transaction type " .. txn_type) + end + else + log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + end + else + log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + end + + return success + end + + public.get_uid = function () return self.uid end + public.get_reactor = function () return self.reactor end + public.get_db = function () return self.db end + + -- update this runner + ---@param time_now integer milliseconds + public.update = function (time_now) + if not self.has_build and self.next_build_req <= time_now then + _request_build() + self.next_build_req = time_now + PERIODICS.BUILD + end + + if self.next_storage_req <= time_now then + _request_storage() + self.next_storage_req = time_now + PERIODICS.STORAGE + end + end + + return public +end + +return emachine diff --git a/supervisor/session/rtu/turbine.lua b/supervisor/session/rtu/turbine.lua new file mode 100644 index 0000000..23c3d61 --- /dev/null +++ b/supervisor/session/rtu/turbine.lua @@ -0,0 +1,195 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local types = require("scada-common.types") + +local txnctrl = require("supervisor.session.rtu.txnctrl") + +local turbine = {} + +local PROTOCOLS = comms.PROTOCOLS +local DUMPING_MODE = types.DUMPING_MODE +local MODBUS_FCODE = types.MODBUS_FCODE + +local rtu_t = types.rtu_t + +local TXN_TYPES = { + BUILD = 0, + STATE = 1, + TANKS = 2 +} + +local PERIODICS = { + BUILD = 1000, + STATE = 500, + TANKS = 1000 +} + +-- create a new turbine rtu session runner +---@param session_id integer +---@param advert rtu_advertisement +---@param out_queue mqueue +turbine.new = function (session_id, advert, out_queue) + -- type check + if advert.type ~= rtu_t.turbine then + log.error("attempt to instantiate turbine RTU for type '" .. advert.type .. "'. this is a bug.") + return nil + end + + local log_tag = "session.rtu(" .. session_id .. ").turbine(" .. advert.index .. "): " + + local self = { + uid = advert.index, + reactor = advert.reactor, + out_q = out_queue, + transaction_controller = txnctrl.new(), + has_build = false, + periodics = { + next_build_req = 0, + next_state_req = 0, + next_tanks_req = 0, + }, + ---@class turbine_session_db + db = { + build = { + blades = 0, + coils = 0, + vents = 0, + dispersers = 0, + condensers = 0, + steam_cap = 0, + max_flow_rate = 0, + max_production = 0, + max_water_output = 0 + }, + state = { + flow_rate = 0.0, + prod_rate = 0.0, + steam_input_rate = 0.0, + dumping_mode = DUMPING_MODE.IDLE ---@type DUMPING_MODE + }, + tanks = { + steam = 0, + steam_need = 0, + steam_fill = 0.0 + } + } + } + + ---@class rtu_session_unit + local public = {} + + -- PRIVATE FUNCTIONS -- + + local _send_request = function (txn_type, f_code, register_range) + local m_pkt = comms.modbus_packet() + local txn_id = self.transaction_controller.create(txn_type) + + m_pkt.make(txn_id, self.uid, f_code, register_range) + + self.out_q.push_packet(m_pkt) + end + + -- query the build of the device + local _request_build = function () + -- read input registers 1 through 9 (start = 1, count = 9) + _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 9 }) + end + + -- query the state of the device + local _request_state = function () + -- read input registers 10 through 13 (start = 10, count = 4) + _send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 10, 4 }) + end + + -- query the tanks of the device + local _request_tanks = function () + -- read input registers 14 through 16 (start = 14, count = 3) + _send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 14, 3 }) + end + + -- PUBLIC FUNCTIONS -- + + -- handle a packet + ---@param m_pkt modbus_frame + public.handle_packet = function (m_pkt) + local success = false + + if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then + if m_pkt.unit_id == self.uid then + local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) + if txn_type == TXN_TYPES.BUILD then + -- build response + if m_pkt.length == 9 then + self.db.build.blades = m_pkt.data[1] + self.db.build.coils = m_pkt.data[2] + self.db.build.vents = m_pkt.data[3] + self.db.build.dispersers = m_pkt.data[4] + self.db.build.condensers = m_pkt.data[5] + self.db.build.steam_cap = m_pkt.data[6] + self.db.build.max_flow_rate = m_pkt.data[7] + self.db.build.max_production = m_pkt.data[8] + self.db.build.max_water_output = m_pkt.data[9] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.build)") + end + elseif txn_type == TXN_TYPES.STATE then + -- state response + if m_pkt.length == 4 then + self.db.state.flow_rate = m_pkt.data[1] + self.db.state.prod_rate = m_pkt.data[2] + self.db.state.steam_input_rate = m_pkt.data[3] + self.db.state.dumping_mode = m_pkt.data[4] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.state)") + end + elseif txn_type == TXN_TYPES.TANKS then + -- tanks response + if m_pkt.length == 3 then + self.db.tanks.steam = m_pkt.data[1] + self.db.tanks.steam_need = m_pkt.data[2] + self.db.tanks.steam_fill = m_pkt.data[3] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.tanks)") + end + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") + else + log.error(log_tag .. "unknown transaction type " .. txn_type) + end + else + log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + end + else + log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + end + + return success + end + + public.get_uid = function () return self.uid end + public.get_reactor = function () return self.reactor end + public.get_db = function () return self.db end + + -- update this runner + ---@param time_now integer milliseconds + public.update = function (time_now) + if not self.has_build and self.next_build_req <= time_now then + _request_build() + self.next_build_req = time_now + PERIODICS.BUILD + end + + if self.next_state_req <= time_now then + _request_state() + self.next_state_req = time_now + PERIODICS.STATE + end + + if self.next_tanks_req <= time_now then + _request_tanks() + self.next_tanks_req = time_now + PERIODICS.TANKS + end + end + + return public +end + +return turbine diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index 37fc89e..635e81e 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -53,7 +53,7 @@ local function _iterate(sessions) end -- cleanly close a session ----@param session plc_session_struct +---@param session plc_session_struct|rtu_session_struct local function _shutdown(session) session.open = false session.instance.close() @@ -127,7 +127,7 @@ end -- find a session by the remote port ---@param remote_port integer ----@return plc_session_struct|nil +---@return plc_session_struct|rtu_session_struct|nil svsessions.find_session = function (remote_port) -- check RTU sessions for i = 1, #self.rtu_sessions do @@ -201,6 +201,33 @@ svsessions.establish_plc_session = function (local_port, remote_port, for_reacto end end +-- establish a new RTU session +---@param local_port integer +---@param remote_port integer +---@param advertisement table +---@return integer session_id +svsessions.establish_rtu_session = function (local_port, remote_port, advertisement) + ---@class rtu_session_struct + local rtu_s = { + open = true, + l_port = local_port, + r_port = remote_port, + in_queue = mqueue.new(), + out_queue = mqueue.new(), + instance = nil + } + + rtu_s.instance = rtu.new_session(self.next_rtu_id, rtu_s.in_queue, rtu_s.out_queue, advertisement) + table.insert(self.rtu_sessions, rtu_s) + + log.debug("established new RTU session to " .. remote_port .. " with ID " .. self.next_rtu_id) + + self.next_rtu_id = self.next_rtu_id + 1 + + -- success + return rtu_s.instance.get_id() +end + -- attempt to identify which session's watchdog timer fired ---@param timer_event number svsessions.check_all_watchdogs = function (timer_event) diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 5923887..b559bf0 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -11,7 +11,7 @@ local svsessions = require("supervisor.session.svsessions") local config = require("supervisor.config") local supervisor = require("supervisor.supervisor") -local SUPERVISOR_VERSION = "alpha-v0.3.5" +local SUPERVISOR_VERSION = "alpha-v0.3.6" local print = util.print local println = util.println diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index a534882..7924e9f 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -26,7 +26,6 @@ local println_ts = util.println_ts ---@param coord_listen integer supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) local self = { - ln_seq_num = 0, num_reactors = num_reactors, modem = modem, dev_listen = dev_listen, @@ -59,15 +58,27 @@ supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) -- send PLC link request responses ---@param dest integer ---@param msg table - local _send_plc_linking = function (dest, msg) + local _send_plc_linking = function (seq_id, dest, msg) local s_pkt = comms.scada_packet() local r_pkt = comms.rplc_packet() r_pkt.make(0, RPLC_TYPES.LINK_REQ, msg) - s_pkt.make(self.ln_seq_num, PROTOCOLS.RPLC, r_pkt.raw_sendable()) + s_pkt.make(seq_id, PROTOCOLS.RPLC, r_pkt.raw_sendable()) + + self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) + end + + -- send RTU advertisement responses + ---@param seq_id integer + ---@param dest integer + local _send_remote_linked = function (seq_id, dest) + local s_pkt = comms.scada_packet() + local m_pkt = comms.mgmt_packet() + + m_pkt.make(SCADA_MGMT_TYPES.REMOTE_LINKED, {}) + s_pkt.make(seq_id, PROTOCOLS.SCADA_MGMT, m_pkt.raw_sendable()) self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) - self.ln_seq_num = self.ln_seq_num + 1 end -- PUBLIC FUNCTIONS -- @@ -143,18 +154,27 @@ supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) if protocol == PROTOCOLS.MODBUS_TCP then -- MODBUS response + if session ~= nil then + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + else + -- any other packet should be session related, discard it + log.debug("discarding MODBUS_TCP packet without a known session") + end elseif protocol == PROTOCOLS.RPLC then -- reactor PLC packet if session ~= nil then if packet.type == RPLC_TYPES.LINK_REQ then -- new device on this port? that's a collision log.debug("PLC_LNK: request from existing connection received on " .. r_port .. ", responding with collision") - _send_plc_linking(r_port, { RPLC_LINKING.COLLISION }) + _send_plc_linking(packet.scada_frame.seq_num() + 1, r_port, { RPLC_LINKING.COLLISION }) else -- pass the packet onto the session handler session.in_queue.push_packet(packet) end else + local next_seq_id = packet.scada_frame.seq_num() + 1 + -- unknown session, is this a linking request? if packet.type == RPLC_TYPES.LINK_REQ then if packet.length == 1 then @@ -163,12 +183,12 @@ supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) if plc_id == false then -- reactor already has a PLC assigned log.debug("PLC_LNK: assignment collision with reactor " .. packet.data[1]) - _send_plc_linking(r_port, { RPLC_LINKING.COLLISION }) + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.COLLISION }) else -- got an ID; assigned to a reactor successfully println("connected to reactor " .. packet.data[1] .. " PLC (port " .. r_port .. ")") log.debug("PLC_LNK: allowed for device at " .. r_port) - _send_plc_linking(r_port, { RPLC_LINKING.ALLOW }) + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.ALLOW }) end else log.debug("PLC_LNK: new linking packet length mismatch") @@ -176,7 +196,7 @@ supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) else -- force a re-link log.debug("PLC_LNK: no session but not a link, force relink") - _send_plc_linking(r_port, { RPLC_LINKING.DENY }) + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) end end elseif protocol == PROTOCOLS.SCADA_MGMT then @@ -184,6 +204,18 @@ supervisor.comms = function (num_reactors, modem, dev_listen, coord_listen) if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) + else + -- is this an RTU advertisement? + if packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then + local rtu_id = svsessions.establish_rtu_session(l_port, r_port, packet.data) + + println("connected to RTU (port " .. r_port .. ")") + log.debug("RTU_ADVERT: linked " .. r_port) + _send_remote_linked(packet.scada_frame.seq_num() + 1, r_port) + else + -- any other packet should be session related, discard it + log.debug("discarding SCADA_MGMT packet without a known session") + end end else log.debug("illegal packet type " .. protocol .. " on device listening channel")