#!/usr/bin/python
# -*- coding: UTF-8 -*-

# Standard library
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from threading import Thread
from uuid import uuid4
import base64
import cPickle
import functools
import hashlib
import json
import os
import smtplib

# Third-party packages
from PIL import Image, ImageDraw, ImageFont
from flask import Flask
from flask import jsonify
from flask import render_template, send_from_directory
from flask import request
from flask import send_file
from flask import session
from flask import url_for
from flask_compress import Compress
from flask_session import Session
from werkzeug import abort, redirect
import gnupg
import pyotp
import pytz
import time
import qrcode
import webauthn

# Project modules
from NIST.fingerprint import NISTf_auto
from PiAnoS import caseExistsInDB

from const import pfsp
from functions import float_or_null
from functions import pbkdf2, AESCipher
from functions import pil2buffer
from functions import random_data
from functions import render_jinja_html
from functions import rotate_image_upon_exif
from functions import sql_insert_generate

import config

################################################################################

from version import __version__, __branch__, __commit__, __commiturl__, __treeurl__

################################################################################

# Upper bound (in pixels) set on the PIL Image class for the images handled here.
Image.MAX_IMAGE_PIXELS = 1 * 1024 * 1024 * 1024

################################################################################

# The Flask application reads its settings from config.py; response compression
# and the flask_session session handling are attached to it.
app = Flask( __name__ )
app.config.from_pyfile( "config.py" )

Compress( app )
Session( app )

# Deployment parameters taken from the environment (empty string when unset).
baseurl = os.environ.get( "BASEURL", "" )
envtype = os.environ.get( "ENVTYPE", "" )

################################################################################

# Add the "ENCRYPTION_COMPLIANCE_MODE" entry to the trust level table of the
# gnupg signature parser.
gnupg._parsers.Verify.TRUST_LEVELS[ "ENCRYPTION_COMPLIANCE_MODE" ] = 23
################################################################################
#    Decorators

def session_field_required( field, value ):
    """
        Decorator factory: the decorated view is only executed if the session
        contains `field` and its content is equal to `value`. Otherwise, the
        client is redirected to the login page.
    """
    def decorator( func ):
        @functools.wraps( func )
        def wrapper_login_required( *args, **kwargs ):
            if field not in session or not session.get( field ) == value:
                return redirect( url_for( "login" ) )

            return func( *args, **kwargs )

        return wrapper_login_required

    return decorator

def login_required( func ):
    """
        Redirect to the login page if the session is not flagged as logged.
    """
    @functools.wraps( func )
    def wrapper_login_required( *args, **kwargs ):
        if not session.get( "logged", False ):
            return redirect( url_for( "login" ) )

        return func( *args, **kwargs )

    return wrapper_login_required

def referer_required( func ):
    """
        Refuse (with a 404 status code) the requests sent without a "Referer"
        HTTP header.
    """
    @functools.wraps( func )
    def wrapper_login_required( *args, **kwargs ):
        if not request.headers.get( "Referer", False ):
            return "referrer needed", 404

        return func( *args, **kwargs )

    return wrapper_login_required

def admin_required( func ):
    """
        Redirect to the login page if the session is not logged with the
        account type 1.
    """
    @functools.wraps( func )
    def wrapper_login_required( *args, **kwargs ):
        if not session.get( "logged", False ) or not session.get( "account_type", None ) == 1:
            return redirect( url_for( "login" ) )

        return func( *args, **kwargs )

    return wrapper_login_required

def submitter_required( func ):
    """
        Redirect to the login page if the session is not logged with the
        account type 3.
    """
    @functools.wraps( func )
    def wrapper_login_required( *args, **kwargs ):
        if not session.get( "logged", False ) or not session.get( "account_type", None ) == 3:
            return redirect( url_for( "login" ) )

        return func( *args, **kwargs )

    return wrapper_login_required

def redis_cache( ttl = 3600 ):
    """
        Decorator factory caching the return value of the decorated function in
        the shared redis database for `ttl` seconds.

        The cache key is the SHA256 of the function name joined with the
        positional arguments (and the sorted keyword arguments, if any). The
        value is stored pickled and base64-encoded.
    """
    def decorator( func ):
        @functools.wraps( func )
        def wrapper_cache( *args, **kwargs ):
            # The arguments are stringified to accept non-string values; the
            # key is unchanged for calls made with string positional arguments
            # only.
            key_parts = [ func.__name__ ]
            key_parts.extend( str( arg ) for arg in args )
            key_parts.extend( "{}={}".format( name, kwargs[ name ] ) for name in sorted( kwargs ) )

            index = hashlib.sha256( "_".join( key_parts ) ).hexdigest()

            cached = config.redis_shared.get( index )

            if cached is not None:
                # NOTE(review): unpickling executes arbitrary code if the redis
                # content can be altered by a third party; the redis instance
                # has to be trusted.
                return cPickle.loads( base64.b64decode( cached ) )

            result = func( *args, **kwargs )

            config.redis_shared.set( index, base64.b64encode( cPickle.dumps( result ) ), ex = ttl )

            return result

        return wrapper_cache

    return decorator

################################################################################
#    Generic routing

@app.route( "/ping" )
@app.route( baseurl + "/ping" )
def ping():
    """
        Ping function to check if the web application is healthy.

        This function needs to check all important elements of the web
        application, i.e. the flask app and the postgresql database. If the
        application reports an error 500, the healthcheck done by the container
        orchestrator will fail, hence re-scheduling the container as needed.
    """
    if not config.db.check():
        # Try to re-connect to the database to try to serve the requests before
        # the rescheduling
        config.db.connect()
        return abort( 500 )

    else:
        return "pong"

@app.route( baseurl + "/version" )
def version():
    """
        Function to report the version of the web app.

        The version.py file is re-generated by the CI/CD for production.
    """
    try:
        return jsonify( {
            "error": False,
            "version": __version__,
            "branch": __branch__,
            "commit": __commit__,
            "commiturl": __commiturl__,
            "treeurl": __treeurl__
        } )

    except Exception:
        return jsonify( {
            "error": True
        } )

################################################################################
#    CDN serving

# The `<path:subpath>` URL variable is required to provide the `subpath`
# argument of the three file-serving views below.
@app.route( baseurl + "/cdn/<path:subpath>" )
def send_cdn_files( subpath ):
    """
        Serve the files from the cdn directory.
    """
    return send_from_directory( "cdn", subpath )

################################################################################
#    App serving

@app.route( baseurl + "/app/<path:subpath>" )
def send_app_files( subpath ):
    """
        Serve the file from the app directory (all files related to the ICNML
        application).
    """
    return send_from_directory( "app", subpath )

@app.route( baseurl + "/static/<path:subpath>" )
def send_static_files( subpath ):
    """
        Serve static files from the static directory.
    """
    return send_from_directory( "static", subpath )

################################################################################
#    Sessions

@app.before_request
def renew_session():
    """
        Reset the timer before the automatic-logout. This function is called
        before every HTTP call.
    """
    session.permanent = True
    app.permanent_session_lifetime = timedelta( seconds = config.session_timeout )

@app.route( baseurl + "/is_logged" )
def is_logged():
    """
        App route to know if the user is logged in the ICNML main application.

        This route is used by nginx to protect some other locations, for
        example the PiAnoS dedicated pages. The session countdown timer is
        reset to allow the user to use the protected location for the rest of
        the timeout.
    """
    if session.get( "logged", False ):
        return "ok"

    else:
        return abort( 403 )

@app.route( baseurl + "/logout" )
def logout():
    """
        Logout the user, clear the session and redirect to the home page.
    """
    session_clear_and_prepare()
    return redirect( url_for( "home" ) )

def session_clear_and_prepare():
    """
        Clear the session related to the user and initialize the login related
        variables.
    """
    session.clear()

    session[ "process" ] = "login"
    session[ "need_to_check" ] = [ "password" ]
    session[ "logged" ] = False
    session[ "session_security_key" ] = str( uuid4() )

@app.route( baseurl + "/login" )
def login():
    """
        Route to serve the login.html page.
    """
    session_clear_and_prepare()

    return render_template(
        "login.html",
        baseurl = baseurl,
        js = config.cdnjs,
        css = config.cdncss,
        session_timeout = config.session_timeout,
        session_security_key = session.get( "session_security_key" ),
        envtype = envtype
    )

