Skip to content
__init__.py 9.77 KiB
Newer Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-

from flask import Blueprint
from flask import current_app, request, jsonify, session, url_for

import base64
import json
import webauthn
from datetime import datetime
from email.mime.text import MIMEText
from uuid import uuid4

import utils
from utils.decorator import admin_required
from utils.template import my_render_template

from functions import mySMTP

import config

newuser_view = Blueprint( "newuser", __name__, template_folder = "template" )

@newuser_view.route( config.baseurl + "/signin" )
def new_user():
    """
        Serve the page to register to ICNML.
    """
    current_app.logger.info( "New user registration page open" )
    
    r = config.db.query_fetchall( "SELECT id, name FROM account_type WHERE can_singin = true" )
    
    account_type = []
    for rr in r:
        account_type.append( dict( rr ) )
    
    return my_render_template( 
        "signin.html",
        list_account_type = account_type
    )

@newuser_view.route( config.baseurl + "/do/signin", methods = [ "POST" ] )
def add_account_request_to_db():
    """
        Add the new user request to the database.
    """
    current_app.logger.info( "Registrer new user request to the database" )
    
    try:
        first_name = request.form[ "first_name" ]
        last_name = request.form[ "last_name" ]
        email = request.form[ "email" ]
        account_type = int( request.form[ "account_type" ] )
        uuid = str( uuid4() )
        
        email = email.lower()
        
        current_app.logger.debug( "First name: {}".format( first_name ) )
        current_app.logger.debug( "Last name:  {}".format( last_name ) )
        current_app.logger.debug( "Email:      {}".format( email ) )
        current_app.logger.debug( "Account:    {}".format( account_type ) )
        
        sql = "SELECT name FROM account_type WHERE id = %s"
        account_type_name = config.db.query_fetchone( sql, ( account_type, ) )[ "name" ]
        account_type_name = account_type_name.lower()

        sql = "SELECT nextval( 'username_{}_seq' ) as id".format( account_type_name ) 
        username_id = config.db.query_fetchone( sql )[ "id" ]
        
        current_app.logger.debug( "Username id:{}".format( username_id ) )
        
        config.db.query( 
            utils.sql.sql_insert_generate( "signin_requests", [ "first_name", "last_name", "email", "account_type", "uuid", "username_id" ] ),
            ( first_name, last_name, email, account_type, uuid, username_id, )
        )
        config.db.commit()
        
        return jsonify( {
            "error": False,
            "uuid": uuid
        } )
        
    except:
        current_app.logger.error( "New user request database error" )
        
        return jsonify( {
            "error": True
        } )

@newuser_view.route( config.baseurl + "/validate_signin" )
@admin_required
def validate_signin():
    """
        Serve the page to admins regarding the validation of new users.
    """
    current_app.logger.info( "Serve the signin valiation page" )
    
    q = config.db.query( """
        SELECT signin_requests.*, account_type.name as account_type 
        FROM signin_requests
        LEFT JOIN account_type ON signin_requests.account_type = account_type.id
        WHERE signin_requests.status = 'pending'
    """ )
    
    users = []
    
    try:
        r = q.fetchall()
        for rr in r:
            users.append( dict( rr ) )
            current_app.logger.debug( "Add '{}' to the validation list".format( rr[ "email" ] ) )
    
    except:
        pass
    
    current_app.logger.info( "{} users added to the validation list".format( len( users ) ) )
    
    return my_render_template( 
        "validate_signin.html",
        users = users
    )

@newuser_view.route( config.baseurl + "/do/validate_signin", methods = [ "POST" ] )
@admin_required
def do_validate_signin():
    """
        Prepare the new user data to be signed by the admin, and serve the page.
    """
    request_id = request.form.get( "id" )
    
    current_app.logger.info( "Signin validation begin for request_id '{}'".format( request_id ) )
    
    s = config.db.query_fetchone( "SELECT * FROM signin_requests WHERE id = %s", ( request_id, ) )
    s = dict( s )
    
    r = {}
    
    r[ "user" ] = s
    r[ "user" ][ "request_time" ] = str( r[ "user" ][ "request_time" ] )
    r[ "user" ][ "validation_time" ] = str( r[ "user" ][ "validation_time" ] )
    
    r[ "acceptance" ] = {}
    r[ "acceptance" ][ "username" ] = session[ "username" ]
    r[ "acceptance" ][ "time" ] = str( datetime.now() )
    
    j = json.dumps( r )
    challenge = base64.b64encode( j )
    challenge = challenge.replace( "=", "" )
    session[ "validation_user_challenge" ] = challenge
    
    ############################################################################
    
    user_id = session[ "user_id" ]
    
    key = config.db.query_fetchone( "SELECT * FROM webauthn WHERE user_id = %s AND usage_counter > 0 ORDER BY last_usage DESC LIMIT 1", ( user_id, ) )
    
    webauthn_user = webauthn.WebAuthnUser( 
        key[ "ukey" ], session[ "username" ], session[ "username" ], None,
        key[ "credential_id" ], key[ "pub_key" ], key[ "sign_count" ], config.RP_ID
    )
    webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )
    
    return jsonify( {
        "error": False,
        "data": webauthn_assertion_options.assertion_dict
    } )

