Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-
from flask import Blueprint
from flask import abort, redirect, jsonify, send_file
from flask import url_for, request, current_app, session
from cStringIO import StringIO
from datetime import datetime, timedelta
from email.mime.text import MIMEText
from threading import Thread
import base64
import hashlib
import json
import time
import webauthn
import config
import utils
from utils.decorator import login_required
from utils.template import my_render_template
# Blueprint named "login"; the view functions of this module register their
# routes and request hooks on it.
login_view = Blueprint( "login", __name__, template_folder = "templates" )
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@login_view.before_request
def renew_session():
    """
    Flag the session as permanent and assign its lifetime from the configuration.

    The hook is registered through `login_view.before_request`, so it is run
    ahead of the request handlers attached to this blueprint. The lifetime is
    built from `config.session_timeout`, expressed in seconds.
    """
    session.permanent = True

    # NOTE(review): Flask documents `permanent_session_lifetime` as an attribute
    # of the application object; confirm that assigning it on the session has
    # the intended effect on the automatic-logout delay.
    lifetime = timedelta( seconds = config.session_timeout )
    session.permanent_session_lifetime = lifetime
@login_view.route( "/is_logged" )
def is_logged():
    """
    Tell whether the current session belongs to a logged-in user.

    This route is used by nginx to protect some other locations, for example
    the PiAnoS dedicated pages. Because the request goes through this
    blueprint, the `renew_session` hook runs first and the session timeout
    settings are re-applied.

    Returns the string "ok" when the session flag "logged" is truthy;
    otherwise the request is aborted with an HTTP 403 error.
    """
    current_app.logger.info( "Check if the user is connected" )

    # Guard clause: anything but a truthy "logged" flag is refused.
    if not session.get( "logged", False ):
        return abort( 403 )

    return "ok"
@login_view.route( "/logout" )
def logout():
    """
    Log the user out.

    The session is wiped and re-initialized for a new login process by
    `session_clear_and_prepare`, then the client is redirected to the
    `base.home` endpoint.
    """
    current_app.logger.info( "Logout and clear session" )
    session_clear_and_prepare()

    home_url = url_for( "base.home" )
    return redirect( home_url )
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def session_clear_and_prepare():
    """
    Empty the session and store the initial state of the login workflow.

    After the call the session contains only:
      - "process": "login"
      - "need_to_check": [ "password" ] (a fresh list on every call)
      - "logged": False
      - "session_security_key": a newly generated UUID4, as a string
    """
    session.clear()

    # Constant defaults of a not-yet-authenticated session.
    defaults = (
        ( "process", "login" ),
        ( "need_to_check", [ "password" ] ),
        ( "logged", False ),
    )
    for key, value in defaults:
        session[ key ] = value

    # The random per-session key is generated last, as in the original order.
    session[ "session_security_key" ] = str( uuid4() )
@login_view.route( "/login" )
def login():
    """
    Serve the login page.

    Any existing session content is discarded and the login workflow state is
    re-initialized by `session_clear_and_prepare` before the "login.html"
    template is rendered.
    """
    current_app.logger.info( "Login start" )
    session_clear_and_prepare()

    page = my_render_template( "login.html" )
    return page
@login_view.route( "/do/login", methods = [ "POST" ] )
def do_login():
"""
Function to manage the login workflow and check the username, password and TOTP data.
This function is called multiple times because the check is done only for one data
type at the time.
If all the checks are OK, the user has provided all needed information, hence is logged in.
"""
#TODO: combine the security key checks in this function
need_to_check = session.get( "need_to_check", [ "password" ] )
try:
current_check = need_to_check[ 0 ]
except:
current_check = None
session[ "need_to_check" ] = need_to_check
current_app.logger.info( "Current check: {}".format( current_check ) )
############################################################################
# Username and password check
if current_check == "password":
user = config.db.query_fetchone( "SELECT * FROM users WHERE username = %s", ( request.form.get( "username" ), ) )
if user == None:
current_app.logger.error( "Username not found in the database" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False
} )
form_password = request.form.get( "password", None )
if form_password == None or not utils.hash.pbkdf2( form_password, user[ "password" ] ).verify():
current_app.logger.error( "Password not validated" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
} )
elif not user[ "active" ]:
current_app.logger.error( "User not active" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
"message": "Your account is not activated. Please contact an administrator (icnml@unil.ch)."
} )
else:
session[ "username" ] = user[ "username" ]
session[ "user_id" ] = user[ "id" ]
session[ "password_check" ] = True
current_app.logger.info( "User '{}' checked by password".format( user[ "username" ] ) )
session[ "need_to_check" ].remove( current_check )
session[ "password" ] = utils.hash.pbkdf2( form_password, "AES256", config.PASSWORD_NB_ITERATIONS ).hash()
sql = "SELECT count( * ) FROM webauthn WHERE user_id = %s AND active = TRUE"
security_keys_count = config.db.query_fetchone( sql, ( user[ "id" ], ) )[ "count" ]
current_app.logger.info( "Number of security keys: {}".format( security_keys_count ) )
current_app.logger.info( "TOTP: {}".format( user[ "totp" ] is not None ) )
if config.envtype.upper() != "DEV":
if security_keys_count > 0:
session[ "need_to_check" ].append( "securitykey" )
elif user[ "totp" ]:
session[ "need_to_check" ].append( "totp" )
else:
current_app.logger.error( "Second factor missing" )
session_clear_and_prepare()
return jsonify( {
"error": False,
"logged": False,
"message": "Second factor missing. Contact the ICNML administrator (icnml@unil.ch)."
} )
############################################################################
# Time-based One Time Password check
elif current_check == "totp":
user = config.db.query_fetchone( "SELECT username, totp FROM users WHERE username = %s", ( session[ "username" ], ) )
totp_db = pyotp.TOTP( user[ "totp" ] )
totp_user = request.form.get( "totp", None )
if totp_user == None:
return jsonify( {
"error": True,
"message": "No TOTP provided"
} )
current_app.logger.info( "TOTP expected now: {}".format( totp_db.now() ) )
current_app.logger.info( "TOTP provided: {}".format( totp_user ) )
if not totp_db.verify( totp_user, valid_window = config.TOTP_VALIDWINDOW ):
current_app.logger.error( "TOTP not valid in a {} time window".format( config.TOTP_VALIDWINDOW ) )
Loading
Loading full blame...