local comms = require("scada-common.comms") local log = require("scada-common.log") local ppm = require("scada-common.ppm") local util = require("scada-common.util") local types = require("scada-common.types") local iocontrol = require("coordinator.iocontrol") local process = require("coordinator.process") local apisessions = require("coordinator.session.apisessions") local dialog = require("coordinator.ui.dialog") local print = util.print local println = util.println local PROTOCOL = comms.PROTOCOL local DEVICE_TYPE = comms.DEVICE_TYPE local ESTABLISH_ACK = comms.ESTABLISH_ACK local MGMT_TYPE = comms.MGMT_TYPE local CRDN_TYPE = comms.CRDN_TYPE local UNIT_COMMAND = comms.UNIT_COMMAND local FAC_COMMAND = comms.FAC_COMMAND local LINK_TIMEOUT = 60.0 local coordinator = {} -- request the user to select a monitor ---@nodiscard ---@param names table available monitors ---@return boolean|string|nil local function ask_monitor(names) println("available monitors:") for i = 1, #names do print(" " .. names[i]) end println("") println("select a monitor or type c to cancel") local iface = dialog.ask_options(names, "c") if iface ~= false and iface ~= nil then util.filter_table(names, function (x) return x ~= iface end) end return iface end -- configure monitor layout ---@param num_units integer number of units expected ---@param disable_flow_view boolean disable flow view (legacy) ---@return boolean success, monitors_struct? monitors function coordinator.configure_monitors(num_units, disable_flow_view) ---@class monitors_struct local monitors = { primary = nil, ---@type table|nil primary_name = "", flow = nil, ---@type table|nil flow_name = "", unit_displays = {}, unit_name_map = {} } local monitors_avail = ppm.get_monitor_list() local names = {} local available = {} -- get all interface names for iface, _ in pairs(monitors_avail) do table.insert(names, iface) table.insert(available, iface) end -- we need a certain number of monitors (1 per unit + 1 primary display + 1 flow display) local num_displays_needed = num_units + util.trinary(disable_flow_view, 1, 2) if #names < num_displays_needed then local message = "not enough monitors connected (need " .. num_displays_needed .. ")" println(message) log.warning(message) return false end -- attempt to load settings if not settings.load("/coord.settings") then log.warning("configure_monitors(): failed to load coordinator settings file (may not exist yet)") else local _primary = settings.get("PRIMARY_DISPLAY") local _flow = settings.get("FLOW_DISPLAY") local _unitd = settings.get("UNIT_DISPLAYS") -- filter out already assigned monitors util.filter_table(available, function (x) return x ~= _primary end) util.filter_table(available, function (x) return x ~= _flow end) if type(_unitd) == "table" then util.filter_table(available, function (x) return not util.table_contains(_unitd, x) end) end end --------------------- -- PRIMARY DISPLAY -- --------------------- local iface_primary_display = settings.get("PRIMARY_DISPLAY") ---@type boolean|string|nil if not util.table_contains(names, iface_primary_display) then println("primary display is not connected") local response = dialog.ask_y_n("would you like to change it", true) if response == false then return false end iface_primary_display = nil end while iface_primary_display == nil and #available > 0 do iface_primary_display = ask_monitor(available) end if type(iface_primary_display) ~= "string" then return false end settings.set("PRIMARY_DISPLAY", iface_primary_display) util.filter_table(available, function (x) return x ~= iface_primary_display end) monitors.primary = ppm.get_periph(iface_primary_display) monitors.primary_name = iface_primary_display -------------------------- -- FLOW MONITOR DISPLAY -- -------------------------- if not disable_flow_view then local iface_flow_display = settings.get("FLOW_DISPLAY") ---@type boolean|string|nil if not util.table_contains(names, iface_flow_display) then println("flow monitor display is not connected") local response = dialog.ask_y_n("would you like to change it", true) if response == false then return false end iface_flow_display = nil end while iface_flow_display == nil and #available > 0 do iface_flow_display = ask_monitor(available) end if type(iface_flow_display) ~= "string" then return false end settings.set("FLOW_DISPLAY", iface_flow_display) util.filter_table(available, function (x) return x ~= iface_flow_display end) monitors.flow = ppm.get_periph(iface_flow_display) monitors.flow_name = iface_flow_display end ------------------- -- UNIT DISPLAYS -- ------------------- local unit_displays = settings.get("UNIT_DISPLAYS") if unit_displays == nil then unit_displays = {} for i = 1, num_units do local display = nil while display == nil and #available > 0 do println("please select monitor for unit #" .. i) display = ask_monitor(available) end if display == false then return false end unit_displays[i] = display end else -- make sure all displays are connected for i = 1, num_units do local display = unit_displays[i] if not util.table_contains(names, display) then println("unit #" .. i .. " display is not connected") local response = dialog.ask_y_n("would you like to change it", true) if response == false then return false end display = nil end while display == nil and #available > 0 do display = ask_monitor(available) end if display == false then return false end unit_displays[i] = display end end settings.set("UNIT_DISPLAYS", unit_displays) if not settings.save("/coord.settings") then log.warning("configure_monitors(): failed to save coordinator settings file") end for i = 1, #unit_displays do monitors.unit_displays[i] = ppm.get_periph(unit_displays[i]) monitors.unit_name_map[i] = unit_displays[i] end return true, monitors end -- dmesg print wrapper ---@param message string message ---@param dmesg_tag string tag ---@param working? boolean to use dmesg_working ---@return function? update, function? done local function log_dmesg(message, dmesg_tag, working) local colors = { GRAPHICS = colors.green, SYSTEM = colors.cyan, BOOT = colors.blue, COMMS = colors.purple, CRYPTO = colors.yellow } if working then return log.dmesg_working(message, dmesg_tag, colors[dmesg_tag]) else log.dmesg(message, dmesg_tag, colors[dmesg_tag]) end end function coordinator.log_graphics(message) log_dmesg(message, "GRAPHICS") end function coordinator.log_sys(message) log_dmesg(message, "SYSTEM") end function coordinator.log_boot(message) log_dmesg(message, "BOOT") end function coordinator.log_comms(message) log_dmesg(message, "COMMS") end function coordinator.log_crypto(message) log_dmesg(message, "CRYPTO") end -- log a message for communications connecting, providing access to progress indication control functions ---@nodiscard ---@param message string ---@return function update, function done function coordinator.log_comms_connecting(message) local update, done = log_dmesg(message, "COMMS", true) ---@cast update function ---@cast done function return update, done end -- coordinator communications ---@nodiscard ---@param version string coordinator version ---@param nic nic network interface device ---@param num_units integer number of configured units for number of monitors, checked against SV ---@param crd_channel integer port of configured supervisor ---@param svr_channel integer listening port for supervisor replys ---@param pkt_channel integer listening port for pocket API ---@param range integer trusted device connection range ---@param sv_watchdog watchdog function coordinator.comms(version, nic, num_units, crd_channel, svr_channel, pkt_channel, range, sv_watchdog) local self = { sv_linked = false, sv_addr = comms.BROADCAST, sv_seq_num = 0, sv_r_seq_num = nil, sv_config_err = false, last_est_ack = ESTABLISH_ACK.ALLOW, last_api_est_acks = {}, est_start = 0, est_last = 0, est_tick_waiting = nil, est_task_done = nil } comms.set_trusted_range(range) -- PRIVATE FUNCTIONS -- -- configure network channels nic.closeAll() nic.open(crd_channel) -- link nic to apisessions apisessions.init(nic) -- send a packet to the supervisor ---@param msg_type MGMT_TYPE|CRDN_TYPE ---@param msg table local function _send_sv(protocol, msg_type, msg) local s_pkt = comms.scada_packet() local pkt ---@type mgmt_packet|crdn_packet if protocol == PROTOCOL.SCADA_MGMT then pkt = comms.mgmt_packet() elseif protocol == PROTOCOL.SCADA_CRDN then pkt = comms.crdn_packet() else return end pkt.make(msg_type, msg) s_pkt.make(self.sv_addr, self.sv_seq_num, protocol, pkt.raw_sendable()) nic.transmit(svr_channel, crd_channel, s_pkt) self.sv_seq_num = self.sv_seq_num + 1 end -- send an API establish request response ---@param packet scada_packet ---@param ack ESTABLISH_ACK local function _send_api_establish_ack(packet, ack) local s_pkt = comms.scada_packet() local m_pkt = comms.mgmt_packet() m_pkt.make(MGMT_TYPE.ESTABLISH, { ack }) s_pkt.make(packet.src_addr(), packet.seq_num() + 1, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) nic.transmit(pkt_channel, crd_channel, s_pkt) self.last_api_est_acks[packet.src_addr()] = ack end -- attempt connection establishment local function _send_establish() _send_sv(PROTOCOL.SCADA_MGMT, MGMT_TYPE.ESTABLISH, { comms.version, version, DEVICE_TYPE.CRD }) end -- keep alive ack ---@param srv_time integer local function _send_keep_alive_ack(srv_time) _send_sv(PROTOCOL.SCADA_MGMT, MGMT_TYPE.KEEP_ALIVE, { srv_time, util.time() }) end -- PUBLIC FUNCTIONS -- ---@class coord_comms local public = {} -- try to connect to the supervisor if not already linked ---@param abort boolean? true to print out cancel info if not linked (use on program terminate) ---@return boolean ok, boolean start_ui function public.try_connect(abort) local ok = true local start_ui = false if not self.sv_linked then if self.est_tick_waiting == nil then self.est_start = util.time_s() self.est_last = self.est_start self.est_tick_waiting, self.est_task_done = coordinator.log_comms_connecting("attempting to connect to configured supervisor on channel " .. svr_channel) _send_establish() else self.est_tick_waiting(math.max(0, LINK_TIMEOUT - (util.time_s() - self.est_start))) end if abort or (util.time_s() - self.est_start) >= LINK_TIMEOUT then self.est_task_done(false) if abort then coordinator.log_comms("supervisor connection attempt cancelled by user") elseif self.sv_config_err then coordinator.log_comms("supervisor cooling configuration invalid, check supervisor config file") elseif not self.sv_linked then if self.last_est_ack == ESTABLISH_ACK.DENY then coordinator.log_comms("supervisor connection attempt denied") elseif self.last_est_ack == ESTABLISH_ACK.COLLISION then coordinator.log_comms("supervisor connection failed due to collision") elseif self.last_est_ack == ESTABLISH_ACK.BAD_VERSION then coordinator.log_comms("supervisor connection failed due to version mismatch") else coordinator.log_comms("supervisor connection failed with no valid response") end end ok = false elseif self.sv_config_err then coordinator.log_comms("supervisor cooling configuration invalid, check supervisor config file") ok = false elseif (util.time_s() - self.est_last) > 1.0 then _send_establish() self.est_last = util.time_s() end elseif self.est_tick_waiting ~= nil then self.est_task_done(true) self.est_tick_waiting = nil self.est_task_done = nil start_ui = true end return ok, start_ui end -- close the connection to the server function public.close() sv_watchdog.cancel() self.sv_addr = comms.BROADCAST self.sv_linked = false self.sv_r_seq_num = nil iocontrol.fp_link_state(types.PANEL_LINK_STATE.DISCONNECTED) _send_sv(PROTOCOL.SCADA_MGMT, MGMT_TYPE.CLOSE, {}) end -- send a facility command ---@param cmd FAC_COMMAND command ---@param option any? optional option options for the optional options (like waste mode) function public.send_fac_command(cmd, option) _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.FAC_CMD, { cmd, option }) end -- send the auto process control configuration with a start command ---@param config coord_auto_config configuration function public.send_auto_start(config) _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.FAC_CMD, { FAC_COMMAND.START, config.mode, config.burn_target, config.charge_target, config.gen_target, config.limits }) end -- send a unit command ---@param cmd UNIT_COMMAND command ---@param unit integer unit ID ---@param option any? optional option options for the optional options (like burn rate) function public.send_unit_command(cmd, unit, option) _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.UNIT_CMD, { cmd, unit, option }) end -- parse a packet ---@param side string ---@param sender integer ---@param reply_to integer ---@param message any ---@param distance integer ---@return mgmt_frame|crdn_frame|nil packet function public.parse_packet(side, sender, reply_to, message, distance) local s_pkt = nic.receive(side, sender, reply_to, message, distance) local pkt = nil if s_pkt then -- get as SCADA management packet if s_pkt.protocol() == PROTOCOL.SCADA_MGMT then local mgmt_pkt = comms.mgmt_packet() if mgmt_pkt.decode(s_pkt) then pkt = mgmt_pkt.get() end -- get as coordinator packet elseif s_pkt.protocol() == PROTOCOL.SCADA_CRDN then local crdn_pkt = comms.crdn_packet() if crdn_pkt.decode(s_pkt) then pkt = crdn_pkt.get() end else log.debug("attempted parse of illegal packet type " .. s_pkt.protocol(), true) end end return pkt end -- handle a packet ---@param packet mgmt_frame|crdn_frame|nil ---@return boolean close_ui function public.handle_packet(packet) local was_linked = self.sv_linked if packet ~= nil then local l_chan = packet.scada_frame.local_channel() local r_chan = packet.scada_frame.remote_channel() local src_addr = packet.scada_frame.src_addr() local protocol = packet.scada_frame.protocol() if l_chan ~= crd_channel then log.debug("received packet on unconfigured channel " .. l_chan, true) elseif r_chan == pkt_channel then if not self.sv_linked then log.debug("discarding pocket API packet before linked to supervisor") elseif protocol == PROTOCOL.SCADA_CRDN then ---@cast packet crdn_frame -- look for an associated session local session = apisessions.find_session(src_addr) -- coordinator packet if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) else -- any other packet should be session related, discard it log.debug("discarding SCADA_CRDN packet without a known session") end elseif protocol == PROTOCOL.SCADA_MGMT then ---@cast packet mgmt_frame -- look for an associated session local session = apisessions.find_session(src_addr) -- SCADA management packet if session ~= nil then -- pass the packet onto the session handler session.in_queue.push_packet(packet) elseif packet.type == MGMT_TYPE.ESTABLISH then -- establish a new session -- validate packet and continue if packet.length == 3 and type(packet.data[1]) == "string" and type(packet.data[2]) == "string" then local comms_v = packet.data[1] local firmware_v = packet.data[2] local dev_type = packet.data[3] if comms_v ~= comms.version then if self.last_api_est_acks[src_addr] ~= ESTABLISH_ACK.BAD_VERSION then log.info(util.c("dropping API establish packet with incorrect comms version v", comms_v, " (expected v", comms.version, ")")) end _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.BAD_VERSION) elseif dev_type == DEVICE_TYPE.PKT then -- pocket linking request local id = apisessions.establish_session(src_addr, firmware_v) coordinator.log_comms(util.c("API_ESTABLISH: pocket (", firmware_v, ") [@", src_addr, "] connected with session ID ", id)) _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.ALLOW) else log.debug(util.c("API_ESTABLISH: illegal establish packet for device ", dev_type, " on pocket channel")) _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.DENY) end else log.debug("invalid establish packet (on API listening channel)") _send_api_establish_ack(packet.scada_frame, ESTABLISH_ACK.DENY) end else -- any other packet should be session related, discard it log.debug(util.c("discarding pocket SCADA_MGMT packet without a known session from computer ", src_addr)) end else log.debug("illegal packet type " .. protocol .. " on pocket channel", true) end elseif r_chan == svr_channel then -- check sequence number if self.sv_r_seq_num == nil then self.sv_r_seq_num = packet.scada_frame.seq_num() elseif self.sv_linked and ((self.sv_r_seq_num + 1) ~= packet.scada_frame.seq_num()) then log.warning("sequence out-of-order: last = " .. self.sv_r_seq_num .. ", new = " .. packet.scada_frame.seq_num()) return false elseif self.sv_linked and src_addr ~= self.sv_addr then log.debug("received packet from unknown computer " .. src_addr .. " while linked; channel in use by another system?") return false else self.sv_r_seq_num = packet.scada_frame.seq_num() end -- feed watchdog on valid sequence number sv_watchdog.feed() -- handle packet if protocol == PROTOCOL.SCADA_CRDN then ---@cast packet crdn_frame if self.sv_linked then if packet.type == CRDN_TYPE.INITIAL_BUILDS then if packet.length == 2 then -- record builds local fac_builds = iocontrol.record_facility_builds(packet.data[1]) local unit_builds = iocontrol.record_unit_builds(packet.data[2]) if fac_builds and unit_builds then -- acknowledge receipt of builds _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.INITIAL_BUILDS, {}) else log.debug("received invalid INITIAL_BUILDS packet") end else log.debug("INITIAL_BUILDS packet length mismatch") end elseif packet.type == CRDN_TYPE.FAC_BUILDS then if packet.length == 1 then -- record facility builds if iocontrol.record_facility_builds(packet.data[1]) then -- acknowledge receipt of builds _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.FAC_BUILDS, {}) else log.debug("received invalid FAC_BUILDS packet") end else log.debug("FAC_BUILDS packet length mismatch") end elseif packet.type == CRDN_TYPE.FAC_STATUS then -- update facility status if not iocontrol.update_facility_status(packet.data) then log.debug("received invalid FAC_STATUS packet") end elseif packet.type == CRDN_TYPE.FAC_CMD then -- facility command acknowledgement if packet.length >= 2 then local cmd = packet.data[1] local ack = packet.data[2] == true if cmd == FAC_COMMAND.SCRAM_ALL then iocontrol.get_db().facility.scram_ack(ack) elseif cmd == FAC_COMMAND.STOP then iocontrol.get_db().facility.stop_ack(ack) elseif cmd == FAC_COMMAND.START then if packet.length == 7 then process.start_ack_handle({ table.unpack(packet.data, 2) }) else log.debug("SCADA_CRDN process start (with configuration) ack echo packet length mismatch") end elseif cmd == FAC_COMMAND.ACK_ALL_ALARMS then iocontrol.get_db().facility.ack_alarms_ack(ack) elseif cmd == FAC_COMMAND.SET_WASTE_MODE then process.waste_ack_handle(packet.data[2]) elseif cmd == FAC_COMMAND.SET_PU_FB then process.pu_fb_ack_handle(packet.data[2]) else log.debug(util.c("received facility command ack with unknown command ", cmd)) end else log.debug("SCADA_CRDN facility command ack packet length mismatch") end elseif packet.type == CRDN_TYPE.UNIT_BUILDS then -- record builds if packet.length == 1 then if iocontrol.record_unit_builds(packet.data[1]) then -- acknowledge receipt of builds _send_sv(PROTOCOL.SCADA_CRDN, CRDN_TYPE.UNIT_BUILDS, {}) else log.debug("received invalid UNIT_BUILDS packet") end else log.debug("UNIT_BUILDS packet length mismatch") end elseif packet.type == CRDN_TYPE.UNIT_STATUSES then -- update statuses if not iocontrol.update_unit_statuses(packet.data) then log.debug("received invalid UNIT_STATUSES packet") end elseif packet.type == CRDN_TYPE.UNIT_CMD then -- unit command acknowledgement if packet.length == 3 then local cmd = packet.data[1] local unit_id = packet.data[2] local ack = packet.data[3] == true local unit = iocontrol.get_db().units[unit_id] ---@type ioctl_unit if unit ~= nil then if cmd == UNIT_COMMAND.SCRAM then unit.scram_ack(ack) elseif cmd == UNIT_COMMAND.START then unit.start_ack(ack) elseif cmd == UNIT_COMMAND.RESET_RPS then unit.reset_rps_ack(ack) elseif cmd == UNIT_COMMAND.SET_BURN then unit.set_burn_ack(ack) elseif cmd == UNIT_COMMAND.SET_WASTE then unit.set_waste_ack(ack) elseif cmd == UNIT_COMMAND.ACK_ALL_ALARMS then unit.ack_alarms_ack(ack) elseif cmd == UNIT_COMMAND.SET_GROUP then -- UI will be updated to display current group if changed successfully else log.debug(util.c("received unit command ack with unknown command ", cmd)) end else log.debug(util.c("received unit command ack with unknown unit ", unit_id)) end else log.debug("SCADA_CRDN unit command ack packet length mismatch") end else log.debug("received unknown SCADA_CRDN packet type " .. packet.type) end else log.debug("discarding SCADA_CRDN packet before linked") end elseif protocol == PROTOCOL.SCADA_MGMT then ---@cast packet mgmt_frame if self.sv_linked then if packet.type == MGMT_TYPE.KEEP_ALIVE then -- keep alive request received, echo back if packet.length == 1 then local timestamp = packet.data[1] local trip_time = util.time() - timestamp if trip_time > 750 then log.warning("coordinator KEEP_ALIVE trip time > 750ms (" .. trip_time .. "ms)") end -- log.debug("coordinator RTT = " .. trip_time .. "ms") iocontrol.get_db().facility.ps.publish("sv_ping", trip_time) _send_keep_alive_ack(timestamp) else log.debug("SCADA keep alive packet length mismatch") end elseif packet.type == MGMT_TYPE.CLOSE then -- handle session close sv_watchdog.cancel() self.sv_addr = comms.BROADCAST self.sv_linked = false self.sv_r_seq_num = nil iocontrol.fp_link_state(types.PANEL_LINK_STATE.DISCONNECTED) log.info("server connection closed by remote host") else log.debug("received unknown SCADA_MGMT packet type " .. packet.type) end elseif packet.type == MGMT_TYPE.ESTABLISH then -- connection with supervisor established if packet.length == 2 then local est_ack = packet.data[1] local config = packet.data[2] if est_ack == ESTABLISH_ACK.ALLOW then -- reset to disconnected before validating iocontrol.fp_link_state(types.PANEL_LINK_STATE.DISCONNECTED) if type(config) == "table" and #config == 2 then -- get configuration ---@class facility_conf local conf = { num_units = config[1], ---@type integer cooling = config[2] ---@type sv_cooling_conf } if conf.num_units == num_units then -- init io controller iocontrol.init(conf, public) self.sv_addr = src_addr self.sv_linked = true self.sv_r_seq_num = nil self.sv_config_err = false iocontrol.fp_link_state(types.PANEL_LINK_STATE.LINKED) else self.sv_config_err = true log.warning("supervisor config's number of units don't match coordinator's config, establish failed") end else log.debug("invalid supervisor configuration table received, establish failed") end else log.debug("SCADA_MGMT establish packet reply (len = 2) unsupported") end self.last_est_ack = est_ack elseif packet.length == 1 then local est_ack = packet.data[1] if est_ack == ESTABLISH_ACK.DENY then if self.last_est_ack ~= est_ack then iocontrol.fp_link_state(types.PANEL_LINK_STATE.DENIED) log.info("supervisor connection denied") end elseif est_ack == ESTABLISH_ACK.COLLISION then if self.last_est_ack ~= est_ack then iocontrol.fp_link_state(types.PANEL_LINK_STATE.COLLISION) log.warning("supervisor connection denied due to collision") end elseif est_ack == ESTABLISH_ACK.BAD_VERSION then if self.last_est_ack ~= est_ack then iocontrol.fp_link_state(types.PANEL_LINK_STATE.BAD_VERSION) log.warning("supervisor comms version mismatch") end else log.debug("SCADA_MGMT establish packet reply (len = 1) unsupported") end self.last_est_ack = est_ack else log.debug("SCADA_MGMT establish packet length mismatch") end else log.debug("discarding non-link SCADA_MGMT packet before linked") end else log.debug("illegal packet type " .. protocol .. " on supervisor listening channel", true) end else log.debug("received packet for unknown channel " .. r_chan, true) end end return was_linked and not self.sv_linked end -- check if the coordinator is still linked to the supervisor ---@nodiscard function public.is_linked() return self.sv_linked end return public end return coordinator