@newuser_view.route( config.baseurl + "/do/validate_signin_2", methods = [ "POST" ] )
@admin_required
def do_validate_signin_2():
    """
        Verification of the signature of the new user data by the admin.
    """
    current_app.logger.info( "Verification of the validated new user" )
    
    challenge = session.get( "validation_user_challenge" )
    assertion_response = request.form
    assertion_response_s = base64.b64encode( json.dumps( assertion_response ) )
    credential_id = assertion_response.get( "id" )
    
    user = config.db.query_fetchone( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) )
    
    webauthn_user = webauthn.WebAuthnUser( 
        user[ "ukey" ], session[ "username" ], session[ "username" ], None,
        user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], "icnml.unil.ch"
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse( 
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )
    
    try:
        webauthn_assertion_response.verify()
        current_app.logger.info( "Webauthn assertion verified" )
        
    except Exception as e:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: {}".format( e )
        } )
    
    ############################################################################
    
    if len( challenge ) % 4 != 0:
        challenge += "=" * ( 4 - ( len( challenge ) % 4 ) )
    
    newuser = base64.b64decode( challenge )
    newuser = json.loads( newuser )
    user_type = int( newuser[ "user" ][ "account_type" ] )
    email = newuser[ "user" ][ "email" ]
    email_hash = utils.hash.pbkdf2( email, utils.rand.random_data( config.EMAIL_SALT_LENGTH ), config.EMAIL_NB_ITERATIONS ).hash()
    request_id = newuser[ "user" ][ "id" ]
    username_id = newuser[ "user" ][ "username_id" ]
    
    n = config.db.query_fetchone( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )[ "name" ]
    
    username = "{}_{}".format( n, username_id )
    username = username.lower()
    
    current_app.logger.info( "Creation of the new user '{}'".format( username ) )
    
    try:
        config.db.query( "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s", ( assertion_response_s, request_id ) )
        config.db.query( utils.sql.sql_insert_generate( "users", [ "username", "email", "type" ] ), ( username, email_hash, user_type ) )
        config.db.commit()
        
        current_app.logger.debug( "User '{}' created".format( username ) )
    
    except:
        current_app.logger.error( "Error while creating the user account for '{}'".format( username ) )
        
        return jsonify( {
            "error": True,
            "message": "Can not insert into database."
        } )
    
    ############################################################################
    
    try:
        email_content = utils.template.render_jinja_html( 
            "templates/email", "signin.html",
            username = username,
            url = "https://icnml.unil.ch" + url_for( "config_new_user", uuid = newuser[ "user" ][ "uuid" ] )
        )
        
        msg = MIMEText( email_content, "html" )
        
        msg[ "Subject" ] = "ICNML - Login information"
        msg[ "From" ] = config.sender
        msg[ "To" ] = email
        
        current_app.logger.info( "Sending the email to the user '{}'".format( username ) )
        
        with mySMTP() as s:
            s.sendmail( config.sender, [ email ], msg.as_string() )
        
        return jsonify( {
            "error": False
        } )
    
    except:
        return jsonify( {
            "error": True,
            "message": "Error while sending the email"
        } )

@newuser_view.route( config.baseurl + "/do/validation_reject", methods = [ "POST" ] )
def do_validation_reject():
    """
        Reject the request for a new user.
    """
    request_id = request.form.get( "id" )
    
    current_app.logger.info( "Reject the new user request {}".format( request_id ) )
    
    try:
        sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s"
        config.db.query( sql, ( request_id, ) )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except:
        return jsonify( {
            "error": True
        } )