@app.route( baseurl + "/do/login", methods = [ "POST" ] )
def do_login():
    """
        Function to manage the login workflow and check the username, password
        and TOTP data.

        This function is called multiple times because the check is done only
        for one data type at the time. If all the checks are OK, the user has
        provided all needed information, hence is logged in.
    """
    #TODO: double-check this function with someone external
    #TODO: combine the security key checks in this function

    need_to_check = session.get( "need_to_check", [ "password" ] )
    current_check = need_to_check[ 0 ] if need_to_check else None

    session[ "need_to_check" ] = need_to_check

    ############################################################################
    #   Username and password check

    if current_check == "password":
        q = config.db.query( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
        user = q.fetchone()

        if user is None:
            session_clear_and_prepare()

            return jsonify( {
                "error": False,
                "logged": False
            } )

        form_password = request.form.get( "password", None )

        # An account without password (created but not configured yet by its
        # owner) can not be used to login.
        if form_password is None or user[ "password" ] is None or not pbkdf2( form_password, user[ "password" ] ).verify():
            session_clear_and_prepare()

            return jsonify( {
                "error": False,
                "logged": False,
            } )

        elif not user[ "active" ]:
            session_clear_and_prepare()

            return jsonify( {
                "error": False,
                "logged": False,
                "message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
            } )

        else:
            session[ "username" ] = user[ "username" ]
            session[ "user_id" ] = user[ "id" ]
            session[ "password_check" ] = True
            session[ "need_to_check" ].remove( current_check )

            # Key used by do_encrypt() and do_decrypt() while the user is
            # connected
            session[ "password" ] = pbkdf2( form_password, "AES256", config.PASSWORD_NB_ITERATIONS ).hash()

            sql = "SELECT count( * ) FROM webauthn WHERE user_id = %s AND active = TRUE"
            security_keys_count = config.db.query_fetchone( sql, ( user[ "id" ], ) )[ "count" ]

            # The security keys have the priority over the TOTP as second
            # factor
            if security_keys_count > 0:
                session[ "need_to_check" ].append( "securitykey" )

            elif user[ "totp" ]:
                session[ "need_to_check" ].append( "totp" )

            else:
                session_clear_and_prepare()

                return jsonify( {
                    "error": False,
                    "logged": False,
                    "message": "Second factor missing. Contact the ICNML administrator (icnml@unil.ch)."
                } )

    ############################################################################
    #   Time-based One Time Password check

    elif current_check == "totp":
        q = config.db.query( "SELECT username, totp FROM users WHERE username = %s", ( session[ "username" ], ) )
        user = q.fetchone()

        if not pyotp.TOTP( user[ "totp" ] ).verify( request.form[ "totp" ], valid_window = 2 ):
            session[ "logged" ] = False

            return jsonify( {
                "error": False,
                "logged": False,
                "message": "Wrong TOTP",
                "time": time.time()
            } )

        else:
            session[ "need_to_check" ].remove( current_check )

    ############################################################################
    #   Check if all the data has been provided; login if ok

    remaining = session[ "need_to_check" ]

    if len( remaining ) == 0 and session.get( "password_check", False ):
        for key in [ "process", "need_to_check", "password_check" ]:
            if key in session:
                session.pop( key )

        session[ "logged" ] = True

        q = config.db.query( "SELECT type FROM users WHERE username = %s", ( session[ "username" ], ) )
        user = q.fetchone()
        session[ "account_type" ] = int( user[ "type" ] )

        return jsonify( {
            "error": False,
            "logged": True,
        } )

    elif len( remaining ) == 0:
        # Nothing left to check but no password verified: inconsistent
        # session, the login process is restarted.
        session_clear_and_prepare()

        return jsonify( {
            "error": False,
            "logged": False
        } )

    else:
        return jsonify( {
            "error": False,
            "next_step": remaining[ 0 ]
        } )

################################################################################
#    Reset

@app.route( baseurl + "/reset_password" )
def password_reset():
    """
        Serve the password_reset.html page.
    """
    session.clear()
    session[ "process" ] = "request_password_reset"

    return render_template(
        "users/password_reset.html",
        baseurl = baseurl,
        js = config.cdnjs,
        css = config.cdncss,
        envtype = envtype
    )

@app.route( baseurl + "/do/reset_password", methods = [ "POST" ] )
def do_password_reset():
    """
        Start the check of the username for a password reset. The check is done
        in a thread to allow a fast "OK" response, even if the username does
        not exist. This prevents data extraction (presence or not of a
        username).
    """
    email = request.form.get( "email", None )

    Thread( target = do_password_reset_thread, args = ( email, ) ).start()

    return jsonify( {
        "error": False,
        "message": "OK"
    } )

def do_password_reset_thread( email ):
    """
        Search in the database if the provided email is present, and send the
        password reset email if so.

        The reset procedure is stored in the shared redis database for 24
        hours under the "reset_<process id>" key.
    """
    if email is None:
        return False

    q = config.db.query( "SELECT id, username, email FROM users" )

    for user in q.fetchall():
        stored_email = user[ "email" ]

        # Only the emails stored as pbkdf2 hash can be compared; the users
        # without email are skipped instead of stopping the thread.
        if not stored_email or not stored_email.startswith( "pbkdf2$" ):
            continue

        if not pbkdf2( email, stored_email ).verify():
            continue

        process_id = hashlib.sha512( random_data( 100 ) ).hexdigest()

        ####################################################################

        data = {
            "process": "password_reset",
            "process_id": process_id,
            "user_id": user[ "id" ]
        }
        data = base64.b64encode( json.dumps( data ) )

        reset_id = "reset_{}".format( process_id )
        config.redis_shared.set( reset_id, data, ex = 24 * 3600 )

        ####################################################################

        # This function runs in a thread, outside of any request: url_for()
        # needs an application and request context.
        with app.app_context(), app.test_request_context():
            url = config.domain + url_for( "password_reset_stage2", id = process_id )

            email_content = render_jinja_html(
                "templates/email", "reset.html",
                url = url,
                username = user[ "username" ]
            )

        msg = MIMEText( email_content, "html" )

        msg[ "Subject" ] = "ICNML - User password reset"
        msg[ "From" ] = config.sender
        msg[ "To" ] = email

        s = smtplib.SMTP( config.smtpserver )
        try:
            s.sendmail( config.sender, [ email ], msg.as_string() )
        finally:
            # Close the connection even if the email can not be sent
            s.quit()

@app.route( baseurl + "/reset_password_stage2/<id>", methods = [ "GET", "POST" ] )
def password_reset_stage2( id ):
    """
        Serve the reset password, second stage (password edit fields) page,
        and set the data in the database if provided.

        The `id` URL variable is the process id generated by
        do_password_reset_thread().
    """
    #TODO: double-check this function with someone external

    reset_id = "reset_{}".format( id )
    data = config.redis_shared.get( reset_id )

    if data is None:
        return jsonify( {
            "error": True,
            "message": "Reset procedure not found/expired"
        } )

    data = json.loads( base64.b64decode( data ) )

    password = request.form.get( "password", None )
    userid = data.get( "user_id", None )

    if password is None:
        return render_template(
            "users/password_reset_stage2.html",
            baseurl = baseurl,
            id = id,
            js = config.cdnjs,
            css = config.cdncss,
            envtype = envtype
        )

    # Same number of iterations as the other functions storing a password
    password = pbkdf2( password, random_data( 50 ), config.PASSWORD_NB_ITERATIONS ).hash()
    config.db.query( "UPDATE users SET password = %s WHERE id = %s", ( password, userid ) )
    config.db.commit()

    # The reset procedure can be used only once
    config.redis_shared.delete( reset_id )

    return jsonify( {
        "error": False,
        "password_updated": True
    } )

################################################################################
#    webauthn keys

@app.route( baseurl + "/webauthn/admin" )
@login_required
def webauthn_admin():
    """
        Serve the administration page for the FIDO2 keys.
    """
    return render_template(
        "webauthn/admin.html",
        baseurl = baseurl,
        js = config.cdnjs,
        css = config.cdncss,
        session_timeout = config.session_timeout,
        account_type = session.get( "account_type", None ),
        keys = do_webauthn_get_list_of_keys( all = True ),
        envtype = envtype
    )

def do_webauthn_get_list_of_keys( uid = None, all = False ):
    """
        Get the list of keys for a particular user. Can be filtered by active
        keys only with the `all` parameter. If the user id (uid) variable is
        not passed in parameter, the id of the currently logged user will be
        used (via the session).

        Return a list of dictionaries (one per key).
    """
    user_id = uid if uid is not None else session.get( "user_id" )

    sql = """
        SELECT
            id, key_name as name,
            created_on, last_usage, usage_counter,
            active
        FROM webauthn
        WHERE user_id = %s
    """
    if not all:
        sql += " AND active = true"

    sql += " ORDER BY key_name ASC"

    q = config.db.query( sql, ( user_id, ) )

    return [ dict( key ) for key in q.fetchall() ]

@app.route( baseurl + "/webauthn/begin_activate", methods = [ "POST" ] )
@login_required
def webauthn_begin_activate():
    """
        Start the registering process for a new security key. The json
        returned will be used by the javascript navigator.credentials.create()
        function.
    """
    session[ "key_name" ] = request.form.get( "key_name", None )

    username = session.get( "username" )

    challenge = pyotp.random_base32( 64 )
    ukey = pyotp.random_base32( 64 )

    # Stored in the session to be checked by webauthn_verify()
    session[ "challenge" ] = challenge
    session[ "register_ukey" ] = ukey

    make_credential_options = webauthn.WebAuthnMakeCredentialOptions(
        challenge,
        config.rp_name, config.RP_ID,
        ukey, username, username,
        None
    )

    registration_dict = make_credential_options.registration_dict
    registration_dict[ "authenticatorSelection" ] = {
        "authenticatorAttachment": "cross-platform",
        "requireResidentKey": False,
        "userVerification": "discouraged"
    }

    return jsonify( registration_dict )

@app.route( baseurl + "/webauthn/verify", methods = [ "POST" ] )
@login_required
def webauthn_verify():
    """
        Verify the data produced by the security key while registering (with
        the navigator.credentials.create() function).
    """
    challenge = session[ "challenge" ]
    user_id = session[ "user_id" ]
    key_name = session.get( "key_name", None )
    ukey = session[ "register_ukey" ]

    webauthn_registration_response = webauthn.WebAuthnRegistrationResponse(
        config.RP_ID,
        config.ORIGIN,
        request.form,
        challenge
    )

    try:
        webauthn_credential = webauthn_registration_response.verify()

    except Exception as e:
        return jsonify( {
            "error": True,
            "message": "Registration failed. Error: {}".format( e )
        } )

    try:
        config.db.query(
            sql_insert_generate( "webauthn", [ "user_id", "key_name", "ukey", "credential_id", "pub_key", "sign_count" ] ),
            (
                user_id, key_name,
                ukey, webauthn_credential.credential_id,
                webauthn_credential.public_key, webauthn_credential.sign_count,
            )
        )
        config.db.commit()

        return jsonify( {
            "success": "User successfully registered."
        } )

    except Exception:
        return jsonify( {
            "error": True,
            "message": "Database error"
        } )

################################################################################

def _webauthn_key_query( sql, data ):
    """
        Execute and commit a query on a security key, and return the JSON
        response shared by the key management routes.
    """
    try:
        config.db.query( sql, data )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        return jsonify( {
            "error": True
        } )

@app.route( baseurl + "/webauthn/delete", methods = [ "POST" ] )
@login_required
def webauthn_delete_key():
    """
        Delete a key based upon the key id for the currently logged user.
    """
    key_id = request.form.get( "key_id", False )
    userid = session[ "user_id" ]

    return _webauthn_key_query(
        "DELETE FROM webauthn WHERE id = %s AND user_id = %s",
        ( key_id, userid, )
    )

@app.route( baseurl + "/webauthn/disable", methods = [ "POST" ] )
@login_required
def webauthn_disable_key():
    """
        Disable a particular security key for the current user.
    """
    key_id = request.form.get( "key_id", False )
    userid = session[ "user_id" ]

    return _webauthn_key_query(
        "UPDATE webauthn SET active = False WHERE id = %s AND user_id = %s",
        ( key_id, userid, )
    )

@app.route( baseurl + "/webauthn/enable", methods = [ "POST" ] )
@login_required
def webauthn_enable_key():
    """
        Activation of a security key for the current user.
    """
    key_id = request.form.get( "key_id", False )
    userid = session[ "user_id" ]

    return _webauthn_key_query(
        "UPDATE webauthn SET active = True WHERE id = %s AND user_id = %s",
        ( key_id, userid, )
    )

@app.route( baseurl + "/webauthn/rename", methods = [ "POST" ] )
@login_required
def webauthn_rename_key():
    """
        Rename a security key for the current user.
    """
    key_id = request.form.get( "key_id", False )
    key_name = request.form.get( "key_name", False )
    userid = session[ "user_id" ]

    return _webauthn_key_query(
        "UPDATE webauthn SET key_name = %s WHERE id = %s AND user_id = %s",
        ( key_name, key_id, userid, )
    )

@app.route( baseurl + "/webauthn/begin_assertion" )
def webauthn_begin_assertion():
    """
        Get the data to start the login process with all active keys for a
        user.
    """
    user_id = session.get( "user_id" )

    # A new challenge is generated for every assertion
    if "challenge" in session:
        del session[ "challenge" ]

    challenge = pyotp.random_base32( 64 )
    session[ "challenge" ] = challenge

    q = config.db.query( "SELECT * FROM webauthn WHERE user_id = %s AND active = true", ( user_id, ) )

    credential_id_list = [
        {
            "type": "public-key",
            "id": key[ "credential_id" ],
            "transports": [ "usb", "nfc", "ble", "internal" ]
        }
        for key in q.fetchall()
    ]

    assertion_dict = {
        "challenge": challenge,
        "timeout": 60000,
        "allowCredentials": credential_id_list,
        "rpId": config.RP_ID,
    }

    return jsonify( {
        "error": False,
        "data": assertion_dict
    } )

@app.route( baseurl + "/webauthn/verify_assertion", methods = [ "POST" ] )
def webauthn_verify_assertion():
    """
        Check the signed challenge provided by the user for the login
        process.
    """
    challenge = session.get( "challenge" )
    assertion_response = request.form
    credential_id = assertion_response.get( "id" )

    # The key has to be an active key of the user currently in the login
    # process: the key of another account shall not be accepted as second
    # factor.
    q = config.db.query(
        "SELECT * FROM webauthn WHERE credential_id = %s AND user_id = %s AND active = true",
        ( credential_id, session.get( "user_id" ), )
    )
    user = q.fetchone()

    if user is None:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: unknown security key"
        } )

    webauthn_user = webauthn.WebAuthnUser(
        None, session[ "username" ], None, None,
        user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )

    try:
        sign_count = webauthn_assertion_response.verify()

    except Exception as e:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: {}".format( e )
        } )

    else:
        dt = datetime.now( pytz.timezone( "Europe/Zurich" ) )

        config.db.query(
            "UPDATE webauthn SET sign_count = %s, last_usage = %s, usage_counter = usage_counter + 1 WHERE credential_id = %s",
            ( sign_count, dt, credential_id, )
        )
        config.db.commit()

        session[ "need_to_check" ].remove( "securitykey" )

        # Finalize the login process (nothing left to check)
        do_login()

        return jsonify( {
            "error": False
        } )

