Added tracking lost script for LYS8 PLC's

This commit is contained in:
2025-10-17 20:10:13 +01:00
parent d95f6da138
commit 400365546c

View File

@@ -0,0 +1,240 @@
#region Parameters
plcs = {
'PLC020': {
'ip': '10.210.52.33',
'port': 4840
},
'PLC030': {
'ip': '10.210.52.51',
'port': 4840
},
'PLC040': {
'ip': '10.210.52.69',
'port': 4840
},
#'PLC050': {
# 'ip': '10.210.52.87',
# 'port': 4840
#}
}
global unitFields, unitTrackingDB, unitStatusDB
unitFields = ['"UnitID"',
'"EMID"',
'"UniqueID"',
'"Track_Type"',
'"Track_Rec_Cnt"',
'"Track_Lost_Cnt"',
'"Track_Stray_Cnt"',
'"Track_Tolerance"']
unitTrackingDB = '"DBI_Tracking_Statistic"'
unitStatusDB = '"DBI_Unit_Status"."Status_Units"'
#endregion
#region Script
from opcua import Client
from opcua import ua
import pandas as pd
class OPCUA:
connected = False
def __init__(self, ip, port):
self.ip = ip
self.port = port
self.client = Client(f"opc.tcp://{ip}:{port}")
# Set logger
#self.logger = logging.getLogger('PLC')
# Set console
#self.console = Console()
#def setLogLevel(self, level):
# self.logger.setLevel(level)
def connect(self):
try:
self.client.connect()
#self.logger.info(f'Connected to {self.ip}:{self.port} via OPC UA')
print(f'Connected to {self.ip}:{self.port} via OPC UA')
self.connected = True
except Exception as e:
#self.logger.error(f'Failed to connect -> {e}')
print(f'Failed to connect -> {e}')
def disconnect(self):
try:
if self.connected:
self.client.disconnect()
self.connected = False
#self.logger.info(f'Disconnected from {self.ip}:{self.port}')
print(f'Disconnected from {self.ip}:{self.port}')
except Exception as e:
#self.logger.error(f'Failed to disconnect -> {e} via OPC UA')
print(f'Failed to disconnect -> {e} via OPC UA')
def getOrderNumber(self):
if self.connected:
return self.getValue('ns=3;s=OrderNumber')
return None
def getValue(self, tag):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
return node.get_value()
return None
def getValues(self, tags):
if self.connected:
nodes = []
for tag in tags:
nodes.append(self.client.get_node('ns=3;s=' + tag))
return self.client.get_values(nodes)
return None
def getWordBits(self, tag):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
value = node.get_value()
return list(bin(value)[2:].zfill(16)[::-1])
return [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
def getDateTime(self, tag):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
raw = node.get_value()
day = (hex(raw[2])[2:]).zfill(2)
month = (hex(raw[1])[2:]).zfill(2)
year = (hex(raw[0])[2:]).zfill(2)
hour = (hex(raw[3])[2:]).zfill(2)
minutes = (hex(raw[4])[2:]).zfill(2)
seconds = (hex(raw[5])[2:]).zfill(2)
return f'{day}-{month}-{year} {hour}:{minutes}:{seconds}'
return None
def setValue(self, tag, value):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
if type(value) == bool:
node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Boolean)))
if type(value) == int:
node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Byte)))
def setByte(self, tag, value):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
node.set_value(ua.DataValue(ua.Variant(value, ua.VariantType.Byte)))
def getNodes(self, tag):
if self.connected:
node = self.client.get_node('ns=3;s=' + tag)
return node.get_children()
return None
def csvdata_to_dataframe(data, separator=','):
return pd.DataFrame([x.split(separator) for x in data.split('\n')])
def read_plc(name, ip, port=4840):
global dataset, unitFields, unitTrackingDB, unitStatusDB
# Verify input parameters
if unitFields is None or unitTrackingDB is None or unitStatusDB is None:
print("Missing parameters")
return
if name is None or ip is None or port is None:
print("Missing PLC parameters")
return
# Setup OPC UA connection
opcPlc = OPCUA(ip, port)
opcPlc.connect()
if opcPlc.connected == False:
return
try:
units = []
Unit_Name = []
Unit_ID = []
EM_ID = []
UniqueID = []
Track_Type = []
Track_Rec_Cnt = []
Track_Lost_Cnt = []
Track_Stray_Cnt = []
Track_Tolerance = []
# Get all units
for node in opcPlc.getNodes(unitTrackingDB):
unitName = node.nodeid.Identifier.split('.')[-1]
if not unitName in ['"Header"', '"header"', '"top_10"', '"Top_10"', '"top10"', '"Top10"']:
units.append(unitName)
# Get data from PLC
for unit in units:
nodes = []
unit_nr = int(unit.split('_')[1])
# Create node list
nodes.append(f'{unitStatusDB}[{unit_nr}]."Unit_Name"')
for field in unitFields:
nodes.append(f'{unitTrackingDB}.{unit}.{field}')
# Get values
values = opcPlc.getValues(nodes)
# Sort values
Unit_Name.append(str(values[0]).replace('"',''))
Unit_ID.append(str(values[1]).replace('"',''))
EM_ID.append(str(values[2]).replace('"',''))
UniqueID.append(str(values[3]).replace('"',''))
Track_Type.append(str(values[4]).replace('"',''))
Track_Rec_Cnt.append(str(values[5]).replace('"',''))
Track_Lost_Cnt.append(str(values[6]).replace('"',''))
Track_Stray_Cnt.append(str(values[7]).replace('"',''))
Track_Tolerance.append(str(values[8]).replace('"',''))
# Create table with all data
table = {
'PLC_Name': name,
'Unit_Name': Unit_Name,
'Unit_ID': Unit_ID,
'EMID': EM_ID,
'UniqueID': UniqueID,
'Track_Type': Track_Type,
'Track_Rec_Cnt': Track_Rec_Cnt,
'Track_Lost_Cnt': Track_Lost_Cnt,
'Track_Stray_Cnt': Track_Stray_Cnt,
'Track_Tolerance': Track_Tolerance
}
# Write data to CSV
pd.DataFrame(table).to_csv(f'{name}.csv')
# Add raw data to dataset
with open(f'{name}.csv') as f:
dataset += f.read()
except KeyboardInterrupt:
pass
finally:
opcPlc.disconnect()
if __name__ == "__main__":
global dataset
dataset = ''
for plcName in plcs:
ip = plcs[plcName]['ip']
port = plcs[plcName]['port']
read_plc(plcName, ip, port)
# convert data to dataframe
df = csvdata_to_dataframe(dataset)
#endregion