RTU/PLC code cleanup, #46 changed KEEP_ALIVE to scada message type and use it for the RTU too

This commit is contained in:
Mikayla Fischler 2022-05-09 15:00:16 -04:00
parent 679d98c8bf
commit 25558df22d
9 changed files with 191 additions and 148 deletions

View File

@ -269,7 +269,7 @@ plc.rps_init = function (reactor)
end
-- reactor PLC communications
plc.comms = function (id, modem, local_port, server_port, reactor, rps)
plc.comms = function (id, modem, local_port, server_port, reactor, rps, conn_watchdog)
local self = {
id = id,
seq_num = 0,
@ -279,6 +279,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
l_port = local_port,
reactor = reactor,
rps = rps,
conn_watchdog = conn_watchdog,
scrammed = false,
linked = false,
status_cache = nil,
@ -398,7 +399,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
-- keep alive ack
local _send_keep_alive_ack = function (srv_time)
_send(RPLC_TYPES.KEEP_ALIVE, { srv_time, util.time() })
_send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() })
end
-- general ack
@ -456,8 +457,8 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
end
-- close the connection to the server
local close = function (conn_watchdog)
conn_watchdog.cancel()
local close = function ()
self.conn_watchdog.cancel()
unlink()
_send_mgmt(SCADA_MGMT_TYPES.CLOSE, {})
end
@ -478,7 +479,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
local sys_status = {
util.time(), -- timestamp
(not self.scrammed), -- enabled
(not self.scrammed), -- requested control state
rps.is_tripped(), -- overridden
degraded, -- degraded
self.reactor.getHeatingRate(), -- heating rate
@ -542,7 +543,7 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
end
-- handle an RPLC packet
local handle_packet = function (packet, plc_state, setpoints, conn_watchdog)
local handle_packet = function (packet, plc_state, setpoints)
if packet ~= nil then
-- check sequence number
if self.r_seq_num == nil then
@ -554,29 +555,13 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
self.r_seq_num = packet.scada_frame.seq_num()
end
-- feed the watchdog first so it doesn't uhh...eat our packets
conn_watchdog.feed()
-- feed the watchdog first so it doesn't uhh...eat our packets :)
self.conn_watchdog.feed()
-- handle packet
if packet.scada_frame.protocol() == PROTOCOLS.RPLC then
if self.linked then
if packet.type == RPLC_TYPES.KEEP_ALIVE then
-- keep alive request received, echo back
if packet.length == 1 then
local timestamp = packet.data[1]
local trip_time = util.time() - timestamp
if trip_time > 500 then
log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. ")")
end
-- log.debug("RPLC RTT = ".. trip_time .. "ms")
_send_keep_alive_ack(timestamp)
else
log.debug("RPLC keep alive packet length mismatch")
end
elseif packet.type == RPLC_TYPES.LINK_REQ then
if packet.type == RPLC_TYPES.LINK_REQ then
-- link request confirmation
if packet.length == 1 then
log.debug("received unsolicited link request response")
@ -694,15 +679,34 @@ plc.comms = function (id, modem, local_port, server_port, reactor, rps)
log.debug("discarding non-link packet before linked")
end
elseif packet.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then
-- handle session close
if packet.type == SCADA_MGMT_TYPES.CLOSE then
conn_watchdog.cancel()
if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive request received, echo back
if packet.length == 1 then
local timestamp = packet.data[1]
local trip_time = util.time() - timestamp
if trip_time > 500 then
log.warning("PLC KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)")
end
-- log.debug("RPLC RTT = ".. trip_time .. "ms")
_send_keep_alive_ack(timestamp)
else
log.debug("SCADA keep alive packet length mismatch")
end
elseif packet.type == SCADA_MGMT_TYPES.CLOSE then
-- handle session close
self.conn_watchdog.cancel()
unlink()
println_ts("server connection closed by remote host")
log.warning("server connection closed by remote host")
else
log.warning("received unknown SCADA_MGMT packet type " .. packet.type)
end
else
-- should be unreachable assuming packet is from parse_packet()
log.error("illegal packet type " .. protocol, true)
end
end
end

