Skip to main content

Best Practices

This guide establishes best practices for backend development using Django and Django REST Framework, covering fundamental aspects such as testing, error handling, logging, and monitoring specific to these frameworks.

Testing Guidelines with Django

Testing Strategy

We follow a testing pyramid that prioritizes:

  1. Unit Tests (70%): Tests for models, serializers, and individual functions
  2. Integration Tests (20%): Tests for views, APIs, and component interactions
  3. End-to-End Tests (10%): Complete user flow tests

Unit Testing with Django

Fundamental Principles

  • Isolation: Use TestCase or TransactionTestCase as needed
  • Speed: Unit tests should execute quickly
  • Clarity: Descriptive names following Django conventions
  • Coverage: Aim for a minimum 80% coverage in business logic

Test Structure

from django.test import TestCase
from django.contrib.auth import get_user_model
from rest_framework.test import APITestCase
from rest_framework import status

User = get_user_model()

class UserModelTest(TestCase):
def setUp(self):
self.user_data = {
'email': 'test@example.com',
'username': 'testuser',
'password': 'testpass123'
}

def test_create_user_with_valid_data(self):
"""Test creating a user with valid data"""
user = User.objects.create_user(**self.user_data)

self.assertEqual(user.email, self.user_data['email'])
self.assertEqual(user.username, self.user_data['username'])
self.assertTrue(user.check_password(self.user_data['password']))
self.assertTrue(user.is_active)

def test_user_str_representation(self):
"""Test the string representation of user"""
user = User.objects.create_user(**self.user_data)
self.assertEqual(str(user), user.email)

Django Model Testing

from django.test import TestCase
from django.core.exceptions import ValidationError
from django.db import IntegrityError
from myapp.models import Product, Category

class ProductModelTest(TestCase):
def setUp(self):
self.category = Category.objects.create(
name='Electronics',
slug='electronics'
)

def test_product_creation(self):
"""Test creating a product with valid data"""
product = Product.objects.create(
name='Laptop',
slug='laptop',
price=999.99,
category=self.category
)

self.assertEqual(product.name, 'Laptop')
self.assertEqual(product.category, self.category)
self.assertTrue(product.is_active)

def test_product_slug_uniqueness(self):
"""Test that product slugs must be unique"""
Product.objects.create(
name='Laptop 1',
slug='laptop',
price=999.99,
category=self.category
)

with self.assertRaises(IntegrityError):
Product.objects.create(
name='Laptop 2',
slug='laptop', # Duplicate slug
price=1299.99,
category=self.category
)

API Testing with DRF

ViewSet and API Testing

from rest_framework.test import APITestCase, APIClient
from rest_framework import status
from django.contrib.auth import get_user_model
from django.urls import reverse

User = get_user_model()

class UserAPITest(APITestCase):
def setUp(self):
self.client = APIClient()
self.user = User.objects.create_user(
email='test@example.com',
username='testuser',
password='testpass123'
)
self.admin_user = User.objects.create_superuser(
email='admin@example.com',
username='admin',
password='adminpass123'
)

def test_get_user_list_authenticated(self):
"""Test retrieving user list with authentication"""
self.client.force_authenticate(user=self.admin_user)
url = reverse('user-list')
response = self.client.get(url)

self.assertEqual(response.status_code, status.HTTP_200_OK)
self.assertEqual(len(response.data['results']), 2)

def test_get_user_list_unauthenticated(self):
"""Test retrieving user list without authentication"""
url = reverse('user-list')
response = self.client.get(url)

self.assertEqual(response.status_code, status.HTTP_401_UNAUTHORIZED)

def test_create_user_valid_data(self):
"""Test creating user with valid data"""
url = reverse('user-list')
data = {
'email': 'newuser@example.com',
'username': 'newuser',
'password': 'newpass123'
}
response = self.client.post(url, data, format='json')

self.assertEqual(response.status_code, status.HTTP_201_CREATED)
self.assertTrue(User.objects.filter(email=data['email']).exists())

Serializer Testing

from django.test import TestCase
from rest_framework import serializers
from myapp.serializers import UserSerializer, ProductSerializer
from myapp.models import Product, Category

class UserSerializerTest(TestCase):
def test_valid_user_serializer(self):
"""Test user serializer with valid data"""
data = {
'email': 'test@example.com',
'username': 'testuser',
'password': 'testpass123'
}
serializer = UserSerializer(data=data)

self.assertTrue(serializer.is_valid())
user = serializer.save()
self.assertEqual(user.email, data['email'])

def test_invalid_email_format(self):
"""Test user serializer with invalid email"""
data = {
'email': 'invalid-email',
'username': 'testuser',
'password': 'testpass123'
}
serializer = UserSerializer(data=data)

self.assertFalse(serializer.is_valid())
self.assertIn('email', serializer.errors)

Integration Testing

Database Testing

from django.test import TransactionTestCase
from django.db import transaction
from myapp.models import Order, Product

class OrderIntegrationTest(TransactionTestCase):
def setUp(self):
self.product = Product.objects.create(
name='Test Product',
price=99.99,
stock=10
)

def test_order_creation_with_stock_update(self):
"""Test order creation updates product stock"""
initial_stock = self.product.stock

with transaction.atomic():
order = Order.objects.create(
product=self.product,
quantity=2
)
self.product.stock -= order.quantity
self.product.save()

self.product.refresh_from_db()
self.assertEqual(self.product.stock, initial_stock - 2)

Django Testing Tools

  • Django TestCase: For tests that require database
  • Django TransactionTestCase: For tests that require transactions
  • DRF APITestCase: For REST API testing
  • Factory Boy: For creating test data
  • pytest-django: Alternative to unittest with more features
  • coverage.py: For measuring code coverage

Error Handling with Django and DRF

Custom Exceptions

from rest_framework import status
from rest_framework.exceptions import APIException
from django.core.exceptions import ValidationError as DjangoValidationError

class CustomAPIException(APIException):
"""Base class for custom API exceptions"""
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = 'A server error occurred.'
default_code = 'error'

class BusinessLogicError(CustomAPIException):
"""Exception for business logic violations"""
status_code = status.HTTP_400_BAD_REQUEST
default_detail = 'Business logic error occurred.'
default_code = 'business_logic_error'

class ResourceNotFoundError(CustomAPIException):
"""Exception for resource not found"""
status_code = status.HTTP_404_NOT_FOUND
default_detail = 'Resource not found.'
default_code = 'not_found'

class InsufficientPermissionsError(CustomAPIException):
"""Exception for insufficient permissions"""
status_code = status.HTTP_403_FORBIDDEN
default_detail = 'Insufficient permissions.'
default_code = 'insufficient_permissions'

Custom Exception Handler

# core/exceptions.py
from rest_framework.views import exception_handler
from rest_framework.response import Response
from rest_framework import status
import logging

logger = logging.getLogger(__name__)

def custom_exception_handler(exc, context):
"""Custom exception handler for DRF"""
# Call REST framework's default exception handler first
response = exception_handler(exc, context)

# Log the exception
request = context.get('request')
logger.error(
f"API Exception: {exc.__class__.__name__}: {str(exc)}",
extra={
'request_path': request.path if request else None,
'request_method': request.method if request else None,
'user': str(request.user) if request and hasattr(request, 'user') else None,
'exception_type': exc.__class__.__name__,
},
exc_info=True
)

if response is not None:
# Customize error response format
custom_response_data = {
'error': {
'status_code': response.status_code,
'message': get_error_message(response.data),
'code': getattr(exc, 'default_code', 'error'),
'details': response.data if isinstance(response.data, dict) else None
}
}
response.data = custom_response_data

return response

def get_error_message(data):
"""Extract meaningful error message from response data"""
if isinstance(data, dict):
if 'detail' in data:
return data['detail']
elif 'non_field_errors' in data:
return data['non_field_errors'][0] if data['non_field_errors'] else 'Validation error'
else:
# Return first error message found
for key, value in data.items():
if isinstance(value, list) and value:
return f"{key}: {value[0]}"
elif isinstance(value, str):
return f"{key}: {value}"
return str(data)

