supervisor sessions work in progress

This commit is contained in:
Mikayla Fischler 2022-04-22 11:07:59 -04:00
parent 17d0213d58
commit 1bf0d352a1
8 changed files with 611 additions and 17 deletions

View File

@ -318,3 +318,66 @@ function mgmt_packet()
get = get get = get
} }
end end
-- SCADA coordinator packet
-- @todo
function coord_packet()
local self = {
frame = nil,
type = nil,
length = nil,
data = nil
}
local _coord_type_valid = function ()
-- @todo
return false
end
-- make a coordinator packet
local make = function (packet_type, length, data)
self.type = packet_type
self.length = length
self.data = data
end
-- decode a coordinator packet from a SCADA frame
local decode = function (frame)
if frame then
self.frame = frame
if frame.protocol() == comms.PROTOCOLS.COORD_DATA then
local data = frame.data()
local ok = #data > 1
if ok then
make(data[1], data[2], { table.unpack(data, 3, #data) })
ok = _coord_type_valid()
end
return ok
else
log._debug("attempted COORD_DATA parse of incorrect protocol " .. frame.protocol(), true)
return false
end
else
log._debug("nil frame encountered", true)
return false
end
end
local get = function ()
return {
scada_frame = self.frame,
type = self.type,
length = self.length,
data = self.data
}
end
return {
make = make,
decode = decode,
get = get
}
end

View File

@ -4,13 +4,12 @@
-- from all PLCs and coordinator(s) while in backup to allow -- from all PLCs and coordinator(s) while in backup to allow
-- instant failover if active goes offline without re-sync -- instant failover if active goes offline without re-sync
SYSTEM_TYPE = 'active' SYSTEM_TYPE = 'active'
-- scada network listen for PLC's and RTU's -- scada network listen for PLC's and RTU's
SCADA_DEV_LISTEN = 16000 SCADA_DEV_LISTEN = 16000
-- failover synchronization -- failover synchronization
SCADA_FO_CHANNEL = 16001 SCADA_FO_LOCAL = 16101
SCADA_FO_PEER = 16102
-- listen port for SCADA supervisor access by coordinators -- listen port for SCADA supervisor access by coordinators
SCADA_SV_CHANNEL = 16002 SCADA_SV_LISTEN = 16201
-- expected number of reactors -- expected number of reactors
NUM_REACTORS = 4 NUM_REACTORS = 4

View File

248
supervisor/session/plc.lua Normal file
View File

@ -0,0 +1,248 @@
-- #REQUIRES mqueue.lua
-- #REQUIRES comms.lua
-- #REQUIRES log.lua
-- #REQUIRES util.lua
local RPLC_TYPES = comms.RPLC_TYPES
PLC_S_COMMANDS = {
SCRAM = 0,
ENABLE = 1,
ISS_CLEAR = 2
}
-- PLC supervisor session
function new_session(id, for_reactor, in_queue, out_queue)
local log_header = "plc_session(" .. id .. "): "
local self = {
id = id,
for_reactor = for_reactor,
in_q = in_queue,
out_q = out_queue,
commanded_state = false,
-- connection properties
seq_num = 0,
connected = true,
received_struct = false,
plc_conn_watchdog = util.new_watchdog(3)
-- when to next retry one of these requests
retry_times = {
struct_req = 0,
scram_req = 0,
enable_req = 0
},
-- session PLC status database
sDB = {
control_state = false,
overridden = false,
degraded = false,
iss_status = {
dmg_crit = false,
ex_hcool = false,
ex_waste = false,
high_temp = false,
no_fuel = false,
no_cool = false,
timed_out = false
},
mek_status = {
heating_rate = 0,
status = false,
burn_rate = 0,
act_burn_rate = 0,
temp = 0,
damage = 0,
boil_eff = 0,
env_loss = 0,
fuel = 0,
fuel_need = 0,
fuel_fill = 0,
waste = 0,
waste_need = 0,
waste_fill = 0,
cool_type = "?",
cool_amnt = 0,
cool_need = 0,
cool_fill = 0,
hcool_type = "?",
hcool_amnt = 0,
hcool_need = 0,
hcool_fill = 0
},
mek_struct = {
heat_cap = 0,
fuel_asm = 0,
fuel_sa = 0,
fuel_cap = 0,
waste_cap = 0,
cool_cap = 0,
hcool_cap = 0,
max_burn = 0
}
}
}
local _copy_iss_status = function (iss_status)
self.sDB.iss_status.dmg_crit = iss_status[1]
self.sDB.iss_status.ex_hcool = iss_status[2]
self.sDB.iss_status.ex_waste = iss_status[3]
self.sDB.iss_status.high_temp = iss_status[4]
self.sDB.iss_status.no_fuel = iss_status[5]
self.sDB.iss_status.no_cool = iss_status[6]
self.sDB.iss_status.timed_out = iss_status[7]
end
local _copy_status = function (heating_rate, mek_data)
self.sDB.mek_status.heating_rate = heating_rate
for key, value in pairs(mek_data) do
self.sDB.mek_status[key] = value
end
end
local _copy_struct = function (mek_data)
for key, value in pairs(mek_data) do
self.sDB.mek_struct[key] = value
end
end
local _get_ack = function (pkt)
if rplc_packet.length == 1 then
return rplc_packet.data[1]
else
log._warning(log_header .. "RPLC ACK length mismatch")
return nil
end
end
local get_id = function () return self.id end
local close = function () self.connected = false end
local check_wd = function (timer)
return timer == plc_conn_watchdog
end
local get_struct = function ()
if self.received_struct then
return self.sDB.mek_struct
else
-- @todo: need a system in place to re-request this periodically
return nil
end
end
local iterate = function ()
if self.connected and ~self.in_q.empty() then
-- get a new message to process
local message = self.in_q.pop()
if message.qtype == mqueue.TYPE.PACKET then
-- handle an incoming packet from the PLC
rplc_pkt = message.message.get()
if rplc_pkt.id == for_reactor then
if rplc_pkt.type == RPLC_TYPES.KEEP_ALIVE then
-- periodic keep alive
elseif rplc_pkt.type == RPLC_TYPES.STATUS then
-- status packet received, update data
if rplc_packet.length == 6 then
-- @todo [1] is timestamp, determine how this will be used (if at all)
self.sDB.control_state = rplc_packet.data[2]
self.sDB.overridden = rplc_packet.data[3]
self.sDB.degraded = rplc_packet.data[4]
-- attempt to read mek_data table
if rplc_packet.data[6] ~= nil then
local status = pcall(_copy_status, rplc_packet.data[5], rplc_packet.data[6])
if status then
-- copied in status data OK
else
-- error copying status data
log._error(log_header .. "failed to parse status packet data")
end
else
self.sDB.mek_status.heating_rate = rplc_packet.data[5]
end
else
log._warning(log_header .. "RPLC status packet length mismatch")
end
elseif rplc_pkt.type == RPLC_TYPES.MEK_STRUCT then
-- received reactor structure, record it
if rplc_packet.length == 1 then
local status = pcall(_copy_struct, rplc_packet.data[1])
if status then
-- copied in structure data OK
else
-- error copying structure data
log._error(log_header .. "failed to parse struct packet data")
end
else
log._warning(log_header .. "RPLC struct packet length mismatch")
end
elseif rplc_pkt.type == RPLC_TYPES.MEK_SCRAM then
-- SCRAM acknowledgement
local ack = _get_ack(rplc_pkt)
if ack then
self.sDB.control_state = false
elseif ack == false then
log._warning(log_header .. "SCRAM failed!")
end
elseif rplc_pkt.type == RPLC_TYPES.MEK_ENABLE then
-- enable acknowledgement
local ack = _get_ack(rplc_pkt)
if ack then
self.sDB.control_state = true
elseif ack == false then
log._warning(log_header .. "enable failed!")
end
elseif rplc_pkt.type == RPLC_TYPES.MEK_BURN_RATE then
-- burn rate acknowledgement
if _get_ack(rplc_pkt) == false then
log._warning(log_header .. "burn rate update failed!")
end
elseif rplc_pkt.type == RPLC_TYPES.ISS_STATUS then
-- ISS status packet received, copy data
if rplc_packet.length == 1 then
local status = pcall(_copy_iss_status, rplc_packet.data[1])
if status then
-- copied in ISS status data OK
else
-- error copying ISS status data
log._error(log_header .. "failed to parse ISS status packet data")
end
else
log._warning(log_header .. "RPLC ISS status packet length mismatch")
end
elseif rplc_pkt.type == RPLC_TYPES.ISS_ALARM then
-- ISS alarm
self.sDB.overridden = true
-- @todo
elseif rplc_pkt.type == RPLC_TYPES.ISS_CLEAR then
-- ISS clear acknowledgement
-- @todo
else
log._warning(log_header .. "handler received unsupported RPLC packet type " .. rplc_pkt.type)
end
else
log._warning(log_header .. "RPLC packet with ID not matching reactor ID: reactor " .. self.for_reactor .. " != " .. rplc_pkt.id)
end
elseif message.qtype == mqueue.TYPE.COMMAND then
-- handle instruction
end
end
return self.connected
end
return {
get_id = get_id,
check_wd = check_wd,
get_struct = get_struct,
close = close,
iterate = iterate
}
end

View File

View File

@ -0,0 +1,141 @@
-- #REQUIRES mqueue.lua
-- #REQUIRES log.lua
-- Supervisor Sessions Handler
SESSION_TYPE = {
RTU_SESSION = 0,
PLC_SESSION = 1,
COORD_SESSION = 2
}
local self = {
num_reactors = 0,
rtu_sessions = {},
plc_sessions = {},
coord_sessions = {},
next_rtu_id = 0,
next_plc_id = 0,
next_coord_id = 0
}
function alloc_reactor_plcs(num_reactors)
self.num_reactors = num_reactors
for i = 1, num_reactors do
table.insert(self.plc_sessions, false)
end
end
function find_session(stype, remote_port)
if stype == SESSION_TYPE.RTU_SESSION then
for i = 1, #self.rtu_sessions do
if self.rtu_sessions[i].r_host == remote_port then
return self.rtu_sessions[i]
end
end
elseif stype == SESSION_TYPE.PLC_SESSION then
for i = 1, #self.plc_sessions do
if self.plc_sessions[i].r_host == remote_port then
return self.plc_sessions[i]
end
end
elseif stype == SESSION_TYPE.COORD_SESSION then
for i = 1, #self.coord_sessions do
if self.coord_sessions[i].r_host == remote_port then
return self.coord_sessions[i]
end
end
else
log._error("cannot search for unknown session type " .. stype, true)
end
return nil
end
function get_reactor_session(reactor)
local session = nil
for i = 1, #self.plc_sessions do
if self.plc_sessions[i].reactor == reactor then
session = self.plc_sessions[i]
end
end
return session
end
function establish_plc_session(remote_port, for_reactor)
if get_reactor_session(for_reactor) == nil then
local plc_s = {
open = true,
reactor = for_reactor,
r_host = remote_port,
in_queue = mqueue.new(),
out_queue = mqueue.new(),
instance = nil
}
plc_s.instance = plc.new_session(next_plc_id, plc_s.in_queue, plc_s.out_queue)
table.insert(self.plc_sessions, plc_s)
next_plc_id = next_plc_id + 1
-- success
return plc_s.instance.get_id()
else
-- reactor already assigned to a PLC
return false
end
end
local function _iterate(sessions)
for i = 1, #sessions do
local session = sessions[i]
if session.open then
local ok = session.instance.iterate()
if not ok then
session.open = false
session.instance.close()
end
end
end
end
function iterate_all()
-- iterate RTU sessions
_iterate(self.rtu_sessions)
-- iterate PLC sessions
_iterate(self.plc_sessions)
-- iterate coordinator sessions
_iterate(self.coord_sessions)
end
local function _free_closed(sessions)
local move_to = 1
for i = 1, #sessions do
local session = sessions[i]
if session ~= nil then
if sessions[i].open then
if sessions[move_to] == nil then
sessions[move_to] = session
sessions[i] = nil
end
move_to = move_to + 1
else
sessions[i] = nil
end
end
end
end
function free_all_closed()
-- free closed RTU sessions
_free_closed(self.rtu_sessions)
-- free closed PLC sessions
_free_closed(self.plc_sessions)
-- free closed coordinator sessions
_free_closed(self.coord_sessions)
end

View File

@ -6,11 +6,18 @@ os.loadAPI("scada-common/log.lua")
os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/util.lua")
os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/ppm.lua")
os.loadAPI("scada-common/comms.lua") os.loadAPI("scada-common/comms.lua")
os.loadAPI("scada-common/modbus.lua")
os.loadAPI("config.lua") os.loadAPI("config.lua")
os.loadAPI("mqueue.lua")
os.loadAPI("supervisor.lua") os.loadAPI("supervisor.lua")
local SUPERVISOR_VERSION = "alpha-v0.1.0" os.loadAPI("session/rtu.lua")
os.loadAPI("session/plc.lua")
os.loadAPI("session/coordinator.lua")
os.loadAPI("session/svsessions.lua")
local SUPERVISOR_VERSION = "alpha-v0.1.1"
local print = util.print local print = util.print
local println = util.println local println = util.println
@ -20,7 +27,6 @@ local println_ts = util.println_ts
log._info("========================================") log._info("========================================")
log._info("BOOTING supervisor.startup " .. SUPERVISOR_VERSION) log._info("BOOTING supervisor.startup " .. SUPERVISOR_VERSION)
log._info("========================================") log._info("========================================")
println(">> SCADA Supervisor " .. SUPERVISOR_VERSION .. " <<") println(">> SCADA Supervisor " .. SUPERVISOR_VERSION .. " <<")
-- mount connected devices -- mount connected devices
@ -40,7 +46,8 @@ if config.SYSTEM_TYPE == "active" then
end end
-- start comms, open all channels -- start comms, open all channels
local comms = supervisor.superv_comms(config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_FO_CHANNEL, config.SCADA_SV_CHANNEL) local comms = supervisor.superv_comms(config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_FO_LOCAL, config.SCADA_FO_PEER,
config.SCADA_SV_CHANNEL)
-- base loop clock (4Hz, 5 ticks) -- base loop clock (4Hz, 5 ticks)
local loop_clock = os.startTimer(0.25) local loop_clock = os.startTimer(0.25)
@ -82,12 +89,14 @@ while true do
loop_clock = os.startTimer(0.25) loop_clock = os.startTimer(0.25)
elseif event == "modem_message" then elseif event == "modem_message" then
-- got a packet -- got a packet
local packet = superv_comms.parse_packet(p1, p2, p3, p4, p5)
superv_comms.handle_packet(packet)
end end
-- check for termination request -- check for termination request
if event == "terminate" or ppm.should_terminate() then if event == "terminate" or ppm.should_terminate() then
log._warning("terminate requested, exiting...") log._warning("terminate requested, exiting...")
-- todo: attempt failover, alert hot backup -- @todo: attempt failover, alert hot backup
break break
end end
end end

View File

@ -1,15 +1,27 @@
-- #REQUIRES comms.lua -- #REQUIRES comms.lua
-- #REQUIRES modbus.lua
-- #REQUIRES mqueue.lua
-- #REQUIRES svsessions.lua
local PROTOCOLS = comms.PROTOCOLS
local RPLC_TYPES = comms.RPLC_TYPES
local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES
local RTU_ADVERT_TYPES = comms.RTU_ADVERT_TYPES
local SESSION_TYPE = svsessions.SESSION_TYPE
-- supervisory controller communications -- supervisory controller communications
function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_channel) function superv_comms(mode, num_reactors, modem, dev_listen, fo_local, fo_peer, coord_listen)
local self = { local self = {
mode = mode, mode = mode,
seq_num = 0, fo_seq_num = 0,
ln_seq_num = 0,
num_reactors = num_reactors, num_reactors = num_reactors,
modem = modem, modem = modem,
dev_listen = dev_listen, dev_listen = dev_listen,
fo_channel = fo_channel, fo_rx = fo_local,
sv_channel = sv_channel, fo_tx = fo_peer,
coord_listen = coord_listen,
reactor_struct_cache = nil reactor_struct_cache = nil
} }
@ -20,14 +32,28 @@ function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_chan
if not self.modem.isOpen(self.dev_listen) then if not self.modem.isOpen(self.dev_listen) then
self.modem.open(self.dev_listen) self.modem.open(self.dev_listen)
end end
if not self.modem.isOpen(self.fo_channel) then if not self.modem.isOpen(self.fo_rx) then
self.modem.open(self.fo_channel) self.modem.open(self.fo_rx)
end end
if not self.modem.isOpen(self.sv_channel) then if not self.modem.isOpen(self.coord_listen) then
self.modem.open(self.sv_channel) self.modem.open(self.coord_listen)
end end
end end
local _send_fo = function (msg)
local packet = comms.scada_packet()
packet.make(self.fo_seq_num, PROTOCOLS.SCADA_MGMT, msg)
self.modem.transmit(self.fo_tx, self.fo_rx, packet.raw())
self.fo_seq_num = self.fo_seq_num + 1
end
local _send_plc_linking = function (dest, msg)
local packet = comms.scada_packet()
packet.make(self.ln_seq_num, PROTOCOLS.RPLC, msg)
self.modem.transmit(dest, self.dev_listen, packet.raw())
self.ln_seq_num = self.ln_seq_num + 1
end
-- PUBLIC FUNCTIONS -- -- PUBLIC FUNCTIONS --
-- reconnect a newly connected modem -- reconnect a newly connected modem
@ -36,7 +62,115 @@ function superv_comms(mode, num_reactors, modem, dev_listen, fo_channel, sv_chan
_open_channels() _open_channels()
end end
-- parse a packet
local parse_packet = function(side, sender, reply_to, message, distance)
local pkt = nil
local s_pkt = scada_packet()
-- parse packet as generic SCADA packet
s_pkt.recieve(side, sender, reply_to, message, distance)
if s_pkt.is_valid() then
-- get as MODBUS TCP packet
if s_pkt.protocol() == PROTOCOLS.MODBUS_TCP then
local m_pkt = comms.modbus_packet()
if m_pkt.decode(s_pkt) then
pkt = m_pkt.get()
end
-- get as RPLC packet
elseif s_pkt.protocol() == PROTOCOLS.RPLC then
local rplc_pkt = comms.rplc_packet()
if rplc_pkt.decode(s_pkt) then
pkt = rplc_pkt.get()
end
-- get as SCADA management packet
elseif s_pkt.protocol() == PROTOCOLS.SCADA_MGMT then
local mgmt_pkt = comms.mgmt_packet()
if mgmt_pkt.decode(s_pkt) then
pkt = mgmt_packet.get()
end
-- get as coordinator packet
elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then
local coord_pkt = comms.coord_packet()
if coord_pkt.decode(s_pkt) then
pkt = coord_pkt.get()
end
else
log._debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true)
end
end
return pkt
end
local handle_packet = function(packet)
if packet ~= nil then
local sender = packet.scada_frame.sender()
local receiver = packet.scada_frame.receiver()
local protocol = packet.scada_frame.protocol()
-- device (RTU/PLC) listening channel
if receiver == self.dev_listen then
if protocol == PROTOCOLS.MODBUS_TCP then
-- MODBUS response
elseif protocol == PROTOCOLS.RPLC then
-- reactor PLC packet
local session = svsessions.find_session(SESSION_TYPE.PLC_SESSION, sender)
if session then
if packet.type == RPLC_TYPES.LINK_REQ then
-- new device on this port? that's a collision
_send_plc_linking(sender, { RPLC_LINKING.COLLISION })
else
-- pass the packet onto the session handler
session.in_queue.push_packet(packet)
end
else
-- unknown session, is this a linking request?
if packet.type == RPLC_TYPES.LINK_REQ then
-- this is a linking request
local plc_id = svsessions.establish_plc_session(sender)
if plc_id == false then
-- reactor already has a PLC assigned
_send_plc_linking(sender, { RPLC_LINKING.COLLISION })
else
-- got an ID; assigned to a reactor successfully
_send_plc_linking(sender, { RPLC_LINKING.ALLOW })
end
else
-- force a re-link
_send_plc_linking(sender, { RPLC_LINKING.DENY })
end
end
elseif protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet
else
log._debug("illegal packet type " .. protocol .. " on device listening channel")
end
-- failover listening channel
elseif receiver == self.fo_rx then
if protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet
else
log._debug("illegal packet type " .. protocol .. " on failover listening channel")
end
-- coordinator listening channel
elseif reciever == self.coord_listen then
if protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet
elseif protocol == PROTOCOLS.COORD_DATA then
-- coordinator packet
else
log._debug("illegal packet type " .. protocol .. " on coordinator listening channel")
end
else
log._error("received packet on unused channel " .. receiver, true)
end
end
end
return { return {
reconnect_modem = reconnect_modem reconnect_modem = reconnect_modem,
parse_packet = parse_packet,
handle_packet = handle_packet
} }
end end