Commit 64734a3e authored by Sai Teja Singamsetty's avatar Sai Teja Singamsetty 💬
Browse files

Nit Glug Blog

parent 29ad3b07
from __future__ import unicode_literals
import datetime
from django.core import signing
from django.db import models, transaction
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.utils.translation import gettext_lazy as _
from .. import app_settings as allauth_app_settings
from . import app_settings, signals
from .adapter import get_adapter
from .managers import EmailAddressManager, EmailConfirmationManager
from .utils import user_email
class EmailAddress(models.Model):
user = models.ForeignKey(allauth_app_settings.USER_MODEL,
verbose_name=_('user'),
on_delete=models.CASCADE)
email = models.EmailField(unique=app_settings.UNIQUE_EMAIL,
max_length=app_settings.EMAIL_MAX_LENGTH,
verbose_name=_('e-mail address'))
verified = models.BooleanField(verbose_name=_('verified'), default=False)
primary = models.BooleanField(verbose_name=_('primary'), default=False)
objects = EmailAddressManager()
class Meta:
verbose_name = _("email address")
verbose_name_plural = _("email addresses")
if not app_settings.UNIQUE_EMAIL:
unique_together = [("user", "email")]
def __str__(self):
return self.email
def set_as_primary(self, conditional=False):
old_primary = EmailAddress.objects.get_primary(self.user)
if old_primary:
if conditional:
return False
old_primary.primary = False
old_primary.save()
self.primary = True
self.save()
user_email(self.user, self.email)
self.user.save()
return True
def send_confirmation(self, request=None, signup=False):
if app_settings.EMAIL_CONFIRMATION_HMAC:
confirmation = EmailConfirmationHMAC(self)
else:
confirmation = EmailConfirmation.create(self)
confirmation.send(request, signup=signup)
return confirmation
def change(self, request, new_email, confirm=True):
"""
Given a new email address, change self and re-confirm.
"""
with transaction.atomic():
user_email(self.user, new_email)
self.user.save()
self.email = new_email
self.verified = False
self.save()
if confirm:
self.send_confirmation(request)
class EmailConfirmation(models.Model):
email_address = models.ForeignKey(EmailAddress,
verbose_name=_('e-mail address'),
on_delete=models.CASCADE)
created = models.DateTimeField(verbose_name=_('created'),
default=timezone.now)
sent = models.DateTimeField(verbose_name=_('sent'), null=True)
key = models.CharField(verbose_name=_('key'), max_length=64, unique=True)
objects = EmailConfirmationManager()
class Meta:
verbose_name = _("email confirmation")
verbose_name_plural = _("email confirmations")
def __str__(self):
return "confirmation for %s" % self.email_address
@classmethod
def create(cls, email_address):
key = get_random_string(64).lower()
return cls._default_manager.create(email_address=email_address,
key=key)
def key_expired(self):
expiration_date = self.sent \
+ datetime.timedelta(days=app_settings
.EMAIL_CONFIRMATION_EXPIRE_DAYS)
return expiration_date <= timezone.now()
key_expired.boolean = True
def confirm(self, request):
if not self.key_expired() and not self.email_address.verified:
email_address = self.email_address
get_adapter(request).confirm_email(request, email_address)
signals.email_confirmed.send(sender=self.__class__,
request=request,
email_address=email_address)
return email_address
def send(self, request=None, signup=False):
get_adapter(request).send_confirmation_mail(request, self, signup)
self.sent = timezone.now()
self.save()
signals.email_confirmation_sent.send(sender=self.__class__,
request=request,
confirmation=self,
signup=signup)
class EmailConfirmationHMAC:
def __init__(self, email_address):
self.email_address = email_address
@property
def key(self):
return signing.dumps(
obj=self.email_address.pk,
salt=app_settings.SALT)
@classmethod
def from_key(cls, key):
try:
max_age = (
60 * 60 * 24 * app_settings.EMAIL_CONFIRMATION_EXPIRE_DAYS)
pk = signing.loads(
key,
max_age=max_age,
salt=app_settings.SALT)
ret = EmailConfirmationHMAC(EmailAddress.objects.get(pk=pk))
except (signing.SignatureExpired,
signing.BadSignature,
EmailAddress.DoesNotExist):
ret = None
return ret
def confirm(self, request):
if not self.email_address.verified:
email_address = self.email_address
get_adapter(request).confirm_email(request, email_address)
signals.email_confirmed.send(sender=self.__class__,
request=request,
email_address=email_address)
return email_address
def send(self, request=None, signup=False):
get_adapter(request).send_confirmation_mail(request, self, signup)
signals.email_confirmation_sent.send(sender=self.__class__,
request=request,
confirmation=self,
signup=signup)
from django.contrib.auth.signals import user_logged_out # noqa
from django.dispatch import Signal
user_logged_in = Signal(providing_args=["request", "user"])
# Typically followed by `user_logged_in` (unless, e-mail verification kicks in)
user_signed_up = Signal(providing_args=["request", "user"])
password_set = Signal(providing_args=["request", "user"])
password_changed = Signal(providing_args=["request", "user"])
password_reset = Signal(providing_args=["request", "user"])
email_confirmed = Signal(providing_args=["request", "email_address"])
email_confirmation_sent = Signal(
providing_args=["request", "confirmation", "signup"])
email_changed = Signal(
providing_args=[
"request", "user",
"from_email_address", "to_email_address"])
email_added = Signal(providing_args=["request", "user", "email_address"])
email_removed = Signal(providing_args=["request", "user", "email_address"])
from django import template
from allauth.account.utils import user_display
register = template.Library()
@register.simple_tag(name='user_display')
def user_display_tag(user):
"""
Example usage::
{% user_display user %}
or if you need to use in a {% blocktrans %}::
{% user_display user as user_display %}
{% blocktrans %}
{{ user_display }} has sent you a gift.
{% endblocktrans %}
"""
return user_display(user)
This diff is collapsed.
from django.urls import path, re_path
from . import views
urlpatterns = [
# path("signup/", views.signup, name="account_signup"),
path("login/", views.login, name="account_login"),
path("logout/", views.logout, name="account_logout"),
path("password/change/", views.password_change,
name="account_change_password"),
path("password/set/", views.password_set, name="account_set_password"),
path("inactive/", views.account_inactive, name="account_inactive"),
# E-mail
path("email/", views.email, name="account_email"),
path("confirm-email/", views.email_verification_sent,
name="account_email_verification_sent"),
re_path(r"^confirm-email/(?P<key>[-:\w]+)/$", views.confirm_email,
name="account_confirm_email"),
# password reset
path("password/reset/", views.password_reset,
name="account_reset_password"),
path("password/reset/done/", views.password_reset_done,
name="account_reset_password_done"),
re_path(r"^password/reset/key/(?P<uidb36>[0-9A-Za-z]+)-(?P<key>.+)/$",
views.password_reset_from_key,
name="account_reset_password_from_key"),
path("password/reset/key/done/", views.password_reset_from_key_done,
name="account_reset_password_from_key_done"),
]
import unicodedata
from collections import OrderedDict
from datetime import timedelta
from django.conf import settings
from django.contrib import messages
from django.contrib.auth import update_session_auth_hash
from django.core.exceptions import FieldDoesNotExist, ValidationError
from django.db import models
from django.db.models import Q
from django.http import HttpResponseRedirect
from django.utils.encoding import force_str
from django.utils.http import base36_to_int, int_to_base36, urlencode
from django.utils.timezone import now
from ..exceptions import ImmediateHttpResponse
from ..utils import (
get_request_param,
get_user_model,
import_callable,
valid_email_or_none,
)
from . import app_settings, signals
from .adapter import get_adapter
from .app_settings import EmailVerificationMethod
def _unicode_ci_compare(s1, s2):
"""
Perform case-insensitive comparison of two identifiers, using the
recommended algorithm from Unicode Technical Report 36, section
2.11.2(B)(2).
"""
norm_s1 = unicodedata.normalize('NFKC', s1).casefold()
norm_s2 = unicodedata.normalize('NFKC', s2).casefold()
return norm_s1 == norm_s2
def get_next_redirect_url(request, redirect_field_name="next"):
"""
Returns the next URL to redirect to, if it was explicitly passed
via the request.
"""
redirect_to = get_request_param(request, redirect_field_name)
if not get_adapter(request).is_safe_url(redirect_to):
redirect_to = None
return redirect_to
def get_login_redirect_url(request, url=None, redirect_field_name="next"):
if url and callable(url):
# In order to be able to pass url getters around that depend
# on e.g. the authenticated state.
url = url()
redirect_url = (
url or
get_next_redirect_url(
request,
redirect_field_name=redirect_field_name) or
get_adapter(request).get_login_redirect_url(request))
return redirect_url
_user_display_callable = None
def logout_on_password_change(request, user):
# Since it is the default behavior of Django to invalidate all sessions on
# password change, this function actually has to preserve the session when
# logout isn't desired.
if not app_settings.LOGOUT_ON_PASSWORD_CHANGE:
update_session_auth_hash(request, user)
def default_user_display(user):
if app_settings.USER_MODEL_USERNAME_FIELD:
return getattr(user, app_settings.USER_MODEL_USERNAME_FIELD)
else:
return force_str(user)
def user_display(user):
global _user_display_callable
if not _user_display_callable:
f = getattr(settings, "ACCOUNT_USER_DISPLAY",
default_user_display)
_user_display_callable = import_callable(f)
return _user_display_callable(user)
def user_field(user, field, *args):
"""
Gets or sets (optional) user model fields. No-op if fields do not exist.
"""
if not field:
return
User = get_user_model()
try:
field_meta = User._meta.get_field(field)
max_length = field_meta.max_length
except FieldDoesNotExist:
if not hasattr(user, field):
return
max_length = None
if args:
# Setter
v = args[0]
if v:
v = v[0:max_length]
setattr(user, field, v)
else:
# Getter
return getattr(user, field)
def user_username(user, *args):
if args and not app_settings.PRESERVE_USERNAME_CASING and args[0]:
args = [args[0].lower()]
return user_field(user, app_settings.USER_MODEL_USERNAME_FIELD, *args)
def user_email(user, *args):
return user_field(user, app_settings.USER_MODEL_EMAIL_FIELD, *args)
def perform_login(request, user, email_verification,
redirect_url=None, signal_kwargs=None,
signup=False):
"""
Keyword arguments:
signup -- Indicates whether or not sending the
email is essential (during signup), or if it can be skipped (e.g. in
case email verification is optional and we are only logging in).
"""
# Local users are stopped due to form validation checking
# is_active, yet, adapter methods could toy with is_active in a
# `user_signed_up` signal. Furthermore, social users should be
# stopped anyway.
adapter = get_adapter(request)
if not user.is_active:
return adapter.respond_user_inactive(request, user)
from .models import EmailAddress
has_verified_email = EmailAddress.objects.filter(user=user,
verified=True).exists()
if email_verification == EmailVerificationMethod.NONE:
pass
elif email_verification == EmailVerificationMethod.OPTIONAL:
# In case of OPTIONAL verification: send on signup.
if not has_verified_email and signup:
send_email_confirmation(request, user, signup=signup)
elif email_verification == EmailVerificationMethod.MANDATORY:
if not has_verified_email:
send_email_confirmation(request, user, signup=signup)
return adapter.respond_email_verification_sent(
request, user)
try:
adapter.login(request, user)
response = HttpResponseRedirect(
get_login_redirect_url(request, redirect_url))
if signal_kwargs is None:
signal_kwargs = {}
signals.user_logged_in.send(sender=user.__class__,
request=request,
response=response,
user=user,
**signal_kwargs)
adapter.add_message(
request,
messages.SUCCESS,
'account/messages/logged_in.txt',
{'user': user})
except ImmediateHttpResponse as e:
response = e.response
return response
def complete_signup(request, user, email_verification, success_url,
signal_kwargs=None):
if signal_kwargs is None:
signal_kwargs = {}
signals.user_signed_up.send(sender=user.__class__,
request=request,
user=user,
**signal_kwargs)
return perform_login(request, user,
email_verification=email_verification,
signup=True,
redirect_url=success_url,
signal_kwargs=signal_kwargs)
def cleanup_email_addresses(request, addresses):
"""
Takes a list of EmailAddress instances and cleans it up, making
sure only valid ones remain, without multiple primaries etc.
Order is important: e.g. if multiple primary e-mail addresses
exist, the first one encountered will be kept as primary.
"""
from .models import EmailAddress
adapter = get_adapter(request)
# Let's group by `email`
e2a = OrderedDict() # maps email to EmailAddress
primary_addresses = []
verified_addresses = []
primary_verified_addresses = []
for address in addresses:
# Pick up only valid ones...
email = valid_email_or_none(address.email)
if not email:
continue
# ... and non-conflicting ones...
if (app_settings.UNIQUE_EMAIL and
EmailAddress.objects.filter(email__iexact=email).exists()):
continue
a = e2a.get(email.lower())
if a:
a.primary = a.primary or address.primary
a.verified = a.verified or address.verified
else:
a = address
a.verified = a.verified or adapter.is_email_verified(request,
a.email)
e2a[email.lower()] = a
if a.primary:
primary_addresses.append(a)
if a.verified:
primary_verified_addresses.append(a)
if a.verified:
verified_addresses.append(a)
# Now that we got things sorted out, let's assign a primary
if primary_verified_addresses:
primary_address = primary_verified_addresses[0]
elif verified_addresses:
# Pick any verified as primary
primary_address = verified_addresses[0]
elif primary_addresses:
# Okay, let's pick primary then, even if unverified
primary_address = primary_addresses[0]
elif e2a:
# Pick the first
primary_address = e2a.keys()[0]
else:
# Empty
primary_address = None
# There can only be one primary
for a in e2a.values():
a.primary = primary_address.email.lower() == a.email.lower()
return list(e2a.values()), primary_address
def setup_user_email(request, user, addresses):
"""
Creates proper EmailAddress for the user that was just signed
up. Only sets up, doesn't do any other handling such as sending
out email confirmation mails etc.
"""
from .models import EmailAddress
assert not EmailAddress.objects.filter(user=user).exists()
priority_addresses = []
# Is there a stashed e-mail?
adapter = get_adapter(request)
stashed_email = adapter.unstash_verified_email(request)
if stashed_email:
priority_addresses.append(EmailAddress(user=user,
email=stashed_email,
primary=True,
verified=True))
email = user_email(user)
if email:
priority_addresses.append(EmailAddress(user=user,
email=email,
primary=True,
verified=False))
addresses, primary = cleanup_email_addresses(
request,
priority_addresses + addresses)
for a in addresses:
a.user = user
a.save()
EmailAddress.objects.fill_cache_for_user(user, addresses)
if (primary and email and email.lower() != primary.email.lower()):
user_email(user, primary.email)
user.save()
return primary
def send_email_confirmation(request, user, signup=False):
"""
E-mail verification mails are sent:
a) Explicitly: when a user signs up
b) Implicitly: when a user attempts to log in using an unverified
e-mail while EMAIL_VERIFICATION is mandatory.
Especially in case of b), we want to limit the number of mails
sent (consider a user retrying a few times), which is why there is
a cooldown period before sending a new mail. This cooldown period
can be configured in ACCOUNT_EMAIL_CONFIRMATION_COOLDOWN setting.
"""
from .models import EmailAddress, EmailConfirmation
cooldown_period = timedelta(
seconds=app_settings.EMAIL_CONFIRMATION_COOLDOWN
)
email = user_email(user)
if email:
try:
email_address = EmailAddress.objects.get_for_user(user, email)
if not email_address.verified:
if app_settings.EMAIL_CONFIRMATION_HMAC:
send_email = True
else:
send_email = not EmailConfirmation.objects.filter(
sent__gt=now() - cooldown_period,
email_address=email_address).exists()
if send_email:
email_address.send_confirmation(request,
signup=signup)
else:
send_email = False
except EmailAddress.DoesNotExist:
send_email = True
email_address = EmailAddress.objects.add_email(request,
user,
email,
signup=signup,
confirm=True)
assert email_address
# At this point, if we were supposed to send an email we have sent it.
if send_email:
get_adapter(request).add_message(
request,
messages.INFO,
'account/messages/'
'email_confirmation_sent.txt',
{'email': email})
if signup:
get_adapter(request).stash_user(request, user_pk_to_url_str(user))