Untitled
unknown
plain_text
3 years ago
12 kB
13
Indexable
#!/usr/bin/env python
# import normal packages
import platform
import sys
import os
import time
import dbus
import requests # for http GET
import signal
import math
if sys.version_info.major == 2:
import gobject
else:
from gi.repository import GLib as gobject
# our own packages from victron
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
from ve_utils import wrap_dbus_value
# settings
class Settings:
    """Static configuration for the go-e charger D-Bus service.

    The attributes are read directly from the class; it is never
    instantiated. ``pv_control_enabled`` is additionally overwritten at
    runtime when the ``/Mode`` D-Bus path is changed.
    """

    # D-Bus service name the charger is published under
    servicename = "com.victronenergy.evcharger"
    deviceinstance = 45  # as defined by venusos
    # base URL of the go-e HTTP API (must end with a slash, paths are appended)
    api_url = "http://192.168.178.121/api/"
    productname = "go-e Charger"
    pv_control_enabled = 1  # startup value
    # if enabled, keep charging even if power is not sufficient
    # NOTE(review): not read anywhere in the visible code - confirm it is still needed
    keep_min_charge = 1
# contains all methods related to the go-e wallbox control
class Wallbox:
    """HTTP client for the go-e wallbox plus the surplus-charging regulation.

    Parameters are written with ``GET <api_url>set?<name>=<value>`` and the
    state is read with ``GET <api_url>status``. Successfully written values
    are remembered in ``store`` so identical writes are not repeated.
    """

    def __init__(self, api_url, timeout=5):
        """Create a client.

        api_url -- base URL of the API; request paths are appended verbatim.
        timeout -- seconds passed to every HTTP request so that an
                   unreachable wallbox cannot block the caller forever.
        """
        self.api_url = api_url
        self.timeout = timeout
        # smoothed target amps (exponential moving average, see
        # set_charging_power)
        self.av_amps = 0
        self.store = {}  # parameter store: last value sent successfully per name

    # update a setting of the wallbox
    def updateParameter(self, name, value):
        """Send one parameter to the wallbox unless that value was already sent.

        Returns True when the request was issued, False when it raised, and
        None when the call was skipped because ``store`` already holds the
        same value for ``name``.
        """
        if self.store.get(name) == value:  # skip if same value is already set
            return None
        try:
            print("param update", name, value)
            requests.get(
                self.api_url + "set?" + name + "=" + str(value),
                timeout=self.timeout,
            )
        except Exception as exc:  # best effort: report and let the caller go on
            print("param update failed", exc)
            return False
        # Remember the value only after the request went through; caching it
        # before the request would suppress every retry after a failure.
        self.store[name] = value
        return True

    # request status of the wallbox
    def requestStatus(self):
        """Return the decoded JSON status document of the wallbox.

        Exceptions raised by the request or by JSON decoding propagate to the
        caller.
        """
        return requests.get(self.api_url + "status", timeout=self.timeout).json()

    # sets the charging power of the wallbox
    def set_charging_power(self, target_charge_power):
        """Regulate the wallbox towards ``target_charge_power``.

        The power is converted to amps by dividing by 240 and fed into a
        moving average (weight 1/20 per call). Below an average of 3 the
        charger is forced off, above 5 it is released in single-phase mode
        with at least 6 amps; in between nothing is changed.
        """
        # NOTE(review): 240 is presumably the assumed single-phase voltage - confirm
        target_amps = math.floor(target_charge_power / 240)
        self.av_amps = (self.av_amps * 19 + target_amps) / 20
        print("target amps:", target_amps)
        print("average amps:", self.av_amps)
        # NOTE(review): the source had lost its indentation; all three updates
        # below are assumed to belong to the "> 5" branch.
        if self.av_amps < 3:
            self.updateParameter('frc', 1)  # force off
        elif self.av_amps > 5:
            target_amps = max(target_amps, 6)
            self.updateParameter('psm', 1)  # force single phase
            self.updateParameter('frc', 0)  # neutral on
            self.updateParameter('amp', target_amps)  # set amps
