Skip to content
__init__.py 38.4 KiB
Newer Older
# -*- coding: UTF-8 -*-

from flask import Blueprint
from flask import abort, redirect, jsonify, send_file
from flask import url_for, request, current_app, session
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from threading import Thread
from uuid import uuid4
import base64
import hashlib
import pyotp
import pytz
import time
import webauthn

import config
import utils
login_view = Blueprint( "login", __name__, template_folder = "templates" )

@login_view.route( "/is_logged" )
def is_logged():
    """
        App route to know if the user is logged in the ICNML main application.
Marco De Donno's avatar
Marco De Donno committed
        This route is used by nginx to protect some other locations, for
        example the PiAnoS dedicated pages.
    """
    current_app.logger.info( "Check if the user is connected" )
    
    if session.get( "logged", False ):
        return "ok"
    
    else:
        return abort( 403 )

@login_view.route( "/logout" )
def logout():
    """
        Logout the user, clear the session and redirect to the login page.
    """
    current_app.logger.info( "Logout and clear session" )
    
    session_clear_and_prepare()
    return redirect( url_for( "login.login" ) )

def session_clear_and_prepare():
    """
Marco De Donno's avatar
Marco De Donno committed
        Clear the session related to the user and initialize the login related
        variables.
    """
    session.clear()
    
    session[ "process" ] = "login"
    session[ "need_to_check" ] = [ "password" ]
    session[ "logged" ] = False

@login_view.route( "/login" )
def login():
    """
        Route to serve the login.html page.
    """
    current_app.logger.info( "Login start" )
    
    session_clear_and_prepare()
    
    if request.query_string != "":
        session[ "url_redirect" ] = request.query_string
    
    return utils.template.my_render_template( "login/login.html" )

@login_view.route( "/do/login", methods = [ "POST" ] )
def do_login():
    """
Marco De Donno's avatar
Marco De Donno committed
        Function to manadge the login workflow and check the username, password
        and TOTP data. This function is called multiple times because the check
        is done only for one data type at the time. If all the checks are OK,
        the user has provided all needed information, hence is logged in.
    """
    #TODO: combine the security key checks in this function
    
    ############################################################################
    #   Rate limiting check
    
    def get_rate_limit_key():
        target = request.form.get( "username", "" )
        
            target = request.headers.environ[ "REMOTE_ADDR" ]
        except:
            pass
        try:
            target = str( ipaddress.ip_network( unicode( target ) ).supernet( 16 ) )
            pass
        
        return "rate_limit_{}".format( target )
    
    def get_current_rate_limit():
        try:
            return int( config.redis_dbs[ "rate_limit" ].get( get_rate_limit_key() ) )
    def rate_limit_to_seconds( nb ):
        return pow( config.login_rate_limiting_base, max( nb, config.login_rate_limiting_limit ) )
    def trigger_rate_limit():
        rate_limit_value = get_current_rate_limit() + 1
        
        config.redis_dbs[ "rate_limit" ].set(
            get_rate_limit_key(),
            rate_limit_value,
            ex = rate_limit_to_seconds( rate_limit_value )
    if get_current_rate_limit() >= config.login_rate_limiting_limit:
        rate_limit_value = trigger_rate_limit()
        time_to_wait = str( timedelta( seconds = rate_limit_to_seconds( rate_limit_value ) ) )
        
        return jsonify( {
            "error": False,
            "logged": False,
            "message": "Rate limited. Wait {} (HH:MM:SS) or contact the administrator.".format( time_to_wait )
        } )
    
    ############################################################################
    #   Start of the user and password check
    
    need_to_check = session.get( "need_to_check", [ "password" ] )
    try:
        current_check = need_to_check[ 0 ]
    except:
        current_check = None
    
    session[ "need_to_check" ] = need_to_check
    
    current_app.logger.info( "Current check: {}".format( current_check ) )
    
    ############################################################################
    #   Username and password check

    if current_check == "password":
        user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
        
        if user == None:
            current_app.logger.error( "Username not found in the database" )
            
            # WASTING TIME.
            # This is done to limit data extraction for exitsing (or not)
            # username based upon the execution time of the login function
            # (time-based side channel attack).
            # FOR SECURITY REASONS, DO NOT REMOVE THIS LINE
            utils.hash.pbkdf2( config.fake_hash ).verify( config.fake_hash_stored )
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False
            } )
        
        form_password = request.form.get( "password", None )
        
        if form_password == None or not utils.hash.pbkdf2( form_password ).verify( user[ "password" ] ):
            current_app.logger.error( "Password not validated" )
            
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False,
            } )
        
        elif not user[ "active" ]:
            current_app.logger.error( "User not active" )
            
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False,
                "message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
            } )
        
        else:
Loading
Loading full blame...