From 400365546cbd4bc60ca174c4bf06ef82e67fac3e Mon Sep 17 00:00:00 2001 From: Daniel Elias Date: Fri, 17 Oct 2025 20:10:13 +0100 Subject: [PATCH] Added tracking lost script for LYS8 PLC's --- Scripts/LYS8_Tracking_Statistics.py | 240 ++++++++++++++++++++++++++++ 1 file changed, 240 insertions(+) create mode 100644 Scripts/LYS8_Tracking_Statistics.py diff --git a/Scripts/LYS8_Tracking_Statistics.py b/Scripts/LYS8_Tracking_Statistics.py new file mode 100644 index 0000000..a8f49a3 --- /dev/null +++ b/Scripts/LYS8_Tracking_Statistics.py @@ -0,0 +1,240 @@ +#region Parameters +plcs = { + 'PLC020': { + 'ip': '10.210.52.33', + 'port': 4840 + }, + 'PLC030': { + 'ip': '10.210.52.51', + 'port': 4840 + }, + 'PLC040': { + 'ip': '10.210.52.69', + 'port': 4840 + }, + #'PLC050': { + # 'ip': '10.210.52.87', + # 'port': 4840 + #} +} + +global unitFields, unitTrackingDB, unitStatusDB +unitFields = ['"UnitID"', + '"EMID"', + '"UniqueID"', + '"Track_Type"', + '"Track_Rec_Cnt"', + '"Track_Lost_Cnt"', + '"Track_Stray_Cnt"', + '"Track_Tolerance"'] +unitTrackingDB = '"DBI_Tracking_Statistic"' +unitStatusDB = '"DBI_Unit_Status"."Status_Units"' +#endregion + +#region Script +from opcua import Client +from opcua import ua +import pandas as pd + +class OPCUA: + connected = False + + def __init__(self, ip, port): + self.ip = ip + self.port = port + self.client = Client(f"opc.tcp://{ip}:{port}") + + # Set logger + #self.logger = logging.getLogger('PLC') + + # Set console + #self.console = Console() + + #def setLogLevel(self, level): + # self.logger.setLevel(level) + + def connect(self): + try: + self.client.connect() + #self.logger.info(f'Connected to {self.ip}:{self.port} via OPC UA') + print(f'Connected to {self.ip}:{self.port} via OPC UA') + self.connected = True + except Exception as e: + #self.logger.error(f'Failed to connect -> {e}') + print(f'Failed to connect -> {e}') + + def disconnect(self): + try: + if self.connected: + self.client.disconnect() + self.connected = False + #self.logger.info(f'Disconnected from {self.ip}:{self.port}') + print(f'Disconnected from {self.ip}:{self.port}') + except Exception as e: + #self.logger.error(f'Failed to disconnect -> {e} via OPC UA') + print(f'Failed to disconnect -> {e} via OPC UA') + + def getOrderNumber(self): + if self.connected: + return self.getValue('ns=3;s=OrderNumber') + return None + + def getValue(self, tag): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + return node.get_value() + return None + + def getValues(self, tags): + if self.connected: + nodes = [] + for tag in tags: + nodes.append(self.client.get_node('ns=3;s=' + tag)) + + return self.client.get_values(nodes) + return None + + def getWordBits(self, tag): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + value = node.get_value() + return list(bin(value)[2:].zfill(16)[::-1]) + return [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + + def getDateTime(self, tag): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + raw = node.get_value() + day = (hex(raw[2])[2:]).zfill(2) + month = (hex(raw[1])[2:]).zfill(2) + year = (hex(raw[0])[2:]).zfill(2) + hour = (hex(raw[3])[2:]).zfill(2) + minutes = (hex(raw[4])[2:]).zfill(2) + seconds = (hex(raw[5])[2:]).zfill(2) + + return f'{day}-{month}-{year} {hour}:{minutes}:{seconds}' + return None + + def setValue(self, tag, value): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + if type(value) == bool: + node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Boolean))) + if type(value) == int: + node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Byte))) + + def setByte(self, tag, value): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Byte))) + + def getNodes(self, tag): + if self.connected: + node = self.client.get_node('ns=3;s=' + tag) + return node.get_children() + return None + +def csvdata_to_dataframe(data, separator=','): + return pd.DataFrame([x.split(separator) for x in data.split('\n')]) + +def read_plc(name, ip, port=4840): + global dataset, unitFields, unitTrackingDB, unitStatusDB + + # Verify input parameters + if unitFields is None or unitTrackingDB is None or unitStatusDB is None: + print("Missing parameters") + return + + if name is None or ip is None or port is None: + print("Missing PLC parameters") + return + + # Setup OPC UA connection + opcPlc = OPCUA(ip, port) + opcPlc.connect() + + if opcPlc.connected == False: + return + + try: + units = [] + Unit_Name = [] + Unit_ID = [] + EM_ID = [] + UniqueID = [] + Track_Type = [] + Track_Rec_Cnt = [] + Track_Lost_Cnt = [] + Track_Stray_Cnt = [] + Track_Tolerance = [] + + # Get all units + for node in opcPlc.getNodes(unitTrackingDB): + unitName = node.nodeid.Identifier.split('.')[-1] + if not unitName in ['"Header"', '"header"', '"top_10"', '"Top_10"', '"top10"', '"Top10"']: + units.append(unitName) + + # Get data from PLC + for unit in units: + nodes = [] + unit_nr = int(unit.split('_')[1]) + + # Create node list + nodes.append(f'{unitStatusDB}[{unit_nr}]."Unit_Name"') + for field in unitFields: + nodes.append(f'{unitTrackingDB}.{unit}.{field}') + + # Get values + values = opcPlc.getValues(nodes) + + # Sort values + Unit_Name.append(str(values[0]).replace('"','')) + Unit_ID.append(str(values[1]).replace('"','')) + EM_ID.append(str(values[2]).replace('"','')) + UniqueID.append(str(values[3]).replace('"','')) + Track_Type.append(str(values[4]).replace('"','')) + Track_Rec_Cnt.append(str(values[5]).replace('"','')) + Track_Lost_Cnt.append(str(values[6]).replace('"','')) + Track_Stray_Cnt.append(str(values[7]).replace('"','')) + Track_Tolerance.append(str(values[8]).replace('"','')) + + # Create table with all data + table = { + 'PLC_Name': name, + 'Unit_Name': Unit_Name, + 'Unit_ID': Unit_ID, + 'EMID': EM_ID, + 'UniqueID': UniqueID, + 'Track_Type': Track_Type, + 'Track_Rec_Cnt': Track_Rec_Cnt, + 'Track_Lost_Cnt': Track_Lost_Cnt, + 'Track_Stray_Cnt': Track_Stray_Cnt, + 'Track_Tolerance': Track_Tolerance + } + + # Write data to CSV + pd.DataFrame(table).to_csv(f'{name}.csv') + + # Add raw data to dataset + with open(f'{name}.csv') as f: + dataset += f.read() + + except KeyboardInterrupt: + pass + finally: + opcPlc.disconnect() + +if __name__ == "__main__": + global dataset + dataset = '' + + for plcName in plcs: + ip = plcs[plcName]['ip'] + port = plcs[plcName]['port'] + read_plc(plcName, ip, port) + + # convert data to dataframe + df = csvdata_to_dataframe(dataset) + + +#endregion \ No newline at end of file