View File

@ -11,7 +11,7 @@ local config = require("config")
local plc = require("plc")
local threads = require("threads")
local R_PLC_VERSION = "alpha-v0.6.4"
local R_PLC_VERSION = "alpha-v0.6.5"
local print = util.print
local println = util.println
@ -102,30 +102,35 @@ function init()
-- init reactor protection system
smem_sys.rps = plc.rps_init(smem_dev.reactor)
log.debug("rps init")
log.debug("init> rps init")
if __shared_memory.networked then
-- start comms
smem_sys.plc_comms = plc.comms(config.REACTOR_ID, smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_dev.reactor, smem_sys.rps)
log.debug("comms init")
-- comms watchdog, 3 second timeout
smem_sys.conn_watchdog = util.new_watchdog(3)
log.debug("conn watchdog started")
log.debug("init> conn watchdog started")
-- start comms
smem_sys.plc_comms = plc.comms(config.REACTOR_ID, smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_dev.reactor, smem_sys.rps, smem_sys.conn_watchdog)
log.debug("init> comms init")
else
println("boot> starting in offline mode");
log.debug("running without networking")
log.debug("init> running without networking")
end
os.queueEvent("clock_start")
println("boot> completed");
log.debug("init> boot completed")
else
println("boot> system in degraded state, awaiting devices...")
log.warning("booted in a degraded state, awaiting peripheral connections...")
log.warning("init> booted in a degraded state, awaiting peripheral connections...")
end
end
----------------------------------------
-- start system
----------------------------------------
-- initialize PLC
init()

View File

