From 8c4598e7a6d6565a8be8944ddb5758f3be94b763 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Wed, 27 Apr 2022 12:46:04 -0400 Subject: [PATCH] #32 new threaded RTU code --- rtu/rtu.lua | 4 +- rtu/startup.lua | 143 ++++++++++++++++------------------------------ rtu/threads.lua | 147 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 199 insertions(+), 95 deletions(-) create mode 100644 rtu/threads.lua diff --git a/rtu/rtu.lua b/rtu/rtu.lua index ccf6e44..54bf780 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -183,7 +183,7 @@ function rtu_comms(modem, local_port, server_port) end -- handle a MODBUS/SCADA packet - local handle_packet = function(packet, units, ref) + local handle_packet = function(packet, units, rtu_state) if packet ~= nil then local protocol = packet.scada_frame.protocol() @@ -209,7 +209,7 @@ function rtu_comms(modem, local_port, server_port) -- SCADA management packet if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then -- acknowledgement - ref.linked = true + rtu_state.linked = true elseif packet.type == SCADA_MGMT_TYPES.RTU_ADVERT then -- request for capabilities again send_advertisement(units) diff --git a/rtu/startup.lua b/rtu/startup.lua index 2d4c58f..990819f 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -6,26 +6,26 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("scada-common/modbus.lua") os.loadAPI("scada-common/rsio.lua") os.loadAPI("config.lua") os.loadAPI("rtu.lua") +os.loadAPI("threads.lua") os.loadAPI("dev/redstone_rtu.lua") os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.3.2" +local RTU_VERSION = "alpha-v0.4.0" local print = util.print local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait - log._info("========================================") log._info("BOOTING rtu.startup " .. RTU_VERSION) log._info("========================================") @@ -35,15 +35,37 @@ println(">> RTU " .. RTU_VERSION .. " <<") -- startup ---------------------------------------- -local units = {} -local linked = false - -- mount connected devices ppm.mount_all() +local __shared_memory = { + -- RTU system state flags + rtu_state = { + linked = false, + shutdown = false + }, + + -- core RTU devices + rtu_dev = { + modem = ppm.get_wireless_modem() + }, + + -- system objects + rtu_sys = { + rtu_comms = nil, + units = {} + }, + + -- message queues + q = { + mq_comms = mqeueu.new() + } +} + +local smem_dev = __shared_memory.rtu_dev + -- get modem -local modem = ppm.get_wireless_modem() -if modem == nil then +if smem_dev.modem == nil then println("boot> wireless modem not found") log._warning("no wireless modem on startup") return @@ -52,9 +74,11 @@ end local rtu_comms = rtu.rtu_comms(modem, config.LISTEN_PORT, config.SERVER_PORT) ---------------------------------------- --- determine configuration +-- interpret config and init units ---------------------------------------- +local units = __shared_memory.rtu_sys.units + local rtu_redstone = config.RTU_REDSTONE local rtu_devices = config.RTU_DEVICES @@ -69,12 +93,12 @@ for reactor_idx = 1, #rtu_redstone do for i = 1, #io_table do local valid = false - local config = io_table[i] + local conf = io_table[i] -- verify configuration - if rsio.is_valid_channel(config.channel) and rsio.is_valid_side(config.side) then - if config.bundled_color then - valid = rsio.is_color(config.bundled_color) + if rsio.is_valid_channel(conf.channel) and rsio.is_valid_side(conf.side) then + if conf.bundled_color then + valid = rsio.is_color(conf.bundled_color) else valid = true end @@ -87,24 +111,24 @@ for reactor_idx = 1, #rtu_redstone do log._warning(message) else -- link redstone in RTU - local mode = rsio.get_io_mode(config.channel) + local mode = rsio.get_io_mode(conf.channel) if mode == rsio.IO_MODE.DIGITAL_IN then - rs_rtu.link_di(config.channel, config.side, config.bundled_color) + rs_rtu.link_di(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.DIGITAL_OUT then - rs_rtu.link_do(config.channel, config.side, config.bundled_color) + rs_rtu.link_do(conf.channel, conf.side, conf.bundled_color) elseif mode == rsio.IO_MODE.ANALOG_IN then - rs_rtu.link_ai(config.channel, config.side) + rs_rtu.link_ai(conf.channel, conf.side) elseif mode == rsio.IO_MODE.ANALOG_OUT then - rs_rtu.link_ao(config.channel, config.side) + rs_rtu.link_ao(conf.channel, conf.side) else -- should be unreachable code, we already validated channels log._error("init> fell through if chain attempting to identify IO mode", true) break end - table.insert(capabilities, config.channel) + table.insert(capabilities, conf.channel) - log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(config.channel) .. " (" .. config.side .. + log._debug("init> linked redstone " .. #capabilities .. ": " .. rsio.to_string(conf.channel) .. " (" .. conf.side .. ") for reactor " .. rtu_redstone[reactor_idx].for_reactor) end end @@ -171,82 +195,15 @@ for i = 1, #rtu_devices do end ---------------------------------------- --- main loop +-- start system ---------------------------------------- --- advertisement/heartbeat clock (every 2 seconds) -local loop_clock = os.startTimer(2) +-- init threads +local main_thread = threads.thread__main(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) --- event loop -while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() - - if event == "peripheral_detach" then - -- handle loss of a device - local device = ppm.handle_unmount(param1) - - for i = 1, #units do - -- find disconnected device - if units[i].device == device.dev then - -- we are going to let the PPM prevent crashes - -- return fault flags/codes to MODBUS queries - local unit = units[i] - println_ts("lost the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "peripheral" then - -- relink lost peripheral to correct unit entry - local type, device = ppm.mount(param1) - - for i = 1, #units do - local unit = units[i] - - -- find disconnected device to reconnect - if unit.name == param1 then - -- found, re-link - unit.device = device - - if unit.type == "boiler" then - unit.rtu = boiler_rtu.new(device) - elseif unit.type == "turbine" then - unit.rtu = turbine_rtu.new(device) - elseif unit.type == "imatrix" then - unit.rtu = imatrix_rtu.new(device) - end - - unit.modbus_io = modbus.new(unit.rtu) - - println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) - end - end - elseif event == "timer" and param1 == loop_clock then - -- start next clock timer - loop_clock = os.startTimer(2) - - -- period tick, if we are linked send heartbeat, if not send advertisement - if linked then - rtu_comms.send_heartbeat() - else - -- advertise units - rtu_comms.send_advertisement(units) - end - elseif event == "modem_message" then - -- got a packet - local link_ref = { linked = linked } - local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) - - async_wait(function () rtu_comms.handle_packet(packet, units, link_ref) end) - - -- if linked, stop sending advertisements - linked = link_ref.linked - end - - -- check for termination request - if event == "terminate" or ppm.should_terminate() then - log._warning("terminate requested, exiting...") - break - end -end +-- run threads +parallel.waitForAll(main_thread.exec, comms_thread.exec) println_ts("exited") log._info("exited") diff --git a/rtu/threads.lua b/rtu/threads.lua new file mode 100644 index 0000000..bc96d3b --- /dev/null +++ b/rtu/threads.lua @@ -0,0 +1,147 @@ +-- #REQUIRES comms.lua +-- #REQUIRES log.lua +-- #REQUIRES ppm.lua +-- #REQUIRES util.lua + +local print = util.print +local println = util.println +local print_ts = util.print_ts +local println_ts = util.println_ts + +local MAIN_CLOCK = 2 -- (2Hz, 40 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) + +-- main thread +function thread__main(smem) + -- execute thread + local exec = function () + -- advertisement/heartbeat clock + local loop_clock = os.startTimer(MAIN_CLOCK) + + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_dev = smem.rtu_dev + local rtu_comms = smem.rtu_sys.rtu_comms + + -- event loop + while true do + local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + + if event == "peripheral_detach" then + -- handle loss of a device + local device = ppm.handle_unmount(param1) + + for i = 1, #units do + -- find disconnected device + if units[i].device == device.dev then + -- we are going to let the PPM prevent crashes + -- return fault flags/codes to MODBUS queries + local unit = units[i] + println_ts("lost the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "peripheral" then + -- relink lost peripheral to correct unit entry + local type, device = ppm.mount(param1) + + for i = 1, #units do + local unit = units[i] + + -- find disconnected device to reconnect + if unit.name == param1 then + -- found, re-link + unit.device = device + + if unit.type == "boiler" then + unit.rtu = boiler_rtu.new(device) + elseif unit.type == "turbine" then + unit.rtu = turbine_rtu.new(device) + elseif unit.type == "imatrix" then + unit.rtu = imatrix_rtu.new(device) + end + + unit.modbus_io = modbus.new(unit.rtu) + + println_ts("reconnected the " .. unit.type .. " on interface " .. unit.name) + end + end + elseif event == "timer" and param1 == loop_clock then + -- start next clock timer + loop_clock = os.startTimer(MAIN_CLOCK) + + -- period tick, if we are linked send heartbeat, if not send advertisement + if rtu_state.linked then + rtu_comms.send_heartbeat() + else + -- advertise units + rtu_comms.send_advertisement(units) + end + elseif event == "modem_message" then + -- got a packet + local packet = rtu_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.push_packet(packet) + end + + rtu_comms.handle_packet(packet, units, link_ref) + end + + -- check for termination request + if event == "terminate" or ppm.should_terminate() then + rtu_state.shutdown = true + log._warning("terminate requested, main thread exiting") + break + end + end + end + + return { exec = exec } +end + +-- communications handler thread +function thread__comms(smem) + -- execute thread + local exec = function () + -- load in from shared memory + local rtu_state = smem.rtu_state + local rtu_comms = smem.rtu_sys.rtu_comms + local units = smem.rtu_sys.units + + local comms_queue = smem.q.mq_comms + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (rtu_state passed to allow setting link flag) + rtu_comms.handle_packet(msg.message, units, rtu_state) + end + + -- quick yield + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if rtu_state.shutdown then + log._warning("comms thread exiting") + break + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) + end + end + end +end