Newer
Older
#!/usr/bin/python
# -*- coding: UTF-8 -*-

Marco De Donno
committed
import os
import zipfile
from cStringIO import StringIO
from flask import Blueprint
from flask import jsonify, request, send_file, current_app, session
from PIL import Image
import utils

Marco De Donno
committed
from utils.decorator import admin_required, login_required
from utils.template import my_render_template
from views.images import str2img, image_tatoo, get_submission_uuid_for_file, image_serve
from const import *
from functions import do_encrypt_dek, do_decrypt_dek
from NIST.fingerprint.labels import FINGER_POSITION_CODE, PALM_POSITION_CODE
# Lookup table for every segment position code: finger codes first, then palm codes merged in
# (a palm entry wins if the same key appears in both tables).
segments_position_code = dict( FINGER_POSITION_CODE )
segments_position_code.update( PALM_POSITION_CODE )

# Blueprint holding all the AFIS routes declared in this module.
afis_view = Blueprint( "afis", __name__, template_folder = "templates" )
@afis_view.route( "/admin/afis/list" )
@admin_required
def admin_list_folders():
    """
        Administrator entry point for the AFIS folder list.

        Delegates to list_folders_inner with the admin flag set.
    """
    show_as_admin = True
    return list_folders_inner( show_as_admin )
@afis_view.route( "/afis/list" )
def list_folders():
    """
        Non-administrator entry point for the AFIS folder list.

        Delegates to list_folders_inner with the admin flag cleared.
    """
    # NOTE(review): no login guard is visible on this route although the non-admin
    # query filters on the session "user_id" - confirm whether one is applied elsewhere.
    show_as_admin = False
    return list_folders_inner( show_as_admin )
def list_folders_inner( isadmin ):
    """
        Render the list of AFIS folders and of the donors that have no folder yet.

        :param isadmin: True to list every folder; False to list only the folders
                        assigned to the user stored in the session ( "user_id" ).
        :return: The rendered "afis/shared/list.html" template.
    """
    if isadmin:
        sql = """
            SELECT
                cnm_folder.uuid,
                users.username
            FROM cnm_folder
            INNER JOIN submissions ON submissions.uuid = cnm_folder.donor
            INNER JOIN users ON submissions.donor_id = users.id
            ORDER BY users.id DESC
        """
        # The admin query has no placeholder, so no parameter tuple is passed.
        folders = config.db.query_fetchall( sql )

    else:
        sql = """
            SELECT DISTINCT ON ( users.id )
                cnm_folder.uuid,
                users.username
            FROM cnm_folder
            INNER JOIN cnm_folder_users ON cnm_folder.uuid = cnm_folder_users.folder_id
            INNER JOIN submissions ON submissions.uuid = cnm_folder.donor
            INNER JOIN users ON submissions.donor_id = users.id
            WHERE cnm_folder_users.user_id = %s
            ORDER BY users.id DESC
        """
        folders = config.db.query_fetchall( sql, ( session.get( "user_id" ), ) )

    # Submissions that are not yet linked to any cnm_folder row.
    sql = """
        SELECT
            submissions.uuid,
            users.username
        FROM submissions
        LEFT JOIN users ON submissions.donor_id = users.id
        LEFT JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
        WHERE cnm_folder.uuid IS NULL
        ORDER BY users.id ASC
    """
    donors = config.db.query_fetchall( sql )

    return my_render_template(
        "afis/shared/list.html",
        folders = folders,
        donors = donors
    )
