diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 9181ce3..cb203bf 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -318,3 +318,66 @@ function mgmt_packet() get = get } end + +-- SCADA coordinator packet +-- @todo +function coord_packet() + local self = { + frame = nil, + type = nil, + length = nil, + data = nil + } + + local _coord_type_valid = function () + -- @todo + return false + end + + -- make a coordinator packet + local make = function (packet_type, length, data) + self.type = packet_type + self.length = length + self.data = data + end + + -- decode a coordinator packet from a SCADA frame + local decode = function (frame) + if frame then + self.frame = frame + + if frame.protocol() == comms.PROTOCOLS.COORD_DATA then + local data = frame.data() + local ok = #data > 1 + + if ok then + make(data[1], data[2], { table.unpack(data, 3, #data) }) + ok = _coord_type_valid() + end + + return ok + else + log._debug("attempted COORD_DATA parse of incorrect protocol " .. frame.protocol(), true) + return false + end + else + log._debug("nil frame encountered", true) + return false + end + end + + local get = function () + return { + scada_frame = self.frame, + type = self.type, + length = self.length, + data = self.data + } + end + + return { + make = make, + decode = decode, + get = get + } +end diff --git a/supervisor/config.lua b/supervisor/config.lua index bc02177..de10492 100644 --- a/supervisor/config.lua +++ b/supervisor/config.lua @@ -4,13 +4,12 @@ -- from all PLCs and coordinator(s) while in backup to allow -- instant failover if active goes offline without re-sync SYSTEM_TYPE = 'active' - -- scada network listen for PLC's and RTU's SCADA_DEV_LISTEN = 16000 -- failover synchronization -SCADA_FO_CHANNEL = 16001 +SCADA_FO_LOCAL = 16101 +SCADA_FO_PEER = 16102 -- listen port for SCADA supervisor access by coordinators -SCADA_SV_CHANNEL = 16002 - +SCADA_SV_LISTEN = 16201 -- expected number of reactors NUM_REACTORS = 4 diff --git a/supervisor/session/coordinator.lua b/supervisor/session/coordinator.lua new file mode 100644 index 0000000..e69de29 diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua new file mode 100644 index 0000000..4008a33 --- /dev/null +++ b/supervisor/session/plc.lua @@ -0,0 +1,248 @@ +-- #REQUIRES mqueue.lua +-- #REQUIRES comms.lua +-- #REQUIRES log.lua +-- #REQUIRES util.lua + +local RPLC_TYPES = comms.RPLC_TYPES + +PLC_S_COMMANDS = { + SCRAM = 0, + ENABLE = 1, + ISS_CLEAR = 2 +} + +-- PLC supervisor session +function new_session(id, for_reactor, in_queue, out_queue) + local log_header = "plc_session(" .. id .. "): " + + local self = { + id = id, + for_reactor = for_reactor, + in_q = in_queue, + out_q = out_queue, + commanded_state = false, + -- connection properties + seq_num = 0, + connected = true, + received_struct = false, + plc_conn_watchdog = util.new_watchdog(3) + -- when to next retry one of these requests + retry_times = { + struct_req = 0, + scram_req = 0, + enable_req = 0 + }, + -- session PLC status database + sDB = { + control_state = false, + overridden = false, + degraded = false, + iss_status = { + dmg_crit = false, + ex_hcool = false, + ex_waste = false, + high_temp = false, + no_fuel = false, + no_cool = false, + timed_out = false + }, + mek_status = { + heating_rate = 0, + + status = false, + burn_rate = 0, + act_burn_rate = 0, + temp = 0, + damage = 0, + boil_eff = 0, + env_loss = 0, + + fuel = 0, + fuel_need = 0, + fuel_fill = 0, + waste = 0, + waste_need = 0, + waste_fill = 0, + cool_type = "?", + cool_amnt = 0, + cool_need = 0, + cool_fill = 0, + hcool_type = "?", + hcool_amnt = 0, + hcool_need = 0, + hcool_fill = 0 + }, + mek_struct = { + heat_cap = 0, + fuel_asm = 0, + fuel_sa = 0, + fuel_cap = 0, + waste_cap = 0, + cool_cap = 0, + hcool_cap = 0, + max_burn = 0 + } + } + } + + local _copy_iss_status = function (iss_status) + self.sDB.iss_status.dmg_crit = iss_status[1] + self.sDB.iss_status.ex_hcool = iss_status[2] + self.sDB.iss_status.ex_waste = iss_status[3] + self.sDB.iss_status.high_temp = iss_status[4] + self.sDB.iss_status.no_fuel = iss_status[5] + self.sDB.iss_status.no_cool = iss_status[6] + self.sDB.iss_status.timed_out = iss_status[7] + end + + local _copy_status = function (heating_rate, mek_data) + self.sDB.mek_status.heating_rate = heating_rate + for key, value in pairs(mek_data) do + self.sDB.mek_status[key] = value + end + end + + local _copy_struct = function (mek_data) + for key, value in pairs(mek_data) do + self.sDB.mek_struct[key] = value + end + end + + local _get_ack = function (pkt) + if rplc_packet.length == 1 then + return rplc_packet.data[1] + else + log._warning(log_header .. "RPLC ACK length mismatch") + return nil + end + end + + local get_id = function () return self.id end + + local close = function () self.connected = false end + + local check_wd = function (timer) + return timer == plc_conn_watchdog + end + + local get_struct = function () + if self.received_struct then + return self.sDB.mek_struct + else + -- @todo: need a system in place to re-request this periodically + return nil + end + end + + local iterate = function () + if self.connected and ~self.in_q.empty() then + -- get a new message to process + local message = self.in_q.pop() + + if message.qtype == mqueue.TYPE.PACKET then + -- handle an incoming packet from the PLC + rplc_pkt = message.message.get() + + if rplc_pkt.id == for_reactor then + if rplc_pkt.type == RPLC_TYPES.KEEP_ALIVE then + -- periodic keep alive + elseif rplc_pkt.type == RPLC_TYPES.STATUS then + -- status packet received, update data + if rplc_packet.length == 6 then + -- @todo [1] is timestamp, determine how this will be used (if at all) + self.sDB.control_state = rplc_packet.data[2] + self.sDB.overridden = rplc_packet.data[3] + self.sDB.degraded = rplc_packet.data[4] + + -- attempt to read mek_data table + if rplc_packet.data[6] ~= nil then + local status = pcall(_copy_status, rplc_packet.data[5], rplc_packet.data[6]) + if status then + -- copied in status data OK + else + -- error copying status data + log._error(log_header .. "failed to parse status packet data") + end + else + self.sDB.mek_status.heating_rate = rplc_packet.data[5] + end + else + log._warning(log_header .. "RPLC status packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_STRUCT then + -- received reactor structure, record it + if rplc_packet.length == 1 then + local status = pcall(_copy_struct, rplc_packet.data[1]) + if status then + -- copied in structure data OK + else + -- error copying structure data + log._error(log_header .. "failed to parse struct packet data") + end + else + log._warning(log_header .. "RPLC struct packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_SCRAM then + -- SCRAM acknowledgement + local ack = _get_ack(rplc_pkt) + if ack then + self.sDB.control_state = false + elseif ack == false then + log._warning(log_header .. "SCRAM failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_ENABLE then + -- enable acknowledgement + local ack = _get_ack(rplc_pkt) + if ack then + self.sDB.control_state = true + elseif ack == false then + log._warning(log_header .. "enable failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_BURN_RATE then + -- burn rate acknowledgement + if _get_ack(rplc_pkt) == false then + log._warning(log_header .. "burn rate update failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.ISS_STATUS then + -- ISS status packet received, copy data + if rplc_packet.length == 1 then + local status = pcall(_copy_iss_status, rplc_packet.data[1]) + if status then + -- copied in ISS status data OK + else + -- error copying ISS status data + log._error(log_header .. "failed to parse ISS status packet data") + end + else + log._warning(log_header .. "RPLC ISS status packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.ISS_ALARM then + -- ISS alarm + self.sDB.overridden = true + -- @todo + elseif rplc_pkt.type == RPLC_TYPES.ISS_CLEAR then + -- ISS clear acknowledgement + -- @todo + else + log._warning(log_header .. "handler received unsupported RPLC packet type " .. rplc_pkt.type) + end + else + log._warning(log_header .. "RPLC packet with ID not matching reactor ID: reactor " .. self.for_reactor .. " != " .. rplc_pkt.id) + end + elseif message.qtype == mqueue.TYPE.COMMAND then + -- handle instruction + + end + end + + return self.connected + end + + return { + get_id = get_id, + check_wd = check_wd, + get_struct = get_struct, + close = close, + iterate = iterate + } +end diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua new file mode 100644 index 0000000..e69de29 diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua new file mode 100644 index 0000000..fcc6950 --- /dev/null +++ b/supervisor/session/svsessions.lua @@ -0,0 +1,141 @@ +-- #REQUIRES mqueue.lua +-- #REQUIRES log.lua + +-- Supervisor Sessions Handler + +SESSION_TYPE = { + RTU_SESSION = 0, + PLC_SESSION = 1, + COORD_SESSION = 2 +} + +local self = { + num_reactors = 0, + rtu_sessions = {}, + plc_sessions = {}, + coord_sessions = {}, + next_rtu_id = 0, + next_plc_id = 0, + next_coord_id = 0 +} + +function alloc_reactor_plcs(num_reactors) + self.num_reactors = num_reactors + for i = 1, num_reactors do + table.insert(self.plc_sessions, false) + end +end + +function find_session(stype, remote_port) + if stype == SESSION_TYPE.RTU_SESSION then + for i = 1, #self.rtu_sessions do + if self.rtu_sessions[i].r_host == remote_port then + return self.rtu_sessions[i] + end + end + elseif stype == SESSION_TYPE.PLC_SESSION then + for i = 1, #self.plc_sessions do + if self.plc_sessions[i].r_host == remote_port then + return self.plc_sessions[i] + end + end + elseif stype == SESSION_TYPE.COORD_SESSION then + for i = 1, #self.coord_sessions do + if self.coord_sessions[i].r_host == remote_port then + return self.coord_sessions[i] + end + end + else + log._error("cannot search for unknown session type " .. stype, true) + end + + return nil +end + +function get_reactor_session(reactor) + local session = nil + + for i = 1, #self.plc_sessions do + if self.plc_sessions[i].reactor == reactor then + session = self.plc_sessions[i] + end + end + + return session +end + +function establish_plc_session(remote_port, for_reactor) + if get_reactor_session(for_reactor) == nil then + local plc_s = { + open = true, + reactor = for_reactor, + r_host = remote_port, + in_queue = mqueue.new(), + out_queue = mqueue.new(), + instance = nil + } + + plc_s.instance = plc.new_session(next_plc_id, plc_s.in_queue, plc_s.out_queue) + table.insert(self.plc_sessions, plc_s) + next_plc_id = next_plc_id + 1 + + -- success + return plc_s.instance.get_id() + else + -- reactor already assigned to a PLC + return false + end +end + +local function _iterate(sessions) + for i = 1, #sessions do + local session = sessions[i] + if session.open then + local ok = session.instance.iterate() + if not ok then + session.open = false + session.instance.close() + end + end + end +end + +function iterate_all() + -- iterate RTU sessions + _iterate(self.rtu_sessions) + + -- iterate PLC sessions + _iterate(self.plc_sessions) + + -- iterate coordinator sessions + _iterate(self.coord_sessions) +end + +local function _free_closed(sessions) + local move_to = 1 + for i = 1, #sessions do + local session = sessions[i] + if session ~= nil then + if sessions[i].open then + if sessions[move_to] == nil then + sessions[move_to] = session + sessions[i] = nil + end + move_to = move_to + 1 + else + sessions[i] = nil + end + end + end +end + +function free_all_closed() + -- free closed RTU sessions + _free_closed(self.rtu_sessions) + + -- free closed PLC sessions + _free_closed(self.plc_sessions) + + -- free closed coordinator sessions + _free_closed(self.coord_sessions) +end diff --git a/supervisor/startup.lua b/supervisor/startup.lua index cec58c0..f58efd2 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -6,11 +6,18 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/modbus.lua") os.loadAPI("config.lua") +os.loadAPI("mqueue.lua") os.loadAPI("supervisor.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.0" +os.loadAPI("session/rtu.lua") +os.loadAPI("session/plc.lua") +os.loadAPI("session/coordinator.lua") +os.loadAPI("session/svsessions.lua") + +local SUPERVISOR_VERSION = "alpha-v0.1.1" local print = util.print local println = util.println @@ -20,7 +27,6 @@ local println_ts = util.println_ts log._info("========================================") log._info("BOOTING supervisor.startup " .. SUPERVISOR_VERSION) log._info("========================================") - println(">> SCADA Supervisor " .. SUPERVISOR_VERSION .. " <<") -- mount connected devices @@ -40,7 +46,8 @@ if config.SYSTEM_TYPE == "active" then end -- start comms, open all channels -local comms = supervisor.superv_comms(config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_FO_CHANNEL, config.SCADA_SV_CHANNEL) +local comms = supervisor.superv_comms(config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_FO_LOCAL, config.SCADA_FO_PEER, + config.SCADA_SV_CHANNEL) -- base loop clock (4Hz, 5 ticks) local loop_clock = os.startTimer(0.25) @@ -82,12 +89,14 @@ while true do loop_clock = os.startTimer(0.25) elseif event == "modem_message" then -- got a packet + local packet = superv_comms.parse_packet(p1, p2, p3, p4, p5) + superv_comms.handle_packet(packet) end -- check for termination request if event == "terminate" or ppm.should_terminate() then log._warning("terminate requested, exiting...") - -- todo: attempt failover, alert hot backup + -- @todo: attempt failover, alert hot backup break end end diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index d9b125c..d6bfd54 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -1,15 +1,27 @@ -- #REQUIRES comms.lua +-- #REQUIRES modbus.lua +-- #REQUIRES mqueue.lua +-- #REQUIRES svsessions.lua + +local PROTOCOLS = comms.PROTOCOLS +local RPLC_TYPES = comms.RPLC_TYPES +local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES +local RTU_ADVERT_TYPES = comms.RTU_ADVERT_TYPES + +local SESSION_TYPE = svsessions.SESSION_TYPE -- supervisory controller communications -function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_channel) +function superv_comms(mode, num_reactors, modem, dev_listen, fo_local, fo_peer, coord_listen) local self = { mode = mode, - seq_num = 0, + fo_seq_num = 0, + ln_seq_num = 0, num_reactors = num_reactors, modem = modem, dev_listen = dev_listen, - fo_channel = fo_channel, - sv_channel = sv_channel, + fo_rx = fo_local, + fo_tx = fo_peer, + coord_listen = coord_listen, reactor_struct_cache = nil } @@ -20,14 +32,28 @@ function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_chan if not self.modem.isOpen(self.dev_listen) then self.modem.open(self.dev_listen) end - if not self.modem.isOpen(self.fo_channel) then - self.modem.open(self.fo_channel) + if not self.modem.isOpen(self.fo_rx) then + self.modem.open(self.fo_rx) end - if not self.modem.isOpen(self.sv_channel) then - self.modem.open(self.sv_channel) + if not self.modem.isOpen(self.coord_listen) then + self.modem.open(self.coord_listen) end end + local _send_fo = function (msg) + local packet = comms.scada_packet() + packet.make(self.fo_seq_num, PROTOCOLS.SCADA_MGMT, msg) + self.modem.transmit(self.fo_tx, self.fo_rx, packet.raw()) + self.fo_seq_num = self.fo_seq_num + 1 + end + + local _send_plc_linking = function (dest, msg) + local packet = comms.scada_packet() + packet.make(self.ln_seq_num, PROTOCOLS.RPLC, msg) + self.modem.transmit(dest, self.dev_listen, packet.raw()) + self.ln_seq_num = self.ln_seq_num + 1 + end + -- PUBLIC FUNCTIONS -- -- reconnect a newly connected modem @@ -36,7 +62,115 @@ function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_chan _open_channels() end + -- parse a packet + local parse_packet = function(side, sender, reply_to, message, distance) + local pkt = nil + local s_pkt = scada_packet() + + -- parse packet as generic SCADA packet + s_pkt.recieve(side, sender, reply_to, message, distance) + + if s_pkt.is_valid() then + -- get as MODBUS TCP packet + if s_pkt.protocol() == PROTOCOLS.MODBUS_TCP then + local m_pkt = comms.modbus_packet() + if m_pkt.decode(s_pkt) then + pkt = m_pkt.get() + end + -- get as RPLC packet + elseif s_pkt.protocol() == PROTOCOLS.RPLC then + local rplc_pkt = comms.rplc_packet() + if rplc_pkt.decode(s_pkt) then + pkt = rplc_pkt.get() + end + -- get as SCADA management packet + elseif s_pkt.protocol() == PROTOCOLS.SCADA_MGMT then + local mgmt_pkt = comms.mgmt_packet() + if mgmt_pkt.decode(s_pkt) then + pkt = mgmt_packet.get() + end + -- get as coordinator packet + elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then + local coord_pkt = comms.coord_packet() + if coord_pkt.decode(s_pkt) then + pkt = coord_pkt.get() + end + else + log._debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true) + end + end + + return pkt + end + + local handle_packet = function(packet) + if packet ~= nil then + local sender = packet.scada_frame.sender() + local receiver = packet.scada_frame.receiver() + local protocol = packet.scada_frame.protocol() + + -- device (RTU/PLC) listening channel + if receiver == self.dev_listen then + if protocol == PROTOCOLS.MODBUS_TCP then + -- MODBUS response + elseif protocol == PROTOCOLS.RPLC then + -- reactor PLC packet + local session = svsessions.find_session(SESSION_TYPE.PLC_SESSION, sender) + if session then + if packet.type == RPLC_TYPES.LINK_REQ then + -- new device on this port? that's a collision + _send_plc_linking(sender, { RPLC_LINKING.COLLISION }) + else + -- pass the packet onto the session handler + session.in_queue.push_packet(packet) + end + else + -- unknown session, is this a linking request? + if packet.type == RPLC_TYPES.LINK_REQ then + -- this is a linking request + local plc_id = svsessions.establish_plc_session(sender) + if plc_id == false then + -- reactor already has a PLC assigned + _send_plc_linking(sender, { RPLC_LINKING.COLLISION }) + else + -- got an ID; assigned to a reactor successfully + _send_plc_linking(sender, { RPLC_LINKING.ALLOW }) + end + else + -- force a re-link + _send_plc_linking(sender, { RPLC_LINKING.DENY }) + end + end + elseif protocol == PROTOCOLS.SCADA_MGMT then + -- SCADA management packet + else + log._debug("illegal packet type " .. protocol .. " on device listening channel") + end + -- failover listening channel + elseif receiver == self.fo_rx then + if protocol == PROTOCOLS.SCADA_MGMT then + -- SCADA management packet + else + log._debug("illegal packet type " .. protocol .. " on failover listening channel") + end + -- coordinator listening channel + elseif reciever == self.coord_listen then + if protocol == PROTOCOLS.SCADA_MGMT then + -- SCADA management packet + elseif protocol == PROTOCOLS.COORD_DATA then + -- coordinator packet + else + log._debug("illegal packet type " .. protocol .. " on coordinator listening channel") + end + else + log._error("received packet on unused channel " .. receiver, true) + end + end + end + return { - reconnect_modem = reconnect_modem + reconnect_modem = reconnect_modem, + parse_packet = parse_packet, + handle_packet = handle_packet } end