@ -125,14 +125,15 @@ rtu.init_unit = function ()
}
end
rtu.comms = function (modem, local_port, server_port)
rtu.comms = function (modem, local_port, server_port, conn_watchdog)
local self = {
seq_num = 0,
r_seq_num = nil,
txn_id = 0,
modem = modem,
s_port = server_port,
l_port = local_port
l_port = local_port,
conn_watchdog = conn_watchdog
}
-- open modem
@ -153,8 +154,21 @@ rtu.comms = function (modem, local_port, server_port)
self.seq_num = self.seq_num + 1
end
-- keep alive ack
local _send_keep_alive_ack = function (srv_time)
_send(SCADA_MGMT_TYPES.KEEP_ALIVE, { srv_time, util.time() })
end
-- PUBLIC FUNCTIONS --
-- send a MODBUS TCP packet
local send_modbus = function (m_pkt)
local s_pkt = comms.scada_packet()
s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable())
self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable())
self.seq_num = self.seq_num + 1
end
-- reconnect a newly connected modem
local reconnect_modem = function (modem)
self.modem = modem
@ -165,12 +179,47 @@ rtu.comms = function (modem, local_port, server_port)
end
end
-- send a MODBUS TCP packet
local send_modbus = function (m_pkt)
local s_pkt = comms.scada_packet()
s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable())
self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable())
self.seq_num = self.seq_num + 1
-- unlink from the server
local unlink = function (rtu_state)
rtu_state.linked = false
self.r_seq_num = nil
end
-- close the connection to the server
local close = function (rtu_state)
self.conn_watchdog.cancel()
unlink(rtu_state)
_send(SCADA_MGMT_TYPES.CLOSE, {})
end
-- send capability advertisement
local send_advertisement = function (units)
local advertisement = {}
for i = 1, #units do
local unit = units[i]
local type = comms.rtu_t_to_advert_type(unit.type)
if type ~= nil then
if type == RTU_ADVERT_TYPES.REDSTONE then
insert(advertisement, {
type = type,
index = unit.index,
reactor = unit.for_reactor,
rsio = unit.device
})
else
insert(advertisement, {
type = type,
index = unit.index,
reactor = unit.for_reactor,
rsio = nil
})
end
end
end
_send(SCADA_MGMT_TYPES.RTU_ADVERT, advertisement)
end
-- parse a MODBUS/SCADA packet
@ -203,7 +252,7 @@ rtu.comms = function (modem, local_port, server_port)
end
-- handle a MODBUS/SCADA packet
local handle_packet = function(packet, units, rtu_state, conn_watchdog)
local handle_packet = function(packet, units, rtu_state)
if packet ~= nil then
local seq_ok = true
@ -218,7 +267,7 @@ rtu.comms = function (modem, local_port, server_port)
end
-- feed watchdog on valid sequence number
conn_watchdog.feed()
self.conn_watchdog.feed()
local protocol = packet.scada_frame.protocol()
@ -257,10 +306,28 @@ rtu.comms = function (modem, local_port, server_port)
send_modbus(reply)
elseif protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet
if packet.type == SCADA_MGMT_TYPES.CLOSE then
if packet.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive request received, echo back
if packet.length == 1 then
local timestamp = packet.data[1]
local trip_time = util.time() - timestamp
if trip_time > 500 then
log.warning("RTU KEEP_ALIVE trip time > 500ms (" .. trip_time .. "ms)")
end
-- log.debug("RTU RTT = ".. trip_time .. "ms")
_send_keep_alive_ack(timestamp)
else
log.debug("SCADA keep alive packet length mismatch")
end
elseif packet.type == SCADA_MGMT_TYPES.CLOSE then
-- close connection
conn_watchdog.cancel()
self.conn_watchdog.cancel()
unlink(rtu_state)
println_ts("server connection closed by remote host")
log.warning("server connection closed by remote host")
elseif packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then
-- acknowledgement
rtu_state.linked = true
@ -279,50 +346,6 @@ rtu.comms = function (modem, local_port, server_port)
end
end
-- send capability advertisement
local send_advertisement = function (units)
local advertisement = {}
for i = 1, #units do
local unit = units[i]
local type = comms.rtu_t_to_advert_type(unit.type)
if type ~= nil then
if type == RTU_ADVERT_TYPES.REDSTONE then
insert(advertisement, {
type = type,
index = unit.index,
reactor = unit.for_reactor,
rsio = unit.device
})
else
insert(advertisement, {
type = type,
index = unit.index,
reactor = unit.for_reactor,
rsio = nil
})
end
end
end
_send(SCADA_MGMT_TYPES.RTU_ADVERT, advertisement)
end
local send_heartbeat = function ()
_send(SCADA_MGMT_TYPES.RTU_HEARTBEAT, {})
end
local unlink = function (rtu_state)
rtu_state.linked = false
self.r_seq_num = nil
end
local close = function (rtu_state)
unlink(rtu_state)
_send(SCADA_MGMT_TYPES.CLOSE, {})
end
return {
send_modbus = send_modbus,
reconnect_modem = reconnect_modem,

View File

@ -22,7 +22,7 @@ local imatrix_rtu = require("dev.imatrix_rtu")
local turbine_rtu = require("dev.turbine_rtu")
local turbinev_rtu = require("dev.turbinev_rtu")
local RTU_VERSION = "alpha-v0.6.0"
local RTU_VERSION = "alpha-v0.6.1"
local rtu_t = types.rtu_t
@ -80,8 +80,6 @@ if smem_dev.modem == nil then
return
end
smem_sys.rtu_comms = rtu.comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT)
----------------------------------------
-- interpret config and init units
----------------------------------------
@ -230,14 +228,18 @@ end
-- start system
----------------------------------------
-- start connection watchdog
smem_sys.conn_watchdog = util.new_watchdog(5)
log.debug("boot> conn watchdog started")
-- setup comms
smem_sys.rtu_comms = rtu.comms(smem_dev.modem, config.LISTEN_PORT, config.SERVER_PORT, smem_sys.conn_watchdog)
log.debug("boot> comms init")
-- init threads
local main_thread = threads.thread__main(__shared_memory)
local comms_thread = threads.thread__comms(__shared_memory)
-- start connection watchdog
smem_sys.conn_watchdog = util.new_watchdog(5)
log.debug("init> conn watchdog started")
-- assemble thread list
local _threads = { main_thread.exec, comms_thread.exec }
for i = 1, #units do

