Newer
Older
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
from flask import Blueprint
from flask import abort, redirect, jsonify, send_file
from flask import url_for, request, current_app, session
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from threading import Thread
import base64
import hashlib
import time
import webauthn
import config
import utils
login_view = Blueprint( "login", __name__, template_folder = "templates" )
@login_view.route( "/is_logged" )
def is_logged():
    """
        Report whether the current session belongs to a logged-in user.

        Answers "ok" when the session flag "logged" is truthy and aborts
        with HTTP 403 otherwise. According to the original note, nginx
        calls this route to protect other locations (for example the
        PiAnoS dedicated pages).
    """
    current_app.logger.info( "Check if the user is connected" )

    # Guard clause: anything but a truthy "logged" flag is refused.
    if not session.get( "logged", False ):
        return abort( 403 )

    return "ok"
@login_view.route( "/logout" )
def logout():
    """
        Log the user out.

        The session is wiped and re-initialised for a fresh login, then
        the client is redirected to the "login.login" endpoint.
    """
    current_app.logger.info( "Logout and clear session" )
    session_clear_and_prepare()

    login_url = url_for( "login.login" )
    return redirect( login_url )
def session_clear_and_prepare():
    """
        Reset the session to the state expected at the start of a login.

        Everything stored in the session is dropped, then the login
        bookkeeping keys are set: "process" is "login", "need_to_check"
        lists the single pending check "password", and "logged" is False.
    """
    # Built inside the function so every call gets its own fresh list
    # for "need_to_check".
    initial_state = (
        ( "process", "login" ),
        ( "need_to_check", [ "password" ] ),
        ( "logged", False ),
    )

    session.clear()
    for key, value in initial_state:
        session[ key ] = value
@login_view.route( "/login" )
def login():
    """
        Serve the login page (template "login/login.html").

        The session is reset first. When the request carries a query
        string, it is stored unchanged in the session under
        "url_redirect" before the page is rendered.
    """
    current_app.logger.info( "Login start" )
    session_clear_and_prepare()

    # Keep the raw query string so it survives the login round-trips.
    query = request.query_string
    if query != "":
        session[ "url_redirect" ] = query

    return utils.template.my_render_template( "login/login.html" )
@login_view.route( "/do/login", methods = [ "POST" ] )
def do_login():
"""
Function to manage the login workflow and check the username, password
and TOTP data. This function is called multiple times because the check
is done only for one data type at the time. If all the checks are OK,
the user has provided all needed information, hence is logged in.
"""
#TODO: combine the security key checks in this function
############################################################################
# Rate limiting check
def get_rate_limit_key():
    """
        Build the key under which failed logins are counted.

        The identity used is, by order of preference:
          1. the network containing the client address (REMOTE_ADDR with
             its prefix shortened by 16 bits, e.g. a single IPv4 host
             becomes its /16 network);
          2. the raw REMOTE_ADDR value, if it cannot be parsed as an
             address;
          3. the submitted "username" form field (or an empty string),
             if REMOTE_ADDR is not available.

        Every step is best-effort: a failure keeps the previous value
        instead of aborting the login request.

        Returns the string "rate_limit_<identity>".
    """
    # Fallback identity when no client address can be read.
    target = request.form.get( "username", "" )

    try:
        target = request.headers.environ[ "REMOTE_ADDR" ]
    except Exception:
        # Best-effort: keep the username-based identity.
        pass

    try:
        # Imported here so the grouping below does not silently depend
        # on a module-level import; a missing module degrades to the
        # un-grouped identity like any other failure in this step.
        import ipaddress

        network = ipaddress.ip_network( unicode( target ) )
        target = str( network.supernet( prefixlen_diff = 16 ) )
    except Exception:
        # Best-effort: not an IP address (or grouping unavailable),
        # keep the identity as it is.
        pass

    return "rate_limit_{}".format( target )
def get_current_rate_limit():
try:
return int( config.redis_dbs[ "rate_limit" ].get( get_rate_limit_key() ) )
def rate_limit_to_seconds( nb ):
    """
        Convert an attempt counter into a duration.

        Returns config.login_rate_limiting_base raised to the power of
        nb, where the exponent is never lower than
        config.login_rate_limiting_limit. Callers use the result as a
        number of seconds.
    """
    exponent = max( nb, config.login_rate_limiting_limit )
    return config.login_rate_limiting_base ** exponent
def trigger_rate_limit():
    """
        Count one more attempt for the current rate-limit key.

        The current counter is read, incremented by one and written back
        to the "rate_limit" store with an expiry (`ex`) computed by
        rate_limit_to_seconds() from the new value.

        Returns the new counter value.
    """
    attempts = get_current_rate_limit() + 1

    store = config.redis_dbs[ "rate_limit" ]
    key = get_rate_limit_key()
    expiry = rate_limit_to_seconds( attempts )

    store.set( key, attempts, ex = expiry )
    return attempts
if get_current_rate_limit() >= config.login_rate_limiting_limit:
rate_limit_value = trigger_rate_limit()
time_to_wait = str( timedelta( seconds = rate_limit_to_seconds( rate_limit_value ) ) )
return jsonify( {
"error": False,
"logged": False,
"message": "Rate limited. Wait {} (HH:MM:SS) or contact the administrator.".format( time_to_wait )
} )
############################################################################
# Start of the user and password check
need_to_check = session.get( "need_to_check", [ "password" ] )
try:
current_check = need_to_check[ 0 ]
except:
current_check = None
session[ "need_to_check" ] = need_to_check
current_app.logger.info( "Current check: {}".format( current_check ) )
############################################################################
# Username and password check
if current_check == "password":
user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
if user == None:
current_app.logger.error( "Username not found in the database" )
# WASTING TIME.
# This is done to limit data extraction for existing (or not)
# username based upon the execution time of the login function
# (time-based side channel attack).
# FOR SECURITY REASONS, DO NOT REMOVE THIS LINE
utils.hash.pbkdf2( config.fake_hash ).verify( config.fake_hash_stored )
trigger_rate_limit()
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False
} )
form_password = request.form.get( "password", None )
if form_password == None or not utils.hash.pbkdf2( form_password ).verify( user[ "password" ] ):
current_app.logger.error( "Password not validated" )
trigger_rate_limit()
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
} )
elif not user[ "active" ]:
current_app.logger.error( "User not active" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
"message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
} )
else:
Loading
Loading full blame...