#74 supervisor-coordinator comms establish

This commit is contained in:
Mikayla Fischler 2022-07-07 00:34:42 -04:00
parent 39672fedb4
commit ea17ba41fe
11 changed files with 379 additions and 64 deletions

View File

@ -4,4 +4,13 @@ local apisessions = {}
function apisessions.handle_packet(packet) function apisessions.handle_packet(packet)
end end
function apisessions.check_all_watchdogs()
end
function apisessions.close_all()
end
function apisessions.free_all_closed()
end
return apisessions return apisessions

View File

@ -17,7 +17,7 @@ local println_ts = util.println_ts
local PROTOCOLS = comms.PROTOCOLS local PROTOCOLS = comms.PROTOCOLS
local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES
local COORD_TYPES = comms.COORD_TYPES local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES
-- request the user to select a monitor -- request the user to select a monitor
---@param names table available monitors ---@param names table available monitors
@ -209,16 +209,16 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
_open_channels() _open_channels()
-- send a packet to the supervisor -- send a packet to the supervisor
---@param msg_type SCADA_MGMT_TYPES|COORD_TYPES ---@param msg_type SCADA_MGMT_TYPES|SCADA_CRDN_TYPES
---@param msg table ---@param msg table
local function _send_sv(protocol, msg_type, msg) local function _send_sv(protocol, msg_type, msg)
local s_pkt = comms.scada_packet() local s_pkt = comms.scada_packet()
local pkt = nil ---@type mgmt_packet|coord_packet local pkt = nil ---@type mgmt_packet|crdn_packet
if protocol == PROTOCOLS.SCADA_MGMT then if protocol == PROTOCOLS.SCADA_MGMT then
pkt = comms.mgmt_packet() pkt = comms.mgmt_packet()
elseif protocol == PROTOCOLS.COORD_DATA then elseif protocol == PROTOCOLS.SCADA_CRDN then
pkt = comms.coord_packet() pkt = comms.crdn_packet()
else else
return return
end end
@ -230,6 +230,17 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
self.sv_seq_num = self.sv_seq_num + 1 self.sv_seq_num = self.sv_seq_num + 1
end end
-- attempt connection establishment
local function _send_establish()
_send_sv(PROTOCOLS.SCADA_CRDN, SCADA_CRDN_TYPES.ESTABLISH, { version })
end
-- keep alive ack
---@param srv_time integer
local function _send_keep_alive_ack(srv_time)
_send_sv(PROTOCOLS.SCADA_MGMT, SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() })
end
-- PUBLIC FUNCTIONS -- -- PUBLIC FUNCTIONS --
-- reconnect a newly connected modem -- reconnect a newly connected modem
@ -251,7 +262,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
local start = util.time_s() local start = util.time_s()
local terminated = false local terminated = false
_send_sv(PROTOCOLS.COORD_DATA, COORD_TYPES.ESTABLISH, {}) _send_establish()
clock.start() clock.start()
@ -262,12 +273,12 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
if event == "timer" and clock.is_clock(p1) then if event == "timer" and clock.is_clock(p1) then
-- timed out attempt, try again -- timed out attempt, try again
tick_dmesg_waiting(math.max(0, timeout_s - (util.time_s() - start))) tick_dmesg_waiting(math.max(0, timeout_s - (util.time_s() - start)))
_send_sv(PROTOCOLS.COORD_DATA, COORD_TYPES.ESTABLISH, {}) _send_establish()
clock.start() clock.start()
elseif event == "modem_message" then elseif event == "modem_message" then
-- handle message -- handle message
local packet = public.parse_packet(p1, p2, p3, p4, p5) local packet = public.parse_packet(p1, p2, p3, p4, p5)
if packet ~= nil and packet.type == COORD_TYPES.ESTABLISH then if packet ~= nil and packet.type == SCADA_CRDN_TYPES.ESTABLISH then
public.handle_packet(packet) public.handle_packet(packet)
end end
elseif event == "terminate" then elseif event == "terminate" then
@ -291,7 +302,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
---@param reply_to integer ---@param reply_to integer
---@param message any ---@param message any
---@param distance integer ---@param distance integer
---@return mgmt_frame|coord_frame|capi_frame|nil packet ---@return mgmt_frame|crdn_frame|capi_frame|nil packet
function public.parse_packet(side, sender, reply_to, message, distance) function public.parse_packet(side, sender, reply_to, message, distance)
local pkt = nil local pkt = nil
local s_pkt = comms.scada_packet() local s_pkt = comms.scada_packet()
@ -307,10 +318,10 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
pkt = mgmt_pkt.get() pkt = mgmt_pkt.get()
end end
-- get as coordinator packet -- get as coordinator packet
elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then elseif s_pkt.protocol() == PROTOCOLS.SCADA_CRDN then
local coord_pkt = comms.coord_packet() local crdn_pkt = comms.crdn_packet()
if coord_pkt.decode(s_pkt) then if crdn_pkt.decode(s_pkt) then
pkt = coord_pkt.get() pkt = crdn_pkt.get()
end end
-- get as coordinator API packet -- get as coordinator API packet
elseif s_pkt.protocol() == PROTOCOLS.COORD_API then elseif s_pkt.protocol() == PROTOCOLS.COORD_API then
@ -327,7 +338,7 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
end end
-- handle a packet -- handle a packet
---@param packet mgmt_frame|coord_frame|capi_frame ---@param packet mgmt_frame|crdn_frame|capi_frame
function public.handle_packet(packet) function public.handle_packet(packet)
if packet ~= nil then if packet ~= nil then
local protocol = packet.scada_frame.protocol() local protocol = packet.scada_frame.protocol()
@ -349,8 +360,8 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
sv_watchdog.feed() sv_watchdog.feed()
-- handle packet -- handle packet
if protocol == PROTOCOLS.COORD_DATA then if protocol == PROTOCOLS.SCADA_CRDN then
if packet.type == COORD_TYPES.ESTABLISH then if packet.type == SCADA_CRDN_TYPES.ESTABLISH then
-- connection with supervisor established -- connection with supervisor established
if packet.length > 1 then if packet.length > 1 then
-- get configuration -- get configuration
@ -369,22 +380,38 @@ function coordinator.comms(version, modem, sv_port, sv_listen, api_listen, sv_wa
-- init database structure -- init database structure
database.init(conf) database.init(conf)
self.sv_linked = true
else else
log.debug("supervisor conn establish packet length mismatch") log.debug("supervisor conn establish packet length mismatch")
end end
else else
log.debug("supervisor conn establish packet length mismatch") log.debug("supervisor conn establish packet length mismatch")
end end
elseif packet.type == COORD_TYPES.QUERY_UNIT then elseif packet.type == SCADA_CRDN_TYPES.QUERY_UNIT then
elseif packet.type == COORD_TYPES.QUERY_FACILITY then elseif packet.type == SCADA_CRDN_TYPES.QUERY_FACILITY then
elseif packet.type == COORD_TYPES.COMMAND_UNIT then elseif packet.type == SCADA_CRDN_TYPES.COMMAND_UNIT then
elseif packet.type == COORD_TYPES.ALARM then elseif packet.type == SCADA_CRDN_TYPES.ALARM then
else else
log.warning("received unknown COORD_DATA packet type " .. packet.type) log.warning("received unknown SCADA_CRDN packet type " .. packet.type)
end end
elseif protocol == PROTOCOLS.SCADA_MGMT then elseif protocol == PROTOCOLS.SCADA_MGMT then
if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive response received -- keep alive request received, echo back
if packet.length == 1 then
local timestamp = packet.data[1]
local trip_time = util.time() - timestamp
if trip_time > 500 then
log.warning("coord KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)")
end
-- log.debug("coord RTT = " .. trip_time .. "ms")
_send_keep_alive_ack(timestamp)
else
log.debug("SCADA keep alive packet length mismatch")
end
elseif packet.type == SCADA_MGMT_TYPES.CLOSE then elseif packet.type == SCADA_MGMT_TYPES.CLOSE then
-- handle session close -- handle session close
sv_watchdog.cancel() sv_watchdog.cancel()

View File

@ -13,7 +13,7 @@ local config = require("coordinator.config")
local coordinator = require("coordinator.coordinator") local coordinator = require("coordinator.coordinator")
local renderer = require("coordinator.renderer") local renderer = require("coordinator.renderer")
local COORDINATOR_VERSION = "alpha-v0.3.0" local COORDINATOR_VERSION = "alpha-v0.3.2"
local print = util.print local print = util.print
local println = util.println local println = util.println

View File

@ -3,8 +3,6 @@
-- --
local core = require("graphics.core") local core = require("graphics.core")
local log = require("scada-common.log")
local util = require("scada-common.util")
local element = {} local element = {}
@ -31,9 +29,6 @@ function element.new(args)
bounds = { x1 = 1, y1 = 1, x2 = 1, y2 = 1} bounds = { x1 = 1, y1 = 1, x2 = 1, y2 = 1}
} }
---@fixme remove debug
log.dmesg("new " .. self.elem_type)
local protected = { local protected = {
window = nil, ---@type table window = nil, ---@type table
fg_bg = core.graphics.cpair(colors.white, colors.black), fg_bg = core.graphics.cpair(colors.white, colors.black),

View File

@ -16,7 +16,7 @@ local PROTOCOLS = {
MODBUS_TCP = 0, -- our "MODBUS TCP"-esque protocol MODBUS_TCP = 0, -- our "MODBUS TCP"-esque protocol
RPLC = 1, -- reactor PLC protocol RPLC = 1, -- reactor PLC protocol
SCADA_MGMT = 2, -- SCADA supervisor management, device advertisements, etc SCADA_MGMT = 2, -- SCADA supervisor management, device advertisements, etc
COORD_DATA = 3, -- data/control packets for coordinators to/from supervisory controllers SCADA_CRDN = 3, -- data/control packets for coordinators to/from supervisory controllers
COORD_API = 4 -- data/control packets for pocket computers to/from coordinators COORD_API = 4 -- data/control packets for pocket computers to/from coordinators
} }
@ -48,8 +48,8 @@ local SCADA_MGMT_TYPES = {
REMOTE_LINKED = 3 -- remote device linked REMOTE_LINKED = 3 -- remote device linked
} }
---@alias COORD_TYPES integer ---@alias SCADA_CRDN_TYPES integer
local COORD_TYPES = { local SCADA_CRDN_TYPES = {
ESTABLISH = 0, -- initial greeting ESTABLISH = 0, -- initial greeting
QUERY_UNIT = 1, -- query the state of a unit QUERY_UNIT = 1, -- query the state of a unit
QUERY_FACILITY = 2, -- query general facility status QUERY_FACILITY = 2, -- query general facility status
@ -80,7 +80,7 @@ comms.PROTOCOLS = PROTOCOLS
comms.RPLC_TYPES = RPLC_TYPES comms.RPLC_TYPES = RPLC_TYPES
comms.RPLC_LINKING = RPLC_LINKING comms.RPLC_LINKING = RPLC_LINKING
comms.SCADA_MGMT_TYPES = SCADA_MGMT_TYPES comms.SCADA_MGMT_TYPES = SCADA_MGMT_TYPES
comms.COORD_TYPES = COORD_TYPES comms.SCADA_CRDN_TYPES = SCADA_CRDN_TYPES
comms.RTU_UNIT_TYPES = RTU_UNIT_TYPES comms.RTU_UNIT_TYPES = RTU_UNIT_TYPES
-- generic SCADA packet object -- generic SCADA packet object
@ -438,7 +438,7 @@ function comms.mgmt_packet()
end end
-- SCADA coordinator packet -- SCADA coordinator packet
function comms.coord_packet() function comms.crdn_packet()
local self = { local self = {
frame = nil, frame = nil,
raw = nil, raw = nil,
@ -447,20 +447,20 @@ function comms.coord_packet()
data = nil data = nil
} }
---@class coord_packet ---@class crdn_packet
local public = {} local public = {}
-- check that type is known -- check that type is known
local function _coord_type_valid() local function _crdn_type_valid()
return self.type == COORD_TYPES.ESTABLISH or return self.type == SCADA_CRDN_TYPES.ESTABLISH or
self.type == COORD_TYPES.QUERY_UNIT or self.type == SCADA_CRDN_TYPES.QUERY_UNIT or
self.type == COORD_TYPES.QUERY_FACILITY or self.type == SCADA_CRDN_TYPES.QUERY_FACILITY or
self.type == COORD_TYPES.COMMAND_UNIT or self.type == SCADA_CRDN_TYPES.COMMAND_UNIT or
self.type == COORD_TYPES.ALARM self.type == SCADA_CRDN_TYPES.ALARM
end end
-- make a coordinator packet -- make a coordinator packet
---@param packet_type COORD_TYPES ---@param packet_type SCADA_CRDN_TYPES
---@param data table ---@param data table
function public.make(packet_type, data) function public.make(packet_type, data)
if type(data) == "table" then if type(data) == "table" then
@ -475,7 +475,7 @@ function comms.coord_packet()
insert(self.raw, data[i]) insert(self.raw, data[i])
end end
else else
log.error("comms.coord_packet.make(): data not table") log.error("comms.crdn_packet.make(): data not table")
end end
end end
@ -486,18 +486,18 @@ function comms.coord_packet()
if frame then if frame then
self.frame = frame self.frame = frame
if frame.protocol() == PROTOCOLS.COORD_DATA then if frame.protocol() == PROTOCOLS.SCADA_CRDN then
local ok = frame.length() >= 1 local ok = frame.length() >= 1
if ok then if ok then
local data = frame.data() local data = frame.data()
public.make(data[1], { table.unpack(data, 2, #data) }) public.make(data[1], { table.unpack(data, 2, #data) })
ok = _coord_type_valid() ok = _crdn_type_valid()
end end
return ok return ok
else else
log.debug("attempted COORD_DATA parse of incorrect protocol " .. frame.protocol(), true) log.debug("attempted SCADA_CRDN parse of incorrect protocol " .. frame.protocol(), true)
return false return false
end end
else else
@ -511,7 +511,7 @@ function comms.coord_packet()
-- get this packet as a frame with an immutable relation to this object -- get this packet as a frame with an immutable relation to this object
function public.get() function public.get()
---@class coord_frame ---@class crdn_frame
local frame = { local frame = {
scada_frame = self.frame, scada_frame = self.frame,
type = self.type, type = self.type,
@ -539,7 +539,7 @@ function comms.capi_packet()
---@class capi_packet ---@class capi_packet
local public = {} local public = {}
local function _coord_type_valid() local function _capi_type_valid()
-- @todo -- @todo
return false return false
end end
@ -577,7 +577,7 @@ function comms.capi_packet()
if ok then if ok then
local data = frame.data() local data = frame.data()
public.make(data[1], { table.unpack(data, 2, #data) }) public.make(data[1], { table.unpack(data, 2, #data) })
ok = _coord_type_valid() ok = _capi_type_valid()
end end
return ok return ok

View File

@ -183,7 +183,7 @@ function log.dmesg(msg, tag, tag_color)
out.write(lines[i]) out.write(lines[i])
end end
_log(util.c("[", t_stamp, "] ", tag, " ", msg)) _log(util.c("[", t_stamp, "] [", tag, "] ", msg))
return ts_coord return ts_coord
end end

View File

@ -62,7 +62,7 @@ function mqueue.new()
end end
-- push a packet onto the queue -- push a packet onto the queue
---@param packet scada_packet|modbus_packet|rplc_packet|coord_packet|capi_packet ---@param packet scada_packet|modbus_packet|rplc_packet|crdn_packet|capi_packet
function public.push_packet(packet) function public.push_packet(packet)
_push(TYPE.PACKET, packet) _push(TYPE.PACKET, packet)
end end

View File

@ -1,3 +1,210 @@
local comms = require("scada-common.comms")
local log = require("scada-common.log")
local mqueue = require("scada-common.mqueue")
local util = require("scada-common.util")
local coordinator = {} local coordinator = {}
local PROTOCOLS = comms.PROTOCOLS
local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES
local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES
local print = util.print
local println = util.println
local print_ts = util.print_ts
local println_ts = util.println_ts
local PERIODICS = {
KEEP_ALIVE = 2.0
}
-- coordinator supervisor session
---@param id integer
---@param in_queue mqueue
---@param out_queue mqueue
function coordinator.new_session(id, in_queue, out_queue)
local log_header = "crdn_session(" .. id .. "): "
local self = {
id = id,
in_q = in_queue,
out_q = out_queue,
-- connection properties
seq_num = 0,
r_seq_num = nil,
connected = true,
conn_watchdog = util.new_watchdog(3),
last_rtt = 0,
-- periodic messages
periodics = {
last_update = 0,
keep_alive = 0
}
}
-- mark this coordinator session as closed, stop watchdog
local function _close()
self.conn_watchdog.cancel()
self.connected = false
end
-- send a CRDN packet
---@param msg_type SCADA_CRDN_TYPES
---@param msg table
local function _send(msg_type, msg)
local s_pkt = comms.scada_packet()
local c_pkt = comms.crdn_packet()
c_pkt.make(msg_type, msg)
s_pkt.make(self.seq_num, PROTOCOLS.SCADA_CRDN, c_pkt.raw_sendable())
self.out_q.push_packet(s_pkt)
self.seq_num = self.seq_num + 1
end
-- send a SCADA management packet
---@param msg_type SCADA_MGMT_TYPES
---@param msg table
local function _send_mgmt(msg_type, msg)
local s_pkt = comms.scada_packet()
local m_pkt = comms.mgmt_packet()
m_pkt.make(msg_type, msg)
s_pkt.make(self.seq_num, PROTOCOLS.SCADA_MGMT, m_pkt.raw_sendable())
self.out_q.push_packet(s_pkt)
self.seq_num = self.seq_num + 1
end
-- handle a packet
---@param pkt crdn_frame
local function _handle_packet(pkt)
-- check sequence number
if self.r_seq_num == nil then
self.r_seq_num = pkt.scada_frame.seq_num()
elseif self.r_seq_num >= pkt.scada_frame.seq_num() then
log.warning(log_header .. "sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. pkt.scada_frame.seq_num())
return
else
self.r_seq_num = pkt.scada_frame.seq_num()
end
-- feed watchdog
self.conn_watchdog.feed()
-- process packet
if pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then
if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive reply
if pkt.length == 2 then
local srv_start = pkt.data[1]
local coord_send = pkt.data[2]
local srv_now = util.time()
self.last_rtt = srv_now - srv_start
if self.last_rtt > 500 then
log.warning(log_header .. "COORD KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)")
end
log.debug(log_header .. "COORD RTT = " .. self.last_rtt .. "ms")
log.debug(log_header .. "COORD TT = " .. (srv_now - coord_send) .. "ms")
else
log.debug(log_header .. "SCADA keep alive packet length mismatch")
end
elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then
-- close the session
_close()
else
log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type)
end
elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_CRDN then
if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
else
end
end
end
---@class coord_session
local public = {}
-- get the session ID
function public.get_id() return self.id end
-- check if a timer matches this session's watchdog
function public.check_wd(timer)
return self.conn_watchdog.is_timer(timer) and self.connected
end
-- close the connection
function public.close()
_close()
_send_mgmt(SCADA_MGMT_TYPES.CLOSE, {})
println("connection to coordinator #" .. self.id .. " closed by server")
log.info(log_header .. "session closed by server")
end
-- iterate the session
---@return boolean connected
function public.iterate()
if self.connected then
------------------
-- handle queue --
------------------
local handle_start = util.time()
while self.in_q.ready() and self.connected do
-- get a new message to process
local message = self.in_q.pop()
if message ~= nil then
if message.qtype == mqueue.TYPE.PACKET then
-- handle a packet
_handle_packet(message.message)
elseif message.qtype == mqueue.TYPE.COMMAND then
-- handle instruction
elseif message.qtype == mqueue.TYPE.DATA then
-- instruction with body
end
end
-- max 100ms spent processing queue
if util.time() - handle_start > 100 then
log.warning(log_header .. "exceeded 100ms queue process limit")
break
end
end
-- exit if connection was closed
if not self.connected then
println("connection to coordinator " .. self.id .. " closed by remote host")
log.info(log_header .. "session closed by remote host")
return self.connected
end
----------------------
-- update periodics --
----------------------
local elapsed = util.time() - self.periodics.last_update
local periodics = self.periodics
-- keep alive
periodics.keep_alive = periodics.keep_alive + elapsed
if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then
_send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() })
periodics.keep_alive = 0
end
self.periodics.last_update = util.time()
end
return self.connected
end
return public
end
return coordinator return coordinator

View File

@ -243,6 +243,34 @@ function svsessions.establish_rtu_session(local_port, remote_port, advertisement
return rtu_s.instance.get_id() return rtu_s.instance.get_id()
end end
-- establish a new coordinator session
---@param local_port integer
---@param remote_port integer
---@param version string
---@return integer|false session_id
function svsessions.establish_coord_session(local_port, remote_port, version)
---@class coord_session_struct
local coord_s = {
open = true,
version = version,
l_port = local_port,
r_port = remote_port,
in_queue = mqueue.new(),
out_queue = mqueue.new(),
instance = nil
}
coord_s.instance = coordinator.new_session(self.next_coord_id, coord_s.in_queue, coord_s.out_queue)
table.insert(self.coord_sessions, coord_s)
log.debug("established new coordinator session to " .. remote_port .. " with ID " .. self.next_coord_id)
self.next_coord_id = self.next_coord_id + 1
-- success
return coord_s.instance.get_id()
end
-- attempt to identify which session's watchdog timer fired -- attempt to identify which session's watchdog timer fired
---@param timer_event number ---@param timer_event number
function svsessions.check_all_watchdogs(timer_event) function svsessions.check_all_watchdogs(timer_event)

View File

@ -13,7 +13,7 @@ local svsessions = require("supervisor.session.svsessions")
local config = require("supervisor.config") local config = require("supervisor.config")
local supervisor = require("supervisor.supervisor") local supervisor = require("supervisor.supervisor")
local SUPERVISOR_VERSION = "beta-v0.4.14" local SUPERVISOR_VERSION = "beta-v0.5.1"
local print = util.print local print = util.print
local println = util.println local println = util.println
@ -72,7 +72,8 @@ if modem == nil then
end end
-- start comms, open all channels -- start comms, open all channels
local superv_comms = supervisor.comms(SUPERVISOR_VERSION, config.NUM_REACTORS, modem, config.SCADA_DEV_LISTEN, config.SCADA_SV_LISTEN) local superv_comms = supervisor.comms(SUPERVISOR_VERSION, config.NUM_REACTORS, config.REACTOR_COOLING, modem,
config.SCADA_DEV_LISTEN, config.SCADA_SV_LISTEN)
-- base loop clock (6.67Hz, 3 ticks) -- base loop clock (6.67Hz, 3 ticks)
local MAIN_CLOCK = 0.15 local MAIN_CLOCK = 0.15

View File

@ -9,8 +9,9 @@ local supervisor = {}
local PROTOCOLS = comms.PROTOCOLS local PROTOCOLS = comms.PROTOCOLS
local RPLC_TYPES = comms.RPLC_TYPES local RPLC_TYPES = comms.RPLC_TYPES
local RPLC_LINKING = comms.RPLC_LINKING local RPLC_LINKING = comms.RPLC_LINKING
local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES
local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES local RTU_UNIT_TYPES = comms.RTU_UNIT_TYPES
local SCADA_MGMT_TYPES = comms.SCADA_MGMT_TYPES
local SCADA_CRDN_TYPES = comms.SCADA_CRDN_TYPES
local SESSION_TYPE = svsessions.SESSION_TYPE local SESSION_TYPE = svsessions.SESSION_TYPE
@ -22,10 +23,11 @@ local println_ts = util.println_ts
-- supervisory controller communications -- supervisory controller communications
---@param version string ---@param version string
---@param num_reactors integer ---@param num_reactors integer
---@param cooling_conf table
---@param modem table ---@param modem table
---@param dev_listen integer ---@param dev_listen integer
---@param coord_listen integer ---@param coord_listen integer
function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen) function supervisor.comms(version, num_reactors, cooling_conf, modem, dev_listen, coord_listen)
local self = { local self = {
version = version, version = version,
num_reactors = num_reactors, num_reactors = num_reactors,
@ -57,7 +59,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
-- link modem to svsessions -- link modem to svsessions
svsessions.link_modem(self.modem) svsessions.link_modem(self.modem)
-- send PLC link request responses -- send PLC link request response
---@param dest integer ---@param dest integer
---@param msg table ---@param msg table
local function _send_plc_linking(seq_id, dest, msg) local function _send_plc_linking(seq_id, dest, msg)
@ -70,7 +72,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable())
end end
-- send RTU advertisement responses -- send RTU advertisement response
---@param seq_id integer ---@param seq_id integer
---@param dest integer ---@param dest integer
local function _send_remote_linked(seq_id, dest) local function _send_remote_linked(seq_id, dest)
@ -83,6 +85,26 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable()) self.modem.transmit(dest, self.dev_listen, s_pkt.raw_sendable())
end end
-- send coordinator connection establish response
---@param seq_id integer
---@param dest integer
local function _send_crdn_establish(seq_id, dest)
local s_pkt = comms.scada_packet()
local c_pkt = comms.crdn_packet()
local config = { self.num_reactors }
for i = 1, #cooling_conf do
table.insert(config, cooling_conf[i].BOILERS)
table.insert(config, cooling_conf[i].TURBINES)
end
c_pkt.make(SCADA_CRDN_TYPES.ESTABLISH, config)
s_pkt.make(seq_id, PROTOCOLS.SCADA_CRDN, c_pkt.raw_sendable())
self.modem.transmit(dest, self.coord_listen, s_pkt.raw_sendable())
end
-- PUBLIC FUNCTIONS -- -- PUBLIC FUNCTIONS --
-- reconnect a newly connected modem -- reconnect a newly connected modem
@ -100,7 +122,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
---@param reply_to integer ---@param reply_to integer
---@param message any ---@param message any
---@param distance integer ---@param distance integer
---@return modbus_frame|rplc_frame|mgmt_frame|coord_frame|nil packet ---@return modbus_frame|rplc_frame|mgmt_frame|crdn_frame|nil packet
function public.parse_packet(side, sender, reply_to, message, distance) function public.parse_packet(side, sender, reply_to, message, distance)
local pkt = nil local pkt = nil
local s_pkt = comms.scada_packet() local s_pkt = comms.scada_packet()
@ -128,10 +150,10 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
pkt = mgmt_pkt.get() pkt = mgmt_pkt.get()
end end
-- get as coordinator packet -- get as coordinator packet
elseif s_pkt.protocol() == PROTOCOLS.COORD_DATA then elseif s_pkt.protocol() == PROTOCOLS.SCADA_CRDN then
local coord_pkt = comms.coord_packet() local crdn_pkt = comms.crdn_packet()
if coord_pkt.decode(s_pkt) then if crdn_pkt.decode(s_pkt) then
pkt = coord_pkt.get() pkt = crdn_pkt.get()
end end
else else
log.debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true) log.debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true)
@ -142,7 +164,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
end end
-- handle a packet -- handle a packet
---@param packet modbus_frame|rplc_frame|mgmt_frame|coord_frame ---@param packet modbus_frame|rplc_frame|mgmt_frame|crdn_frame
function public.handle_packet(packet) function public.handle_packet(packet)
if packet ~= nil then if packet ~= nil then
local l_port = packet.scada_frame.local_port() local l_port = packet.scada_frame.local_port()
@ -226,7 +248,7 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
end end
else else
-- any other packet should be session related, discard it -- any other packet should be session related, discard it
log.debug("discarding SCADA_MGMT packet without a known session") log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session"))
end end
else else
log.debug("illegal packet type " .. protocol .. " on device listening channel") log.debug("illegal packet type " .. protocol .. " on device listening channel")
@ -238,8 +260,34 @@ function supervisor.comms(version, num_reactors, modem, dev_listen, coord_listen
if protocol == PROTOCOLS.SCADA_MGMT then if protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet -- SCADA management packet
elseif protocol == PROTOCOLS.COORD_DATA then if session ~= nil then
-- pass the packet onto the session handler
session.in_queue.push_packet(packet)
else
-- any other packet should be session related, discard it
log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_MGMT packet without a known session"))
end
elseif protocol == PROTOCOLS.SCADA_CRDN then
-- coordinator packet -- coordinator packet
if session ~= nil then
-- pass the packet onto the session handler
session.in_queue.push_packet(packet)
elseif packet.type == SCADA_CRDN_TYPES.ESTABLISH then
if packet.length == 1 then
-- this is an attempt to establish a new session
println(util.c("connected to coordinator [:", r_port, "]"))
svsessions.establish_coord_session(l_port, r_port, packet.data[1])
log.debug("CRDN_ESTABLISH: connected to " .. r_port)
_send_crdn_establish(packet.scada_frame.seq_num() + 1, r_port)
else
log.debug("CRDN_ESTABLISH: establish packet length mismatch")
end
else
-- any other packet should be session related, discard it
log.debug(util.c(r_port, "->", l_port, ": discarding SCADA_CRDN packet without a known session"))
end
else else
log.debug("illegal packet type " .. protocol .. " on coordinator listening channel") log.debug("illegal packet type " .. protocol .. " on coordinator listening channel")
end end