diff --git a/rtu/rtu.lua b/rtu/rtu.lua index 27f18c6..a35770d 100644 --- a/rtu/rtu.lua +++ b/rtu/rtu.lua @@ -145,13 +145,6 @@ function rtu_comms(modem, local_port, server_port) self.seq_num = self.seq_num + 1 end - local _send_modbus = function (m_pkt) - local s_pkt = comms.scada_packet() - s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) - self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) - self.seq_num = self.seq_num + 1 - end - -- PUBLIC FUNCTIONS -- -- reconnect a newly connected modem @@ -164,6 +157,14 @@ function rtu_comms(modem, local_port, server_port) end end + -- send a MODBUS TCP packet + local send_modbus = function (m_pkt) + local s_pkt = comms.scada_packet() + s_pkt.make(self.seq_num, PROTOCOLS.MODBUS_TCP, m_pkt.raw_sendable()) + self.modem.transmit(self.s_port, self.l_port, s_pkt.raw_sendable()) + self.seq_num = self.seq_num + 1 + end + -- parse a MODBUS/SCADA packet local parse_packet = function(side, sender, reply_to, message, distance) local pkt = nil @@ -216,13 +217,28 @@ function rtu_comms(modem, local_port, server_port) if protocol == PROTOCOLS.MODBUS_TCP then local reply = modbus.reply__neg_ack(packet) - -- MODBUS instruction + -- handle MODBUS instruction if packet.unit_id <= #units then local unit = units[packet.unit_id] - local return_code, reply = unit.modbus_io.handle_packet(packet) - - if not return_code then - log._warning("MODBUS operation failed") + if unit.name == "redstone_io" then + -- immediately execute redstone RTU requests + local return_code, reply = unit.modbus_io.handle_packet(packet) + if not return_code then + log._warning("requested MODBUS operation failed") + end + else + -- check validity then pass off to unit comms thread + local return_code, reply = unit.modbus_io.check_request(packet) + if return_code then + -- check if an operation is already in progress for this unit + if unit.modbus_busy then + reply = unit.modbus_io.reply__srv_device_busy(packet) + else + unit.pkt_queue.push(packet) + end + else + log._warning("cannot perform requested MODBUS operation") + end end else -- unit ID out of range? @@ -230,7 +246,7 @@ function rtu_comms(modem, local_port, server_port) log._error("MODBUS packet requesting non-existent unit") end - _send_modbus(reply) + send_modbus(reply) elseif protocol == PROTOCOLS.SCADA_MGMT then -- SCADA management packet if packet.type == SCADA_MGMT_TYPES.REMOTE_LINKED then @@ -302,6 +318,7 @@ function rtu_comms(modem, local_port, server_port) end return { + send_modbus = send_modbus, reconnect_modem = reconnect_modem, parse_packet = parse_packet, handle_packet = handle_packet, diff --git a/rtu/startup.lua b/rtu/startup.lua index d4360b5..aad3483 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -19,7 +19,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.4.7" +local RTU_VERSION = "alpha-v0.4.8" local print = util.print local println = util.println @@ -142,7 +142,10 @@ for reactor_idx = 1, #rtu_redstone do reactor = rtu_redstone[reactor_idx].for_reactor, device = capabilities, -- use device field for redstone channels rtu = rs_rtu, - modbus_io = modbus.new(rs_rtu) + modbus_io = modbus.new(rs_rtu), + modbus_busy = false, + pkt_queue = nil, + thread = nil }) log._debug("init> initialized RTU unit #" .. #units .. ": redstone_io (redstone) [1] for reactor " .. rtu_redstone[reactor_idx].for_reactor) @@ -180,15 +183,22 @@ for i = 1, #rtu_devices do end if rtu_iface ~= nil then - table.insert(units, { + local rtu_unit = { name = rtu_devices[i].name, type = rtu_type, index = rtu_devices[i].index, reactor = rtu_devices[i].for_reactor, device = device, rtu = rtu_iface, - modbus_io = modbus.new(rtu_iface) - }) + modbus_io = modbus.new(rtu_iface), + modbus_busy = false, + pkt_queue = mqueue.new(), + thread = nil + } + + rtu_unit.thread = threads.thread__unit_comms(__shared_memory, rtu_unit) + + table.insert(units, rtu_unit) log._debug("init> initialized RTU unit #" .. #units .. ": " .. rtu_devices[i].name .. " (" .. rtu_type .. ") [" .. rtu_devices[i].index .. "] for reactor " .. rtu_devices[i].for_reactor) @@ -208,8 +218,16 @@ local comms_thread = threads.thread__comms(__shared_memory) smem_sys.conn_watchdog = util.new_watchdog(5) log._debug("init> conn watchdog started") +-- assemble thread list +local _threads = { main_thread.exec, comms_thread.exec } +for i = 1, #units do + if units[i].thread ~= nil then + table.insert(_threads, units[i].thread.exec) + end +end + -- run threads -parallel.waitForAll(main_thread.exec, comms_thread.exec) +parallel.waitForAll(table.unpack(_threads)) println_ts("exited") log._info("exited") diff --git a/rtu/threads.lua b/rtu/threads.lua index b02aafb..8aef13f 100644 --- a/rtu/threads.lua +++ b/rtu/threads.lua @@ -180,3 +180,52 @@ function thread__comms(smem) return { exec = exec } end + +-- per-unit communications handler thread +function thread__unit_comms(smem, unit) + -- execute thread + local exec = function () + log._debug("rtu unit thread start -> " .. unit.name .. "(" .. unit.type .. ")") + + -- load in from shared memory + local rtu_state = smem.rtu_state + + local packet_queue = unit.pkt_queue + + local last_update = util.time() + + -- thread loop + while true do + -- check for messages in the message queue + while packet_queue.ready() and not rtu_state.shutdown do + local msg = packet_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + unit.modbus_busy = true + local return_code, reply = unit.modbus_io.handle_packet(packet) + rtu.send_modbus(reply) + unit.modbus_busy = false + end + + -- quick yield + util.nop() + end + + -- check for termination request + if rtu_state.shutdown then + log._warning("rtu unit thread exiting -> " .. unit.name .. "(" .. unit.type .. ")") + break + end + + -- delay before next check + last_update = util.adaptive_delay(COMMS_SLEEP, last_update) + end + end + + return { exec = exec } +end diff --git a/scada-common/modbus.lua b/scada-common/modbus.lua index 494fde8..a2a1cc3 100644 --- a/scada-common/modbus.lua +++ b/scada-common/modbus.lua @@ -203,6 +203,46 @@ function new(rtu_dev) return return_ok, response end + -- validate a request without actually executing it + local check_request = function (packet) + local return_code = true + local response = { MODBUS_EXCODE.ACKNOWLEDGE } + + if #packet.data == 2 then + -- handle by function code + if packet.func_code == MODBUS_FCODE.READ_COILS then + elseif packet.func_code == MODBUS_FCODE.READ_DISCRETE_INPUTS then + elseif packet.func_code == MODBUS_FCODE.READ_MUL_HOLD_REGS then + elseif packet.func_code == MODBUS_FCODE.READ_INPUT_REGISTERS then + elseif packet.func_code == MODBUS_FCODE.WRITE_SINGLE_COIL then + elseif packet.func_code == MODBUS_FCODE.WRITE_SINGLE_HOLD_REG then + elseif packet.func_code == MODBUS_FCODE.WRITE_MUL_COILS then + elseif packet.func_code == MODBUS_FCODE.WRITE_MUL_HOLD_REGS then + else + -- unknown function + return_code = false + response = { MODBUS_EXCODE.ILLEGAL_FUNCTION } + end + else + -- invalid length + return_code = false + response = { MODBUS_EXCODE.NEG_ACKNOWLEDGE } + end + + -- default is to echo back + local func_code = packet.func_code + if not return_code then + -- echo back with error flag + func_code = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG) + end + + -- create reply + local reply = comms.modbus_packet() + reply.make(packet.txn_id, packet.unit_id, func_code, response) + + return return_code, reply + end + -- handle a MODBUS TCP packet and generate a reply local handle_packet = function (packet) local return_code = true @@ -258,6 +298,16 @@ function new(rtu_dev) return return_code, reply end + -- return a SERVER_DEVICE_BUSY error reply + local reply__srv_device_busy = function (packet) + -- reply back with error flag and exception code + local reply = comms.modbus_packet() + local fcode = bit.bor(packet.func_code, MODBUS_FCODE.ERROR_FLAG) + local data = { MODBUS_EXCODE.SERVER_DEVICE_BUSY } + reply.make(packet.txn_id, packet.unit_id, fcode, data) + return reply + end + -- return a NEG_ACKNOWLEDGE error reply local reply__neg_ack = function (packet) -- reply back with error flag and exception code @@ -279,7 +329,9 @@ function new(rtu_dev) end return { + check_request = check_request, handle_packet = handle_packet, + reply__srv_device_busy = reply__srv_device_busy, reply__neg_ack = reply__neg_ack, reply__gw_unavailable = reply__gw_unavailable }