Untitled
unknown
plain_text
6 months ago
12 kB
1
Indexable
Never
#!/usr/bin/env python # import normal packages import platform import sys import os import time import dbus import requests # for http GET import signal import math if sys.version_info.major == 2: import gobject else: from gi.repository import GLib as gobject # our own packages from victron sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python')) from vedbus import VeDbusService from ve_utils import wrap_dbus_value # settings class Settings: servicename = "com.victronenergy.evcharger" deviceinstance = 45 # as defined by venusos api_url = "http://192.168.178.121/api/" productname = "go-e Charger" pv_control_enabled = 1 # startup value keep_min_charge = 1 # if enabled, keep charging even if power is not sufficient # contains all methods related to the go-e wallbox control class Wallbox: def __init__(self, api_url): self.api_url = api_url self.av_amps = 0 # average amps in last 5 minutes self.store = {} # parameter store # update a setting of the wallbox def updateParameter(self, name, value): if self.store.get(name) == value: # skip if same value is already set return self.store.update({name:value}) try: print ("param update", name, value) requests.get(self.api_url + "set?" + name + "=" + str(value)) except: print ("param update failed") return False return True # request status of the wallbox def requestStatus(self): return requests.get(self.api_url + "status").json() # sets the charging power of the wallbox def set_charging_power(self, target_charge_power): target_amps = math.floor(target_charge_power/240) self.av_amps = (self.av_amps * 19 + target_amps) / 20 print("target amps:", target_amps) print("average amps:", self.av_amps) if self.av_amps < 3: self.updateParameter('frc', 1) # force off elif self.av_amps > 5: target_amps = max(target_amps, 6) self.updateParameter('psm', 1) # force single phase self.updateParameter('frc', 0) # neutral on self.updateParameter('amp', target_amps) # set amps # contains all methods related to dbus communication class Dbus: def __init__(self, wallbox): self.nextLoop = 0 # time until next loop runs self.wallbox = wallbox #formatting _kwh = lambda p, v: (str(round(v, 2)) + 'kWh') _a = lambda p, v: (str(round(v, 1)) + 'A') _w = lambda p, v: (str(round(v, 1)) + 'W') _v = lambda p, v: (str(round(v, 1)) + 'V') _degC = lambda p, v: (str(v) + '°C') _s = lambda p, v: (str(v) + 's') _null = lambda p, v: '' # variables paths={ '/Ac/Power': {'initial': 0, 'textformat': _w}, '/Ac/L1/Power': {'initial': 0, 'textformat': _w}, '/Ac/L2/Power': {'initial': 0, 'textformat': _w}, '/Ac/L3/Power': {'initial': 0, 'textformat': _w}, '/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh}, '/ChargingTime': {'initial': 0, 'textformat': _s}, '/Ac/Voltage': {'initial': 0, 'textformat': _v}, '/Current': {'initial': 0, 'textformat': _a}, '/SetCurrent': {'initial': 0, 'textformat': _a}, '/MaxCurrent': {'initial': 0, 'textformat': _a}, '/MCU/Temperature': {'initial': 0, 'textformat': _degC}, '/StartStop': {'initial': 0, 'textformat': _null}, '/Status': {'initial': 0, 'textformat': _null}, '/Mode': {'initial': Settings.pv_control_enabled, 'textformat': _null} } self._dbusservice = VeDbusService(Settings.servicename) # Create the management objects, as specified in the ccgx dbus-api document self._dbusservice.add_path('/Mgmt/ProcessName', __file__) self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version()) self._dbusservice.add_path('/Mgmt/Connection', "go-e HTTP API V2") # Create the mandatory objects self._dbusservice.add_path('/DeviceInstance', Settings.deviceinstance) self._dbusservice.add_path('/ProductId', 0xFFFF) # self._dbusservice.add_path('/ProductName', Settings.productname) self._dbusservice.add_path('/CustomName', Settings.productname) self._dbusservice.add_path('/Connected', 1) self._dbusservice.add_path('/UpdateIndex', 0) self._dbusservice.add_path('/FirmwareVersion', 0) self._dbusservice.add_path('/HardwareVersion', 0) self._dbusservice.add_path('/Serial', 0) # add path values to dbus for path, settings in paths.items(): self._dbusservice.add_path( path, settings['initial'], gettextcallback=settings['textformat'], writeable=True, onchangecallback=self.handlechangedvalue) # callback from dbus, a value was changed externally def handlechangedvalue(self, path, value): print("someone else updated %s to %s" % (path, value)) if path == '/SetCurrent': self.wallbox.updateParameter('amp', value) elif path == '/StartStop': if value == 0: # off -> force off self.wallbox.updateParameter('frc', 1) else: # else be neutral self.wallbox.updateParameter('frc', 0) elif path == '/MaxCurrent': self.wallbox.updateParameter('ama', min(value,16)) elif path == '/Mode': Settings.pv_control_enabled = value if Settings.pv_control_enabled: #auto print("auto") else: # manual print("manual") # reset values that pv might change #self.wallbox.updateParameter('psm', 2) # force triple phase #self.wallbox.updateParameter('amp', 16) # set back to 16A self.nextLoop = 10 # wait 1 sec for go-e to process return True # main loop running every 100ms def loop(self) : if self.nextLoop > 0: self.nextLoop -= 1 return True self.nextLoop = 50 # default every 5 sec try: #get data from go-eCharger data = self.wallbox.requestStatus() #send data to DBus self._dbusservice['/FirmwareVersion'] = int(data['fwv'].replace('.', '')) self._dbusservice['/HardwareVersion'] = 2 self._dbusservice['/Serial'] = data['fna'] self._dbusservice['/Ac/L1/Power'] = data['nrg'][7] self._dbusservice['/Ac/L2/Power'] = data['nrg'][8] self._dbusservice['/Ac/L3/Power'] = data['nrg'][9] self._dbusservice['/Ac/Power'] = data['nrg'][11] self._dbusservice['/Ac/Voltage'] = data['nrg'][0] self._dbusservice['/Current'] = max(data['nrg'][4], data['nrg'][5], data['nrg'][6]) self._dbusservice['/Ac/Energy/Forward'] = float(data['wh']) / 1000.0 self._dbusservice['/StartStop'] = int(data['alw']) self._dbusservice['/SetCurrent'] = int(data['amp']) self._dbusservice['/MaxCurrent'] = int(data['ama']) if data['cdi'] != None: if data['cdi']['type'] == 1: self._dbusservice['/ChargingTime'] = data['cdi']['value']/ 1000 # in seconds elif data['cdi']['type'] == 0: self._dbusservice['/ChargingTime'] = (data['rbt'] - data['lcctc'])/ 1000 else: self._dbusservice['/ChargingTime'] = 0 self._dbusservice['/MCU/Temperature'] = int(data['tma'][0]) # status translation # venusos 0:EVdisconnected; 1:Connected; 2:Charging; 3:Charged; 4:Wait sun; 5:Wait RFID; 6:Wait enable; 7:Low SOC; 8:Ground error; 9:Welded contacts error; defaut:Unknown; # go-e: value 'car' 1: charging station ready, no vehicle 2: vehicle loads 3: Waiting for vehicle 4: Charge finished, vehicle still connected status = 0 if int(data['car']) == 1: status = 0 elif int(data['car']) == 2: status = 2 elif int(data['car']) == 3: status = 6 elif int(data['car']) == 4: status = 3 self._dbusservice['/Status'] = status target_consumption = 0 # consumption you aim for available_power = max(Battery.get_grid_power() + target_consumption + self._dbusservice['/Ac/Power'] + Battery.get_multiplus_power(), 0) # grid + target + charger + multiplus if Settings.pv_control_enabled and status > 0: self.wallbox.set_charging_power(available_power) # limit battery discharge if status == 2: # reduce discharge if car is charging Battery.set_max_discharge_power(Battery.get_soc() * 10) else: Battery.set_max_discharge_power(-1) if status == 0: self.nextLoop = 500 # if nothing is connected, update every 50 sec except Exception as e: print(e) return True # as gobject wants it # set a value on dbus def set_value(service, object_path, value): return dbus.SystemBus().get_object(service, object_path).SetValue(wrap_dbus_value(value)) # get a value from dbus def get_value(service, object_path): return dbus.SystemBus().get_object(service, object_path).GetValue() class Battery: # get current inverter power def get_multiplus_power() : try: return Dbus.get_value("com.victronenergy.system", "/Dc/Vebus/Power") except: return 0 # get current battery soc def get_soc() : try: return Dbus.get_value("com.victronenergy.system", "/Dc/Battery/Soc") except: return 0 # set max discharge power of battery def set_max_discharge_power(power): if Dbus.get_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower") != power: Dbus.set_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower", power) print("max discharge power:", power) # returns available power in the system def get_grid_power() : try: return -(Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L1/Power")\ + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L2/Power")\ + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L3/Power")) except: return 0 def shutdown(a, b): #dbus_set_value("com.victronenergy.vebus.ttyUSB1", "/Hub4/L1/AcPowerSetpoint", 0) time.sleep(0.1) mainloop.quit() signal.signal(signal.SIGINT, shutdown) from dbus.mainloop.glib import DBusGMainLoop DBusGMainLoop(set_as_default=True) # Have a mainloop, so we can send/receive asynchronous calls to and from dbus _wb = Wallbox(Settings.api_url) _dbus = Dbus(_wb) # run loop every 100ms gobject.timeout_add(100, _dbus.loop) mainloop = gobject.MainLoop() mainloop.run()