# -*- coding: utf-8 -*-
# Python module: ModbusClient class (Client ModBus/TCP)
from . import constants as const
from .utils import crc16, set_bit
import re
import socket
import select
import struct
import random
class ModbusClient:
"""Modbus TCP client"""
def __init__(self, host=None, port=None, unit_id=None, timeout=None,
             debug=None, auto_open=None, auto_close=None):
    """Build a client, optionally overriding the default settings.

    Every argument left at None keeps its default. Any other value is
    handed to the accessor of the same name (host(), port(), ...) and a
    ValueError is raised when that accessor rejects it by returning None.
    The accessors can also be called after construction; they report a bad
    value by returning None instead of raising.

    :param host: hostname or IPv4/IPv6 address of the server (optional)
    :type host: str
    :param port: TCP port number (optional)
    :type port: int
    :param unit_id: unit ID (optional)
    :type unit_id: int
    :param timeout: socket timeout in seconds (optional)
    :type timeout: float
    :param debug: debug state (optional)
    :type debug: bool
    :param auto_open: auto TCP connect (optional)
    :type auto_open: bool
    :param auto_close: auto TCP close (optional)
    :type auto_close: bool
    :return: Object ModbusClient
    :rtype: ModbusClient
    :raises ValueError: if a supplied parameter value is rejected
    """
    # object vars (defaults, possibly overridden below)
    self.__hostname = 'localhost'
    self.__port = const.MODBUS_PORT
    self.__unit_id = 1
    self.__timeout = 30.0                # socket timeout
    self.__debug = False                 # debug trace on/off
    self.__auto_open = False             # auto TCP connect
    self.__auto_close = False            # auto TCP close
    self.__mode = const.MODBUS_TCP       # default is Modbus/TCP
    self.__sock = None                   # socket handle
    self.__hd_tr_id = 0                  # store transaction ID
    self.__version = const.VERSION       # version number
    self.__last_error = const.MB_NO_ERR  # last error code
    self.__last_except = 0               # last exception code
    # Apply the supplied overrides. Both tests are identity checks against
    # None rather than truthiness: a falsy but invalid value such as port=0,
    # timeout=0 or host='' must be rejected instead of silently ignored,
    # and a falsy but valid result (unit ID 0, a False flag) must not be
    # mistaken for a failure.
    overrides = (
        ('host', host, self.host),
        ('port', port, self.port),
        ('unit_id', unit_id, self.unit_id),
        ('timeout', timeout, self.timeout),
        ('debug', debug, self.debug),
        ('auto_open', auto_open, self.auto_open),
        ('auto_close', auto_close, self.auto_close),
    )
    for label, value, setter in overrides:
        if value is not None and setter(value) is None:
            raise ValueError('%s value error' % label)
def version(self):
"""Get package version
:return: current version of the package (like "0.0.1")
:rtype: str
"""
return self.__version
def last_error(self):
"""Get last error code
:return: last error code
:rtype: int
"""
return self.__last_error
def last_except(self):
"""Get last except code
:return: last except code
:rtype: int
"""
return self.__last_except
def host(self, hostname=None):
"""Get or set host (IPv4/IPv6 or hostname like 'plc.domain.net')
:param hostname: hostname or IPv4/IPv6 address or None for get value
:type hostname: str or None
:returns: hostname or None if set fail
:rtype: str or None
"""
if (hostname is None) or (hostname == self.__hostname):
return self.__hostname
# when hostname change ensure old socket is close
self.close()
# IPv4 ?
try:
socket.inet_pton(socket.AF_INET, hostname)
self.__hostname = hostname
return self.__hostname
except socket.error:
pass
# IPv6 ?
try:
socket.inet_pton(socket.AF_INET6, hostname)
self.__hostname = hostname
return self.__hostname
except socket.error:
pass
# DNS name ?
if re.match('^[a-z][a-z0-9\.\-]+$', hostname):
self.__hostname = hostname
return self.__hostname
else:
return None
def port(self, port=None):
"""Get or set TCP port
:param port: TCP port number or None for get value
:type port: int or None
:returns: TCP port or None if set fail
:rtype: int or None
"""
if (port is None) or (port == self.__port):
return self.__port
# when port change ensure old socket is close
self.close()
# valid port ?
if 0 < int(port) < 65536:
self.__port = int(port)
return self.__port
else:
return None
def unit_id(self, unit_id=None):
"""Get or set unit ID field
:param unit_id: unit ID (0 to 255) or None for get value
:type unit_id: int or None
:returns: unit ID or None if set fail
:rtype: int or None
"""
if unit_id is None:
return self.__unit_id
if 0 <= int(unit_id) < 256:
self.__unit_id = int(unit_id)
return self.__unit_id
else:
return None
def timeout(self, timeout=None):
"""Get or set timeout field
:param timeout: socket timeout in seconds or None for get value
:type timeout: float or None
:returns: timeout or None if set fail
:rtype: float or None
"""
if timeout is None:
return self.__timeout
if 0 < float(timeout) < 3600:
self.__timeout = float(timeout)
return self.__timeout
else:
return None
def debug(self, state=None):
"""Get or set debug mode
:param state: debug state or None for get value
:type state: bool or None
:returns: debug state or None if set fail
:rtype: bool or None
"""
if state is None:
return self.__debug
self.__debug = bool(state)
return self.__debug
def auto_open(self, state=None):
"""Get or set automatic TCP connect mode
:param state: auto_open state or None for get value
:type state: bool or None
:returns: auto_open state or None if set fail
:rtype: bool or None
"""
if state is None:
return self.__auto_open
self.__auto_open = bool(state)
return self.__auto_open
def auto_close(self, state=None):
"""Get or set automatic TCP close mode (after each request)
:param state: auto_close state or None for get value
:type state: bool or None
:returns: auto_close state or None if set fail
:rtype: bool or None
"""
if state is None:
return self.__auto_close
self.__auto_close = bool(state)
return self.__auto_close
def mode(self, mode=None):
"""Get or set modbus mode (TCP or RTU)
:param mode: mode (MODBUS_TCP/MODBUS_RTU) to set or None for get value
:type mode: int
:returns: mode or None if set fail
:rtype: int or None
"""
if mode is None:
return self.__mode
if mode == const.MODBUS_TCP or mode == const.MODBUS_RTU:
self.__mode = mode
return self.__mode
else:
return None
def open(self):
"""Connect to modbus server (open TCP connection)
:retu