# contains all methods related to dbus communication
class Dbus:
    """Publishes the wallbox on the D-Bus and runs the periodic control loop.

    ``loop`` is meant to be called every 100 ms; ``nextLoop`` counts the
    calls that are skipped before the wallbox is polled again.
    """

    # Translation of the go-e 'car' value to the venusos '/Status' value.
    # venusos 0:EVdisconnected; 1:Connected; 2:Charging; 3:Charged; 4:Wait sun;
    #   5:Wait RFID; 6:Wait enable; 7:Low SOC; 8:Ground error;
    #   9:Welded contacts error; default:Unknown
    # go-e 1: charging station ready, no vehicle; 2: vehicle loads;
    #   3: waiting for vehicle; 4: charge finished, vehicle still connected
    _CAR_TO_STATUS = {1: 0, 2: 2, 3: 6, 4: 3}

    def __init__(self, wallbox):
        """Register the D-Bus service and all of its paths.

        wallbox -- object providing ``updateParameter(name, value)`` and
                   ``requestStatus()``.
        """
        self.nextLoop = 0  # time until next loop runs
        self.wallbox = wallbox

        # formatting
        def with_unit(suffix, digits=None):
            """Build a text callback appending ``suffix``, optionally rounding."""
            def text(path, value):
                shown = value if digits is None else round(value, digits)
                return str(shown) + suffix
            return text

        _kwh = with_unit('kWh', 2)
        _a = with_unit('A', 1)
        _w = with_unit('W', 1)
        _v = with_unit('V', 1)
        _degC = with_unit('°C')
        _s = with_unit('s')
        _null = lambda p, v: ''

        # variables
        paths = {
            '/Ac/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L1/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L2/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L3/Power': {'initial': 0, 'textformat': _w},
            '/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh},
            '/ChargingTime': {'initial': 0, 'textformat': _s},
            '/Ac/Voltage': {'initial': 0, 'textformat': _v},
            '/Current': {'initial': 0, 'textformat': _a},
            '/SetCurrent': {'initial': 0, 'textformat': _a},
            '/MaxCurrent': {'initial': 0, 'textformat': _a},
            '/MCU/Temperature': {'initial': 0, 'textformat': _degC},
            '/StartStop': {'initial': 0, 'textformat': _null},
            '/Status': {'initial': 0, 'textformat': _null},
            '/Mode': {'initial': Settings.pv_control_enabled, 'textformat': _null},
        }

        self._dbusservice = VeDbusService(Settings.servicename)
        # Create the management objects, as specified in the ccgx dbus-api document
        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
        self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
        self._dbusservice.add_path('/Mgmt/Connection', "go-e HTTP API V2")
        # Create the mandatory objects
        self._dbusservice.add_path('/DeviceInstance', Settings.deviceinstance)
        self._dbusservice.add_path('/ProductId', 0xFFFF)
        self._dbusservice.add_path('/ProductName', Settings.productname)
        self._dbusservice.add_path('/CustomName', Settings.productname)
        self._dbusservice.add_path('/Connected', 1)
        self._dbusservice.add_path('/UpdateIndex', 0)
        self._dbusservice.add_path('/FirmwareVersion', 0)
        self._dbusservice.add_path('/HardwareVersion', 0)
        self._dbusservice.add_path('/Serial', 0)
        # add path values to dbus
        for path, settings in paths.items():
            self._dbusservice.add_path(
                path, settings['initial'], gettextcallback=settings['textformat'],
                writeable=True, onchangecallback=self.handlechangedvalue)

    # callback from dbus, a value was changed externally
    def handlechangedvalue(self, path, value):
        """Forward an externally changed D-Bus value to the wallbox.

        Always returns True and postpones the next poll by 10 loop calls.
        """
        print("someone else updated %s to %s" % (path, value))
        if path == '/SetCurrent':
            self.wallbox.updateParameter('amp', value)
        elif path == '/StartStop':
            if value == 0:  # off -> force off
                self.wallbox.updateParameter('frc', 1)
            else:  # else be neutral
                self.wallbox.updateParameter('frc', 0)
        elif path == '/MaxCurrent':
            self.wallbox.updateParameter('ama', min(value, 16))
        elif path == '/Mode':
            Settings.pv_control_enabled = value
            if Settings.pv_control_enabled:  # auto
                print("auto")
            else:  # manual
                print("manual")
                # reset values that pv might change
                #self.wallbox.updateParameter('psm', 2) # force triple phase
                #self.wallbox.updateParameter('amp', 16) # set back to 16A
        # NOTE(review): the source had lost its indentation; the delay is
        # assumed to apply to every path, not only to '/Mode'.
        self.nextLoop = 10  # wait 1 sec for go-e to process
        return True

    # main loop running every 100ms
    def loop(self):
        """Poll the wallbox, publish its state and regulate charging.

        Returns True on every call so the timer keeps the callback
        registered. Any exception raised while polling or regulating is
        printed and otherwise ignored.
        """
        if self.nextLoop > 0:
            self.nextLoop -= 1
            return True
        self.nextLoop = 50  # default every 5 sec
        try:
            # get data from go-eCharger
            data = self.wallbox.requestStatus()
            # send data to DBus
            status = self._publish_status(data)
            self._regulate(status)
            if status == 0:
                self.nextLoop = 500  # if nothing is connected, update every 50 sec
        except Exception as e:
            print(e)
        return True  # as gobject wants it

    def _publish_status(self, data):
        """Copy one status document onto the D-Bus paths and return '/Status'."""
        service = self._dbusservice
        nrg = data['nrg']
        service['/FirmwareVersion'] = int(data['fwv'].replace('.', ''))
        service['/HardwareVersion'] = 2
        service['/Serial'] = data['fna']
        service['/Ac/L1/Power'] = nrg[7]
        service['/Ac/L2/Power'] = nrg[8]
        service['/Ac/L3/Power'] = nrg[9]
        service['/Ac/Power'] = nrg[11]
        service['/Ac/Voltage'] = nrg[0]
        service['/Current'] = max(nrg[4], nrg[5], nrg[6])
        service['/Ac/Energy/Forward'] = float(data['wh']) / 1000.0
        service['/StartStop'] = int(data['alw'])
        service['/SetCurrent'] = int(data['amp'])
        service['/MaxCurrent'] = int(data['ama'])

        # Charging time is 0 unless a duration can be derived from 'cdi'.
        charging_time = 0
        cdi = data['cdi']
        if cdi is not None:
            if cdi['type'] == 1:
                charging_time = cdi['value'] / 1000  # in seconds
            elif cdi['type'] == 0:
                charging_time = (data['rbt'] - data['lcctc']) / 1000
        service['/ChargingTime'] = charging_time
        service['/MCU/Temperature'] = int(data['tma'][0])

        # status translation; unknown 'car' values map to 0
        status = self._CAR_TO_STATUS.get(int(data['car']), 0)
        service['/Status'] = status
        return status

    def _regulate(self, status):
        """Apply PV charging control and the battery discharge limit."""
        target_consumption = 0  # consumption you aim for
        # grid + target + charger + multiplus, never negative
        available_power = max(
            Battery.get_grid_power() + target_consumption
            + self._dbusservice['/Ac/Power'] + Battery.get_multiplus_power(),
            0)
        if Settings.pv_control_enabled and status > 0:
            self.wallbox.set_charging_power(available_power)
        # limit battery discharge
        # NOTE(review): assumed to run regardless of the PV mode (nesting was
        # lost in the source) - confirm.
        if status == 2:  # reduce discharge if car is charging
            Battery.set_max_discharge_power(Battery.get_soc() * 10)
        else:
            Battery.set_max_discharge_power(-1)

    # set a value on dbus
    @staticmethod
    def set_value(service, object_path, value):
        """Call ``SetValue`` on ``object_path`` of ``service`` on the system bus."""
        return dbus.SystemBus().get_object(service, object_path).SetValue(wrap_dbus_value(value))

    # get a value from dbus
    @staticmethod
    def get_value(service, object_path):
        """Call ``GetValue`` on ``object_path`` of ``service`` on the system bus."""
        return dbus.SystemBus().get_object(service, object_path).GetValue()
