From 9c034c366b7bd30c4d8f8c89bd06b92c506f5a8b Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Tue, 17 May 2022 17:16:04 -0400 Subject: [PATCH] #8 base class for RTU unit sessions, handle MODBUS error responses --- supervisor/session/rtu/boiler.lua | 152 ++++++++++-------------- supervisor/session/rtu/emachine.lua | 97 ++++++--------- supervisor/session/rtu/redstone.lua | 120 ++++++++----------- supervisor/session/rtu/turbine.lua | 137 +++++++++------------ supervisor/session/rtu/txnctrl.lua | 13 +- supervisor/session/rtu/unit_session.lua | 132 ++++++++++++++++++++ supervisor/startup.lua | 2 +- 7 files changed, 349 insertions(+), 304 deletions(-) create mode 100644 supervisor/session/rtu/unit_session.lua diff --git a/supervisor/session/rtu/boiler.lua b/supervisor/session/rtu/boiler.lua index 2b8047f..c45e22c 100644 --- a/supervisor/session/rtu/boiler.lua +++ b/supervisor/session/rtu/boiler.lua @@ -2,20 +2,23 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") local types = require("scada-common.types") -local txnctrl = require("supervisor.session.rtu.txnctrl") +local unit_session = require("supervisor.session.rtu.unit_session") local boiler = {} -local PROTOCOLS = comms.PROTOCOLS local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES local MODBUS_FCODE = types.MODBUS_FCODE -local rtu_t = types.rtu_t - local TXN_TYPES = { - BUILD = 0, - STATE = 1, - TANKS = 2 + BUILD = 1, + STATE = 2, + TANKS = 3 +} + +local TXN_TAGS = { + "boiler.build", + "boiler.state", + "boiler.tanks", } local PERIODICS = { @@ -38,11 +41,7 @@ boiler.new = function (session_id, advert, out_queue) local log_tag = "session.rtu(" .. session_id .. ").boiler(" .. advert.index .. "): " local self = { - uid = advert.index, - reactor = advert.reactor, - out_q = out_queue, - transaction_controller = txnctrl.new(), - connected = true, + session = unit_session.new(log_tag, advert, out_queue, TXN_TAGS), has_build = false, periodics = { next_build_req = 0, @@ -81,36 +80,26 @@ boiler.new = function (session_id, advert, out_queue) } } - ---@class unit_session - local public = {} + local public = self.session.get() -- PRIVATE FUNCTIONS -- - local _send_request = function (txn_type, f_code, register_range) - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(txn_type) - - m_pkt.make(txn_id, self.uid, f_code, register_range) - - self.out_q.push_packet(m_pkt) - end - -- query the build of the device local _request_build = function () -- read input registers 1 through 7 (start = 1, count = 7) - _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 7 }) + self.session.send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 7 }) end -- query the state of the device local _request_state = function () -- read input registers 8 through 9 (start = 8, count = 2) - _send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 8, 2 }) + self.session.send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 8, 2 }) end -- query the tanks of the device local _request_tanks = function () -- read input registers 10 through 21 (start = 10, count = 12) - _send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 10, 12 }) + self.session.send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 10, 12 }) end -- PUBLIC FUNCTIONS -- @@ -118,76 +107,62 @@ boiler.new = function (session_id, advert, out_queue) -- handle a packet ---@param m_pkt modbus_frame public.handle_packet = function (m_pkt) - local success = false - - if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then - if m_pkt.unit_id == self.uid then - local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) - if txn_type == TXN_TYPES.BUILD then - -- build response - if m_pkt.length == 7 then - self.db.build.boil_cap = m_pkt.data[1] - self.db.build.steam_cap = m_pkt.data[2] - self.db.build.water_cap = m_pkt.data[3] - self.db.build.hcoolant_cap = m_pkt.data[4] - self.db.build.ccoolant_cap = m_pkt.data[5] - self.db.build.superheaters = m_pkt.data[6] - self.db.build.max_boil_rate = m_pkt.data[7] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.build)") - end - elseif txn_type == TXN_TYPES.STATE then - -- state response - if m_pkt.length == 2 then - self.db.state.temperature = m_pkt.data[1] - self.db.state.boil_rate = m_pkt.data[2] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.state)") - end - elseif txn_type == TXN_TYPES.TANKS then - -- tanks response - if m_pkt.length == 12 then - self.db.tanks.steam = m_pkt.data[1] - self.db.tanks.steam_need = m_pkt.data[2] - self.db.tanks.steam_fill = m_pkt.data[3] - self.db.tanks.water = m_pkt.data[4] - self.db.tanks.water_need = m_pkt.data[5] - self.db.tanks.water_fill = m_pkt.data[6] - self.db.tanks.hcool = m_pkt.data[7] - self.db.tanks.hcool_need = m_pkt.data[8] - self.db.tanks.hcool_fill = m_pkt.data[9] - self.db.tanks.ccool = m_pkt.data[10] - self.db.tanks.ccool_need = m_pkt.data[11] - self.db.tanks.ccool_fill = m_pkt.data[12] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.tanks)") - end - elseif txn_type == nil then - log.error(log_tag .. "unknown transaction reply") - else - log.error(log_tag .. "unknown transaction type " .. txn_type) - end + local txn_type = self.session.try_resolve(m_pkt.txn_id) + if txn_type == false then + -- nothing to do + elseif txn_type == TXN_TYPES.BUILD then + -- build response + -- load in data if correct length + if m_pkt.length == 7 then + self.db.build.boil_cap = m_pkt.data[1] + self.db.build.steam_cap = m_pkt.data[2] + self.db.build.water_cap = m_pkt.data[3] + self.db.build.hcoolant_cap = m_pkt.data[4] + self.db.build.ccoolant_cap = m_pkt.data[5] + self.db.build.superheaters = m_pkt.data[6] + self.db.build.max_boil_rate = m_pkt.data[7] else - log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.build)") end + elseif txn_type == TXN_TYPES.STATE then + -- state response + -- load in data if correct length + if m_pkt.length == 2 then + self.db.state.temperature = m_pkt.data[1] + self.db.state.boil_rate = m_pkt.data[2] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.state)") + end + elseif txn_type == TXN_TYPES.TANKS then + -- tanks response + -- load in data if correct length + if m_pkt.length == 12 then + self.db.tanks.steam = m_pkt.data[1] + self.db.tanks.steam_need = m_pkt.data[2] + self.db.tanks.steam_fill = m_pkt.data[3] + self.db.tanks.water = m_pkt.data[4] + self.db.tanks.water_need = m_pkt.data[5] + self.db.tanks.water_fill = m_pkt.data[6] + self.db.tanks.hcool = m_pkt.data[7] + self.db.tanks.hcool_need = m_pkt.data[8] + self.db.tanks.hcool_fill = m_pkt.data[9] + self.db.tanks.ccool = m_pkt.data[10] + self.db.tanks.ccool_need = m_pkt.data[11] + self.db.tanks.ccool_fill = m_pkt.data[12] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (boiler.tanks)") + end + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") else - log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + log.error(log_tag .. "unknown transaction type " .. txn_type) end - - return success end - public.get_uid = function () return self.uid end - public.get_reactor = function () return self.reactor end - public.get_db = function () return self.db end - - public.close = function () self.connected = false end - public.is_connected = function () return self.connected end - -- update this runner ---@param time_now integer milliseconds public.update = function (time_now) - if not self.periodics.has_build and self.next_build_req <= time_now then + if not self.periodics.has_build and self.periodics.next_build_req <= time_now then _request_build() self.periodics.next_build_req = time_now + PERIODICS.BUILD end @@ -203,6 +178,9 @@ boiler.new = function (session_id, advert, out_queue) end end + -- get the unit session database + public.get_db = function () return self.db end + return public end diff --git a/supervisor/session/rtu/emachine.lua b/supervisor/session/rtu/emachine.lua index 8e2a2e8..c9c6481 100644 --- a/supervisor/session/rtu/emachine.lua +++ b/supervisor/session/rtu/emachine.lua @@ -2,19 +2,21 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") local types = require("scada-common.types") -local txnctrl = require("supervisor.session.rtu.txnctrl") +local unit_session = require("supervisor.session.rtu.unit_session") local emachine = {} -local PROTOCOLS = comms.PROTOCOLS local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES local MODBUS_FCODE = types.MODBUS_FCODE -local rtu_t = types.rtu_t - local TXN_TYPES = { - BUILD = 0, - STORAGE = 1 + BUILD = 1, + STORAGE = 2 +} + +local TXN_TAGS = { + "emachine.build", + "emachine.storage" } local PERIODICS = { @@ -36,12 +38,7 @@ emachine.new = function (session_id, advert, out_queue) local log_tag = "session.rtu(" .. session_id .. ").emachine(" .. advert.index .. "): " local self = { - uid = advert.index, - -- reactor = advert.reactor, - reactor = 0, - out_q = out_queue, - transaction_controller = txnctrl.new(), - connected = true, + session = unit_session.new(log_tag, advert, out_queue, TXN_TAGS), has_build = false, periodics = { next_build_req = 0, @@ -60,30 +57,20 @@ emachine.new = function (session_id, advert, out_queue) } } - ---@class unit_session - local public = {} + local public = self.session.get() -- PRIVATE FUNCTIONS -- - local _send_request = function (txn_type, f_code, register_range) - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(txn_type) - - m_pkt.make(txn_id, self.uid, f_code, register_range) - - self.out_q.push_packet(m_pkt) - end - -- query the build of the device local _request_build = function () -- read input register 1 (start = 1, count = 1) - _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 1 }) + self.session.send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 1 }) end -- query the state of the energy storage local _request_storage = function () -- read input registers 2 through 4 (start = 2, count = 3) - _send_request(TXN_TYPES.STORAGE, MODBUS_FCODE.READ_INPUT_REGS, { 2, 3 }) + self.session.send_request(TXN_TYPES.STORAGE, MODBUS_FCODE.READ_INPUT_REGS, { 2, 3 }) end -- PUBLIC FUNCTIONS -- @@ -91,49 +78,32 @@ emachine.new = function (session_id, advert, out_queue) -- handle a packet ---@param m_pkt modbus_frame public.handle_packet = function (m_pkt) - local success = false - - if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then - if m_pkt.unit_id == self.uid then - local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) - if txn_type == TXN_TYPES.BUILD then - -- build response - if m_pkt.length == 1 then - self.db.build.max_energy = m_pkt.data[1] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.build)") - end - elseif txn_type == TXN_TYPES.STORAGE then - -- storage response - if m_pkt.length == 3 then - self.db.storage.energy = m_pkt.data[1] - self.db.storage.energy_need = m_pkt.data[2] - self.db.storage.energy_fill = m_pkt.data[3] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.storage)") - end - elseif txn_type == nil then - log.error(log_tag .. "unknown transaction reply") - else - log.error(log_tag .. "unknown transaction type " .. txn_type) - end + local txn_type = self.session.try_resolve(m_pkt.txn_id) + if txn_type == false then + -- nothing to do + elseif txn_type == TXN_TYPES.BUILD then + -- build response + if m_pkt.length == 1 then + self.db.build.max_energy = m_pkt.data[1] else - log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.build)") end + elseif txn_type == TXN_TYPES.STORAGE then + -- storage response + if m_pkt.length == 3 then + self.db.storage.energy = m_pkt.data[1] + self.db.storage.energy_need = m_pkt.data[2] + self.db.storage.energy_fill = m_pkt.data[3] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (emachine.storage)") + end + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") else - log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + log.error(log_tag .. "unknown transaction type " .. txn_type) end - - return success end - public.get_uid = function () return self.uid end - public.get_reactor = function () return self.reactor end - public.get_db = function () return self.db end - - public.close = function () self.connected = false end - public.is_connected = function () return self.connected end - -- update this runner ---@param time_now integer milliseconds public.update = function (time_now) @@ -148,6 +118,9 @@ emachine.new = function (session_id, advert, out_queue) end end + -- get the unit session database + public.get_db = function () return self.db end + return public end diff --git a/supervisor/session/rtu/redstone.lua b/supervisor/session/rtu/redstone.lua index b5f44ea..57848d9 100644 --- a/supervisor/session/rtu/redstone.lua +++ b/supervisor/session/rtu/redstone.lua @@ -5,11 +5,10 @@ local rsio = require("scada-common.rsio") local types = require("scada-common.types") local util = require("scada-common.util") -local txnctrl = require("supervisor.session.rtu.txnctrl") +local unit_session = require("supervisor.session.rtu.unit_session") local redstone = {} -local PROTOCOLS = comms.PROTOCOLS local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES local MODBUS_FCODE = types.MODBUS_FCODE @@ -18,8 +17,6 @@ local IO_LVL = rsio.IO_LVL local IO_DIR = rsio.IO_DIR local IO_MODE = rsio.IO_MODE -local rtu_t = types.rtu_t - local RS_RTU_S_CMDS = { } @@ -31,10 +28,17 @@ redstone.RS_RTU_S_CMDS = RS_RTU_S_CMDS redstone.RS_RTU_S_DATA = RS_RTU_S_DATA local TXN_TYPES = { - DI_READ = 0, - COIL_WRITE = 1, - INPUT_REG_READ = 2, - HOLD_REG_WRITE = 3 + DI_READ = 1, + COIL_WRITE = 2, + INPUT_REG_READ = 3, + HOLD_REG_WRITE = 4 +} + +local TXN_TAGS = { + "redstone.di_read", + "redstone.coil_write", + "redstone.input_reg_write", + "redstone.hold_reg_write" } local PERIODICS = { @@ -55,12 +59,7 @@ redstone.new = function (session_id, advert, out_queue) local log_tag = "session.rtu(" .. session_id .. ").redstone(" .. advert.index .. "): " local self = { - uid = advert.index, - reactor = advert.reactor, - in_q = mqueue.new(), - out_q = out_queue, - transaction_controller = txnctrl.new(), - connected = true, + session = unit_session.new(log_tag, advert, out_queue, TXN_TAGS), has_di = false, has_ai = false, periodics = { @@ -76,8 +75,7 @@ redstone.new = function (session_id, advert, out_queue) db = {} } - ---@class unit_session - local public = {} + local public = self.session.get() -- INITIALIZE -- @@ -110,36 +108,26 @@ redstone.new = function (session_id, advert, out_queue) self.db[channel] = IO_LVL.LOW end - -- PRIVATE FUNCTIONS -- - local _send_request = function (txn_type, f_code, parameters) - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(txn_type) - - m_pkt.make(txn_id, self.uid, f_code, parameters) - - self.out_q.push_packet(m_pkt) - end - -- query discrete inputs local _request_discrete_inputs = function () - _send_request(TXN_TYPES.DI_READ, MODBUS_FCODE.READ_DISCRETE_INPUTS, { 1, #self.io_list.digital_in }) + self.session.send_request(TXN_TYPES.DI_READ, MODBUS_FCODE.READ_DISCRETE_INPUTS, { 1, #self.io_list.digital_in }) end -- query input registers local _request_input_registers = function () - _send_request(TXN_TYPES.INPUT_REG_READ, MODBUS_FCODE.READ_INPUT_REGS, { 1, #self.io_list.analog_in }) + self.session.send_request(TXN_TYPES.INPUT_REG_READ, MODBUS_FCODE.READ_INPUT_REGS, { 1, #self.io_list.analog_in }) end -- write coil output local _write_coil = function (coil, value) - _send_request(TXN_TYPES.COIL_WRITE, MODBUS_FCODE.WRITE_MUL_COILS, { coil, value }) + self.session.send_request(TXN_TYPES.COIL_WRITE, MODBUS_FCODE.WRITE_MUL_COILS, { coil, value }) end -- write holding register output local _write_holding_register = function (reg, value) - _send_request(TXN_TYPES.HOLD_REG_WRITE, MODBUS_FCODE.WRITE_MUL_HOLD_REGS, { reg, value }) + self.session.send_request(TXN_TYPES.HOLD_REG_WRITE, MODBUS_FCODE.WRITE_MUL_HOLD_REGS, { reg, value }) end -- PUBLIC FUNCTIONS -- @@ -147,55 +135,40 @@ redstone.new = function (session_id, advert, out_queue) -- handle a packet ---@param m_pkt modbus_frame public.handle_packet = function (m_pkt) - local success = false - - if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then - if m_pkt.unit_id == self.uid then - local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) - if txn_type == TXN_TYPES.DI_READ then - -- discrete input read response - if m_pkt.length == #self.io_list.digital_in then - for i = 1, m_pkt.length do - local channel = self.io_list.digital_in[i] - local value = m_pkt.data[i] - self.db[channel] = value - end - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (redstone.discrete_input_read)") - end - elseif txn_type == TXN_TYPES.INPUT_REG_READ then - -- input register read response - if m_pkt.length == #self.io_list.analog_in then - for i = 1, m_pkt.length do - local channel = self.io_list.analog_in[i] - local value = m_pkt.data[i] - self.db[channel] = value - end - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (redstone.input_reg_read)") - end - elseif txn_type == nil then - log.error(log_tag .. "unknown transaction reply") - else - log.error(log_tag .. "unknown transaction type " .. txn_type) + local txn_type = self.session.try_resolve(m_pkt.txn_id) + if txn_type == false then + -- nothing to do + elseif txn_type == TXN_TYPES.DI_READ then + -- discrete input read response + if m_pkt.length == #self.io_list.digital_in then + for i = 1, m_pkt.length do + local channel = self.io_list.digital_in[i] + local value = m_pkt.data[i] + self.db[channel] = value end else - log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + log.debug(log_tag .. "MODBUS transaction reply length mismatch (redstone.di_read)") end + elseif txn_type == TXN_TYPES.INPUT_REG_READ then + -- input register read response + if m_pkt.length == #self.io_list.analog_in then + for i = 1, m_pkt.length do + local channel = self.io_list.analog_in[i] + local value = m_pkt.data[i] + self.db[channel] = value + end + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (redstone.input_reg_read)") + end + elseif txn_type == TXN_TYPES.COIL_WRITE or txn_type == TXN_TYPES.HOLD_REG_WRITE then + -- successful acknowledgement + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") else - log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + log.error(log_tag .. "unknown transaction type " .. txn_type) end - - return success end - public.get_uid = function () return self.uid end - public.get_reactor = function () return self.reactor end - public.get_db = function () return self.db end - - public.close = function () self.connected = false end - public.is_connected = function () return self.connected end - -- update this runner ---@param time_now integer milliseconds public.update = function (time_now) @@ -270,6 +243,9 @@ redstone.new = function (session_id, advert, out_queue) end end + -- get the unit session database + public.get_db = function () return self.db end + return public, self.in_q end diff --git a/supervisor/session/rtu/turbine.lua b/supervisor/session/rtu/turbine.lua index e6d41c2..d45d244 100644 --- a/supervisor/session/rtu/turbine.lua +++ b/supervisor/session/rtu/turbine.lua @@ -2,21 +2,24 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") local types = require("scada-common.types") -local txnctrl = require("supervisor.session.rtu.txnctrl") +local unit_session = require("supervisor.session.rtu.unit_session") local turbine = {} -local PROTOCOLS = comms.PROTOCOLS local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES local DUMPING_MODE = types.DUMPING_MODE local MODBUS_FCODE = types.MODBUS_FCODE -local rtu_t = types.rtu_t - local TXN_TYPES = { - BUILD = 0, - STATE = 1, - TANKS = 2 + BUILD = 1, + STATE = 2, + TANKS = 3 +} + +local TXN_TAGS = { + "turbine.build", + "turbine.state", + "turbine.tanks", } local PERIODICS = { @@ -39,11 +42,7 @@ turbine.new = function (session_id, advert, out_queue) local log_tag = "session.rtu(" .. session_id .. ").turbine(" .. advert.index .. "): " local self = { - uid = advert.index, - reactor = advert.reactor, - out_q = out_queue, - transaction_controller = txnctrl.new(), - connected = true, + session = unit_session.new(log_tag, advert, out_queue, TXN_TAGS), has_build = false, periodics = { next_build_req = 0, @@ -77,36 +76,26 @@ turbine.new = function (session_id, advert, out_queue) } } - ---@class unit_session - local public = {} + local public = self.session.get() -- PRIVATE FUNCTIONS -- - local _send_request = function (txn_type, f_code, register_range) - local m_pkt = comms.modbus_packet() - local txn_id = self.transaction_controller.create(txn_type) - - m_pkt.make(txn_id, self.uid, f_code, register_range) - - self.out_q.push_packet(m_pkt) - end - -- query the build of the device local _request_build = function () -- read input registers 1 through 9 (start = 1, count = 9) - _send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 9 }) + self.session.send_request(TXN_TYPES.BUILD, MODBUS_FCODE.READ_INPUT_REGS, { 1, 9 }) end -- query the state of the device local _request_state = function () -- read input registers 10 through 13 (start = 10, count = 4) - _send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 10, 4 }) + self.session.send_request(TXN_TYPES.STATE, MODBUS_FCODE.READ_INPUT_REGS, { 10, 4 }) end -- query the tanks of the device local _request_tanks = function () -- read input registers 14 through 16 (start = 14, count = 3) - _send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 14, 3 }) + self.session.send_request(TXN_TYPES.TANKS, MODBUS_FCODE.READ_INPUT_REGS, { 14, 3 }) end -- PUBLIC FUNCTIONS -- @@ -114,67 +103,50 @@ turbine.new = function (session_id, advert, out_queue) -- handle a packet ---@param m_pkt modbus_frame public.handle_packet = function (m_pkt) - local success = false - - if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then - if m_pkt.unit_id == self.uid then - local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) - if txn_type == TXN_TYPES.BUILD then - -- build response - if m_pkt.length == 9 then - self.db.build.blades = m_pkt.data[1] - self.db.build.coils = m_pkt.data[2] - self.db.build.vents = m_pkt.data[3] - self.db.build.dispersers = m_pkt.data[4] - self.db.build.condensers = m_pkt.data[5] - self.db.build.steam_cap = m_pkt.data[6] - self.db.build.max_flow_rate = m_pkt.data[7] - self.db.build.max_production = m_pkt.data[8] - self.db.build.max_water_output = m_pkt.data[9] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.build)") - end - elseif txn_type == TXN_TYPES.STATE then - -- state response - if m_pkt.length == 4 then - self.db.state.flow_rate = m_pkt.data[1] - self.db.state.prod_rate = m_pkt.data[2] - self.db.state.steam_input_rate = m_pkt.data[3] - self.db.state.dumping_mode = m_pkt.data[4] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.state)") - end - elseif txn_type == TXN_TYPES.TANKS then - -- tanks response - if m_pkt.length == 3 then - self.db.tanks.steam = m_pkt.data[1] - self.db.tanks.steam_need = m_pkt.data[2] - self.db.tanks.steam_fill = m_pkt.data[3] - else - log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.tanks)") - end - elseif txn_type == nil then - log.error(log_tag .. "unknown transaction reply") - else - log.error(log_tag .. "unknown transaction type " .. txn_type) - end + local txn_type = self.session.try_resolve(m_pkt.txn_id) + if txn_type == false then + -- nothing to do + elseif txn_type == TXN_TYPES.BUILD then + -- build response + if m_pkt.length == 9 then + self.db.build.blades = m_pkt.data[1] + self.db.build.coils = m_pkt.data[2] + self.db.build.vents = m_pkt.data[3] + self.db.build.dispersers = m_pkt.data[4] + self.db.build.condensers = m_pkt.data[5] + self.db.build.steam_cap = m_pkt.data[6] + self.db.build.max_flow_rate = m_pkt.data[7] + self.db.build.max_production = m_pkt.data[8] + self.db.build.max_water_output = m_pkt.data[9] else - log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.build)") end + elseif txn_type == TXN_TYPES.STATE then + -- state response + if m_pkt.length == 4 then + self.db.state.flow_rate = m_pkt.data[1] + self.db.state.prod_rate = m_pkt.data[2] + self.db.state.steam_input_rate = m_pkt.data[3] + self.db.state.dumping_mode = m_pkt.data[4] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.state)") + end + elseif txn_type == TXN_TYPES.TANKS then + -- tanks response + if m_pkt.length == 3 then + self.db.tanks.steam = m_pkt.data[1] + self.db.tanks.steam_need = m_pkt.data[2] + self.db.tanks.steam_fill = m_pkt.data[3] + else + log.debug(log_tag .. "MODBUS transaction reply length mismatch (turbine.tanks)") + end + elseif txn_type == nil then + log.error(log_tag .. "unknown transaction reply") else - log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + log.error(log_tag .. "unknown transaction type " .. txn_type) end - - return success end - public.get_uid = function () return self.uid end - public.get_reactor = function () return self.reactor end - public.get_db = function () return self.db end - - public.close = function () self.connected = false end - public.is_connected = function () return self.connected end - -- update this runner ---@param time_now integer milliseconds public.update = function (time_now) @@ -194,6 +166,9 @@ turbine.new = function (session_id, advert, out_queue) end end + -- get the unit session database + public.get_db = function () return self.db end + return public end diff --git a/supervisor/session/rtu/txnctrl.lua b/supervisor/session/rtu/txnctrl.lua index 2d6be2e..a19dee6 100644 --- a/supervisor/session/rtu/txnctrl.lua +++ b/supervisor/session/rtu/txnctrl.lua @@ -6,7 +6,7 @@ local util = require("scada-common.util") local txnctrl = {} -local TIMEOUT = 3000 -- 3000ms max wait +local TIMEOUT = 2000 -- 2000ms max wait -- create a new transaction controller txnctrl.new = function () @@ -63,6 +63,17 @@ txnctrl.new = function () return txn_type end + -- renew a transaction by re-inserting it with its ID and type + ---@param txn_id integer + ---@param txn_type integer + public.renew = function (txn_id, txn_type) + insert(self.list, { + txn_id = txn_id, + txn_type = txn_type, + expiry = util.time() + TIMEOUT + }) + end + -- close timed-out transactions public.cleanup = function () local now = util.time() diff --git a/supervisor/session/rtu/unit_session.lua b/supervisor/session/rtu/unit_session.lua new file mode 100644 index 0000000..67f83c0 --- /dev/null +++ b/supervisor/session/rtu/unit_session.lua @@ -0,0 +1,132 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local types = require("scada-common.types") + +local txnctrl = require("supervisor.session.rtu.txnctrl") + +local unit_session = {} + +local PROTOCOLS = comms.PROTOCOLS +local MODBUS_FCODE = types.MODBUS_FCODE +local MODBUS_EXCODE = types.MODBUS_EXCODE + +-- create a new unit session runner +---@param log_tag string +---@param advert rtu_advertisement +---@param out_queue mqueue +---@param txn_tags table +unit_session.new = function (log_tag, advert, out_queue, txn_tags) + local self = { + log_tag = log_tag, + txn_tags = txn_tags, + uid = advert.index, + reactor = advert.reactor, + out_q = out_queue, + transaction_controller = txnctrl.new(), + connected = true, + device_fail = false + } + + ---@class _unit_session + local protected = {} + + ---@class unit_session + local public = {} + + -- PROTECTED FUNCTIONS -- + + -- send a MODBUS message, creating a transaction in the process + ---@param txn_type integer transaction type + ---@param f_code MODBUS_FCODE function code + ---@param register_param table register range or register and values + protected.send_request = function (txn_type, f_code, register_param) + local m_pkt = comms.modbus_packet() + local txn_id = self.transaction_controller.create(txn_type) + + m_pkt.make(txn_id, self.uid, f_code, register_param) + + self.out_q.push_packet(m_pkt) + end + + -- try to resolve a MODBUS transaction + ---@param m_pkt modbus_frame MODBUS packet + ---@return integer|false txn_type transaction type or false on error/busy + protected.try_resolve = function (m_pkt) + if m_pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then + if m_pkt.unit_id == self.uid then + local txn_type = self.transaction_controller.resolve(m_pkt.txn_id) + local txn_tag = " (" .. self.txn_tags[txn_type] .. ")" + + if bit.band(m_pkt.func_code, MODBUS_FCODE.ERROR_FLAG) ~= 0 then + -- transaction incomplete or failed + local ex = m_pkt.data[1] + if ex == MODBUS_EXCODE.ILLEGAL_FUNCTION then + log.error(log_tag .. "MODBUS: illegal function" .. txn_tag) + elseif ex == MODBUS_EXCODE.ILLEGAL_DATA_ADDR then + log.error(log_tag .. "MODBUS: illegal data address" .. txn_tag) + elseif ex == MODBUS_EXCODE.SERVER_DEVICE_FAIL then + if self.device_fail then + log.debug(log_tag .. "MODBUS: repeated device failure" .. txn_tag) + else + self.device_fail = true + log.warning(log_tag .. "MODBUS: device failure" .. txn_tag) + end + elseif ex == MODBUS_EXCODE.ACKNOWLEDGE then + -- will have to wait on reply, renew the transaction + self.transaction_controller.renew(m_pkt.txn_id, txn_type) + elseif ex == MODBUS_EXCODE.SERVER_DEVICE_BUSY then + -- will have to wait on reply, renew the transaction + self.transaction_controller.renew(m_pkt.txn_id, txn_type) + log.debug(log_tag .. "MODBUS: device busy" .. txn_tag) + elseif ex == MODBUS_EXCODE.NEG_ACKNOWLEDGE then + -- general failure + log.error(log_tag .. "MODBUS: negative acknowledge (bad request)" .. txn_tag) + elseif ex == MODBUS_EXCODE.GATEWAY_PATH_UNAVAILABLE then + -- RTU gateway has no known unit with the given ID + log.error(log_tag .. "MODBUS: gateway path unavailable (unknown unit)" .. txn_tag) + elseif ex ~= nil then + -- unsupported exception code + log.debug(log_tag .. "MODBUS: unsupported error " .. ex .. txn_tag) + else + -- nil exception code + log.debug(log_tag .. "MODBUS: nil exception code" .. txn_tag) + end + else + -- clear device fail flag + self.device_fail = false + + -- no error, return the transaction type + return txn_type + end + else + log.error(log_tag .. "wrong unit ID: " .. m_pkt.unit_id, true) + end + else + log.error(log_tag .. "illegal packet type " .. m_pkt.scada_frame.protocol(), true) + end + + -- error or transaction in progress, return false + return false + end + + -- get the public interface + protected.get = function () return public end + + -- PUBLIC FUNCTIONS -- + + -- get the unit ID + public.get_uid = function () return self.uid end + -- get the reactor ID + public.get_reactor = function () return self.reactor end + + -- close this unit + public.close = function () self.connected = false end + -- check if this unit is connected + public.is_connected = function () return self.connected end + -- check if this unit is faulted + public.is_faulted = function () return self.device_fail end + + return protected +end + +return unit_session diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 53f3e1a..d647f99 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -13,7 +13,7 @@ local svsessions = require("supervisor.session.svsessions") local config = require("supervisor.config") local supervisor = require("supervisor.supervisor") -local SUPERVISOR_VERSION = "alpha-v0.3.7" +local SUPERVISOR_VERSION = "alpha-v0.3.8" local print = util.print local println = util.println