Code: Alles auswählen
#!/usr/bin/env python3
import logging
from typing import Dict, Union, Optional
from pymodbus.constants import Endian
from control import data
from dataclass_utils import dataclass_from_dict
from modules.common import modbus
from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo, FaultState
from modules.common.modbus import ModbusDataType
from modules.common.simcount import SimCounter
from modules.common.store import get_bat_value_store
from modules.devices.solaredge.solaredge.config import SolaredgeBatSetup
import pymodbus
log = logging.getLogger(__name__)
FLOAT32_UNSUPPORTED = -0xffffff00000000000000000000000000
class SolaredgeBat(AbstractBat):
# Define all possible registers with their data types
REGISTERS = {
"Battery1StateOfEnergy": (0xf584, ModbusDataType.FLOAT_32,), # Dezimal 62852
"Battery1InstantaneousPower": (0xf574, ModbusDataType.FLOAT_32,), # Dezimal 62836
"StorageControlMode": (0xe004, ModbusDataType.UINT_16,),
"StorageChargeDischargeDefaultMode": (0xe00a, ModbusDataType.UINT_16,),
"RemoteControlCommandMode": (0xe00d, ModbusDataType.UINT_16,),
"RemoteControlCommandDischargeLimit": (0xe010, ModbusDataType.FLOAT_32,),
}
def __init__(self,
device_id: int,
component_config: Union[Dict, SolaredgeBatSetup],
tcp_client: modbus.ModbusTcpClient_) -> None:
self.__device_id = device_id
self.component_config = dataclass_from_dict(SolaredgeBatSetup, component_config)
self.__tcp_client = tcp_client
self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
self.store = get_bat_value_store(self.component_config.id)
self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
self.last_mode = 'undefined'
self._cached_firmware: Optional[str] = None
def update(self) -> None:
self.store.set(self.read_state())
def read_state(self) -> BatState:
unit = self.component_config.configuration.modbus_id
registers_to_read = [
"Battery1InstantaneousPower",
"Battery1StateOfEnergy",
]
values = self._read_registers(registers_to_read, unit)
if values["Battery1InstantaneousPower"] == FLOAT32_UNSUPPORTED:
values["Battery1InstantaneousPower"] = 0
imported, exported = self.sim_counter.sim_count(values["Battery1InstantaneousPower"])
bat_state = BatState(
power=values["Battery1InstantaneousPower"],
soc=values["Battery1StateOfEnergy"],
imported=imported,
exported=exported
)
log.debug(f"Bat {self.__tcp_client.address}: {bat_state}")
return bat_state
def set_power_limit(self, power_limit: Optional[int]) -> None:
unit = self.component_config.configuration.modbus_id
PowerLimitMode = data.data.bat_all_data.data.config.power_limit_mode
if PowerLimitMode == 'no_limit':
return
if power_limit is None:
if self.last_mode is not None:
log.debug("Keine Speichersteuerung gefordert, Steuerung deaktivieren.")
firmware = self._read_firmware_version(unit)
if firmware and self._cached_firmware is None:
log.info(f"SolarEdge Firmware-Version (Unit {unit}): {firmware}")
elif firmware is None:
log.warning("Firmware konnte nicht gelesen werden, nehme Standardwert für ControlMode = 2.")
# Default Mode: Time of Use (2), bei alter Firmware: Max. Eigenverbrauch (7)
if firmware and self._firmware_older_than(firmware, "4.20.36"):
control_mode = 7
log.info("Firmware < 4.20.36 erkannt – setze StorageControlMode = 7 (Max. Eigenverbrauch)")
else:
control_mode = 2
values_to_write = {
"RemoteControlCommandDischargeLimit": 5000,
"StorageChargeDischargeDefaultMode": 0,
"RemoteControlCommandMode": 0,
"StorageControlMode": control_mode,
}
self._write_registers(values_to_write, unit)
self.last_mode = None
else:
return
elif power_limit >= 0 and self.last_mode != 'stop':
log.debug("Speichersteuerung aktivieren. Speicher-Entladung sperren.")
values_to_write = {
"StorageControlMode": 4,
"StorageChargeDischargeDefaultMode": 1,
"RemoteControlCommandMode": 1,
}
self._write_registers(values_to_write, unit)
self.last_mode = 'stop'
def _read_registers(self, register_names: list, unit: int) -> Dict[str, Union[int, float]]:
values = {}
for key in register_names:
address, data_type = self.REGISTERS[key]
values[key] = self.__tcp_client.read_holding_registers(
address, data_type, wordorder=Endian.Little, unit=unit
)
log.debug(f"Bat raw values {self.__tcp_client.address}: {values}")
return values
def _write_registers(self, values_to_write: Dict[str, Union[int, float]], unit: int) -> None:
for key, value in values_to_write.items():
address, data_type = self.REGISTERS[key]
encoded_value = self._encode_value(value, data_type)
self.__tcp_client.write_registers(address, encoded_value, unit=unit)
log.debug(f"Neuer Wert {encoded_value} in Register {address} geschrieben.")
def _encode_value(self, value: Union[int, float], data_type: ModbusDataType) -> list:
builder = pymodbus.payload.BinaryPayloadBuilder(
byteorder=pymodbus.constants.Endian.Big,
wordorder=pymodbus.constants.Endian.Little
)
encode_methods = {
ModbusDataType.UINT_32: builder.add_32bit_uint,
ModbusDataType.INT_32: builder.add_32bit_int,
ModbusDataType.UINT_16: builder.add_16bit_uint,
ModbusDataType.INT_16: builder.add_16bit_int,
ModbusDataType.FLOAT_32: builder.add_32bit_float,
}
if data_type == ModbusDataType.FLOAT_32:
encode_methods[data_type](float(value))
else:
encode_methods[data_type](int(value))
return builder.to_registers()
def _read_firmware_version(self, unit: int) -> Optional[str]:
if self._cached_firmware is not None:
return self._cached_firmware
try:
address = 43 # 40044 - 40001 = 43
length = 8
result = self.__tcp_client.client.read_holding_registers(address=address, count=length, unit=unit)
if result.isError():
log.warning(f"Fehler beim Lesen der Firmware-Version von Unit {unit}")
return None
from pymodbus.payload import BinaryPayloadDecoder
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers, byteorder=Endian.Big, wordorder=Endian.Big
)
firmware = decoder.decode_string(length * 2).decode('ascii').strip('\x00')
self._cached_firmware = firmware
return firmware
except Exception as e:
log.warning(f"Fehler beim Lesen der Firmware: {e}")
return None
@staticmethod
def _firmware_older_than(current: str, compare_to: str) -> bool:
try:
def to_tuple(version_str):
return tuple(map(int, version_str.strip().split(".")))
return to_tuple(current) < to_tuple(compare_to)
except Exception as e:
log.warning(f"Fehler beim Vergleichen der Firmware-Versionen: {e}")
return False
component_descriptor = ComponentDescriptor(configuration_factory=SolaredgeBatSetup)
Firmware wird nur einmal gelesen dann im Cache für die Verbindung , ansonsten habe ich seine Firmware Abfrage eingebaut . Kann es nur nicht live testen da ich unterwegs bin . Jetzt sollte nur noch eine Verbindung aufgebaut werden