class Battery:
    """Helpers reading and writing system values through ``Dbus``.

    All members are static; the class only serves as a namespace. The
    getters return 0 when the value cannot be read.
    """

    # get current inverter power
    @staticmethod
    def get_multiplus_power():
        """Return ``/Dc/Vebus/Power`` of the system service, or 0 on failure."""
        try:
            return Dbus.get_value("com.victronenergy.system", "/Dc/Vebus/Power")
        except Exception:
            return 0

    # get current battery soc
    @staticmethod
    def get_soc():
        """Return ``/Dc/Battery/Soc`` of the system service, or 0 on failure."""
        try:
            return Dbus.get_value("com.victronenergy.system", "/Dc/Battery/Soc")
        except Exception:
            return 0

    # set max discharge power of battery
    @staticmethod
    def set_max_discharge_power(power):
        """Write ``MaxDischargePower`` to the settings service if it differs.

        Unlike the getters this does not catch exceptions; D-Bus errors
        propagate to the caller.
        """
        if Dbus.get_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower") != power:
            Dbus.set_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower", power)
            print("max discharge power:", power)

    # returns available power in the system
    @staticmethod
    def get_grid_power():
        """Return the negated sum of the three grid phase powers, or 0 on failure.

        If any single phase cannot be read the whole result is 0.
        """
        try:
            return -(Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L1/Power")
                     + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L2/Power")
                     + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L3/Power"))
        except Exception:
            return 0
def shutdown(a, b):
    """Signal handler that stops the module-level ``mainloop``.

    ``a`` and ``b`` are the two positional arguments the ``signal`` module
    passes to a handler; neither is used.
    """
    #dbus_set_value("com.victronenergy.vebus.ttyUSB1", "/Hub4/L1/AcPowerSetpoint", 0)
    time.sleep(0.1)
    mainloop.quit()
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True) # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
# Create the main loop before installing the SIGINT handler: shutdown() uses
# the global `mainloop`, so registering the handler first would raise a
# NameError if the signal arrived while the service was still starting up.
mainloop = gobject.MainLoop()
signal.signal(signal.SIGINT, shutdown)
_wb = Wallbox(Settings.api_url)
_dbus = Dbus(_wb)
# run loop every 100ms
gobject.timeout_add(100, _dbus.loop)
mainloop.run()
Editor is loading...