View File

@ -180,8 +180,7 @@ threads.thread__comms = function (smem)
elseif msg.qtype == mqueue.TYPE.PACKET then
-- received a packet
-- handle the packet (rtu_state passed to allow setting link flag)
-- (conn_watchdog passed to allow feeding watchdog)
rtu_comms.handle_packet(msg.message, units, rtu_state, conn_watchdog)
rtu_comms.handle_packet(msg.message, units, rtu_state)
end
-- quick yield
@ -211,7 +210,6 @@ threads.thread__unit_comms = function (smem, unit)
-- load in from shared memory
local rtu_state = smem.rtu_state
local packet_queue = unit.pkt_queue
local last_update = util.time()

View File

@ -18,16 +18,15 @@ local PROTOCOLS = {
}
local RPLC_TYPES = {
KEEP_ALIVE = 0, -- keep alive packets
LINK_REQ = 1, -- linking requests
STATUS = 2, -- reactor/system status
MEK_STRUCT = 3, -- mekanism build structure
MEK_BURN_RATE = 4, -- set burn rate
RPS_ENABLE = 5, -- enable reactor
RPS_SCRAM = 6, -- SCRAM reactor
RPS_STATUS = 7, -- RPS status
RPS_ALARM = 8, -- RPS alarm broadcast
RPS_RESET = 9 -- clear RPS trip (if in bad state, will trip immediately)
LINK_REQ = 0, -- linking requests
STATUS = 1, -- reactor/system status
MEK_STRUCT = 2, -- mekanism build structure
MEK_BURN_RATE = 3, -- set burn rate
RPS_ENABLE = 4, -- enable reactor
RPS_SCRAM = 5, -- SCRAM reactor
RPS_STATUS = 6, -- RPS status
RPS_ALARM = 7, -- RPS alarm broadcast
RPS_RESET = 8 -- clear RPS trip (if in bad state, will trip immediately)
}
local RPLC_LINKING = {
@ -37,11 +36,10 @@ local RPLC_LINKING = {
}
local SCADA_MGMT_TYPES = {
PING = 0, -- generic ping
KEEP_ALIVE = 0, -- keep alive packet w/ RTT
CLOSE = 1, -- close a connection
REMOTE_LINKED = 2, -- remote device linked
RTU_ADVERT = 3, -- RTU capability advertisement
RTU_HEARTBEAT = 4 -- RTU heartbeat
RTU_ADVERT = 2, -- RTU capability advertisement
REMOTE_LINKED = 3 -- remote device linked
}
local RTU_ADVERT_TYPES = {

View File

@ -243,24 +243,7 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue)
self.plc_conn_watchdog.feed()
-- handle packet by type
if pkt.type == RPLC_TYPES.KEEP_ALIVE then
-- keep alive reply
if pkt.length == 2 then
local srv_start = pkt.data[1]
local plc_send = pkt.data[2]
local srv_now = util.time()
self.last_rtt = srv_now - srv_start
if self.last_rtt > 500 then
log.warning(log_header .. "PLC KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. ")")
end
-- log.debug(log_header .. "RPLC RTT = ".. self.last_rtt .. "ms")
-- log.debug(log_header .. "RPLC TT = ".. (srv_now - plc_send) .. "ms")
else
log.debug(log_header .. "RPLC keep alive packet length mismatch")
end
elseif pkt.type == RPLC_TYPES.STATUS then
if pkt.type == RPLC_TYPES.STATUS then
-- status packet received, update data
if pkt.length >= 5 then
self.sDB.last_status_update = pkt.data[1]
@ -366,7 +349,24 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue)
log.debug(log_header .. "handler received unsupported RPLC packet type " .. pkt.type)
end
elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then
if pkt.type == SCADA_MGMT_TYPES.CLOSE then
if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive reply
if pkt.length == 2 then
local srv_start = pkt.data[1]
local plc_send = pkt.data[2]
local srv_now = util.time()
self.last_rtt = srv_now - srv_start
if self.last_rtt > 500 then
log.warning(log_header .. "PLC KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)")
end
-- log.debug(log_header .. "PLC RTT = ".. self.last_rtt .. "ms")
-- log.debug(log_header .. "PLC TT = ".. (srv_now - plc_send) .. "ms")
else
log.debug(log_header .. "SCADA keep alive packet length mismatch")
end
elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then
-- close the session
self.connected = false
else
@ -497,7 +497,7 @@ plc.new_session = function (id, for_reactor, in_queue, out_queue)
periodics.keep_alive = periodics.keep_alive + elapsed
if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then
_send(RPLC_TYPES.KEEP_ALIVE, { util.time() })
_send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() })
periodics.keep_alive = 0
end

