Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-
import base64
from datetime import datetime
from email.mime.text import MIMEText
import hashlib
import json
from uuid import uuid4
from flask import Blueprint
from flask import current_app, request, jsonify, session, url_for, redirect
import webauthn
import config
from functions import mySMTP
import utils
from utils.decorator import admin_required
from utils.template import my_render_template
# Blueprint on which every route of this module (registration, admin
# validation and account configuration of new users) is registered.
newuser_view = Blueprint( "newuser", __name__, template_folder = "templates" )
@newuser_view.route( "/signin" )
def new_user():
    """
        Render the ICNML registration form.

        The account types proposed in the form are the rows of the
        `account_type` table flagged with `can_singin`.
    """
    current_app.logger.info( "New user registration page open" )

    rows = config.db.query_fetchall( "SELECT id, name FROM account_type WHERE can_singin = true" )
    selectable_types = [ dict( row ) for row in rows ]

    return my_render_template( "signin.html", list_account_type = selectable_types )
@newuser_view.route( "/do/signin", methods = [ "POST" ] )
def add_account_request_to_db():
    """
        Add the new user request to the database.

        Reads `first_name`, `last_name`, `email` and `account_type` from the
        POSTed form, draws the next value of the `username_<type>_seq`
        sequence and inserts everything, together with a freshly generated
        UUID, in the `signin_requests` table.

        Returns a JSON document: `{ "error": False, "uuid": ... }` on success,
        `{ "error": True }` if any step fails (missing form field, unknown
        account type, database error).
    """
    current_app.logger.info( "Registrer new user request to the database" )

    try:
        first_name = request.form[ "first_name" ]
        last_name = request.form[ "last_name" ]
        email = request.form[ "email" ].lower()
        account_type = int( request.form[ "account_type" ] )
        uuid = str( uuid4() )

        current_app.logger.debug( "First name: {}".format( first_name ) )
        current_app.logger.debug( "Last name: {}".format( last_name ) )
        current_app.logger.debug( "Email: {}".format( email ) )
        current_app.logger.debug( "Account: {}".format( account_type ) )

        # NOTE(review): the account type is not checked against `can_singin`
        # here, although the registration form only lists those types - confirm
        # whether the restriction should also be enforced server-side.
        sql = "SELECT name FROM account_type WHERE id = %s"
        account_type_name = config.db.query_fetchone( sql, ( account_type, ) )[ "name" ].lower()

        # The sequence name is built from the value stored in the database,
        # not from the raw request data.
        sql = "SELECT nextval( 'username_{}_seq' ) as id".format( account_type_name )
        username_id = config.db.query_fetchone( sql )[ "id" ]

        current_app.logger.debug( "Username id:{}".format( username_id ) )

        config.db.query(
            utils.sql.sql_insert_generate( "signin_requests", [ "first_name", "last_name", "email", "account_type", "uuid", "username_id" ] ),
            ( first_name, last_name, email, account_type, uuid, username_id, )
        )
        config.db.commit()

        return jsonify( {
            "error": False,
            "uuid": uuid
        } )

    except Exception:
        # Keep the generic answer for the client, but record the traceback so
        # the failure can be diagnosed from the logs.
        current_app.logger.exception( "New user request database error" )

        return jsonify( {
            "error": True
        } )
@newuser_view.route( "/validate_signin" )
@admin_required
def validate_signin():
    """
        Serve the page to admins regarding the validation of new users.

        Lists every row of `signin_requests` whose status is 'pending', with
        the account type id replaced by its name. If the rows can not be read,
        the error is logged and the page is rendered with the requests
        collected so far (possibly none).
    """
    current_app.logger.info( "Serve the signin valiation page" )

    q = config.db.query( """
        SELECT signin_requests.*, account_type.name as account_type
        FROM signin_requests
        LEFT JOIN account_type ON signin_requests.account_type = account_type.id
        WHERE signin_requests.status = 'pending'
    """ )

    users = []
    try:
        for row in q.fetchall():
            users.append( dict( row ) )
            current_app.logger.debug( "Add '{}' to the validation list".format( row[ "email" ] ) )

    except Exception:
        # Best effort: the page is still served, but the failure is no longer
        # silently discarded.
        current_app.logger.exception( "Error while reading the pending signin requests" )

    current_app.logger.info( "{} users added to the validation list".format( len( users ) ) )

    return my_render_template(
        "validate_signin.html",
        users = users
    )
