diff --git a/test_login.py b/test_login.py new file mode 100644 index 0000000..63d3e17 --- /dev/null +++ b/test_login.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python3 +""" +Test script for the OIDC hybrid flow login fix (PR #237). + +Usage: + python test_login.py + VW_EMAIL=... VW_PASSWORD=... python test_login.py + +This script tests the fixed WeConnectSession.login() which uses the +OIDC hybrid flow (response_type=code id_token token) to get tokens +directly from the Auth0 callback URL without any BFF token exchange. +""" + +import logging +import os +import sys +import time + +# Set up detailed logging so we can see each step +logging.basicConfig( + level=logging.DEBUG, + format='%(asctime)s [%(levelname)-5s] %(name)s: %(message)s', + datefmt='%H:%M:%S', +) +# Quiet down noisy loggers +logging.getLogger('urllib3').setLevel(logging.WARNING) + +from weconnect.auth.we_connect_session import WeConnectSession +from weconnect.auth.session_manager import SessionUser +from weconnect.errors import AuthentificationError, TemporaryAuthentificationError + + +def test_login(email: str, password: str) -> bool: + """Attempt a login and report results.""" + + print("=" * 60) + print("OIDC Hybrid Flow Login Test") + print("=" * 60) + print(f"Email: {email}") + print(f"Target: identity.vwgroup.io/oidc/v1/authorize") + print(f"Flow: Hybrid (response_type=code id_token token)") + print() + + # Step 1: Create session + print("[1/4] Creating WeConnectSession...") + user = SessionUser(username=email, password=password) + session = WeConnectSession(sessionuser=user) + print(f" Client ID: {session.client_id}") + print(f" Scope: {session.scope}") + print(f" Redirect: {session.redirect_uri}") + print() + + # Step 2: Login + print("[2/4] Logging in (OIDC hybrid flow)...") + start = time.time() + try: + session.login() + elapsed = time.time() - start + except AuthentificationError as e: + print(f"\n ❌ Authentication FAILED: {e}") + return False + except TemporaryAuthentificationError as e: + print(f"\n ⚠️ Temporary failure: {e}") + return False + except Exception as e: + print(f"\n ❌ Unexpected error: {type(e).__name__}: {e}") + return False + + print(f" Completed in {elapsed:.1f}s") + print() + + # Step 3: Check tokens + # Note: this codebase uses camelCase property names + print("[3/4] Checking tokens...") + tok = session.accessToken + if not tok: + print(" ❌ Not authorized — no access_token!") + return False + + print(f" ✅ access_token: {tok[:20]}...{tok[-10:]}") + print(f" token_type: {session.tokenType or 'N/A'}") + print(f" expires_in: {session.expiresIn}s") + + idt = session.idToken + if idt: + print(f" ✅ id_token: {idt[:20]}...{idt[-10:]}") + else: + print(f" ⚠️ No id_token in session") + + rt = session.refreshToken + if rt: + print(f" refresh_token: {rt[:20]}...{rt[-10:]}") + else: + print(f" ℹ️ No refresh_token (expected — hybrid flow limitation)") + print(f" Session lasts ~2h, then re-login needed.") + + # Decode JWT payload without verifying signature + try: + import jwt + payload = jwt.decode(tok, options={"verify_signature": False}) + exp = payload.get('exp', 'N/A') + if exp != 'N/A': + from datetime import datetime, timezone + exp_dt = datetime.fromtimestamp(exp, tz=timezone.utc) + remaining = exp - time.time() + print(f" JWT exp: {exp_dt.isoformat()} ({remaining:.0f}s remaining)") + print(f" JWT iss: {payload.get('iss', 'N/A')}") + print(f" JWT sub: {payload.get('sub', 'N/A')[:40]}...") + print(f" JWT aud: {payload.get('aud', 'N/A')}") + except Exception: + pass + + print() + + # Step 4: Verify with a simple API call + print("[4/4] Testing API access...") + api_url = 'https://emea.bff.cariad.digital/vehicle/v1/vehicles' + try: + resp = session.get( + api_url, + headers={'accept': 'application/json'}, + timeout=30, + ) + if resp.status_code == 200: + data = resp.json() + count = len(data.get('data', [])) + print(f" ✅ API responded 200 — {count} vehicle(s) found") + for v in data.get('data', []): + print(f" • VIN: {v.get('vin', 'N/A')}") + elif resp.status_code == 401: + print(f" ❌ API returned 401 — tokens not accepted by VW API") + return False + else: + print(f" ⚠️ API returned {resp.status_code}: {resp.text[:200]}") + except Exception as e: + print(f" ⚠️ API call failed: {e}") + return True + + +class RefreshLogCapture(logging.Handler): + """Capture log messages to detect which refresh path was used.""" + + def __init__(self): + super().__init__(level=logging.DEBUG) + self.messages: list[str] = [] + + def emit(self, record: logging.LogRecord) -> None: + self.messages.append(record.getMessage()) + + +def test_refresh(email: str, password: str) -> bool: + """Test that refresh() works, and report whether a full login was needed.""" + + print("=" * 60) + print("OIDC Hybrid Flow — Token Refresh Test") + print("=" * 60) + print() + print("The hybrid flow issues NO refresh_token, so refresh() first") + print("tries prompt=none (silent, using Auth0 SSO cookie), and only") + print("falls back to a full credential-based login if the cookie expired.") + print() + + # Step 1: Initial login + print("[1/4] Initial login...") + user = SessionUser(username=email, password=password) + session = WeConnectSession(sessionuser=user) + try: + session.login() + except Exception as e: + print(f" ❌ Login failed: {type(e).__name__}: {e}") + return False + + old_token = session.accessToken + print(f" ✅ Logged in") + print(f" access_token: {old_token[:20]}...{old_token[-10:]}") + print() + + # Step 2: Expire the token (simulate time passing) + print("[2/4] Simulating token expiry...") + if session.token and 'expires_at' in session.token: + session.token['expires_at'] = 0 # force immediate expiry + print(f" ✅ Token marked as expired (expires_at = 0)") + print() + + # Step 3: Call refresh and capture how it happened + print("[3/4] Calling session.refresh()...") + log_capture = RefreshLogCapture() + logging.getLogger('weconnect').addHandler(log_capture) + + import time + start = time.time() + try: + session.refresh() + elapsed = time.time() - start + except Exception as e: + print(f" ❌ Refresh failed: {type(e).__name__}: {e}") + return False + finally: + logging.getLogger('weconnect').removeHandler(log_capture) + + new_token = session.accessToken + print(f" Completed in {elapsed:.1f}s") + print(f" New access_token: {new_token[:20]}...{new_token[-10:]}" if new_token else " No token!") + + # Detect which path was used + silent_success = any('Silent re-authentication succeeded' in m for m in log_capture.messages) + silent_failed = any('Silent re-auth failed' in m for m in log_capture.messages) + full_login = any('Performing full re-login' in m for m in log_capture.messages) + + print() + if silent_success: + print(" 🔇 SILENT re-auth was sufficient!") + print(" Auth0 SSO session cookie was still valid — no login form needed.") + print(" Tokens refreshed transparently without credentials.") + elif silent_failed and full_login: + print(" 🔐 FULL re-login was required.") + print(" Auth0 SSO session cookie had expired — credentials were re-sent.") + print(f" Silent re-auth failure reason: {next((m for m in log_capture.messages if 'Silent re-auth failed' in m), 'unknown')}") + else: + print(f" ⚠️ Could not determine refresh path from logs.") + if log_capture.messages: + print(f" Captured messages: {log_capture.messages}") + + if old_token == new_token: + print(f" ⚠️ Token unchanged — refresh may have returned same session") + else: + print(f" ✅ Token was replaced") + print() + + # Step 4: Verify new token works + print("[4/4] Verifying new token with API call...") + try: + resp = session.get( + 'https://emea.bff.cariad.digital/vehicle/v1/vehicles', + headers={'accept': 'application/json'}, + timeout=30, + ) + if resp.status_code == 200: + print(f" ✅ New token accepted by API (200)") + elif resp.status_code == 401: + print(f" ❌ New token rejected by API (401)") + return False + else: + print(f" ⚠️ API returned {resp.status_code}") + except Exception as e: + print(f" ⚠️ API call failed: {e}") + + print() + print("=" * 60) + if silent_success: + print("✅ Refresh test passed — silent re-auth works!") + else: + print("✅ Refresh test passed — full re-login fallback works!") + print("=" * 60) + return True + + +if __name__ == '__main__': + if '--refresh' in sys.argv: + sys.argv.remove('--refresh') + test_mode = 'refresh' + else: + test_mode = 'login' + + # Get credentials from args or env + if len(sys.argv) == 3: + email = sys.argv[1] + password = sys.argv[2] + else: + email = os.environ.get('VW_EMAIL') + password = os.environ.get('VW_PASSWORD') + + if not email or not password: + print("Usage: python test_login.py [--refresh] ") + print(" or: VW_EMAIL=... VW_PASSWORD=... python test_login.py [--refresh]") + print() + print(" --refresh Test token refresh (re-login) after successful login") + sys.exit(1) + + if test_mode == 'refresh': + success = test_refresh(email, password) + else: + success = test_login(email, password) + sys.exit(0 if success else 1) diff --git a/weconnect/auth/vw_web_session.py b/weconnect/auth/vw_web_session.py index 05c6c0d..249745e 100644 --- a/weconnect/auth/vw_web_session.py +++ b/weconnect/auth/vw_web_session.py @@ -34,7 +34,7 @@ def __init__(self, sessionuser, acceptTermsOnLogin=False, **kwargs): self.websession.proxies.update(self.proxies) self.websession.mount('https://', HTTPAdapter(max_retries=retries)) self.websession.headers = CaseInsensitiveDict({ - 'user-agent': 'Volkswagen/3.51.1-android/14', + 'user-agent': 'Volkswagen/3.61.0-android/14', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,' 'application/signed-exchange;v=b3', 'accept-language': 'en-US,en;q=0.9', @@ -322,10 +322,14 @@ def _handle_new_auth_flow(self, url: str) -> str: raise APICompatibilityError('Could not find state token in authorization page') # Create login form data + # Note: 'action=default' is required by Auth0 Universal Login to + # distinguish credential submission from other form actions. + # Without it, the login POST can be silently ignored. login_form = { 'username': self.sessionuser.username, 'password': self.sessionuser.password, - 'state': state + 'state': state, + 'action': 'default' } # Post to login URL diff --git a/weconnect/auth/we_connect_session.py b/weconnect/auth/we_connect_session.py index 302553f..d7be22f 100644 --- a/weconnect/auth/we_connect_session.py +++ b/weconnect/auth/we_connect_session.py @@ -1,18 +1,13 @@ import json import logging -import requests -from urllib.parse import parse_qsl, urlparse - -from oauthlib.common import add_params_to_uri, generate_nonce, to_unicode -from oauthlib.oauth2 import InsecureTransportError -from oauthlib.oauth2 import is_secure_transport +from oauthlib.common import to_unicode from requests.models import CaseInsensitiveDict from weconnect.auth.openid_session import AccessType from weconnect.auth.vw_web_session import VWWebSession -from weconnect.errors import AuthentificationError, RetrievalError, TemporaryAuthentificationError +from weconnect.errors import TemporaryAuthentificationError LOG = logging.getLogger("weconnect") @@ -22,7 +17,7 @@ class WeConnectSession(VWWebSession): def __init__(self, sessionuser, **kwargs): super(WeConnectSession, self).__init__(client_id='a24fba63-34b3-4d43-b181-942111e6bda8@apps_vw-dilab_com', refresh_url='https://identity.vwgroup.io/oidc/v1/token', - scope='openid profile badge cars dealers vin', + scope='openid profile badge cars dealers vin offline_access', redirect_uri='weconnect://authenticated', state=None, sessionuser=sessionuser, @@ -33,7 +28,7 @@ def __init__(self, sessionuser, **kwargs): 'content-type': 'application/json', 'content-version': '1', 'x-newrelic-id': 'VgAEWV9QDRAEXFlRAAYPUA==', - 'user-agent': 'Volkswagen/3.51.1-android/14', + 'user-agent': 'Volkswagen/3.61.0-android/14', 'accept-language': 'de-de', 'Cache-Control': 'no-cache', 'Pragma': 'no-cache', @@ -66,34 +61,42 @@ def request( def login(self): super(WeConnectSession, self).login() - authorizationUrl = self.authorizationUrl(url='https://identity.vwgroup.io/oidc/v1/authorize') - response = self.doWebAuth(authorizationUrl) - self.fetchTokens('https://emea.bff.cariad.digital/user-login/login/v1', + auth_url = self.authorizationUrl(url='https://identity.vwgroup.io/oidc/v1/authorize') + response = self.doWebAuth(auth_url) + self.fetchTokens('https://identity.vwgroup.io/oidc/v1/token', authorization_response=response ) def refresh(self): - self.refreshTokens( - 'https://emea.bff.cariad.digital/login/v1/idk/token', - ) - - def authorizationUrl(self, url, state=None, **kwargs): - if state is not None: - raise AuthentificationError('Do not provide state') - - params = [(('redirect_uri', self.redirect_uri)), - (('nonce', generate_nonce()))] - - authUrl = add_params_to_uri('https://emea.bff.cariad.digital/user-login/v1/authorize', params) - - tryLoginResponse: requests.Response = self.get(authUrl, allow_redirects=False, access_type=AccessType.NONE) - redirect = tryLoginResponse.headers['Location'] - query = urlparse(redirect).query - params = dict(parse_qsl(query)) - if 'state' in params: - self.state = params.get('state') - - return redirect + """Refresh tokens via silent re-auth or fall back to full re-login. + + The OIDC hybrid flow (response_type=code id_token token) does not + issue a refresh_token. To get fresh tokens we re-authenticate: + 1. First try prompt=none — if self.websession still has a valid + Auth0 SSO session cookie, Auth0 returns new tokens silently + without showing a login form. + 2. If that fails (session expired, or Auth0 rejects silent auth), + fall back to a full login with credentials. + """ + LOG.info('Access token expired — attempting silent re-authentication via Auth0 session cookie.') + try: + # Try silent re-auth: add prompt=none so Auth0 uses the existing + # SSO session cookie instead of showing the login form. + auth_url = self.authorizationUrl( + url='https://identity.vwgroup.io/oidc/v1/authorize', + prompt='none', + ) + response = self.doWebAuth(auth_url) + self.fetchTokens('https://identity.vwgroup.io/oidc/v1/token', + authorization_response=response) + if self.accessToken: + LOG.info('Silent re-authentication succeeded (Auth0 session was still valid).') + return + except Exception as e: + LOG.debug('Silent re-auth failed (%s), falling back to full login.', e) + + LOG.info('Performing full re-login with credentials.') + self.login() def clearTokens(self) -> None: """ @@ -112,42 +115,37 @@ def fetchTokens( authorization_response=None, **kwargs ): + """Extract tokens from OIDC hybrid flow callback URL. + + With hybrid flow (response_type=code id_token token), the OAuth + callback URL already contains access_token and id_token directly. + No server-side token exchange is needed because Auth0 binds the + authorization code to the CARIAD BFF as the authorized exchanger — + a direct POST to identity.vwgroup.io/oidc/v1/token would return + 401 access_denied. + + This follows the same approach as robinostlund/volkswagencarnet#333. + """ self.parseFromFragment(authorization_response) - if all(key in self.token for key in ('state', 'id_token', 'access_token', 'code')): - body: str = json.dumps( - { - 'state': self.token['state'], - 'id_token': self.token['id_token'], - 'redirect_uri': self.redirect_uri, - 'region': 'emea', - 'access_token': self.token['access_token'], - 'authorizationCode': self.token['code'], - }) - - loginHeadersForm: CaseInsensitiveDict = self.headers - loginHeadersForm['accept'] = 'application/json' - - tokenResponse = self.post(token_url, headers=loginHeadersForm, data=body, allow_redirects=False, access_type=AccessType.ID) - if tokenResponse.status_code != requests.codes['ok']: - raise TemporaryAuthentificationError(f'Token could not be fetched due to temporary WeConnect failure: {tokenResponse.status_code}') - token = self.parseFromBody(tokenResponse.text) - - # Ensure the token is properly stored in the session - if token is not None: - self.token = token # Explicitly store the token - LOG.debug(f"Successfully fetched tokens. Access token expires in: {token.get('expires_in', 'unknown')} seconds") - LOG.debug(f"Refresh token available: {'refresh_token' in token}") - # Verify critical tokens are present - if not all(key in token for key in ('access_token', 'id_token', 'refresh_token')): - LOG.warning("Some expected tokens are missing from the response") - else: - LOG.error("Token parsing returned None") - - return token - else: - LOG.error("Authorization response missing required tokens") - return None + if self.token is None: + raise TemporaryAuthentificationError('Failed to parse tokens from authorization response') + + if 'access_token' not in self.token: + raise TemporaryAuthentificationError( + 'No access token found in authorization response. ' + 'The OIDC hybrid flow callback did not return an access token.' + ) + + LOG.info('Successfully obtained tokens from OIDC hybrid flow callback') + LOG.debug('Access token expires in: %s seconds', self.token.get('expires_in', 'unknown')) + + # OIDC hybrid flow does not return refresh_token for security + # reasons. Re-login will be required when the access_token expires. + if 'refresh_token' not in self.token: + LOG.debug('No refresh token in response (expected with hybrid flow)') + + return self.token def parseFromBody(self, token_response, state=None): try: @@ -177,65 +175,12 @@ def refreshTokens( proxies=None, **kwargs ): - LOG.info('Refreshing tokens') - if not token_url: - raise ValueError("No token endpoint set for auto_refresh.") - - if not is_secure_transport(token_url): - raise InsecureTransportError() - - refresh_token = refresh_token or self.refreshToken - - if headers is None: - headers = self.headers - - # First try to get from the current token property, then fall back to stored token - if refresh_token is None: - refresh_token = self.refreshToken - # If still None, try to get from the token dict directly - if refresh_token is None and self.token is not None: - refresh_token = self.token.get('refresh_token') - - if not refresh_token: - raise AuthentificationError('No refresh token available. Please log in again.') - - # Create headers matching the examples format - tHeaders = { - "Accept-Encoding": "gzip, deflate, br", - "Connection": "keep-alive", - "Content-Type": "application/x-www-form-urlencoded", - "User-Agent": "Volkswagen/3.51.1-android/14", - "x-android-package-name": "com.volkswagen.weconnect", - } - - # Create form data body matching the examples format - body = { - "grant_type": "refresh_token", - "refresh_token": refresh_token, - "client_id": self.client_id, - } - - # Request new tokens using POST with form data - tokenResponse = self.post( - token_url, - data=body, - headers=tHeaders, - timeout=timeout, - verify=verify, - proxies=proxies, - ) - if tokenResponse.status_code == requests.codes['unauthorized']: - LOG.error('Token refresh failed with 401 - server requests new authorization. Refresh token may be expired or invalid.') - raise AuthentificationError('Refreshing tokens failed: Server requests new authorization. Please log in again.') - elif tokenResponse.status_code in (requests.codes['internal_server_error'], requests.codes['service_unavailable'], requests.codes['gateway_timeout']): - raise TemporaryAuthentificationError(f'Token could not be refreshed due to temporary WeConnect failure: {tokenResponse.status_code}') - elif tokenResponse.status_code == requests.codes['ok']: - newToken = self.parseFromBody(tokenResponse.text) - if newToken is not None and "refresh_token" not in newToken: - LOG.debug("No new refresh token given. Re-using old.") - self.token["refresh_token"] = refresh_token - # Update the token property as well - self.token = newToken - return newToken - else: - raise RetrievalError(f'Status Code from WeConnect while refreshing tokens was: {tokenResponse.status_code}') + """Token refresh is not available with OIDC hybrid flow. + + The hybrid flow does not return a refresh_token — Auth0 issues no + refresh tokens for security reasons with response_type=code id_token + token. When tokens expire, a full re-login is required. + """ + LOG.info('Token refresh not available (OIDC hybrid flow). Performing full re-login.') + self.login() + return self.token