SolarEdge Speichersteuerung

Fragen zur Nutzung, Features, usw..
72Yankee
Beiträge: 22
Registriert: Sa Feb 08, 2020 1:25 pm

Re: SolarEdge Speichersteuerung

Beitrag von 72Yankee »

Kann ich gerne machen, aber ich weiß schon das es nicht geht.
Bevor ich bei den WR mit Display ausgetauscht habe, hatte ich einen Supportfall bei SolarEdge aufgemacht, weil TuO nicht funktionierte.
Antwort war, dass WR mit Display nicht unterstützt werden.

Weiterhin würde es Probleme geben, bei Systemen mit mehreren Batterien.
Die habe ich aber nicht.

Viele Grüße
2 SolarEdge Einphasen-WR SE5000H-RWSAC Display + SE5000H-RWS SetApp
SolarEdge SESTI-S4
SolarEdge Dreiphasen-WR SE10K SetApp
QCells Q.Peak DUO-G5 - 14,52 kWP
LG-Chem RESU-10H + 10H Prime
openWB series2+ => Volvo XC40
openWB series2 => Renault ZOE
ChristophR
Beiträge: 1001
Registriert: So Okt 30, 2022 8:07 am
Has thanked: 31 times
Been thanked: 68 times

Re: SolarEdge Speichersteuerung

Beitrag von ChristophR »

72Yankee hat geschrieben: So Apr 06, 2025 8:15 pm Kann ich gerne machen, aber ich weiß schon das es nicht geht.
Bevor ich bei den WR mit Display ausgetauscht habe, hatte ich einen Supportfall bei SolarEdge aufgemacht, weil TuO nicht funktionierte.
Antwort war, dass WR mit Display nicht unterstützt werden.

Weiterhin würde es Probleme geben, bei Systemen mit mehreren Batterien.
Die habe ich aber nicht.

Viele Grüße
Dann brauchen wir es ja nicht nochmal probieren, wenn es generell die ToU Profile auch aus dem Monitoring Portal betrifft.
Dann wäre nur interessant, was im Register 0xE004 steht?
Wenn da bei dem System 1 steht, passt es so.
openWB Series 2 Standard+, SW-Version 2
SolarEdge SE10K-RWS, BYD LVS 8, 16,8 kWp.
CUPRA Born
72Yankee
Beiträge: 22
Registriert: Sa Feb 08, 2020 1:25 pm

Re: SolarEdge Speichersteuerung

Beitrag von 72Yankee »

Dann brauche ich eine Anleitung, wie man Modbusregister ausliest.
2 SolarEdge Einphasen-WR SE5000H-RWSAC Display + SE5000H-RWS SetApp
SolarEdge SESTI-S4
SolarEdge Dreiphasen-WR SE10K SetApp
QCells Q.Peak DUO-G5 - 14,52 kWP
LG-Chem RESU-10H + 10H Prime
openWB series2+ => Volvo XC40
openWB series2 => Renault ZOE
ChristophR
Beiträge: 1001
Registriert: So Okt 30, 2022 8:07 am
Has thanked: 31 times
Been thanked: 68 times

Re: SolarEdge Speichersteuerung

Beitrag von ChristophR »

72Yankee hat geschrieben: Mi Apr 09, 2025 7:44 pm Dann brauche ich eine Anleitung, wie man Modbusregister ausliest.
Ich habe dieses Tool hier gefunden, gibt aber auch einige andere:
https://www.baseblock.com/PRODUCTS/comtestpro.htm

Dort geht die Abfrage so, Port kann auch 502 sein, bei Dir muss unten eine 1 rauskommen:
2025-04-10 10_46_00-Baseblock ComTest Pro for Modbus Devices.png
2025-04-10 10_46_00-Baseblock ComTest Pro for Modbus Devices.png (87.66 KiB) 165 mal betrachtet
openWB Series 2 Standard+, SW-Version 2
SolarEdge SE10K-RWS, BYD LVS 8, 16,8 kWp.
CUPRA Born
72Yankee
Beiträge: 22
Registriert: Sa Feb 08, 2020 1:25 pm

Re: SolarEdge Speichersteuerung

Beitrag von 72Yankee »

Hallo und sorry,

aufgrund einer brandgefährlichen LG-Batterie wurde mit dem Tausch Batterie der WR gegen eine neue Version getauscht, bevor ich das Register auslesen konnte.

Jetzt kann ich nur noch einen SE5000H mit Sesti 4 und einen SE5000H mit entegrierter Sesti dienen.

Viele Grüße
2 SolarEdge Einphasen-WR SE5000H-RWSAC Display + SE5000H-RWS SetApp
SolarEdge SESTI-S4
SolarEdge Dreiphasen-WR SE10K SetApp
QCells Q.Peak DUO-G5 - 14,52 kWP
LG-Chem RESU-10H + 10H Prime
openWB series2+ => Volvo XC40
openWB series2 => Renault ZOE
ChristophR
Beiträge: 1001
Registriert: So Okt 30, 2022 8:07 am
Has thanked: 31 times
Been thanked: 68 times

Re: SolarEdge Speichersteuerung

Beitrag von ChristophR »

OK, gehen wir einfach mal von aus, dass das passte.

Ich habe es jetzt umgebaut, dass die Firmware nicht mehr ausgelesen wird.
Dann gab es noch Konflikte, die ich versucht habe aufzulösen, kriege sie aber nicht ganz weg.
https://github.com/openWB/core/pull/2269

Was uns noch fehlt, ist eine Config-Option für den Speicher für die SoC-Reserve.
Die kann ich aber nicht bauen, da kenne ich mich nicht mit aus.
Kann das jemand übernehmen?
openWB Series 2 Standard+, SW-Version 2
SolarEdge SE10K-RWS, BYD LVS 8, 16,8 kWp.
CUPRA Born
Basti
Beiträge: 107
Registriert: Di Feb 21, 2023 3:28 pm
Has thanked: 1 time
Been thanked: 12 times

Re: SolarEdge Speichersteuerung

Beitrag von Basti »

Bin mal kurz drüber gegangen über den Code , sieht gut aus . Hab mal deinen Code bisschen angepasst 😁 , welche Konflikte hast du denn ?

Code: Alles auswählen

#!/usr/bin/env python3
import logging
from typing import Any, TypedDict, Dict, Union, Optional

from pymodbus.constants import Endian
import pymodbus

from control import data
from modules.common import modbus
from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo, FaultState
from modules.common.modbus import ModbusDataType
from modules.common.simcount import SimCounter
from modules.common.store import get_bat_value_store
from modules.devices.solaredge.solaredge.config import SolaredgeBatSetup

log = logging.getLogger(__name__)

# Constants for magic numbers and control modes
FLOAT32_UNSUPPORTED = -0xffffff00000000000000000000000000
MAX_DISCHARGE_LIMIT = 5000
ACTIVE_CONTROL_MODE = 4
DEFAULT_CONTROL_MODE = 0
REMOTE_CONTROL_MODE = 7
DEFAULT_STORAGE_CONTROL_MODE = 1


class KwargsDict(TypedDict):
    device_id: int
    client: modbus.ModbusTcpClient_