@newuser_view.route( "/do/validate_signin", methods = [ "POST" ] )
@admin_required
def do_validate_signin():
    """
        Prepare the new user data to be signed by the admin, and serve the page.

        The signin request designated by the `id` form field is serialized,
        together with the name of the validating admin and the current time,
        to JSON; its unpadded base64 encoding is used as the WebAuthn challenge
        and stored in the session under `validation_user_challenge`.

        Returns a JSON document with the WebAuthn assertion options in `data`,
        or `{ "error": True, "message": ... }` if the request does not exist
        or if no already-used WebAuthn key is found for the admin.
    """
    request_id = request.form.get( "id" )

    current_app.logger.info( "Signin validation begin for request_id '{}'".format( request_id ) )

    signin_request = config.db.query_fetchone( "SELECT * FROM signin_requests WHERE id = %s", ( request_id, ) )
    if signin_request is None:
        current_app.logger.error( "Signin request '{}' not found".format( request_id ) )
        return jsonify( {
            "error": True,
            "message": "Signin request not found"
        } )

    r = {}
    r[ "user" ] = dict( signin_request )
    # Timestamps are not JSON serializable as such.
    r[ "user" ][ "request_time" ] = str( r[ "user" ][ "request_time" ] )
    r[ "user" ][ "validation_time" ] = str( r[ "user" ][ "validation_time" ] )

    r[ "acceptance" ] = {}
    r[ "acceptance" ][ "username" ] = session[ "username" ]
    r[ "acceptance" ][ "time" ] = str( datetime.now() )

    # The padding is removed here and added back by the verification step
    # before decoding the challenge.
    challenge = base64.b64encode( json.dumps( r ) )
    challenge = challenge.replace( "=", "" )

    session[ "validation_user_challenge" ] = challenge

    ############################################################################

    user_id = session[ "user_id" ]

    # Most recently used key of the admin, among the keys already used once.
    key = config.db.query_fetchone( "SELECT * FROM webauthn WHERE user_id = %s AND usage_counter > 0 ORDER BY last_usage DESC LIMIT 1", ( user_id, ) )
    if key is None:
        current_app.logger.error( "No usable webauthn key for the user id '{}'".format( user_id ) )
        return jsonify( {
            "error": True,
            "message": "No security key available for this user"
        } )

    webauthn_user = webauthn.WebAuthnUser(
        key[ "ukey" ], session[ "username" ], session[ "username" ], None,
        key[ "credential_id" ], key[ "pub_key" ], key[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )

    return jsonify( {
        "error": False,
        "data": webauthn_assertion_options.assertion_dict
    } )
@newuser_view.route( "/do/validate_signin_2", methods = [ "POST" ] )
@admin_required
def do_validate_signin_2():
    """
        Verification of the signature of the new user data by the admin.

        Steps:
            1. verify the WebAuthn assertion posted in the form against the
               challenge stored in the session by the previous step;
            2. decode the challenge back to the signin request data, mark the
               request as validated and create the row in `users` (the email
               is stored as a salted PBKDF2 hash);
            3. send the configuration link by email to the new user.

        Returns `{ "error": False }` on success, otherwise
        `{ "error": True, "message": ... }` describing the failing step.
    """
    current_app.logger.info( "Verification of the validated new user" )

    challenge = session.get( "validation_user_challenge" )
    if challenge is None:
        current_app.logger.error( "No validation challenge stored in the session" )
        return jsonify( {
            "error": True,
            "message": "No validation in progress"
        } )

    assertion_response = request.form
    assertion_response_s = base64.b64encode( json.dumps( assertion_response ) )

    credential_id = assertion_response.get( "id" )

    user = config.db.query_fetchone( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) )
    if user is None:
        current_app.logger.error( "Unknown webauthn credential id" )
        return jsonify( {
            "error": True,
            "message": "Unknown credential"
        } )

    # Same relying party id as the one used to build the assertion options.
    webauthn_user = webauthn.WebAuthnUser(
        user[ "ukey" ], session[ "username" ], session[ "username" ], None,
        user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )

    try:
        webauthn_assertion_response.verify()
        current_app.logger.info( "Webauthn assertion verified" )

    except Exception as e:
        current_app.logger.error( "Webauthn assertion failed: {}".format( e ) )
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: {}".format( e )
        } )

    ############################################################################

    # Restore the base64 padding stripped when the challenge was created.
    if len( challenge ) % 4 != 0:
        challenge += "=" * ( 4 - ( len( challenge ) % 4 ) )

    newuser = json.loads( base64.b64decode( challenge ) )

    user_type = int( newuser[ "user" ][ "account_type" ] )
    email = newuser[ "user" ][ "email" ]
    email_hash = utils.hash.pbkdf2( email, utils.rand.random_data( config.EMAIL_SALT_LENGTH ), config.EMAIL_NB_ITERATIONS ).hash()
    request_id = newuser[ "user" ][ "id" ]
    username_id = newuser[ "user" ][ "username_id" ]

    n = config.db.query_fetchone( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )[ "name" ]
    username = "{}_{}".format( n, username_id ).lower()

    current_app.logger.info( "Creation of the new user '{}'".format( username ) )

    try:
        config.db.query( "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s", ( assertion_response_s, request_id ) )
        config.db.query( utils.sql.sql_insert_generate( "users", [ "username", "email", "type" ] ), ( username, email_hash, user_type ) )
        config.db.commit()

        current_app.logger.debug( "User '{}' created".format( username ) )

    except Exception:
        current_app.logger.exception( "Error while creating the user account for '{}'".format( username ) )

        return jsonify( {
            "error": True,
            "message": "Can not insert into database."
        } )

    ############################################################################

    try:
        email_content = utils.template.render_jinja_html(
            "templates/email", "signin.html",
            username = username,
            url = "https://icnml.unil.ch" + url_for( "newuser.config_new_user", uuid = newuser[ "user" ][ "uuid" ] )
        )

        msg = MIMEText( email_content, "html" )

        msg[ "Subject" ] = "ICNML - Login information"
        msg[ "From" ] = config.sender
        msg[ "To" ] = email

        current_app.logger.info( "Sending the email to the user '{}'".format( username ) )

        with mySMTP() as s:
            s.sendmail( config.sender, [ email ], msg.as_string() )

        return jsonify( {
            "error": False
        } )

    except Exception:
        # The account is already created at this point; only the notification
        # failed.
        current_app.logger.exception( "Error while sending the email to the user '{}'".format( username ) )

        return jsonify( {
            "error": True,
            "message": "Error while sending the email"
        } )
