Skip to main content

DEMO En FastApi

main.py

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, EmailStr
from urllib.parse import urlencode
import boto3
import os
import httpx
from typing import Optional
from dotenv import load_dotenv
import hmac
import hashlib
import base64

load_dotenv()

AWS_REGION = os.getenv("AWS_REGION")
USER_POOL_ID = os.getenv("COGNITO_USER_POOL_ID")
COGNITO_DOMAIN = os.getenv("COGNITO_DOMAIN")
COGNITO_CLIENT_ID = os.getenv("COGNITO_CLIENT_ID")
COGNITO_CLIENT_SECRET = os.getenv("COGNITO_CLIENT_SECRET", None)
COGNITO_REDIRECT_URI = os.getenv("COGNITO_REDIRECT_URI")

OKTA_BASE_URL = os.getenv("OKTA_BASE_URL")
OKTA_API_TOKEN = os.getenv("OKTA_API_TOKEN")

cognito = boto3.client("cognito-idp", region_name=AWS_REGION)

app = FastAPI()

class TokenPayload(BaseModel):
access_token: str

class RegisterRequest(BaseModel):
username: EmailStr
password: str
name: Optional[str] = None

def get_secret_hash(username: str) -> str:
message = username + COGNITO_CLIENT_ID
digest = hmac.new(
COGNITO_CLIENT_SECRET.encode("utf-8"),
msg=message.encode("utf-8"),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(digest).decode()

@app.get("/")
def root():
return {"message": "API de prueba Cognito + Okta funcionando ✅"}

@app.get("/cognito/ping")
def cognito_ping():
"""
Verifica que podemos conectarnos al User Pool.
"""
try:
resp = cognito.describe_user_pool(UserPoolId=os.getenv("COGNITO_USER_POOL_ID"))
return {
"status": "ok",
"status": "ok",
"user_pool_id": os.getenv("COGNITO_USER_POOL_ID"),
"user_pool_name": resp["UserPool"]["Name"],
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.get("/cognito/users")
def list_users():
"""
Lista usuarios del User Pool (incluye los federados por Okta).
"""
try:
resp = cognito.list_users(UserPoolId=os.getenv("COGNITO_USER_POOL_ID"))
return resp["Users"]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.get("/cognito/exchange-code")
def exchange_code(code: str = Query(...)):
"""
Recibe el `code` (como el que te llegó en CloudFront) y lo intercambia por tokens.
"""
token_url = f"https://{COGNITO_DOMAIN}/oauth2/token"

data = {
"grant_type": "authorization_code",
"client_id": COGNITO_CLIENT_ID,
"code": code,
"redirect_uri": COGNITO_REDIRECT_URI,
}

headers = {
"Content-Type": "application/x-www-form-urlencoded"
}

# Si tu App Client usa client_secret, añade Authorization Basic:
auth = None
if COGNITO_CLIENT_SECRET:
import base64
basic = base64.b64encode(f"{COGNITO_CLIENT_ID}:{COGNITO_CLIENT_SECRET}".encode()).decode()
headers["Authorization"] = f"Basic {basic}"

try:
with httpx.Client() as http_client:
resp = http_client.post(token_url, data=data, headers=headers)
if resp.status_code != 200:
raise HTTPException(status_code=resp.status_code, detail=resp.text)

tokens = resp.json()
return tokens # aquí tendrás access_token, id_token, etc.
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.post("/cognito/me")
def get_me(payload: TokenPayload):
try:
resp = cognito.get_user(AccessToken=payload.access_token)
return resp
except cognito.exceptions.NotAuthorizedException as e:
raise HTTPException(status_code=401, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))


class LoginRequest(BaseModel):
username: str # email
password: str

def okta_user_exists(email: str) -> bool:
"""
Llama a: {{host_okta}}/api/v1/users?search=profile.email eq "email"
y revisa si hay usuarios.
"""
if not OKTA_BASE_URL or not OKTA_API_TOKEN:
# Si no está configurado, asumimos que no usamos Okta
return False

url = f"{OKTA_BASE_URL}/api/v1/users"
params = {
"search": f'profile.email eq "{email}"'
}
headers = {
"Authorization": f"SSWS {OKTA_API_TOKEN}",
"Accept": "application/json",
}

with httpx.Client() as client:
resp = client.get(url, headers=headers, params=params)

if resp.status_code != 200:
# Podrías loguear el error y decidir qué hacer; aquí asumimos que no existe
print("Error llamando a Okta:", resp.status_code, resp.text)
return False

users = resp.json()
return len(users) > 0


def build_okta_auth_url(email: str) -> str:
"""
Construye la URL de autenticación usando la Hosted UI de Cognito
forzando el IdP = Okta.
"""
query = {
"client_id": COGNITO_CLIENT_ID,
"response_type": "code",
"redirect_uri": COGNITO_REDIRECT_URI,
"scope": "aws.cognito.signin.user.admin email openid profile",
"identity_provider": "Okta",
"login_hint": email, # le sugieres a Okta el correo
}
return f"https://{COGNITO_DOMAIN}/oauth2/authorize?{urlencode(query)}"

@app.post("/auth/login")
def smart_login(req: LoginRequest):
email = req.username

# 1) Primero validamos si el email existe en Okta
if okta_user_exists(email):
# En vez de hacer login directo en Cognito,
# devolvemos la URL para que el front redirija a Okta (vía Hosted UI).
okta_auth_url = build_okta_auth_url(email)
return {
"auth_via": "okta",
"message": "Usuario gestionado por Okta. Redirige al usuario a esta URL.",
"redirect_to": okta_auth_url,
}

# 2) Si no existe en Okta, usamos login directo con Cognito
try:
resp = cognito.initiate_auth(
ClientId=COGNITO_CLIENT_ID,
AuthFlow="USER_PASSWORD_AUTH",
AuthParameters={
"USERNAME": email,
"PASSWORD": req.password,
},
)

auth_result = resp.get("AuthenticationResult", {})
return {
"auth_via": "cognito",
"access_token": auth_result.get("AccessToken"),
"id_token": auth_result.get("IdToken"),
"refresh_token": auth_result.get("RefreshToken"),
"expires_in": auth_result.get("ExpiresIn"),
"token_type": auth_result.get("TokenType"),
}

except cognito.exceptions.NotAuthorizedException:
raise HTTPException(status_code=401, detail="Usuario o contraseña incorrectos")
except cognito.exceptions.UserNotFoundException:
raise HTTPException(status_code=404, detail="Usuario no encontrado en Cognito")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

@app.post("/auth/register")
def register(req: RegisterRequest):
email = req.username

# 1) Primero validamos si el email existe en Okta
if okta_user_exists(email):
# Usuario gestionado por Okta, no lo creamos en Cognito,
# devolvemos URL para que se autentique con Okta.
okta_auth_url = build_okta_auth_url(email)
return {
"auth_via": "okta",
"message": "Este usuario ya existe en Okta. Debe autenticarse mediante Okta.",
"redirect_to": okta_auth_url,
}

# 2) Si no existe en Okta, creamos el usuario en Cognito
try:
user_attributes = [
{"Name": "email", "Value": email},
]

if req.name:
user_attributes.append({"Name": "name", "Value": req.name})

resp = cognito.sign_up(
ClientId=COGNITO_CLIENT_ID,
SecretHash=get_secret_hash(email),
Username=email,
Password=req.password,
UserAttributes=user_attributes,
)

return {
"auth_via": "cognito",
"message": "Usuario creado en Cognito. Dependiendo de la configuración, debe confirmar su cuenta.",
"user_sub": resp.get("UserSub"),
"user_confirmed": resp.get("UserConfirmed"),
"code_delivery": resp.get("CodeDeliveryDetails"),
}

except cognito.exceptions.UsernameExistsException:
# Ya existe en Cognito (pero no en Okta). Puedes decidir forzar login directo aquí o devolver error.
raise HTTPException(
status_code=409,
detail="El usuario ya existe en Cognito. Intenta iniciar sesión."
)
except cognito.exceptions.InvalidPasswordException as e:
raise HTTPException(
status_code=400,
detail=f"Contraseña no cumple las políticas de Cognito: {str(e)}"
)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))

.env.example

AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_REGION=

COGNITO_DOMAIN=
COGNITO_USER_POOL_ID=
COGNITO_CLIENT_ID=
COGNITO_CLIENT_SECRET=
COGNITO_PROVIDER_NAME=
COGNITO_REDIRECT_URI=

OKTA_BASE_URL=
OKTA_API_TOKEN=