Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from flask import Blueprint
from flask import abort, redirect, jsonify, send_file
from flask import url_for, request, current_app, session
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from threading import Thread
import base64
import hashlib
import json
import time
import webauthn
import config
import utils
import views.pianos
login_view = Blueprint( "login", __name__, template_folder = "templates" )
@login_view.route( "/is_logged" )
def is_logged():
"""
App route to know if the user is logged in the ICNML main application.
This route is used by nginx to protect some other locations, for example the PiAnoS dedicated pages.
"""
current_app.logger.info( "Check if the user is connected" )
if session.get( "logged", False ):
return "ok"
else:
return abort( 403 )
@login_view.route( "/logout" )
def logout():
"""
Logout the user, clear the session and redirect to the login page.
"""
current_app.logger.info( "Logout and clear session" )
session_clear_and_prepare()

Marco De Donno
committed
return redirect( url_for( "login.login" ) )
def session_clear_and_prepare():
"""
Clear the session related to the user and initialize the login related variables.
"""
session.clear()
session[ "process" ] = "login"
session[ "need_to_check" ] = [ "password" ]
session[ "logged" ] = False
@login_view.route( "/login" )
def login():
"""
Route to serve the login.html page.
"""
current_app.logger.info( "Login start" )
session_clear_and_prepare()

Marco De Donno
committed
if request.query_string != "":
session[ "url_redirect" ] = request.query_string
return utils.template.my_render_template( "login.html" )
@login_view.route( "/do/login", methods = [ "POST" ] )
def do_login():
"""
Function to manadge the login workflow and check the username, password and TOTP data.
This function is called multiple times because the check is done only for one data type at the time.
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
If all the checks are OK, the user has provided all needed information, hence is logged in.
"""
#TODO: combine the security key checks in this function
need_to_check = session.get( "need_to_check", [ "password" ] )
try:
current_check = need_to_check[ 0 ]
except:
current_check = None
session[ "need_to_check" ] = need_to_check
current_app.logger.info( "Current check: {}".format( current_check ) )
############################################################################
# Username and password check
if current_check == "password":
user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
if user == None:
current_app.logger.error( "Username not found in the database" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False
} )
form_password = request.form.get( "password", None )
if form_password == None or not utils.hash.pbkdf2( form_password, user[ "password" ] ).verify():
current_app.logger.error( "Password not validated" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
} )
elif not user[ "active" ]:
current_app.logger.error( "User not active" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
"message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
} )
else:
session[ "username" ] = user[ "username" ]
session[ "user_id" ] = user[ "id" ]
session[ "password_check" ] = True
current_app.logger.info( "User '{}' checked by password".format( user[ "username" ] ) )
session[ "need_to_check" ].remove( current_check )
session[ "password" ] = utils.hash.pbkdf2( form_password, "AES256", config.PASSWORD_NB_ITERATIONS ).hash()
sql = "SELECT count( * ) FROM webauthn WHERE user_id = %s AND active = TRUE"
security_keys_count = config.db.query_fetchone( sql, ( user[ "id" ], ) )[ "count" ]
current_app.logger.info( "Number of security keys: {}".format( security_keys_count ) )
current_app.logger.info( "TOTP: {}".format( user[ "totp" ] is not None ) )
if config.envtype.upper() != "DEV":
hra = hashlib.sha512( request.headers.environ[ "REMOTE_ADDR" ] ).hexdigest()
username = session[ "username" ]
key = "{}_{}".format( username, hra )
saved_totp = config.redis_shared.get( key ) == "ok"
if security_keys_count > 0:
session[ "need_to_check" ].append( "securitykey" )
if saved_totp:
pass
elif user[ "totp" ]:
session[ "need_to_check" ].append( "totp" )
else:
current_app.logger.error( "Second factor missing" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
"next_step": "totp_reset"
} )
############################################################################
# Time-based One Time Password check
elif current_check == "totp":
user = config.db.query_fetchone( "SELECT username, totp FROM users WHERE username = %s", ( session[ "username" ], ) )
totp_db = pyotp.TOTP( user[ "totp" ] )
totp_user = request.form.get( "totp", None )
totp_save_serverside = request.form.get( "save", False )
if totp_user == None:
return jsonify( {
"error": True,
"message": "No TOTP provided"
} )
current_app.logger.info( "TOTP expected now: {}".format( totp_db.now() ) )
current_app.logger.info( "TOTP provided: {}".format( totp_user ) )
if not totp_db.verify( totp_user, valid_window = config.TOTP_VALIDWINDOW ):
current_app.logger.error( "TOTP not valid in a {} time window".format( config.TOTP_VALIDWINDOW ) )
current_app.logger.info( "TOTP check for a bigger time window" )
Loading
Loading full blame...