From eeeb7ccd5b375444d4c2c515991309b6b456ce87 Mon Sep 17 00:00:00 2001 From: oldgitdaddy Date: Sat, 30 May 2026 23:19:02 +0200 Subject: [PATCH 1/4] Migrate WeConnect auth from deprecated BFF endpoints to direct OIDC VW shut down the legacy BFF endpoints (emea.bff.cariad.digital/user-login/*) at the Azure WAF layer, returning 403 for all clients. This broke WeConnect authentication. Replace the deprecated BFF-based auth flow with direct OIDC endpoints, following the same pattern already used by MyCupraSession: - Authorization: direct OIDC identity.vwgroup.io/oidc/v1/authorize (removed authorizationUrl() override to use parent class implementation) - Token exchange: identity.vwgroup.io/oidc/v1/token with grant_type=authorization_code (standard OIDC) - Token refresh: identity.vwgroup.io/oidc/v1/token (standard OIDC) - Updated User-Agent to Volkswagen/3.61.0-android/14 to match latest APK Fixes #155 in CarConnectivity --- weconnect/auth/vw_web_session.py | 2 +- weconnect/auth/we_connect_session.py | 84 ++++++++++------------------ 2 files changed, 30 insertions(+), 56 deletions(-) diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py index 05c6c0d..748c428 100644 --- a/weconnect/auth/vw_web_session.py +++ b/weconnect/auth/vw_web_session.py @@ -34,7 +34,7 @@ def __init__(self, sessionuser, acceptTermsOnLogin=False, **kwargs): self.websession.proxies.update(self.proxies) self.websession.mount('https://', HTTPAdapter(max_retries=retries)) self.websession.headers = CaseInsensitiveDict({ - 'user-agent': 'Volkswagen/3.51.1-android/14', + 'user-agent': 'Volkswagen/3.61.0-android/14', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,' 'application/signed-exchange;v=b3', 'accept-language': 'en-US,en;q=0.9', diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py index 302553f..3826299 100644 --- a/weconnect/auth/we_connect_session.py +++ b/weconnect/auth/we_connect_session.py @@ -2,9 +2,7 @@ import logging import requests -from urllib.parse import parse_qsl, urlparse - -from oauthlib.common import add_params_to_uri, generate_nonce, to_unicode +from oauthlib.common import to_unicode from oauthlib.oauth2 import InsecureTransportError from oauthlib.oauth2 import is_secure_transport @@ -33,7 +31,7 @@ def __init__(self, sessionuser, **kwargs): 'content-type': 'application/json', 'content-version': '1', 'x-newrelic-id': 'VgAEWV9QDRAEXFlRAAYPUA==', - 'user-agent': 'Volkswagen/3.51.1-android/14', + 'user-agent': 'Volkswagen/3.61.0-android/14', 'accept-language': 'de-de', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', @@ -66,35 +64,17 @@ def request( def login(self): super(WeConnectSession, self).login() - authorizationUrl = self.authorizationUrl(url='https://identity.vwgroup.io/oidc/v1/authorize') - response = self.doWebAuth(authorizationUrl) - self.fetchTokens('https://emea.bff.cariad.digital/user-login/login/v1', + auth_url = self.authorizationUrl(url='https://identity.vwgroup.io/oidc/v1/authorize') + response = self.doWebAuth(auth_url) + self.fetchTokens('https://identity.vwgroup.io/oidc/v1/token', authorization_response=response ) def refresh(self): self.refreshTokens( - 'https://emea.bff.cariad.digital/login/v1/idk/token', + 'https://identity.vwgroup.io/oidc/v1/token', ) - def authorizationUrl(self, url, state=None, **kwargs): - if state is not None: - raise AuthentificationError('Do not provide state') - - params = [(('redirect_uri', self.redirect_uri)), - (('nonce', generate_nonce()))] - - authUrl = add_params_to_uri('https://emea.bff.cariad.digital/user-login/v1/authorize', params) - - tryLoginResponse: requests.Response = self.get(authUrl, allow_redirects=False, access_type=AccessType.NONE) - redirect = tryLoginResponse.headers['Location'] - query = urlparse(redirect).query - params = dict(parse_qsl(query)) - if 'state' in params: - self.state = params.get('state') - - return redirect - def clearTokens(self) -> None: """ Clear all stored tokens to force a fresh login. @@ -115,20 +95,19 @@ def fetchTokens( self.parseFromFragment(authorization_response) if all(key in self.token for key in ('state', 'id_token', 'access_token', 'code')): - body: str = json.dumps( - { - 'state': self.token['state'], - 'id_token': self.token['id_token'], - 'redirect_uri': self.redirect_uri, - 'region': 'emea', - 'access_token': self.token['access_token'], - 'authorizationCode': self.token['code'], - }) - - loginHeadersForm: CaseInsensitiveDict = self.headers - loginHeadersForm['accept'] = 'application/json' - - tokenResponse = self.post(token_url, headers=loginHeadersForm, data=body, allow_redirects=False, access_type=AccessType.ID) + body = { + 'code': self.token['code'], + 'redirect_uri': self.redirect_uri, + 'client_id': self.client_id, + 'grant_type': 'authorization_code', + 'state': self.token['state'], + 'id_token': self.token['id_token'] + } + + loginHeadersForm = self.headers.copy() + loginHeadersForm['content-type'] = 'application/x-www-form-urlencoded; charset=utf-8' + + tokenResponse = self.post(token_url, headers=loginHeadersForm, data=body, allow_redirects=False, access_type=AccessType.NONE) if tokenResponse.status_code != requests.codes['ok']: raise TemporaryAuthentificationError(f'Token could not be fetched due to temporary WeConnect failure: {tokenResponse.status_code}') token = self.parseFromBody(tokenResponse.text) @@ -189,7 +168,7 @@ def refreshTokens( if headers is None: headers = self.headers - # First try to get from the current token property, then fall back to stored token + # Try to get from the current token property, then fall back to stored token if refresh_token is None: refresh_token = self.refreshToken # If still None, try to get from the token dict directly @@ -199,30 +178,25 @@ def refreshTokens( if not refresh_token: raise AuthentificationError('No refresh token available. Please log in again.') - # Create headers matching the examples format - tHeaders = { - "Accept-Encoding": "gzip, deflate, br", - "Connection": "keep-alive", - "Content-Type": "application/x-www-form-urlencoded", - "User-Agent": "Volkswagen/3.51.1-android/14", - "x-android-package-name": "com.volkswagen.weconnect", - } - - # Create form data body matching the examples format + # Create body matching standard OIDC refresh token grant body = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": self.client_id, + 'grant_type': 'refresh_token', + 'refresh_token': refresh_token, + 'client_id': self.client_id, } + headers = headers.copy() + headers['content-type'] = 'application/x-www-form-urlencoded; charset=utf-8' + # Request new tokens using POST with form data tokenResponse = self.post( token_url, data=body, - headers=tHeaders, + headers=headers, timeout=timeout, verify=verify, proxies=proxies, + access_type=AccessType.NONE ) if tokenResponse.status_code == requests.codes['unauthorized']: LOG.error('Token refresh failed with 401 - server requests new authorization. Refresh token may be expired or invalid.') From 0b3228b913d61e361cb5488dff28a0fffde7e149 Mon Sep 17 00:00:00 2001 From: oldgitdaddy Date: Sun, 31 May 2026 09:59:21 +0200 Subject: [PATCH 2/4] Switch from direct OIDC code exchange to hybrid flow MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The direct POST to identity.vwgroup.io/oidc/v1/token with grant_type=authorization_code returns 401 access_denied because Auth0 binds the authorization code to the CARIAD BFF as the authorized exchanger. Switch to the OIDC hybrid flow (response_type=code id_token token) where access_token and id_token are delivered directly in the callback URL — no server-side token exchange is needed. The parent class OpenIDSession.authorizationUrl() already uses this response type so the authorization URL was already correct. Key changes: - fetchTokens(): extract tokens from callback directly, skip the broken token exchange POST to /oidc/v1/token - refresh(): trigger full re-login since hybrid flow issues no refresh_token (~2h access token lifetime) - refreshTokens(): simplified to delegate to login() for graceful fallback - _handle_new_auth_flow(): add action=default to login form POST (required by Auth0 Universal Login, per PR #333 finding) - Scope: add offline_access - Remove unused imports (requests, InsecureTransportError, etc.) Aligned with robinostlund/volkswagencarnet#333. Co-Authored-By: Claude Opus 4.8 --- weconnect/auth/vw_web_session.py | 6 +- weconnect/auth/we_connect_session.py | 147 +++++++++------------------ 2 files changed, 54 insertions(+), 99 deletions(-) diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py index 748c428..249745e 100644 --- a/weconnect/auth/vw_web_session.py +++ b/weconnect/auth/vw_web_session.py @@ -322,10 +322,14 @@ def _handle_new_auth_flow(self, url: str) -> str: raise APICompatibilityError('Could not find state token in authorization page') # Create login form data + # Note: 'action=default' is required by Auth0 Universal Login to + # distinguish credential submission from other form actions. + # Without it, the login POST can be silently ignored. login_form = { 'username': self.sessionuser.username, 'password': self.sessionuser.password, - 'state': state + 'state': state, + 'action': 'default' } # Post to login URL diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py index 3826299..7791a3d 100644 --- a/weconnect/auth/we_connect_session.py +++ b/weconnect/auth/we_connect_session.py @@ -1,16 +1,13 @@ import json import logging -import requests from oauthlib.common import to_unicode -from oauthlib.oauth2 import InsecureTransportError -from oauthlib.oauth2 import is_secure_transport from requests.models import CaseInsensitiveDict from weconnect.auth.openid_session import AccessType from weconnect.auth.vw_web_session import VWWebSession -from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError +from weconnect.errors import TemporaryAuthentificationError LOG = logging.getLogger("weconnect") @@ -20,7 +17,7 @@ class WeConnectSession(VWWebSession): def __init__(self, sessionuser, **kwargs): super(WeConnectSession, self).__init__(client_id='a24fba63-34b3-4d43-b181-942111e6bda8@apps_vw-dilab_com', refresh_url='https://identity.vwgroup.io/oidc/v1/token', - scope='openid profile badge cars dealers vin', + scope='openid profile badge cars dealers vin offline_access', redirect_uri='weconnect://authenticated', state=None, sessionuser=sessionuser, @@ -71,9 +68,15 @@ def login(self): ) def refresh(self): - self.refreshTokens( - 'https://identity.vwgroup.io/oidc/v1/token', - ) + """Perform full re-login since OIDC hybrid flow does not issue refresh tokens. + + The hybrid flow (response_type=code id_token token) delivers tokens + directly in the callback URL with no refresh_token for security + reasons. When the access_token expires, we must do a complete + re-authentication. + """ + LOG.info('No refresh token available (OIDC hybrid flow). Performing full re-login.') + self.login() def clearTokens(self) -> None: """ @@ -92,41 +95,37 @@ def fetchTokens( authorization_response=None, **kwargs ): + """Extract tokens from OIDC hybrid flow callback URL. + + With hybrid flow (response_type=code id_token token), the OAuth + callback URL already contains access_token and id_token directly. + No server-side token exchange is needed because Auth0 binds the + authorization code to the CARIAD BFF as the authorized exchanger — + a direct POST to identity.vwgroup.io/oidc/v1/token would return + 401 access_denied. + + This follows the same approach as robinostlund/volkswagencarnet#333. + """ self.parseFromFragment(authorization_response) - if all(key in self.token for key in ('state', 'id_token', 'access_token', 'code')): - body = { - 'code': self.token['code'], - 'redirect_uri': self.redirect_uri, - 'client_id': self.client_id, - 'grant_type': 'authorization_code', - 'state': self.token['state'], - 'id_token': self.token['id_token'] - } - - loginHeadersForm = self.headers.copy() - loginHeadersForm['content-type'] = 'application/x-www-form-urlencoded; charset=utf-8' - - tokenResponse = self.post(token_url, headers=loginHeadersForm, data=body, allow_redirects=False, access_type=AccessType.NONE) - if tokenResponse.status_code != requests.codes['ok']: - raise TemporaryAuthentificationError(f'Token could not be fetched due to temporary WeConnect failure: {tokenResponse.status_code}') - token = self.parseFromBody(tokenResponse.text) - - # Ensure the token is properly stored in the session - if token is not None: - self.token = token # Explicitly store the token - LOG.debug(f"Successfully fetched tokens. Access token expires in: {token.get('expires_in', 'unknown')} seconds") - LOG.debug(f"Refresh token available: {'refresh_token' in token}") - # Verify critical tokens are present - if not all(key in token for key in ('access_token', 'id_token', 'refresh_token')): - LOG.warning("Some expected tokens are missing from the response") - else: - LOG.error("Token parsing returned None") - - return token - else: - LOG.error("Authorization response missing required tokens") - return None + if self.token is None: + raise TemporaryAuthentificationError('Failed to parse tokens from authorization response') + + if 'access_token' not in self.token: + raise TemporaryAuthentificationError( + 'No access token found in authorization response. ' + 'The OIDC hybrid flow callback did not return an access token.' + ) + + LOG.info('Successfully obtained tokens from OIDC hybrid flow callback') + LOG.debug('Access token expires in: %s seconds', self.token.get('expires_in', 'unknown')) + + # OIDC hybrid flow does not return refresh_token for security + # reasons. Re-login will be required when the access_token expires. + if 'refresh_token' not in self.token: + LOG.debug('No refresh token in response (expected with hybrid flow)') + + return self.token def parseFromBody(self, token_response, state=None): try: @@ -156,60 +155,12 @@ def refreshTokens( proxies=None, **kwargs ): - LOG.info('Refreshing tokens') - if not token_url: - raise ValueError("No token endpoint set for auto_refresh.") - - if not is_secure_transport(token_url): - raise InsecureTransportError() - - refresh_token = refresh_token or self.refreshToken - - if headers is None: - headers = self.headers - - # Try to get from the current token property, then fall back to stored token - if refresh_token is None: - refresh_token = self.refreshToken - # If still None, try to get from the token dict directly - if refresh_token is None and self.token is not None: - refresh_token = self.token.get('refresh_token') - - if not refresh_token: - raise AuthentificationError('No refresh token available. Please log in again.') - - # Create body matching standard OIDC refresh token grant - body = { - 'grant_type': 'refresh_token', - 'refresh_token': refresh_token, - 'client_id': self.client_id, - } - - headers = headers.copy() - headers['content-type'] = 'application/x-www-form-urlencoded; charset=utf-8' - - # Request new tokens using POST with form data - tokenResponse = self.post( - token_url, - data=body, - headers=headers, - timeout=timeout, - verify=verify, - proxies=proxies, - access_type=AccessType.NONE - ) - if tokenResponse.status_code == requests.codes['unauthorized']: - LOG.error('Token refresh failed with 401 - server requests new authorization. Refresh token may be expired or invalid.') - raise AuthentificationError('Refreshing tokens failed: Server requests new authorization. Please log in again.') - elif tokenResponse.status_code in (requests.codes['internal_server_error'], requests.codes['service_unavailable'], requests.codes['gateway_timeout']): - raise TemporaryAuthentificationError(f'Token could not be refreshed due to temporary WeConnect failure: {tokenResponse.status_code}') - elif tokenResponse.status_code == requests.codes['ok']: - newToken = self.parseFromBody(tokenResponse.text) - if newToken is not None and "refresh_token" not in newToken: - LOG.debug("No new refresh token given. Re-using old.") - self.token["refresh_token"] = refresh_token - # Update the token property as well - self.token = newToken - return newToken - else: - raise RetrievalError(f'Status Code from WeConnect while refreshing tokens was: {tokenResponse.status_code}') + """Token refresh is not available with OIDC hybrid flow. + + The hybrid flow does not return a refresh_token — Auth0 issues no + refresh tokens for security reasons with response_type=code id_token + token. When tokens expire, a full re-login is required. + """ + LOG.info('Token refresh not available (OIDC hybrid flow). Performing full re-login.') + self.login() + return self.token From 0fa29711a9ea2337ba1e08b651db076e1c5cc9eb Mon Sep 17 00:00:00 2001 From: oldgitdaddy Date: Sun, 31 May 2026 11:38:20 +0200 Subject: [PATCH 3/4] Improve refresh() to try silent re-auth before full re-login MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OIDC hybrid flow has no refresh_token, but the websession preserves Auth0 SSO cookies from the initial login. refresh() now: 1. First tries prompt=none — if the Auth0 session cookie is still valid, tokens are returned silently with no login form. 2. Falls back to full credential-based login if the session expired. This makes token refresh transparent when the Auth0 session outlasts the 2h access_token (which is common). Co-Authored-By: Claude Opus 4.8 --- test_login.py | 252 +++++++++++++++++++++++++++ weconnect/auth/we_connect_session.py | 34 +++- 2 files changed, 279 insertions(+), 7 deletions(-) create mode 100644 test_login.py diff --git a/test_login.py b/test_login.py new file mode 100644 index 0000000..4dea757 --- /dev/null +++ b/test_login.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python3 +""" +Test script for the OIDC hybrid flow login fix (PR #237). + +Usage: + python test_login.py + VW_EMAIL=... VW_PASSWORD=... python test_login.py + +This script tests the fixed WeConnectSession.login() which uses the +OIDC hybrid flow (response_type=code id_token token) to get tokens +directly from the Auth0 callback URL without any BFF token exchange. +""" + +import logging +import os +import sys +import time + +# Set up detailed logging so we can see each step +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s [%(levelname)-5s] %(name)s: %(message)s', + datefmt='%H:%M:%S', +) +# Quiet down noisy loggers +logging.getLogger('urllib3').setLevel(logging.WARNING) + +from weconnect.auth.we_connect_session import WeConnectSession +from weconnect.auth.session_manager import SessionUser +from weconnect.errors import AuthentificationError, TemporaryAuthentificationError + + +def test_login(email: str, password: str) -> bool: + """Attempt a login and report results.""" + + print("=" * 60) + print("OIDC Hybrid Flow Login Test") + print("=" * 60) + print(f"Email: {email}") + print(f"Target: identity.vwgroup.io/oidc/v1/authorize") + print(f"Flow: Hybrid (response_type=code id_token token)") + print() + + # Step 1: Create session + print("[1/4] Creating WeConnectSession...") + user = SessionUser(username=email, password=password) + session = WeConnectSession(sessionuser=user) + print(f" Client ID: {session.client_id}") + print(f" Scope: {session.scope}") + print(f" Redirect: {session.redirect_uri}") + print() + + # Step 2: Login + print("[2/4] Logging in (OIDC hybrid flow)...") + start = time.time() + try: + session.login() + elapsed = time.time() - start + except AuthentificationError as e: + print(f"\n ❌ Authentication FAILED: {e}") + return False + except TemporaryAuthentificationError as e: + print(f"\n ⚠️ Temporary failure: {e}") + return False + except Exception as e: + print(f"\n ❌ Unexpected error: {type(e).__name__}: {e}") + return False + + print(f" Completed in {elapsed:.1f}s") + print() + + # Step 3: Check tokens + # Note: this codebase uses camelCase property names + print("[3/4] Checking tokens...") + tok = session.accessToken + if not tok: + print(" ❌ Not authorized — no access_token!") + return False + + print(f" ✅ access_token: {tok[:20]}...{tok[-10:]}") + print(f" token_type: {session.tokenType or 'N/A'}") + print(f" expires_in: {session.expiresIn}s") + + idt = session.idToken + if idt: + print(f" ✅ id_token: {idt[:20]}...{idt[-10:]}") + else: + print(f" ⚠️ No id_token in session") + + rt = session.refreshToken + if rt: + print(f" refresh_token: {rt[:20]}...{rt[-10:]}") + else: + print(f" ℹ️ No refresh_token (expected — hybrid flow limitation)") + print(f" Session lasts ~2h, then re-login needed.") + + # Decode JWT payload without verifying signature + try: + import jwt + payload = jwt.decode(tok, options={"verify_signature": False}) + exp = payload.get('exp', 'N/A') + if exp != 'N/A': + from datetime import datetime, timezone + exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc) + remaining = exp - time.time() + print(f" JWT exp: {exp_dt.isoformat()} ({remaining:.0f}s remaining)") + print(f" JWT iss: {payload.get('iss', 'N/A')}") + print(f" JWT sub: {payload.get('sub', 'N/A')[:40]}...") + print(f" JWT aud: {payload.get('aud', 'N/A')}") + except Exception: + pass + + print() + + # Step 4: Verify with a simple API call + print("[4/4] Testing API access...") + api_url = 'https://emea.bff.cariad.digital/vehicle/v1/vehicles' + try: + resp = session.get( + api_url, + headers={'accept': 'application/json'}, + timeout=30, + ) + if resp.status_code == 200: + data = resp.json() + count = len(data.get('data', [])) + print(f" ✅ API responded 200 — {count} vehicle(s) found") + for v in data.get('data', []): + print(f" • VIN: {v.get('vin', 'N/A')}") + elif resp.status_code == 401: + print(f" ❌ API returned 401 — tokens not accepted by VW API") + return False + else: + print(f" ⚠️ API returned {resp.status_code}: {resp.text[:200]}") + except Exception as e: + print(f" ⚠️ API call failed: {e}") + return True + + +def test_refresh(email: str, password: str) -> bool: + """Test that refresh() correctly triggers a full re-login.""" + + print("=" * 60) + print("OIDC Hybrid Flow — Token Refresh Test") + print("=" * 60) + print() + print("The hybrid flow issues NO refresh_token, so refresh()") + print("must perform a full re-login to get fresh tokens.") + print() + + # Step 1: Initial login + print("[1/3] Initial login...") + user = SessionUser(username=email, password=password) + session = WeConnectSession(sessionuser=user) + try: + session.login() + except Exception as e: + print(f" ❌ Login failed: {type(e).__name__}: {e}") + return False + + old_token = session.accessToken + old_expiry = session.expiresAt + print(f" ✅ Logged in — access_token: {old_token[:20]}...{old_token[-10:]}") + print(f" Expires at: {old_expiry}") + print() + + # Step 2: Call refresh (triggers re-login with hybrid flow) + print("[2/3] Calling session.refresh()...") + import time + start = time.time() + try: + session.refresh() + elapsed = time.time() - start + except Exception as e: + print(f" ❌ Refresh failed: {type(e).__name__}: {e}") + return False + + new_token = session.accessToken + new_expiry = session.expiresAt + print(f" ✅ Refresh complete in {elapsed:.1f}s") + print(f" New access_token: {new_token[:20]}...{new_token[-10:]}") + print(f" New expires at: {new_expiry}") + + if old_token == new_token: + print(f" ⚠️ Token unchanged — refresh may have returned same session") + else: + print(f" ✅ Token was replaced (fresh login)") + + if session.refreshToken: + print(f" ⚠️ Refresh token present (unexpected for hybrid flow)") + else: + print(f" ✅ No refresh token (expected — hybrid flow limitation)") + print() + + # Step 3: Verify new token works + print("[3/3] Verifying new token with API call...") + try: + resp = session.get( + 'https://emea.bff.cariad.digital/vehicle/v1/vehicles', + headers={'accept': 'application/json'}, + timeout=30, + ) + if resp.status_code == 200: + print(f" ✅ New token accepted by API (200)") + elif resp.status_code == 401: + print(f" ❌ New token rejected by API (401)") + return False + else: + print(f" ⚠️ API returned {resp.status_code}") + except Exception as e: + print(f" ⚠️ API call failed: {e}") + + print() + print("=" * 60) + print("✅ Refresh test passed! Re-login on refresh works.") + print("=" * 60) + return True + + print() + print("=" * 60) + print("✅ Login successful! Hybrid flow is working.") + print("=" * 60) + return True + + +if __name__ == '__main__': + if '--refresh' in sys.argv: + sys.argv.remove('--refresh') + test_mode = 'refresh' + else: + test_mode = 'login' + + # Get credentials from args or env + if len(sys.argv) == 3: + email = sys.argv[1] + password = sys.argv[2] + else: + email = os.environ.get('VW_EMAIL') + password = os.environ.get('VW_PASSWORD') + + if not email or not password: + print("Usage: python test_login.py [--refresh] ") + print(" or: VW_EMAIL=... VW_PASSWORD=... python test_login.py [--refresh]") + print() + print(" --refresh Test token refresh (re-login) after successful login") + sys.exit(1) + + if test_mode == 'refresh': + success = test_refresh(email, password) + else: + success = test_login(email, password) + sys.exit(0 if success else 1) diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py index 7791a3d..d7be22f 100644 --- a/weconnect/auth/we_connect_session.py +++ b/weconnect/auth/we_connect_session.py @@ -68,14 +68,34 @@ def login(self): ) def refresh(self): - """Perform full re-login since OIDC hybrid flow does not issue refresh tokens. - - The hybrid flow (response_type=code id_token token) delivers tokens - directly in the callback URL with no refresh_token for security - reasons. When the access_token expires, we must do a complete - re-authentication. + """Refresh tokens via silent re-auth or fall back to full re-login. + + The OIDC hybrid flow (response_type=code id_token token) does not + issue a refresh_token. To get fresh tokens we re-authenticate: + 1. First try prompt=none — if self.websession still has a valid + Auth0 SSO session cookie, Auth0 returns new tokens silently + without showing a login form. + 2. If that fails (session expired, or Auth0 rejects silent auth), + fall back to a full login with credentials. """ - LOG.info('No refresh token available (OIDC hybrid flow). Performing full re-login.') + LOG.info('Access token expired — attempting silent re-authentication via Auth0 session cookie.') + try: + # Try silent re-auth: add prompt=none so Auth0 uses the existing + # SSO session cookie instead of showing the login form. + auth_url = self.authorizationUrl( + url='https://identity.vwgroup.io/oidc/v1/authorize', + prompt='none', + ) + response = self.doWebAuth(auth_url) + self.fetchTokens('https://identity.vwgroup.io/oidc/v1/token', + authorization_response=response) + if self.accessToken: + LOG.info('Silent re-authentication succeeded (Auth0 session was still valid).') + return + except Exception as e: + LOG.debug('Silent re-auth failed (%s), falling back to full login.', e) + + LOG.info('Performing full re-login with credentials.') self.login() def clearTokens(self) -> None: From 105dc1739c6a1e7818a4f5997093f4a97a5cd70a Mon Sep 17 00:00:00 2001 From: oldgitdaddy Date: Sun, 31 May 2026 11:54:58 +0200 Subject: [PATCH 4/4] Update test_login.py to detect silent vs full re-auth in refresh test The --refresh test now: 1. Captures WeConnectSession log messages during refresh() 2. Simulates token expiry (expires_at=0) to force the refresh path 3. Reports whether silent re-auth (prompt=none via Auth0 cookie) was sufficient, or a full credential-based re-login was required 4. Shows the failure reason if silent auth fell back Co-Authored-By: Claude Opus 4.8 --- test_login.py | 88 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 60 insertions(+), 28 deletions(-) diff --git a/test_login.py b/test_login.py index 4dea757..63d3e17 100644 --- a/test_login.py +++ b/test_login.py @@ -137,19 +137,31 @@ def test_login(email: str, password: str) -> bool: return True +class RefreshLogCapture(logging.Handler): + """Capture log messages to detect which refresh path was used.""" + + def __init__(self): + super().__init__(level=logging.DEBUG) + self.messages: list[str] = [] + + def emit(self, record: logging.LogRecord) -> None: + self.messages.append(record.getMessage()) + + def test_refresh(email: str, password: str) -> bool: - """Test that refresh() correctly triggers a full re-login.""" + """Test that refresh() works, and report whether a full login was needed.""" print("=" * 60) print("OIDC Hybrid Flow — Token Refresh Test") print("=" * 60) print() - print("The hybrid flow issues NO refresh_token, so refresh()") - print("must perform a full re-login to get fresh tokens.") + print("The hybrid flow issues NO refresh_token, so refresh() first") + print("tries prompt=none (silent, using Auth0 SSO cookie), and only") + print("falls back to a full credential-based login if the cookie expired.") print() # Step 1: Initial login - print("[1/3] Initial login...") + print("[1/4] Initial login...") user = SessionUser(username=email, password=password) session = WeConnectSession(sessionuser=user) try: @@ -159,13 +171,22 @@ def test_refresh(email: str, password: str) -> bool: return False old_token = session.accessToken - old_expiry = session.expiresAt - print(f" ✅ Logged in — access_token: {old_token[:20]}...{old_token[-10:]}") - print(f" Expires at: {old_expiry}") + print(f" ✅ Logged in") + print(f" access_token: {old_token[:20]}...{old_token[-10:]}") + print() + + # Step 2: Expire the token (simulate time passing) + print("[2/4] Simulating token expiry...") + if session.token and 'expires_at' in session.token: + session.token['expires_at'] = 0 # force immediate expiry + print(f" ✅ Token marked as expired (expires_at = 0)") print() - # Step 2: Call refresh (triggers re-login with hybrid flow) - print("[2/3] Calling session.refresh()...") + # Step 3: Call refresh and capture how it happened + print("[3/4] Calling session.refresh()...") + log_capture = RefreshLogCapture() + logging.getLogger('weconnect').addHandler(log_capture) + import time start = time.time() try: @@ -174,26 +195,40 @@ def test_refresh(email: str, password: str) -> bool: except Exception as e: print(f" ❌ Refresh failed: {type(e).__name__}: {e}") return False + finally: + logging.getLogger('weconnect').removeHandler(log_capture) new_token = session.accessToken - new_expiry = session.expiresAt - print(f" ✅ Refresh complete in {elapsed:.1f}s") - print(f" New access_token: {new_token[:20]}...{new_token[-10:]}") - print(f" New expires at: {new_expiry}") + print(f" Completed in {elapsed:.1f}s") + print(f" New access_token: {new_token[:20]}...{new_token[-10:]}" if new_token else " No token!") - if old_token == new_token: - print(f" ⚠️ Token unchanged — refresh may have returned same session") + # Detect which path was used + silent_success = any('Silent re-authentication succeeded' in m for m in log_capture.messages) + silent_failed = any('Silent re-auth failed' in m for m in log_capture.messages) + full_login = any('Performing full re-login' in m for m in log_capture.messages) + + print() + if silent_success: + print(" 🔇 SILENT re-auth was sufficient!") + print(" Auth0 SSO session cookie was still valid — no login form needed.") + print(" Tokens refreshed transparently without credentials.") + elif silent_failed and full_login: + print(" 🔐 FULL re-login was required.") + print(" Auth0 SSO session cookie had expired — credentials were re-sent.") + print(f" Silent re-auth failure reason: {next((m for m in log_capture.messages if 'Silent re-auth failed' in m), 'unknown')}") else: - print(f" ✅ Token was replaced (fresh login)") + print(f" ⚠️ Could not determine refresh path from logs.") + if log_capture.messages: + print(f" Captured messages: {log_capture.messages}") - if session.refreshToken: - print(f" ⚠️ Refresh token present (unexpected for hybrid flow)") + if old_token == new_token: + print(f" ⚠️ Token unchanged — refresh may have returned same session") else: - print(f" ✅ No refresh token (expected — hybrid flow limitation)") + print(f" ✅ Token was replaced") print() - # Step 3: Verify new token works - print("[3/3] Verifying new token with API call...") + # Step 4: Verify new token works + print("[4/4] Verifying new token with API call...") try: resp = session.get( 'https://emea.bff.cariad.digital/vehicle/v1/vehicles', @@ -212,13 +247,10 @@ def test_refresh(email: str, password: str) -> bool: print() print("=" * 60) - print("✅ Refresh test passed! Re-login on refresh works.") - print("=" * 60) - return True - - print() - print("=" * 60) - print("✅ Login successful! Hybrid flow is working.") + if silent_success: + print("✅ Refresh test passed — silent re-auth works!") + else: + print("✅ Refresh test passed — full re-login fallback works!") print("=" * 60) return True