Untitled

mail@pastecode.io avatar
unknown
plain_text
6 months ago
12 kB
1
Indexable
Never
#!/usr/bin/env python
 
# import normal packages
import platform 
import sys
import os
import time
import dbus
import requests # for http GET
import signal
import math 
if sys.version_info.major == 2:
    import gobject
else:
    from gi.repository import GLib as gobject
 
# our own packages from victron
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '/opt/victronenergy/dbus-systemcalc-py/ext/velib_python'))
from vedbus import VeDbusService
from ve_utils import wrap_dbus_value

# settings
class Settings:
    """Static configuration of the driver, read as class attributes."""

    # --- identity of the service published on D-Bus ---
    servicename = "com.victronenergy.evcharger"
    deviceinstance = 45  # as defined by venusos
    productname = "go-e Charger"

    # --- wallbox connection ---
    api_url = "http://192.168.178.121/api/"  # base URL, must end with '/'

    # --- control behaviour ---
    # startup value of the '/Mode' path; overwritten at runtime when '/Mode'
    # is changed over D-Bus
    pv_control_enabled = 1
    # if enabled, keep charging even if power is not sufficient
    # NOTE(review): not read anywhere in the visible code - confirm it is used
    keep_min_charge = 1

    
# contains all methods related to the go-e wallbox control
class Wallbox:
    """Small HTTP client for the go-e wallbox API.

    Attributes:
        api_url: base URL of the API; "set?<name>=<value>" and "status" are
            appended to it verbatim.
        timeout: seconds to wait for the wallbox before giving up on a request.
        av_amps: smoothed target current (each new sample has a weight of 1/20).
        store: last value the wallbox was successfully sent, per parameter name.
    """

    def __init__(self, api_url, timeout=5):
        self.api_url = api_url
        # Without a timeout a hung wallbox would block the caller forever;
        # the caller here is the GLib main loop that also serves D-Bus.
        self.timeout = timeout
        self.av_amps = 0
        self.store = {}

    # update a setting of the wallbox
    def updateParameter(self, name, value):
        """Send one parameter to the wallbox unless it already has that value.

        Args:
            name: API key of the parameter (e.g. 'amp', 'frc', 'psm').
            value: new value; converted with str() for the query string.

        Returns:
            True when the wallbox holds the value (already cached or sent
            successfully), False when the request failed.
        """
        if self.store.get(name) == value:  # skip if same value is already set
            return True

        try:
            print("param update", name, value)
            requests.get(self.api_url + "set?" + name + "=" + str(value),
                         timeout=self.timeout)
        except Exception as exc:
            # Best effort: report and carry on. The cache is deliberately NOT
            # updated, so the next call with the same value tries again.
            print("param update failed", exc)
            return False

        # remember the value only once it has actually been delivered
        self.store[name] = value
        return True

    # request status of the wallbox
    def requestStatus(self):
        """Fetch the current status and return the decoded JSON document.

        Raises whatever `requests` raises on connection problems or timeout,
        and a decoding error if the answer is not valid JSON.
        """
        return requests.get(self.api_url + "status", timeout=self.timeout).json()

    # sets the charging power of the wallbox
    def set_charging_power(self, target_charge_power):
        """Translate an available power into wallbox settings.

        Args:
            target_charge_power: power that may be used for charging
                (presumably watts; divided by 240 to get amps, i.e. a single
                phase of roughly 240 V is assumed - confirm for your grid).

        The smoothed current decides what happens: below 3 A charging is forced
        off, above 5 A charging is released on a single phase with at least
        6 A. Between 3 A and 5 A nothing is sent, so the previous state is kept.
        """
        target_amps = math.floor(target_charge_power / 240)
        self.av_amps = (self.av_amps * 19 + target_amps) / 20
        print("target amps:", target_amps)
        print("average amps:", self.av_amps)
        if self.av_amps < 3:
            self.updateParameter('frc', 1)  # force off
        elif self.av_amps > 5:
            target_amps = max(target_amps, 6)  # never request less than 6 A
            self.updateParameter('psm', 1)  # force single phase
            self.updateParameter('frc', 0)  # neutral on
            self.updateParameter('amp', target_amps)  # set amps