################################################################################
#    New user

@app.route( baseurl + "/signin" )
def new_user():
    """
        Serve the page to register to ICNML.
    """
    q = config.db.query( "SELECT id, name FROM account_type WHERE can_singin = true" )

    account_type = [ dict( row ) for row in q.fetchall() ]

    return render_template(
        "users/signin.html",
        baseurl = baseurl,
        list_account_type = account_type,
        js = config.cdnjs,
        css = config.cdncss,
        envtype = envtype
    )

@app.route( baseurl + "/do/signin", methods = [ "POST" ] )
def add_account_request_to_db():
    """
        Add the new user request to the database.
    """
    try:
        first_name = request.form[ "first_name" ]
        last_name = request.form[ "last_name" ]
        email = request.form[ "email" ]
        account_type = int( request.form[ "account_type" ] )
        uuid = str( uuid4() )

        sql = "SELECT name FROM account_type WHERE id = %s"
        account_type_name = config.db.query_fetchone( sql, ( account_type, ) )[ "name" ]
        account_type_name = account_type_name.lower()

        # The name of the sequence is built with the account type name read
        # from the database, not with the data sent by the client.
        sql = "SELECT nextval( 'username_{}_seq' ) as id".format( account_type_name )
        username_id = config.db.query_fetchone( sql )[ "id" ]

        config.db.query(
            sql_insert_generate( "signin_requests", [ "first_name", "last_name", "email", "account_type", "uuid", "username_id" ] ),
            ( first_name, last_name, email, account_type, uuid, username_id, )
        )
        config.db.commit()

        return jsonify( {
            "error": False,
            "uuid": uuid
        } )

    except Exception:
        return jsonify( {
            "error": True
        } )

@app.route( baseurl + "/validate_signin" )
@admin_required
def validate_signin():
    """
        Serve the page to admins regarding the validation of new users.
    """
    q = config.db.query( """
        SELECT signin_requests.*, account_type.name as account_type
        FROM signin_requests
        LEFT JOIN account_type ON signin_requests.account_type = account_type.id
        WHERE signin_requests.status = 'pending'
    """ )

    # Best effort: the page is served with an empty list if the requests can
    # not be retrieved.
    try:
        users = [ dict( row ) for row in q.fetchall() ]

    except Exception:
        users = []

    return render_template(
        "users/validate_signin.html",
        baseurl = baseurl,
        users = users,
        js = config.cdnjs,
        css = config.cdncss,
        session_timeout = config.session_timeout,
        envtype = envtype
    )

@app.route( baseurl + "/do/validate_signin", methods = [ "POST" ] )
@admin_required
def do_validate_signin():
    """
        Prepare the new user data to be signed by the admin, and serve the
        assertion options.

        The challenge to sign is the JSON (base64-encoded, without padding) of
        the signin request and the acceptance data (admin username and time).
    """
    request_id = request.form.get( "id" )

    q = config.db.query( "SELECT * FROM signin_requests WHERE id = %s", ( request_id, ) )
    signin_request = q.fetchone()

    if signin_request is None:
        return jsonify( {
            "error": True,
            "message": "Signin request not found"
        } )

    r = {}
    r[ "user" ] = dict( signin_request )
    r[ "user" ][ "request_time" ] = str( r[ "user" ][ "request_time" ] )
    r[ "user" ][ "validation_time" ] = str( r[ "user" ][ "validation_time" ] )

    r[ "acceptance" ] = {}
    r[ "acceptance" ][ "username" ] = session[ "username" ]
    r[ "acceptance" ][ "time" ] = str( datetime.now() )

    challenge = base64.b64encode( json.dumps( r ) )
    challenge = challenge.replace( "=", "" )

    session[ "validation_user_challenge" ] = challenge

    ############################################################################

    user_id = session[ "user_id" ]

    # Last used security key of the admin
    q = config.db.query(
        "SELECT * FROM webauthn WHERE user_id = %s AND usage_counter > 0 ORDER BY last_usage DESC LIMIT 1",
        ( user_id, )
    )
    key = q.fetchone()

    if key is None:
        return jsonify( {
            "error": True,
            "message": "No security key available"
        } )

    webauthn_user = webauthn.WebAuthnUser(
        key[ "ukey" ], session[ "username" ], session[ "username" ], None,
        key[ "credential_id" ], key[ "pub_key" ], key[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )

    return jsonify( {
        "error": False,
        "data": webauthn_assertion_options.assertion_dict
    } )

@app.route( baseurl + "/do/validate_signin_2", methods = [ "POST" ] )
@admin_required
def do_validate_signin_2():
    """
        Verification of the signature of the new user data by the admin.

        If the signature is valid, the signin request is flagged as validated,
        the user is created in the database and the configuration email is
        sent to the new user.
    """
    challenge = session.get( "validation_user_challenge" )

    if challenge is None:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: no pending validation"
        } )

    assertion_response = request.form
    assertion_response_s = base64.b64encode( json.dumps( assertion_response ) )

    credential_id = assertion_response.get( "id" )

    # Only the keys of the admin doing the validation are accepted
    q = config.db.query(
        "SELECT * FROM webauthn WHERE credential_id = %s AND user_id = %s",
        ( credential_id, session.get( "user_id" ), )
    )
    user = q.fetchone()

    if user is None:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: unknown security key"
        } )

    # Same relying party id as the one used to generate the assertion options
    # in do_validate_signin()
    webauthn_user = webauthn.WebAuthnUser(
        user[ "ukey" ], session[ "username" ], session[ "username" ], None,
        user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )

    try:
        webauthn_assertion_response.verify()

    except Exception as e:
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: {}".format( e )
        } )

    ############################################################################

    # Restore the base64 padding removed by do_validate_signin()
    challenge += "=" * ( -len( challenge ) % 4 )

    newuser = json.loads( base64.b64decode( challenge ) )

    user_type = int( newuser[ "user" ][ "account_type" ] )
    email = newuser[ "user" ][ "email" ]
    email_hash = pbkdf2( email, random_data( 100 ), config.EMAIL_NB_ITERATIONS ).hash()
    request_uuid = newuser[ "user" ][ "uuid" ]
    request_id = newuser[ "user" ][ "id" ]
    username_id = newuser[ "user" ][ "username_id" ]

    q = config.db.query( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )
    account_type_name = q.fetchone()[ 0 ]

    username = "{}_{}".format( account_type_name, username_id )
    username = username.lower()

    try:
        config.db.query(
            "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s",
            ( assertion_response_s, request_id )
        )
        config.db.query(
            sql_insert_generate( "users", [ "username", "email", "type" ] ),
            ( username, email_hash, user_type )
        )
        config.db.commit()

    except Exception:
        return jsonify( {
            "error": True,
            "message": "Can not insert into database."
        } )

    ############################################################################

    # Same base URL as the one used for the password reset emails
    email_content = render_jinja_html(
        "templates/email", "signin.html",
        username = username,
        url = config.domain + url_for( "config_new_user", uuid = request_uuid )
    )

    msg = MIMEText( email_content, "html" )

    msg[ "Subject" ] = "ICNML - Login information"
    msg[ "From" ] = config.sender
    msg[ "To" ] = email

    s = smtplib.SMTP( config.smtpserver )
    try:
        s.sendmail( config.sender, [ email ], msg.as_string() )
    finally:
        # Close the connection even if the email can not be sent
        s.quit()

    ############################################################################

    return jsonify( {
        "error": False
    } )

@app.route( baseurl + "/do/validation_reject", methods = [ "POST" ] )
@admin_required
def do_validation_reject():
    """
        Reject the request for a new user. As for the validation, this action
        is reserved to the admins.
    """
    request_id = request.form.get( "id" )

    try:
        sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s"
        config.db.query( sql, ( request_id, ) )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        return jsonify( {
            "error": True
        } )

@app.route( baseurl + "/config/<uuid>" )
def config_new_user( uuid ):
    """
        Serve the first page to the new user to configure his account.

        The `uuid` URL variable is the uuid of the signin request, sent by
        email to the new user.
    """
    session.clear()

    q = config.db.query( "SELECT email FROM signin_requests WHERE uuid = %s", ( uuid, ) )
    r = q.fetchone()

    # Any problem (unknown uuid included) sends the user back to the home page
    try:
        email = r[ "email" ]
        session[ "signin_user_validation_email" ] = email
        session[ "signin_user_validation_uuid" ] = uuid

        return render_template(
            "users/config.html",
            baseurl = baseurl,
            js = config.cdnjs,
            css = config.cdncss,
            session_timeout = config.session_timeout,
            envtype = envtype,
            next_step = "do_config_new_user"
        )

    except Exception:
        return redirect( url_for( "home" ) )

@app.route( baseurl + "/do/config", methods = [ "POST" ] )
def do_config_new_user():
    """
        Save the configuration of the new user to the database.

        The username is stored in the session only once all the checks are
        passed: other routes (set_secret() for example) trust the username
        stored in the session.
    """
    email = session[ "signin_user_validation_email" ]
    uuid = session[ "signin_user_validation_uuid" ]
    username = request.form.get( "username" )
    password = request.form.get( "password" )

    ############################################################################

    q = config.db.query( "SELECT count(*) FROM signin_requests WHERE uuid = %s AND email = %s", ( uuid, email, ) )
    nb_requests = q.fetchone()[ 0 ]

    if nb_requests == 0:
        return jsonify( {
            "error": True,
            "message": "no signin request"
        } )

    q = config.db.query( "SELECT * FROM users WHERE username = %s", ( username, ) )
    user = q.fetchone()

    if user is None:
        return jsonify( {
            "error": True,
            "message": "no user"
        } )

    elif password is None or not password.startswith( "pbkdf2" ):
        return jsonify( {
            "error": True,
            "message": "password not in the correct format"
        } )

    elif not pbkdf2( email, user[ "email" ] ).verify():
        return jsonify( {
            "error": True,
            "message": "email not corresponding to the request form"
        } )

    elif user.get( "password", None ) is not None:
        return jsonify( {
            "error": True,
            "message": "password already set"
        } )

    ############################################################################

    password = pbkdf2( password, random_data( 65 ), config.PASSWORD_NB_ITERATIONS ).hash()

    config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
    config.db.commit()

    session[ "username" ] = username

    ############################################################################

    return jsonify( {
        "error": False
    } )

@app.route( baseurl + "/config/donor/<h>" )
def config_new_user_donor( h ):
    """
        Serve the configuration page for a new donor.

        The `h` URL variable is the SHA512 of the email field of a donor
        (account type 2) without password.
    """
    session.clear()

    sql = "SELECT id, username, email FROM users WHERE type = 2 AND password IS NULL"
    for r in config.db.query_fetchall( sql ):
        if h == hashlib.sha512( r[ "email" ] ).hexdigest():
            user = r
            break

    else:
        return redirect( url_for( "home" ) )

    session[ "email_hash" ] = h
    session[ "user_id" ] = user[ "id" ]

    return render_template(
        "users/config.html",
        baseurl = baseurl,
        js = config.cdnjs,
        css = config.cdncss,
        session_timeout = config.session_timeout,
        envtype = envtype,
        next_step = "do_config_new_donor",
        hash = h
    )

@app.route( baseurl + "/do/config/donor", methods = [ "POST" ] )
def do_config_new_donor():
    """
        Save the donor configuration to the database.

        The data sent by the client has to correspond to the donor stored in
        the session by config_new_user_donor(). The username is stored in the
        session only if this check is passed: other routes (set_secret() for
        example) trust the username stored in the session.
    """
    username = request.form.get( "username" )
    password = request.form.get( "password" )
    h = request.form.get( "hash" )

    invalid = jsonify( {
        "error": True,
        "message": "Invalid parameters"
    } )

    sql = "SELECT id FROM users WHERE username = %s"
    user = config.db.query_fetchone( sql, ( username, ) )

    if user is None or password is None or "email_hash" not in session:
        return invalid

    if session[ "email_hash" ] == h and session.get( "user_id" ) == user[ "id" ]:
        password = pbkdf2( password, random_data( 100 ), config.PASSWORD_NB_ITERATIONS ).hash()

        config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
        config.db.commit()

        session[ "username" ] = username

        return jsonify( {
            "error": False
        } )

    else:
        return invalid

@app.route( baseurl + "/totp_help" )
def totp_help():
    """
        Serve the help page for the TOTP.
    """
    return render_template(
        "totp_help.html",
        baseurl = baseurl,
        js = config.cdnjs,
        css = config.cdncss,
        envtype = envtype
    )

################################################################################
#    QR Code generation

def renew_secret():
    """
        Generate a new TOTP secret, store it in the session and return it.
    """
    secret = pyotp.random_base32( 40 )
    session[ "secret" ] = secret

    return secret

def get_secret():
    """
        Retrieve the current secret; a new one is generated if the session
        does not have one.
    """
    secret = session.get( "secret", None )
    if secret is None:
        secret = renew_secret()

    return secret

@app.route( baseurl + "/set_secret" )
def set_secret():
    """
        Set the new secret value for the TOTP in the database for the user
        stored in the session.
    """
    # NOTE(review): this route is not protected by login_required (a user
    # without complete login has a username in the session after the password
    # check of do_login()); to be confirmed that the TOTP can not be replaced
    # before the check of the second factor.
    if "secret" not in session or "username" not in session:
        return jsonify( {
            "error": True,
            "message": "no TOTP configuration in progress"
        } )

    config.db.query( "UPDATE users SET totp = %s WHERE username = %s", ( session[ "secret" ], session[ "username" ], ) )
    config.db.commit()

    return jsonify( {
        "error": False
    } )

@app.route( baseurl + "/secret" )
def request_secret():
    """
        Serve the current secret as JSON.
    """
    get_secret()

    return jsonify( {
        "error": False,
        "secret": session[ "secret" ]
    } )

@app.route( baseurl + "/new_secret" )
@login_required
def request_renew_secret():
    """
        Generate a new secret and serve it as JSON.
    """
    renew_secret()

    return jsonify( {
        "error": False,
        "secret": session[ "secret" ]
    } )

@app.route( baseurl + "/user/config/totp_qrcode.png" )
def user_totp_qrcode():
    """
        Generate the TOTP PNG QRcode image ready to scan.
    """
    if "username" in session:
        qrcode_value = "otpauth://totp/ICNML%20{}?secret={}".format( session[ "username" ], get_secret() )
    else:
        qrcode_value = "otpauth://totp/ICNML?secret={}".format( get_secret() )

    img = qrcode.make( qrcode_value )

    temp = StringIO()
    img.save( temp, format = "png" )
    temp.seek( 0 )

    return send_file( temp, mimetype = "image/png" )

@app.route( baseurl + "/user/config/totp" )
@login_required
def user_totp_config():
    """
        Serve the TOTP configuration page.
    """
    return render_template(
        "users/totp.html",
        baseurl = baseurl,
        secret = get_secret(),
        js = config.cdnjs,
        css = config.cdncss,
        session_timeout = config.session_timeout,
        envtype = envtype,
        account_type = session.get( "account_type", None )
    )

################################################################################
#    Data decryption

# Prefix added to the data before encryption, used to detect a successful
# decryption.
encryption_prefix = "icnml$"

def do_decrypt( data ):
    """
        Try to decrypt the data stored server-side. This encryption is done
        with a key derived from the password of the user, only stored in RAM
        while the user is connected.

        Return "-" if the data can not be decrypted.
    """
    try:
        data = AESCipher( session[ "password" ] ).decrypt( data )

        if data.startswith( encryption_prefix ):
            return data[ len( encryption_prefix ): ]

        else:
            return "-"

    except Exception:
        return "-"

def do_encrypt( data ):
    """
        Encryption of any data passed in argument with a key derived from the
        user password. This key is only stored in RAM (in redis) while the
        user is connected. The sensitive data shall be encrypted on the client
        side first, hence never leaving the computer of the user.
    """
    return AESCipher( session[ "password" ] ).encrypt( encryption_prefix + data )

