#!/usr/bin/python # -*- coding: UTF-8 -*- from flask import Blueprint from flask import abort, redirect, url_for, request, jsonify, current_app, session from datetime import datetime, timedelta from uuid import uuid4 import pyotp import pytz import time import webauthn import config import utils from utils.decorator import login_required from utils.template import my_render_template login_view = Blueprint( "login", __name__ ) @login_view.before_request def renew_session(): """ Reset the timer before the automatic-logout. This function is called before every HTTP call. """ session.permanent = True session.permanent_session_lifetime = timedelta( seconds = config.session_timeout ) @login_view.route( "/is_logged" ) def is_logged(): """ App route to know if the user is logged in the ICNML main application. This route is used by nginx to protect some other locations, for example the PiAnoS dedicated pages. The session countdown timer is resetted to allow the user to use the protected location for the rest of the timeout. """ current_app.logger.info( "Check if the user is connected" ) if session.get( "logged", False ): return "ok" else: return abort( 403 ) @login_view.route( "/logout" ) def logout(): """ Logout the user, clear the session and redirect to the login page. """ current_app.logger.info( "Logout and clear session" ) session_clear_and_prepare() return redirect( url_for( "home" ) ) def session_clear_and_prepare(): """ Clear the session related to the user and initialize the login related variables. """ session.clear() session[ "process" ] = "login" session[ "need_to_check" ] = [ "password" ] session[ "logged" ] = False session[ "session_security_key" ] = str( uuid4() ) @login_view.route( "/login" ) def login(): """ Route to serve the login.html page. """ current_app.logger.info( "Login start" ) session_clear_and_prepare() return my_render_template( "login.html" ) @login_view.route( "/do/login", methods = [ "POST" ] ) def do_login(): """ Function to manadge the login workflow and check the username, password and TOTP data. This function is called multiple times because the check is done only for one data type at the time. If all the checks are OK, the user has provided all needed information, hence is logged in. """ #TODO: combine the security key checks in this function need_to_check = session.get( "need_to_check", [ "password" ] ) try: current_check = need_to_check[ 0 ] except: current_check = None session[ "need_to_check" ] = need_to_check current_app.logger.info( "Current check: {}".format( current_check ) ) ############################################################################ # Username and password check if current_check == "password": user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) ) if user == None: current_app.logger.error( "Username not found in the database" ) session_clear_and_prepare() return jsonify( { "error": False, "logged": False } ) form_password = request.form.get( "password", None ) if form_password == None or not utils.hash.pbkdf2( form_password, user[ "password" ] ).verify(): current_app.logger.error( "Password not validated" ) session_clear_and_prepare() return jsonify( { "error": False, "logged": False, } ) elif not user[ "active" ]: current_app.logger.error( "User not active" ) session_clear_and_prepare() return jsonify( { "error": False, "logged": False, "message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)." } ) else: session[ "username" ] = user[ "username" ] session[ "user_id" ] = user[ "id" ] session[ "password_check" ] = True current_app.logger.info( "User '{}' checked by password".format( user[ "username" ] ) ) session[ "need_to_check" ].remove( current_check ) session[ "password" ] = utils.hash.pbkdf2( form_password, "AES256", config.PASSWORD_NB_ITERATIONS ).hash() sql = "SELECT count( * ) FROM webauthn WHERE user_id = %s AND active = TRUE" security_keys_count = config.db.query_fetchone( sql, ( user[ "id" ], ) )[ "count" ] current_app.logger.info( "Number of security keys: {}".format( security_keys_count ) ) current_app.logger.info( "TOTP: {}".format( user[ "totp" ] is not None ) ) if security_keys_count > 0: session[ "need_to_check" ].append( "securitykey" ) elif user[ "totp" ]: session[ "need_to_check" ].append( "totp" ) else: current_app.logger.error( "Second factor missing" ) session_clear_and_prepare() return jsonify( { "error": False, "logged": False, "message": "Second factor missing. Contact the ICNML administrator (icnml@unil.ch)." } ) ############################################################################ # Time-based One Time Password check elif current_check == "totp": user = config.db.query_fetchone( "SELECT username, totp FROM users WHERE username = %s", ( session[ "username" ], ) ) totp_db = pyotp.TOTP( user[ "totp" ] ) totp_user = request.form.get( "totp", None ) if totp_user == None: return jsonify( { "error": True, "message": "No TOTP provided" } ) current_app.logger.info( "TOTP expected now: {}".format( totp_db.now() ) ) current_app.logger.info( "TOTP provided: {}".format( totp_user ) ) if not totp_db.verify( totp_user, valid_window = config.TOTP_VALIDWINDOW ): current_app.logger.error( "TOTP not valid in a {} time window".format( config.TOTP_VALIDWINDOW ) ) current_app.logger.info( "TOTP check for a bigger time window" ) # Check for a possible time difference now = datetime.now() time_diff = None try: for i in xrange( config.TOTP_VALIDWINDOW, config.TOTP_MAX_VALIDWINDOW ): for m in [ -1, 1 ]: if pyotp.utils.strings_equal( totp_user, totp_db.at( now, i * m ) ): raise else: current_app.logger.error( "TOTP not valid for this secret in a timeframe of {}".format( config.TOTP_MAX_VALIDWINDOW ) ) except: time_diff = i * m * totp_db.interval current_app.logger.info( "TOTP valid for {} seconds".format( time_diff ) ) # Error return session[ "logged" ] = False return jsonify( { "error": False, "logged": False, "message": "Wrong TOTP", "time_diff": time_diff, "time": time.time() } ) else: current_app.logger.info( "Valid TOTP in a {} time window".format( config.TOTP_VALIDWINDOW ) ) session[ "need_to_check" ].remove( current_check ) ############################################################################ # Check if all the data has been provided; login if ok if len( session[ "need_to_check" ] ) == 0 and session.get( "password_check", False ): for key in [ "process", "need_to_check", "password_check" ]: if key in session: session.pop( key ) session[ "logged" ] = True current_app.logger.info( "User '{}' connected".format( session[ "username" ] ) ) sql = """ SELECT users.type, account_type.name as account_type_name FROM users LEFT JOIN account_type ON users.type = account_type.id WHERE username = %s """ user = config.db.query_fetchone( sql, ( session[ "username" ], ) ) session[ "account_type" ] = int( user[ "type" ] ) session[ "account_type_name" ] = user[ "account_type_name" ] return jsonify( { "error": False, "logged": True, } ) else: current_app.logger.info( "Push '{}' as next login step".format( session[ "need_to_check" ][ 0 ] ) ) return jsonify( { "error": False, "next_step": session[ "need_to_check" ][ 0 ] } ) ################################################################################ # webauthn keys @login_view.route( "/webauthn/admin" ) @login_required def webauthn_admin(): """ Serve the administartion page for the FIDO2 keys. """ current_app.logger.info( "Webauthn admin page" ) return my_render_template( "webauthn/admin.html", keys = do_webauthn_get_list_of_keys( all_keys = True ) ) def do_webauthn_get_list_of_keys( uid = None, all_keys = False ): """ Get the list of keys for a particular user. Can be filtered by active keys only with the `all_keys` parameter. If the user id (uid) variable is not passed in parameter, the id of the currently logged user will be used (via the session). """ user_id = session.get( "user_id", uid ) current_app.logger.info( "Retrieving the security keys for user '{}'".format( session[ "username" ] ) ) sql = """ SELECT id, key_name as name, created_on, last_usage, usage_counter, active FROM webauthn WHERE user_id = %s """ if not all_keys: sql += " AND active = true" sql += " ORDER BY key_name ASC" keys = config.db.query_fetchall( sql, ( user_id, ) ) data = [] for key in keys: data.append( dict( key ) ) current_app.logger.debug( "key '{}' ({}) loaded".format( key[ "name" ], key[ "id" ] ) ) current_app.logger.info( "{} keys found in the database".format( len( data ) ) ) return data @login_view.route( "/webauthn/begin_activate", methods = [ "POST" ] ) @login_required def webauthn_begin_activate(): """ Start the registering process for a new security key. The json returned will be used by the javascript navigator.credentials.create() function. """ current_app.logger.info( "Start the registring process for a new security key" ) session[ "key_name" ] = request.form.get( "key_name", None ) username = session.get( "username" ) challenge = pyotp.random_base32( 64 ) ukey = pyotp.random_base32( 64 ) current_app.logger.debug( "User: {}".format( username ) ) current_app.logger.debug( "Challenge: {}".format( challenge ) ) session[ "challenge" ] = challenge session[ "register_ukey" ] = ukey make_credential_options = webauthn.WebAuthnMakeCredentialOptions( challenge, config.rp_name, config.RP_ID, ukey, username, username, None ) registration_dict = make_credential_options.registration_dict registration_dict[ "authenticatorSelection" ] = { "authenticatorAttachment": "cross-platform", "requireResidentKey": False, "userVerification": "discouraged" } return jsonify( registration_dict ) @login_view.route( "/webauthn/verify", methods = [ "POST" ] ) @login_required def webauthn_verify(): """ Verify the data produced by the security key while registring (with the navigator.credentials.create() function). """ current_app.logger.info( "Start webauthn verification process" ) challenge = session[ "challenge" ] user_id = session[ "user_id" ] key_name = session.get( "key_name", None ) ukey = session[ "register_ukey" ] current_app.logger.debug( "Session challenge: {}".format( challenge ) ) current_app.logger.debug( "Session user_id: {}".format( user_id ) ) current_app.logger.debug( "Session key_name: {}".format( key_name ) ) webauthn_registration_response = webauthn.WebAuthnRegistrationResponse( config.RP_ID, config.ORIGIN, request.form, challenge ) try: webauthn_credential = webauthn_registration_response.verify() current_app.logger.info( "Verification OK" ) except Exception as e: current_app.logger.info( "Verification failed" ) return jsonify( { "error": True, "message": "Registration failed. Error: {}".format( e ) } ) try: current_app.logger.info( "Insertion of the key to the database" ) config.db.query( utils.sql.sql_insert_generate( "webauthn", [ "user_id", "key_name", "ukey", "credential_id", "pub_key", "sign_count" ] ), ( user_id, key_name, ukey, webauthn_credential.credential_id, webauthn_credential.public_key, webauthn_credential.sign_count, ) ) config.db.commit() return jsonify( { "success": "User successfully registered." } ) except: current_app.logger.error( "Database insertion error" ) return jsonify( { "error": True, "message": "Database error" } ) ################################################################################ @login_view.route( "/webauthn/delete", methods = [ "POST" ] ) @login_required def webauthn_delete_key(): """ Delete a key based upon the key id and name for the currently logged user. """ current_app.logger.info( "Start security deletion" ) key_id = request.form.get( "key_id", False ) userid = session[ "user_id" ] current_app.logger.debug( "Session username: {}".format( session[ "username" ] ) ) current_app.logger.debug( "key_id: {}".format( key_id ) ) try: config.db.query( "DELETE FROM webauthn WHERE id = %s AND user_id = %s", ( key_id, userid, ) ) config.db.commit() current_app.logger.debug( "Security key deleted" ) return jsonify( { "error": False } ) except: current_app.logger.error( "Security key deletion failed" ) return jsonify( { "error": True } ); @login_view.route( "/webauthn/disable", methods = [ "POST" ] ) @login_required def webauthn_disable_key(): """ Disable a particular security key for the current user. """ key_id = request.form.get( "key_id", False ) userid = session[ "user_id" ] current_app.logger.info( "Disabling key '{}' for user '{}'".format( key_id, session[ "username" ] ) ) try: config.db.query( "UPDATE webauthn SET active = False WHERE id = %s AND user_id = %s", ( key_id, userid, ) ) config.db.commit() current_app.logger.debug( "Key disabled" ) return jsonify( { "error": False } ) except: current_app.logger.error( "Key disabling database error" ) return jsonify( { "error": True } ); @login_view.route( "/webauthn/enable", methods = [ "POST" ] ) @login_required def webauthn_enable_key(): """ Activation of a security key for the current user. """ key_id = request.form.get( "key_id", False ) userid = session[ "user_id" ] current_app.logger.info( "Enabling key '{}' for user '{}'".format( key_id, session[ "username" ] ) ) try: config.db.query( "UPDATE webauthn SET active = True WHERE id = %s AND user_id = %s", ( key_id, userid, ) ) config.db.commit() current_app.logger.debug( "Key enabled" ) return jsonify( { "error": False } ) except: current_app.logger.error( "Key enabling database error" ) return jsonify( { "error": True } ); @login_view.route( "/webauthn/rename", methods = [ "POST" ] ) @login_required def webauthn_rename_key(): """ Rename a security key for the current user. """ key_id = request.form.get( "key_id", False ) key_name = request.form.get( "key_name", False ) userid = session[ "user_id" ] current_app.logger.info( "Renaming key '{}' for user '{}'".format( key_id, session[ "username" ] ) ) try: config.db.query( "UPDATE webauthn SET key_name = %s WHERE id = %s AND user_id = %s", ( key_name, key_id, userid, ) ) config.db.commit() current_app.logger.debug( "Key renamed" ) return jsonify( { "error": False } ) except: current_app.logger.error( "Key renaming error" ) return jsonify( { "error": True } ); @login_view.route( "/webauthn/begin_assertion" ) def webauthn_begin_assertion(): """ Get the data to start the login process with all actives keys for a user. """ current_app.logger.info( "Webauthn start data preparation for user '{}'".format( session[ "username" ] ) ) user_id = session.get( "user_id" ) if "challenge" in session: del session[ "challenge" ] challenge = pyotp.random_base32( 64 ) session[ "challenge" ] = challenge current_app.logger.debug( "Challenge: '{}'".format( challenge ) ) key_list = config.db.query_fetchall( "SELECT * FROM webauthn WHERE user_id = %s AND active = true", ( user_id, ) ) current_app.logger.info( "{} keys active for this user".format( len( key_list ) ) ) credential_id_list = [] for key in key_list: credential_id_list.append( { "type": "public-key", "id": key[ "credential_id" ], "transports": [ "usb", "nfc", "ble", "internal" ] } ) current_app.logger.debug( "key '{}' added to the usable keys".format( key[ "credential_id" ] ) ) assertion_dict = { "challenge": challenge, "timeout": 60000, "allowCredentials": credential_id_list, "rpId": config.RP_ID, "userVerification": "discouraged" } return jsonify( { "error": False, "data": assertion_dict } ) @login_view.route( "/webauthn/verify_assertion", methods = [ "POST" ] ) def webauthn_verify_assertion(): """ Check the signed challenge provided to the user for the login process. """ current_app.logger.info( "Webauthn start assertion verification" ) challenge = session.get( "challenge" ) assertion_response = request.form credential_id = assertion_response.get( "id" ) current_app.logger.debug( "Used key: '{}'".format( credential_id ) ) user = config.db.query_fetchone( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) ) for key in [ "sign_count", "created_on", "last_usage", "usage_counter" ]: current_app.logger.debug( "key {}: {}".format( key, user[key] ) ) webauthn_user = webauthn.WebAuthnUser( None, session[ "username" ], None, None, user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], config.RP_ID ) webauthn_assertion_response = webauthn.WebAuthnAssertionResponse( webauthn_user, assertion_response, challenge, config.ORIGIN, uv_required = False ) try: sign_count = webauthn_assertion_response.verify() current_app.logger.info( "Webauthn key verified" ) except Exception as e: current_app.logger.error( "Webauthn assertion failed" ) return jsonify( { "error": True, "message": "Assertion failed. Error: {}".format( e ) } ) else: current_app.logger.debug( "Update key usage in the database" ) dt = datetime.now( pytz.timezone( "Europe/Zurich" ) ) q = config.db.query( "UPDATE webauthn SET sign_count = %s, last_usage = %s, usage_counter = usage_counter + 1 WHERE credential_id = %s", ( sign_count, dt, credential_id, ) ) config.db.commit() session[ "need_to_check" ].remove( "securitykey" ) do_login() return jsonify( { "error": False } )