Guten Abend
Ich besitze einen Fisker und habe mich mal etwas beschäftigt mit der Entwicklungsumgebung von openWB 2. Die läuft nun soweit auf einer VM/Debian 11. Habe mich auch eingelesen in die SoC Module der 2er Version (Templates und primär Polestar, BMW). Muss dazu sagen dass ich in python und Github keine Erfahrungen habe. Auch mit der Tokenauthentifizierung kenne ich mich nicht gut aus.
Leider bekomme ich den Fehler "ModuleNotFoundError" beim ausführen von "sudo phyton3 soc.py" aber auch von api.py
Habe die Pfade mehrmals kontrolliert, die scheinen zu passen (wenn ich nicht komplett auf dem Holzweg bin).
Unten das was ich versucht habe anzupassen und per FTP auf die Entwicklungsumgebung draufgespielt habe. Eine leere __init__.py habe ich auch im Verzeichnis /fisker/ erstellt.
Hier die Doku zu der API-Schnittstelle:
https://www.fiskerapi.io/fisker-api
config.py
Code: Alles auswählen
from typing import Optional
class FiskerOceanConfiguration:
def __init__(self, user_id: Optional[str] = None, password: Optional[str] = None, vin: Optional[str] = None):
self.user_id = user_id
self.password = password
self.vin = vin
class FiskerOcean:
def __init__(self,
name: str = "FiskerOcean",
type: str = "fisker",
configuration: FiskerOceanConfiguration = None) -> None:
self.name = name
self.type = type
self.configuration = configuration or FiskerOceanConfiguration()
soc.py
Code: Alles auswählen
from typing import List
import logging
from helpermodules.cli import run_using_positional_cli_args
from modules.common import store
from modules.common.abstract_device import DeviceDescriptor
from modules.common.abstract_vehicle import VehicleUpdateData
from modules.common.component_state import CarState
from modules.common.configurable_vehicle import ConfigurableVehicle
from modules.vehicles.fisker import api
from modules.vehicles.fisker.config import FiskerOcean, FiskerOceanConfiguration
log = logging.getLogger(__name__)
def create_vehicle(vehicle_config: FiskerOcean, vehicle: int):
def updater(vehicle_update_data: VehicleUpdateData) -> CarState:
return api.fetch_soc(
vehicle_config.configuration.user_id,
vehicle_config.configuration.password,
vehicle_config.configuration.vin,
vehicle)
return ConfigurableVehicle(vehicle_config=vehicle_config, component_updater=updater, vehicle=vehicle)
def FiskerOcean_update(user_id: str, password: str, vin: str, charge_point: int):
log.debug("FiskerOcean: user_id="+user_id+"vin="+vin+"charge_point="+str(charge_point))
vehicle_config = FiskerOcean(configuration=FiskerOceanConfiguration(user_id, password, vin))
store.get_car_value_store(charge_point).store.set(api.fetch_soc(
vehicle_config.configuration.user_id,
vehicle_config.configuration.password,
vehicle_config.configuration.vin,
charge_point))
def main(argv: List[str]):
run_using_positional_cli_args(FiskerOcean_update, argv)
device_descriptor = DeviceDescriptor(configuration_factory=FiskerOcean)
api.py
Code: Alles auswählen
import logging
from modules.common import req
import time
from modules.vehicles.fisker.auth import FiskerAuth
from modules.common.component_state import CarState
log = logging.getLogger(__name__)
class FiskerApi:
def __init__(self, username: str, password: str, vin: str) -> None:
self.auth = FiskerAuth(username, password, vin)
self.vin = vin
self.client_session = req.get_http_session()
def query_params(self, params: dict, url='https://auth.fiskerdps.com/auth/login') -> dict or None:
access_token = self.auth.get_auth_token()
if access_token is None:
raise Exception("query_params error:could not get auth token")
headers = {
"Content-Type": "application/json",
"authorization": f"Bearer {self.auth.access_token}"
}
log.info("query_params:%s", params['query'])
try:
result = self.client_session.get(url=url, params=params, headers=headers)
except Exception as e:
if result.status_code == 401:
self.auth.delete_token()
if self.auth.access_token is not None:
# if we got an access code but the query failed, VIN could be wrong, so let`s check it
self.check_vin()
raise e
result_data = result.json()
if result_data.get('errors'):
error_message = result_data['errors'][0]['message']
raise Exception("query_params error: %s", error_message)
return result_data
def get_battery_data(self) -> dict or None:
params = {
"query": "query GetBatteryData($vin: String!) { getBatteryData(vin: $vin) { "
+ "batteryChargeLevelPercentage estimatedDistanceToEmptyKm } }",
"operationName": "GetBatteryData",
"variables": "{\"vin\":\"" + self.vin + "\"}"
}
result = self.query_params(params)
return result['data']['getBatteryData']
def check_vin(self) -> None:
# get Vehicle Data
params = {
"query": "query GetConsumerCarsV2 { getConsumerCarsV2 { vin internalVehicleIdentifier __typename }}",
"operationName": "GetConsumerCarsV2",
"variables": "{}"
}
result = self.query_params(params, url='https://api.fiskerdps.com/api/v2/users/me')
if result is not None and result['data'] is not None:
vins = []
# get list of cars and store the ones not matching our vin
cars = result['data']['getConsumerCarsV2']
if len(cars) == 0:
raise Exception("Es konnten keine Fahrzeuge im Account gefunden werden. Bitte in den Einstellungen " +
"prüfen, ob der Besitzer-Account des Fisker Ocean eingetragen ist.")
for i in range(0, len(cars)):
if cars[i]['vin'] == self.vin:
pass
else:
vins.append(cars[i]['vin'])
if len(vins) > 0:
raise Exception("You probably specified a wrong VIN. We only found:%s", ",".join(vins))
def fetch_soc(user_id: str, password: str, vin: str, vehicle: int) -> CarState:
api = FiskerApi(user_id, password, vin)
bat_data = api.get_battery_data()
soc = bat_data['batteryChargeLevelPercentage']
est_range = bat_data['estimatedDistanceToEmptyKm']
return CarState(soc, est_range, time.strftime("%m/%d/%Y, %H:%M:%S"))
auth.py (nur tlw. angepasst)
Code: Alles auswählen
import logging
import json
import requests
import os
import re
from datetime import datetime, timedelta
from modules.common.store import RAMDISK_PATH
log = logging.getLogger(__name__)
class FiskerAuth:
""" base class for Fisker authentication"""
def __init__(self, username: str, password: str, vin: str) -> None:
self.username = username
self.password = password
self.vin = vin
self.access_token = None
self.refresh_token = None
self.token_expiry = None
self.client_session = requests.session()
self.resume_path = None
self.code = None
self.token = None
self.token_file = str(RAMDISK_PATH)+'/fiskerocean_token_'+vin+'.json'
self.token_store: dict = {}
def delete_token(self) -> None:
self.access_token = None
self.refresh_token = None
self.token_expiry = None
# remove from ramdisk
if os.path.exists(self.token_file):
try:
os.remove(self.token_file)
except IOError:
log.error("delete_token:error deleting token store %s", self.token_file)
def _load_token_from_ramdisk(self) -> None:
if os.path.exists(self.token_file):
log.info("_load_token_from_ramdisk:loading from file %s", self.token_file)
self.token_store = {}
with open(self.token_file, "r") as tf:
try:
self.token_store = json.load(tf)
self.access_token = self.token_store['access_token']
self.refresh_token = self.token_store['refresh_token']
self.token_expiry = datetime.strptime(self.token_store['token_expiry'], "%d.%m.%Y %H:%M:%S")
except json.JSONDecodeError as e:
log.error("_load_token_from_ramdisk:error loading token store %s:%s",
self.token_file, e)
if 'access_token' not in self.token_store:
self.access_token = None
if 'refresh_token' not in self.token_store:
self.refresh_token = None
if 'token_expiry' not in self.token_store:
self.token_expiry = None
def _save_token_to_ramdisk(self) -> None:
try:
tf = open(self.token_file, mode='w', encoding='utf-8')
except IOError as e:
log.error("_save_token_to_ramdisk:error saving token store %s:%s", self.token_file, e)
return
try:
json.dump(self.token_store, tf, ensure_ascii=False, indent=4)
except json.JSONDecodeError as e:
log.error("_save_token_to_ramdisk:error saving token store %s:%s", self.token_file, e)
# auth step 3: get token
def get_auth_token(self) -> str or None:
# first try to load token from ramdisk
self._load_token_from_ramdisk()
if self.token_expiry is not None and self.token_expiry > datetime.now():
log.info("get_auth_token:using token from file %s", self.token_file)
return self.access_token
else:
log.info("get_auth_token:token from file %s expired. New authentication required", self.token_file)
code = self._get_auth_code()
if code is None:
return None
# get token
params = {
"query": "query getAuthToken($code: String) { getAuthToken(code: $code) { id_token access_token \
refresh_token expires_in }}",
"operationName": "getAuthToken",
"variables": json.dumps({"code": code})
}
headers = {
"Content-Type": "application/json"
}
log.info("get_auth_token:attempting to get new token")
try:
result = self.client_session.get("https://auth.fiskerdps.com/auth/login",
params=params, headers=headers)
except requests.RequestException as e:
log.error("get_auth_token:http error:%s", e)
return None
if result.status_code != 200:
log.error("get_auth_token:error:get response:%d", result.status_code)
return None
result_data = result.json()
log.info(result_data)
if result_data['data']['getAuthToken'] is not None:
self.access_token = result_data['data']['getAuthToken']['access_token']
self.refresh_token = result_data['data']['getAuthToken']['refresh_token']
self.token_expiry = datetime.now(
) + timedelta(seconds=result_data['data']['getAuthToken']['expires_in'])
# save tokens to ramdisk
self.token_store['access_token'] = self.access_token
self.token_store['refresh_token'] = self.refresh_token
self.token_store['token_expiry'] = self.token_expiry.strftime("%d.%m.%Y %H:%M:%S")
self._save_token_to_ramdisk()
else:
log.error("get_auth_token:error getting token:no valid data in http response")
return None
log.info("get_auth_token:got token:%s", self.access_token)
return self.access_token
# auth step 2: get code
def _get_auth_code(self) -> str or None:
self.resume_path = self._get_auth_resumePath()
if self.resume_path is None:
return None
params = {
'client_id': 'polmystar'
}
data = {
'pf.username': self.username,
'pf.pass': self.password
}
log.info("_get_auth_code:attempting to get new code")
try:
result = self.client_session.post(
f"https://polestarid.eu.polestar.com/as/{self.resume_path}/resume/as/authorization.ping",
params=params,
data=data
)
except requests.RequestException as e:
log.error("_get_auth_code:http error:%s", e)
return None
if result.status_code != 200:
log.error("_get_auth_code:error getting auth code: post response:%d", result.status_code)
return None
if re.search(r"ERR", result.request.path_url, flags=re.IGNORECASE) is not None:
log.error("_get_auth_code:error:check username/password")
return None
# get code
m = re.search(r"code=(.+)", result.request.path_url)
if m is not None:
code = m.group(1)
log.info("_get_auth_code:got code %s", code)
else:
code = None
log.info("_get_auth_code:error getting auth code")
return code
# auth step 1: get resumePath
def _get_auth_resumePath(self) -> str or None:
# Get Resume Path
params = {
"response_type": "code",
"client_id": "polmystar",
"redirect_uri": "https://www.polestar.com/sign-in-callback"
}
log.info("_get_auth_resumePath:attempting to get resumePath")
try:
result = self.client_session.get("https://polestarid.eu.polestar.com/as/authorization.oauth2",
params=params)
except requests.RequestException as e:
log.error("_get_auth_resumePath:http error:%s", e)
return None
if result.status_code != 200:
log.error("_get_auth_resumePath:get response:%d", result.status_code)
return None
m = re.search(r"resumePath=([^&]+)", result.url)
if m is not None:
resume_path = m.group(1)
log.info("_get_auth_resumePath:got resumePath %s", resume_path)
else:
resume_path = None
log.info("_get_auth_resumePath:error getting resumePath")
return resume_path
Irgendwie stehe ich an und komm nicht weiter. Kann mir jemand hier weiterhelfen?