# contains all methods related to dbus communication            
class Dbus:
    """Publishes the wallbox as a service on D-Bus and runs the control loop.

    Attributes:
        nextLoop: number of 100 ms ticks to skip before the next poll.
        wallbox: the Wallbox instance that is polled and controlled.
    """

    # go-e 'car' value -> venusos status; every other value is reported as 0
    # venusos 0:EVdisconnected; 1:Connected; 2:Charging; 3:Charged; 4:Wait sun; 5:Wait RFID; 6:Wait enable; 7:Low SOC; 8:Ground error; 9:Welded contacts error; default:Unknown;
    # go-e: value 'car' 1: charging station ready, no vehicle 2: vehicle loads 3: Waiting for vehicle 4: Charge finished, vehicle still connected
    _CAR_TO_STATUS = {1: 0, 2: 2, 3: 6, 4: 3}

    def __init__(self, wallbox):
        """Create the D-Bus service and register all paths.

        Args:
            wallbox: object offering updateParameter(), requestStatus() and
                set_charging_power().
        """
        self.nextLoop = 0  # time until next loop runs
        self.wallbox = wallbox

        # formatting callbacks: (path, value) -> text shown for that value
        _kwh = lambda p, v: (str(round(v, 2)) + 'kWh')
        _a = lambda p, v: (str(round(v, 1)) + 'A')
        _w = lambda p, v: (str(round(v, 1)) + 'W')
        _v = lambda p, v: (str(round(v, 1)) + 'V')
        _degC = lambda p, v: (str(v) + '°C')
        _s = lambda p, v: (str(v) + 's')
        _null = lambda p, v: ''

        # variables
        paths = {
            '/Ac/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L1/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L2/Power': {'initial': 0, 'textformat': _w},
            '/Ac/L3/Power': {'initial': 0, 'textformat': _w},
            '/Ac/Energy/Forward': {'initial': 0, 'textformat': _kwh},
            '/ChargingTime': {'initial': 0, 'textformat': _s},

            '/Ac/Voltage': {'initial': 0, 'textformat': _v},
            '/Current': {'initial': 0, 'textformat': _a},
            '/SetCurrent': {'initial': 0, 'textformat': _a},
            '/MaxCurrent': {'initial': 0, 'textformat': _a},
            '/MCU/Temperature': {'initial': 0, 'textformat': _degC},

            '/StartStop': {'initial': 0, 'textformat': _null},
            '/Status': {'initial': 0, 'textformat': _null},
            '/Mode': {'initial': Settings.pv_control_enabled, 'textformat': _null},
        }

        self._dbusservice = VeDbusService(Settings.servicename)

        # Create the management objects, as specified in the ccgx dbus-api document
        self._dbusservice.add_path('/Mgmt/ProcessName', __file__)
        self._dbusservice.add_path('/Mgmt/ProcessVersion', 'Unkown version, and running on Python ' + platform.python_version())
        self._dbusservice.add_path('/Mgmt/Connection', "go-e HTTP API V2")

        # Create the mandatory objects
        self._dbusservice.add_path('/DeviceInstance', Settings.deviceinstance)
        self._dbusservice.add_path('/ProductId', 0xFFFF)
        self._dbusservice.add_path('/ProductName', Settings.productname)
        self._dbusservice.add_path('/CustomName', Settings.productname)
        self._dbusservice.add_path('/Connected', 1)
        self._dbusservice.add_path('/UpdateIndex', 0)

        # placeholders, replaced with real values on the first successful poll
        self._dbusservice.add_path('/FirmwareVersion', 0)
        self._dbusservice.add_path('/HardwareVersion', 0)
        self._dbusservice.add_path('/Serial', 0)

        # add path values to dbus; all of them are writeable and report
        # external changes to handlechangedvalue()
        for path, settings in paths.items():
            self._dbusservice.add_path(
                path, settings['initial'], gettextcallback=settings['textformat'], writeable=True, onchangecallback=self.handlechangedvalue)

    # callback from dbus, a value was changed externally
    def handlechangedvalue(self, path, value):
        """Forward an externally written D-Bus value to the wallbox.

        Args:
            path: D-Bus path that was written.
            value: the new value.

        Returns:
            Always True (presumably signalling to the D-Bus service that the
            change is accepted - verify against vedbus).
        """
        print("someone else updated %s to %s" % (path, value))

        if path == '/SetCurrent':
            self.wallbox.updateParameter('amp', value)
        elif path == '/StartStop':
            if value == 0:  # off -> force off
                self.wallbox.updateParameter('frc', 1)
            else:  # else be neutral
                self.wallbox.updateParameter('frc', 0)
        elif path == '/MaxCurrent':
            self.wallbox.updateParameter('ama', min(value, 16))  # never above 16 A
        elif path == '/Mode':
            Settings.pv_control_enabled = value
            if Settings.pv_control_enabled:  # auto
                print("auto")
            else:  # manual
                print("manual")
                # reset values that pv might change (currently disabled)
                #self.wallbox.updateParameter('psm', 2) # force triple phase
                #self.wallbox.updateParameter('amp', 16) # set back to 16A

        self.nextLoop = 10  # wait 1 sec for go-e to process
        return True

    # main loop running every 100ms
    def loop(self):
        """One timer tick: count down, or poll the wallbox and update D-Bus.

        While nextLoop is positive the tick only decrements it. Otherwise the
        wallbox status is fetched and mirrored to D-Bus, the charging power is
        adjusted when PV control is enabled, and the battery discharge limit
        is set. Any exception during that work is printed and the loop keeps
        running.

        Returns:
            Always True, so the timer keeps calling this method.
        """
        if self.nextLoop > 0:
            self.nextLoop -= 1
            return True

        self.nextLoop = 50  # default every 5 sec

        try:
            # get data from go-eCharger
            data = self.wallbox.requestStatus()

            # send data to DBus
            self._dbusservice['/FirmwareVersion'] = int(data['fwv'].replace('.', ''))
            self._dbusservice['/HardwareVersion'] = 2
            self._dbusservice['/Serial'] = data['fna']

            # NOTE(review): the indices below follow the layout of the go-e
            # 'nrg' array - verify against the go-e API documentation
            nrg = data['nrg']
            self._dbusservice['/Ac/L1/Power'] = nrg[7]
            self._dbusservice['/Ac/L2/Power'] = nrg[8]
            self._dbusservice['/Ac/L3/Power'] = nrg[9]
            self._dbusservice['/Ac/Power'] = nrg[11]
            self._dbusservice['/Ac/Voltage'] = nrg[0]
            self._dbusservice['/Current'] = max(nrg[4], nrg[5], nrg[6])
            self._dbusservice['/Ac/Energy/Forward'] = float(data['wh']) / 1000.0

            self._dbusservice['/StartStop'] = int(data['alw'])
            self._dbusservice['/SetCurrent'] = int(data['amp'])
            self._dbusservice['/MaxCurrent'] = int(data['ama'])

            # '/ChargingTime' is left unchanged when 'cdi' is missing
            if data['cdi'] is not None:
                if data['cdi']['type'] == 1:
                    self._dbusservice['/ChargingTime'] = data['cdi']['value'] / 1000  # in seconds
                elif data['cdi']['type'] == 0:
                    self._dbusservice['/ChargingTime'] = (data['rbt'] - data['lcctc']) / 1000
                else:
                    self._dbusservice['/ChargingTime'] = 0

            self._dbusservice['/MCU/Temperature'] = int(data['tma'][0])

            # status translation (see _CAR_TO_STATUS)
            status = self._CAR_TO_STATUS.get(int(data['car']), 0)
            self._dbusservice['/Status'] = status

            target_consumption = 0  # consumption you aim for
            # grid + target + charger + multiplus, never negative
            available_power = max(Battery.get_grid_power() + target_consumption + self._dbusservice['/Ac/Power'] + Battery.get_multiplus_power(), 0)

            if Settings.pv_control_enabled and status > 0:
                self.wallbox.set_charging_power(available_power)

            # limit battery discharge
            if status == 2:  # reduce discharge if car is charging
                Battery.set_max_discharge_power(Battery.get_soc() * 10)
            else:
                # NOTE(review): -1 presumably means "no limit" - confirm
                Battery.set_max_discharge_power(-1)

            if status == 0:
                self.nextLoop = 500  # if nothing is connected, update every 50 sec

        except Exception as e:
            print(e)

        return True  # as gobject wants it

    # set a value on dbus
    @staticmethod
    def set_value(service, object_path, value):
        """Write `value` to `object_path` of `service` on the system bus.

        Declared static because it is called on the class (Dbus.set_value);
        without the decorator that call fails on Python 2 and any call through
        an instance fails on every version.
        """
        return dbus.SystemBus().get_object(service, object_path).SetValue(wrap_dbus_value(value))

    # get a value from dbus
    @staticmethod
    def get_value(service, object_path):
        """Read and return the value at `object_path` of `service` on the system bus."""
        return dbus.SystemBus().get_object(service, object_path).GetValue()

        
