Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from cStringIO import StringIO
from datetime import datetime
from email.mime.text import MIMEText
from threading import Thread
import functools
import hashlib
import json
from flask import render_template, send_from_directory
from flask import send_file
from flask import session
from flask import url_for
from flask_compress import Compress
from pyotp import random_base32
import pytz
from functions import random_data
from functions import render_jinja_html
################################################################################
debug = os.environ.get( "DEBUG", False )
baseurl = os.environ.get( "BASEURL", "" )
################################################################################
# Loggin decorator

Marco De Donno
committed
def session_field_required( field, value ):
def decorator( func ):
@functools.wraps( func )
def wrapper_login_required( *args, **kwargs ):
if not field in session:
return redirect( url_for( "login" ) )
elif not session.get( field ) == value:
return redirect( url_for( "login" ) )
return func( *args, **kwargs )
return wrapper_login_required
return decorator
def login_required( func ):
@functools.wraps( func )
def wrapper_login_required( *args, **kwargs ):
if not session.get( 'logged', False ) :
return redirect( url_for( "login" ) )
return func( *args, **kwargs )
return wrapper_login_required
################################################################################
# Generic routing
def ping():
return "pong"
################################################################################
# App serving
@app.route( baseurl + '/app/<path>' )
def send_app_files( path ):
return send_from_directory( 'app', path )
################################################################################
# Sessions
@app.before_request
def renew_session():
session.permanent = True
app.permanent_session_lifetime = timedelta( seconds = config.session_timeout )
@app.route( baseurl + '/logout' )
def logout():
session.clear()
return redirect( url_for( 'home' ) )
@app.route( baseurl + '/login' )
def login():
session[ 'need_to_check' ] = [ 'password' ]
session[ 'logged' ] = False
return render_template(
"login.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss
)
@app.route( baseurl + '/do_login', methods = [ 'POST' ] )
def do_login():
need_to_check = session.get( "need_to_check", [ 'password' ] )
try:
current_check = need_to_check[ 0 ]
except:
current_check = None
session[ 'need_to_check' ] = need_to_check
############################################################################
if current_check == "password":
q = config.db.query( 'SELECT * FROM users WHERE username = %s', ( request.form.get( "username" ), ) )
user = q.fetchone()
if user == None:
return jsonify( {
'error': False,
'logged': False
} )
form_password = request.form.get( "password", None )
if form_password == None or not pbkdf2( form_password, user[ 'password' ] ):
return jsonify( {
'error': False,
'logged': False,
} )
elif not user[ 'active' ]:
return jsonify( {
'error': False,
'logged': False,
'message': 'Your account is not activated. Please contact an administrator.'
} )
else:
session[ 'session_id' ] = str( uuid4() )
session[ 'username' ] = user[ 'username' ]
session[ 'user_id' ] = user[ 'id' ]
session[ 'password_check' ] = True
session[ 'need_to_check' ].pop( 0 )
session[ 'need_to_check' ].append( 'totp' )
if user[ 'must_use_securitykey' ]:
session[ 'need_to_check' ].append( 'securitykey' )
q = config.db.query( 'SELECT username, totp FROM users WHERE username = %s', ( session[ 'username' ], ) )
user = q.fetchone()
if not pyotp.TOTP( user[ 'totp' ] ).verify( request.form[ "totp" ], valid_window = 1 ):
return jsonify( {
'error': False,
'logged': False,
'message': 'Wrong TOTP'
if len( session[ 'need_to_check' ] ) == 0 and session.get( "password_check", False ):
session[ 'logged' ] = True
return jsonify( {
'error': False,
'logged': True,
} )
'next_step': session[ 'need_to_check' ][ 0 ]
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
@app.route( baseurl + '/reset_password' )
def password_reset():
return render_template(
"password_reset.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss
)
@app.route( baseurl + '/do_reset_password', methods = [ 'POST' ] )
def do_password_reset():
email = request.form.get( "email", None )
Thread( target = do_password_reset_thread, args = ( email, ) ).start()
return jsonify( {
'error': False,
'message': 'OK'
} )
def do_password_reset_thread( email ):
q = config.db.query( 'SELECT id, username, email FROM users' )
users = q.fetchall()
for user in users:
if not user[ 'email' ].startswith( "pbkdf2$" ):
continue
elif pbkdf2( email, user[ 'email' ] ):
id = hashlib.sha512( random_data( 100 ) ).hexdigest()
####################################################################
data = {
'process': 'password_reset',
'process_id': id,
'user_id': user[ 'id' ]
}
data = json.dumps( data )
data = base64.b64encode( data )
config.redis_shared.set( "reset_" + id, data, ex = 24 * 3600 )
####################################################################
email_content = render_jinja_html(
"templates/email", "reset.html",
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
id = id,
url = config.domain + baseurl + "/reset_password_stage2"
)
msg = MIMEText( email_content, "html" )
msg[ 'Subject' ] = 'ICNML - User password reset'
msg[ 'From' ] = config.sender
msg[ 'To' ] = email
s = smtplib.SMTP( config.smtpserver )
s.sendmail( config.sender, [ email ], msg.as_string() )
s.quit()
break
else:
print "email not found"
@app.route( baseurl + '/reset_password_stage2/<id>', methods = [ 'GET', 'POST' ] )
def password_reset_stage2( id ):
id = str( id )
data = config.redis_shared.get( "reset_" + id )
if data != None:
data = base64.b64decode( data )
data = json.loads( data )
password = request.form.get( "password", None )
userid = data.get( "user_id", None )
if password != None:
password = pbkdf2( password, random_data( 50 ), 50000 )
config.db.query( "UPDATE users SET password = %s WHERE id = %s", ( password, userid ) )
config.db.commit()
config.redis_shared.delete( "reset_" + id )
return jsonify( {
'error': False,
'password_updated': True
} )
else:
return render_template(
"password_reset_stage2.html",
baseurl = baseurl,
id = id,
js = config.cdnjs,
css = config.cdncss
)
else:
return jsonify( {
'error': True,
'message': 'Reset procedure not found/expired'
} )
################################################################################
# Webauthn

Marco De Donno
committed
@app.route( baseurl + '/webauthn_key_management' )
@login_required
return render_template(
"webauthn_key_management.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss,
keys = do_webauthn_get_list_of_keys( all = True )
)

Marco De Donno
committed
@app.route( baseurl + '/webauthn_begin_activate', methods = [ 'POST' ] )
@login_required
def webauthn_begin_activate():
session[ 'key_name' ] = request.form.get( "key_name", None )
username = session.get( "username" )
challenge = random_base32( 64 )
ukey = random_base32( 64 )
session[ 'challenge' ] = challenge
session[ 'register_ukey' ] = ukey
make_credential_options = webauthn.WebAuthnMakeCredentialOptions(
challenge, config.rp_name, config.RP_ID,
ukey, username, username,
None
)
return jsonify( make_credential_options.registration_dict )

Marco De Donno
committed
@app.route( baseurl + '/verify_credential_info', methods = [ 'POST' ] )
@login_required
challenge = session[ 'challenge' ]
user_id = session[ 'user_id' ]
key_name = session.get( "key_name", None )
ukey = session[ 'register_ukey' ]
response = request.form
webauthn_registration_response = webauthn.WebAuthnRegistrationResponse(
config.RP_ID,
config.ORIGIN,
response,
challenge
)
try:
webauthn_credential = webauthn_registration_response.verify()
except Exception as e:
'error': True,
'message': 'Registration failed. Error: {}'.format( e )
config.db.query(
"""
INSERT INTO webauthn
( user_id, key_name, ukey, credential_id, pub_key, sign_count )
VALUES ( %s, %s, %s, %s, %s, %s )
""",
(
user_id, key_name,
ukey, webauthn_credential.credential_id,
webauthn_credential.public_key, webauthn_credential.sign_count,
)
)
config.db.commit()
return jsonify( {
'success': 'User successfully registered.'
} )
################################################################################
def do_webauthn_get_list_of_keys( uid = None, all = False ):
user_id = session.get( "user_id", uid )
sql = "SELECT id, key_name as name, created_on, last_usage, usage_counter, active FROM webauthn WHERE user_id = %s"
if not all:
sql += " AND active = true"
sql += " ORDER BY sign_count DESC"
q = config.db.query( sql, ( user_id, ) )
keys = q.fetchall()
data = []
for key in keys:
data.append( dict( key ) )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_get_list_of_keys', methods = [ 'POST' ] )
def webauthn_get_list_of_keys():
return jsonify( {
'error': False,
'data': do_webauthn_get_list_of_keys()
} )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_delete_key', methods = [ 'POST' ] )
@login_required
def webauthn_delete_key():
key_id = request.form.get( "key_id", False )
key_name = request.form.get( "key_name", False )
userid = session[ 'user_id' ]
try:
config.db.query( "DELETE FROM webauthn WHERE id = %s AND key_name = %s AND user_id = %s", ( key_id, key_name, userid, ) )
config.db.commit()
return jsonify( {
'error': False
} )
except Exception as e:
return jsonify( {
'error': True
} );

Marco De Donno
committed
@app.route( baseurl + '/webauthn_disable_key', methods = [ 'POST' ] )
@login_required
def webauthn_disable_key():
key_id = request.form.get( "key_id", False )
key_name = request.form.get( "key_name", False )
userid = session[ 'user_id' ]
try:
config.db.query( "UPDATE webauthn SET active = False WHERE id = %s AND key_name = %s AND user_id = %s", ( key_id, key_name, userid, ) )
config.db.commit()
return jsonify( {
'error': False
} )
except Exception as e:
return jsonify( {
'error': True
} );

Marco De Donno
committed
@app.route( baseurl + '/webauthn_enable_key', methods = [ 'POST' ] )
@login_required
def webauthn_enable_key():
key_id = request.form.get( "key_id", False )
key_name = request.form.get( "key_name", False )
userid = session[ 'user_id' ]
try:
config.db.query( "UPDATE webauthn SET active = True WHERE id = %s AND key_name = %s AND user_id = %s", ( key_id, key_name, userid, ) )
config.db.commit()
return jsonify( {
'error': False
} )
except Exception as e:
return jsonify( {
'error': True
} );

Marco De Donno
committed
@app.route( baseurl + '/webauthn_begin_assertion', methods = [ 'POST' ] )
def webauthn_begin_assertion():
key_name = request.form.get( "key_name", None )
user_id = session.get( "user_id" )
if 'challenge' in session:
del session[ 'challenge' ]
challenge = random_base32( 64 )
session[ 'challenge' ] = challenge
q = config.db.query( "SELECT * FROM webauthn WHERE user_id = %s AND key_name = %s", ( user_id, key_name ) )
key = q.fetchone()
webauthn_user = webauthn.WebAuthnUser(

Marco De Donno
committed
key[ 'ukey' ], session[ 'username' ], session[ 'username' ], None,
key[ 'credential_id' ], key[ 'pub_key' ], key[ 'sign_count' ], config.RP_ID
)
webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )
return jsonify( {
'error': False,
'data': webauthn_assertion_options.assertion_dict
} )

Marco De Donno
committed
@app.route( baseurl + '/verify_assertion', methods = [ 'POST' ] )
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
def verify_assertion():
challenge = session.get( 'challenge' )
assertion_response = request.form
credential_id = assertion_response.get( 'id' )
q = config.db.query( "SELECT * FROM webauthn WHERE credential_id = %s", ( credential_id, ) )
user = q.fetchone()
webauthn_user = webauthn.WebAuthnUser(
user[ 'ukey' ], "admin", "admin", None,
user[ 'credential_id' ], user[ 'pub_key' ], user[ 'sign_count' ], "icnml.unil.ch"
)
webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
webauthn_user,
assertion_response,
challenge,
config.ORIGIN,
uv_required = False
)
try:
sign_count = webauthn_assertion_response.verify()
except Exception as e:
return jsonify( {
'error': True,
'message': 'Assertion failed. Error: {}'.format( e )
} )
dt = datetime.now( pytz.timezone( 'Europe/Zurich' ) )
q = config.db.query( "UPDATE webauthn SET sign_count = %s, last_usage = %s, usage_counter = usage_counter + 1 WHERE credential_id = %s", ( sign_count, dt, credential_id, ) )
config.db.commit()
session[ 'need_to_check' ].pop( 0 )
do_login()
return jsonify( {
'error': False
} )
################################################################################
# QR Code generation
def renew_secret():
secret = random_base32( 40 )
session[ 'secret' ] = secret
return secret
def get_secret():
secret = session.get( "secret", None )
if secret == None:
secret = renew_secret()
return secret
@app.route( baseurl + '/set_secret' )
@login_required
def set_secret():
config.db.query( "UPDATE users SET totp = %s WHERE username = %s", ( session[ 'secret' ], session[ 'username' ], ) )
config.db.commit()
return jsonify( {
'error': False
} )
@app.route( baseurl + '/secret' )
@login_required
def request_secret():
get_secret()
return jsonify( {
'error': False,
'secret': session[ 'secret' ]
} )
@app.route( baseurl + '/new_secret' )
@login_required
def request_renew_secret():
renew_secret()
return jsonify( {
'error': False,
'secret': session[ 'secret' ]
} )
@app.route( baseurl + '/qrcode' )
@login_required
def send_qrcode():
img = qrcode.make( 'otpauth://totp/ICNML%20' + session[ 'username' ] + '?secret=' + get_secret() )
temp = StringIO()
img.save( temp, format = "png" )
temp.seek( 0 )
return send_file( temp, mimetype = 'image/png' )
@app.route( baseurl + '/user_qrcode' )
@login_required
return render_template(
"qrcode.html",
baseurl = baseurl,
secret = get_secret(),
js = config.cdnjs,
css = config.cdncss
)
################################################################################
# Home page
@app.route( baseurl + '/' )
@login_required
return render_template(
"index.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss,
session_timeout = config.session_timeout
)
################################################################################
# Main startup
if __name__ == '__main__':
app.run( debug = debug, host = "0.0.0.0", threaded = True )