''' Originally created by Burt Peterson Updated and maintained by Dustin Roeder (dmroeder@gmail.com) Copyright 2017 Dustin Roeder Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ''' from datetime import datetime, timedelta from lgxDevice import * from random import randrange import socket from struct import * import sys import time programNames = [] taglist = [] class PLC: def __init__(self): ''' Initialize our parameters ''' self.IPAddress = "" self.ProcessorSlot = 0 self.Micro800 = False self.Port = 44818 self.VendorID = 0x1337 self.Context = 0x00 self.ContextPointer = 0 self.Socket = socket.socket() self.SocketConnected = False self.OTNetworkConnectionID=None self.SessionHandle = 0x0000 self.SessionRegistered = False self.SerialNumber = randrange(65000) self.OriginatorSerialNumber = 42 self.SequenceCounter = 1 self.Offset = 0 self.KnownTags = {} self.StructIdentifier = 0x0fCE self.CIPTypes = {160:(88 ,"STRUCT", 'B'), 193:(1, "BOOL", '?'), 194:(1, "SINT", 'b'), 195:(2, "INT", 'h'), 196:(4, "DINT", 'i'), 197:(8, "LINT", 'q'), 198:(1, "USINT", 'B'), 199:(2, "UINT", 'H'), 200:(4, "UDINT", 'I'), 201:(8, "LWORD", 'Q'), 202:(4, "REAL", 'f'), 203:(8, "LREAL", 'd'), 211:(4, "DWORD", 'I'), 218:(0, "STRING", 'B')} def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): ''' Clean up on exit ''' return _closeConnection(self) def Read(self, *args): ''' We have two options for reading depending on the arguments, read a single tag, or read an array ''' if not args: return "You must provide a tag name" elif len(args) == 1: return _readTag(self, args[0], 1) elif len(args) == 2: return _readTag(self, args[0], args[1]) else: return "You provided too many arguments for a read" def Write(self, *args): ''' We have two options for writing depending on the arguments, write a single tag, or write an array ''' if not args or args == 1: return "You must provide a tag name and value" elif len(args) == 2: _writeTag(self, args[0], args[1]) else: return "You provided too many arguments, not sure what you want to do" def MultiRead(self, *args): ''' Read multiple tags in one request ''' return _multiRead(self, args) def GetPLCTime(self): ''' Get the PLC's clock time ''' return _getPLCTime(self) def SetPLCTime(self): ''' Sets the PLC's clock time ''' return _setPLCTime(self) def GetTagList(self): ''' Retrieves the tag list from the PLC ''' return _getTagList(self) def Discover(self): ''' Query all the EIP devices on the network ''' return _discover() def Close(self): ''' Close the connection to the PLC ''' return _closeConnection(self) class LGXTag(): def __init__(self): self.TagName = "" self.Offset = 0 self.DataType = "" def _readTag(self, tag, elements): ''' processes the read request ''' self.Offset = 0 if not _connect(self): return None t,b,i = TagNameParser(tag, 0) InitialRead(self, t, b) datatype = self.KnownTags[b][0] bitCount = self.CIPTypes[datatype][0] * 8 if datatype == 211: # bool array tagData = _buildTagIOI(self, tag, isBoolArray=True) words = _getWordCount(i, elements, bitCount) readRequest = _addReadIOI(self, tagData, words) elif BitofWord(t): # bits of word split_tag = tag.split('.') bitPos = split_tag[len(split_tag)-1] bitPos = int(bitPos) tagData = _buildTagIOI(self, tag, isBoolArray=False) words = _getWordCount(bitPos, elements, bitCount) readRequest = _addReadIOI(self, tagData, words) else: # everything else tagData = _buildTagIOI(self, tag, isBoolArray=False) readRequest = _addReadIOI(self, tagData, elements) eipHeader = _buildEIPHeader(self, readRequest) retData = _getBytes(self, eipHeader) status = unpack_from(' 2: temp += (tagCount-2)*2 offsets = pack(' index > 255: # if index is more than 1 byte... RequestTagData += pack(' 65535: RequestTagData += pack(' index[i] > 255: # if index is more than 1 byte... RequestTagData += pack(' 65535: # if index is more than 4 bytes RequestTagData += pack('T Id EIPItem2ID = 0xB1 #(H) Connecteted Transport (Vol 2 2-6.3.2) EIPItem2Length = EIPConnectedDataLength #(H) Length of CIP Payload EIPSequence = self.SequenceCounter #(H) self.SequenceCounter += 1 self.SequenceCounter = self.SequenceCounter%0x10000 EIPHeaderFrame = pack(' 0: wordCount += 1 return wordCount def InitialRead(self, tag, baseTag): ''' Store each unique tag read in a dict so that we can retreive the data type or data length (for STRING) later ''' # if a tag alread exists, return True if baseTag in self.KnownTags: return True tagData = _buildTagIOI(self, baseTag, isBoolArray=False) readRequest = _addPartialReadIOI(self, tagData, 1) eipHeader = _buildEIPHeader(self, readRequest) # send our tag read request self.Socket.send(eipHeader) retData = self.Socket.recv(1024) status = unpack_from('