Serializer Validation

from rest_framework import serializers
from django.contrib.auth import get_user_model
from django.core.validators import validate_email

User = get_user_model()

class UserSerializer(serializers.ModelSerializer):
password_confirm = serializers.CharField(write_only=True)

class Meta:
model = User
fields = ['id', 'email', 'username', 'password', 'password_confirm']
extra_kwargs = {
'password': {'write_only': True, 'min_length': 8}
}

def validate_email(self, value):
"""Validate email format and uniqueness"""
validate_email(value)

if User.objects.filter(email=value).exists():
raise serializers.ValidationError(
"A user with this email already exists."
)
return value

def validate(self, attrs):
"""Cross-field validation"""
password = attrs.get('password')
password_confirm = attrs.get('password_confirm')

if password != password_confirm:
raise serializers.ValidationError({
'password_confirm': 'Passwords do not match.'
})

# Remove password_confirm from validated data
attrs.pop('password_confirm', None)
return attrs

def create(self, validated_data):
"""Create user with hashed password"""
password = validated_data.pop('password')
user = User.objects.create_user(password=password, **validated_data)
return user

Error Handling in Views

from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from django.db import transaction
from django.core.exceptions import ValidationError
from .exceptions import BusinessLogicError, ResourceNotFoundError

class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.all()
serializer_class = ProductSerializer

@action(detail=True, methods=['post'])
def purchase(self, request, pk=None):
"""Handle product purchase with proper error handling"""
try:
product = self.get_object()
quantity = request.data.get('quantity', 1)

# Validate business logic
if quantity <= 0:
raise BusinessLogicError("Quantity must be greater than 0")

if product.stock < quantity:
raise BusinessLogicError(
f"Insufficient stock. Available: {product.stock}, Requested: {quantity}"
)

# Perform transaction
with transaction.atomic():
product.stock -= quantity
product.save()

# Create order logic here
order = self.create_order(product, quantity, request.user)

return Response({
'message': 'Purchase successful',
'order_id': order.id,
'remaining_stock': product.stock
}, status=status.HTTP_201_CREATED)

except Product.DoesNotExist:
raise ResourceNotFoundError("Product not found")
except ValidationError as e:
raise BusinessLogicError(str(e))
except Exception as e:
logger.error(f"Unexpected error in purchase: {str(e)}")
raise CustomAPIException("An unexpected error occurred")

Standard Error Response Format

# Respuesta de éxito
{
"data": {
"id": 1,
"name": "Product Name",
"price": 99.99
}
}

# Respuesta de error de validación
{
"error": {
"status_code": 400,
"message": "email: This field is required.",
"code": "validation_error",
"details": {
"email": ["This field is required."],
"password": ["This field must be at least 8 characters long."]
}
}
}

# Respuesta de error de negocio
{
"error": {
"status_code": 400,
"message": "Insufficient stock. Available: 5, Requested: 10",
"code": "business_logic_error",
"details": null
}
}

# Authentication error response
{
"error": {
"status_code": 401,
"message": "Authentication credentials were not provided.",
"code": "not_authenticated",
"details": null
}
}

Settings Configuration

# settings.py
REST_FRAMEWORK = {
'EXCEPTION_HANDLER': 'core.exceptions.custom_exception_handler',
'DEFAULT_RENDERER_CLASSES': [
'rest_framework.renderers.JSONRenderer',
],
}

# Logging configuration
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'ERROR',
'class': 'logging.FileHandler',
'filename': 'logs/django_errors.log',
},
},
'loggers': {
'core.exceptions': {
'handlers': ['file'],
'level': 'ERROR',
'propagate': True,
},
},
}

Logging and Monitoring with Django

Django Logging Configuration

Settings Configuration

# settings.py
import os

LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
'json': {
'format': '{"level": "%(levelname)s", "time": "%(asctime)s", "module": "%(module)s", "message": "%(message)s"}',
},
},
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(BASE_DIR, 'logs', 'django.log'),
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'verbose',
},
'error_file': {
'level': 'ERROR',
'class': 'logging.handlers.RotatingFileHandler',
'filename': os.path.join(BASE_DIR, 'logs', 'django_errors.log'),
'maxBytes': 1024*1024*15, # 15MB
'backupCount': 10,
'formatter': 'verbose',
},
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'formatter': 'simple'
},
},
'root': {
'handlers': ['console'],
},
'loggers': {
'django': {
'handlers': ['file', 'console'],
'level': 'INFO',
'propagate': False,
},
'django.request': {
'handlers': ['error_file'],
'level': 'ERROR',
'propagate': False,
},
'myapp': { # Replace with your app name
'handlers': ['file', 'console'],
'level': 'DEBUG',
'propagate': False,
},
},
}

Using Logging in Views and Models

import logging
from django.contrib.auth import get_user_model
from rest_framework import viewsets, status
from rest_framework.response import Response

logger = logging.getLogger(__name__)
User = get_user_model()

class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer

def create(self, request, *args, **kwargs):
"""Create user with logging"""
logger.info(
f"User creation attempt",
extra={
'email': request.data.get('email'),
'ip_address': self.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT')
}
)

try:
response = super().create(request, *args, **kwargs)

logger.info(
f"User created successfully",
extra={
'user_id': response.data['id'],
'email': response.data['email']
}
)

return response

except Exception as e:
logger.error(
f"User creation failed: {str(e)}",
extra={
'email': request.data.get('email'),
'error_type': e.__class__.__name__
},
exc_info=True
)
raise

def get_client_ip(self, request):
"""Get client IP address"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip

Logging Middleware

# core/middleware.py
import logging
import time
import uuid

logger = logging.getLogger(__name__)

class RequestLoggingMiddleware:
"""Middleware to log all requests and responses"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
# Generate unique request ID
request.id = str(uuid.uuid4())
start_time = time.time()

# Log request
logger.info(
"Request started",
extra={
'request_id': request.id,
'method': request.method,
'path': request.path,
'user': str(request.user) if hasattr(request, 'user') else 'Anonymous',
'ip_address': self.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT', '')
}
)

response = self.get_response(request)

# Calculate response time
duration = time.time() - start_time

# Log response
logger.info(
"Request completed",
extra={
'request_id': request.id,
'status_code': response.status_code,
'duration_ms': round(duration * 1000, 2),
'content_length': len(response.content) if hasattr(response, 'content') else 0
}
)

return response

def get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip

# Add to MIDDLEWARE in settings.py
MIDDLEWARE = [
'core.middleware.RequestLoggingMiddleware',
# ... other middleware
]

Health Checks with Django

Health Check View

# core/views.py
from django.http import JsonResponse
from django.db import connection
from django.core.cache import cache
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import AllowAny
import redis
import time

@api_view(['GET'])
@permission_classes([AllowAny])
def health_check(request):
"""Comprehensive health check endpoint"""
health_status = {
'status': 'healthy',
'timestamp': time.time(),
'services': {}
}

