#!/usr/bin/python # -*- coding: UTF-8 -*- from flask import Blueprint from flask import current_app, request, jsonify, session, url_for import base64 import json import webauthn from datetime import datetime from email.mime.text import MIMEText from uuid import uuid4 import utils from utils.decorator import admin_required from utils.template import my_render_template from functions import mySMTP import config newuser_view = Blueprint( "newuser", __name__, template_folder = "template" ) @newuser_view.route( config.baseurl + "/signin" ) def new_user(): """ Serve the page to register to ICNML. """ current_app.logger.info( "New user registration page open" ) r = config.db.query_fetchall( "SELECT id, name FROM account_type WHERE can_singin = true" ) account_type = [] for rr in r: account_type.append( dict( rr ) ) return my_render_template( "signin.html", list_account_type = account_type ) @newuser_view.route( config.baseurl + "/do/signin", methods = [ "POST" ] ) def add_account_request_to_db(): """ Add the new user request to the database. """ current_app.logger.info( "Registrer new user request to the database" ) try: first_name = request.form[ "first_name" ] last_name = request.form[ "last_name" ] email = request.form[ "email" ] account_type = int( request.form[ "account_type" ] ) uuid = str( uuid4() ) email = email.lower() current_app.logger.debug( "First name: {}".format( first_name ) ) current_app.logger.debug( "Last name: {}".format( last_name ) ) current_app.logger.debug( "Email: {}".format( email ) ) current_app.logger.debug( "Account: {}".format( account_type ) ) sql = "SELECT name FROM account_type WHERE id = %s" account_type_name = config.db.query_fetchone( sql, ( account_type, ) )[ "name" ] account_type_name = account_type_name.lower() sql = "SELECT nextval( 'username_{}_seq' ) as id".format( account_type_name ) username_id = config.db.query_fetchone( sql )[ "id" ] current_app.logger.debug( "Username id:{}".format( username_id ) ) config.db.query( utils.sql.sql_insert_generate( "signin_requests", [ "first_name", "last_name", "email", "account_type", "uuid", "username_id" ] ), ( first_name, last_name, email, account_type, uuid, username_id, ) ) config.db.commit() return jsonify( { "error": False, "uuid": uuid } ) except: current_app.logger.error( "New user request database error" ) return jsonify( { "error": True } ) @newuser_view.route( config.baseurl + "/validate_signin" ) @admin_required def validate_signin(): """ Serve the page to admins regarding the validation of new users. """ current_app.logger.info( "Serve the signin valiation page" ) q = config.db.query( """ SELECT signin_requests.*, account_type.name as account_type FROM signin_requests LEFT JOIN account_type ON signin_requests.account_type = account_type.id WHERE signin_requests.status = 'pending' """ ) users = [] try: r = q.fetchall() for rr in r: users.append( dict( rr ) ) current_app.logger.debug( "Add '{}' to the validation list".format( rr[ "email" ] ) ) except: pass current_app.logger.info( "{} users added to the validation list".format( len( users ) ) ) return my_render_template( "validate_signin.html", users = users ) @newuser_view.route( config.baseurl + "/do/validate_signin", methods = [ "POST" ] ) @admin_required def do_validate_signin(): """ Prepare the new user data to be signed by the admin, and serve the page. """ request_id = request.form.get( "id" ) current_app.logger.info( "Signin validation begin for request_id '{}'".format( request_id ) ) s = config.db.query_fetchone( "SELECT * FROM signin_requests WHERE id = %s", ( request_id, ) ) s = dict( s ) r = {} r[ "user" ] = s r[ "user" ][ "request_time" ] = str( r[ "user" ][ "request_time" ] ) r[ "user" ][ "validation_time" ] = str( r[ "user" ][ "validation_time" ] ) r[ "acceptance" ] = {} r[ "acceptance" ][ "username" ] = session[ "username" ] r[ "acceptance" ][ "time" ] = str( datetime.now() ) j = json.dumps( r ) challenge = base64.b64encode( j ) challenge = challenge.replace( "=", "" ) session[ "validation_user_challenge" ] = challenge ############################################################################ user_id = session[ "user_id" ] key = config.db.query_fetchone( "SELECT * FROM webauthn WHERE user_id = %s AND usage_counter > 0 ORDER BY last_usage DESC LIMIT 1", ( user_id, ) ) webauthn_user = webauthn.WebAuthnUser( key[ "ukey" ], session[ "username" ], session[ "username" ], None, key[ "credential_id" ], key[ "pub_key" ], key[ "sign_count" ], config.RP_ID ) webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge ) return jsonify( { "error": False, "data": webauthn_assertion_options.assertion_dict } ) @newuser_view.route( config.baseurl + "/do/validate_signin_2", methods = [ "POST" ] ) @admin_required def do_validate_signin_2(): """ Verification of the signature of the new user data by the admin. """ current_app.logger.info( "Verification of the validated new user" ) challenge = session.get( "validation_user_challenge" ) assertion_response = request.form assertion_response_s = base64.b64encode( json.dumps( assertion_response ) ) credential_id = assertion_response.get( "id" ) user = config.db.query_fetchone( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) ) webauthn_user = webauthn.WebAuthnUser( user[ "ukey" ], session[ "username" ], session[ "username" ], None, user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], "icnml.unil.ch" ) webauthn_assertion_response = webauthn.WebAuthnAssertionResponse( webauthn_user, assertion_response, challenge, config.ORIGIN, uv_required = False ) try: webauthn_assertion_response.verify() current_app.logger.info( "Webauthn assertion verified" ) except Exception as e: return jsonify( { "error": True, "message": "Assertion failed. Error: {}".format( e ) } ) ############################################################################ if len( challenge ) % 4 != 0: challenge += "=" * ( 4 - ( len( challenge ) % 4 ) ) newuser = base64.b64decode( challenge ) newuser = json.loads( newuser ) user_type = int( newuser[ "user" ][ "account_type" ] ) email = newuser[ "user" ][ "email" ] email_hash = utils.hash.pbkdf2( email, utils.rand.random_data( config.EMAIL_SALT_LENGTH ), config.EMAIL_NB_ITERATIONS ).hash() request_id = newuser[ "user" ][ "id" ] username_id = newuser[ "user" ][ "username_id" ] n = config.db.query_fetchone( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )[ "name" ] username = "{}_{}".format( n, username_id ) username = username.lower() current_app.logger.info( "Creation of the new user '{}'".format( username ) ) try: config.db.query( "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s", ( assertion_response_s, request_id ) ) config.db.query( utils.sql.sql_insert_generate( "users", [ "username", "email", "type" ] ), ( username, email_hash, user_type ) ) config.db.commit() current_app.logger.debug( "User '{}' created".format( username ) ) except: current_app.logger.error( "Error while creating the user account for '{}'".format( username ) ) return jsonify( { "error": True, "message": "Can not insert into database." } ) ############################################################################ try: email_content = utils.template.render_jinja_html( "templates/email", "signin.html", username = username, url = "https://icnml.unil.ch" + url_for( "config_new_user", uuid = newuser[ "user" ][ "uuid" ] ) ) msg = MIMEText( email_content, "html" ) msg[ "Subject" ] = "ICNML - Login information" msg[ "From" ] = config.sender msg[ "To" ] = email current_app.logger.info( "Sending the email to the user '{}'".format( username ) ) with mySMTP() as s: s.sendmail( config.sender, [ email ], msg.as_string() ) return jsonify( { "error": False } ) except: return jsonify( { "error": True, "message": "Error while sending the email" } ) @newuser_view.route( config.baseurl + "/do/validation_reject", methods = [ "POST" ] ) def do_validation_reject(): """ Reject the request for a new user. """ request_id = request.form.get( "id" ) current_app.logger.info( "Reject the new user request {}".format( request_id ) ) try: sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s" config.db.query( sql, ( request_id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } )