Newer
Older
"error": False,
"data": webauthn_assertion_options.assertion_dict
@app.route( baseurl + "/do/validate_signin_2", methods = [ "POST" ] )
@admin_required
def do_validate_signin_2():
"""
Verification of the signature of the new user data by the admin.
"""
challenge = session.get( "validation_user_challenge" )
assertion_response = request.form
assertion_response_s = base64.b64encode( json.dumps( assertion_response ) )
credential_id = assertion_response.get( "id" )
q = config.db.query( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) )
user = q.fetchone()
webauthn_user = webauthn.WebAuthnUser(
user[ "ukey" ], session[ "username" ], session[ "username" ], None,
user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], "icnml.unil.ch"
)
webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
webauthn_user,
assertion_response,
challenge,
config.ORIGIN,
uv_required = False
)
try:
webauthn_assertion_response.verify()
except Exception as e:
return jsonify( {
"error": True,
"message": "Assertion failed. Error: {}".format( e )
} )
############################################################################
if len( challenge ) % 4 != 0:
challenge += "=" * ( 4 - ( len( challenge ) % 4 ) )
newuser = base64.b64decode( challenge )
newuser = json.loads( newuser )
user_type = int( newuser[ "user" ][ "account_type" ] )
email = newuser[ "user" ][ "email" ]
email_hash = pbkdf2( email, random_data( 100 ), config.EMAIL_NB_ITERATIONS ).hash()
request_id = newuser[ "user" ][ "id" ]
username_id = newuser[ "user" ][ "username_id" ]
n = config.db.query( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )
n = n.fetchone()
n = n[ 0 ]
username = "{}_{}".format( n, username_id )
username = username.lower()
try:
config.db.query( "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s", ( assertion_response_s, request_id ) )
config.db.query( sql_insert_generate( "users", [ "username", "email", "type" ] ), ( username, email_hash, user_type ) )

