diff --git a/pocket/pocket.lua b/pocket/pocket.lua index 2f537a3..11a8307 100644 --- a/pocket/pocket.lua +++ b/pocket/pocket.lua @@ -1,5 +1,384 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local util = require("scada-common.util") +local PROTOCOL = comms.PROTOCOL +local DEVICE_TYPE = comms.DEVICE_TYPE +local ESTABLISH_ACK = comms.ESTABLISH_ACK +local SCADA_MGMT_TYPE = comms.SCADA_MGMT_TYPE +local CAPI_TYPE = comms.CAPI_TYPE local pocket = {} +-- pocket coordinator + supervisor communications +---@nodiscard +---@param version string pocket version +---@param modem table modem device +---@param local_port integer local pocket port +---@param sv_port integer port of supervisor +---@param api_port integer port of coordinator API +---@param range integer trusted device connection range +---@param sv_watchdog watchdog +---@param api_watchdog watchdog +function pocket.comms(version, modem, local_port, sv_port, api_port, range, sv_watchdog, api_watchdog) + local self = { + sv = { + linked = false, + seq_num = 0, + r_seq_num = nil, ---@type nil|integer + last_est_ack = ESTABLISH_ACK.ALLOW + }, + api = { + linked = false, + seq_num = 0, + r_seq_num = nil, ---@type nil|integer + last_est_ack = ESTABLISH_ACK.ALLOW + }, + establish_delay_counter = 0 + } + + comms.set_trusted_range(range) + + -- PRIVATE FUNCTIONS -- + + -- configure modem channels + local function _conf_channels() + modem.closeAll() + modem.open(local_port) + end + + _conf_channels() + + -- send a management packet to the supervisor + ---@param msg_type SCADA_MGMT_TYPE + ---@param msg table + local function _send_sv(msg_type, msg) + local s_pkt = comms.scada_packet() + local pkt = comms.mgmt_packet() + + pkt.make(msg_type, msg) + s_pkt.make(self.sv.seq_num, PROTOCOL.SCADA_MGMT, pkt.raw_sendable()) + + modem.transmit(sv_port, local_port, s_pkt.raw_sendable()) + self.sv.seq_num = self.sv.seq_num + 1 + end + + -- send a management packet to the coordinator + ---@param msg_type SCADA_MGMT_TYPE + ---@param msg table + local function _send_crd(msg_type, msg) + local s_pkt = comms.scada_packet() + local pkt = comms.mgmt_packet() + + pkt.make(msg_type, msg) + s_pkt.make(self.api.seq_num, PROTOCOL.SCADA_MGMT, pkt.raw_sendable()) + + modem.transmit(api_port, local_port, s_pkt.raw_sendable()) + self.api.seq_num = self.api.seq_num + 1 + end + + -- send a packet to the coordinator API + ---@param msg_type CAPI_TYPE + ---@param msg table + local function _send_api(msg_type, msg) + local s_pkt = comms.scada_packet() + local pkt = comms.capi_packet() + + pkt.make(msg_type, msg) + s_pkt.make(self.api.seq_num, PROTOCOL.COORD_API, pkt.raw_sendable()) + + modem.transmit(api_port, local_port, s_pkt.raw_sendable()) + self.api.seq_num = self.api.seq_num + 1 + end + + -- attempt supervisor connection establishment + local function _send_sv_establish() + _send_sv(SCADA_MGMT_TYPE.ESTABLISH, { comms.version, version, DEVICE_TYPE.PKT }) + end + + -- attempt coordinator API connection establishment + local function _send_api_establish() + _send_crd(SCADA_MGMT_TYPE.ESTABLISH, { comms.version, version, DEVICE_TYPE.PKT }) + end + + -- keep alive ack to supervisor + ---@param srv_time integer + local function _send_sv_keep_alive_ack(srv_time) + _send_sv(SCADA_MGMT_TYPE.KEEP_ALIVE, { srv_time, util.time() }) + end + + -- keep alive ack to coordinator + ---@param srv_time integer + local function _send_api_keep_alive_ack(srv_time) + _send_crd(SCADA_MGMT_TYPE.KEEP_ALIVE, { srv_time, util.time() }) + end + + -- PUBLIC FUNCTIONS -- + + ---@class pocket_comms + local public = {} + + -- reconnect a newly connected modem + ---@param new_modem table + function public.reconnect_modem(new_modem) + modem = new_modem + _conf_channels() + end + + -- close connection to the supervisor + function public.close_sv() + sv_watchdog.cancel() + self.sv.linked = false + _send_sv(SCADA_MGMT_TYPE.CLOSE, {}) + end + + -- close connection to coordinator API server + function public.close_api() + api_watchdog.cancel() + self.api.linked = false + _send_crd(SCADA_MGMT_TYPE.CLOSE, {}) + end + + -- close the connections to the servers + function public.close() + public.close_sv() + public.close_api() + end + + -- attempt to re-link if any of the dependent links aren't active + function public.link_update() + if not self.sv.linked then + if self.establish_delay_counter <= 0 then + _send_sv_establish() + self.establish_delay_counter = 4 + else + self.establish_delay_counter = self.establish_delay_counter - 1 + end + elseif not self.api.linked then + if self.establish_delay_counter <= 0 then + _send_api_establish() + self.establish_delay_counter = 4 + else + self.establish_delay_counter = self.establish_delay_counter - 1 + end + else + -- linked, all good! + end + end + + -- parse a packet + ---@param side string + ---@param sender integer + ---@param reply_to integer + ---@param message any + ---@param distance integer + ---@return mgmt_frame|capi_frame|nil packet + function public.parse_packet(side, sender, reply_to, message, distance) + local pkt = nil + local s_pkt = comms.scada_packet() + + -- parse packet as generic SCADA packet + s_pkt.receive(side, sender, reply_to, message, distance) + + if s_pkt.is_valid() then + -- get as SCADA management packet + if s_pkt.protocol() == PROTOCOL.SCADA_MGMT then + local mgmt_pkt = comms.mgmt_packet() + if mgmt_pkt.decode(s_pkt) then + pkt = mgmt_pkt.get() + end + -- get as coordinator API packet + elseif s_pkt.protocol() == PROTOCOL.COORD_API then + local capi_pkt = comms.capi_packet() + if capi_pkt.decode(s_pkt) then + pkt = capi_pkt.get() + end + else + log.debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true) + end + end + + return pkt + end + + -- handle a packet + ---@param packet mgmt_frame|capi_frame|nil + function public.handle_packet(packet) + if packet ~= nil then + local protocol = packet.scada_frame.protocol() + local r_port = packet.scada_frame.remote_port() + + if r_port == api_port then + -- check sequence number + if self.api.r_seq_num == nil then + self.api.r_seq_num = packet.scada_frame.seq_num() + elseif self.connected and self.api.r_seq_num >= packet.scada_frame.seq_num() then + log.warning("sequence out-of-order: last = " .. self.api.r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) + return + else + self.api.r_seq_num = packet.scada_frame.seq_num() + end + + -- feed watchdog on valid sequence number + api_watchdog.feed() + + if protocol == PROTOCOL.COORD_API then + ---@cast packet capi_frame + elseif protocol == PROTOCOL.SCADA_MGMT then + ---@cast packet mgmt_frame + if packet.type == SCADA_MGMT_TYPE.ESTABLISH then + -- connection with coordinator established + if packet.length == 1 then + local est_ack = packet.data[1] + + if est_ack == ESTABLISH_ACK.ALLOW then + log.info("coordinator connection established") + self.establish_delay_counter = 0 + self.api.linked = true + elseif est_ack == ESTABLISH_ACK.DENY then + if self.api.last_est_ack ~= est_ack then + log.info("coordinator connection denied") + end + elseif est_ack == ESTABLISH_ACK.COLLISION then + if self.api.last_est_ack ~= est_ack then + log.info("coordinator connection denied due to collision") + end + elseif est_ack == ESTABLISH_ACK.BAD_VERSION then + if self.api.last_est_ack ~= est_ack then + log.info("coordinator comms version mismatch") + end + else + log.debug("coordinator SCADA_MGMT establish packet reply unsupported") + end + + self.api.last_est_ack = est_ack + else + log.debug("coordinator SCADA_MGMT establish packet length mismatch") + end + elseif self.api.linked then + if packet.type == SCADA_MGMT_TYPE.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 750 then + log.warning("pocket coordinator KEEP_ALIVE trip time > 750ms (" .. trip_time .. "ms)") + end + + -- log.debug("pocket coordinator RTT = " .. trip_time .. "ms") + + _send_api_keep_alive_ack(timestamp) + else + log.debug("coordinator SCADA keep alive packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPE.CLOSE then + -- handle session close + api_watchdog.cancel() + self.api.linked = false + log.info("coordinator server connection closed by remote host") + else + log.debug("received unknown SCADA_MGMT packet type " .. packet.type .. " from coordinator") + end + else + log.debug("discarding coordinator non-link SCADA_MGMT packet before linked") + end + else + log.debug("illegal packet type " .. protocol .. " from coordinator", true) + end + elseif r_port == sv_port then + -- check sequence number + if self.sv.r_seq_num == nil then + self.sv.r_seq_num = packet.scada_frame.seq_num() + elseif self.connected and self.sv.r_seq_num >= packet.scada_frame.seq_num() then + log.warning("sequence out-of-order: last = " .. self.sv.r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) + return + else + self.sv.r_seq_num = packet.scada_frame.seq_num() + end + + -- feed watchdog on valid sequence number + sv_watchdog.feed() + + -- handle packet + if protocol == PROTOCOL.SCADA_MGMT then + ---@cast packet mgmt_frame + if packet.type == SCADA_MGMT_TYPE.ESTABLISH then + -- connection with supervisor established + if packet.length == 1 then + local est_ack = packet.data[1] + + if est_ack == ESTABLISH_ACK.ALLOW then + log.info("supervisor connection established") + self.establish_delay_counter = 0 + self.sv.linked = true + elseif est_ack == ESTABLISH_ACK.DENY then + if self.sv.last_est_ack ~= est_ack then + log.info("supervisor connection denied") + end + elseif est_ack == ESTABLISH_ACK.COLLISION then + if self.sv.last_est_ack ~= est_ack then + log.info("supervisor connection denied due to collision") + end + elseif est_ack == ESTABLISH_ACK.BAD_VERSION then + if self.sv.last_est_ack ~= est_ack then + log.info("supervisor comms version mismatch") + end + else + log.debug("supervisor SCADA_MGMT establish packet reply unsupported") + end + + self.sv.last_est_ack = est_ack + else + log.debug("supervisor SCADA_MGMT establish packet length mismatch") + end + elseif self.sv.linked then + if packet.type == SCADA_MGMT_TYPE.KEEP_ALIVE then + -- keep alive request received, echo back + if packet.length == 1 then + local timestamp = packet.data[1] + local trip_time = util.time() - timestamp + + if trip_time > 750 then + log.warning("pocket supervisor KEEP_ALIVE trip time > 750ms (" .. trip_time .. "ms)") + end + + -- log.debug("pocket supervisor RTT = " .. trip_time .. "ms") + + _send_sv_keep_alive_ack(timestamp) + else + log.debug("supervisor SCADA keep alive packet length mismatch") + end + elseif packet.type == SCADA_MGMT_TYPE.CLOSE then + -- handle session close + sv_watchdog.cancel() + self.sv.linked = false + log.info("supervisor server connection closed by remote host") + else + log.debug("received unknown SCADA_MGMT packet type " .. packet.type .. " from supervisor") + end + else + log.debug("discarding supervisor non-link SCADA_MGMT packet before linked") + end + else + log.debug("illegal packet type " .. protocol .. " from supervisor", true) + end + else + log.debug("received packet from unconfigured channel " .. r_port, true) + end + end + end + + -- check if we are still linked with the supervisor + ---@nodiscard + function public.is_sv_linked() return self.sv.linked end + + -- check if we are still linked with the coordinator + ---@nodiscard + function public.is_api_linked() return self.api.linked end + + return public +end + + return pocket diff --git a/pocket/startup.lua b/pocket/startup.lua index 90837d3..3bd8069 100644 --- a/pocket/startup.lua +++ b/pocket/startup.lua @@ -16,11 +16,9 @@ local config = require("pocket.config") local pocket = require("pocket.pocket") local renderer = require("pocket.renderer") -local POCKET_VERSION = "alpha-v0.1.1" +local POCKET_VERSION = "alpha-v0.2.0" --- local print = util.print local println = util.println --- local print_ts = util.print_ts local println_ts = util.println_ts ---------------------------------------- @@ -72,19 +70,25 @@ local function main() local modem = ppm.get_wireless_modem() if modem == nil then println("startup> wireless modem not found: please craft the pocket computer with a wireless modem") - log.fatal("no wireless modem on startup") + log.fatal("startup> no wireless modem on startup") return end - -- create connection watchdog - local conn_watchdog = util.new_watchdog(config.COMMS_TIMEOUT) - conn_watchdog.cancel() - log.debug("startup> conn watchdog created") + -- create connection watchdogs + local conn_wd = { + sv = util.new_watchdog(config.COMMS_TIMEOUT), + api = util.new_watchdog(config.COMMS_TIMEOUT) + } + + conn_wd.sv.cancel() + conn_wd.api.cancel() + + log.debug("startup> conn watchdogs created") -- start comms, open all channels - -- local pocket_comms = pocket.comms(POCKET_VERSION, modem, config.SCADA_SV_PORT, config.SCADA_API_PORT, - -- config.LISTEN_PORT, config.TRUSTED_RANGE, conn_watchdog) - -- log.debug("startup> comms init") + local pocket_comms = pocket.comms(POCKET_VERSION, modem, config.SCADA_SV_PORT, config.SCADA_API_PORT, + config.LISTEN_PORT, config.TRUSTED_RANGE, conn_wd.sv, conn_wd.api) + log.debug("startup> comms init") -- base loop clock (2Hz, 10 ticks) local MAIN_CLOCK = 0.5 @@ -98,7 +102,7 @@ local function main() if not ui_ok then renderer.close_ui() println_ts(util.c("UI error: ", message)) - log.error(util.c("GUI crashed with error ", message)) + log.error(util.c("startup> GUI crashed with error ", message)) else -- start clock loop_clock.start() @@ -109,8 +113,9 @@ local function main() ---------------------------------------- if ui_ok then - -- start connection watchdog - conn_watchdog.feed() + -- start connection watchdogs + conn_wd.sv.feed() + conn_wd.api.feed() log.debug("startup> conn watchdog started") end @@ -122,30 +127,37 @@ local function main() if event == "timer" then if loop_clock.is_clock(param1) then -- main loop tick - loop_clock.start() - elseif conn_watchdog.is_timer(param1) then - -- supervisor watchdog timeout - log.info("server timeout") - -- pocket_comms.close() + -- relink if necessary + pocket_comms.link_update() + + loop_clock.start() + elseif conn_wd.sv.is_timer(param1) then + -- supervisor watchdog timeout + log.info("supervisor server timeout") + pocket_comms.close_sv() + elseif conn_wd.api.is_timer(param1) then + -- coordinator watchdog timeout + log.info("coordinator api server timeout") + pocket_comms.close_api() else -- a non-clock/main watchdog timer event - -- notify timer callback dispatcher tcallbackdsp.handle(param1) end elseif event == "modem_message" then -- got a packet - -- local packet = pocket_comms.parse_packet(param1, param2, param3, param4, param5) - -- pocket_comms.handle_packet(packet) + local packet = pocket_comms.parse_packet(param1, param2, param3, param4, param5) + pocket_comms.handle_packet(packet) - -- -- check if it was a disconnect - -- if not pocket_comms.is_linked() then - -- log_comms("supervisor closed connection") - - -- -- close connection - -- pocket_comms.close() - -- end + -- check if it was a disconnect + if not pocket_comms.is_sv_linked() then + log.info("supervisor closed connection") + pocket_comms.close_sv() + elseif not pocket_comms.is_api_linked() then + log.info("coordinator api closed connection") + pocket_comms.close_api() + end elseif event == "mouse_click" then -- handle a monitor touch event renderer.handle_mouse(core.events.touch(param1, param2, param3)) @@ -153,9 +165,9 @@ local function main() -- check for termination request if event == "terminate" or ppm.should_terminate() then - log.info("terminate requested, closing connections...") - -- pocket_comms.close() - log.info("supervisor connection closed") + log.info("terminate requested, closing server connections...") + pocket_comms.close() + log.info("connections closed") break end end diff --git a/scada-common/comms.lua b/scada-common/comms.lua index 497c3b3..b266a1a 100644 --- a/scada-common/comms.lua +++ b/scada-common/comms.lua @@ -2,7 +2,7 @@ -- Communications -- -local log = require("scada-common.log") +local log = require("scada-common.log") ---@class comms local comms = {} @@ -74,7 +74,8 @@ local DEVICE_TYPE = { PLC = 0, -- PLC device type for establish RTU = 1, -- RTU device type for establish SV = 2, -- supervisor device type for establish - CRDN = 3 -- coordinator device type for establish + CRDN = 3, -- coordinator device type for establish + PKT = 4 -- pocket device type for establish } ---@enum PLC_AUTO_ACK