Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from cStringIO import StringIO
from datetime import datetime
from email.mime.text import MIMEText
from threading import Thread
import functools
import hashlib
import json
from flask import render_template, send_from_directory
from flask import send_file
from flask import session
from flask import url_for
from flask_compress import Compress
from pyotp import random_base32
import pytz
from functions import random_data
from functions import render_jinja_html
################################################################################
# Runtime configuration read from the process environment.
#
# Environment values are always strings, so a plain truthiness test would turn
# DEBUG=0 or DEBUG=false into an enabled debug mode. Values that spell "off"
# are therefore mapped to False; any other non-empty value keeps enabling it.
debug = str( os.environ.get( "DEBUG", "" ) ).strip().lower() not in ( "", "0", "false", "no", "off" )

# URL prefix prepended to every route declared in this module.
baseurl = os.environ.get( "BASEURL", "" )
################################################################################
# Login decorators

Marco De Donno
committed
def session_field_required( field, value ):
    """
    Build a decorator that restricts a view to sessions where `field` equals `value`.

    The decorated view is executed only when `field` is present in the session
    and its stored value compares equal to `value`. In every other case the
    client is redirected to the "login" endpoint.

    :param field: name of the session key to inspect
    :param value: value the session key must hold for the view to run
    :return: decorator to apply to a view function
    """
    def decorator( func ):
        @functools.wraps( func )
        def wrapper_login_required( *args, **kwargs ):
            # The presence test is kept separate from the comparison so that
            # `value = None` can never be satisfied by an absent field.
            if field not in session or session.get( field ) != value:
                return redirect( url_for( "login" ) )

            return func( *args, **kwargs )

        return wrapper_login_required

    return decorator
def login_required( func ):
    """
    Decorator running `func` only for sessions flagged as logged in.

    When the session's 'logged' entry is missing or falsy, the client is
    redirected to the "login" endpoint and `func` is not called.

    :param func: view function to protect
    :return: the wrapped view
    """
    @functools.wraps( func )
    def wrapper_login_required( *args, **kwargs ):
        is_logged = session.get( 'logged', False )
        if is_logged:
            return func( *args, **kwargs )

        return redirect( url_for( "login" ) )

    return wrapper_login_required
################################################################################
# Generic routing
def ping():
    """
    Answer with the fixed string "pong".
    """
    reply = "pong"
    return reply
################################################################################
# App serving
@app.route( baseurl + '/app/<path>' )
def send_app_files( path ):
    """
    Serve the file named `path` from the 'app' directory.

    :param path: file name taken from the URL
    """
    app_directory = 'app'
    return send_from_directory( app_directory, path )
################################################################################
# Sessions
@app.before_request
def renew_session():
    """
    Before each request: mark the session as permanent and apply the
    lifetime configured in `config.session_timeout` (in seconds).
    """
    session.permanent = True

    lifetime = timedelta( seconds = config.session_timeout )
    app.permanent_session_lifetime = lifetime
@app.route( baseurl + '/logout' )
def logout():
    """
    Empty the session and redirect the client to the 'home' endpoint.
    """
    session.clear()

    home_url = url_for( 'home' )
    return redirect( home_url )
@app.route( baseurl + '/login' )
def login():
    """
    Render the login page.

    The login state is reset first: the list of pending checks is set back
    to the password check alone and the session is flagged as not logged.
    """
    session[ 'need_to_check' ] = [ 'password' ]
    session[ 'logged' ] = False

    template_args = {
        'baseurl': baseurl,
        'js': config.cdnjs,
        'css': config.cdncss
    }
    return render_template( "login.html", **template_args )
