#39 RTU unit threads

This commit is contained in:
Mikayla Fischler 2022-04-29 13:19:01 -04:00
parent e833176c65
commit 84e7ad43bc
4 changed files with 155 additions and 19 deletions

View File

@ -145,13 +145,6 @@ function rtu_comms(modem, local_port, server_port)
self.seq_num = self.seq_num + 1
end
local _send_modbus = function (m_pkt)
local s_pkt = comms.scada_packet()
s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable())
self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable())
self.seq_num = self.seq_num + 1
end
-- PUBLIC FUNCTIONS --
-- reconnect a newly connected modem
@ -164,6 +157,14 @@ function rtu_comms(modem, local_port, server_port)
end
end
-- send a MODBUS TCP packet
local send_modbus = function (m_pkt)
local s_pkt = comms.scada_packet()
s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable())
self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable())
self.seq_num = self.seq_num + 1
end
-- parse a MODBUS/SCADA packet
local parse_packet = function(side, sender, reply_to, message, distance)
local pkt = nil
@ -216,13 +217,28 @@ function rtu_comms(modem, local_port, server_port)
if protocol == PROTOCOLS.MODBUS_TCP then
local reply = modbus.reply__neg_ack(packet)
-- MODBUS instruction
-- handle MODBUS instruction
if packet.unit_id <= #units then
local unit = units[packet.unit_id]
if unit.name == "redstone_io" then
-- immediately execute redstone RTU requests
local return_code, reply = unit.modbus_io.handle_packet(packet)
if not return_code then
log._warning("MODBUS operation failed")
log._warning("requested MODBUS operation failed")
end
else
-- check validity then pass off to unit comms thread
local return_code, reply = unit.modbus_io.check_request(packet)
if return_code then
-- check if an operation is already in progress for this unit
if unit.modbus_busy then
reply = unit.modbus_io.reply__srv_device_busy(packet)
else
unit.pkt_queue.push(packet)
end
else
log._warning("cannot perform requested MODBUS operation")
end
end
else
-- unit ID out of range?
@ -230,7 +246,7 @@ function rtu_comms(modem, local_port, server_port)
log._error("MODBUS packet requesting non-existent unit")
end
_send_modbus(reply)
send_modbus(reply)
elseif protocol == PROTOCOLS.SCADA_MGMT then
-- SCADA management packet
if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then
@ -302,6 +318,7 @@ function rtu_comms(modem, local_port, server_port)
end
return {
send_modbus = send_modbus,
reconnect_modem = reconnect_modem,
parse_packet = parse_packet,
handle_packet = handle_packet,

View File

@ -19,7 +19,7 @@ os.loadAPI("dev/boiler_rtu.lua")
os.loadAPI("dev/imatrix_rtu.lua")
os.loadAPI("dev/turbine_rtu.lua")
local RTU_VERSION = "alpha-v0.4.7"
local RTU_VERSION = "alpha-v0.4.8"
local print = util.print
local println = util.println
@ -142,7 +142,10 @@ for reactor_idx = 1, #rtu_redstone do
reactor = rtu_redstone[reactor_idx].for_reactor,
device = capabilities, -- use device field for redstone channels
rtu = rs_rtu,
modbus_io = modbus.new(rs_rtu)
modbus_io = modbus.new(rs_rtu),
modbus_busy = false,
pkt_queue = nil,
thread = nil
})
log._debug("init> initialized RTU unit #" .. #units .. ": redstone_io (redstone) [1] for reactor " .. rtu_redstone[reactor_idx].for_reactor)
@ -180,15 +183,22 @@ for i = 1, #rtu_devices do
end
if rtu_iface ~= nil then
table.insert(units, {
local rtu_unit = {
name = rtu_devices[i].name,
type = rtu_type,
index = rtu_devices[i].index,
reactor = rtu_devices[i].for_reactor,
device = device,
rtu = rtu_iface,
modbus_io = modbus.new(rtu_iface)
})
modbus_io = modbus.new(rtu_iface),
modbus_busy = false,
pkt_queue = mqueue.new(),
thread = nil
}
rtu_unit.thread = threads.thread__unit_comms(__shared_memory, rtu_unit)
table.insert(units, rtu_unit)
log._debug("init> initialized RTU unit #" .. #units .. ": " .. rtu_devices[i].name .. " (" .. rtu_type .. ") [" ..
rtu_devices[i].index .. "] for reactor " .. rtu_devices[i].for_reactor)
@ -208,8 +218,16 @@ local comms_thread = threads.thread__comms(__shared_memory)
smem_sys.conn_watchdog = util.new_watchdog(5)
log._debug("init> conn watchdog started")
-- assemble thread list
local _threads = { main_thread.exec, comms_thread.exec }
for i = 1, #units do
if units[i].thread ~= nil then
table.insert(_threads, units[i].thread.exec)
end
end
-- run threads
parallel.waitForAll(main_thread.exec, comms_thread.exec)
parallel.waitForAll(table.unpack(_threads))
println_ts("exited")
log._info("exited")

View File

@ -180,3 +180,52 @@ function thread__comms(smem)
return { exec = exec }
end
-- per-unit communications handler thread
function thread__unit_comms(smem, unit)
-- execute thread
local exec = function ()
log._debug("rtu unit thread start -> " .. unit.name .. "(" .. unit.type .. ")")
-- load in from shared memory
local rtu_state = smem.rtu_state
local packet_queue = unit.pkt_queue
local last_update = util.time()
-- thread loop
while true do
-- check for messages in the message queue
while packet_queue.ready() and not rtu_state.shutdown do
local msg = packet_queue.pop()
if msg.qtype == mqueue.TYPE.COMMAND then
-- received a command
elseif msg.qtype == mqueue.TYPE.DATA then
-- received data
elseif msg.qtype == mqueue.TYPE.PACKET then
-- received a packet
unit.modbus_busy = true
local return_code, reply = unit.modbus_io.handle_packet(packet)
rtu.send_modbus(reply)
unit.modbus_busy = false
end
-- quick yield
util.nop()
end
-- check for termination request
if rtu_state.shutdown then
log._warning("rtu unit thread exiting -> " .. unit.name .. "(" .. unit.type .. ")")
break
end
-- delay before next check
last_update = util.adaptive_delay(COMMS_SLEEP, last_update)
end
end
return { exec = exec }
end

View File

@ -203,6 +203,46 @@ function new(rtu_dev)
return return_ok, response
end
-- validate a request without actually executing it
local check_request = function (packet)
local return_code = true
local response = { MODBUS_EXCODE.ACKNOWLEDGE }
if #packet.data == 2 then
-- handle by function code
if packet.func_code == MODBUS_FCODE.READ_COILS then
elseif packet.func_code == MODBUS_FCODE.READ_DISCRETE_INPUTS then
elseif packet.func_code == MODBUS_FCODE.READ_MUL_HOLD_REGS then
elseif packet.func_code == MODBUS_FCODE.READ_INPUT_REGISTERS then
elseif packet.func_code == MODBUS_FCODE.WRITE_SINGLE_COIL then
elseif packet.func_code == MODBUS_FCODE.WRITE_SINGLE_HOLD_REG then
elseif packet.func_code == MODBUS_FCODE.WRITE_MUL_COILS then
elseif packet.func_code == MODBUS_FCODE.WRITE_MUL_HOLD_REGS then
else
-- unknown function
return_code = false
response = { MODBUS_EXCODE.ILLEGAL_FUNCTION }
end
else
-- invalid length
return_code = false
response = { MODBUS_EXCODE.NEG_ACKNOWLEDGE }
end
-- default is to echo back
local func_code = packet.func_code
if not return_code then
-- echo back with error flag
func_code = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG)
end
-- create reply
local reply = comms.modbus_packet()
reply.make(packet.txn_id, packet.unit_id, func_code, response)
return return_code, reply
end
-- handle a MODBUS TCP packet and generate a reply
local handle_packet = function (packet)
local return_code = true
@ -258,6 +298,16 @@ function new(rtu_dev)
return return_code, reply
end
-- return a SERVER_DEVICE_BUSY error reply
local reply__srv_device_busy = function (packet)
-- reply back with error flag and exception code
local reply = comms.modbus_packet()
local fcode = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG)
local data = { MODBUS_EXCODE.SERVER_DEVICE_BUSY }
reply.make(packet.txn_id, packet.unit_id, fcode, data)
return reply
end
-- return a NEG_ACKNOWLEDGE error reply
local reply__neg_ack = function (packet)
-- reply back with error flag and exception code
@ -279,7 +329,9 @@ function new(rtu_dev)
end
return {
check_request = check_request,
handle_packet = handle_packet,
reply__srv_device_busy = reply__srv_device_busy,
reply__neg_ack = reply__neg_ack,
reply__gw_unavailable = reply__gw_unavailable
}