################################################################################ # File upload @app.route( baseurl + "/upload", methods = [ "GET", "POST" ] ) @login_required def upload_file(): """ Main function dealing with the upload of files (tenprint, latent and consent forms). This function accept traditionals images and NIST files for the fingerprint data, and PDFs for the consent forms. """ upload_type = request.form.get( "upload_type", None ) file_extension = request.form.get( "extension", None ) if isinstance( file_extension, str ): file_extension = file_extension.lower() if upload_type == None: return jsonify( { "error": True, "msg": "Must specify a file type to upload a file" } ) if request.method == "POST": if "file" not in request.files: return jsonify( { "error": True, "msg": "No file in the POST request" } ) elif "upload_id" not in request.form: return jsonify( { "error": True, "msg": "No upload_id" } ) else: try: upload_uuid = request.form.get( "upload_id" ) sql = "SELECT id FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( upload_uuid, ) ) upload_id = r.fetchone()[ "id" ] except: return jsonify( { "error": True, "msg": "upload not related to a submission form" } ) file = request.files[ "file" ] file_name = do_encrypt( file.filename ) _, file_ext = os.path.splitext( file.filename ) file_uuid = str( uuid4() ) fp = StringIO() file.save( fp ) file_size = fp.tell() fp.seek( 0 ) if file_extension 
in config.NIST_file_extensions: file_data = fp.getvalue() file_data = base64.b64encode( file_data ) try: n = NISTf_auto( fp ) except: return jsonify( { "error": True, "msg": "Error while loading the NIST file" } ) # Save the NIST file in the DB sql = sql_insert_generate( "files", [ "folder", "creator", "filename", "type", "format", "size", "uuid", "data" ] ) data = ( upload_id, session[ "user_id" ], file_name, 5, "NIST", file_size, file_uuid, file_data, ) config.db.query( sql, data ) # Segmentation of the NIST file fpc_in_file = [] for fpc in config.all_fpc: try: try: img = n.get_print( fpc = fpc ) except: img = n.get_palmar( fpc = fpc ) buff = StringIO() img.save( buff, format = "TIFF" ) buff.seek( 0 ) img_data = buff.getvalue() img_data = base64.b64encode( img_data ) sql = sql_insert_generate( "files_segments", [ "tenprint", "uuid", "pc", "data" ] ) data = ( file_uuid, str( uuid4() ), fpc, img_data, ) config.db.query( sql, data ) fpc_in_file.append( fpc ) except: pass config.db.commit() return jsonify( { "error": False, "fpc": fpc_in_file } ) else: if upload_type in [ "latent_target", "latent_incidental", "tenprint_card_front", "tenprint_card_back" ]: img = Image.open( fp ) img_format = img.format width, height = img.size try: res = int( img.info[ "dpi" ][ 0 ] ) except: return jsonify( { "error": True, "msg": "No resolution found in the image. Upload not possible at the moment." 
} ) try: img = rotate_image_upon_exif( img ) except: pass buff = StringIO() img.save( buff, format = img_format ) buff.seek( 0 ) file_data = buff.getvalue() if upload_type in [ "tenprint_card_front", "tenprint_card_back" ]: create_thumbnail( file_uuid, img ) else: file_data = fp.getvalue() file_data_r = file_data file_data = base64.b64encode( file_data ) sql = "SELECT id FROM files_type WHERE name = %s" upload_type_id = config.db.query( sql, ( upload_type, ) ).fetchone()[ 0 ] #################################################################### if upload_type == "consent_form": sql = "SELECT email_aes FROM submissions WHERE uuid = %s" email = config.db.query_fetchone( sql, ( upload_uuid, ) )[ "email_aes" ] email = do_decrypt( email ) sql = "SELECT username, email FROM users WHERE type = 2 ORDER BY id DESC" for username_db, email_db in config.db.query_fetchall( sql ): if pbkdf2( email, email_db ).verify(): username = username_db url_hash = hashlib.sha512( email_db ).hexdigest() break else: return jsonify( { "error": True, "message": "user not found" } ) # Email for the donor email_content = render_jinja_html( "templates/email", "donor.html", username = username, url = "https://icnml.unil.ch" + url_for( "config_new_user_donor", h = url_hash ) ) msg = MIMEMultipart() msg[ "Subject" ] = "ICNML - You have been added as donor" msg[ "From" ] = config.sender msg[ "To" ] = email msg.attach( MIMEText( email_content, "html" ) ) part = MIMEApplication( file_data_r, Name = "consent_form.pdf" ) part[ "Content-Disposition" ] = "attachment; filename=consent_form.pdf" msg.attach( part ) try: s = smtplib.SMTP( config.smtpserver ) s.sendmail( config.sender, [ email ], msg.as_string() ) s.quit() except: return jsonify( { "error": True, "message": "Can not send the email to the user" } ) else: # Consent form save file_data = base64.b64encode( file_data ) file_data = gpg.encrypt( file_data, *config.gpg_key ) file_data = str( file_data ) file_data = base64.b64encode( file_data ) sql = 
sql_insert_generate( "cf", [ "uuid", "data", "email" ] ) data = ( file_uuid, file_data, pbkdf2( email, iterations = 100000 ).hash(), ) config.db.query( sql , data ) sql = "UPDATE submissions SET consent_form = true WHERE uuid = %s" config.db.query( sql, ( upload_uuid, ) ) config.db.commit() else: sql = sql_insert_generate( "files", [ "folder", "creator", "filename", "type", "format", "size", "width", "height", "resolution", "uuid", "data" ] ) data = ( upload_id, session[ "user_id" ], file_name, upload_type_id, img_format, file_size, width, height, res, file_uuid, file_data, ) config.db.query( sql, data ) config.db.commit() return jsonify( { "error": False, "uuid": file_uuid } ) else: return abort( 403 ) ################################################################################ # Submission of a new donor @app.route( baseurl + "/submission/new" ) @submitter_required def submission_new(): """ Serve the page to start a new submission (new donor). """ return render_template( "submission/new.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission/do_new", methods = [ "POST" ] ) @submitter_required def submission_do_new(): """ Check the new donor data, and store the new submission process in the database. 
""" email = request.form.get( "email", False ) email = email.lower() if email: # Check for duplicate base upon the email data sql = "SELECT id, email_hash FROM submissions WHERE submitter_id = %s" r = config.db.query( sql, ( session[ "user_id" ], ) ) for case in r.fetchall(): if pbkdf2( email, case[ "email_hash" ] ).verify(): return jsonify( { "error": True, "msg": "Email already used" } ) break else: # Insert the new donor id = str( uuid4() ) email_aes = do_encrypt( email ) email_hash = pbkdf2( email, iterations = config.EMAIL_NB_ITERATIONS ).hash() upload_nickname = request.form.get( "upload_nickname", None ) upload_nickname = do_encrypt( upload_nickname ) submitter_id = session[ "user_id" ] status = "pending" sql = sql_insert_generate( "submissions", [ "uuid", "email_aes", "email_hash", "nickname", "status", "submitter_id" ] ) data = ( id, email_aes, email_hash, upload_nickname, status, submitter_id, ) config.db.query( sql, data ) config.db.commit() userid = config.db.query_fetchone( "SELECT nextval( 'username_donor_seq' ) as id" )[ "id" ] username = "donor_{}".format( userid ) sql = sql_insert_generate( "users", [ "username", "email", "type" ] ) data = ( username, email_hash, 2 ) config.db.query( sql, data ) config.db.commit() return jsonify( { "error": False, "id": id } ) else: return jsonify( { "error": True, "msg": "Email not provided" } ) @app.route( baseurl + "/submission//add_files" ) @submitter_required def submission_upload_tplp( id ): """ Serve the page to upload tenprint and latent images files. This page is not accessible if a consent form is not available in the database for this particular donor. 
""" try: sql = """ SELECT email_aes as email, nickname, created_time, consent_form FROM submissions WHERE submitter_id = %s AND uuid = %s """ r = config.db.query( sql, ( session[ "user_id" ], id ) ) user = r.fetchone() if user[ "consent_form" ]: for key in [ "email", "nickname" ]: user[ key ] = do_decrypt( user[ key ] ) return render_template( "submission/add_files.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, upload_id = id, session_security_key = session.get( "session_security_key" ), envtype = envtype, nist_file_extensions = json.dumps( config.NIST_file_extensions ), **user ) else: return redirect( url_for( "submission_consent_form", id = id ) ) except: return jsonify( { "error": True, "msg": "Case not found" } ) @app.route( baseurl + "/submission//consent_form" ) @submitter_required def submission_consent_form( id ): """ Serve the page to upload the consent form for the user. """ sql = """ SELECT email_aes as email, nickname, created_time FROM submissions WHERE submitter_id = %s AND uuid = %s """ r = config.db.query( sql, ( session[ "user_id" ], id ) ) user = r.fetchone() if user != None: for key in [ "email", "nickname" ]: user[ key ] = do_decrypt( user[ key ] ) return render_template( "submission/consent_form.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, upload_id = id, session_security_key = session.get( "session_security_key" ), envtype = envtype, **user ) else: return abort( 404 ) @app.route( baseurl + "/submission//set/nickname", methods = [ "POST" ] ) @submitter_required def submission_update_nickname( id ): """ Change the nickname of the donor in the database. THIS INFORMATION SHALL BE ENCRYPTED ON THE CLIENT SIDE FIRST WITH A UNIQUE ENCRYPTION KEY NOT TRANSMETTED TO THE SERVER! 
""" nickname = request.form.get( "nickname", None ) if nickname != None and len( nickname ) != 0: try: nickname = do_encrypt( nickname ) sql = "UPDATE submissions SET nickname = %s WHERE uuid = %s" config.db.query( sql, ( nickname, id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True, "message": "DB error" } ) else: return jsonify( { "error": True, "message": "No new nickname in the POST request" } ) @app.route( baseurl + "/submission/list" ) @submitter_required def submission_list(): """ Get the list of all submissions folder for the currently logged submitter. """ sql = "SELECT * FROM submissions WHERE submitter_id = %s ORDER BY created_time DESC" r = config.db.query( sql, ( session[ "user_id" ], ) ) q = r.fetchall() donors = [] for donor in q: donors.append( { "id": donor.get( "id", None ), "email": do_decrypt( donor.get( "email_aes", None ) ), "nickname": do_decrypt( donor.get( "nickname", None ) ), "uuid": donor.get( "uuid", None ) } ) return render_template( "submission/list.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, donors = donors, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//latent/list" ) @app.route( baseurl + "/submission//latent/list/" ) @submitter_required def submission_latent_list( id, ltype = "all" ): """ Get the list of latent for a particular submission folder. 
""" if ltype in [ "target", "incidental", "all" ]: sql = "SELECT id, nickname FROM submissions WHERE uuid = %s AND submitter_id = %s" r = config.db.query( sql, ( id, session[ "user_id" ], ) ) case_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = """ SELECT files.uuid, files.filename, files.size, files.creation_time FROM files LEFT JOIN files_type ON files.type = files_type.id WHERE folder = %s AND """ if ltype == "target": sql += " files_type.name = 'latent_target'" elif ltype == "incidental": sql += " files_type.name = 'latent_incidental'" elif ltype == "all": sql += " ( files_type.name = 'latent_target' OR files_type.name = 'latent_incidental' )" sql += " ORDER BY files.id DESC" r = config.db.query( sql, ( case_id, ) ) files = r.fetchall() for i, v in enumerate( files ): v[ "filename" ] = do_decrypt( v[ "filename" ] ) v[ "size" ] = round( ( float( v[ "size" ] ) / ( 1024 * 1024 ) ) * 100 ) / 100 return render_template( "submission/latent_list.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, submission_id = id, latent_type = ltype, files = files, nickname = nickname, session_security_key = session.get( "session_security_key" ), envtype = envtype ) else: return abort( 403 ) @app.route( baseurl + "/submission//latent/" ) @submitter_required def submission_latent( id, lid ): """ Serve the page to edit a particular latent image. 
""" sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = """ SELECT files.uuid, files.filename, files.note, files.format, files.resolution, files.width, files.height, files.size, files.creation_time, files.type, files_type.name as file_type FROM files LEFT JOIN files_type ON files.type = files_type.id WHERE folder = %s AND files.uuid = %s """ r = config.db.query( sql, ( submission_id, lid, ) ) file = r.fetchone() file[ "size" ] = round( 100 * float( file[ "size" ] ) / ( 1024 * 1024 ) ) / 100 file[ "filename" ] = do_decrypt( file[ "filename" ] ) file[ "note" ] = do_decrypt( file[ "note" ] ) file[ "file_type" ] = file[ "file_type" ].replace( "latent_", "" ) return render_template( "submission/latent.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, submission_id = id, nickname = nickname, file = file, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//latent//pfsp" ) @submitter_required def submission_latent_pfsp( id, lid ): """ Serve the page to set the PFSP information (location on the finger or the palm print) for the latent. 
""" sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = """ SELECT files.uuid, files.filename, files.note, files.format, files.resolution, files.width, files.height, files.size, files.creation_time, files.type, files_type.name as file_type FROM files LEFT JOIN files_type ON files.type = files_type.id WHERE folder = %s AND files.uuid = %s """ r = config.db.query( sql, ( submission_id, lid, ) ) file = r.fetchone() file[ "size" ] = round( 100 * float( file[ "size" ] ) / ( 1024 * 1024 ) ) / 100 file[ "filename" ] = do_decrypt( file[ "filename" ] ) file[ "note" ] = do_decrypt( file[ "note" ] ) file[ "file_type" ] = file[ "file_type" ].replace( "latent_", "" ) sql = "SELECT pfsp FROM latent_info WHERE uuid = %s" try: current_pfsp = config.db.query( sql, ( lid, ) ).fetchone()[ 0 ] except: current_pfsp = None for z in pfsp.zones: if z[ "desc" ] == current_pfsp: current_pfsp = ",".join( z[ "sel" ] ) return render_template( "submission/latent_pfsp.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, submission_id = id, nickname = nickname, file = file, pfsp_zones = pfsp.zones, current_pfsp = current_pfsp, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//latent//set/pfsp", methods = [ "POST" ] ) @submitter_required def submission_latent_pfsp_set( id, lid ): """ Save the PFSP information relative to a latent. 
""" pfsp = request.form.get( "pfsp" ) sql = "SELECT id FROM latent_info WHERE uuid = %s" q = config.db.query( sql, ( lid, ) ).fetchone() if q == None: sql = sql_insert_generate( "latent_info", [ "uuid", "pfsp" ] ) config.db.query( sql, ( lid, pfsp, ) ) else: sql = "UPDATE latent_info SET pfsp = %s WHERE uuid = %s" config.db.query( sql, ( pfsp, lid, ) ) config.db.commit() return jsonify( { "error": False } ) @app.route( baseurl + "/submission//latent//delete" ) @submitter_required def submission_latent_delete( id, lid ): """ Delete a latent from the database. """ sql = "SELECT id FROM submissions WHERE submitter_id = %s AND uuid = %s" q = config.db.query( sql, ( session[ "user_id" ], id, ) ) if q != None: sql = "DELETE FROM files WHERE creator = %s AND uuid = %s" config.db.query( sql, ( session[ "user_id" ], lid, ) ) config.db.commit() return jsonify( { "error": False } ) else: return jsonify( { "error": True } ) ################################################################################ # Image processing @app.route( baseurl + "/image/file//preview" ) @login_required def image_file_serve( id ): """ Function to get an image from the database and return it as PNG preview image. """ try: img, _ = image_serve( "thumbnails", id ) if img == None: img, _ = image_serve( "files", id ) if img == None: return abort( 404 ) img = create_thumbnail( id, img ) buff = pil2buffer( img, "PNG" ) return send_file( buff, mimetype = "image/png" ) except: img = Image.new( "L", ( 210, 297 ), 255 ) draw = ImageDraw.Draw( img ) font = ImageFont.truetype( "arial.ttf", 18 ) draw.text( ( 0, 0 ), "No preview", 0, font = font ) buff = pil2buffer( img, "PNG" ) return send_file( buff, mimetype = "image/png" ) @app.route( baseurl + "/image/segment//" ) @login_required def image_segment_serve( tid, pc ): """ Serve a preview for a segment image. 
""" img, id = image_serve( "files_segments", ( tid, pc ) ) img = create_thumbnail( id, img ) buff = pil2buffer( img, "PNG" ) return send_file( buff, mimetype = "image/png" ) @app.route( baseurl + "/image/template//" ) @app.route( baseurl + "/image/template///" ) @login_required def image_tp_template( tid, t, action = "full" ): """ Serve a template image, full-resolution or preview. """ if t in [ "front", "back" ]: img, _ = image_serve( "tenprint_cards", ( tid, t ) ) if action == "preview": img.thumbnail( ( 500, 500 ) ) buff = pil2buffer( img, "PNG" ) return send_file( buff, mimetype = "image/png" ) else: return abort( 403 ) def image_serve( db, id ): """ Backend function to get the image from the database. """ if db == "files_segments": if isinstance( id, tuple ): tp, pc = id sql = "SELECT data, uuid FROM {} WHERE tenprint = %s AND pc = %s".format( db ) p = ( tp, pc, ) else: sql = "SELECT data, uuid FROM {} WHERE uuid = %s".format( db ) p = ( id, ) elif db in [ "files", "thumbnails" ]: sql = "SELECT data, uuid FROM {} WHERE uuid = %s".format( db ) p = ( id, ) elif db == "tenprint_cards": id, t = id sql = "SELECT image_{}, id FROM {} WHERE id = %s".format( t, db ) p = ( id, ) else: raise Exception( "table not authorized" ) data = config.db.query( sql, p ).fetchone() if data == None: return None, None else: img, rid = data img = str2img( img ) return img, rid def str2img( data ): """ Convert a base64 string image to a PIL image. """ if data == None: return None else: img = base64.b64decode( data ) buff = StringIO() buff.write( img ) buff.seek( 0 ) img = Image.open( buff ) return img @app.route( baseurl + "/image/file//info" ) @login_required def img_info( id ): """ Get and return the metadata for a particular image. See do_img_info() for more informations. """ d = do_img_info( id ) if d != None: return jsonify( d ) else: return abort( 404 ) def do_img_info( id ): """ Retrieve the metadata for a particular image from the database. 
""" sql = "SELECT size, width, height, resolution, format FROM files WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) d = r.fetchone() if d != None: return dict( d ) else: return None def create_thumbnail( file_uuid, img ): """ Generate a thumbnail image for a PIL image passed in argument. """ img.thumbnail( ( 1000, 1000 ) ) width, height = img.size file_format = img.format buff = StringIO() img.save( buff, format = img.format ) img_size = buff.tell() buff.seek( 0 ) img_data = buff.getvalue() img_data = base64.b64encode( img_data ) sql = sql_insert_generate( "thumbnails", [ "uuid", "width", "height", "size", "format", "data" ] ) data = ( file_uuid, width, height, img_size, img.format, img_data, ) config.db.query( sql, data ) config.db.commit() return img @app.route( baseurl + "/image/segment//start" ) @login_required def image_tenprint_segmentation( id ): """ Route to start the segmentation of a tenprint image into segments (fingers or palm images). """ ret = do_image_tenprint_segmentation( id ) return jsonify( { "error": False, "data": ret } ) def do_image_tenprint_segmentation( id ): """ Backend function to create all the segments images for a tenprint souce image. 
""" sql = "SELECT size, resolution, type, format, data FROM files WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) img = r.fetchone() res = img[ "resolution" ] img_format = img[ "format" ] t = { 1: "front", 2: "back" }[ img[ "type" ] ] img = base64.b64decode( img[ "data" ] ) buff = StringIO() buff.write( img ) buff.seek( 0 ) img = Image.open( buff ) sql = "SELECT template FROM file_template WHERE file = %s" r = config.db.query( sql, ( id, ) ) template_id = r.fetchone()[ "template" ] zones = get_tenprint_template_zones( template_id, t ) for z in zones: tl_x, tl_y, br_x, br_y = map( lambda v: v * res / 2.54 , [ z[ "tl_x" ], z[ "tl_y" ], z[ "br_x" ], z[ "br_y" ] ] ) tmp = img.crop( ( tl_x, tl_y, br_x, br_y ) ) buff = StringIO() tmp.save( buff, format = img_format ) buff.seek( 0 ) file_data = buff.getvalue() file_data = base64.b64encode( file_data ) sql = "SELECT id FROM files_segments WHERE tenprint = %s AND pc = %s" q = config.db.query( sql, ( id, z[ "pc" ], ) ).fetchone() if q == None: sql = sql_insert_generate( "files_segments", [ "tenprint", "uuid", "pc", "data" ] ) data = ( id, str( uuid4() ), z[ "pc" ], file_data ) config.db.query( sql, data ) else: sql = "UPDATE files_segments SET data = %s WHERE tenprint = %s AND pc = %s" data = ( file_data, id, z[ "pc" ] ) config.db.query( sql, data ) config.db.commit() return True ################################################################################ # Donor tenprints @app.route( baseurl + "/submission//tenprint/list" ) @submitter_required def submission_tenprint_list( id ): """ Serve the page with the list of tenprint images, splitted by front, back and NIST format. 
""" sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = """ SELECT id, filename, uuid, type, creation_time FROM files WHERE folder = %s AND ( type = 1 OR type = 2 OR type = 5 ) ORDER BY creation_time DESC """ r = config.db.query( sql, ( submission_id, ) ) q = r.fetchall() tenprint_cards = { "1": [], "2": [], "5": [] } for tenprint in q: tenprint_cards[ str( tenprint[ "type" ] ) ].append( { "id": tenprint.get( "id", None ), "filename": do_decrypt( tenprint.get( "filename", None ) ), "uuid": tenprint.get( "uuid", None ), "type": tenprint.get( "type", None ) } ) return render_template( "submission/tenprint_list.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, tenprint_cards_front = tenprint_cards[ "1" ], tenprint_cards_back = tenprint_cards[ "2" ], tenprint_cards_nist = tenprint_cards[ "5" ], submission_id = id, nickname = nickname, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//tenprint/" ) @submitter_required def submission_tenprint( id, tid ): """ Serve the page to see and edit a tenprint file. 
""" sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = """ SELECT files.uuid, files.filename, files.note, files.format, files.resolution, files.width, files.height, files.size, files.creation_time, files.type, file_template.template FROM files LEFT JOIN file_template ON files.uuid = file_template.file WHERE folder = %s AND files.uuid = %s """ r = config.db.query( sql, ( submission_id, tid, ) ) file = r.fetchone() if file[ "type" ] == 5: return redirect( url_for( "submission_tenprint_segments_list", id = id, tid = tid ) ) else: file[ "size" ] = round( 100 * float( file[ "size" ] ) / ( 1024 * 1024 ) ) / 100 file[ "filename" ] = do_decrypt( file[ "filename" ] ) file[ "note" ] = do_decrypt( file[ "note" ] ) if file[ "type" ] == 1: t = "front" elif file[ "type" ] == 2: t = "back" ############################################################################ try: sql = "SELECT width, height, image_resolution FROM tenprint_cards WHERE id = %s LIMIT 1" r = config.db.query( sql, ( file[ "template" ], ) ) tmp = r.fetchone() card_info = { "width": int( round( float( tmp[ "width" ] ) / 2.54 * tmp[ "image_resolution" ] ) ), "height": int( round( float( tmp[ "height" ] ) / 2.54 * tmp[ "image_resolution" ] ) ), "width_cm": tmp[ "width" ], "height_cm": tmp[ "height" ] } except: card_info = { "width": 0, "height": 0, "width_cm": 0, "height_cm": 0 } ############################################################################ sql = "SELECT id, country_code, name, width, height, size_display FROM tenprint_cards ORDER BY country_code" tenprint_templates = config.db.query( sql ).fetchall() ############################################################################ zones = get_tenprint_template_zones( file[ "template" ], t ) datacolumns = [ "tl_x", "tl_y", "br_x", "br_y", "angle" ] ############################################################################ sql = 
"SELECT width, height, resolution FROM files WHERE uuid = %s LIMIT 1" r = config.db.query( sql, ( tid, ) ) img_info = r.fetchone() svg_hw_factor = float( img_info[ "width" ] ) / float( img_info[ "height" ] ) return render_template( "submission/tenprint.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, upload_id = id, tenprint_id = tid, file = file, nickname = nickname, submission_id = id, session_security_key = session.get( "session_security_key" ), t = t, card_id = file[ "uuid" ], card_info = card_info, img_info = img_info, svg_hw_factor = svg_hw_factor, zones = zones, datacolumns = datacolumns, tenprint_templates = tenprint_templates, envtype = envtype ) @app.route( baseurl + "/submission//tenprint//delete" ) @submitter_required def submission_tenprint_delete( id, tid ): """ Endpoint to delete a tenprint image. """ sql = "SELECT id FROM submissions WHERE submitter_id = %s AND uuid = %s" q = config.db.query( sql, ( session[ "user_id" ], id, ) ) if q != None: sql = "DELETE FROM files WHERE creator = %s AND uuid = %s" config.db.query( sql, ( session[ "user_id" ], tid, ) ) sql = "DELETE FROM files_segments WHERE tenprint = %s" config.db.query( sql, ( tid, ) ) config.db.commit() return jsonify( { "error": False } ) else: return jsonify( { "error": True } ) @app.route( baseurl + "/submission//tenprint//set/template", methods = [ "POST" ] ) @submitter_required def submission_tenprint_set_template( id, file ): """ Set the template id for a tenprint image. 
""" template = request.form.get( "template" ) sql = "SELECT id FROM file_template WHERE file = %s" q = config.db.query( sql, ( file, ) ).fetchone() if q == None: sql = sql_insert_generate( "file_template", [ "file", "template" ] ) config.db.query( sql, ( file, template, ) ) config.db.commit() else: sql = "UPDATE file_template SET template = %s WHERE file = %s" config.db.query( sql, ( template, file, ) ) config.db.commit() return jsonify( { "error": False } ) @app.route( baseurl + "/submission////set/note", methods = [ "POST" ] ) @submitter_required def submission_file_set_note( id, t, file ): """ Store the user encrypted notes for a tenprint image. """ note = request.form.get( "note" ) note = do_encrypt( note ) sql = "UPDATE files SET note = %s WHERE uuid = %s RETURNING id" config.db.query( sql, ( note, file, ) ) config.db.commit() return jsonify( { "error": False } ) ################################################################################ # Tenprint segments @app.route( baseurl + "/submission//tenprint//segment/list" ) @submitter_required def submission_tenprint_segments_list( id, tid ): """ Serve the page with the list of segments for a tenprint image. 
""" sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = "SELECT uuid, filename FROM files WHERE folder = %s AND files.uuid = %s" r = config.db.query( sql, ( submission_id, tid, ) ) file = r.fetchone() filename = do_decrypt( file[ "filename" ] ) tid = file[ "uuid" ] ############################################################################ sql = """ SELECT files_segments.pc, files_segments.data, pc.name FROM files_segments LEFT JOIN pc ON pc.id = files_segments.pc WHERE tenprint = %s """ segments = config.db.query( sql, ( tid, ) ).fetchall() nb_segments = len( segments ) ############################################################################ return render_template( "submission/segment_list.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, upload_id = id, tenprint_id = tid, nickname = nickname, filename = filename, submission_id = id, tid = tid, segments = segments, nb_segments = nb_segments, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//tenprint//segment/" ) @submitter_required def submission_segment( id, tid, pc ): """ Serve the page to edit the information relative to a segment image. 
""" pc = int( pc ) pc_list = [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 25, 27 ] if not pc in pc_list: return redirect( url_for( "submission_tenprint_segments_list", id = id, tid = tid ) ) else: sql = "SELECT id, nickname FROM submissions WHERE uuid = %s" r = config.db.query( sql, ( id, ) ) submission_id, nickname = r.fetchone() nickname = do_decrypt( nickname ) sql = "SELECT uuid, filename, type FROM files WHERE folder = %s AND files.uuid = %s" r = config.db.query( sql, ( submission_id, tid, ) ) tp_file = r.fetchone() tp_filename = do_decrypt( tp_file[ "filename" ] ) sql = "SELECT name FROM pc WHERE id = %s" pc_name = config.db.query( sql, ( pc, ) ).fetchone()[ 0 ] sql = """ SELECT gp.div_name FROM files_segments LEFT JOIN gp ON files_segments.gp = gp.id WHERE files_segments.tenprint = %s AND files_segments.pc = %s """ current_gp = config.db.query( sql, ( tid, pc, ) ).fetchone()[ 0 ] if pc in xrange( 1, 10 ): next_pc = pc + 1 tp_type = "finger" elif pc == 10: next_pc = None tp_type = "finger" elif pc == 25: next_pc = 27 tp_type = "palm" elif pc == 27: next_pc = None tp_type = "palm" else: return abort( 404 ) return render_template( "submission/segment.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, submission_id = id, nickname = nickname, pc_name = pc_name, tp_filename = tp_filename, tid = tid, pc = pc, next_pc = next_pc, current_gp = current_gp, tp_type = tp_type, session_security_key = session.get( "session_security_key" ), envtype = envtype ) @app.route( baseurl + "/submission//tenprint//segment//set/gp", methods = [ "POST" ] ) @submitter_required def submission_segment_set_gp( id, tid, pc ): """ Set the general pattern of a fingerprint segment image (FPC 1-10). 
""" gp = request.form.get( "gp" ) sql = "SELECT id FROM gp WHERE name = %s" r = config.db.query( sql, ( gp, ) ).fetchone() if r == None: return jsonify( { "error": True, "message": "General patter not recognized" } ) gp_id = r[ "id" ] sql = "UPDATE files_segments SET gp = %s WHERE tenprint = %s AND pc = %s" config.db.query( sql, ( gp_id, tid, pc, ) ) config.db.commit() return jsonify( { "error": False } ) ################################################################################ # User profile @app.route( baseurl + "/user/myprofile/tenprint" ) @login_required def user_myprofile_tenprint(): """ Serve the page to see all the information related to the current user. This page is the summary of all informations related to the current logged user, i.e. the tenprint cards and latent images. The consent form, beeing mendatory to upload the tenprint and latent images, and beeing encrypted in the database, is not accessible by the user via this interface. The consent form has been sent the the donor by email anyways before uploading any of the images. """ sql = """ SELECT files.id, files.uuid FROM users LEFT JOIN submissions ON users.email = submissions.email_hash LEFT JOIN files ON files.folder = submissions.id WHERE users.id = %s AND ( files.type = 1 OR files.type = 2 OR files.type = 5 ) """ tenprint_cards = config.db.query_fetchall( sql, ( session[ "user_id" ], ) ) return render_template( "users/profile/tenprint.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, tenprint_cards = tenprint_cards, account_type = session.get( "account_type", None ), envtype = envtype ) ################################################################################ # Tenprint templates @app.route( baseurl + "/template/tenprint/list" ) @admin_required def template_tenprint_list(): """ Serve the page with the list of templates. 
""" sql = "SELECT id, country_code, name FROM tenprint_cards ORDER BY name ASC" tp_templates = config.db.query( sql ).fetchall() return render_template( "tp_template/list.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, tp_templates = tp_templates, envtype = envtype ) @app.route( baseurl + "/template/tenprint/new" ) @admin_required def template_tenprint_new_meta(): """ Serve the page to create a new tenprint template. """ return render_template( "tp_template/new_meta.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, envtype = envtype ) @app.route( baseurl + "/template/tenprint/new//images" ) @admin_required def template_tenprint_new_images( id ): """ Add new images to a tenprint template. """ sql = "SELECT id, name, country_code FROM tenprint_cards WHERE id = %s" card = config.db.query( sql, ( id, ) ).fetchone() return render_template( "tp_template/new_images.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, card = card, envtype = envtype ) @app.route( baseurl + "/template/tenprint/new/insert", methods = [ "POST" ] ) @admin_required def template_tenprint_new_do(): """ Save the tenprint template to the database. 
""" name = request.form.get( "name" ) country_code = request.form.get( "country_code" ) sql = sql_insert_generate( "tenprint_cards", [ "name", "country_code" ] ) q = config.db.query( sql, ( name, country_code, ) ) id = q.fetchone()[ 0 ] for pc in [ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 25, 27 ]: sql = sql_insert_generate( "tenprint_zones", [ "card", "pc", "angle", "tl_x", "tl_y", "br_x", "br_y" ] ) config.db.query( sql, ( id, pc, 0, 0, 0, 0, 0 ) ) config.db.commit() return jsonify( { "error": False, "id": id } ) @app.route( baseurl + "/template/tenprint/new//upload_image", methods = [ "POST" ] ) @admin_required def template_tenprint_new_uploadimage( id ): """ Save the front and back images for a tenprint template to the databse. """ face = request.form.get( "card_face" ) if face in [ "front", "back" ]: data = request.files[ "file" ] img = Image.open( data ) image_width, image_height = img.size try: res = img.info[ "dpi" ][ 0 ] width = round( image_width * 2.54 / float( res ) ) height = round( image_height * 2.54 / float( res ) ) except: res = 0 width = 0 height = 0 fp = StringIO() img.save( fp, format = "JPEG" ) fp.seek( 0 ) data = fp.getvalue() data = base64.b64encode( data ) sql = """ UPDATE tenprint_cards SET image_{0} = %s, image_{0}_width = %s, image_{0}_height = %s, image_resolution = %s, image_format = %s, width = %s, height = %s WHERE id = %s""".format( face ) config.db.query( sql, ( data, image_width, image_height, res, "JPEG", width, height, id, ) ) config.db.commit() if res != 0: return jsonify( { "error": False } ) else: return jsonify( { "need_action": True, "action": "set_resolution" } ) else: return abort( 403 ) @app.route( baseurl + "/template/tenprint//set/resolution", methods = [ "POST" ] ) @admin_required def template_tenprint_new_setresolution( id ): """ Set the resolution for a tenprint template image. 
""" res = request.form.get( "resolution" ) try: sql = "UPDATE tenprint_cards SET image_resolution = %s WHERE id = %s" config.db.query( sql, ( res, id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } ) def get_tenprint_template_zones( id, t ): """ Get all the segments zones for a template passed in parameter. """ sql = """ SELECT tenprint_zones.pc, tl_x, tl_y, br_x, br_y, angle, pc.name FROM tenprint_zones JOIN tenprint_zones_location ON tenprint_zones.pc = tenprint_zones_location.pc JOIN pc ON tenprint_zones.pc = pc.id WHERE card = %s AND tenprint_zones_location.side = %s ORDER BY pc """ r = config.db.query( sql, ( id, t, ) ).fetchall() zones = [] for pc, tl_x, tl_y, br_x, br_y, angle, pc_name in r: tl_x = float_or_null( tl_x ) tl_y = float_or_null( tl_y ) br_x = float_or_null( br_x ) br_y = float_or_null( br_y ) zones.append( { "pc": pc, "tl_x": tl_x, "tl_y": tl_y, "br_x": br_x, "br_y": br_y, "angle": angle, "pc_name": pc_name } ) return zones @app.route( baseurl + "/template/tenprint//" ) @login_required def template_tenprint( id, t ): """ Serve the tenprint template page. 
""" if t in [ "front", "back" ]: sql = """ SELECT tenprint_zones.pc, tl_x, tl_y, br_x, br_y, angle, pc.name FROM tenprint_zones JOIN tenprint_zones_location ON tenprint_zones.pc = tenprint_zones_location.pc JOIN pc ON tenprint_zones.pc = pc.id WHERE card = %s AND tenprint_zones_location.side = %s ORDER BY pc """ r = config.db.query( sql, ( id, t, ) ).fetchall() zones = [] for pc, tl_x, tl_y, br_x, br_y, angle, pc_name in r: tl_x = float_or_null( tl_x ) tl_y = float_or_null( tl_y ) br_x = float_or_null( br_x ) br_y = float_or_null( br_y ) zones.append( { "pc": pc, "tl_x": tl_x, "tl_y": tl_y, "br_x": br_x, "br_y": br_y, "angle": angle, "pc_name": pc_name } ) datacolumns = [ "tl_x", "tl_y", "br_x", "br_y", "angle" ] sql = """ SELECT id, name, country_code, width, height, size_display, image_{0}_width, image_{0}_height, image_resolution FROM tenprint_cards WHERE id = %s LIMIT 1 """.format( t ) r = config.db.query( sql, ( id, ) ) img_info = r.fetchone() card_info = { "width": int( round( float( img_info[ "width" ] ) / 2.54 * img_info[ "image_resolution" ] ) ), "height": int( round( float( img_info[ "height" ] ) / 2.54 * img_info[ "image_resolution" ] ) ), } svg_h = float( img_info[ "image_{}_height".format( t ) ] ) svg_w = float( img_info[ "image_{}_width".format( t ) ] ) svg_hw_factor = svg_w / svg_h return render_template( "tp_template/template.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, account_type = session.get( "account_type", None ), zones = zones, img_info = img_info, card_info = card_info, svg_hw_factor = svg_hw_factor, card_id = id, t = t, datacolumns = datacolumns, envtype = envtype, **config.misc ) else: return abort( 403 ) @app.route( baseurl + "/template/tenprint//set/zones", methods = [ "POST" ] ) @login_required def update_zone_coordinates( id ): """ Update the segments zones coordinates in the database. 
""" id = int( id ) data = request.form.get( "data" ) if data != None: data = json.loads( data ) for pc, value in data.iteritems(): pc = int( pc ) for coordinate, v in value.iteritems(): sql = "UPDATE tenprint_zones SET {} = %s WHERE card = %s AND pc = %s".format( coordinate ) data = ( v, id, pc, ) config.db.query( sql, data ) config.db.commit() return jsonify( { "error": False } ) else: return abort( 403 ) @app.route( baseurl + "/template/tenprint//delete/zone", methods = [ "POST" ] ) @login_required def delete_zone_coordinates( id ): """ Delete a unused segment zone for a template (for example FPC 25 and 27 on the front-page, ...) """ pc = request.form.get( "pc" ) try: sql = "DELETE FROM tenprint_zones WHERE card = %s AND pc = %s" config.db.query( sql, ( id, pc, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } ) @app.route( baseurl + "/template/tenprint//set/", methods = [ "POST" ] ) @login_required def update_tptemplate_var( id, varname ): """ Update the name, country_code or displayed size variable for a tenprint template. """ if not varname in [ "name", "country_code", "size_display" ]: return jsonify( { "error": True } ) else: try: data = request.form.get( varname ) data = str( data ) sql = "UPDATE tenprint_cards SET {} = %s WHERE id = %s".forat( varname ) config.db.query( sql, ( data, id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } ) @app.route( baseurl + "/template/tenprint//set/hw", methods = [ "POST" ] ) @login_required def update_tptemplate_hw( id ): """ Set the image size of a template. 
""" try: h = request.form.get( "height" ) w = request.form.get( "width" ) h = float( h ) w = float( w ) sql = "UPDATE tenprint_cards SET height = %s, width = %s WHERE id = %s" config.db.query( sql, ( h, w, id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } ) @app.route( baseurl + "/template/tenprint//set/resolution" ) @login_required def update_tptemplate_res( id ): """ Update the resolution of the image for a template. """ try: res = request.form.get( "resolution" ) res = float( res ) sql = "UPDATE tenprint_cards SET image_resolution = %s WHERE id = %s" config.db.query( sql, ( res, id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } ) ################################################################################ # PiAnoS API @app.route( baseurl + "/pianos_api" ) @admin_required def pianos_actions(): """ Serve the page with all actions related to the dedicated PiAnoS server. """ return render_template( "PiAnoS/actions.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, envtype = envtype ) @app.route( baseurl + "/pianos_api/add_user/all" ) @admin_required def pianos_update_all_accounts(): """ serve the function to update the users in PiAnoS """ return jsonify( { "error": not do_pianos_update_all_accounts() } ) def do_pianos_update_all_accounts(): """ Copy/update the credentials for all users. This function keep the credentials in sync between ICNML and PiAnoS. 
""" try: sql = """ SELECT users.username, users.password, account_type.name as g FROM users LEFT JOIN account_type ON users.type = account_type.id WHERE users.password IS NOT NULL """ for user in config.db.query_fetchall( sql ): username, h, group_name = user groupid = config.pianosdb.create_group( group_name ) config.pianosdb.create_user( username = username, hash = h, groupid = groupid ) config.pianosdb.reset_user( username, hash = h ) config.pianosdb.commit() return True except: return False @app.route( baseurl + "/pianos_api/add_segments/all" ) @admin_required def pianos_copy_all_segments(): """ Route to push all segments to PiAnoS. """ return jsonify( { "error": not do_pianos_copy_all_segments() } ) def do_pianos_copy_all_segments(): """ Copy all segments images to PiAnoS. If the case already exists, the image is not pushed to PiAnoS. """ try: folder_id = config.pianosdb.create_folder( "Annotation" ) img = Image.new( "L", ( 200, 200 ), 255 ) empty_img_res = 500 empty_img_id = config.pianosdb.create_image( "PRINT", img, empty_img_res, "empty" ) sql = """ SELECT files_segments.uuid, files_segments.data, files_v.resolution FROM files_segments LEFT JOIN files_v ON files_segments.tenprint = files_v.uuid """ for segment in config.db.query_fetchall( sql ): img = str2img( segment[ "data" ] ) try: config.pianosdb.create_exercise( folder_id, segment[ "uuid" ], "", img, segment[ "resolution" ], empty_img_id, empty_img_res ) except caseExistsInDB: continue except: raise config.pianosdb.commit() return True except: return False ################################################################################ # Home page @app.route( baseurl + "/" ) @login_required def home(): """ Serve the homepage to all users. 
""" return render_template( "index.html", baseurl = baseurl, js = config.cdnjs, css = config.cdncss, session_timeout = config.session_timeout, account_type = session.get( "account_type", None ), session_security_key = session.get( "session_security_key" ), envtype = envtype ) ################################################################################ # Main application configuration gpg = gnupg.GPG( **config.gpg_options ) for file in os.listdir( config.keys_folder ): with open( config.keys_folder + "/" + file, "r" ) as fp: gpg.import_keys( fp.read() )