# Login step handler for POST <baseurl>/do_login.
# NOTE(review): this copy of the view is damaged - the `def` line is absent
# (verify_assertion() calls `do_login()`, so that is presumably its name), the
# indentation is lost and several lines further down appear to be missing.
# Restore it from the repository before changing any logic.
@app.route( baseurl + '/do_login', methods = [ 'POST' ] )
# List of checks still to pass for this session; defaults to the password check.
need_to_check = session.get( "need_to_check", [ 'password' ] )
# The first pending check is the one handled by this request
# (None when the list is empty).
try:
current_check = need_to_check[ 0 ]
except:
current_check = None
session[ 'need_to_check' ] = need_to_check
############################################################################
# Password step: fetch the user by the posted username and check the password.
if current_check == "password":
q = config.db.query( 'SELECT * FROM users WHERE username = %s', ( request.form.get( "username" ), ) )
user = q.fetchone()
# Unknown username: same payload as the wrong-password case below.
if user == None:
return jsonify( {
'error': False,
'logged': False
} )
form_password = request.form.get( "password", None )
# presumably pbkdf2( value, stored ) verifies `value` against the stored
# hash - the helper is not visible here, confirm.
if form_password == None or not pbkdf2( form_password, user[ 'password' ] ):
return jsonify( {
'error': False,
'logged': False,
} )
# Correct password but the account is flagged inactive.
elif not user[ 'active' ]:
return jsonify( {
'error': False,
'logged': False,
'message': 'Your account is not activated. Please contact an administrator.'
} )
# Password accepted: store the identity in the session, mark the password
# check as done and queue the additional factors required for this user.
else:
session[ 'session_id' ] = str( uuid4() )
session[ 'username' ] = user[ 'username' ]
session[ 'user_id' ] = user[ 'id' ]
session[ 'password_check' ] = True
session[ 'need_to_check' ].remove( current_check )
if user[ 'must_use_totp' ]:
session[ 'need_to_check' ].append( 'totp' )
if user[ 'must_use_securitykey' ]:
session[ 'need_to_check' ].append( 'securitykey' )
# NOTE(review): a branch header (presumably the one for the 'totp' check)
# seems to be missing before this query - confirm.
q = config.db.query( 'SELECT username, totp FROM users WHERE username = %s', ( session[ 'username' ], ) )
user = q.fetchone()
# Check the posted TOTP code against the secret stored for the user.
if not pyotp.TOTP( user[ 'totp' ] ).verify( request.form[ "totp" ], valid_window = 1 ):
return jsonify( {
'error': False,
'logged': False,
'message': 'Wrong TOTP'
# NOTE(review): the closing `} )` of the jsonify call above is missing here.
session[ 'need_to_check' ].remove( current_check )
# Nothing left to check and the password was verified: drop the temporary
# login keys from the session and report success.
if len( session[ 'need_to_check' ] ) == 0 and session.get( "password_check", False ):
for key in [ 'process', 'need_to_check', 'password_check' ]:
session.pop( key )
return jsonify( {
'error': False,
'logged': True,
} )
# NOTE(review): orphan line - presumably part of a response telling the client
# which check comes next; the surrounding lines are missing in this copy.
'next_step': session[ 'need_to_check' ][ 0 ]
################################################################################
# Reset
@app.route( baseurl + '/reset_password' )
def password_reset():
    """
    Render the page used to request a password reset.

    The session is emptied and tagged with the "request_password_reset"
    process before the template is rendered.
    """
    session.clear()
    session[ 'process' ] = "request_password_reset"

    template_args = {
        'baseurl': baseurl,
        'js': config.cdnjs,
        'css': config.cdncss
    }
    return render_template( "password_reset.html", **template_args )
@app.route( baseurl + '/do_reset_password', methods = [ 'POST' ] )
def do_password_reset():
    """
    Start the password-reset procedure for the posted email address.

    The work is handed to do_password_reset_thread() in a separate thread;
    the reply is the same 'OK' payload whatever the posted address.
    """
    email = request.form.get( "email", None )

    worker = Thread( target = do_password_reset_thread, args = ( email, ) )
    worker.start()

    reply = {
        'error': False,
        'message': 'OK'
    }
    return jsonify( reply )
# Background part of the password reset: find the user whose stored email
# matches `email`, store a reset token in the shared redis and mail the link.
# NOTE(review): this copy has lost its indentation and the body of the final
# `else:` is missing - restore from the repository before changing logic.
def do_password_reset_thread( email ):
# Every user row is loaded; the emails are checked one by one below.
q = config.db.query( 'SELECT id, username, email FROM users' )
users = q.fetchall()
for user in users:
# Rows whose email field does not start with the "pbkdf2$" prefix are skipped.
if not user[ 'email' ].startswith( "pbkdf2$" ):
continue
# presumably pbkdf2( value, stored ) verifies `value` against the stored hash -
# the helper is not visible here, confirm.
elif pbkdf2( email, user[ 'email' ] ):
# Reset token: SHA-512 hex digest of random_data( 100 ).
# NOTE(review): `id` shadows the builtin of the same name.
id = hashlib.sha512( random_data( 100 ) ).hexdigest()
####################################################################
# Payload stored under "reset_<token>" as base64-encoded JSON, with an
# expiry of 24 * 3600 passed as `ex`.
data = {
'process': 'password_reset',
'process_id': id,
'user_id': user[ 'id' ]
}
data = json.dumps( data )
data = base64.b64encode( data )
config.redis_shared.set( "reset_" + id, data, ex = 24 * 3600 )
####################################################################
# Email body rendered from the reset.html template with the token and the
# URL of the second stage of the reset.
email_content = render_jinja_html(
"templates/email", "reset.html",
id = id,
url = config.domain + baseurl + "/reset_password_stage2"
)
msg = MIMEText( email_content, "html" )
msg[ 'Subject' ] = 'ICNML - User password reset'
msg[ 'From' ] = config.sender
msg[ 'To' ] = email
# NOTE(review): the SMTP connection is not closed if sendmail() raises.
s = smtplib.SMTP( config.smtpserver )
s.sendmail( config.sender, [ email ], msg.as_string() )
s.quit()
# Stop at the first matching user.
break
# NOTE(review): the body of this `else:` is missing in this copy.
else:
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
@app.route( baseurl + '/reset_password_stage2/<id>', methods = [ 'GET', 'POST' ] )
def password_reset_stage2( id ):
    """
    Second stage of the password reset, reached through the emailed link.

    The reset payload is read from the shared redis under "reset_<id>".
    Without a payload an error is returned as JSON. With a payload and no
    posted password, the form asking for the new password is rendered. With
    a posted password, the hashed password is stored for the user named in
    the payload and the redis entry is deleted.

    :param id: reset token taken from the URL
    """
    id = str( id )
    redis_key = "reset_" + id

    data = config.redis_shared.get( redis_key )
    if data is None:
        return jsonify( {
            'error': True,
            'message': 'Reset procedure not found/expired'
        } )

    # The payload is base64-encoded JSON.
    data = json.loads( base64.b64decode( data ) )

    password = request.form.get( "password", None )
    userid = data.get( "user_id", None )

    if password is None:
        return render_template(
            "password_reset_stage2.html",
            baseurl = baseurl,
            id = id,
            js = config.cdnjs,
            css = config.cdncss
        )

    password = pbkdf2( password, random_data( 50 ), 50000 )
    config.db.query( "UPDATE users SET password = %s WHERE id = %s", ( password, userid ) )
    config.db.commit()

    # The token is single-use: remove it once the password is stored.
    config.redis_shared.delete( redis_key )

    return jsonify( {
        'error': False,
        'password_updated': True
    } )
################################################################################
# Webauthn

Marco De Donno
committed
# Key management page (login required): renders webauthn_key_management.html
# with the keys returned by do_webauthn_get_list_of_keys( all = True ), i.e.
# without the "active" filter.
# NOTE(review): the `def` line of this view is missing in this copy and the
# indentation is lost - restore from the repository.
@app.route( baseurl + '/webauthn_key_management' )
@login_required
return render_template(
"webauthn_key_management.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss,
keys = do_webauthn_get_list_of_keys( all = True )
)

Marco De Donno
committed
@app.route( baseurl + '/webauthn_begin_activate', methods = [ 'POST' ] )
@login_required
def webauthn_begin_activate():
    """
    Start the registration of a new security key for the logged-in user.

    The posted key name, a fresh challenge and a fresh user key are stored
    in the session, then the WebAuthn credential-creation options are
    returned as JSON.
    """
    session[ 'key_name' ] = request.form.get( "key_name", None )
    username = session.get( "username" )

    # Two independent random_base32( 64 ) values: the challenge, then the
    # user key recorded with the credential.
    fresh_challenge = random_base32( 64 )
    user_key = random_base32( 64 )

    session[ 'challenge' ] = fresh_challenge
    session[ 'register_ukey' ] = user_key

    options = webauthn.WebAuthnMakeCredentialOptions(
        fresh_challenge, config.rp_name, config.RP_ID,
        user_key, username, username,
        None
    )

    registration = options.registration_dict
    return jsonify( registration )

Marco De Donno
committed
# Second step of the security key registration (login required): verifies the
# registration response posted by the client and stores the new credential.
# NOTE(review): this copy is damaged - the `def` line is missing, the
# indentation is lost and the error branch below has lost the statement that
# wraps its two dictionary entries. Restore from the repository.
@app.route( baseurl + '/verify_credential_info', methods = [ 'POST' ] )
@login_required
# Values stored in the session by webauthn_begin_activate().
challenge = session[ 'challenge' ]
user_id = session[ 'user_id' ]
key_name = session.get( "key_name", None )
ukey = session[ 'register_ukey' ]
response = request.form
webauthn_registration_response = webauthn.WebAuthnRegistrationResponse(
config.RP_ID,
config.ORIGIN,
response,
challenge
)
try:
webauthn_credential = webauthn_registration_response.verify()
except Exception as e:
# NOTE(review): presumably these two entries belong to a
# `return jsonify( { ... } )` whose opening and closing lines are missing.
'error': True,
'message': 'Registration failed. Error: {}'.format( e )
# Store the verified credential for the session's user.
config.db.query(
"""
INSERT INTO webauthn
( user_id, key_name, ukey, credential_id, pub_key, sign_count )
VALUES ( %s, %s, %s, %s, %s, %s )
""",
(
user_id, key_name,
ukey, webauthn_credential.credential_id,
webauthn_credential.public_key, webauthn_credential.sign_count,
)
)
config.db.commit()
return jsonify( {
'success': 'User successfully registered.'
} )
################################################################################
def do_webauthn_get_list_of_keys( uid = None, all = False ):
    """
    Return the security keys of a user as a list of dictionaries.

    The user is the session's 'user_id' when the session has one; `uid` is
    only used as a fallback. Rows are ordered by decreasing sign_count.

    :param uid: user id used when the session holds no 'user_id'
    :param all: when false (default), only keys with active = true are listed
    :return: list of dicts with the keys id, name, created_on, last_usage,
             usage_counter and active
    """
    user_id = session.get( "user_id", uid )

    sql = "SELECT id, key_name as name, created_on, last_usage, usage_counter, active FROM webauthn WHERE user_id = %s"
    if not all:
        sql += " AND active = true"
    sql += " ORDER BY sign_count DESC"

    rows = config.db.query( sql, ( user_id, ) ).fetchall()

    # The key management page and the JSON listing endpoint both use the
    # result of this function, so the list has to be returned.
    return [ dict( row ) for row in rows ]

Marco De Donno
committed
@app.route( baseurl + '/webauthn_get_list_of_keys', methods = [ 'POST' ] )
def webauthn_get_list_of_keys():
    """
    Return, as JSON, the result of do_webauthn_get_list_of_keys() called
    with its default arguments.
    """
    keys = do_webauthn_get_list_of_keys()

    reply = {
        'error': False,
        'data': keys
    }
    return jsonify( reply )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_delete_key', methods = [ 'POST' ] )
@login_required
def webauthn_delete_key():
    """
    Delete a security key of the logged-in user.

    The key is designated by the posted `key_id` and `key_name`; the DELETE
    is additionally restricted to the session's user. Replies with
    {'error': False} once query and commit went through, and with
    {'error': True} if either raised.
    """
    form = request.form
    key_id = form.get( "key_id", False )
    key_name = form.get( "key_name", False )
    userid = session[ 'user_id' ]

    sql = "DELETE FROM webauthn WHERE id = %s AND key_name = %s AND user_id = %s"

    try:
        config.db.query( sql, ( key_id, key_name, userid, ) )
        config.db.commit()
        return jsonify( { 'error': False } )

    except Exception:
        return jsonify( { 'error': True } )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_disable_key', methods = [ 'POST' ] )
@login_required
def webauthn_disable_key():
    """
    Mark a security key of the logged-in user as inactive.

    The key is designated by the posted `key_id` and `key_name`; the UPDATE
    is additionally restricted to the session's user. Replies with
    {'error': False} once query and commit went through, and with
    {'error': True} if either raised.
    """
    form = request.form
    key_id = form.get( "key_id", False )
    key_name = form.get( "key_name", False )
    userid = session[ 'user_id' ]

    sql = "UPDATE webauthn SET active = False WHERE id = %s AND key_name = %s AND user_id = %s"

    try:
        config.db.query( sql, ( key_id, key_name, userid, ) )
        config.db.commit()
        return jsonify( { 'error': False } )

    except Exception:
        return jsonify( { 'error': True } )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_enable_key', methods = [ 'POST' ] )
@login_required
def webauthn_enable_key():
    """
    Mark a security key of the logged-in user as active.

    The key is designated by the posted `key_id` and `key_name`; the UPDATE
    is additionally restricted to the session's user. Replies with
    {'error': False} once query and commit went through, and with
    {'error': True} if either raised.
    """
    form = request.form
    key_id = form.get( "key_id", False )
    key_name = form.get( "key_name", False )
    userid = session[ 'user_id' ]

    sql = "UPDATE webauthn SET active = True WHERE id = %s AND key_name = %s AND user_id = %s"

    try:
        config.db.query( sql, ( key_id, key_name, userid, ) )
        config.db.commit()
        return jsonify( { 'error': False } )

    except Exception:
        return jsonify( { 'error': True } )

Marco De Donno
committed
@app.route( baseurl + '/webauthn_begin_assertion', methods = [ 'POST' ] )
def webauthn_begin_assertion():
    """
    Start a security key assertion for the user stored in the session.

    A fresh challenge is stored in the session, the key named by the posted
    `key_name` is loaded for the session's user, and the WebAuthn assertion
    options are returned as JSON under 'data'.

    If no such key exists for the user, an error payload is returned instead
    of failing on the missing row.
    """
    key_name = request.form.get( "key_name", None )
    user_id = session.get( "user_id" )

    # Assigning replaces any challenge left over from a previous attempt.
    challenge = random_base32( 64 )
    session[ 'challenge' ] = challenge

    q = config.db.query( "SELECT * FROM webauthn WHERE user_id = %s AND key_name = %s", ( user_id, key_name ) )
    key = q.fetchone()

    if key is None:
        return jsonify( {
            'error': True,
            'message': 'Security key not found'
        } )

    webauthn_user = webauthn.WebAuthnUser(
        key[ 'ukey' ], session[ 'username' ], session[ 'username' ], None,
        key[ 'credential_id' ], key[ 'pub_key' ], key[ 'sign_count' ], config.RP_ID
    )

    webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( webauthn_user, challenge )

    return jsonify( {
        'error': False,
        'data': webauthn_assertion_options.assertion_dict
    } )

Marco De Donno
committed
@app.route( baseurl + '/verify_assertion', methods = [ 'POST' ] )
def verify_assertion():
    """
    Verify the security key assertion posted by the client.

    The credential named in the posted response is loaded, the assertion is
    verified against the challenge stored in the session and, on success, the
    key usage data are updated, the 'securitykey' check is removed from the
    pending checks and do_login() is called.

    Replies with {'error': False} on success, or with an error payload when
    the credential is unknown for the session's user or verification fails.
    """
    challenge = session.get( 'challenge' )
    assertion_response = request.form
    credential_id = assertion_response.get( 'id' )

    # The lookup is restricted to the user who passed the previous login
    # step: a credential registered by another account must not be accepted
    # as this user's second factor.
    # NOTE(review): keys flagged inactive are still accepted here - confirm
    # whether `AND active = true` should be added as well.
    q = config.db.query(
        "SELECT * FROM webauthn WHERE credential_id = %s AND user_id = %s",
        ( credential_id, session.get( "user_id" ), )
    )
    user = q.fetchone()

    if user is None:
        return jsonify( {
            'error': True,
            'message': 'Assertion failed. Error: unknown credential'
        } )

    # Same relying party id as the one used at registration and when the
    # assertion options were produced.
    webauthn_user = webauthn.WebAuthnUser(
        user[ 'ukey' ], session[ 'username' ], session[ 'username' ], None,
        user[ 'credential_id' ], user[ 'pub_key' ], user[ 'sign_count' ], config.RP_ID
    )

    webauthn_assertion_response = webauthn.WebAuthnAssertionResponse(
        webauthn_user,
        assertion_response,
        challenge,
        config.ORIGIN,
        uv_required = False
    )

    try:
        sign_count = webauthn_assertion_response.verify()
    except Exception as e:
        return jsonify( {
            'error': True,
            'message': 'Assertion failed. Error: {}'.format( e )
        } )

    dt = datetime.now( pytz.timezone( 'Europe/Zurich' ) )

    config.db.query(
        "UPDATE webauthn SET sign_count = %s, last_usage = %s, usage_counter = usage_counter + 1 WHERE credential_id = %s",
        ( sign_count, dt, credential_id, )
    )
    # Committed like every other write in this module, so that the new
    # signature counter and usage data are actually stored.
    config.db.commit()

    session[ 'need_to_check' ].remove( "securitykey" )

    do_login()

    return jsonify( {
        'error': False
    } )
################################################################################
# QR Code generation
def renew_secret():
    """
    Generate a new secret with random_base32( 40 ), store it in the session
    under 'secret' and return it.
    """
    session[ 'secret' ] = random_base32( 40 )
    return session[ 'secret' ]
def get_secret():
    """
    Return the secret stored in the session, creating one through
    renew_secret() when the session holds none.
    """
    stored = session.get( "secret", None )
    if stored is not None:
        return stored

    return renew_secret()
@app.route( baseurl + '/set_secret' )
@login_required
def set_secret():
    """
    Store the session's secret as the TOTP secret of the logged-in user.
    """
    sql = "UPDATE users SET totp = %s WHERE username = %s"
    params = ( session[ 'secret' ], session[ 'username' ], )

    config.db.query( sql, params )
    config.db.commit()

    return jsonify( { 'error': False } )
@app.route( baseurl + '/secret' )
@login_required
def request_secret():
    """
    Return the session's secret as JSON, creating it first if needed.
    """
    # Called for its side effect: makes sure session[ 'secret' ] exists.
    get_secret()

    reply = {
        'error': False,
        'secret': session[ 'secret' ]
    }
    return jsonify( reply )
@app.route( baseurl + '/new_secret' )
@login_required
def request_renew_secret():
    """
    Replace the session's secret by a new one and return it as JSON.
    """
    renew_secret()

    reply = {
        'error': False,
        'secret': session[ 'secret' ]
    }
    return jsonify( reply )
@app.route( baseurl + '/qrcode' )
@login_required
def send_qrcode():
    """
    Return a PNG QR code of the otpauth URI built from the session's
    username and secret.
    """
    uri = 'otpauth://totp/ICNML%20' + session[ 'username' ]
    uri += '?secret=' + get_secret()

    picture = qrcode.make( uri )

    # The image is written to an in-memory buffer, rewound before sending.
    buff = StringIO()
    picture.save( buff, format = "png" )
    buff.seek( 0 )

    return send_file( buff, mimetype = 'image/png' )
# QR code page (login required): renders qrcode.html with the secret returned
# by get_secret().
# NOTE(review): the `def` line of this view is missing in this copy and the
# indentation is lost - restore from the repository.
@app.route( baseurl + '/user_qrcode' )
@login_required
return render_template(
"qrcode.html",
baseurl = baseurl,
secret = get_secret(),
js = config.cdnjs,
css = config.cdncss
)
################################################################################
# Home page
# Home page (login required): renders index.html, passing the configured
# session timeout to the template.
# NOTE(review): the `def` line of this view is missing in this copy and the
# indentation is lost; logout() redirects to url_for( 'home' ), so the view is
# presumably named `home` - confirm against the repository.
@app.route( baseurl + '/' )
@login_required
return render_template(
"index.html",
baseurl = baseurl,
js = config.cdnjs,
css = config.cdncss,
session_timeout = config.session_timeout
)
################################################################################
# Main startup
if __name__ == '__main__':
    # Direct execution: start the server on every interface ("0.0.0.0"),
    # threaded, with the debug flag defined at the top of the module.
    app.run(
        host = "0.0.0.0",
        threaded = True,
        debug = debug
    )