class Battery:
    """Static helpers that read and write system values over D-Bus.

    The class is never instantiated by this file; every helper is a
    staticmethod so that calling it on the class works on Python 2 as well as
    Python 3, and calling it on an instance works too.
    """

    # get current inverter power
    @staticmethod
    def get_multiplus_power():
        """Return '/Dc/Vebus/Power' of the system service, or 0 if it cannot be read."""
        try:
            return Dbus.get_value("com.victronenergy.system", "/Dc/Vebus/Power")
        except Exception:
            return 0

    # get current battery soc
    @staticmethod
    def get_soc():
        """Return '/Dc/Battery/Soc' of the system service, or 0 if it cannot be read."""
        try:
            return Dbus.get_value("com.victronenergy.system", "/Dc/Battery/Soc")
        except Exception:
            return 0

    # set max discharge power of battery
    @staticmethod
    def set_max_discharge_power(power):
        """Write the max discharge power setting, but only when it differs.

        Unlike the getters this does not catch errors; a failing D-Bus call
        propagates to the caller.
        """
        if Dbus.get_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower") != power:
            Dbus.set_value("com.victronenergy.settings", "/Settings/CGwacs/MaxDischargePower", power)
            print("max discharge power:", power)

    # returns available power in the system
    @staticmethod
    def get_grid_power():
        """Return the negated sum of the three grid phase powers, or 0 on error.

        NOTE(review): the sign flip presumably turns grid export into a
        positive "available" figure - confirm the sign convention.
        """
        try:
            return -(Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L1/Power")
                     + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L2/Power")
                     + Dbus.get_value("com.victronenergy.system", "/Ac/Grid/L3/Power"))
        except Exception:
            return 0
        

  