Marco De Donno
committed
except:
return jsonify( {
"error": True,
"message": "Can not insert into database."
} )
############################################################################
email_content = render_jinja_html(
"templates/email", "signin.html",
username = username,
url = "https://icnml.unil.ch" + url_for( "config_new_user", uuid = newuser[ "user" ][ "uuid" ] )
)
msg = MIMEText( email_content, "html" )
msg[ "Subject" ] = "ICNML - Login information"
msg[ "From" ] = config.sender
msg[ "To" ] = email
s = smtplib.SMTP( config.smtpserver )
s.starttls()
s.login( config.smtpuser, config.smtppassword )
s.sendmail( config.sender, [ email ], msg.as_string() )
s.quit()
############################################################################
return jsonify( {
@app.route( baseurl + "/do/validation_reject", methods = [ "POST" ] )
"""
Reject the request for a new user.
"""
request_id = request.form.get( "id" )
try:
sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s"
config.db.query( sql, ( request_id, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
def config_new_user( uuid ):
"""
Serve the first page to the new user to configure his account.
"""
session.clear()
q = config.db.query( "SELECT email FROM signin_requests WHERE uuid = %s", ( uuid, ) )
r = q.fetchone()
try:
email = r[ "email" ]
session[ "signin_user_validation_email" ] = email
session[ "signin_user_validation_uuid" ] = uuid
next_step = "do_config_new_user"
@app.route( baseurl + "/do/config", methods = [ "POST" ] )
"""
Save the configuration of the new user to the database.
"""
email = session[ "signin_user_validation_email" ]
uuid = session[ "signin_user_validation_uuid" ]
username = request.form.get( "username" )
password = request.form.get( "password" )
############################################################################
q = config.db.query( "SELECT count(*) FROM signin_requests WHERE uuid = %s AND email = %s", ( uuid, email, ) )
r = q.fetchone()
r = r[ 0 ]
if r == 0:
return jsonify( {
"error": True,
"message": "no signin request"
} )
q = config.db.query( "SELECT * FROM users WHERE username = %s", ( username, ) )
user = q.fetchone()
if user == None:
return jsonify( {
} )
elif not password.startswith( "pbkdf2" ):
return jsonify( {
"error": True,
"message": "password not in the correct format"
elif not pbkdf2( email, user[ "email" ] ).verify():
"error": True,
"message": "email not corresponding to the request form"
elif user.get( "password", None ) != None:
"error": True,
"message": "password already set"
} )
############################################################################
password = pbkdf2( password, random_data( 65 ), config.PASSWORD_NB_ITERATIONS ).hash()
q = config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
config.db.commit()
############################################################################
return jsonify( {
@app.route( baseurl + "/config/donor/<h>" )
def config_new_user_donor( h ):
"""
Serve the configuration page for a new donor.
"""
session.clear()
sql = "SELECT id, username, email FROM users WHERE type = 2 AND password IS NULL"
for r in config.db.query_fetchall( sql ):
if h == hashlib.sha512( r[ "email" ] ).hexdigest():
user = r
break
else:
return redirect( url_for( "home" ) )
session[ "email_hash" ] = h
session[ "user_id" ] = user[ "id" ]
"users/config.html",
next_step = "do_config_new_donor",
hash = h
)
@app.route( baseurl + "/do/config/donor", methods = [ "POST" ] )
def do_config_new_donor():
    """
    Save the donor configuration to the database.

    Reads `username`, `password` and `hash` from the POST form. The password
    is stored only when the submitted hash equals the `email_hash` kept in the
    session and the id of the submitted username equals the `user_id` kept in
    the session.

    Returns a JSON object: `{ "error": False }` on success, and
    `{ "error": True, "message": "Invalid parameters" }` for any missing or
    non-matching parameter.
    """
    def invalid_parameters():
        # Single place for the refusal payload, so all the checks agree.
        return jsonify( {
            "error": True,
            "message": "Invalid parameters"
        } )
    
    username = request.form.get( "username" )
    password = request.form.get( "password" )
    h = request.form.get( "hash" )
    
    # Refuse incomplete requests before touching the database or hashing.
    if username is None or password is None or h is None:
        return invalid_parameters()
    
    sql = "SELECT id FROM users WHERE username = %s"
    user = config.db.query_fetchone( sql, ( username, ) )
    
    # An unknown username yields no row; answer with the JSON error instead
    # of failing while indexing None.
    if user is None:
        return invalid_parameters()
    
    # session.get() is used so that a session not initialised by the donor
    # configuration page is refused instead of raising a KeyError.
    if session.get( "email_hash" ) != h or session.get( "user_id" ) != user[ "id" ]:
        return invalid_parameters()
    
    # The key derivation is the costly step: do it only for validated requests.
    password = pbkdf2( password, random_data( 100 ), config.PASSWORD_NB_ITERATIONS ).hash()
    
    config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
    config.db.commit()
    
    return jsonify( {
        "error": False
    } )
"""
Serve the help page for the TOTP.
"""
return my_render_template( "totp_help.html" )
################################################################################
# QR Code generation
def renew_secret():
"""
Request a new TOTP secret.
"""
return secret
def get_secret():
    """
    Retrieve the current secret.

    The value stored under the "secret" key of the session is returned when
    present; otherwise a new secret is requested through renew_secret() and
    that value is returned.
    """
    current = session.get( "secret" )
    
    if current is not None:
        return current
    
    return renew_secret()
"""
Set the new secret value for the TOTP in the database.
"""
config.db.query( "UPDATE users SET totp = %s WHERE username = %s", ( session[ "secret" ], session[ "username" ], ) )
config.db.commit()
return jsonify( {
"""
Serve the current secret as JSON.
"""
get_secret()
return jsonify( {
"error": False,
"secret": session[ "secret" ]
@login_required
def request_renew_secret():
"""
Renew the secret and serve the new value as JSON.
"""
renew_secret()
return jsonify( {
"error": False,
"secret": session[ "secret" ]
@app.route( baseurl + "/user/config/totp_qrcode.png" )
Generate the TOTP PNG QRcode image ready to scan.
qrcode_value = "otpauth://totp/ICNML%20{}?secret={}".format( session[ "username" ], get_secret() )
qrcode_value = "otpauth://totp/ICNML?secret={}".format( get_secret() )
img = qrcode.make( qrcode_value )
temp = StringIO()
img.save( temp, format = "png" )
temp.seek( 0 )
return send_file( temp, mimetype = "image/png" )
@app.route( baseurl + "/user/config/totp" )
@login_required
"users/totp.html",
secret = get_secret()
################################################################################
# Data decryption

Marco De Donno
committed
def do_decrypt_user_session( data ):
    """
    Decrypt `data` with do_decrypt(), using the "password" value of the
    current session as key.

    Raises a KeyError if the session holds no "password" entry.
    """
    session_key = session[ "password" ]
    return do_decrypt( data, session_key )
def do_encrypt_user_session( data ):
    """
    Encrypt `data` with do_encrypt(), using the "password" value of the
    current session as key.

    Raises a KeyError if the session holds no "password" entry.
    """
    session_key = session[ "password" ]
    return do_encrypt( data, session_key )
################################################################################
# DEK encryption/decryption
def get_dek_from_submissionid( submission_id ):
"""
Get the Data Encryption Key related to a submission id folder.
"""
sql = """
SELECT donor_dek.dek
FROM donor_dek
LEFT JOIN users ON users.username = donor_dek.donor_name
LEFT JOIN submissions ON submissions.donor_id = users.id
LIMIT 1
"""
dek = config.db.query_fetchone( sql, ( submission_id, ) )[ "dek" ]
if dek == None:
raise
else:
return dek
try:
return session[ "dek_{}".format( submission_id ) ]
except:
return None
def do_decrypt_dek( data, submission_id ):
    """
    Decrypt the data with the Data Encryption Key related to the
    submission folder `submission_id`.

    The key is looked up with get_dek_from_submissionid() and handed to
    do_decrypt() together with `data`.
    """
    return do_decrypt( data, get_dek_from_submissionid( submission_id ) )
def do_encrypt_dek( data, submission_id ):
    """
    Encrypt the data with the Data Encryption Key related to the
    submission folder `submission_id`.

    The key is looked up with get_dek_from_submissionid() and handed to
    do_encrypt() together with `data`.
    """
    return do_encrypt( data, get_dek_from_submissionid( submission_id ) )
def dek_check( submission_id ):
    """
    Check that a Data Encryption Key is available for the submission.

    Returns True when dek_exists() finds a key. Otherwise the key is rebuilt
    with dek_submitte_recreate_session() and the result of that call is
    returned; any error raised during the rebuild is reported as False.
    """
    if dek_exists( submission_id ):
        return True
    
    try:
        return dek_submitte_recreate_session( submission_id )
    
    except Exception:
        # Best effort: a failed rebuild only means that no key is available.
        # KeyboardInterrupt and SystemExit are deliberately not swallowed.
        return False
def dek_exists( submission_id ):
    """
    Tell whether get_dek_from_submissionid() returns a key (any value other
    than None) for the submission `submission_id`.
    """
    return get_dek_from_submissionid( submission_id ) is not None
def dek_submitte_recreate_session( submission_id ):
    """
    Rebuild the Data Encryption Key of a submission and keep it in the session.

    The salt, the encrypted check value, the donor username and the encrypted
    email related to the submission uuid are read from the database. The email
    is decrypted with the session password, and the key is derived again with
    dek_generate(). The derived key is accepted only if it decrypts the stored
    check value to a JSON object whose "value" field is "ok".

    Returns True, after storing the key in the session under
    "dek_<submission_id>", when the check succeeds; False otherwise.
    Errors (no matching row, no password in the session, undecodable check
    value) are not handled here and propagate to the caller.
    """
    sql = """
        SELECT
            donor_dek.salt,
            donor_dek.dek_check,
            donor_dek.donor_name as username,
            submissions.email_aes as email
        FROM donor_dek
        LEFT JOIN users ON users.username = donor_dek.donor_name
        LEFT JOIN submissions ON submissions.donor_id = users.id
        WHERE submissions.uuid = %s
        LIMIT 1
    """
    user = config.db.query_fetchone( sql, ( submission_id, ) )
    
    username = user[ "username" ]
    email = do_decrypt_user_session( user[ "email" ] )
    
    # Only the key itself is needed; the salt is the stored one and the fresh
    # check value generated by dek_generate() is discarded.
    _, dek, _ = dek_generate( username = username, email = email, salt = user[ "salt" ] )
    
    to_check = json.loads( do_decrypt( user[ "dek_check" ], dek ) )
    
    if to_check[ "value" ] != "ok":
        return False
    
    session[ "dek_{}".format( submission_id ) ] = dek
    return True
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
def dek_generate( **kwargs ):
    """
    Derive a Data Encryption Key (DEK) for a user.

    Keyword arguments:
        email:      email of the user; it is first derived with pbkdf2 and the
                    fixed salt "icnml_user_DEK". Takes precedence over
                    `email_hash` when both are given.
        email_hash: already derived email value, used as is.
        username:   username of the user (mandatory).
        salt:       salt of the key derivation; when the argument is absent,
                    a new one is produced with random_data( 100 ).

    The key is derived with pbkdf2 (sha512, config.DEK_NB_ITERATIONS
    iterations) from the string "<username>:<email value>".

    Returns the tuple ( dek_salt, dek, check ), where `check` is a JSON object
    ( "value": "ok", the current time in milliseconds, and some random data )
    encrypted with the new key.

    Raises an Exception if neither `email` nor `email_hash` is given, or if
    `username` is missing.
    """
    # Validate the arguments first: it is cheap, and avoids a useless key
    # derivation on the email when the call is going to be refused anyway.
    if "email" not in kwargs and "email_hash" not in kwargs:
        raise Exception( "need the email or hashed_email" )
    
    if "username" not in kwargs:
        raise Exception( "need the username" )
    
    username = kwargs[ "username" ]
    
    if "email" in kwargs:
        email = pbkdf2( kwargs[ "email" ], "icnml_user_DEK" ).hash( True )
    else:
        email = kwargs[ "email_hash" ]
    
    # Produce random data only when no salt is supplied. A salt passed
    # explicitly, whatever its value, is used as is.
    if "salt" in kwargs:
        dek_salt = kwargs[ "salt" ]
    else:
        dek_salt = random_data( 100 )
    
    dek = pbkdf2(
        "{}:{}".format( username, email ),
        dek_salt,
        iterations = config.DEK_NB_ITERATIONS,
        hash_name = "sha512"
    ).hash( True )
    
    check = {
        "value": "ok",
        "time": int( time.time() * 1000 ),
        "random": random_data( 10 )
    }
    check = json.dumps( check )
    check = do_encrypt( check, dek )
    
    return dek_salt, dek, check
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
@app.route( baseurl + "/dek/reconstruct", methods = [ "POST" ] )
@login_required
def dek_regenerate():
    """
    Reconstruct the Data Encryption Key of the user stored in the session.

    The key is derived again with dek_generate() from the session username,
    the `email_hash` POST field and the salt stored in the `donor_dek` table.
    It is written back to the database only if it decrypts the stored check
    value to a JSON object whose "value" field is "ok".

    Returns the JSON object `{ "error": False }` on success and
    `{ "error": True }` in every other case.
    """
    try:
        email_hash = request.form.get( "email_hash" )
        username = session.get( "username" )
        
        sql = "SELECT * FROM donor_dek WHERE donor_name = %s"
        user = config.db.query_fetchone( sql, ( username, ) )
        
        # No row for this user: nothing to reconstruct.
        if user is None:
            return jsonify( {
                "error": True
            } )
        
        _, dek, _ = dek_generate( username = username, email_hash = email_hash, salt = user[ "salt" ] )
        
        check = json.loads( do_decrypt( user[ "dek_check" ], dek ) )
        
        # Explicit refusal instead of a bare `raise` without active exception.
        if check[ "value" ] != "ok":
            return jsonify( {
                "error": True
            } )
        
        sql = "UPDATE donor_dek SET dek = %s WHERE id = %s AND donor_name = %s"
        config.db.query( sql, ( dek, user[ "id" ], username, ) )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        # Any failure (wrong email hash, undecodable check value, database
        # error) is reported to the client as a plain error flag.
        return jsonify( {
            "error": True
        } )
@app.route( baseurl + "/dek/delete" )
@login_required
def dek_delete():
    """
    Remove the stored Data Encryption Key of the user named in the session.

    Only the `dek` column of the `donor_dek` table is set to NULL; the salt
    and the check value are left untouched.

    Returns the JSON object `{ "error": False }` on success and
    `{ "error": True }` if the update fails.
    """
    # NOTE(review): this state-changing route declares no `methods` list;
    # confirm that serving it on the default method is intended.
    try:
        username = session.get( "username" )
        
        sql = "UPDATE donor_dek SET dek = NULL WHERE donor_name = %s"
        config.db.query( sql, ( username, ) )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        return jsonify( {
            "error": True
        } )
@app.route( baseurl + "/dek/fulldelete" )
@login_required
def dek_delete_fully():
    """
    Remove all the Data Encryption Key material of the user named in the
    session.

    The `dek`, `salt` and `dek_check` columns of the `donor_dek` table are
    set to NULL for this user.

    Returns the JSON object `{ "error": False }` on success and
    `{ "error": True }` if the update fails.
    """
    # NOTE(review): this state-changing route declares no `methods` list;
    # confirm that serving it on the default method is intended.
    try:
        username = session.get( "username" )
        
        # One statement clears the three columns together, instead of three
        # separate updates of the same row.
        sql = "UPDATE donor_dek SET dek = NULL, salt = NULL, dek_check = NULL WHERE donor_name = %s"
        config.db.query( sql, ( username, ) )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        return jsonify( {
            "error": True
        } )
################################################################################
# File upload
@app.route( baseurl + "/upload", methods = [ "GET", "POST" ] )
@login_required
def upload_file():
"""
Main function dealing with the upload of files (tenprint, latent and consent forms).
This function accepts traditional images and NIST files for the fingerprint data,
and PDFs for the consent forms.
"""
upload_type = request.form.get( "upload_type", None )
file_extension = request.form.get( "extension", None )
if isinstance( file_extension, str ):
file_extension = file_extension.lower()
if upload_type == None:
"error": True,
"msg": "Must specify a file type to upload a file"
if "file" not in request.files:
return jsonify( {
"error": True,
"msg": "No file in the POST request"
} )
elif "submission_id" not in request.form:
return jsonify( {
"error": True,
submission_uuid = request.form.get( "submission_id" )
sql = "SELECT id FROM submissions WHERE uuid = %s"
r = config.db.query( sql, ( submission_uuid, ) )
submission_id = r.fetchone()[ "id" ]
except:
return jsonify( {
"error": True,
"msg": "upload not related to a submission form"
uploaded_file = request.files[ "file" ]

Marco De Donno
committed
file_name = do_encrypt_user_session( uploaded_file.filename )
file_uuid = str( uuid4() )
uploaded_file.save( fp )
file_size = fp.tell()
fp.seek( 0 )
if file_extension in config.NIST_file_extensions:
file_data = fp.getvalue()
file_data = base64.b64encode( file_data )
file_data = do_encrypt_dek( file_data, submission_uuid )
if not n.is_initialized():
raise
return jsonify( {
"error": True,
"msg": "Error while loading the NIST file"
} )

Marco De Donno
committed
# Save the NIST file in the DB
sql = sql_insert_generate( "files", [ "folder", "creator", "filename", "type", "format", "size", "uuid", "data" ] )
data = ( submission_id, session[ "user_id" ], file_name, 5, "NIST", file_size, file_uuid, file_data, )
config.db.query( sql, data )
# Segmentation of the NIST file
fpc_in_file = []
for fpc in config.all_fpc:
try:

Marco De Donno
committed
try:
img = n.get_print( fpc = fpc )
except:
img = n.get_palmar( fpc = fpc )
buff = StringIO()
img.save( buff, format = "TIFF" )
buff.seek( 0 )
img_data = buff.getvalue()
img_data = base64.b64encode( img_data )
img_data = do_encrypt_dek( img_data, submission_uuid )
sql = sql_insert_generate( "files_segments", [ "tenprint", "uuid", "pc", "data" ] )
data = ( file_uuid, str( uuid4() ), fpc, img_data, )
config.db.query( sql, data )
fpc_in_file.append( fpc )
except:
pass
return jsonify( {
"error": False,
"fpc": fpc_in_file
} )
else:
if upload_type in [ "latent_target", "latent_incidental", "tenprint_card_front", "tenprint_card_back" ]:
img = Image.open( fp )
img_format = img.format
width, height = img.size
except:
return jsonify( {
"error": True,
"msg": "No resolution found in the image. Upload not possible at the moment."
} )
try:
img = rotate_image_upon_exif( img )
except:
pass
buff = StringIO()
img.save( buff, format = img_format )
buff.seek( 0 )
file_data = buff.getvalue()
if upload_type in [ "tenprint_card_front", "tenprint_card_back" ]:
create_thumbnail( file_uuid, img, submission_uuid )
file_data_r = file_data
file_data = base64.b64encode( file_data )
sql = "SELECT id FROM files_type WHERE name = %s"
upload_type_id = config.db.query( sql, ( upload_type, ) ).fetchone()[ 0 ]
####################################################################
if upload_type == "consent_form":
sql = "SELECT email_aes FROM submissions WHERE uuid = %s"
email = config.db.query_fetchone( sql, ( submission_uuid, ) )[ "email_aes" ]

Marco De Donno
committed
email = do_decrypt_user_session( email )
sql = "SELECT username, email FROM users WHERE type = 2 ORDER BY id DESC"
for username_db, email_db in config.db.query_fetchall( sql ):
if pbkdf2( email, email_db ).verify():
username = username_db
url_hash = hashlib.sha512( email_db ).hexdigest()
break
else:
return jsonify( {
"error": True,
"message": "user not found"
} )
# Email for the donor
email_content = render_jinja_html(
"templates/email", "donor.html",
username = username,
url = "https://icnml.unil.ch" + url_for( "config_new_user_donor", h = url_hash )
msg[ "Subject" ] = "ICNML - You have been added as donor"
msg[ "From" ] = config.sender
msg[ "To" ] = email
msg.attach( MIMEText( email_content, "html" ) )
part = MIMEApplication( file_data_r, Name = "consent_form.pdf" )
part[ "Content-Disposition" ] = "attachment; filename=consent_form.pdf"
msg.attach( part )
try:
s = smtplib.SMTP( config.smtpserver )
s.starttls()
s.login( config.smtpuser, config.smtppassword )
s.sendmail( config.sender, [ email ], msg.as_string() )
s.quit()
except:
return jsonify( {
"error": True,
"message": "Can not send the email to the user"
} )
else:
# Consent form save
file_data = base64.b64encode( file_data )
file_data = gpg.encrypt( file_data, *config.gpg_key )
file_data = str( file_data )
file_data = base64.b64encode( file_data )
sql = sql_insert_generate( "cf", [ "uuid", "data", "email" ] )
data = ( file_uuid, file_data, pbkdf2( email, iterations = 100000 ).hash(), )
config.db.query( sql , data )
sql = "UPDATE submissions SET consent_form = true WHERE uuid = %s"
config.db.query( sql, ( submission_uuid, ) )
file_data = do_encrypt_dek( file_data, submission_uuid )
sql = sql_insert_generate( "files", [
"folder", "creator",
"filename", "type",
"format", "size", "width", "height", "resolution",
"uuid", "data"
] )
submission_id, session[ "user_id" ],
file_name, upload_type_id,
img_format, file_size, width, height, res,
file_uuid, file_data,
)
config.db.commit()
else:
return abort( 403 )
################################################################################
@app.route( baseurl + "/submission/new" )
@submission_has_access
"""
Serve the page to start a new submission (new donor).
"""
return my_render_template( "submission/new.html" )
@app.route( baseurl + "/submission/do_new", methods = [ "POST" ] )
@submission_has_access
"""
Check the new donor data, and store the new submission process in the database.
"""
email = request.form.get( "email", False )
# Check for duplicates based upon the email data
sql = "SELECT id, email_hash FROM submissions WHERE submitter_id = %s"
r = config.db.query( sql, ( session[ "user_id" ], ) )
if pbkdf2( email, case[ "email_hash" ] ).verify():
"error": True,
"msg": "Email already used"
donor_uuid = str( uuid4() )

Marco De Donno
committed
email_aes = do_encrypt_user_session( email )
email_hash = pbkdf2( email, iterations = config.EMAIL_NB_ITERATIONS ).hash()
upload_nickname = request.form.get( "upload_nickname", None )

Marco De Donno
committed
upload_nickname = do_encrypt_user_session( upload_nickname )
userid = config.db.query_fetchone( "SELECT nextval( 'username_donor_seq' ) as id" )[ "id" ]
username = "donor_{}".format( userid )
sql = sql_insert_generate( "users", [ "username", "email", "type" ], "id" )
data = ( username, email_hash, 2 )
donor_user_id = config.db.query_fetchone( sql, data )[ "id" ]
dek_salt, dek, dek_check = dek_generate( email = email, username = username )
sql = sql_insert_generate( "donor_dek", [ "donor_name", "salt", "dek", "dek_check", "iterations", "algo", "hash" ], "id" )
data = ( username, dek_salt, dek, dek_check, config.DEK_NB_ITERATIONS, "pbkdf2", "sha512", )
config.db.query_fetchone( sql, data )
sql = sql_insert_generate( "submissions", [ "uuid", "email_aes", "email_hash", "nickname", "donor_id", "status", "submitter_id" ] )
data = ( donor_uuid, email_aes, email_hash, upload_nickname, donor_user_id, status, submitter_id, )
config.db.commit()
return jsonify( {
"id": donor_uuid
"error": True,
"msg": "Email not provided"
@app.route( baseurl + "/submission/<submission_id>/add_files" )
@submission_has_access
def submission_upload_tplp( submission_id ):
"""
Serve the page to upload tenprint and latent images files.
This page is not accessible if a consent form is not available in the
database for this particular donor.
"""
dek_check( submission_id )
sql = """
SELECT email_aes as email, nickname, created_time, consent_form
FROM submissions
WHERE submitter_id = %s AND uuid = %s
"""
r = config.db.query( sql, ( session[ "user_id" ], submission_id ) )
if user[ "consent_form" ]:
for key in [ "email", "nickname" ]:

Marco De Donno
committed
user[ key ] = do_decrypt_user_session( user[ key ] )
"submission/add_files.html",
**user
)
else:
return redirect( url_for( "submission_consent_form", submission_id = submission_id ) )
except:
return jsonify( {
"error": True,
"msg": "Case not found"
@app.route( baseurl + "/submission/<submission_id>/consent_form" )
@submission_has_access
def submission_consent_form( submission_id ):
"""
Serve the page to upload the consent form for the user.
"""
sql = """
SELECT email_aes as email, nickname, created_time
FROM submissions
WHERE submitter_id = %s AND uuid = %s
"""
r = config.db.query( sql, ( session[ "user_id" ], submission_id ) )

Marco De Donno
committed
user = r.fetchone()
if user != None:

Marco De Donno
committed
user[ key ] = do_decrypt_user_session( user[ key ] )
"submission/consent_form.html",

Marco De Donno
committed
else:
return abort( 404 )
@app.route( baseurl + "/submission/<submission_id>/set/nickname", methods = [ "POST" ] )
@submission_has_access
def submission_update_nickname( submission_id ):
"""
Change the nickname of the donor in the database.
THIS INFORMATION SHALL BE ENCRYPTED ON THE CLIENT SIDE FIRST WITH A UNIQUE
ENCRYPTION KEY NOT TRANSMITTED TO THE SERVER!
"""

Marco De Donno
committed
nickname = request.form.get( "nickname", None )
if nickname != None and len( nickname ) != 0:
try:

Marco De Donno
committed
nickname = do_encrypt_user_session( nickname )

Marco De Donno
committed
sql = "UPDATE submissions SET nickname = %s WHERE uuid = %s"
config.db.query( sql, ( nickname, submission_id, ) )

Marco De Donno
committed
config.db.commit()
return jsonify( {

Marco De Donno
committed
} )
except:
return jsonify( {
"error": True,
"message": "DB error"

Marco De Donno
committed
} )
else:
return jsonify( {
"error": True,
"message": "No new nickname in the POST request"

Marco De Donno
committed
} )
@app.route( baseurl + "/submission/list" )
@submission_has_access
"""
Get the list of all submission folders for the currently logged-in submitter.
"""
sql = "SELECT * FROM submissions WHERE submitter_id = %s ORDER BY created_time DESC"
r = config.db.query( sql, ( session[ "user_id" ], ) )

Marco De Donno
committed
"email": do_decrypt_user_session( donor.get( "email_aes", None ) ),
"nickname": do_decrypt_user_session( donor.get( "nickname", None ) ),