from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, EmailStr
from urllib.parse import urlencode
import boto3
import os
import httpx
from typing import Optional
from dotenv import load_dotenv
import hmac
import hashlib
import base64
load_dotenv()
AWS_REGION = os.getenv("AWS_REGION")
USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID")
COGNITO_DOMAIN = os.getenv("COGNITO_DOMAIN")
COGNITO_CLIENT_ID = os.getenv("COGNITO_CLIENT_ID")
COGNITO_CLIENT_SECRET = os.getenv("COGNITO_CLIENT_SECRET", None)
COGNITO_REDIRECT_URI = os.getenv("COGNITO_REDIRECT_URI")
OKTA_BASE_URL = os.getenv("OKTA_BASE_URL")
OKTA_API_TOKEN = os.getenv("OKTA_API_TOKEN")
cognito = boto3.client("cognito-idp", region_name=AWS_REGION)
app = FastAPI()
class TokenPayload(BaseModel):
access_token: str
class RegisterRequest(BaseModel):
username: EmailStr
password: str
name: Optional[str] = None
def get_secret_hash(username: str) -> str:
message = username + COGNITO_CLIENT_ID
digest = hmac.new(
COGNITO_CLIENT_SECRET.encode("utf-8"),
msg=message.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(digest).decode()
@app.get("/")
def root():
return {"message": "API de prueba Cognito + Okta funcionando ✅"}
@app.get("/cognito/ping")
def cognito_ping():
"""
Verifica que podemos conectarnos al User Pool.
"""
try:
resp = cognito.describe_user_pool(UserPoolId=os.getenv("COGNITO_USER_POOL_ID"))
return {
"status": "ok",
"status": "ok",
"user_pool_id": os.getenv("COGNITO_USER_POOL_ID"),
"user_pool_name": resp["UserPool"]["Name"],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/cognito/users")
def list_users():
"""
Lista usuarios del User Pool (incluye los federados por Okta).
"""
try:
resp = cognito.list_users(UserPoolId=os.getenv("COGNITO_USER_POOL_ID"))
return resp["Users"]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/cognito/exchange-code")
def exchange_code(code: str = Query(...)):
"""
Recibe el `code` (como el que te llegó en CloudFront) y lo intercambia por tokens.
"""
token_url = f"https://{COGNITO_DOMAIN}/oauth2/token"
data = {
"grant_type": "authorization_code",
"client_id": COGNITO_CLIENT_ID,
"code": code,
"redirect_uri": COGNITO_REDIRECT_URI,
}
headers = {
"Content-Type": "application/x-www-form-urlencoded"
}
auth = None
if COGNITO_CLIENT_SECRET:
import base64
basic = base64.b64encode(f"{COGNITO_CLIENT_ID}:{COGNITO_CLIENT_SECRET}".encode()).decode()
headers["Authorization"] = f"Basic {basic}"
try:
with httpx.Client() as http_client:
resp = http_client.post(token_url, data=data, headers=headers)
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)
tokens = resp.json()
return tokens
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/cognito/me")
def get_me(payload: TokenPayload):
try:
resp = cognito.get_user(AccessToken=payload.access_token)
return resp
except cognito.exceptions.NotAuthorizedException as e:
raise HTTPException(status_code=401, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
class LoginRequest(BaseModel):
username: str
password: str
def okta_user_exists(email: str) -> bool:
"""
Llama a: {{host_okta}}/api/v1/users?search=profile.email eq "email"
y revisa si hay usuarios.
"""
if not OKTA_BASE_URL or not OKTA_API_TOKEN:
return False
url = f"{OKTA_BASE_URL}/api/v1/users"
params = {
"search": f'profile.email eq "{email}"'
}
headers = {
"Authorization": f"SSWS {OKTA_API_TOKEN}",
"Accept": "application/json",
}
with httpx.Client() as client:
resp = client.get(url, headers=headers, params=params)
if resp.status_code != 200:
print("Error llamando a Okta:", resp.status_code, resp.text)
return False
users = resp.json()
return len(users) > 0
def build_okta_auth_url(email: str) -> str:
"""
Construye la URL de autenticación usando la Hosted UI de Cognito
forzando el IdP = Okta.
"""
query = {
"client_id": COGNITO_CLIENT_ID,
"response_type": "code",
"redirect_uri": COGNITO_REDIRECT_URI,
"scope": "aws.cognito.signin.user.admin email openid profile",
"identity_provider": "Okta",
"login_hint": email,
}
return f"https://{COGNITO_DOMAIN}/oauth2/authorize?{urlencode(query)}"
@app.post("/auth/login")
def smart_login(req: LoginRequest):
email = req.username
if okta_user_exists(email):
okta_auth_url = build_okta_auth_url(email)
return {
"auth_via": "okta",
"message": "Usuario gestionado por Okta. Redirige al usuario a esta URL.",
"redirect_to": okta_auth_url,
}
try:
resp = cognito.initiate_auth(
ClientId=COGNITO_CLIENT_ID,
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={
"USERNAME": email,
"PASSWORD": req.password,
},
)
auth_result = resp.get("AuthenticationResult", {})
return {
"auth_via": "cognito",
"access_token": auth_result.get("AccessToken"),
"id_token": auth_result.get("IdToken"),
"refresh_token": auth_result.get("RefreshToken"),
"expires_in": auth_result.get("ExpiresIn"),
"token_type": auth_result.get("TokenType"),
}
except cognito.exceptions.NotAuthorizedException:
raise HTTPException(status_code=401, detail="Usuario o contraseña incorrectos")
except cognito.exceptions.UserNotFoundException:
raise HTTPException(status_code=404, detail="Usuario no encontrado en Cognito")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/auth/register")
def register(req: RegisterRequest):
email = req.username
if okta_user_exists(email):
okta_auth_url = build_okta_auth_url(email)
return {
"auth_via": "okta",
"message": "Este usuario ya existe en Okta. Debe autenticarse mediante Okta.",
"redirect_to": okta_auth_url,
}
try:
user_attributes = [
{"Name": "email", "Value": email},
]
if req.name:
user_attributes.append({"Name": "name", "Value": req.name})
resp = cognito.sign_up(
ClientId=COGNITO_CLIENT_ID,
SecretHash=get_secret_hash(email),
Username=email,
Password=req.password,
UserAttributes=user_attributes,
)
return {
"auth_via": "cognito",
"message": "Usuario creado en Cognito. Dependiendo de la configuración, debe confirmar su cuenta.",
"user_sub": resp.get("UserSub"),
"user_confirmed": resp.get("UserConfirmed"),
"code_delivery": resp.get("CodeDeliveryDetails"),
}
except cognito.exceptions.UsernameExistsException:
raise HTTPException(
status_code=409,
detail="El usuario ya existe en Cognito. Intenta iniciar sesión."
)
except cognito.exceptions.InvalidPasswordException as e:
raise HTTPException(
status_code=400,
detail=f"Contraseña no cumple las políticas de Cognito: {str(e)}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))