Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-

Marco De Donno
committed
import os
import zipfile
from cStringIO import StringIO
from flask import Blueprint
from flask import jsonify, request, send_file, current_app, session
from PIL import Image
import utils

Marco De Donno
committed
from utils.decorator import admin_required, login_required
from utils.template import my_render_template
from views.images import str2img, image_tatoo, get_submission_uuid_for_file, image_serve
from const import *
from functions import do_encrypt_dek, do_decrypt_dek
from NIST.fingerprint.labels import FINGER_POSITION_CODE, PALM_POSITION_CODE
segments_position_code = dict( FINGER_POSITION_CODE, **PALM_POSITION_CODE )
afis_view = Blueprint( "afis", __name__, template_folder = "templates" )
@afis_view.route( "/admin/afis/list" )
@admin_required
def admin_list_folders():
return list_folders_inner()
@afis_view.route( "/afis/list" )
def list_folders():
return list_folders_inner()
def list_folders_inner():
sql = """
SELECT
cnm_folder.uuid,
users.username
FROM cnm_folder
INNER JOIN submissions ON submissions.uuid = cnm_folder.donor
INNER JOIN users ON submissions.donor_id = users.id
"""
folders = config.db.query_fetchall( sql )
sql = """
SELECT
submissions.uuid,
users.username
FROM submissions
LEFT JOIN users ON submissions.donor_id = users.id
LEFT JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE cnm_folder.uuid IS NULL
ORDER BY users.id ASC
"""
donors = config.db.query_fetchall( sql )
return my_render_template(
"afis/shared/list.html",
folders = folders,
donors = donors
@afis_view.route( "/admin/afis/new_folder", methods = [ "POST" ] )
def admin_add_new_folder():
try:
donor_uuid = request.form.get( "donor_uuid", False )
sql = "SELECT count( * ) AS nb FROM cnm_folder WHERE donor = %s"
nb = config.db.query_fetchone( sql, ( donor_uuid, ) )[ "nb" ]
if nb == 0:
sql = sql_insert_generate( "cnm_folder", [ "donor", "uuid" ], "id" )
config.db.query_fetchone( sql, ( donor_uuid, str( uuid4() ) ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
@afis_view.route( "/admin/afis/<folder_id>/delete" )
@admin_required
def admin_delete_folder( folder_id ):
try:
sql = "DELETE FROM cnm_annotation WHERE folder = %s"
config.db.query( sql, ( folder_id, ) )
sql = "DELETE FROM cnm_segments WHERE folder_uuid = %s"
config.db.query( sql, ( folder_id, ) )
sql = "DELETE FROM cnm_folder WHERE uuid = %s"
config.db.query( sql, ( folder_id, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
@afis_view.route( "/admin/afis/<folder_id>" )
@admin_required
def admin_folder_show( folder_id ):
return folder_show_inner( folder_id )
@afis_view.route( "/afis/<folder_id>" )
def folder_show( folder_id ):
return folder_show_inner( folder_id )
def folder_show_inner( folder_id ):
admin = session.get( "account_type_name", None ) == "Administrator"
sql = """
SELECT
segments_locations.fpc
FROM segments_locations
INNER JOIN files ON segments_locations.tenprint_id = files.uuid
INNER JOIN submissions ON files.folder = submissions.id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE
cnm_folder.uuid = %s AND
(
fpc <= 10 OR
fpc = 22 OR
fpc = 24 OR
fpc = 25 OR
fpc = 27
)
GROUP BY fpc
ORDER BY fpc ASC
"""
segment_list = config.db.query_fetchall( sql, ( folder_id, ) )
sql = """
SELECT *
FROM cnm_segments
WHERE folder_uuid = %s
ORDER BY fpc ASC
"""
segment_list_in_folder = config.db.query_fetchall( sql, ( folder_id, ) )
sql = """
SELECT
files_segments.tenprint,
files_segments.pc
FROM cnm_folder
INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
INNER JOIN files ON submissions.id = files.folder
INNER JOIN files_segments ON files.uuid = files_segments.tenprint
WHERE cnm_folder.uuid = %s
"""
tpid = config.db.query_fetchall( sql, ( folder_id, ) )
sql = """
SELECT username
FROM users
INNER JOIN submissions ON users.id = submissions.donor_id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE cnm_folder.uuid = %s
"""
username = config.db.query_fetchone( sql, ( folder_id, ) )[ "username" ]
# Users assigned to this folder
sql = "SELECT id, username FROM users WHERE type = 5"
users = config.db.query_fetchall( sql )
sql = """
SELECT
users.id,
users.username
FROM cnm_folder_users
INNER JOIN users ON cnm_folder_users.user_id = users.id
WHERE
folder_id = %s AND

Marco De Donno
committed
img_type = 1
users_assigned_refs = config.db.query_fetchall( sql, ( folder_id, ) )
sql = """
SELECT
users.id,
users.username
FROM cnm_folder_users
INNER JOIN users ON cnm_folder_users.user_id = users.id
WHERE
folder_id = %s AND

Marco De Donno
committed
img_type = 2
"""
users_assigned_marks = config.db.query_fetchall( sql, ( folder_id, ) )
return my_render_template(
"afis/shared/folder.html",
segment_list = segment_list,
segment_list_in_folder = segment_list_in_folder,
users_assigned_refs = users_assigned_refs,
users_assigned_marks = users_assigned_marks,
folder_id = folder_id,
segments_position_code = segments_position_code
@afis_view.route( "/admin/afis/<folder_id>/add/segment", methods = [ "POST" ] )
@admin_required
def admin_add_segment_to_cnmfolder( folder_id ):
try:
fpc = request.form.get( "fpc" )
sql = "SELECT count( * ) AS nb FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s"
nb = config.db.query_fetchone( sql, ( folder_id, fpc, ) )[ "nb" ]
if nb == 0:
sql = sql_insert_generate( "cnm_segments", [ "uuid", "folder_uuid", "fpc" ], "uuid" )
config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>" )
@admin_required
def admin_view_segment( folder_id, fpc ):
return view_segment_inner( folder_id, fpc, True )
@afis_view.route( "/afis/<folder_id>/segment/<fpc>" )
def view_segment( folder_id, fpc ):
return view_segment_inner( folder_id, fpc, False )

Marco De Donno
committed
def view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin ):
# Get the list of references images depending upon the FPC
multi_img_fpc = {
1: [ 1, 11 ],
2: [ 2, 13 ],
3: [ 3, 13 ],
4: [ 4, 13 ],
5: [ 5, 13 ],
6: [ 6, 12 ],
7: [ 7, 14 ],
8: [ 8, 14 ],
9: [ 9, 14 ],
10: [ 10, 14 ]
}
fpc = int( fpc )
fpcs = multi_img_fpc.get( fpc, [ fpc ] )
tmp_sql = [ "files_segments.pc = %s" ] * len( fpcs )
tmp_sql = " OR ".join( tmp_sql )
files_segments.tenprint,
files_segments.pc,
files_segments.uuid
FROM cnm_folder
INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
INNER JOIN files ON submissions.id = files.folder
INNER JOIN files_segments ON files.uuid = files_segments.tenprint
WHERE
cnm_folder.uuid = %s AND
( {} )
ORDER BY pc ASC
""".format( tmp_sql )
data = [ folder_id, ]
data.extend( fpcs )
tenprints_list = config.db.query_fetchall( sql, data )
# Get the list of marks related to this finger
sql = """
FROM files
INNER JOIN mark_info ON files.uuid = mark_info.uuid
INNER JOIN submissions ON files.folder = submissions.id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE
cnm_folder.uuid = %s AND
( files.type = 3 OR files.type = 4 )
sql_where = []
for f in fpcs:
for q in fpc2pfsp[ f ]:
sql_where.append( "mark_info.pfsp LIKE '%%{}%%'".format( q ) )
if len( sql_where ) > 0:
sql += " AND ( {} )".format( " OR ".join( sql_where ) )
sql += " ORDER BY files.id ASC"
mark_list = config.db.query_fetchall( sql, ( folder_id, ) )

Marco De Donno
committed
# Filter out the reference and mark list for afis users
#TODO: Refactoring
if not isadmin:

Marco De Donno
committed
sql = "SELECT count( * ) FROM cnm_folder_users WHERE user_id = %s AND folder_id = %s AND img_type = 1"
r = config.db.query_fetchone( sql, ( session.get( "user_id" ), folder_id, ) )[ "count" ]
if r == 0:
tenprints_list = []

Marco De Donno
committed
sql = "SELECT count( * ) FROM cnm_folder_users WHERE user_id = %s AND folder_id = %s AND img_type = 2"
r = config.db.query_fetchone( sql, ( session.get( "user_id" ), folder_id, ) )[ "count" ]
if r == 0:
mark_list = []

Marco De Donno
committed
# Get the list of annotations files
sql = """
SELECT id, uuid
FROM cnm_annotation
WHERE folder = %s AND fpc = %s
"""
annotations = config.db.query_fetchall( sql, ( folder_id, fpc, ) )
return {
"mark_list": mark_list,
"tenprints_list": tenprints_list,
"annotations": annotations
}

Marco De Donno
committed
def view_segment_inner( folder_id, fpc, isadmin ):
d = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )
mark_list = d[ "mark_list" ]
tenprints_list = d[ "tenprints_list" ]
annotations = d[ "annotations" ]
# Get the username of the donor
sql = """
SELECT username
FROM users
INNER JOIN submissions ON users.id = submissions.donor_id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE cnm_folder.uuid = %s
"""
username = config.db.query_fetchone( sql, ( folder_id, ) )[ "username" ]
"afis/shared/segment.html",
tenprints_list = tenprints_list,

Marco De Donno
committed
len_tenprints_list = len( tenprints_list ),
mark_list = mark_list,

Marco De Donno
committed
len_mark_list = len( mark_list ),
username = username,
annotations = annotations,
segments_position_code = segments_position_code
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/add/illustration", methods = [ "POST" ] )
@admin_required
def admin_add_illustration( folder_id, fpc ):
try:
sql = """
SELECT donor
FROM cnm_folder
WHERE uuid = %s
"""
donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]
uploaded_file = request.files[ "file" ]
fp = StringIO()
uploaded_file.save( fp )
fp.seek( 0 )
file_data = fp.getvalue()
file_data = base64.b64encode( file_data )
file_data = do_encrypt_dek( file_data, donor_uuid )
sql = sql_insert_generate( "cnm_annotation", [ "uuid", "folder", "fpc", "annotation_data" ], "id" )
annotation_id = config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, file_data, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/remove/illustration", methods = [ "POST" ] )
@admin_required
def admin_remove_illustration( folder_id, fpc ):
try:
illustration_uuid = request.form.get( "uuid" )
sql = "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s AND uuid = %s"
config.db.query( sql, ( folder_id, fpc, illustration_uuid, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )
@afis_view.route( "/admin/afis/<folder_id>/remove/segment", methods = [ "POST" ] )
@admin_required
def admin_remove_segment( folder_id ):
try:
fpc = request.form.get( "fpc" )
sql = "DELETE FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s"
config.db.query( sql, ( folder_id, fpc, ) )
sql = "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s"
config.db.query( sql, ( folder_id, fpc, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )

Marco De Donno
committed
@afis_view.route( "/<u_type>/afis/<folder_id>/annotation/<a_uuid>" )
def get_annotation_image( folder_id, a_uuid, u_type = "afis" ):
img = get_annotation_image_inner( folder_id, a_uuid )
buff = utils.images.pil2buffer( img, "PNG" )
return send_file( buff, mimetype = "image/png" )
def get_annotation_image_inner( folder_id, a_uuid ):
sql = "SELECT donor FROM cnm_folder WHERE uuid = %s"
donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]
sql = """
SELECT annotation_data
FROM cnm_annotation
WHERE uuid = %s
"""
data = config.db.query_fetchone( sql, ( a_uuid, ) )[ "annotation_data" ]
data = do_decrypt_dek( data, donor_uuid )
img = base64.b64decode( data )
buff = StringIO()
buff.write( img )
buff.seek( 0 )

Marco De Donno
committed
return Image.open( buff )
@afis_view.route( "/admin/afis/<folder_id>/user/update", methods = [ "POST" ] )
@admin_required
def admin_update_users_in_afis_folder( folder_id ):
try:
users = request.form.get( "users" )
users = json.loads( users )

Marco De Donno
committed
if request.form.get( "type" ) == "ref":
img_type = 1

Marco De Donno
committed
img_type = 2

Marco De Donno
committed
sql = "DELETE FROM cnm_folder_users WHERE folder_id = %s AND img_type = %s"
config.db.query( sql, ( folder_id, img_type, ) )

Marco De Donno
committed
sql = sql_insert_generate( "cnm_folder_users", [ "user_id", "folder_id", "img_type" ], "id" )
for user in users:

Marco De Donno
committed
config.db.query_fetchone( sql, ( user, folder_id, img_type, ) )
config.db.commit()
return jsonify( {
"error": False
} )
except:
return jsonify( {
"error": True
} )

Marco De Donno
committed
return jsonify( view_segment_mark_ref_anno_lists( folder_id, fpc, False ) )
@afis_view.route( "/afis/<folder_id>/download/<fpc>" )
@login_required
def download_folder_afis( folder_id, fpc ):
return download_folder_inner( folder_id, fpc, False )
@afis_view.route( "/admin/afis/<folder_id>/download/<fpc>" )
@login_required
def download_folder_admin( folder_id, fpc ):
return download_folder_inner( folder_id, fpc, True )
def download_folder_inner( folder_id, fpc, isadmin ):
lst = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )

Marco De Donno
committed
zipbuffer = StringIO()
with zipfile.ZipFile( zipbuffer, "w", zipfile.ZIP_DEFLATED ) as fp:
for fid in lst[ "mark_list" ]:
file_id = fid[ "uuid" ]
submission_id = get_submission_uuid_for_file( file_id )
img, _ = image_serve( "files", file_id, submission_id )
img = image_tatoo( img, file_id )

Marco De Donno
committed
buff = utils.images.pil2buffer( img, "TIFF" )
fp.writestr( "mark_{}.tiff".format( file_id ), buff.getvalue() )
for fid in lst[ "tenprints_list" ]:
file_id = fid[ "tenprint" ]
pc = fid[ "pc" ]
submission_id = get_submission_uuid_for_file( file_id )
img, _ = image_serve( "files_segments", ( file_id, pc ), submission_id )
img = image_tatoo( img, file_id )
buff = utils.images.pil2buffer( img, "TIFF" )
fp.writestr( "reference_{}.tiff".format( file_id ), buff.getvalue() )

Marco De Donno
committed
for fid in lst[ "annotations" ]:
file_id = fid[ "uuid" ]
img = get_annotation_image_inner( folder_id, file_id )
img = image_tatoo( img, file_id )

Marco De Donno
committed
buff = utils.images.pil2buffer( img, "PNG" )
fp.writestr( "annotation_{}.png".format( file_id ), buff.getvalue() )
zipbuffer.seek( 0 )
return send_file( zipbuffer, attachment_filename = "{}.zip".format( folder_id ), as_attachment = True )