diff --git a/supervisor/session/coordinator.lua b/supervisor/session/coordinator.lua index 4860942..a4b2e2d 100644 --- a/supervisor/session/coordinator.lua +++ b/supervisor/session/coordinator.lua @@ -1,7 +1,9 @@ -local comms = require("scada-common.comms") -local log = require("scada-common.log") -local mqueue = require("scada-common.mqueue") -local util = require("scada-common.util") +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local mqueue = require("scada-common.mqueue") +local util = require("scada-common.util") + +local svqtypes = require("supervisor.session.svqtypes") local coordinator = {} @@ -10,6 +12,9 @@ local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES local CRDN_COMMANDS = comms.CRDN_COMMANDS +local SV_Q_CMDS = svqtypes.SV_Q_CMDS +local SV_Q_DATA = svqtypes.SV_Q_DATA + local print = util.print local println = util.println local print_ts = util.print_ts @@ -175,16 +180,33 @@ function coordinator.new_session(id, in_queue, out_queue, facility_units) -- acknowledgement to coordinator receiving builds self.acks.builds = true elseif pkt.type == SCADA_CRDN_TYPES.COMMAND_UNIT then - if pkt.length > 2 then + if pkt.length >= 2 then -- get command and unit id local cmd = pkt.data[1] local uid = pkt.data[2] -- continue if valid unit id if util.is_int(uid) and uid > 0 and uid <= #self.units then - local unit = self.units[pkt.data[2]] ---@type reactor_unit - if cmd == CRDN_COMMANDS.SCRAM then - unit.scram() + if cmd == CRDN_COMMANDS.START then + self.out_q.push_data(SV_Q_DATA.START, uid) + elseif cmd == CRDN_COMMANDS.SCRAM then + self.out_q.push_data(SV_Q_DATA.SCRAM, uid) + elseif cmd == CRDN_COMMANDS.RESET_RPS then + self.out_q.push_data(SV_Q_DATA.RESET_RPS, uid) + elseif cmd == CRDN_COMMANDS.SET_BURN then + if pkt.length == 3 then + self.out_q.push_data(SV_Q_DATA.SET_BURN, { uid, pkt.data[3] }) + else + log.debug(log_header .. "CRDN command unit burn rate missing option") + end + elseif cmd == CRDN_COMMANDS.SET_WASTE then + if pkt.length == 3 then + self.out_q.push_data(SV_Q_DATA.SET_WASTE, { uid, pkt.data[3] }) + else + log.debug(log_header .. "CRDN command unit set waste missing option") + end + else + log.debug(log_header .. "CRDN command unknown") end else log.debug(log_header .. "CRDN command unit invalid") diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index 9f01ec8..68898f3 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -516,7 +516,7 @@ function plc.new_session(id, for_reactor, in_queue, out_queue) end elseif message.qtype == mqueue.TYPE.DATA then -- instruction with body - local cmd = message.message + local cmd = message.message ---@type queue_data if cmd.key == PLC_S_DATA.BURN_RATE then -- update burn rate self.commanded_burn_rate = cmd.val diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua index e844a9f..1469863 100644 --- a/supervisor/session/rtu.lua +++ b/supervisor/session/rtu.lua @@ -105,8 +105,6 @@ function rtu.new_session(id, in_queue, out_queue, advertisement, facility_units) rsio = self.advert[i][4] } - local target_unit = self.f_units[unit_advert.reactor] ---@type reactor_unit - local u_type = unit_advert.type ---@type integer|boolean -- validate unit advertisement @@ -122,6 +120,7 @@ function rtu.new_session(id, in_queue, out_queue, advertisement, facility_units) if advert_validator.valid() then advert_validator.assert_min(unit_advert.index, 1) advert_validator.assert_min(unit_advert.reactor, 1) + advert_validator.assert_max(unit_advert.reactor, #self.f_units) if not advert_validator.valid() then u_type = false end else u_type = false @@ -131,31 +130,35 @@ function rtu.new_session(id, in_queue, out_queue, advertisement, facility_units) if u_type == false then -- validation fail - elseif u_type == RTU_UNIT_TYPES.REDSTONE then - -- redstone - unit, rs_in_q = svrs_redstone.new(self.id, i, unit_advert, self.modbus_q) - elseif u_type == RTU_UNIT_TYPES.BOILER_VALVE then - -- boiler (Mekanism 10.1+) - unit = svrs_boilerv.new(self.id, i, unit_advert, self.modbus_q) - if type(unit) ~= "nil" then target_unit.add_boiler(unit) end - elseif u_type == RTU_UNIT_TYPES.TURBINE_VALVE then - -- turbine (Mekanism 10.1+) - unit, tbv_in_q = svrs_turbinev.new(self.id, i, unit_advert, self.modbus_q) - if type(unit) ~= "nil" then target_unit.add_turbine(unit) end - elseif u_type == RTU_UNIT_TYPES.IMATRIX then - -- induction matrix - unit = svrs_imatrix.new(self.id, i, unit_advert, self.modbus_q) - elseif u_type == RTU_UNIT_TYPES.SPS then - -- super-critical phase shifter - unit = svrs_sps.new(self.id, i, unit_advert, self.modbus_q) - elseif u_type == RTU_UNIT_TYPES.SNA then - -- solar neutron activator - unit = svrs_sna.new(self.id, i, unit_advert, self.modbus_q) - elseif u_type == RTU_UNIT_TYPES.ENV_DETECTOR then - -- environment detector - unit = svrs_envd.new(self.id, i, unit_advert, self.modbus_q) else - log.error(log_header .. "bad advertisement: encountered unsupported RTU type") + local target_unit = self.f_units[unit_advert.reactor] ---@type reactor_unit + + if u_type == RTU_UNIT_TYPES.REDSTONE then + -- redstone + unit, rs_in_q = svrs_redstone.new(self.id, i, unit_advert, self.modbus_q) + elseif u_type == RTU_UNIT_TYPES.BOILER_VALVE then + -- boiler (Mekanism 10.1+) + unit = svrs_boilerv.new(self.id, i, unit_advert, self.modbus_q) + if type(unit) ~= "nil" then target_unit.add_boiler(unit) end + elseif u_type == RTU_UNIT_TYPES.TURBINE_VALVE then + -- turbine (Mekanism 10.1+) + unit, tbv_in_q = svrs_turbinev.new(self.id, i, unit_advert, self.modbus_q) + if type(unit) ~= "nil" then target_unit.add_turbine(unit) end + elseif u_type == RTU_UNIT_TYPES.IMATRIX then + -- induction matrix + unit = svrs_imatrix.new(self.id, i, unit_advert, self.modbus_q) + elseif u_type == RTU_UNIT_TYPES.SPS then + -- super-critical phase shifter + unit = svrs_sps.new(self.id, i, unit_advert, self.modbus_q) + elseif u_type == RTU_UNIT_TYPES.SNA then + -- solar neutron activator + unit = svrs_sna.new(self.id, i, unit_advert, self.modbus_q) + elseif u_type == RTU_UNIT_TYPES.ENV_DETECTOR then + -- environment detector + unit = svrs_envd.new(self.id, i, unit_advert, self.modbus_q) + else + log.error(log_header .. "bad advertisement: encountered unsupported RTU type") + end end if unit ~= nil then @@ -333,7 +336,6 @@ function rtu.new_session(id, in_queue, out_queue, advertisement, facility_units) elseif msg.qtype == mqueue.TYPE.DATA then -- instruction with body local cmd = msg.message ---@type queue_data - if cmd.key == RTU_S_DATA.RS_COMMAND then local rs_cmd = cmd.val ---@type rs_session_command diff --git a/supervisor/session/svqtypes.lua b/supervisor/session/svqtypes.lua index 9814608..175f98b 100644 --- a/supervisor/session/svqtypes.lua +++ b/supervisor/session/svqtypes.lua @@ -1,10 +1,15 @@ local svqtypes = {} local SV_Q_CMDS = { - BUILD_CHANGED = 0 + BUILD_CHANGED = 1 } local SV_Q_DATA = { + START = 1, + SCRAM = 2, + RESET_RPS = 3, + SET_BURN = 4, + SET_WASTE = 5 } svqtypes.SV_Q_CMDS = SV_Q_CMDS diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index 3ad05f4..2c42f91 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -12,6 +12,10 @@ local rtu = require("supervisor.session.rtu") -- Supervisor Sessions Handler local SV_Q_CMDS = svqtypes.SV_Q_CMDS +local SV_Q_DATA = svqtypes.SV_Q_DATA + +local PLC_S_CMDS = plc.PLC_S_CMDS +local PLC_S_DATA = plc.PLC_S_DATA local CRD_S_CMDS = coordinator.CRD_S_CMDS local svsessions = {} @@ -38,32 +42,75 @@ local self = { -- PRIVATE FUNCTIONS -- +-- handle a session output queue +---@param session plc_session_struct|rtu_session_struct|coord_session_struct +local function _sv_handle_outq(session) + -- record handler start time + local handle_start = util.time() + + -- process output queue + while session.out_queue.ready() do + -- get a new message to process + local msg = session.out_queue.pop() + + if msg ~= nil then + if msg.qtype == mqueue.TYPE.PACKET then + -- handle a packet to be sent + self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) + elseif msg.qtype == mqueue.TYPE.COMMAND then + -- handle instruction/notification + local cmd = msg.message + if cmd == SV_Q_CMDS.BUILD_CHANGED then + -- notify coordinator(s) that a build has changed + for j = 1, #self.coord_sessions do + local s = self.coord_sessions[j] ---@type coord_session_struct + s.in_queue.push_command(CRD_S_CMDS.RESEND_BUILDS) + end + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- instruction/notification with body + local cmd = msg.message ---@type queue_data + local plc_s = nil + + if type(cmd.val) == "table" then + plc_s = svsessions.get_reactor_session(cmd.val[1]) + elseif type(cmd.val) == "number" then + plc_s = svsessions.get_reactor_session(cmd.val) + end + + if plc_s ~= nil then + if cmd.key == SV_Q_DATA.START then + plc_s.in_queue.push_command(PLC_S_CMDS.ENABLE) + elseif cmd.key == SV_Q_DATA.SCRAM then + plc_s.in_queue.push_command(PLC_S_CMDS.SCRAM) + elseif cmd.key == SV_Q_DATA.RESET_RPS then + plc_s.in_queue.push_command(PLC_S_CMDS.RPS_RESET) + elseif cmd.key == SV_Q_DATA.SET_BURN and type(cmd.val) == "table" and #cmd.val == 2 then + plc_s.in_queue.push_data(PLC_S_DATA.BURN_RATE, cmd.val[2]) + elseif cmd.key == SV_Q_DATA.SET_WASTE and type(cmd.val) == "table" and #cmd.val == 2 then + ---@todo set waste + end + end + end + end + + -- max 100ms spent processing queue + if util.time() - handle_start > 100 then + log.warning("supervisor out queue handler exceeded 100ms queue process limit") + log.warning(util.c("offending session: port ", session.r_port, " type '", session.s_type, "'")) + break + end + end +end + -- iterate all the given sessions ---@param sessions table local function _iterate(sessions) for i = 1, #sessions do local session = sessions[i] ---@type plc_session_struct|rtu_session_struct|coord_session_struct + if session.open and session.instance.iterate() then - -- process output queues - while session.out_queue.ready() do - local msg = session.out_queue.pop() - if msg ~= nil then - if msg.qtype == mqueue.TYPE.PACKET then - -- packet to be sent - self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) - elseif msg.qtype == mqueue.TYPE.COMMAND then - -- notification - local cmd = msg.message - if cmd == SV_Q_CMDS.BUILD_CHANGED then - -- notify coordinator(s) that a build has changed - for j = 1, #self.coord_sessions do - local s = self.coord_sessions[j] ---@type coord_session_struct - s.in_queue.push_command(CRD_S_CMDS.RESEND_BUILDS) - end - end - end - end - end + _sv_handle_outq(session) else session.open = false end @@ -221,9 +268,10 @@ end ---@param version string ---@return integer|false session_id function svsessions.establish_plc_session(local_port, remote_port, for_reactor, version) - if svsessions.get_reactor_session(for_reactor) == nil then + if svsessions.get_reactor_session(for_reactor) == nil and for_reactor >= 1 and for_reactor <= self.num_reactors then ---@class plc_session_struct local plc_s = { + s_type = "plc", open = true, reactor = for_reactor, version = version, @@ -246,7 +294,7 @@ function svsessions.establish_plc_session(local_port, remote_port, for_reactor, -- success return plc_s.instance.get_id() else - -- reactor already assigned to a PLC + -- reactor already assigned to a PLC or ID out of range return false end end @@ -262,6 +310,7 @@ function svsessions.establish_rtu_session(local_port, remote_port, advertisement ---@class rtu_session_struct local rtu_s = { + s_type = "rtu", open = true, version = version, l_port = local_port, @@ -290,6 +339,7 @@ end function svsessions.establish_coord_session(local_port, remote_port, version) ---@class coord_session_struct local coord_s = { + s_type = "crd", open = true, version = version, l_port = local_port, diff --git a/supervisor/session/unit.lua b/supervisor/session/unit.lua index bc1d544..c51586e 100644 --- a/supervisor/session/unit.lua +++ b/supervisor/session/unit.lua @@ -126,7 +126,6 @@ function unit.new(for_reactor, num_boilers, num_turbines) if self.plc_s ~= nil then local plc_db = self.plc_i.get_db() - ---@todo Mekanism 10.1+ will change fuel/waste to need _amnt _compute_dt(DT_KEYS.ReactorTemp, plc_db.mek_status.temp) _compute_dt(DT_KEYS.ReactorFuel, plc_db.mek_status.fuel) _compute_dt(DT_KEYS.ReactorWaste, plc_db.mek_status.waste) @@ -409,14 +408,6 @@ function unit.new(for_reactor, num_boilers, num_turbines) _update_annunciator() end - -- COMMAND UNIT -- - - -- SCRAM reactor - function public.scram() - if self.plc_s ~= nil then - end - end - -- READ STATES/PROPERTIES -- -- get build properties of all machines diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 1756a2a..d2ad774 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -13,7 +13,7 @@ local svsessions = require("supervisor.session.svsessions") local config = require("supervisor.config") local supervisor = require("supervisor.supervisor") -local SUPERVISOR_VERSION = "beta-v0.6.0" +local SUPERVISOR_VERSION = "beta-v0.6.1" local print = util.print local println = util.println diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index 873c3e1..4cf0eef 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -198,19 +198,25 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen if packet.type == RPLC_TYPES.LINK_REQ then if packet.length == 2 then -- this is a linking request - local plc_id = svsessions.establish_plc_session(l_port, r_port, packet.data[1], packet.data[2]) - if plc_id == false then - -- reactor already has a PLC assigned - log.debug(util.c("PLC_LNK: assignment collision with reactor ", packet.data[1])) - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.COLLISION }) + if type(packet.data[1]) == "number" and type(packet.data[2]) == "string" then + local plc_id = svsessions.establish_plc_session(l_port, r_port, packet.data[1], packet.data[2]) + if plc_id == false then + -- reactor already has a PLC assigned + log.debug(util.c("PLC_LNK: assignment collision with reactor ", packet.data[1])) + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.COLLISION }) + else + -- got an ID; assigned to a reactor successfully + println(util.c("connected to reactor ", packet.data[1], " PLC (", packet.data[2], ") [:", r_port, "]")) + log.debug("PLC_LNK: allowed for device at " .. r_port) + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.ALLOW }) + end else - -- got an ID; assigned to a reactor successfully - println(util.c("connected to reactor ", packet.data[1], " PLC (", packet.data[2], ") [:", r_port, "]")) - log.debug("PLC_LNK: allowed for device at " .. r_port) - _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.ALLOW }) + log.debug("PLC_LNK: bad parameter types") + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) end else log.debug("PLC_LNK: new linking packet length mismatch") + _send_plc_linking(next_seq_id, r_port, { RPLC_LINKING.DENY }) end else -- force a re-link @@ -267,7 +273,7 @@ function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen elseif packet.type == SCADA_CRDN_TYPES.ESTABLISH then if packet.length == 1 then -- this is an attempt to establish a new session - println(util.c("connected to coordinator [:", r_port, "]")) + println(util.c("connected to coordinator (", packet.data[1], ") [:", r_port, "]")) svsessions.establish_coord_session(l_port, r_port, packet.data[1])