Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from flask import Blueprint
from flask import current_app, request, jsonify, session, url_for
import base64
import json
import webauthn
from datetime import datetime
from email.mime.text import MIMEText
from uuid import uuid4
import utils
from utils.decorator import admin_required
from utils.template import my_render_template
from functions import mySMTP
import config
# Flask blueprint on which all the new-user registration and validation routes
# defined below are registered.
newuser_view = Blueprint( "newuser", __name__, template_folder = "template" )
@newuser_view.route( config.baseurl + "/signin" )
def new_user():
    """
        Render the ICNML registration form.

        The account types offered by the form are the rows of the
        `account_type` table flagged with `can_singin`; they are handed to the
        `signin.html` template as a list of plain dicts.
    """
    current_app.logger.info( "New user registration page open" )

    rows = config.db.query_fetchall( "SELECT id, name FROM account_type WHERE can_singin = true" )
    selectable_types = [ dict( row ) for row in rows ]

    return my_render_template( "signin.html", list_account_type = selectable_types )
@newuser_view.route( config.baseurl + "/do/signin", methods = [ "POST" ] )
def add_account_request_to_db():
    """
        Store a new account request in the `signin_requests` table.

        Reads `first_name`, `last_name`, `email` and `account_type` from the
        POSTed form, reserves the next value of the username sequence of the
        requested account type, and inserts the request together with a freshly
        generated UUID.

        Returns a JSON object: `{ "error": False, "uuid": <uuid> }` on success,
        `{ "error": True }` if anything fails (missing form field, non-numeric
        or unknown account type, database error).
    """
    current_app.logger.info( "Registrer new user request to the database" )

    try:
        first_name = request.form[ "first_name" ]
        last_name = request.form[ "last_name" ]
        email = request.form[ "email" ].lower()
        account_type = int( request.form[ "account_type" ] )
        uuid = str( uuid4() )

        current_app.logger.debug( "First name: {}".format( first_name ) )
        current_app.logger.debug( "Last name: {}".format( last_name ) )
        current_app.logger.debug( "Email: {}".format( email ) )
        current_app.logger.debug( "Account: {}".format( account_type ) )

        # The sequence name is derived from the account type name stored in
        # the database (looked up through the integer id), never from the raw
        # form value.
        sql = "SELECT name FROM account_type WHERE id = %s"
        account_type_name = config.db.query_fetchone( sql, ( account_type, ) )[ "name" ].lower()

        sql = "SELECT nextval( 'username_{}_seq' ) as id".format( account_type_name )
        username_id = config.db.query_fetchone( sql )[ "id" ]

        current_app.logger.debug( "Username id:{}".format( username_id ) )

        config.db.query(
            utils.sql.sql_insert_generate( "signin_requests", [ "first_name", "last_name", "email", "account_type", "uuid", "username_id" ] ),
            ( first_name, last_name, email, account_type, uuid, username_id, )
        )
        config.db.commit()

        return jsonify( {
            "error": False,
            "uuid": uuid
        } )

    except Exception:
        # `except Exception` (instead of a bare `except`) lets SystemExit and
        # KeyboardInterrupt propagate; logger.exception() logs at ERROR level
        # and appends the traceback so the actual cause is not lost.
        current_app.logger.exception( "New user request database error" )

        return jsonify( {
            "error": True
        } )
@newuser_view.route( config.baseurl + "/validate_signin" )
@admin_required
def validate_signin():
    """
        Render the admin page listing the account requests awaiting validation.

        Every `signin_requests` row with the status 'pending' is passed to the
        `validate_signin.html` template as a dict, with `account_type` replaced
        by the name of the account type.

        Fetching the rows is best-effort: if it fails, the error is logged and
        the page is rendered with the requests collected so far.
    """
    current_app.logger.info( "Serve the signin valiation page" )

    q = config.db.query( """
        SELECT signin_requests.*, account_type.name as account_type
        FROM signin_requests
        LEFT JOIN account_type ON signin_requests.account_type = account_type.id
        WHERE signin_requests.status = 'pending'
    """ )

    users = []

    try:
        for row in q.fetchall():
            users.append( dict( row ) )
            current_app.logger.debug( "Add '{}' to the validation list".format( row[ "email" ] ) )

    except Exception:
        # Keep serving the page, but do not hide the failure: without this log
        # an error is indistinguishable from "no pending request".
        current_app.logger.exception( "Error while fetching the pending signin requests" )

    current_app.logger.info( "{} users added to the validation list".format( len( users ) ) )

    return my_render_template(
        "validate_signin.html",
        users = users
    )