@newuser_view.route( "/do/validation_reject", methods = [ "POST" ] )
@admin_required
def do_validation_reject():
    """
        Reject the request for a new user.

        Sets the status of the signin request designated by the `id` form
        field to 'rejected'. As for the other validation endpoints, the
        access is restricted to admins.

        Returns `{ "error": False }` on success, `{ "error": True }` if the
        database update fails.
    """
    request_id = request.form.get( "id" )

    current_app.logger.info( "Reject the new user request {}".format( request_id ) )

    try:
        sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s"
        config.db.query( sql, ( request_id, ) )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Error while rejecting the new user request {}".format( request_id ) )

        return jsonify( {
            "error": True
        } )
@newuser_view.route( "/config/<uuid>" )
def config_new_user( uuid ):
    """
        Serve the first page to the new user to configure his account.

        The session is reset, then the email of the signin request matching
        `uuid` and the uuid itself are stored in the session for the
        configuration step. If no request matches, or if the page can not be
        rendered, the user is redirected to the home page.
    """
    current_app.logger.info( "Serve user account configuration page" )

    session.clear()

    r = config.db.query_fetchone( "SELECT email FROM signin_requests WHERE uuid = %s", ( uuid, ) )
    if r is None:
        current_app.logger.error( "No signin request found for the provided uuid" )
        return redirect( url_for( "base.home" ) )

    try:
        session[ "signin_user_validation_email" ] = r[ "email" ]
        session[ "signin_user_validation_uuid" ] = uuid

        return my_render_template(
            "users/config.html",
            next_step = "newuser.do_config_new_user"
        )

    except Exception:
        current_app.logger.exception( "Error while serving the user account configuration page" )
        return redirect( url_for( "base.home" ) )
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
@newuser_view.route( "/do/config", methods = [ "POST" ] )
def do_config_new_user():
    """
        Save the configuration of the new user to the database.

        The email and uuid stored in the session by the configuration page
        must match a row of `signin_requests`; the `username` form field must
        designate an existing user whose stored email hash matches that email
        and who has no password yet; the `password` form field must start
        with "pbkdf2". The password is then hashed again with a random salt
        and stored.

        Returns `{ "error": False }` on success, otherwise
        `{ "error": True, "message": ... }`.
    """
    current_app.logger.info( "Start user account configuration" )

    email = session.get( "signin_user_validation_email" )
    uuid = session.get( "signin_user_validation_uuid" )

    if email is None or uuid is None:
        # The configuration page was not visited first (or the session expired).
        current_app.logger.error( "No signin request data in the session" )
        return jsonify( {
            "error": True,
            "message": "no signin request"
        } )

    username = request.form.get( "username" )
    password = request.form.get( "password" )

    # NOTE(review): the username is stored in the session before any of the
    # checks below succeed (historical behaviour, kept as is) - confirm that
    # nothing treats this session key alone as a proof of authentication.
    session[ "username" ] = username

    ############################################################################

    r = config.db.query_fetchone( "SELECT count(*) FROM signin_requests WHERE uuid = %s AND email = %s", ( uuid, email, ) )
    if r[ 0 ] == 0:
        return jsonify( {
            "error": True,
            "message": "no signin request"
        } )

    user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( username, ) )

    if user is None:
        current_app.logger.error( "No user found in the databse for '{}'".format( username ) )
        return jsonify( {
            "error": True,
            "message": "no user"
        } )

    elif password is None or not password.startswith( "pbkdf2" ):
        current_app.logger.error( "Password not hashed with PBKDF2" )
        return jsonify( {
            "error": True,
            "message": "password not in the correct format"
        } )

    elif not utils.hash.pbkdf2( email, user[ "email" ] ).verify():
        current_app.logger.error( "Email not corresponding to the stored email in the database" )
        return jsonify( {
            "error": True,
            "message": "email not corresponding to the request form"
        } )

    elif user.get( "password", None ) is not None:
        current_app.logger.error( "Password already set for this user" )
        return jsonify( {
            "error": True,
            "message": "password already set"
        } )

    ############################################################################

    current_app.logger.debug( "Storing the new password to the databse" )

    # Password salt length, as for the donor accounts (the email salt length
    # was used here before).
    password = utils.hash.pbkdf2( password, utils.rand.random_data( config.PASSWORD_SALT_LENGTH ), config.PASSWORD_NB_ITERATIONS ).hash()
    config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
    config.db.commit()

    ############################################################################

    return jsonify( {
        "error": False
    } )
