Skip to content
__init__.py 34.9 KiB
Newer Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from flask import Blueprint
from flask import abort, redirect, jsonify, send_file
from flask import url_for, request, current_app, session
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from threading import Thread
from uuid import uuid4
import base64
import hashlib
import json
import pyotp
import pytz
import time
import webauthn

import config
import utils
from functions import mySMTP
login_view = Blueprint( "login", __name__, template_folder = "templates" )

@login_view.route( "/is_logged" )
def is_logged():
    """
        App route to know if the user is logged in the ICNML main application.
        This route is used by nginx to protect some other locations, for example the PiAnoS dedicated pages.
    """
    current_app.logger.info( "Check if the user is connected" )
    
    if session.get( "logged", False ):
        return "ok"
    
    else:
        return abort( 403 )

@login_view.route( "/logout" )
def logout():
    """
        Logout the user, clear the session and redirect to the login page.
    """
    current_app.logger.info( "Logout and clear session" )
    
    session_clear_and_prepare()
    return redirect( url_for( "login.login" ) )

def session_clear_and_prepare():
    """
        Clear the session related to the user and initialize the login related variables.
    """
    session.clear()
    
    session[ "process" ] = "login"
    session[ "need_to_check" ] = [ "password" ]
    session[ "logged" ] = False

@login_view.route( "/login" )
def login():
    """
        Route to serve the login.html page.
    """
    current_app.logger.info( "Login start" )
    
    session_clear_and_prepare()
    
    if request.query_string != "":
        session[ "url_redirect" ] = request.query_string
    
    return utils.template.my_render_template( "login.html" )

@login_view.route( "/do/login", methods = [ "POST" ] )
def do_login():
    """
        Function to manadge the login workflow and check the username, password and TOTP data.
        This function is called multiple times because the check is done only for one data type at the time.
        If all the checks are OK, the user has provided all needed information, hence is logged in.
    """
    #TODO: combine the security key checks in this function
    
    need_to_check = session.get( "need_to_check", [ "password" ] )
    try:
        current_check = need_to_check[ 0 ]
    except:
        current_check = None
    
    session[ "need_to_check" ] = need_to_check
    
    current_app.logger.info( "Current check: {}".format( current_check ) )
    
    ############################################################################
    #   Username and password check

    if current_check == "password":
        user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
        
        if user == None:
            current_app.logger.error( "Username not found in the database" )
            
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False
            } )
        
        form_password = request.form.get( "password", None )
        
        if form_password == None or not utils.hash.pbkdf2( form_password, user[ "password" ] ).verify():
            current_app.logger.error( "Password not validated" )
            
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False,
            } )
        
        elif not user[ "active" ]:
            current_app.logger.error( "User not active" )
            
            session_clear_and_prepare()
            
            return jsonify( {
                "error": False,
                "logged": False,
                "message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
            } )
        
        else:
            session[ "username" ] = user[ "username" ]
            session[ "user_id" ] = user[ "id" ]
            session[ "password_check" ] = True
            
            current_app.logger.info( "User '{}' checked by password".format( user[ "username" ] ) )
            
            session[ "need_to_check" ].remove( current_check )
            session[ "password" ] = utils.hash.pbkdf2( form_password, "AES256", config.PASSWORD_NB_ITERATIONS ).hash()
            
            sql = "SELECT count( * ) FROM webauthn WHERE user_id = %s AND active = TRUE"
            security_keys_count = config.db.query_fetchone( sql, ( user[ "id" ], ) )[ "count" ]
            
            current_app.logger.info( "Number of security keys: {}".format( security_keys_count ) )
            current_app.logger.info( "TOTP: {}".format( user[ "totp" ] is not None ) )
            
            if config.envtype.upper() != "DEV":
                hra = hashlib.sha512( request.headers.environ[ "REMOTE_ADDR" ] ).hexdigest()
                username = session[ "username" ]
                key = "{}_{}".format( username, hra )
                saved_totp = config.redis_shared.get( key ) == "ok"
                
                if security_keys_count > 0:
                    session[ "need_to_check" ].append( "securitykey" )
                    elif user[ "totp" ]:
                        session[ "need_to_check" ].append( "totp" )
                    else:
                        current_app.logger.error( "Second factor missing" )
                        
                        session_clear_and_prepare()
                        
                        return jsonify( {
                            "error": False,
                            "logged": False,
                            "next_step": "totp_reset"
                        } )
    
    ############################################################################
    #   Time-based One Time Password check

    elif current_check == "totp":
        user = config.db.query_fetchone( "SELECT username, totp FROM users WHERE username = %s", ( session[ "username" ], ) )
        
        totp_db = pyotp.TOTP( user[ "totp" ] )
        totp_user = request.form.get( "totp", None )
        totp_save_serverside = request.form.get( "save", False )
        
        if totp_user == None:
            return jsonify( {
                "error": True,
                "message": "No TOTP provided"
            } )
        
        current_app.logger.info( "TOTP expected now: {}".format( totp_db.now() ) )
        current_app.logger.info( "TOTP provided:     {}".format( totp_user ) )
        
        if not totp_db.verify( totp_user, valid_window = config.TOTP_VALIDWINDOW ):
            current_app.logger.error( "TOTP not valid in a {} time window".format( config.TOTP_VALIDWINDOW ) )
            current_app.logger.info( "TOTP check for a bigger time window" )
            
Loading
Loading full blame...