def shutdown(a, b):
    """Signal handler: pause briefly, then stop the main loop.

    a, b: the two positional arguments a signal handler receives; neither is
    used here.
    """
    # disabled: reset of the AC power setpoint before exiting
    #dbus_set_value("com.victronenergy.vebus.ttyUSB1", "/Hub4/L1/AcPowerSetpoint", 0)
    pause_seconds = 0.1
    time.sleep(pause_seconds)
    # 'mainloop' is the module-level loop object created at start-up
    mainloop.quit()
      
# --- script start-up --------------------------------------------------------
# Route SIGINT to shutdown() so that the main loop below is stopped
# instead of the process being interrupted mid-iteration.
signal.signal(signal.SIGINT, shutdown)



# Install the GLib main loop as the default for dbus BEFORE any D-Bus object
# is created (the Dbus instance below creates the service in its constructor).
from dbus.mainloop.glib import DBusGMainLoop
DBusGMainLoop(set_as_default=True) # Have a mainloop, so we can send/receive asynchronous calls to and from dbus
  
# Wire the wallbox client into the D-Bus front end.
_wb = Wallbox(Settings.api_url) 
_dbus = Dbus(_wb)

# run loop every 100ms; _dbus.loop always returns True, which keeps the timer alive
gobject.timeout_add(100, _dbus.loop)  

# 'mainloop' must stay a module-level name: shutdown() calls mainloop.quit().
# run() blocks until that happens.
mainloop = gobject.MainLoop()
mainloop.run()