@newuser_view.route( config.baseurl + "/do/validate_signin", methods = [ "POST" ] )
@admin_required
def do_validate_signin():
    """
        Prepare the new user data to be signed by the admin.

        The signin request identified by the POSTed `id` is serialized to JSON
        together with the accepting admin's username and the current time. The
        base64 encoding of this JSON (padding removed) is used as the WebAuthn
        challenge and stored in the session under `validation_user_challenge`.

        Returns a JSON object: `{ "error": False, "data": <assertion options> }`
        on success, `{ "error": True, "message": ... }` if the signin request
        or a usable security key for the current admin can not be found.
    """
    request_id = request.form.get( "id" )

    current_app.logger.info( "Signin validation begin for request_id '{}'".format( request_id ) )

    # Assumes query_fetchone() returns None when no row matches.
    signin_request = config.db.query_fetchone( "SELECT * FROM signin_requests WHERE id = %s", ( request_id, ) )
    if signin_request is None:
        current_app.logger.error( "Signin request '{}' not found".format( request_id ) )
        return jsonify( {
            "error": True,
            "message": "Signin request not found."
        } )

    # The timestamps are converted to strings to be JSON serializable.
    user = dict( signin_request )
    user[ "request_time" ] = str( user[ "request_time" ] )
    user[ "validation_time" ] = str( user[ "validation_time" ] )

    to_sign = {
        "user": user,
        "acceptance": {
            "username": session[ "username" ],
            "time": str( datetime.now() )
        }
    }

    # The '=' padding is stripped here; do_validate_signin_2() restores it
    # before decoding the challenge.
    challenge = base64.b64encode( json.dumps( to_sign ) )
    challenge = challenge.replace( "=", "" )

    session[ "validation_user_challenge" ] = challenge

    ############################################################################

    user_id = session[ "user_id" ]

    # Most recently used key of the admin, among the keys used at least once.
    key = config.db.query_fetchone( "SELECT * FROM webauthn WHERE user_id = %s AND usage_counter > 0 ORDER BY last_usage DESC LIMIT 1", ( user_id, ) )
    if key is None:
        current_app.logger.error( "No usable security key found for the user id '{}'".format( user_id ) )
        return jsonify( {
            "error": True,
            "message": "No security key available to sign the validation."
        } )

    webauthn_user = webauthn.WebAuthnUser(
        key[ "ukey" ], session[ "username" ], session[ "username" ], None,
        key[ "credential_id" ], key[ "pub_key" ], key[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )

    return jsonify( {
        "error": False,
        "data": webauthn_assertion_options.assertion_dict
    } )
@newuser_view.route( config.baseurl + "/do/validate_signin_2", methods = [ "POST" ] )
@admin_required
def do_validate_signin_2():
    """
        Verify the admin's signature of the new user data and create the account.

        Steps:
            1. Verify the POSTed WebAuthn assertion against the challenge stored
               in the session by do_validate_signin().
            2. Decode the challenge to recover the signin request data.
            3. Mark the request as validated and insert the new user.
            4. Send the login information to the new user by email.

        Returns a JSON object: `{ "error": False }` on success, or
        `{ "error": True, "message": ... }` describing the step that failed.
        The user is committed to the database before the email is sent, so a
        failure at step 4 leaves the account created.
    """
    current_app.logger.info( "Verification of the validated new user" )

    challenge = session.get( "validation_user_challenge" )
    if challenge is None:
        current_app.logger.error( "No validation challenge stored in the session" )
        return jsonify( {
            "error": True,
            "message": "No pending validation challenge."
        } )

    assertion_response = request.form
    assertion_response_s = base64.b64encode( json.dumps( assertion_response ) )

    # Only accept a credential registered for the admin currently logged in:
    # the signed data names this admin as the one accepting the request.
    credential_id = assertion_response.get( "id" )
    user = config.db.query_fetchone(
        "SELECT * FROM webauthn WHERE credential_id = %s AND user_id = %s",
        ( credential_id, session[ "user_id" ], )
    )
    if user is None:
        current_app.logger.error( "Unknown credential id for the current user" )
        return jsonify( {
            "error": True,
            "message": "Unknown credential."
        } )

    # Same relying party id as the one used by do_validate_signin() to build
    # the assertion options.
    webauthn_user = webauthn.WebAuthnUser(
        user[ "ukey" ], session[ "username" ], session[ "username" ], None,
        user[ "credential_id" ], user[ "pub_key" ], user[ "sign_count" ], config.RP_ID
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )

    try:
        webauthn_assertion_response.verify()
        current_app.logger.info( "Webauthn assertion verified" )

    except Exception as e:
        current_app.logger.error( "Webauthn assertion failed: {}".format( e ) )
        return jsonify( {
            "error": True,
            "message": "Assertion failed. Error: {}".format( e )
        } )

    ############################################################################

    # Restore the base64 padding removed by do_validate_signin().
    challenge += "=" * ( -len( challenge ) % 4 )

    newuser = json.loads( base64.b64decode( challenge ) )

    user_type = int( newuser[ "user" ][ "account_type" ] )
    email = newuser[ "user" ][ "email" ]
    # Only a salted hash of the email is stored in the `users` table; the clear
    # email is kept locally to send the login information below.
    email_hash = utils.hash.pbkdf2( email, utils.rand.random_data( config.EMAIL_SALT_LENGTH ), config.EMAIL_NB_ITERATIONS ).hash()
    request_id = newuser[ "user" ][ "id" ]
    username_id = newuser[ "user" ][ "username_id" ]

    account_type_row = config.db.query_fetchone( "SELECT name FROM account_type WHERE id = %s", ( user_type, ) )
    if account_type_row is None:
        current_app.logger.error( "Unknown account type '{}'".format( user_type ) )
        return jsonify( {
            "error": True,
            "message": "Unknown account type."
        } )

    username = "{}_{}".format( account_type_row[ "name" ], username_id ).lower()

    current_app.logger.info( "Creation of the new user '{}'".format( username ) )

    try:
        config.db.query( "UPDATE signin_requests SET validation_time = now(), assertion_response = %s, status = 'validated' WHERE id = %s", ( assertion_response_s, request_id ) )
        config.db.query( utils.sql.sql_insert_generate( "users", [ "username", "email", "type" ] ), ( username, email_hash, user_type ) )
        config.db.commit()

        current_app.logger.debug( "User '{}' created".format( username ) )

    except Exception:
        # logger.exception() logs at ERROR level with the traceback attached.
        current_app.logger.exception( "Error while creating the user account for '{}'".format( username ) )

        return jsonify( {
            "error": True,
            "message": "Can not insert into database."
        } )

    ############################################################################

    try:
        # NOTE(review): the host is hard-coded here; confirm whether it should
        # come from the configuration instead.
        email_content = utils.template.render_jinja_html(
            "templates/email", "signin.html",
            username = username,
            url = "https://icnml.unil.ch" + url_for( "config_new_user", uuid = newuser[ "user" ][ "uuid" ] )
        )

        msg = MIMEText( email_content, "html" )

        msg[ "Subject" ] = "ICNML - Login information"
        msg[ "From" ] = config.sender
        msg[ "To" ] = email

        current_app.logger.info( "Sending the email to the user '{}'".format( username ) )

        with mySMTP() as s:
            s.sendmail( config.sender, [ email ], msg.as_string() )

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Error while sending the email to the user '{}'".format( username ) )

        return jsonify( {
            "error": True,
            "message": "Error while sending the email"
        } )
@newuser_view.route( config.baseurl + "/do/validation_reject", methods = [ "POST" ] )
@admin_required
def do_validation_reject():
    """
        Reject the request for a new user.

        Sets the status of the `signin_requests` row identified by the POSTed
        `id` to 'rejected'. Restricted to admins, like the other validation
        endpoints of this blueprint.

        Returns a JSON object: `{ "error": False }` on success,
        `{ "error": True }` if the database update fails.
    """
    request_id = request.form.get( "id" )

    current_app.logger.info( "Reject the new user request {}".format( request_id ) )

    try:
        sql = "UPDATE signin_requests SET status = 'rejected' WHERE id = %s"
        config.db.query( sql, ( request_id, ) )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        # logger.exception() logs at ERROR level with the traceback attached.
        current_app.logger.exception( "Error while rejecting the new user request {}".format( request_id ) )

        return jsonify( {
            "error": True
        } )