@afis_view.route( "/admin/afis/new_folder", methods = [ "POST" ] )
@admin_required
def admin_add_new_folder():
    """
        Create the AFIS folder of a donor, unless that donor already has one.

        Reads "donor_uuid" from the POSTed form.

        :return: JSON object { "error": bool }; "error" is True when the donor
                 is missing from the form or when the database operation fails.
    """
    try:
        donor_uuid = request.form.get( "donor_uuid", False )
        if not donor_uuid:
            # Do not send a placeholder value to the database.
            return jsonify( {
                "error": True
            } )

        sql = "SELECT count( * ) AS nb FROM cnm_folder WHERE donor = %s"
        nb = config.db.query_fetchone( sql, ( donor_uuid, ) )[ "nb" ]

        # Only one folder per donor: an existing folder is left untouched.
        if nb == 0:
            sql = sql_insert_generate( "cnm_folder", [ "donor", "uuid" ], "id" )
            config.db.query_fetchone( sql, ( donor_uuid, str( uuid4() ) ) )
            config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        # Keep the JSON error contract, but leave a trace of the failure.
        current_app.logger.exception( "Can not create the AFIS folder" )
        return jsonify( {
            "error": True
        } )
@afis_view.route( "/admin/afis/<folder_id>/delete" )
@admin_required
def admin_delete_folder( folder_id ):
    """
        Delete an AFIS folder together with the rows that reference it.

        :param folder_id: UUID of the cnm_folder row to delete.
        :return: JSON object { "error": bool }.
    """
    # NOTE(review): this state-changing route accepts GET (no "methods" given);
    # left unchanged because callers may depend on it.
    try:
        # Dependent rows first, the folder itself last.
        dependent_deletes = [
            "DELETE FROM cnm_annotation WHERE folder = %s",
            "DELETE FROM cnm_segments WHERE folder_uuid = %s",
            "DELETE FROM cnm_folder_users WHERE folder_id = %s",
            "DELETE FROM cnm_folder WHERE uuid = %s",
        ]
        for sql in dependent_deletes:
            config.db.query( sql, ( folder_id, ) )

        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not delete the AFIS folder" )
        return jsonify( {
            "error": True
        } )
@afis_view.route( "/admin/afis/<folder_id>" )
@admin_required
def admin_folder_show( folder_id ):
    """
        Administrator entry point for the page of one AFIS folder.

        :param folder_id: UUID of the folder, taken from the URL.
    """
    page = folder_show_inner( folder_id )
    return page
@afis_view.route( "/afis/<folder_id>" )
def folder_show( folder_id ):
    """
        Non-administrator entry point for the page of one AFIS folder.

        :param folder_id: UUID of the folder, taken from the URL.
    """
    page = folder_show_inner( folder_id )
    return page
# Render the page of one AFIS folder ( "afis/shared/folder.html" ).
#
# Every query below is keyed on folder_id ( compared against cnm_folder.uuid, or against
# the folder_uuid / folder_id columns of the cnm_segments / cnm_folder_users tables ).
#
# NOTE(review): `admin`, `tpid`, `username` and `users` are computed but are not passed to
# the template in the lines visible here - check whether the corresponding keyword
# arguments were lost from this copy of the file.
# NOTE(review): the SQL literal of `users_assigned_refs` has no visible closing triple
# quote and the my_render_template( call has no visible closing parenthesis; the
# "Marco De Donno" / "committed" lines look like web blame-view residue, not code.
def folder_show_inner( folder_id ):
admin = session.get( "account_type_name", None ) == "Administrator"
# Distinct position codes ( fpc <= 10, or 22 / 24 / 25 / 27 ) located on the files of the folder's donor.
sql = """
SELECT
segments_locations.fpc
FROM segments_locations
INNER JOIN files ON segments_locations.tenprint_id = files.uuid
INNER JOIN submissions ON files.folder = submissions.id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE
cnm_folder.uuid = %s AND
(
fpc <= 10 OR
fpc = 22 OR
fpc = 24 OR
fpc = 25 OR
fpc = 27
)
GROUP BY fpc
ORDER BY fpc ASC
"""
segment_list = config.db.query_fetchall( sql, ( folder_id, ) )
# Segments already registered in this folder.
sql = """
SELECT *
FROM cnm_segments
WHERE folder_uuid = %s
ORDER BY fpc ASC
"""
segment_list_in_folder = config.db.query_fetchall( sql, ( folder_id, ) )
# ( tenprint, pc ) pairs of the segmented files belonging to the folder's donor.
sql = """
SELECT
files_segments.tenprint,
files_segments.pc
FROM cnm_folder
INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
INNER JOIN files ON submissions.id = files.folder
INNER JOIN files_segments ON files.uuid = files_segments.tenprint
WHERE cnm_folder.uuid = %s
"""
tpid = config.db.query_fetchall( sql, ( folder_id, ) )
# Username of the donor owning the folder.
sql = """
SELECT username
FROM users
INNER JOIN submissions ON users.id = submissions.donor_id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE cnm_folder.uuid = %s
"""
username = config.db.query_fetchone( sql, ( folder_id, ) )[ "username" ]
# Users assigned to this folder
sql = "SELECT id, username FROM users WHERE type = 5"
users = config.db.query_fetchall( sql )
sql = """
SELECT
users.id,
users.username
FROM cnm_folder_users
INNER JOIN users ON cnm_folder_users.user_id = users.id
WHERE
folder_id = %s AND

Marco De Donno
committed
img_type = 1
users_assigned_refs = config.db.query_fetchall( sql, ( folder_id, ) )
sql = """
SELECT
users.id,
users.username
FROM cnm_folder_users
INNER JOIN users ON cnm_folder_users.user_id = users.id
WHERE
folder_id = %s AND