View File

@ -35,7 +35,7 @@ rtu.new_session = function (id, in_queue, out_queue)
}
-- send a MODBUS TCP packet
local send_modbus = function (m_pkt)
local _send_modbus = function (m_pkt)
local s_pkt = comms.scada_packet()
s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable())
self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable())
@ -66,16 +66,31 @@ rtu.new_session = function (id, in_queue, out_queue)
self.r_seq_num = pkt.scada_frame.seq_num()
end
-- feed watchdog
self.rtu_conn_watchdog.feed()
-- process packet
if pkt.scada_frame.protocol() == PROTOCOLS.MODBUS_TCP then
-- feed watchdog
self.rtu_conn_watchdog.feed()
elseif pkt.scada_frame.protocol() == PROTOCOLS.SCADA_MGMT then
-- feed watchdog
self.rtu_conn_watchdog.feed()
if pkt.type == SCADA_MGMT_TYPES.CLOSE then
if pkt.type == SCADA_MGMT_TYPES.KEEP_ALIVE then
-- keep alive reply
if pkt.length == 2 then
local srv_start = pkt.data[1]
local rtu_send = pkt.data[2]
local srv_now = util.time()
self.last_rtt = srv_now - srv_start
if self.last_rtt > 500 then
log.warning(log_header .. "RTU KEEP_ALIVE round trip time > 500ms (" .. self.last_rtt .. "ms)")
end
-- log.debug(log_header .. "RTU RTT = ".. self.last_rtt .. "ms")
-- log.debug(log_header .. "RTU TT = ".. (srv_now - rtu_send) .. "ms")
else
log.debug(log_header .. "SCADA keep alive packet length mismatch")
end
elseif pkt.type == SCADA_MGMT_TYPES.CLOSE then
-- close the session
self.connected = false
elseif pkt.type == SCADA_MGMT_TYPES.RTU_ADVERT then
@ -84,8 +99,6 @@ rtu.new_session = function (id, in_queue, out_queue)
local unit = packet.data[i]
unit
end
elseif pkt.type == SCADA_MGMT_TYPES.RTU_HEARTBEAT then
-- periodic RTU heartbeat
else
log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type)
end
@ -162,7 +175,7 @@ rtu.new_session = function (id, in_queue, out_queue)
periodics.keep_alive = periodics.keep_alive + elapsed
if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then
-- _send(RPLC_TYPES.KEEP_ALIVE, { util.time() })
_send_mgmt(SCADA_MGMT_TYPES.KEEP_ALIVE, { util.time() })
periodics.keep_alive = 0
end

View File

@ -14,7 +14,7 @@ local svsessions = require("session.svsessions")
local config = require("config")
local supervisor = require("supervisor")
local SUPERVISOR_VERSION = "alpha-v0.3.3"
local SUPERVISOR_VERSION = "alpha-v0.3.4"
local print = util.print
local println = util.println