@newuser_view.route( "/config/donor/<h>" )
def config_new_user_donor( h ):
    """
        Render the account configuration page of a new donor.

        `h` is compared with the SHA-512 hex digest of the `email` column of
        every donor account without password. On a match, the hash and the
        user id are stored in the (freshly cleared) session and the
        configuration page is served; otherwise the visitor is redirected to
        the home page.
    """
    current_app.logger.info( "Start new donor configuration" )

    session.clear()

    current_app.logger.debug( "Searching in the database for hash value '{}'".format( h ) )

    sql = """
        SELECT users.id, users.username, users.email
        FROM users
        LEFT JOIN account_type ON users.type = account_type.id
        WHERE account_type.name = 'Donor' AND password IS NULL
    """

    # First candidate whose email digest equals the requested hash, if any.
    donor = None
    for candidate in config.db.query_fetchall( sql ):
        if hashlib.sha512( candidate[ "email" ] ).hexdigest() == h:
            donor = candidate
            break

    if donor is None:
        current_app.logger.error( "The hash '{}' is not present in the users database".format( h ) )
        return redirect( url_for( "base.home" ) )

    current_app.logger.info( "User '{}' found".format( donor[ "username" ] ) )

    session[ "email_hash" ] = h
    session[ "user_id" ] = donor[ "id" ]

    current_app.logger.info( "Serving the config new donor page" )

    return my_render_template( "users/config.html", next_step = "newuser.do_config_new_donor", hash = h )
@newuser_view.route( "/do/config/donor", methods = [ "POST" ] )
def do_config_new_donor():
    """
        Save the donor configuration to the database.

        The password of the user designated by the `username` form field is
        set only if the `hash` form field equals the email hash stored in the
        session by the configuration page, and if the user id stored in the
        session is the id of that user.

        Returns `{ "error": False }` on success, otherwise
        `{ "error": True, "message": "Invalid parameters" }` (unknown user,
        missing form field, or session data not matching).
    """
    current_app.logger.info( "Start donor configuration" )

    username = request.form.get( "username" )
    current_app.logger.debug( "Username: {}".format( username ) )

    password = request.form.get( "password" )
    h = request.form.get( "hash" )

    sql = "SELECT id FROM users WHERE username = %s"
    user = config.db.query_fetchone( sql, ( username, ) )

    if user is not None:
        session[ "username" ] = username

    # `h` and the user must exist: two missing values must never compare as
    # equal to an empty session.
    is_valid = (
        user is not None
        and password is not None
        and h is not None
        and session.get( "email_hash" ) == h
        and session.get( "user_id" ) == user[ "id" ]
    )

    if is_valid:
        # Hash only once the request is known to be legitimate.
        password = utils.hash.pbkdf2( password, utils.rand.random_data( config.PASSWORD_SALT_LENGTH ), config.PASSWORD_NB_ITERATIONS ).hash()

        config.db.query( "UPDATE users SET password = %s WHERE username = %s", ( password, username, ) )
        config.db.commit()

        current_app.logger.info( "Password set" )

        return jsonify( {
            "error": False
        } )

    else:
        current_app.logger.error( "Error while updating the password dor {}".format( username ) )

        return jsonify( {
            "error": True,
            "message": "Invalid parameters"
        } )