# Database check
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
health_status['services']['database'] = 'healthy'
except Exception as e:
health_status['services']['database'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'

# Cache check (Redis/Memcached)
try:
cache.set('health_check', 'test', 30)
cache.get('health_check')
health_status['services']['cache'] = 'healthy'
except Exception as e:
health_status['services']['cache'] = f'unhealthy: {str(e)}'
health_status['status'] = 'unhealthy'

# Celery check (if using Celery)
try:
from celery import current_app
inspect = current_app.control.inspect()
stats = inspect.stats()
if stats:
health_status['services']['celery'] = 'healthy'
else:
health_status['services']['celery'] = 'no workers available'
except Exception as e:
health_status['services']['celery'] = f'unhealthy: {str(e)}'

status_code = 200 if health_status['status'] == 'healthy' else 503
return JsonResponse(health_status, status=status_code)

# urls.py
from django.urls import path
from core.views import health_check

urlpatterns = [
path('health/', health_check, name='health_check'),
# ... other URLs
]

Performance Monitoring

Django Debug Toolbar (Development Only)

# settings.py (development)
if DEBUG:
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']

DEBUG_TOOLBAR_CONFIG = {
'SHOW_TOOLBAR_CALLBACK': lambda request: True,
}

Slow Query Monitoring

# core/middleware.py
import logging
from django.db import connection
from django.conf import settings

logger = logging.getLogger(__name__)

class DatabaseQueryLoggingMiddleware:
"""Log slow database queries"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
# Reset queries count
initial_queries = len(connection.queries)

response = self.get_response(request)

# Check for slow queries
if settings.DEBUG:
for query in connection.queries[initial_queries:]:
query_time = float(query['time'])
if query_time > 0.1: # Queries > 100ms
logger.warning(
f"Slow query detected: {query_time}s",
extra={
'query': query['sql'][:200], # Truncate long queries
'duration': query_time,
'path': request.path
}
)

return response

Metrics and Alerts

Custom Management Command for Metrics

# management/commands/collect_metrics.py
from django.core.management.base import BaseCommand
from django.contrib.auth import get_user_model
from django.db.models import Count
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)
User = get_user_model()

class Command(BaseCommand):
help = 'Collect and log application metrics'

def handle(self, *args, **options):
"""Collect various application metrics"""
now = datetime.now()

# User metrics
total_users = User.objects.count()
active_users_24h = User.objects.filter(
last_login__gte=now - timedelta(hours=24)
).count()

new_users_24h = User.objects.filter(
date_joined__gte=now - timedelta(hours=24)
).count()

logger.info(
"Daily metrics collected",
extra={
'total_users': total_users,
'active_users_24h': active_users_24h,
'new_users_24h': new_users_24h,
'timestamp': now.isoformat()
}
)

self.stdout.write(
self.style.SUCCESS(f'Metrics collected at {now}')
)

Logging and Monitoring

  • Django-extensions: Additional development tools
  • Sentry: Error tracking and performance monitoring
  • New Relic: Django-specific APM
  • Datadog: Infrastructure and application monitoring
  • Prometheus + Grafana: Metrics collection and visualization

Performance

  • Django Debug Toolbar: Development debugging
  • django-silk: Request and query profiling
  • django-cache-panel: Cache monitoring
  • django-querycount: Detect N+1 queries

Sentry Configuration

# settings.py
import sentry_sdk
from sentry_sdk.integrations.django import DjangoIntegration
from sentry_sdk.integrations.logging import LoggingIntegration

sentry_logging = LoggingIntegration(
level=logging.INFO, # Capture info and above as breadcrumbs
event_level=logging.ERROR # Send errors as events
)

sentry_sdk.init(
dsn="YOUR_SENTRY_DSN",
integrations=[
DjangoIntegration(
transaction_style='url',
middleware_spans=True,
signals_spans=True,
),
sentry_logging,
],
traces_sample_rate=0.1, # 10% of transactions
send_default_pii=False, # Don't send personal information
environment=os.getenv('ENVIRONMENT', 'development'),
)

Security Best Practices with Django

Authentication and Authorization

Django Authentication System

# settings.py
AUTH_USER_MODEL = 'auth.User' # Custom user model

# Password validation
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
'OPTIONS': {
'min_length': 8,
}
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]

# Session security
SESSION_COOKIE_SECURE = True # HTTPS only
SESSION_COOKIE_HTTPONLY = True # No JavaScript access
SESSION_COOKIE_SAMESITE = 'Strict'
SESSION_EXPIRE_AT_BROWSER_CLOSE = True

Custom User Model

# auth/models.py
from django.contrib.auth.models import AbstractUser
from django.db import models

class User(AbstractUser):
email = models.EmailField(unique=True)
is_verified = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)

USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']

def __str__(self):
return self.email

JWT Authentication with DRF

# settings.py
from datetime import timedelta

SIMPLE_JWT = {
'ACCESS_TOKEN_LIFETIME': timedelta(minutes=15),
'REFRESH_TOKEN_LIFETIME': timedelta(days=7),
'ROTATE_REFRESH_TOKENS': True,
'BLACKLIST_AFTER_ROTATION': True,
'UPDATE_LAST_LOGIN': True,

'ALGORITHM': 'HS256',
'SIGNING_KEY': SECRET_KEY,
'VERIFYING_KEY': None,

'AUTH_HEADER_TYPES': ('Bearer',),
'AUTH_HEADER_NAME': 'HTTP_AUTHORIZATION',
'USER_ID_FIELD': 'id',
'USER_ID_CLAIM': 'user_id',
}

REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_simplejwt.authentication.JWTAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
}

Custom Permissions

# core/permissions.py
from rest_framework import permissions

class IsOwnerOrReadOnly(permissions.BasePermission):
"""Custom permission to only allow owners to edit objects"""

def has_object_permission(self, request, view, obj):
# Read permissions for any request
if request.method in permissions.SAFE_METHODS:
return True

# Write permissions only to owner
return obj.owner == request.user

class IsAdminOrReadOnly(permissions.BasePermission):
"""Custom permission for admin-only write access"""

def has_permission(self, request, view):
if request.method in permissions.SAFE_METHODS:
return True
return request.user.is_staff

# Usage in ViewSets
class ProductViewSet(viewsets.ModelViewSet):
queryset = Product.objects.all()
serializer_class = ProductSerializer
permission_classes = [IsAdminOrReadOnly]

Input Validation and Sanitization

Serializer Validation

from rest_framework import serializers
from django.core.validators import validate_email
import re

class UserRegistrationSerializer(serializers.ModelSerializer):
password = serializers.CharField(write_only=True, min_length=8)
password_confirm = serializers.CharField(write_only=True)

class Meta:
model = User
fields = ['email', 'username', 'password', 'password_confirm']

def validate_email(self, value):
"""Validate email format and uniqueness"""
validate_email(value)

if User.objects.filter(email=value).exists():
raise serializers.ValidationError("Email already exists")

return value.lower()

def validate_password(self, value):
"""Validate password strength"""
if len(value) < 8:
raise serializers.ValidationError("Password must be at least 8 characters")

if not re.search(r'[A-Z]', value):
raise serializers.ValidationError("Password must contain uppercase letter")

if not re.search(r'[a-z]', value):
raise serializers.ValidationError("Password must contain lowercase letter")

if not re.search(r'\d', value):
raise serializers.ValidationError("Password must contain number")

return value

def validate(self, attrs):
"""Cross-field validation"""
if attrs['password'] != attrs['password_confirm']:
raise serializers.ValidationError("Passwords don't match")

attrs.pop('password_confirm')
return attrs

def create(self, validated_data):
"""Create user with hashed password"""
password = validated_data.pop('password')
user = User.objects.create_user(password=password, **validated_data)
return user

SQL Injection Prevention

# ✅ Use Django ORM (automatically safe)
users = User.objects.filter(email=user_email)

# ✅ Raw queries with parameters
from django.db import connection

def get_user_by_email(email):
with connection.cursor() as cursor:
cursor.execute("SELECT * FROM auth_user WHERE email = %s", [email])
return cursor.fetchone()

# ✅ Extra() with parameters
User.objects.extra(
where=["email = %s"],
params=[email]
)

# ❌ NEVER do this (vulnerable to SQL injection)
User.objects.extra(where=[f"email = '{email}'"]) # VULNERABLE

Security Headers and Middleware

Security Settings

# settings.py

# Security headers
SECURE_BROWSER_XSS_FILTER = True
SECURE_CONTENT_TYPE_NOSNIFF = True
X_FRAME_OPTIONS = 'DENY'
SECURE_HSTS_SECONDS = 31536000 # 1 year
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True

# HTTPS settings (production)
SECURE_SSL_REDIRECT = True
SECURE_PROXY_SSL_HEADER = ('HTTP_X_FORWARDED_PROTO', 'https')

# CSRF protection
CSRF_COOKIE_SECURE = True
CSRF_COOKIE_HTTPONLY = True
CSRF_COOKIE_SAMESITE = 'Strict'

# Content Security Policy
CSP_DEFAULT_SRC = ("'self'",)
CSP_SCRIPT_SRC = ("'self'", "'unsafe-inline'")
CSP_STYLE_SRC = ("'self'", "'unsafe-inline'")
CSP_IMG_SRC = ("'self'", "data:", "https:")

Rate Limiting Middleware

# core/middleware.py
from django.core.cache import cache
from django.http import HttpResponseTooManyRequests
import time

class RateLimitMiddleware:
"""Simple rate limiting middleware"""

def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
# Get client IP
ip = self.get_client_ip(request)

# Rate limit key
cache_key = f"rate_limit_{ip}"

# Get current request count
requests = cache.get(cache_key, 0)

# Check rate limit (100 requests per hour)
if requests >= 100:
return HttpResponseTooManyRequests("Rate limit exceeded")

# Increment counter
cache.set(cache_key, requests + 1, 3600) # 1 hour

response = self.get_response(request)
return response

def get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip

CORS Configuration

# settings.py
CORS_ALLOWED_ORIGINS = [
"https://example.com",
"https://www.example.com",
"http://localhost:3000", # Development only
]

CORS_ALLOW_CREDENTIALS = True

CORS_ALLOWED_HEADERS = [
'accept',
'accept-encoding',
'authorization',
'content-type',
'dnt',
'origin',
'user-agent',
'x-csrftoken',
'x-requested-with',
]

# For development only
if DEBUG:
CORS_ALLOW_ALL_ORIGINS = True

Data Protection

Sensitive Data Encryption

# core/encryption.py
from cryptography.fernet import Fernet
from django.conf import settings
import base64

class DataEncryption:
def __init__(self):
key = settings.ENCRYPTION_KEY.encode()
self.cipher_suite = Fernet(base64.urlsafe_b64encode(key[:32]))

def encrypt(self, data):
"""Encrypt sensitive data"""
if isinstance(data, str):
data = data.encode()
return self.cipher_suite.encrypt(data).decode()

def decrypt(self, encrypted_data):
"""Decrypt sensitive data"""
return self.cipher_suite.decrypt(encrypted_data.encode()).decode()

# Usage in models
from django.db import models

class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
_phone_number = models.TextField() # Encrypted field

@property
def phone_number(self):
if self._phone_number:
encryption = DataEncryption()
return encryption.decrypt(self._phone_number)
return None

@phone_number.setter
def phone_number(self, value):
if value:
encryption = DataEncryption()
self._phone_number = encryption.encrypt(value)
else:
self._phone_number = None

Environment Variables

# settings.py
import os
from decouple import config

# Database
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_USER'),
'PASSWORD': config('DB_PASSWORD'),
'HOST': config('DB_HOST', default='localhost'),
'PORT': config('DB_PORT', default='5432'),
}
}

# Security keys
SECRET_KEY = config('SECRET_KEY')
ENCRYPTION_KEY = config('ENCRYPTION_KEY')

# External APIs
EXTERNAL_API_KEY = config('EXTERNAL_API_KEY')

API Security

API Versioning

# urls.py
from django.urls import path, include

urlpatterns = [
path('api/v1/', include('api.v1.urls')),
path('api/v2/', include('api.v2.urls')),
]

# api/v1/urls.py
from rest_framework.routers import DefaultRouter
from .views import UserViewSet

router = DefaultRouter()
router.register(r'users', UserViewSet)

urlpatterns = router.urls

API Throttling

# settings.py
REST_FRAMEWORK = {
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'anon': '100/hour',
'user': '1000/hour'
}
}

# Custom throttling in views
from rest_framework.throttling import UserRateThrottle

class LoginThrottle(UserRateThrottle):
scope = 'login'

class LoginView(APIView):
throttle_classes = [LoginThrottle]
throttle_scope = 'login'

# Add to settings
# 'login': '5/min'

API Deprecation con Django

# core/middleware.py
from django.http import HttpResponse
from django.utils.deprecation import MiddlewareMixin
import re

class APIVersioningMiddleware(MiddlewareMixin):
"""Middleware for API versioning and deprecation warnings"""

def process_request(self, request):
# Extract version from URL or headers
version = request.META.get('HTTP_API_VERSION', 'v1')

# Check URL for version
version_match = re.match(r'/api/v(\d+)/', request.path)
if version_match:
version = f"v{version_match.group(1)}"

request.api_version = version
return None

def process_response(self, request, response):
# Add deprecation warnings for old API versions
if hasattr(request, 'api_version'):
if request.api_version == 'v1':
response['Deprecation'] = 'true'
response['Sunset'] = '2024-12-31'
response['Link'] = '</api/v2/>; rel="successor-version"'

return response

# Custom API versioning with DRF
from rest_framework.versioning import URLPathVersioning, HeaderVersioning

class CustomURLPathVersioning(URLPathVersioning):
"""Custom URL path versioning"""
default_version = 'v1'
allowed_versions = ['v1', 'v2']
version_param = 'version'

class CustomHeaderVersioning(HeaderVersioning):
"""Custom header versioning"""
default_version = 'v1'
allowed_versions = ['v1', 'v2']
version_header = 'HTTP_API_VERSION'

# settings.py
REST_FRAMEWORK = {
'DEFAULT_VERSIONING_CLASS': 'core.versioning.CustomURLPathVersioning',
'DEFAULT_VERSION': 'v1',
'ALLOWED_VERSIONS': ['v1', 'v2'],
}

# Usage in ViewSets
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()

def get_serializer_class(self):
"""Return different serializers based on API version"""
if self.request.version == 'v2':
return UserSerializerV2
return UserSerializer

@action(detail=False, methods=['get'])
def deprecated_endpoint(self, request):
"""Example of deprecated endpoint"""
if request.version == 'v1':
# Add deprecation warning
response = Response({
'message': 'This endpoint is deprecated. Use /api/v2/users/new-endpoint/',
'data': []
})
response['Deprecation'] = 'true'
response['Sunset'] = '2024-12-31'
return response

# New implementation for v2
return Response({'message': 'New endpoint implementation'})

Performance Optimization with Django

Database Performance

Query Optimization with Django ORM

from django.db import models
from django.contrib.auth import get_user_model

User = get_user_model()

# ✅ Use select_related for ForeignKey
def get_user_with_profile(user_id):
return User.objects.select_related('profile').get(id=user_id)

# ✅ Use prefetch_related for ManyToMany and reverse ForeignKey
def get_users_with_orders():
return User.objects.prefetch_related('orders').all()

# ✅ Limit fields with only()
def get_user_basic_info():
return User.objects.only('id', 'email', 'username').all()

# ✅ Use defer() to exclude heavy fields
def get_users_without_large_fields():
return User.objects.defer('profile_image', 'bio').all()

# ✅ Efficient pagination
from django.core.paginator import Paginator

def get_paginated_users(page_number, per_page=20):
users = User.objects.all().order_by('-created_at')
paginator = Paginator(users, per_page)
return paginator.get_page(page_number)

# ✅ Efficient aggregations
from django.db.models import Count, Avg

def get_user_statistics():
return User.objects.aggregate(
total_users=Count('id'),
avg_orders=Avg('orders__total')
)

Database Indexes

# models.py
from django.db import models

class User(models.Model):
email = models.EmailField(unique=True, db_index=True)
username = models.CharField(max_length=150, db_index=True)
created_at = models.DateTimeField(auto_now_add=True, db_index=True)

class Meta:
indexes = [
models.Index(fields=['email', 'created_at']),
models.Index(fields=['-created_at']), # Descending order
models.Index(fields=['username'], condition=models.Q(is_active=True)), # Partial index
]

class Order(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
status = models.CharField(max_length=20)

class Meta:
indexes = [
models.Index(fields=['user', '-created_at']),
models.Index(fields=['status', 'created_at']),
]

Caching con Django

# settings.py
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
}
}
}

# Cache timeout settings
CACHE_TTL = 60 * 15 # 15 minutes

# Usage in views
from django.core.cache import cache
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator

# Function-based view caching
@cache_page(60 * 15) # Cache for 15 minutes
def user_list(request):
users = User.objects.all()
return render(request, 'users/list.html', {'users': users})

# Manual caching
def get_user_profile(user_id):
cache_key = f'user_profile_{user_id}'
profile = cache.get(cache_key)

if profile is None:
try:
user = User.objects.select_related('profile').get(id=user_id)
profile = {
'id': user.id,
'email': user.email,
'profile': user.profile.__dict__ if hasattr(user, 'profile') else None
}
cache.set(cache_key, profile, CACHE_TTL)
except User.DoesNotExist:
return None

return profile

# Cache invalidation
def update_user_profile(user_id, data):
user = User.objects.get(id=user_id)
# Update user data
for key, value in data.items():
setattr(user, key, value)
user.save()

# Invalidate cache
cache_key = f'user_profile_{user_id}'
cache.delete(cache_key)

return user

# Class-based view caching
@method_decorator(cache_page(60 * 15), name='dispatch')
class ProductListView(ListView):
model = Product
template_name = 'products/list.html'

Memory Management with Django

QuerySet Optimization

# ✅ Use iterator() for large datasets
def process_large_dataset():
# Avoid loading all objects into memory
for user in User.objects.iterator(chunk_size=1000):
process_user(user)

# ✅ Use bulk operations
def bulk_create_users(user_data_list):
users = [User(**data) for data in user_data_list]
User.objects.bulk_create(users, batch_size=1000)

def bulk_update_users(users_to_update):
User.objects.bulk_update(
users_to_update,
['email', 'username'],
batch_size=1000
)

# ✅ Clean unused QuerySets
def efficient_data_processing():
# Bad: maintains reference to QuerySet
users = User.objects.all()
for user in users:
process_user(user)

# Good: frees memory after processing
for user in User.objects.iterator():
process_user(user)

File Processing con Django

import csv
from django.http import StreamingHttpResponse
from django.core.files.storage import default_storage

class CSVBuffer:
"""Buffer for streaming CSV responses"""
def write(self, value):
return value

def stream_csv_response(queryset, filename):
"""Stream large CSV files without loading everything in memory"""

def generate_csv():
buffer = CSVBuffer()
writer = csv.writer(buffer)

# Write header
yield writer.writerow(['ID', 'Email', 'Username', 'Created'])

# Write data in chunks
for obj in queryset.iterator(chunk_size=1000):
yield writer.writerow([
obj.id,
obj.email,
obj.username,
obj.created_at.strftime('%Y-%m-%d')
])

response = StreamingHttpResponse(
generate_csv(),
content_type='text/csv'
)
response['Content-Disposition'] = f'attachment; filename="{filename}"'
return response

# File upload processing
def process_uploaded_file(uploaded_file):
"""Process uploaded files in chunks"""
chunk_size = 8192

with default_storage.open(uploaded_file.name, 'rb') as file:
while True:
chunk = file.read(chunk_size)
if not chunk:
break
# Process chunk
process_chunk(chunk)

API Performance with DRF

Response Compression

# settings.py
MIDDLEWARE = [
'django.middleware.gzip.GZipMiddleware', # Add at the top
# ... other middleware
]

# Custom compression middleware
class ConditionalGZipMiddleware:
def __init__(self, get_response):
self.get_response = get_response

def __call__(self, request):
response = self.get_response(request)

# Only compress API responses > 1KB
if (hasattr(response, 'content') and
len(response.content) > 1024 and
request.path.startswith('/api/')):

response['Content-Encoding'] = 'gzip'

return response

Pagination con DRF

# settings.py
REST_FRAMEWORK = {
'DEFAULT_PAGINATION_CLASS': 'rest_framework.pagination.PageNumberPagination',
'PAGE_SIZE': 20
}

# Custom pagination
from rest_framework.pagination import PageNumberPagination
from rest_framework.response import Response

class CustomPagination(PageNumberPagination):
page_size = 20
page_size_query_param = 'page_size'
max_page_size = 100

def get_paginated_response(self, data):
return Response({
'pagination': {
'page': self.page.number,
'pages': self.page.paginator.num_pages,
'per_page': self.page_size,
'total': self.page.paginator.count,
'has_next': self.page.has_next(),
'has_previous': self.page.has_previous(),
},
'results': data
})

# Usage in ViewSets
class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
pagination_class = CustomPagination

def get_queryset(self):
queryset = User.objects.all()

# Filtering
email = self.request.query_params.get('email')
if email:
queryset = queryset.filter(email__icontains=email)

# Sorting
ordering = self.request.query_params.get('ordering', '-created_at')
queryset = queryset.order_by(ordering)

return queryset

# Advanced filtering with django-filter
from django_filters import rest_framework as filters

class UserFilter(filters.FilterSet):
email = filters.CharFilter(lookup_expr='icontains')
created_after = filters.DateTimeFilter(field_name='created_at', lookup_expr='gte')
created_before = filters.DateTimeFilter(field_name='created_at', lookup_expr='lte')

class Meta:
model = User
fields = ['email', 'is_active']

class UserViewSet(viewsets.ModelViewSet):
queryset = User.objects.all()
serializer_class = UserSerializer
filterset_class = UserFilter
ordering_fields = ['created_at', 'email', 'username']
ordering = ['-created_at']

Asynchronous Processing with Django

Background Jobs with Celery

# settings.py
import os

# Celery Configuration
CELERY_BROKER_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
CELERY_RESULT_BACKEND = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'

# celery.py
from celery import Celery
import os

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# tasks.py
from celery import shared_task
from django.core.mail import send_mail
from django.contrib.auth import get_user_model
import logging

logger = logging.getLogger(__name__)
User = get_user_model()

@shared_task(bind=True, max_retries=3)
def send_welcome_email(self, user_id):
"""Send welcome email to user"""
try:
user = User.objects.get(id=user_id)

send_mail(
subject='Welcome to our platform!',
message=f'Hello {user.username}, welcome to our platform!',
from_email='noreply@example.com',
recipient_list=[user.email],
fail_silently=False,
)

logger.info(f'Welcome email sent to user {user_id}')
return f'Email sent successfully to {user.email}'

except User.DoesNotExist:
logger.error(f'User {user_id} not found')
raise
except Exception as exc:
logger.error(f'Failed to send email to user {user_id}: {exc}')
# Retry with exponential backoff
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

@shared_task
def process_bulk_data(data_list):
"""Process large amounts of data in background"""
results = []

for item in data_list:
try:
# Process individual item
result = process_single_item(item)
results.append({'success': True, 'result': result})
except Exception as e:
results.append({'success': False, 'error': str(e)})

return results

# Usage in views
from .tasks import send_welcome_email

class UserRegistrationView(APIView):
def post(self, request):
serializer = UserSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()

# Send welcome email asynchronously
send_welcome_email.delay(user.id)

return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

Batch Processing con Django

from django.db import transaction
from django.utils import timezone
from datetime import timedelta

def batch_process_users(batch_size=1000):
"""Process users in batches to avoid memory issues"""

# Get total count
total_users = User.objects.count()
processed = 0

# Process in batches
while processed < total_users:
batch = User.objects.all()[processed:processed + batch_size]

with transaction.atomic():
for user in batch:
# Process individual user
process_user(user)

processed += batch_size

# Optional: small delay between batches
time.sleep(0.1)

# Management command for batch processing
# management/commands/process_inactive_users.py
from django.core.management.base import BaseCommand
from django.utils import timezone
from datetime import timedelta

class Command(BaseCommand):
help = 'Process inactive users in batches'

def add_arguments(self, parser):
parser.add_argument('--batch-size', type=int, default=1000)
parser.add_argument('--days-inactive', type=int, default=30)

def handle(self, *args, **options):
batch_size = options['batch_size']
days_inactive = options['days_inactive']

cutoff_date = timezone.now() - timedelta(days=days_inactive)

inactive_users = User.objects.filter(
last_login__lt=cutoff_date,
is_active=True
)

total = inactive_users.count()
processed = 0

self.stdout.write(f'Processing {total} inactive users...')

# Process in batches
for i in range(0, total, batch_size):
batch = inactive_users[i:i + batch_size]

with transaction.atomic():
for user in batch:
user.is_active = False
user.save()
processed += 1

self.stdout.write(f'Processed {processed}/{total} users')

self.stdout.write(
self.style.SUCCESS(f'Successfully processed {processed} users')
)

Database Interaction with Django

Connection Management

Database Configuration

# settings.py
import os
from decouple import config

DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME'),
'USER': config('DB_USER'),
'PASSWORD': config('DB_PASSWORD'),
'HOST': config('DB_HOST', default='localhost'),
'PORT': config('DB_PORT', default='5432'),
'OPTIONS': {
'MAX_CONNS': 20,
'MIN_CONNS': 2,
'CONN_MAX_AGE': 600, # 10 minutes
},
}
}

# Multiple databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'main_db',
# ... other settings
},
'users_db': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'users_db',
# ... other settings
},
'analytics_db': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': 'analytics_db',
# ... other settings
}
}

# Database routing
class DatabaseRouter:
"""Route specific models to specific databases"""

route_app_labels = {'users', 'analytics'}

def db_for_read(self, model, **hints):
if model._meta.app_label == 'users':
return 'users_db'
elif model._meta.app_label == 'analytics':
return 'analytics_db'
return 'default'

def db_for_write(self, model, **hints):
if model._meta.app_label == 'users':
return 'users_db'
elif model._meta.app_label == 'analytics':
return 'analytics_db'
return 'default'

def allow_migrate(self, db, app_label, model_name=None, **hints):
if app_label == 'users':
return db == 'users_db'
elif app_label == 'analytics':
return db == 'analytics_db'
return db == 'default'

DATABASE_ROUTERS = ['path.to.routers.DatabaseRouter']

Transaction Management with Django

ACID Transactions

from django.db import transaction
from django.db.models import F
from decimal import Decimal

@transaction.atomic
def perform_transfer(from_account_id, to_account_id, amount):
"""Perform money transfer between accounts"""

# Lock accounts for update to prevent race conditions
from_account = Account.objects.select_for_update().get(id=from_account_id)
to_account = Account.objects.select_for_update().get(id=to_account_id)

# Check balance
if from_account.balance < amount:
raise ValueError('Insufficient funds')

# Perform transfer
from_account.balance = F('balance') - amount
to_account.balance = F('balance') + amount

from_account.save(update_fields=['balance'])
to_account.save(update_fields=['balance'])

# Record transaction
Transaction.objects.create(
from_account=from_account,
to_account=to_account,
amount=amount,
transaction_type='transfer'
)

return {'success': True, 'message': 'Transfer completed'}

# Manual transaction management
def complex_operation():
"""Example of manual transaction management"""

try:
with transaction.atomic():
# Create user
user = User.objects.create(
email='test@example.com',
username='testuser'
)

# Create profile
profile = UserProfile.objects.create(
user=user,
bio='Test bio'
)

# Create account
account = Account.objects.create(
user=user,
balance=Decimal('100.00')
)

# If any operation fails, all will be rolled back
if not validate_user_data(user):
raise ValueError('Invalid user data')

except Exception as e:
# Transaction automatically rolled back
logger.error(f'Failed to create user: {e}')
raise

# Savepoints for nested transactions
def nested_transaction_example():
"""Example using savepoints"""

with transaction.atomic():
# Create user (will be committed)
user = User.objects.create(email='user@example.com')

# Create savepoint
sid = transaction.savepoint()

try:
# Risky operation
risky_operation(user)
transaction.savepoint_commit(sid)
except Exception:
# Rollback to savepoint (user creation is preserved)
transaction.savepoint_rollback(sid)
logger.warning('Risky operation failed, but user was created')

Query Optimization with Django ORM

Efficient Queries

from django.db.models import Q, Exists, OuterRef
from datetime import datetime, timedelta

# ✅ Use only() for specific fields
def get_active_users():
thirty_days_ago = datetime.now() - timedelta(days=30)
return User.objects.filter(
is_active=True,
last_login__gte=thirty_days_ago
).only('id', 'email', 'username', 'last_login').order_by('-last_login')[:100]

# ✅ Use select_related for JOINs (ForeignKey)
def get_users_with_profiles():
return User.objects.select_related('profile').filter(
is_active=True
).only(
'id', 'email', 'username',
'profile__bio', 'profile__avatar_url', 'profile__location'
)

# ✅ Use Exists for conditional checks
def get_users_with_recent_orders():
one_year_ago = datetime.now() - timedelta(days=365)

recent_orders = Order.objects.filter(
user=OuterRef('pk'),
created_at__gte=one_year_ago
)

return User.objects.filter(
Exists(recent_orders)
).only('id', 'email', 'username')

# ✅ Use prefetch_related to optimize reverse ForeignKey
def get_users_with_orders():
return User.objects.prefetch_related(
'orders'
).filter(is_active=True)

# ✅ Raw queries when necessary
def complex_analytics_query():
return User.objects.raw("""
SELECT u.id, u.email,
COUNT(o.id) as order_count,
SUM(o.total) as total_spent
FROM auth_user u
LEFT JOIN orders o ON u.id = o.user_id
WHERE u.is_active = true
GROUP BY u.id, u.email
HAVING COUNT(o.id) > 5
ORDER BY total_spent DESC
LIMIT 100
""")

Index Strategy con Django

# models.py
from django.db import models

class User(models.Model):
email = models.EmailField(unique=True)
username = models.CharField(max_length=150, db_index=True)
is_active = models.BooleanField(default=True, db_index=True)
created_at = models.DateTimeField(auto_now_add=True)
last_login = models.DateTimeField(null=True, blank=True)

class Meta:
indexes = [
# Composite indexes
models.Index(fields=['is_active', '-created_at']),
models.Index(fields=['email', 'is_active']),

# Partial indexes
models.Index(
fields=['last_login'],
condition=models.Q(is_active=True),
name='active_users_last_login_idx'
),

# Text search (PostgreSQL)
models.Index(
fields=['username', 'email'],
name='user_search_idx'
),
]

class Order(models.Model):
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='orders')
status = models.CharField(max_length=20, db_index=True)
total = models.DecimalField(max_digits=10, decimal_places=2)
created_at = models.DateTimeField(auto_now_add=True)

class Meta:
indexes = [
# Composite indexes (order matters)
models.Index(fields=['user', '-created_at']),
models.Index(fields=['status', 'created_at']),
models.Index(fields=['created_at', 'total']),

# Partial index for active orders
models.Index(
fields=['created_at'],
condition=models.Q(status__in=['pending', 'processing']),
name='active_orders_date_idx'
),
]

# Full-text search con PostgreSQL
class Product(models.Model):
name = models.CharField(max_length=200)
description = models.TextField()

class Meta:
indexes = [
# GIN index for full-text search
models.Index(
fields=['name', 'description'],
name='product_search_idx',
# PostgreSQL specific
opclasses=['gin_trgm_ops', 'gin_trgm_ops']
),
]

# Migration para crear índices personalizados
# migrations/0002_add_custom_indexes.py
from django.db import migrations

class Migration(migrations.Migration):
dependencies = [
('myapp', '0001_initial'),
]

operations = [
migrations.RunSQL(
"CREATE INDEX CONCURRENTLY idx_user_email_trgm ON myapp_user USING gin (email gin_trgm_ops);",
reverse_sql="DROP INDEX IF EXISTS idx_user_email_trgm;"
),
migrations.RunSQL(
"CREATE INDEX CONCURRENTLY idx_product_search ON myapp_product USING gin (to_tsvector('english', name || ' ' || description));",
reverse_sql="DROP INDEX IF EXISTS idx_product_search;"
),
]

Data Validation and Constraints with Django

Database-Level Constraints

# models.py
from django.db import models
from django.core.validators import MinValueValidator, MaxValueValidator, RegexValidator

class User(models.Model):
email = models.EmailField(
unique=True,
validators=[
RegexValidator(
regex=r'^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$',
message='Invalid email format'
)
]
)
age = models.PositiveIntegerField(
validators=[
MinValueValidator(13, message='Age must be at least 13'),
MaxValueValidator(120, message='Age cannot exceed 120')
],
null=True,
blank=True
)

class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(age__gte=13) & models.Q(age__lte=120),
name='valid_age_range'
),
models.UniqueConstraint(
fields=['email'],
name='unique_email'
),
]

class Product(models.Model):
name = models.CharField(max_length=200)
price = models.DecimalField(
max_digits=10,
decimal_places=2,
validators=[MinValueValidator(0.01)]
)

class Meta:
constraints = [
models.CheckConstraint(
check=models.Q(price__gt=0),
name='positive_price'
),
]

class Order(models.Model):
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='orders'
)
status = models.CharField(
max_length=20,
choices=[
('pending', 'Pending'),
('processing', 'Processing'),
('completed', 'Completed'),
('cancelled', 'Cancelled'),
],
default='pending'
)

class Meta:
constraints = [
models.UniqueConstraint(
fields=['user', 'created_at'],
condition=models.Q(status='pending'),
name='one_pending_order_per_user_per_day'
),
]

Application-Level Validation

from django.core.exceptions import ValidationError
from django.core.validators import validate_email
import re

def validate_password_strength(password):
"""Custom password strength validator"""
if len(password) < 8:
raise ValidationError('Password must be at least 8 characters long')

if not re.search(r'[A-Z]', password):
raise ValidationError('Password must contain at least one uppercase letter')

if not re.search(r'[a-z]', password):
raise ValidationError('Password must contain at least one lowercase letter')

if not re.search(r'\d', password):
raise ValidationError('Password must contain at least one digit')

if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
raise ValidationError('Password must contain at least one special character')

class User(models.Model):
email = models.EmailField(unique=True)
username = models.CharField(max_length=150)
age = models.PositiveIntegerField(null=True, blank=True)

def clean(self):
"""Model-level validation"""
super().clean()

# Email validation
if self.email:
try:
validate_email(self.email)
except ValidationError:
raise ValidationError({'email': 'Invalid email format'})

# Age validation
if self.age is not None:
if self.age < 13:
raise ValidationError({'age': 'Age must be at least 13'})
if self.age > 120:
raise ValidationError({'age': 'Age cannot exceed 120'})

def save(self, *args, **kwargs):
"""Override save to ensure validation"""
self.full_clean()
super().save(*args, **kwargs)

# Custom validators
def validate_phone_number(value):
"""Custom phone number validator"""
phone_regex = re.compile(r'^\+?1?\d{9,15}$')
if not phone_regex.match(value):
raise ValidationError('Invalid phone number format')

class UserProfile(models.Model):
user = models.OneToOneField(User, on_delete=models.CASCADE)
phone_number = models.CharField(
max_length=20,
validators=[validate_phone_number],
blank=True
)

def clean(self):
"""Cross-field validation"""
super().clean()

# Example: validate that phone is required for certain user types
if self.user.user_type == 'premium' and not self.phone_number:
raise ValidationError({
'phone_number': 'Phone number is required for premium users'
})

Migration Best Practices with Django

Safe Migrations

# migrations/0001_initial.py
from django.db import migrations, models

class Migration(migrations.Migration):
initial = True

dependencies = []

operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.AutoField(primary_key=True)),
('email', models.EmailField(unique=True)),
('username', models.CharField(max_length=150)),
('password_hash', models.CharField(max_length=128)),
('is_active', models.BooleanField(default=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'db_table': 'users',
},
),
# Add indexes
migrations.AddIndex(
model_name='user',
index=models.Index(fields=['email'], name='users_email_idx'),
),
migrations.AddIndex(
model_name='user',
index=models.Index(fields=['is_active'], name='users_active_idx'),
),
]

# migrations/0002_add_user_profile_fields.py
from django.db import migrations, models

class Migration(migrations.Migration):
dependencies = [
('myapp', '0001_initial'),
]

operations = [
# Add new fields (safe - no downtime)
migrations.AddField(
model_name='user',
name='bio',
field=models.TextField(blank=True, default=''),
),
migrations.AddField(
model_name='user',
name='avatar_url',
field=models.URLField(blank=True, default=''),
),
migrations.AddField(
model_name='user',
name='location',
field=models.CharField(max_length=100, blank=True, default=''),
),
]

# migrations/0003_add_indexes_concurrently.py
from django.db import migrations, models

class Migration(migrations.Migration):
dependencies = [
('myapp', '0002_add_user_profile_fields'),
]

operations = [
# Add indexes concurrently (PostgreSQL)
migrations.RunSQL(
"CREATE INDEX CONCURRENTLY users_location_idx ON users(location) WHERE location != '';",
reverse_sql="DROP INDEX IF EXISTS users_location_idx;",
state_operations=[
migrations.AddIndex(
model_name='user',
index=models.Index(
fields=['location'],
condition=models.Q(location__ne=''),
name='users_location_idx'
),
),
],
),
]

# Data migration example
# migrations/0004_populate_user_slugs.py
from django.db import migrations
from django.utils.text import slugify

def populate_slugs(apps, schema_editor):
"""Populate slug field for existing users"""
User = apps.get_model('myapp', 'User')

for user in User.objects.all():
if not user.slug:
user.slug = slugify(user.username)
user.save(update_fields=['slug'])

def reverse_populate_slugs(apps, schema_editor):
"""Reverse migration - clear slugs"""
User = apps.get_model('myapp', 'User')
User.objects.update(slug='')

class Migration(migrations.Migration):
dependencies = [
('myapp', '0003_add_indexes_concurrently'),
]

operations = [
# Add slug field
migrations.AddField(
model_name='user',
name='slug',
field=models.SlugField(blank=True, default=''),
),
# Populate data
migrations.RunPython(
populate_slugs,
reverse_populate_slugs,
),
# Make field required after populating
migrations.AlterField(
model_name='user',
name='slug',
field=models.SlugField(unique=True),
),
]

Migration Safety Guidelines

# Safe operations (no downtime):
# - Adding new models
# - Adding new fields with default values or null=True
# - Adding indexes concurrently (PostgreSQL)
# - Removing indexes
# - Adding constraints that are already satisfied

# Potentially unsafe operations (may cause downtime):
# - Removing fields or models
# - Renaming fields or models
# - Adding NOT NULL constraints without defaults
# - Changing field types
# - Adding unique constraints on existing data

# Example: Safe field removal (two-step process)
# Step 1: Stop using the field in code, deploy
# Step 2: Remove field in migration, deploy

# migrations/0005_remove_deprecated_field.py
class Migration(migrations.Migration):
dependencies = [
('myapp', '0004_populate_user_slugs'),
]

operations = [
# Only remove after confirming field is not used in code
migrations.RemoveField(
model_name='user',
name='deprecated_field',
),
]

# Custom migration for complex operations
# migrations/0006_complex_data_migration.py
from django.db import migrations, transaction

def complex_data_migration(apps, schema_editor):
"""Complex data migration with proper error handling"""
User = apps.get_model('myapp', 'User')

# Process in batches to avoid memory issues
batch_size = 1000
total_users = User.objects.count()

for i in range(0, total_users, batch_size):
with transaction.atomic():
batch = User.objects.all()[i:i + batch_size]

for user in batch:
# Perform complex transformation
user.normalized_email = user.email.lower().strip()
user.save(update_fields=['normalized_email'])

class Migration(migrations.Migration):
dependencies = [
('myapp', '0005_remove_deprecated_field'),
]

operations = [
migrations.AddField(
model_name='user',
name='normalized_email',
field=models.EmailField(blank=True, default=''),
),
migrations.RunPython(
complex_data_migration,
migrations.RunPython.noop, # No reverse operation
),
]

Backup and Recovery with Django

Automated Backups

#!/bin/bash
# backup_django_db.sh

# Django project settings
PROJECT_DIR="/path/to/django/project"
BACKUP_DIR="/backups/django"
DATE=$(date +%Y%m%d_%H%M%S)

# Activate virtual environment
source /path/to/venv/bin/activate

# Change to project directory
cd $PROJECT_DIR

# Create backup using Django's dumpdata
python manage.py dumpdata --natural-foreign --natural-primary \
--exclude auth.permission --exclude contenttypes \
--indent 2 > "$BACKUP_DIR/django_data_$DATE.json"

# Create database backup (PostgreSQL)
DB_NAME=$(python manage.py shell -c "from django.conf import settings; print(settings.DATABASES['default']['NAME'])")
pg_dump $DB_NAME > "$BACKUP_DIR/db_backup_$DATE.sql"

# Compress backups
gzip "$BACKUP_DIR/django_data_$DATE.json"
gzip "$BACKUP_DIR/db_backup_$DATE.sql"

# Remove backups older than 7 days
find $BACKUP_DIR -name "django_data_*.json.gz" -mtime +7 -delete
find $BACKUP_DIR -name "db_backup_*.sql.gz" -mtime +7 -delete

echo "Django backup completed: $DATE"

Django Management Commands para Backup

# management/commands/backup_data.py
from django.core.management.base import BaseCommand
from django.core.management import call_command
from django.conf import settings
import os
from datetime import datetime
import gzip
import shutil

class Command(BaseCommand):
help = 'Create backup of Django data'

def add_arguments(self, parser):
parser.add_argument('--output-dir', type=str, default='/backups')
parser.add_argument('--compress', action='store_true')

def handle(self, *args, **options):
output_dir = options['output_dir']
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# Ensure backup directory exists
os.makedirs(output_dir, exist_ok=True)

# Create data backup
backup_file = os.path.join(output_dir, f'django_backup_{timestamp}.json')

with open(backup_file, 'w') as f:
call_command(
'dumpdata',
'--natural-foreign',
'--natural-primary',
'--exclude=auth.permission',
'--exclude=contenttypes',
'--indent=2',
stdout=f
)

# Compress if requested
if options['compress']:
with open(backup_file, 'rb') as f_in:
with gzip.open(f'{backup_file}.gz', 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(backup_file)
backup_file = f'{backup_file}.gz'

self.stdout.write(
self.style.SUCCESS(f'Backup created: {backup_file}')
)

# management/commands/restore_data.py
from django.core.management.base import BaseCommand
from django.core.management import call_command
import gzip
import json

class Command(BaseCommand):
help = 'Restore Django data from backup'

def add_arguments(self, parser):
parser.add_argument('backup_file', type=str)
parser.add_argument('--flush', action='store_true',
help='Flush database before restore')

def handle(self, *args, **options):
backup_file = options['backup_file']

if options['flush']:
self.stdout.write('Flushing database...')
call_command('flush', '--noinput')

# Handle compressed files
if backup_file.endswith('.gz'):
with gzip.open(backup_file, 'rt') as f:
data = f.read()
else:
with open(backup_file, 'r') as f:
data = f.read()

# Validate JSON
try:
json.loads(data)
except json.JSONDecodeError as e:
self.stdout.write(
self.style.ERROR(f'Invalid JSON in backup file: {e}')
)
return

# Restore data
self.stdout.write('Restoring data...')
call_command('loaddata', backup_file)

self.stdout.write(
self.style.SUCCESS(f'Data restored from: {backup_file}')
)

Monitoring y Health Checks

# management/commands/health_check.py
from django.core.management.base import BaseCommand
from django.db import connection
from django.core.cache import cache
from django.conf import settings
import logging

logger = logging.getLogger(__name__)

class Command(BaseCommand):
help = 'Perform comprehensive health check'

def handle(self, *args, **options):
health_status = {
'database': self.check_database(),
'cache': self.check_cache(),
'migrations': self.check_migrations(),
'disk_space': self.check_disk_space(),
}

# Log results
for service, status in health_status.items():
if status['healthy']:
self.stdout.write(
self.style.SUCCESS(f'✓ {service}: {status["message"]}')
)
else:
self.stdout.write(
self.style.ERROR(f'✗ {service}: {status["message"]}')
)
logger.error(f'Health check failed for {service}: {status["message"]}')

# Overall status
all_healthy = all(status['healthy'] for status in health_status.values())
if all_healthy:
self.stdout.write(self.style.SUCCESS('All systems healthy'))
else:
self.stdout.write(self.style.ERROR('Some systems unhealthy'))
exit(1)

def check_database(self):
"""Check database connectivity"""
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
result = cursor.fetchone()
if result[0] == 1:
return {'healthy': True, 'message': 'Database connection OK'}
except Exception as e:
return {'healthy': False, 'message': f'Database error: {str(e)}'}

def check_cache(self):
"""Check cache connectivity"""
try:
cache.set('health_check', 'test', 30)
value = cache.get('health_check')
if value == 'test':
return {'healthy': True, 'message': 'Cache connection OK'}
else:
return {'healthy': False, 'message': 'Cache read/write failed'}
except Exception as e:
return {'healthy': False, 'message': f'Cache error: {str(e)}'}

def check_migrations(self):
"""Check for unapplied migrations"""
from django.core.management import execute_from_command_line
from io import StringIO
import sys

try:
# Capture output
old_stdout = sys.stdout
sys.stdout = captured_output = StringIO()

execute_from_command_line(['manage.py', 'showmigrations', '--plan'])

# Restore stdout
sys.stdout = old_stdout
output = captured_output.getvalue()

# Check for unapplied migrations
if '[ ]' in output:
return {'healthy': False, 'message': 'Unapplied migrations found'}
else:
return {'healthy': True, 'message': 'All migrations applied'}

except Exception as e:
return {'healthy': False, 'message': f'Migration check error: {str(e)}'}

def check_disk_space(self):
"""Check available disk space"""
import shutil
try:
total, used, free = shutil.disk_usage('/')
free_percent = (free / total) * 100

if free_percent < 10:
return {'healthy': False, 'message': f'Low disk space: {free_percent:.1f}% free'}
else:
return {'healthy': True, 'message': f'Disk space OK: {free_percent:.1f}% free'}
except Exception as e:
return {'healthy': False, 'message': f'Disk space check error: {str(e)}'}

execute_from_command_line(['manage.py', 'showmigrations', '--plan'])

sys.stdout = old_stdout
output = captured_output.getvalue()

if '[ ]' in output: # Unapplied migrations
return {'healthy': False, 'message': 'Unapplied migrations found'}
else:
return {'healthy': True, 'message': 'All migrations applied'}

except Exception as e:
return {'healthy': False, 'message': f'Migration check error: {str(e)}'}

def check_disk_space(self):
"""Check available disk space"""
import shutil

try:
total, used, free = shutil.disk_usage('/')
free_percent = (free / total) * 100

if free_percent < 10: # Less than 10% free
return {'healthy': False, 'message': f'Low disk space: {free_percent:.1f}% free'}
else:
return {'healthy': True, 'message': f'Disk space OK: {free_percent:.1f}% free'}

except Exception as e:
return {'healthy': False, 'message': f'Disk check error: {str(e)}'}

Conclusion

These Django and Django REST Framework specific best practices are fundamental for maintaining a robust, secure, and scalable backend. Consistent implementation of these practices will significantly improve:

  • Code Quality: Through comprehensive testing and appropriate validations
  • Security: With robust authentication and protection against common vulnerabilities
  • Performance: By optimizing queries, using cache effectively, and managing resources efficiently
  • Maintainability: With structured logging, proactive monitoring, and safe migrations

Remember to adapt these guidelines according to your project's specific needs and keep them updated with Django community best practices.