Marco De Donno
committed
img_type = 2
"""
users_assigned_marks = config.db.query_fetchall( sql, ( folder_id, ) )
return my_render_template(
"afis/shared/folder.html",
segment_list = segment_list,
segment_list_in_folder = segment_list_in_folder,
users_assigned_refs = users_assigned_refs,
users_assigned_marks = users_assigned_marks,
folder_id = folder_id,
segments_position_code = segments_position_code
@afis_view.route( "/admin/afis/<folder_id>/add/segment", methods = [ "POST" ] )
@admin_required
def admin_add_segment_to_cnmfolder( folder_id ):
    """
        Register a segment ( position code ) in an AFIS folder, once per code.

        Reads "fpc" from the POSTed form.

        :param folder_id: UUID of the target folder.
        :return: JSON object { "error": bool }; "error" is True when "fpc" is
                 missing from the form or when the database operation fails.
    """
    try:
        fpc = request.form.get( "fpc" )
        if fpc is None:
            # Without a position code there is nothing meaningful to insert.
            return jsonify( {
                "error": True
            } )

        sql = "SELECT count( * ) AS nb FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s"
        nb = config.db.query_fetchone( sql, ( folder_id, fpc, ) )[ "nb" ]

        # A segment already present in the folder is left untouched.
        if nb == 0:
            sql = sql_insert_generate( "cnm_segments", [ "uuid", "folder_uuid", "fpc" ], "uuid" )
            config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, ) )
            config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not add the segment to the AFIS folder" )
        return jsonify( {
            "error": True
        } )
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>" )
@admin_required
def admin_view_segment( folder_id, fpc ):
    """
        Administrator entry point for the page of one segment of a folder.

        :param folder_id: UUID of the folder, taken from the URL.
        :param fpc: Position code of the segment, taken from the URL.
    """
    as_admin = True
    return view_segment_inner( folder_id, fpc, as_admin )
@afis_view.route( "/afis/<folder_id>/segment/<fpc>" )
def view_segment( folder_id, fpc ):
    """
        Non-administrator entry point for the page of one segment of a folder.

        :param folder_id: UUID of the folder, taken from the URL.
        :param fpc: Position code of the segment, taken from the URL.
    """
    as_admin = False
    return view_segment_inner( folder_id, fpc, as_admin )

Marco De Donno
committed
def view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin ):
    """
        Collect the marks, the reference segments and the annotations of one
        segment of a folder.

        :return: dict with the keys "mark_list", "tenprints_list" and "annotations".
    """
    # The three helpers are called in the same order as the keys are listed.
    return {
        "mark_list": view_segment_mark_ref_anno_lists_marks( folder_id, fpc, isadmin ),
        "tenprints_list": view_segment_mark_ref_anno_lists_ref( folder_id, fpc, isadmin ),
        "annotations": view_segment_mark_ref_anno_lists_annotations( folder_id, fpc, isadmin )
    }
def view_segment_anno_list( folder_id, isadmin ):
    """
        Collect the annotations of a whole folder ( every position code ).

        :return: dict shaped like the one of view_segment_mark_ref_anno_lists,
                 with empty "mark_list" and "tenprints_list".
    """
    lists = {
        "mark_list": [],
        "tenprints_list": []
    }
    lists[ "annotations" ] = view_segment_mark_ref_anno_lists_annotations( folder_id, "all", isadmin )
    return lists
def get_multi_img_fpc( fpc ):
    """
        Return the position codes under which images of a finger may be stored.

        Codes 1 to 10 are paired with a second code ( 11 for 1, 12 for 6, 13 for
        2 to 5 and 14 for 7 to 10 ); any other code is returned alone.

        :param fpc: Position code, as an integer or anything int() accepts.
        :return: A new list of integer position codes, starting with fpc itself.
    """
    code = int( fpc )

    if code == 1:
        return [ code, 11 ]

    if code == 6:
        return [ code, 12 ]

    if 2 <= code <= 5:
        return [ code, 13 ]

    if 7 <= code <= 10:
        return [ code, 14 ]

    return [ code ]
def view_segment_mark_ref_anno_lists_ref( folder_id, fpc, isadmin ):
    """
        List the reference ( tenprint ) segments of a folder for one position code.

        :param folder_id: UUID of the folder.
        :param fpc: Position code; expanded with get_multi_img_fpc.
        :param isadmin: When False, the list is empty unless the session user is
                        assigned to the folder with img_type = 1.
        :return: List of rows with the columns tenprint, pc and uuid.
    """
    if not isadmin:
        # Non-admin users only see references of folders assigned to them.
        sql = "SELECT count( * ) FROM cnm_folder_users WHERE user_id = %s AND folder_id = %s AND img_type = 1"
        r = config.db.query_fetchone( sql, ( session.get( "user_id" ), folder_id, ) )[ "count" ]
        if r == 0:
            return []

    fpcs = get_multi_img_fpc( fpc )

    # One "%s" placeholder per position code; the values are bound by the driver.
    pc_filter = " OR ".join( [ "files_segments.pc = %s" ] * len( fpcs ) )

    sql = """
        SELECT
            files_segments.tenprint,
            files_segments.pc,
            files_segments.uuid
        FROM cnm_folder
        INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
        INNER JOIN files ON submissions.id = files.folder
        INNER JOIN files_segments ON files.uuid = files_segments.tenprint
        WHERE
            cnm_folder.uuid = %s AND
            ( {} )
        ORDER BY pc ASC
    """.format( pc_filter )

    data = [ folder_id, ]
    data.extend( fpcs )

    return config.db.query_fetchall( sql, data )
# List the marks of a folder for one position code.
#
# The query joins files, mark_info, submissions and cnm_folder, keeps the files of
# type 3 or 4 of the given folder, and adds one `mark_info.pfsp LIKE` condition per
# value found in fpc2pfsp for each position code returned by get_multi_img_fpc.
# For a non-admin caller the list is emptied unless the session user is assigned to
# the folder with img_type = 2.
#
# NOTE(review): the start of the SQL statement ( `sql = """`, the SELECT keyword and
# its column list ) and the closing triple quote are not visible in this copy of the
# file; the "Marco De Donno" / "committed" lines look like web blame-view residue.
# NOTE(review): download_folder_inner reads the "uuid" key of each row, so the lost
# column list presumably includes it - confirm against the real file.
def view_segment_mark_ref_anno_lists_marks( folder_id, fpc, isadmin ):
fpcs = get_multi_img_fpc( fpc )
FROM files
INNER JOIN mark_info ON files.uuid = mark_info.uuid
INNER JOIN submissions ON files.folder = submissions.id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE
cnm_folder.uuid = %s AND
( files.type = 3 OR files.type = 4 )
sql_where = []
for f in fpcs:
for q in fpc2pfsp[ f ]:
sql_where.append( "mark_info.pfsp LIKE '%%{}%%'".format( q ) )
if len( sql_where ) > 0:
sql += " AND ( {} )".format( " OR ".join( sql_where ) )
sql += " ORDER BY files.id ASC"
mark_list = config.db.query_fetchall( sql, ( folder_id, ) )

Marco De Donno
committed
if not isadmin:

Marco De Donno
committed
sql = "SELECT count( * ) FROM cnm_folder_users WHERE user_id = %s AND folder_id = %s AND img_type = 2"
r = config.db.query_fetchone( sql, ( session.get( "user_id" ), folder_id, ) )[ "count" ]
if r == 0:
mark_list = []
return mark_list
def view_segment_mark_ref_anno_lists_annotations( folder_id, fpc, isadmin ):
    """
        List the annotations stored for a folder.

        :param folder_id: UUID of the folder.
        :param fpc: Position code to filter on, or "all" for no filter.
        :param isadmin: Accepted for symmetry with the sibling helpers; not used here.
        :return: List of rows with the columns id, uuid and fpc.
    """
    sql = """
        SELECT
            id,
            uuid,
            fpc
        FROM cnm_annotation
        WHERE folder = %s
    """
    params = [ folder_id ]

    # "all" is the sentinel meaning "every position code of the folder".
    if fpc != "all":
        sql += " AND fpc = %s"
        params.append( fpc )

    return config.db.query_fetchall( sql, tuple( params ) )

Marco De Donno
committed
# Build the page of one segment of a folder ( template "afis/shared/segment.html" ).
#
# The mark, reference and annotation lists come from view_segment_mark_ref_anno_lists;
# the donor username is read from the users table through the folder's submission.
#
# NOTE(review): the line opening the template call ( presumably
# `return my_render_template(` ) and its closing parenthesis are not visible in this
# copy of the file, and further keyword arguments may be missing as well; the
# "Marco De Donno" / "committed" lines look like web blame-view residue, not code.
def view_segment_inner( folder_id, fpc, isadmin ):
d = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )
mark_list = d[ "mark_list" ]
tenprints_list = d[ "tenprints_list" ]
annotations = d[ "annotations" ]
# Get the username of the donor
sql = """
SELECT username
FROM users
INNER JOIN submissions ON users.id = submissions.donor_id
INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
WHERE cnm_folder.uuid = %s
"""
username = config.db.query_fetchone( sql, ( folder_id, ) )[ "username" ]
"afis/shared/segment.html",
tenprints_list = tenprints_list,

Marco De Donno
committed
len_tenprints_list = len( tenprints_list ),
mark_list = mark_list,

Marco De Donno
committed
len_mark_list = len( mark_list ),
username = username,
annotations = annotations,
segments_position_code = segments_position_code
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/add/illustration", methods = [ "POST" ] )
@admin_required
def admin_add_illustration( folder_id, fpc ):
    """
        Store an uploaded illustration as an annotation of a folder segment.

        The uploaded file ( form field "file" ) is base64-encoded, encrypted with
        do_encrypt_dek using the donor UUID of the folder, and inserted in
        cnm_annotation.

        :param folder_id: UUID of the folder.
        :param fpc: Position code of the segment.
        :return: JSON object { "error": bool }.
    """
    try:
        sql = """
            SELECT donor
            FROM cnm_folder
            WHERE uuid = %s
        """
        donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]

        # Read the whole upload in memory; getvalue() does not depend on the
        # current position of the buffer.
        fp = StringIO()
        request.files[ "file" ].save( fp )
        file_data = base64.b64encode( fp.getvalue() )
        file_data = do_encrypt_dek( file_data, donor_uuid )

        sql = sql_insert_generate( "cnm_annotation", [ "uuid", "folder", "fpc", "annotation_data" ], "id" )
        config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, file_data, ) )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not add the illustration" )
        return jsonify( {
            "error": True
        } )
@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/remove/illustration", methods = [ "POST" ] )
@admin_required
def admin_remove_illustration( folder_id, fpc ):
    """
        Delete one illustration ( annotation ) of a folder segment.

        Reads the annotation "uuid" from the POSTed form; the row is only
        deleted if it belongs to the given folder and position code.

        :param folder_id: UUID of the folder.
        :param fpc: Position code of the segment.
        :return: JSON object { "error": bool }.
    """
    try:
        illustration_uuid = request.form.get( "uuid" )

        sql = "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s AND uuid = %s"
        config.db.query( sql, ( folder_id, fpc, illustration_uuid, ) )
        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not remove the illustration" )
        return jsonify( {
            "error": True
        } )
@afis_view.route( "/admin/afis/<folder_id>/remove/segment", methods = [ "POST" ] )
@admin_required
def admin_remove_segment( folder_id ):
    """
        Remove a segment from a folder, together with its annotations.

        Reads the position code "fpc" from the POSTed form.

        :param folder_id: UUID of the folder.
        :return: JSON object { "error": bool }.
    """
    try:
        fpc = request.form.get( "fpc" )

        # Both deletes are committed together.
        for sql in [
            "DELETE FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s",
            "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s",
        ]:
            config.db.query( sql, ( folder_id, fpc, ) )

        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not remove the segment" )
        return jsonify( {
            "error": True
        } )

Marco De Donno
committed
@afis_view.route( "/<u_type>/afis/<folder_id>/annotation/<a_uuid>" )
@login_required
def get_annotation_image( folder_id, a_uuid, u_type = "afis" ):
    """
        Serve one annotation image as PNG.

        :param folder_id: UUID of the folder the annotation belongs to.
        :param a_uuid: UUID of the annotation.
        :param u_type: First URL segment; accepted by the route but not used here.
        :return: The image, sent with the "image/png" mimetype.
    """
    # The image is decrypted donor data: require a logged-in session, as the
    # download routes serving the same images already do.
    img = get_annotation_image_inner( folder_id, a_uuid )
    buff = utils.images.pil2buffer( img, "PNG" )
    return send_file( buff, mimetype = "image/png" )
def get_annotation_image_inner( folder_id, a_uuid ):
    """
        Load, decrypt and open one annotation image.

        :param folder_id: UUID of the folder; its donor UUID is passed to
                          do_decrypt_dek to decrypt the data.
        :param a_uuid: UUID of the annotation.
        :return: The annotation as a PIL image.
    """
    sql = "SELECT donor FROM cnm_folder WHERE uuid = %s"
    donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]

    # Scope the lookup to the folder: the annotation has to belong to the
    # folder whose donor key is used for the decryption.
    sql = """
        SELECT annotation_data
        FROM cnm_annotation
        WHERE
            uuid = %s AND
            folder = %s
    """
    data = config.db.query_fetchone( sql, ( a_uuid, folder_id, ) )[ "annotation_data" ]
    data = do_decrypt_dek( data, donor_uuid )

    buff = StringIO()
    buff.write( base64.b64decode( data ) )
    buff.seek( 0 )

    return Image.open( buff )
@afis_view.route( "/admin/afis/<folder_id>/user/update", methods = [ "POST" ] )
@admin_required
def admin_update_users_in_afis_folder( folder_id ):
    """
        Replace the list of users assigned to a folder for one image type.

        Reads from the POSTed form:
            "users": JSON-encoded list of user ids;
            "type": "ref" selects img_type 1, anything else selects img_type 2.

        :param folder_id: UUID of the folder.
        :return: JSON object { "error": bool }.
    """
    try:
        users = json.loads( request.form.get( "users" ) )

        if request.form.get( "type" ) == "ref":
            img_type = 1
        else:
            img_type = 2

        # Drop the current assignments of this image type, then insert the new list.
        sql = "DELETE FROM cnm_folder_users WHERE folder_id = %s AND img_type = %s"
        config.db.query( sql, ( folder_id, img_type, ) )

        sql = sql_insert_generate( "cnm_folder_users", [ "user_id", "folder_id", "img_type" ], "id" )
        for user in users:
            config.db.query_fetchone( sql, ( user, folder_id, img_type, ) )

        config.db.commit()

        return jsonify( {
            "error": False
        } )

    except Exception:
        current_app.logger.exception( "Can not update the users of the AFIS folder" )
        return jsonify( {
            "error": True
        } )

Marco De Donno
committed
return jsonify( view_segment_mark_ref_anno_lists( folder_id, fpc, False ) )
@afis_view.route( "/afis/<folder_id>/download/<fpc>" )
@login_required
def download_folder_afis( folder_id, fpc ):
    """
        Non-administrator download of the images of a folder as a zip archive.

        :param folder_id: UUID of the folder.
        :param fpc: Position code, or "all".
    """
    as_admin = False
    return download_folder_inner( folder_id, fpc, as_admin )
@afis_view.route( "/admin/afis/<folder_id>/download/<fpc>" )
@login_required
@admin_required
def download_folder_admin( folder_id, fpc ):
    """
        Administrator download of the images of a folder as a zip archive.

        :param folder_id: UUID of the folder.
        :param fpc: Position code, or "all".
    """
    # The admin flag skips the per-folder assignment checks done in the list
    # helpers, so this route is restricted to administrators like the other
    # "/admin/afis/" routes.
    return download_folder_inner( folder_id, fpc, True )
def download_folder_inner( folder_id, fpc, isadmin ):
    """
        Build and send a zip archive with the images of a folder.

        With fpc == "all" only the annotations of the whole folder are exported;
        otherwise the marks, reference segments and annotations of the given
        position code are exported. Every image goes through image_tatoo before
        being written to the archive.

        :param folder_id: UUID of the folder; also used as the archive name.
        :param fpc: Position code, or "all".
        :param isadmin: Forwarded to the list helpers.
        :return: The archive, sent as the attachment "<folder_id>.zip".
    """
    if fpc == "all":
        lst = view_segment_anno_list( folder_id, isadmin )
    else:
        lst = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )

    zipbuffer = StringIO()

    with zipfile.ZipFile( zipbuffer, "w", zipfile.ZIP_DEFLATED ) as fp:
        for mark in lst[ "mark_list" ]:
            file_id = mark[ "uuid" ]
            submission_id = get_submission_uuid_for_file( file_id )

            img, _ = image_serve( "files", file_id, submission_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "TIFF" )
            fp.writestr( "mark_{}.tiff".format( file_id ), buff.getvalue() )

        for reference in lst[ "tenprints_list" ]:
            file_id = reference[ "tenprint" ]
            pc = reference[ "pc" ]
            submission_id = get_submission_uuid_for_file( file_id )

            img, _ = image_serve( "files_segments", ( file_id, pc ), submission_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "TIFF" )
            # Several segments ( different pc ) can come from the same tenprint
            # file: the pc is part of the name to keep the zip entries unique.
            fp.writestr( "reference_{}_{}.tiff".format( file_id, pc ), buff.getvalue() )

        for annotation in lst[ "annotations" ]:
            file_id = annotation[ "uuid" ]
            annotation_fpc = annotation[ "fpc" ]

            img = get_annotation_image_inner( folder_id, file_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "PNG" )
            fp.writestr( "annotation_finger{}_{}.png".format( annotation_fpc, file_id ), buff.getvalue() )

    zipbuffer.seek( 0 )

    return send_file( zipbuffer, attachment_filename = "{}.zip".format( folder_id ), as_attachment = True )