class SolaredgeBat(AbstractBat):
    # Define all possible registers with their data types
    REGISTERS = {
        "Battery1StateOfEnergy": (0xe184, ModbusDataType.FLOAT_32,),  # Mirror: 0xf584
        "Battery1InstantaneousPower": (0xe174, ModbusDataType.FLOAT_32,),  # Mirror: 0xf574
        "Battery2StateOfEnergy": (0xe284, ModbusDataType.FLOAT_32,),
        "Battery2InstantaneousPower": (0xe274, ModbusDataType.FLOAT_32,),
        "StorageControlMode": (0xe004, ModbusDataType.UINT_16,),
        "StorageBackupReserved": (0xe008, ModbusDataType.FLOAT_32,),
        "StorageChargeDischargeDefaultMode": (0xe00a, ModbusDataType.UINT_16,),
        "RemoteControlCommandMode": (0xe00d, ModbusDataType.UINT_16,),
        "RemoteControlCommandDischargeLimit": (0xe010, ModbusDataType.FLOAT_32,),
    }

    def __init__(self, component_config: SolaredgeBatSetup, **kwargs: Any) -> None:
        self.component_config = component_config
        self.kwargs: KwargsDict = kwargs

    def initialize(self) -> None:
        # Validate kwargs
        if 'device_id' not in self.kwargs or 'client' not in self.kwargs:
            raise ValueError("device_id and client must be provided in kwargs")
        
        self.__device_id: int = self.kwargs['device_id']
        self.__tcp_client: modbus.ModbusTcpClient_ = self.kwargs['client']
        self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
        self.store = get_bat_value_store(self.component_config.id)
        self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
        # Battery Index wird erst in PR 2236 umgesetzt, solange wird Default-Wert 1 genutzt:
        self.battery_index = getattr(self.component_config.configuration, "battery_index", 1)
        # SoC Reserve muss in Configurtion erst noch umgesetzt werden, solange wird Default-Wert 10 genutzt:
        self.soc_reserve_configured = getattr(self.component_config.configuration, "soc_reserve", 10)
        self.StorageControlMode_Default = DEFAULT_STORAGE_CONTROL_MODE

    def update(self) -> None:
        self.store.set(self.read_state())

    def read_state(self) -> BatState:
        unit = self.component_config.configuration.modbus_id
        registers_to_read = [
            f"Battery{self.battery_index}InstantaneousPower",
            f"Battery{self.battery_index}StateOfEnergy",
        ]
        try:
            values = self._read_registers(registers_to_read, unit)
        except pymodbus.exceptions.ModbusException as e:
            log.error(f"Failed to read registers: {e}")
            self.fault_state.add_fault(f"Modbus read error: {e}")
            return BatState(power=0, soc=0, imported=0, exported=0)

        power = values[f"Battery{self.battery_index}InstantaneousPower"]
        soc = values[f"Battery{self.battery_index}StateOfEnergy"]

        if power == FLOAT32_UNSUPPORTED:
            power = 0
        if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
            log.warning(f"Invalid SoC: {soc}, using 0")
            soc = 0

        imported, exported = self.sim_counter.sim_count(power)
        bat_state = BatState(
            power=power,
            soc=soc,
            imported=imported,
            exported=exported
        )
        log.debug(f"Bat {self.__tcp_client.address}: {bat_state}")
        return bat_state

    def set_power_limit(self, power_limit: Optional[int]) -> None:
        unit = self.component_config.configuration.modbus_id
        try:
            PowerLimitMode = data.data.bat_all_data.data.config.power_limit_mode
        except AttributeError:
            log.warning("PowerLimitMode not found, assuming 'no_limit'")
            PowerLimitMode = 'no_limit'

        if PowerLimitMode == 'no_limit':
            """
            Keine Speichersteuerung, andere Steuerungen zulassen (SolarEdge One, ioBroker, Node-Red etc.).
            Falls andere Steuerungen vorhanden sind, sollten diese nicht beeinflusst werden,
            daher erfolgt im Modus "Immer" der Speichersteuerung keine Steuerung.
            """
            return

        if power_limit is None:
            # Keine Ladung mit Speichersteuerung.
            registers_to_read = ["StorageControlMode"]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            if values["StorageControlMode"] == ACTIVE_CONTROL_MODE:
                # Steuerung deaktivieren.
                log.debug("Keine Speichersteuerung gefordert, Steuerung deaktivieren.")
                values_to_write = {
                    "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                    "StorageChargeDischargeDefaultMode": DEFAULT_CONTROL_MODE,
                    "RemoteControlCommandMode": DEFAULT_CONTROL_MODE,
                    "StorageControlMode": self.StorageControlMode_Default,
                }
                self._write_registers(values_to_write, unit)
            return

        elif power_limit >= 0:
            """
            Ladung mit Speichersteuerung.
            SolarEdge entlaedt den Speicher immer nur bis zur SoC-Reserve.
            Steuerung beenden, wenn der SoC vom Speicher die SoC-Reserve unterschreitet.
            """
            registers_to_read = [
                f"Battery{self.battery_index}StateOfEnergy",
                "StorageControlMode",
                "StorageBackupReserved",
                "RemoteControlCommandDischargeLimit",
            ]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            soc = values[f"Battery{self.battery_index}StateOfEnergy"]
            if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
                log.warning(f"Invalid SoC: {soc}, using 0")
                soc = 0
            soc = int(soc)  # SolarEdge protocol may require integer SoC for comparisons
            soc_reserve = max(int(self.soc_reserve_configured), int(values["StorageBackupReserved"]))
            discharge_limit = int(values["RemoteControlCommandDischargeLimit"])

            if values["StorageControlMode"] == ACTIVE_CONTROL_MODE:  # Speichersteuerung aktiv.
                if soc_reserve >= soc:
                    # Speichersteuerung deaktivieren, SoC-Reserve unterschritten.
                    log.debug("Speichersteuerung deaktivieren. SoC-Reserve unterschritten.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                        "StorageChargeDischargeDefaultMode": DEFAULT_CONTROL_MODE,
                        "RemoteControlCommandMode": DEFAULT_CONTROL_MODE,
                        "StorageControlMode": self.StorageControlMode_Default,
                    }
                    self._write_registers(values_to_write, unit)
                elif discharge_limit not in range(int(power_limit) - 10, int(power_limit) + 10):
                    # DischargeLimit nur bei Abweichung von mehr als 10W, um Konflikte bei 2 Speichern zu verhindern.
                    log.debug(f"Speichersteuerung aktiv, Discharge-Limit {int(power_limit)}W.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)
            else:  # Speichersteuerung ist inaktiv.
                if soc_reserve < soc:
                    # Speichersteuerung nur aktivieren, wenn SoC ueber SoC-Reserve.
                    log.debug(f"Speichersteuerung aktivieren. Discharge-Limit: {int(power_limit)} W.")
                    self.StorageControlMode_Default = values["StorageControlMode"]
                    values_to_write = {
                        "StorageControlMode": ACTIVE_CONTROL_MODE,
                        "StorageChargeDischargeDefaultMode": REMOTE_CONTROL_MODE,
                        "RemoteControlCommandMode": REMOTE_CONTROL_MODE,
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)

    def _read_registers(self, register_names: list, unit: int) -> Dict[str, Union[int, float]]:
        values = {}
        for key in register_names:
            address, data_type = self.REGISTERS[key]
            try:
                values[key] = self.__tcp_client.read_holding_registers(
                    address, data_type, wordorder=Endian.Little, unit=unit
                )
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                values[key] = 0  # Fallback value
        log.debug(f"Bat raw values {self.__tcp_client.address}: {values}")
        return values
        # TODO: Optimize to read multiple contiguous registers in a single request if supported by ModbusTcpClient_

    def _write_registers(self, values_to_write: Dict[str, Union[int, float]], unit: int) -> None:
        for key, value in values_to_write.items():
            address, data_type = self.REGISTERS[key]
            encoded_value = self._encode_value(value, data_type)
            try:
                self.__tcp_client.write_registers(address, encoded_value, unit=unit)
                log.debug(f"Neuer Wert {encoded_value} in Register {address} geschrieben.")
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to write register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus write error: {e}")

    def _encode_value(self, value: Union[int, float], data_type: ModbusDataType) -> list:
        builder = pymodbus.payload.BinaryPayloadBuilder(
            byteorder=pymodbus.constants.Endian.Big,
            wordorder=pymodbus.constants.Endian.Little
        )
        encode_methods = {
            ModbusDataType.UINT_32: builder.add_32bit_uint,
            ModbusDataType.INT_32: builder.add_32bit_int,
            ModbusDataType.UINT_16: builder.add_16bit_uint,
            ModbusDataType.INT_16: builder.add_16bit_int,
            ModbusDataType.FLOAT_32: builder.add_32bit_float,
        }
        if data_type in encode_methods:
            if data_type == ModbusDataType.FLOAT_32:
                encode_methods[data_type](float(value))
            else:
                encode_methods[data_type](int(value))
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
        return builder.to_registers()

    def power_limit_controllable(self) -> bool:
        return True


component_descriptor = ComponentDescriptor(configuration_factory=SolaredgeBatSetup)
ChristophR
Beiträge: 1001
Registriert: So Okt 30, 2022 8:07 am
Has thanked: 31 times
Been thanked: 68 times

Re: SolarEdge Speichersteuerung

Beitrag von ChristophR »

Basti hat geschrieben: Fr Apr 18, 2025 7:16 pm Bin mal kurz drüber gegangen über den Code , sieht gut aus . Hab mal deinen Code bisschen angepasst 😁 , welche Konflikte hast du denn ?

Code: Alles auswählen

#!/usr/bin/env python3
import logging
from typing import Any, TypedDict, Dict, Union, Optional

from pymodbus.constants import Endian
import pymodbus

from control import data
from modules.common import modbus
from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo, FaultState
from modules.common.modbus import ModbusDataType
from modules.common.simcount import SimCounter
from modules.common.store import get_bat_value_store
from modules.devices.solaredge.solaredge.config import SolaredgeBatSetup

log = logging.getLogger(__name__)

# Constants for magic numbers and control modes
FLOAT32_UNSUPPORTED = -0xffffff00000000000000000000000000
MAX_DISCHARGE_LIMIT = 5000
ACTIVE_CONTROL_MODE = 4
DEFAULT_CONTROL_MODE = 0
REMOTE_CONTROL_MODE = 7
DEFAULT_STORAGE_CONTROL_MODE = 1


class KwargsDict(TypedDict):
    device_id: int
    client: modbus.ModbusTcpClient_


class SolaredgeBat(AbstractBat):
    # Define all possible registers with their data types
    REGISTERS = {
        "Battery1StateOfEnergy": (0xe184, ModbusDataType.FLOAT_32,),  # Mirror: 0xf584
        "Battery1InstantaneousPower": (0xe174, ModbusDataType.FLOAT_32,),  # Mirror: 0xf574
        "Battery2StateOfEnergy": (0xe284, ModbusDataType.FLOAT_32,),
        "Battery2InstantaneousPower": (0xe274, ModbusDataType.FLOAT_32,),
        "StorageControlMode": (0xe004, ModbusDataType.UINT_16,),
        "StorageBackupReserved": (0xe008, ModbusDataType.FLOAT_32,),
        "StorageChargeDischargeDefaultMode": (0xe00a, ModbusDataType.UINT_16,),
        "RemoteControlCommandMode": (0xe00d, ModbusDataType.UINT_16,),
        "RemoteControlCommandDischargeLimit": (0xe010, ModbusDataType.FLOAT_32,),
    }

    def __init__(self, component_config: SolaredgeBatSetup, **kwargs: Any) -> None:
        self.component_config = component_config
        self.kwargs: KwargsDict = kwargs

    def initialize(self) -> None:
        # Validate kwargs
        if 'device_id' not in self.kwargs or 'client' not in self.kwargs:
            raise ValueError("device_id and client must be provided in kwargs")
        
        self.__device_id: int = self.kwargs['device_id']
        self.__tcp_client: modbus.ModbusTcpClient_ = self.kwargs['client']
        self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
        self.store = get_bat_value_store(self.component_config.id)
        self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
        # Battery Index wird erst in PR 2236 umgesetzt, solange wird Default-Wert 1 genutzt:
        self.battery_index = getattr(self.component_config.configuration, "battery_index", 1)
        # SoC Reserve muss in Configurtion erst noch umgesetzt werden, solange wird Default-Wert 10 genutzt:
        self.soc_reserve_configured = getattr(self.component_config.configuration, "soc_reserve", 10)
        self.StorageControlMode_Default = DEFAULT_STORAGE_CONTROL_MODE

    def update(self) -> None:
        self.store.set(self.read_state())

    def read_state(self) -> BatState:
        unit = self.component_config.configuration.modbus_id
        registers_to_read = [
            f"Battery{self.battery_index}InstantaneousPower",
            f"Battery{self.battery_index}StateOfEnergy",
        ]
        try:
            values = self._read_registers(registers_to_read, unit)
        except pymodbus.exceptions.ModbusException as e:
            log.error(f"Failed to read registers: {e}")
            self.fault_state.add_fault(f"Modbus read error: {e}")
            return BatState(power=0, soc=0, imported=0, exported=0)

        power = values[f"Battery{self.battery_index}InstantaneousPower"]
        soc = values[f"Battery{self.battery_index}StateOfEnergy"]

        if power == FLOAT32_UNSUPPORTED:
            power = 0
        if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
            log.warning(f"Invalid SoC: {soc}, using 0")
            soc = 0

        imported, exported = self.sim_counter.sim_count(power)
        bat_state = BatState(
            power=power,
            soc=soc,
            imported=imported,
            exported=exported
        )
        log.debug(f"Bat {self.__tcp_client.address}: {bat_state}")
        return bat_state

    def set_power_limit(self, power_limit: Optional[int]) -> None:
        unit = self.component_config.configuration.modbus_id
        try:
            PowerLimitMode = data.data.bat_all_data.data.config.power_limit_mode
        except AttributeError:
            log.warning("PowerLimitMode not found, assuming 'no_limit'")
            PowerLimitMode = 'no_limit'

        if PowerLimitMode == 'no_limit':
            """
            Keine Speichersteuerung, andere Steuerungen zulassen (SolarEdge One, ioBroker, Node-Red etc.).
            Falls andere Steuerungen vorhanden sind, sollten diese nicht beeinflusst werden,
            daher erfolgt im Modus "Immer" der Speichersteuerung keine Steuerung.
            """
            return

        if power_limit is None:
            # Keine Ladung mit Speichersteuerung.
            registers_to_read = ["StorageControlMode"]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            if values["StorageControlMode"] == ACTIVE_CONTROL_MODE:
                # Steuerung deaktivieren.
                log.debug("Keine Speichersteuerung gefordert, Steuerung deaktivieren.")
                values_to_write = {
                    "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                    "StorageChargeDischargeDefaultMode": DEFAULT_CONTROL_MODE,
                    "RemoteControlCommandMode": DEFAULT_CONTROL_MODE,
                    "StorageControlMode": self.StorageControlMode_Default,
                }
                self._write_registers(values_to_write, unit)
            return

        elif power_limit >= 0:
            """
            Ladung mit Speichersteuerung.
            SolarEdge entlaedt den Speicher immer nur bis zur SoC-Reserve.
            Steuerung beenden, wenn der SoC vom Speicher die SoC-Reserve unterschreitet.
            """
            registers_to_read = [
                f"Battery{self.battery_index}StateOfEnergy",
                "StorageControlMode",
                "StorageBackupReserved",
                "RemoteControlCommandDischargeLimit",
            ]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            soc = values[f"Battery{self.battery_index}StateOfEnergy"]
            if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
                log.warning(f"Invalid SoC: {soc}, using 0")
                soc = 0
            soc = int(soc)  # SolarEdge protocol may require integer SoC for comparisons
            soc_reserve = max(int(self.soc_reserve_configured), int(values["StorageBackupReserved"]))
            discharge_limit = int(values["RemoteControlCommandDischargeLimit"])

            if values["StorageControlMode"] == ACTIVE_CONTROL_MODE:  # Speichersteuerung aktiv.
                if soc_reserve >= soc:
                    # Speichersteuerung deaktivieren, SoC-Reserve unterschritten.
                    log.debug("Speichersteuerung deaktivieren. SoC-Reserve unterschritten.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                        "StorageChargeDischargeDefaultMode": DEFAULT_CONTROL_MODE,
                        "RemoteControlCommandMode": DEFAULT_CONTROL_MODE,
                        "StorageControlMode": self.StorageControlMode_Default,
                    }
                    self._write_registers(values_to_write, unit)
                elif discharge_limit not in range(int(power_limit) - 10, int(power_limit) + 10):
                    # DischargeLimit nur bei Abweichung von mehr als 10W, um Konflikte bei 2 Speichern zu verhindern.
                    log.debug(f"Speichersteuerung aktiv, Discharge-Limit {int(power_limit)}W.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)
            else:  # Speichersteuerung ist inaktiv.
                if soc_reserve < soc:
                    # Speichersteuerung nur aktivieren, wenn SoC ueber SoC-Reserve.
                    log.debug(f"Speichersteuerung aktivieren. Discharge-Limit: {int(power_limit)} W.")
                    self.StorageControlMode_Default = values["StorageControlMode"]
                    values_to_write = {
                        "StorageControlMode": ACTIVE_CONTROL_MODE,
                        "StorageChargeDischargeDefaultMode": REMOTE_CONTROL_MODE,
                        "RemoteControlCommandMode": REMOTE_CONTROL_MODE,
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)

    def _read_registers(self, register_names: list, unit: int) -> Dict[str, Union[int, float]]:
        values = {}
        for key in register_names:
            address, data_type = self.REGISTERS[key]
            try:
                values[key] = self.__tcp_client.read_holding_registers(
                    address, data_type, wordorder=Endian.Little, unit=unit
                )
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                values[key] = 0  # Fallback value
        log.debug(f"Bat raw values {self.__tcp_client.address}: {values}")
        return values
        # TODO: Optimize to read multiple contiguous registers in a single request if supported by ModbusTcpClient_

    def _write_registers(self, values_to_write: Dict[str, Union[int, float]], unit: int) -> None:
        for key, value in values_to_write.items():
            address, data_type = self.REGISTERS[key]
            encoded_value = self._encode_value(value, data_type)
            try:
                self.__tcp_client.write_registers(address, encoded_value, unit=unit)
                log.debug(f"Neuer Wert {encoded_value} in Register {address} geschrieben.")
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to write register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus write error: {e}")

    def _encode_value(self, value: Union[int, float], data_type: ModbusDataType) -> list:
        builder = pymodbus.payload.BinaryPayloadBuilder(
            byteorder=pymodbus.constants.Endian.Big,
            wordorder=pymodbus.constants.Endian.Little
        )
        encode_methods = {
            ModbusDataType.UINT_32: builder.add_32bit_uint,
            ModbusDataType.INT_32: builder.add_32bit_int,
            ModbusDataType.UINT_16: builder.add_16bit_uint,
            ModbusDataType.INT_16: builder.add_16bit_int,
            ModbusDataType.FLOAT_32: builder.add_32bit_float,
        }
        if data_type in encode_methods:
            if data_type == ModbusDataType.FLOAT_32:
                encode_methods[data_type](float(value))
            else:
                encode_methods[data_type](int(value))
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
        return builder.to_registers()

    def power_limit_controllable(self) -> bool:
        return True


component_descriptor = ComponentDescriptor(configuration_factory=SolaredgeBatSetup)
Super, kann ich gerne so übernehmen.

Folgende Fragen / Hinweise:
1. Variable self.StorageControlMode_Default:
Mir ist kein besserer Name hierfür eingefallen, es ist ja eigentlich nicht ein Default-Wert, sondern der alte Wert, den ich aus dem System auslese.
Wenn das nicht klappt oder aufgrund eines vorherigen Absturzes nicht bekannt ist, soll der Wert 1 für MaxEigenverbrauch gesetzt werden.
Fällt Dir hierzu ein besserer Variablenname ein? Der passt eigentlich gar nicht so gut.

2. Das Deaktivieren der Steuerung wird an 2 Stellen aufgerufen und ist dann jeweils identisch. Lagert man sowas in eine weitere Def aus, um es dann einheitlich aufzurufen? Oder lohnt sich das noch nicht? Das würde noch fehlen.

3. def initialize ist ja gerade erst neu von openWB dazu gekommen, kann die zusätzliche Prüfung von Dir irgendwo zentral hinterlegt werden, wenn sie nötig ist?

4. Vergleich vom soc:
Hatte funktioniert, kann aber nicht schaden.

5. Validierung von Ergebnissen:
Bei einigen Registern (ControlMode, CommandMode) gibt es ja nur eine überschaubare Menge valider Werte, sollte man die nochmal prüfen bevor man sie anwendet?

6. Die Konflikte hat eigentlich nur Github, ich bin damit m.E. fein:
https://github.com/openWB/core/pull/2269/conflicts

Kannst Du den PR und ggf. die Config Anpassungen übernehmen? Ich kann jetzt wahrscheinlich nicht viel mehr dazu beitragen.

P.S: CONTROL_MODE und COMMAND_MODE gehen jetzt ein wenig durcheinander, ich benenne die Variablen nochmal eindeutiger.
P.P.S: Sieht jetzt so aus:

Code: Alles auswählen

#!/usr/bin/env python3
import logging
from typing import Any, TypedDict, Dict, Union, Optional

from pymodbus.constants import Endian
import pymodbus

from control import data
from modules.common import modbus
from modules.common.abstract_device import AbstractBat
from modules.common.component_state import BatState
from modules.common.component_type import ComponentDescriptor
from modules.common.fault_state import ComponentInfo, FaultState
from modules.common.modbus import ModbusDataType
from modules.common.simcount import SimCounter
from modules.common.store import get_bat_value_store
from modules.devices.solaredge.solaredge.config import SolaredgeBatSetup

log = logging.getLogger(__name__)

# Constants for magic numbers and control modes
FLOAT32_UNSUPPORTED = -0xffffff00000000000000000000000000
MAX_DISCHARGE_LIMIT = 5000
DEFAULT_CONTROL_MODE = 1  # Control Mode Max Eigenverbrauch
REMOTE_CONTROL_MODE = 4  # Control Mode Remotesteuerung
DEFAULT_COMMAND_MODE = 0  # Command Mode ohne Steuerung
ACTIVE_COMMAND_MODE = 7  # Command Mode Max Eigenverbrauch bei Steuerung


class KwargsDict(TypedDict):
    device_id: int
    client: modbus.ModbusTcpClient_


class SolaredgeBat(AbstractBat):
    # Define all possible registers with their data types
    REGISTERS = {
        "Battery1StateOfEnergy": (0xe184, ModbusDataType.FLOAT_32,),  # Mirror: 0xf584
        "Battery1InstantaneousPower": (0xe174, ModbusDataType.FLOAT_32,),  # Mirror: 0xf574
        "Battery2StateOfEnergy": (0xe284, ModbusDataType.FLOAT_32,),
        "Battery2InstantaneousPower": (0xe274, ModbusDataType.FLOAT_32,),
        "StorageControlMode": (0xe004, ModbusDataType.UINT_16,),
        "StorageBackupReserved": (0xe008, ModbusDataType.FLOAT_32,),
        "StorageChargeDischargeDefaultMode": (0xe00a, ModbusDataType.UINT_16,),
        "RemoteControlCommandMode": (0xe00d, ModbusDataType.UINT_16,),
        "RemoteControlCommandDischargeLimit": (0xe010, ModbusDataType.FLOAT_32,),
    }

    def __init__(self, component_config: SolaredgeBatSetup, **kwargs: Any) -> None:
        self.component_config = component_config
        self.kwargs: KwargsDict = kwargs

    def initialize(self) -> None:
        # Validate kwargs
        if 'device_id' not in self.kwargs or 'client' not in self.kwargs:
            raise ValueError("device_id and client must be provided in kwargs")
        
        self.__device_id: int = self.kwargs['device_id']
        self.__tcp_client: modbus.ModbusTcpClient_ = self.kwargs['client']
        self.sim_counter = SimCounter(self.__device_id, self.component_config.id, prefix="speicher")
        self.store = get_bat_value_store(self.component_config.id)
        self.fault_state = FaultState(ComponentInfo.from_component_config(self.component_config))
        # Battery Index wird erst in PR 2236 umgesetzt, solange wird Default-Wert 1 genutzt:
        self.battery_index = getattr(self.component_config.configuration, "battery_index", 1)
        # SoC Reserve muss in Configurtion erst noch umgesetzt werden, solange wird Default-Wert 10 genutzt:
        self.soc_reserve_configured = getattr(self.component_config.configuration, "soc_reserve", 10)
        self.StorageControlMode_Read = DEFAULT_CONTROL_MODE

    def update(self) -> None:
        self.store.set(self.read_state())

    def read_state(self) -> BatState:
        unit = self.component_config.configuration.modbus_id
        registers_to_read = [
            f"Battery{self.battery_index}InstantaneousPower",
            f"Battery{self.battery_index}StateOfEnergy",
        ]
        try:
            values = self._read_registers(registers_to_read, unit)
        except pymodbus.exceptions.ModbusException as e:
            log.error(f"Failed to read registers: {e}")
            self.fault_state.add_fault(f"Modbus read error: {e}")
            return BatState(power=0, soc=0, imported=0, exported=0)

        power = values[f"Battery{self.battery_index}InstantaneousPower"]
        soc = values[f"Battery{self.battery_index}StateOfEnergy"]

        if power == FLOAT32_UNSUPPORTED:
            power = 0
        if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
            log.warning(f"Invalid SoC: {soc}, using 0")
            soc = 0

        imported, exported = self.sim_counter.sim_count(power)
        bat_state = BatState(
            power=power,
            soc=soc,
            imported=imported,
            exported=exported
        )
        log.debug(f"Bat {self.__tcp_client.address}: {bat_state}")
        return bat_state

    def set_power_limit(self, power_limit: Optional[int]) -> None:
        unit = self.component_config.configuration.modbus_id
        try:
            PowerLimitMode = data.data.bat_all_data.data.config.power_limit_mode
        except AttributeError:
            log.warning("PowerLimitMode not found, assuming 'no_limit'")
            PowerLimitMode = 'no_limit'

        if PowerLimitMode == 'no_limit':
            """
            Keine Speichersteuerung, andere Steuerungen zulassen (SolarEdge One, ioBroker, Node-Red etc.).
            Falls andere Steuerungen vorhanden sind, sollten diese nicht beeinflusst werden,
            daher erfolgt im Modus "Immer" der Speichersteuerung keine Steuerung.
            """
            return

        if power_limit is None:
            # Keine Ladung mit Speichersteuerung.
            registers_to_read = ["StorageControlMode"]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            if values["StorageControlMode"] == REMOTE_CONTROL_MODE:
                # Steuerung deaktivieren.
                log.debug("Keine Speichersteuerung gefordert, Steuerung deaktivieren.")
                values_to_write = {
                    "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                    "StorageChargeDischargeDefaultMode": DEFAULT_COMMAND_MODE,
                    "RemoteControlCommandMode": DEFAULT_COMMAND_MODE,
                    "StorageControlMode": self.StorageControlMode_Read,
                }
                self._write_registers(values_to_write, unit)
            return

        elif power_limit >= 0:
            """
            Ladung mit Speichersteuerung.
            SolarEdge entlaedt den Speicher immer nur bis zur SoC-Reserve.
            Steuerung beenden, wenn der SoC vom Speicher die SoC-Reserve unterschreitet.
            """
            registers_to_read = [
                f"Battery{self.battery_index}StateOfEnergy",
                "StorageControlMode",
                "StorageBackupReserved",
                "RemoteControlCommandDischargeLimit",
            ]
            try:
                values = self._read_registers(registers_to_read, unit)
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read registers: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                return

            soc = values[f"Battery{self.battery_index}StateOfEnergy"]
            if soc == FLOAT32_UNSUPPORTED or not 0 <= soc <= 100:
                log.warning(f"Invalid SoC: {soc}, using 0")
                soc = 0
            soc = int(soc)  # SolarEdge protocol may require integer SoC for comparisons
            soc_reserve = max(int(self.soc_reserve_configured), int(values["StorageBackupReserved"]))
            discharge_limit = int(values["RemoteControlCommandDischargeLimit"])

            if values["StorageControlMode"] == REMOTE_CONTROL_MODE:  # Speichersteuerung aktiv.
                if soc_reserve >= soc:
                    # Speichersteuerung deaktivieren, wenn SoC-Reserve unterschritten.
                    log.debug("Speichersteuerung deaktivieren. SoC-Reserve unterschritten.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": MAX_DISCHARGE_LIMIT,
                        "StorageChargeDischargeDefaultMode": DEFAULT_COMMAND_MODE,
                        "RemoteControlCommandMode": DEFAULT_COMMAND_MODE,
                        "StorageControlMode": self.StorageControlMode_Read,
                    }
                    self._write_registers(values_to_write, unit)
                elif discharge_limit not in range(int(power_limit) - 10, int(power_limit) + 10):
                    # DischargeLimit nur bei Abweichung von mehr als 10W, um Konflikte bei 2 Speichern zu verhindern.
                    log.debug(f"Speichersteuerung aktiv, Discharge-Limit {int(power_limit)}W.")
                    values_to_write = {
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)
            else:  # Speichersteuerung ist inaktiv.
                if soc_reserve < soc:
                    # Speichersteuerung nur aktivieren, wenn SoC ueber SoC-Reserve.
                    log.debug(f"Speichersteuerung aktivieren. Discharge-Limit: {int(power_limit)} W.")
                    self.StorageControlMode_Read = values["StorageControlMode"]
                    values_to_write = {
                        "StorageControlMode": REMOTE_CONTROL_MODE,
                        "StorageChargeDischargeDefaultMode": ACTIVE_COMMAND_MODE,
                        "RemoteControlCommandMode": ACTIVE_COMMAND_MODE,
                        "RemoteControlCommandDischargeLimit": int(min(power_limit, MAX_DISCHARGE_LIMIT))
                    }
                    self._write_registers(values_to_write, unit)

    def _read_registers(self, register_names: list, unit: int) -> Dict[str, Union[int, float]]:
        values = {}
        for key in register_names:
            address, data_type = self.REGISTERS[key]
            try:
                values[key] = self.__tcp_client.read_holding_registers(
                    address, data_type, wordorder=Endian.Little, unit=unit
                )
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to read register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus read error: {e}")
                values[key] = 0  # Fallback value
        log.debug(f"Bat raw values {self.__tcp_client.address}: {values}")
        return values
        # TODO: Optimize to read multiple contiguous registers in a single request if supported by ModbusTcpClient_

    def _write_registers(self, values_to_write: Dict[str, Union[int, float]], unit: int) -> None:
        for key, value in values_to_write.items():
            address, data_type = self.REGISTERS[key]
            encoded_value = self._encode_value(value, data_type)
            try:
                self.__tcp_client.write_registers(address, encoded_value, unit=unit)
                log.debug(f"Neuer Wert {encoded_value} in Register {address} geschrieben.")
            except pymodbus.exceptions.ModbusException as e:
                log.error(f"Failed to write register {key} at address {address}: {e}")
                self.fault_state.add_fault(f"Modbus write error: {e}")

    def _encode_value(self, value: Union[int, float], data_type: ModbusDataType) -> list:
        builder = pymodbus.payload.BinaryPayloadBuilder(
            byteorder=pymodbus.constants.Endian.Big,
            wordorder=pymodbus.constants.Endian.Little
        )
        encode_methods = {
            ModbusDataType.UINT_32: builder.add_32bit_uint,
            ModbusDataType.INT_32: builder.add_32bit_int,
            ModbusDataType.UINT_16: builder.add_16bit_uint,
            ModbusDataType.INT_16: builder.add_16bit_int,
            ModbusDataType.FLOAT_32: builder.add_32bit_float,
        }
        if data_type in encode_methods:
            if data_type == ModbusDataType.FLOAT_32:
                encode_methods[data_type](float(value))
            else:
                encode_methods[data_type](int(value))
        else:
            raise ValueError(f"Unsupported data type: {data_type}")
        return builder.to_registers()

    def power_limit_controllable(self) -> bool:
        return True


component_descriptor = ComponentDescriptor(configuration_factory=SolaredgeBatSetup)
openWB Series 2 Standard+, SW-Version 2
SolarEdge SE10K-RWS, BYD LVS 8, 16,8 kWp.
CUPRA Born
Antworten