# Pzem4T_ConverterV1
# python, a year ago
#!/usr/bin/env python3
#coding=utf-8
import serial
import time
import struct
class BTPOWER:
    """Driver for a PZEM-004T style energy meter reached over a serial link.

    Every exchange is a 7-byte request answered by a 7-byte reply whose last
    byte is the sum of the six preceding bytes modulo 256.

    Instances can be used as context managers; the port is closed on exit.
    """

    # Request frames: command byte, four address bytes, one data byte, and a
    # checksum byte (sum of the first six bytes modulo 256).
    setAddrBytes = [0xB4,0xC0,0xA8,0x01,0x01,0x00,0x1E]
    readVoltageBytes = [0xB0,0xC0,0xA8,0x01,0x01,0x00,0x1A]
    readCurrentBytes = [0XB1,0xC0,0xA8,0x01,0x01,0x00,0x1B]
    readPowerBytes = [0XB2,0xC0,0xA8,0x01,0x01,0x00,0x1C]
    readRegPowerBytes = [0XB3,0xC0,0xA8,0x01,0x01,0x00,0x1D]

    # Length in bytes of every reply frame.
    _FRAME_LEN = 7

    def __init__(self, com="/dev/rfcomm0", timeout=10.0):
        """Open the serial port used to talk to the meter.

        Args:
            com: Serial device name.
            timeout: Read timeout in seconds; a reply that does not arrive
                in time makes the read methods raise.
        """
        self.ser = serial.Serial(
            port=com,
            baudrate=9600,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=timeout,
        )
        # Constructing Serial with a port already opens it; close and reopen
        # so the session always starts from a freshly opened port.
        if self.ser.isOpen():
            self.ser.close()
        self.ser.open()

    def __enter__(self):
        """Return the meter itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the serial port when leaving a ``with`` block."""
        self.close()

    def checkChecksum(self, _tuple):
        """Validate the trailing checksum byte of a frame.

        Args:
            _tuple: Sequence of byte values; the last one is the checksum.

        Returns:
            True when the last byte equals the sum of the others modulo 256.

        Raises:
            Exception: "Wrong checksum" if the frame is empty or the
                checksum does not match.
        """
        frame = list(_tuple)
        if not frame or frame[-1] != sum(frame[:-1]) % 256:
            raise Exception("Wrong checksum")
        return True

    def _query(self, command, action):
        """Send one request frame and return the validated reply bytes.

        Args:
            command: List of byte values to transmit.
            action: Text appended to "Timeout " in the timeout error message.

        Returns:
            Tuple of the seven reply byte values.

        Raises:
            serial.SerialTimeoutException: fewer than seven bytes arrived.
            Exception: the reply checksum is wrong.
        """
        self.ser.write(bytes(command))
        rcv = self.ser.read(self._FRAME_LEN)
        if len(rcv) != self._FRAME_LEN:
            raise serial.SerialTimeoutException("Timeout " + action)
        unpacked = struct.unpack("!7B", rcv)
        self.checkChecksum(unpacked)
        return unpacked

    def isReady(self):
        """Send the set-address frame; return True once it is acknowledged."""
        self._query(self.setAddrBytes, "setting address")
        return True

    def readVoltage(self):
        """Return the voltage reading as a float.

        Bytes 1-2 of the reply hold the integer part (big-endian) and byte 3
        holds tenths. The high byte must be included, otherwise readings of
        256 and above wrap around.
        """
        unpacked = self._query(self.readVoltageBytes, "reading tension")
        return unpacked[1] * 256 + unpacked[2] + unpacked[3] / 10.0

    def readCurrent(self):
        """Return the current reading as a float (byte 2 + byte 3 / 100)."""
        unpacked = self._query(self.readCurrentBytes, "reading current")
        return unpacked[2] + unpacked[3] / 100.0

    def readPower(self):
        """Return the power reading as an int (bytes 1-2, big-endian)."""
        unpacked = self._query(self.readPowerBytes, "reading power")
        return unpacked[1] * 256 + unpacked[2]

    def readRegPower(self):
        """Return the registered power counter as an int (bytes 1-3, big-endian)."""
        unpacked = self._query(self.readRegPowerBytes, "reading registered power")
        return unpacked[1] * 256 * 256 + unpacked[2] * 256 + unpacked[3]

    def readAll(self):
        """Return (voltage, current, power, registered power) after an address handshake."""
        if self.isReady():
            return (
                self.readVoltage(),
                self.readCurrent(),
                self.readPower(),
                self.readRegPower(),
            )

    def close(self):
        """Close the serial port."""
        self.ser.close()
if __name__ == "__main__":
    # Open the meter before entering the try block: if the constructor
    # fails there is nothing to close, and a `finally` that touches an
    # unbound `sensor` would replace the real error with a NameError.
    sensor = BTPOWER()
    try:
        print("Checking readiness")
        print(sensor.isReady())
        print("Reading voltage")
        print(sensor.readVoltage())
        print("Reading current")
        print(sensor.readCurrent())
        print("Reading power")
        print(sensor.readPower())
        print("reading registered power")
        print(sensor.readRegPower())
        print("reading all")
        print(sensor.readAll())
    finally:
        # Always release the serial port, even when a read raised.
        sensor.close()
# Pzem4T_GPIOV1
# python, a year ago
# Reading PZEM-004t power sensor (new version v3.0) through Modbus-RTU protocol over TTL UART
# Run in python3
# To install library for PZEM:
# pip3 install modbus-tk
# pip3 install pyserial
import time
#library for PZEM-004T V3
import serial
import modbus_tk.defines as cst
from modbus_tk import modbus_rtu
# Connect to the slave
# Use a distinct name for the port object so the imported `serial` module
# stays reachable after this line.
serial_port = serial.Serial(
    port='/dev/ttyS0',
    baudrate=9600,
    bytesize=8,
    parity='N',
    stopbits=1,
    xonxoff=0
)
master = modbus_rtu.RtuMaster(serial_port)
master.set_timeout(2.0)
master.set_verbose(True)

try:
    # Poll the ten input registers of slave 1 once per second until interrupted.
    while True:
        data = master.execute(1, cst.READ_INPUT_REGISTERS, 0, 10)

        # 32-bit quantities are sent as two 16-bit registers, low word first.
        voltage = data[0] / 10.0  # [V]
        current = (data[1] + (data[2] << 16)) / 1000.0  # [A]
        power = (data[3] + (data[4] << 16)) / 10.0  # [W]
        energy = data[5] + (data[6] << 16)  # [Wh]
        frequency = data[7] / 10.0  # [Hz]
        powerFactor = data[8] / 100.0
        # data[9] is the alarm status (0 = no alarm); it is not reported here.

        print('Voltage [V]\t: ', voltage)
        print('Current [A]\t: ', current)
        print('Power [W]\t: ', power)  # active power (V * I * power factor)
        print('Energy [Wh]\t: ', energy)
        print('Frequency [Hz]\t: ', frequency)
        print('Power factor []\t: ', powerFactor)
        print("--------------------")
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the polling loop; exit quietly.
    pass
finally:
    # Release the Modbus master and the underlying port on any exit path.
    master.close()
    if serial_port.is_open:
        serial_port.close()
# Changing power alarm value to 100 W
# master.execute(1, cst.WRITE_SINGLE_REGISTER, 1, output_value=100)
#try:
#master.close()
#if slave.is_open:
#slave.close()
#except:
# pass
# Total 2 snippets
# 1