#!/usr/bin/python
# -*- coding: UTF-8 -*-
"""Flask views for the AFIS ("cnm") folders.

A folder is tied to one donor submission and groups, per finger/palm position
code (FPC), the reference segments, the marks and the annotation
(illustration) images. Views under ``/admin/afis/...`` are reserved to
administrators; their ``/afis/...`` counterparts share the same
``*_inner`` implementation.
"""

import base64
import json
import os
import zipfile

from cStringIO import StringIO
from uuid import uuid4

from flask import Blueprint
from flask import jsonify, request, send_file, current_app, session
from PIL import Image

import utils
from utils.decorator import admin_required, login_required
from utils.sql import sql_insert_generate
from utils.template import my_render_template

import config

from views.images import str2img, image_tatoo, get_submission_uuid_for_file, image_serve

from const import *
from functions import do_encrypt_dek, do_decrypt_dek

from NIST.fingerprint.labels import FINGER_POSITION_CODE, PALM_POSITION_CODE

# Single lookup table covering the finger and the palm position codes; it is
# handed to the templates below.
segments_position_code = dict( FINGER_POSITION_CODE, **PALM_POSITION_CODE )

afis_view = Blueprint( "afis", __name__, template_folder = "templates" )


################################################################################
#    Private helpers

def _get_donor_username( folder_id ):
    """Return the username of the donor whose submission owns `folder_id`."""
    sql = """
        SELECT username
        FROM users
        INNER JOIN submissions ON users.id = submissions.donor_id
        INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
        WHERE cnm_folder.uuid = %s
    """
    return config.db.query_fetchone( sql, ( folder_id, ) )[ "username" ]


def _current_user_is_assigned( folder_id, img_type ):
    """Tell whether the session user is assigned to `folder_id` for `img_type`.

    `img_type` is 1 for the reference images and 2 for the marks (same
    encoding as the one written by `admin_update_users_in_afis_folder`).
    """
    sql = """
        SELECT count( * )
        FROM cnm_folder_users
        WHERE user_id = %s AND folder_id = %s AND img_type = %s
    """
    nb = config.db.query_fetchone( sql, ( session.get( "user_id" ), folder_id, img_type, ) )[ "count" ]
    return nb != 0


################################################################################
#    Folder list

@afis_view.route( "/admin/afis/list" )
@admin_required
def admin_list_folders():
    """Administrator entry point for the folder list."""
    return list_folders_inner()


@afis_view.route( "/afis/list" )
def list_folders():
    """Non-admin entry point for the folder list."""
    return list_folders_inner()


def list_folders_inner():
    """Render the list of existing folders and of donors without a folder."""
    sql = """
        SELECT cnm_folder.uuid, users.username
        FROM cnm_folder
        INNER JOIN submissions ON submissions.uuid = cnm_folder.donor
        INNER JOIN users ON submissions.donor_id = users.id
    """
    folders = config.db.query_fetchall( sql )

    # Submissions that do not have any folder yet (anti-join on cnm_folder).
    sql = """
        SELECT submissions.uuid, users.username
        FROM submissions
        LEFT JOIN users ON submissions.donor_id = users.id
        LEFT JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
        WHERE cnm_folder.uuid IS NULL
        ORDER BY users.id ASC
    """
    donors = config.db.query_fetchall( sql )

    return my_render_template(
        "afis/shared/list.html",
        folders = folders,
        donors = donors
    )


################################################################################
#    Folder creation / deletion

@afis_view.route( "/admin/afis/new_folder", methods = [ "POST" ] )
@admin_required
def admin_add_new_folder():
    """Create a folder for the donor given in the `donor_uuid` form field.

    Nothing is inserted when the donor already has a folder. The JSON answer
    only carries an `error` flag.
    """
    try:
        donor_uuid = request.form.get( "donor_uuid", False )

        sql = "SELECT count( * ) AS nb FROM cnm_folder WHERE donor = %s"
        nb = config.db.query_fetchone( sql, ( donor_uuid, ) )[ "nb" ]

        if nb == 0:
            sql = sql_insert_generate( "cnm_folder", [ "donor", "uuid" ], "id" )
            config.db.query_fetchone( sql, ( donor_uuid, str( uuid4() ) ) )
            config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


@afis_view.route( "/admin/afis/<folder_id>/delete" )
@admin_required
def admin_delete_folder( folder_id ):
    """Delete a folder together with its annotations and segments."""
    try:
        # Dependent rows first, the folder itself last.
        sql = "DELETE FROM cnm_annotation WHERE folder = %s"
        config.db.query( sql, ( folder_id, ) )

        sql = "DELETE FROM cnm_segments WHERE folder_uuid = %s"
        config.db.query( sql, ( folder_id, ) )

        sql = "DELETE FROM cnm_folder WHERE uuid = %s"
        config.db.query( sql, ( folder_id, ) )

        config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


################################################################################
#    Folder page

@afis_view.route( "/admin/afis/<folder_id>" )
@admin_required
def admin_folder_show( folder_id ):
    """Administrator entry point for the folder page."""
    return folder_show_inner( folder_id )


@afis_view.route( "/afis/<folder_id>" )
def folder_show( folder_id ):
    """Non-admin entry point for the folder page."""
    return folder_show_inner( folder_id )


def folder_show_inner( folder_id ):
    """Render the page of one folder.

    Collects the FPCs available in the donor's tenprint segments, the
    segments already added to the folder, the tenprint segments, the donor
    username and the users assigned to the references and to the marks.
    """
    # FPCs for which the donor has a located segment, restricted to the
    # codes <= 10 and 22, 24, 25, 27.
    sql = """
        SELECT segments_locations.fpc
        FROM segments_locations
        INNER JOIN files ON segments_locations.tenprint_id = files.uuid
        INNER JOIN submissions ON files.folder = submissions.id
        INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
        WHERE
            cnm_folder.uuid = %s AND
            ( fpc <= 10 OR fpc = 22 OR fpc = 24 OR fpc = 25 OR fpc = 27 )
        GROUP BY fpc
        ORDER BY fpc ASC
    """
    segment_list = config.db.query_fetchall( sql, ( folder_id, ) )

    sql = """
        SELECT *
        FROM cnm_segments
        WHERE folder_uuid = %s
        ORDER BY fpc ASC
    """
    segment_list_in_folder = config.db.query_fetchall( sql, ( folder_id, ) )

    sql = """
        SELECT files_segments.tenprint, files_segments.pc
        FROM cnm_folder
        INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
        INNER JOIN files ON submissions.id = files.folder
        INNER JOIN files_segments ON files.uuid = files_segments.tenprint
        WHERE cnm_folder.uuid = %s
    """
    tpid = config.db.query_fetchall( sql, ( folder_id, ) )

    username = _get_donor_username( folder_id )

    # Candidate users that can be assigned to the folder.
    # NOTE(review): `type = 5` is presumably the AFIS account type - confirm.
    sql = "SELECT id, username FROM users WHERE type = 5"
    users = config.db.query_fetchall( sql )

    # Users assigned to the reference images ( img_type = 1 )
    sql = """
        SELECT users.id, users.username
        FROM cnm_folder_users
        INNER JOIN users ON cnm_folder_users.user_id = users.id
        WHERE folder_id = %s AND img_type = 1
    """
    users_assigned_refs = config.db.query_fetchall( sql, ( folder_id, ) )

    # Users assigned to the marks ( img_type = 2 )
    sql = """
        SELECT users.id, users.username
        FROM cnm_folder_users
        INNER JOIN users ON cnm_folder_users.user_id = users.id
        WHERE folder_id = %s AND img_type = 2
    """
    users_assigned_marks = config.db.query_fetchall( sql, ( folder_id, ) )

    return my_render_template(
        "afis/shared/folder.html",
        segment_list = segment_list,
        segment_list_in_folder = segment_list_in_folder,
        tpid = tpid,
        username = username,
        users = users,
        users_assigned_refs = users_assigned_refs,
        users_assigned_marks = users_assigned_marks,
        folder_id = folder_id,
        segments_position_code = segments_position_code
    )


################################################################################
#    Segments

@afis_view.route( "/admin/afis/<folder_id>/add/segment", methods = [ "POST" ] )
@admin_required
def admin_add_segment_to_cnmfolder( folder_id ):
    """Add the FPC given in the `fpc` form field to the folder, if missing."""
    try:
        fpc = request.form.get( "fpc" )

        sql = "SELECT count( * ) AS nb FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s"
        nb = config.db.query_fetchone( sql, ( folder_id, fpc, ) )[ "nb" ]

        if nb == 0:
            sql = sql_insert_generate( "cnm_segments", [ "uuid", "folder_uuid", "fpc" ], "uuid" )
            config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, ) )
            config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>" )
@admin_required
def admin_view_segment( folder_id, fpc ):
    """Administrator entry point for the segment page (no user filtering)."""
    return view_segment_inner( folder_id, fpc, True )


@afis_view.route( "/afis/<folder_id>/segment/<fpc>" )
def view_segment( folder_id, fpc ):
    """Non-admin entry point for the segment page (filtered by assignment)."""
    return view_segment_inner( folder_id, fpc, False )


def view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin ):
    """Return the marks, reference segments and annotations of one FPC.

    :param folder_id: uuid of the folder.
    :param fpc: position code; converted with `int()`.
    :param isadmin: when false, the reference and the mark lists are emptied
        unless the session user is assigned to the folder for that kind of
        image.
    :return: dict with the keys `mark_list`, `tenprints_list` and
        `annotations`.
    """
    # Get the list of references images depending upon the FPC. The fingers
    # 1 to 10 are each paired with one additional code ( 11 to 14 ).
    # NOTE(review): 11-14 look like the plain thumb / four-finger slap
    # codes - confirm against the NIST FPC table.
    multi_img_fpc = {
        1: [ 1, 11 ],
        2: [ 2, 13 ],
        3: [ 3, 13 ],
        4: [ 4, 13 ],
        5: [ 5, 13 ],
        6: [ 6, 12 ],
        7: [ 7, 14 ],
        8: [ 8, 14 ],
        9: [ 9, 14 ],
        10: [ 10, 14 ]
    }

    fpc = int( fpc )
    fpcs = multi_img_fpc.get( fpc, [ fpc ] )

    # One placeholder per position code; the values stay bound parameters.
    pc_filter = " OR ".join( [ "files_segments.pc = %s" ] * len( fpcs ) )

    sql = """
        SELECT files_segments.tenprint, files_segments.pc
        FROM cnm_folder
        INNER JOIN submissions ON cnm_folder.donor = submissions.uuid
        INNER JOIN files ON submissions.id = files.folder
        INNER JOIN files_segments ON files.uuid = files_segments.tenprint
        WHERE
            cnm_folder.uuid = %s AND
            ( {} )
        ORDER BY pc ASC
    """.format( pc_filter )

    data = [ folder_id, ]
    data.extend( fpcs )

    tenprints_list = config.db.query_fetchall( sql, data )

    # Get the list of marks related to this finger
    sql = """
        SELECT files.id, files.uuid
        FROM files
        INNER JOIN mark_info ON files.uuid = mark_info.uuid
        INNER JOIN submissions ON files.folder = submissions.id
        INNER JOIN cnm_folder ON submissions.uuid = cnm_folder.donor
        WHERE
            cnm_folder.uuid = %s AND
            ( files.type = 3 OR files.type = 4 )
    """

    # The pattern values come from the `fpc2pfsp` lookup table, not from the
    # request. `%%` is doubled because the query is run with parameters.
    # NOTE(review): `fpc2pfsp` is not defined in this file; presumably it is
    # provided by `from const import *` - confirm.
    sql_where = [
        "mark_info.pfsp LIKE '%%{}%%'".format( q )
        for f in fpcs
        for q in fpc2pfsp[ f ]
    ]

    if sql_where:
        sql += " AND ( {} )".format( " OR ".join( sql_where ) )

    sql += " ORDER BY files.id ASC"
    mark_list = config.db.query_fetchall( sql, ( folder_id, ) )

    # Filter out the reference and mark list for afis users
    if not isadmin:
        if not _current_user_is_assigned( folder_id, 1 ):
            tenprints_list = []

        if not _current_user_is_assigned( folder_id, 2 ):
            mark_list = []

    # Get the list of annotations files
    sql = """
        SELECT id, uuid
        FROM cnm_annotation
        WHERE folder = %s AND fpc = %s
    """
    annotations = config.db.query_fetchall( sql, ( folder_id, fpc, ) )

    return {
        "mark_list": mark_list,
        "tenprints_list": tenprints_list,
        "annotations": annotations
    }


def view_segment_inner( folder_id, fpc, isadmin ):
    """Render the page of one FPC of a folder."""
    lists = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )
    mark_list = lists[ "mark_list" ]
    tenprints_list = lists[ "tenprints_list" ]
    annotations = lists[ "annotations" ]

    # Get the username of the donor
    username = _get_donor_username( folder_id )

    return my_render_template(
        "afis/shared/segment.html",
        folder_id = folder_id,
        fpc = fpc,
        tenprints_list = tenprints_list,
        len_tenprints_list = len( tenprints_list ),
        mark_list = mark_list,
        len_mark_list = len( mark_list ),
        username = username,
        annotations = annotations,
        segments_position_code = segments_position_code
    )


################################################################################
#    Illustrations ( annotations )

@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/add/illustration", methods = [ "POST" ] )
@admin_required
def admin_add_illustration( folder_id, fpc ):
    """Store the uploaded `file` as an annotation of the folder / FPC.

    The file content is base64-encoded, then encrypted with
    `do_encrypt_dek` using the donor uuid of the folder.
    """
    try:
        sql = """
            SELECT donor
            FROM cnm_folder
            WHERE uuid = %s
        """
        donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]

        uploaded_file = request.files[ "file" ]

        fp = StringIO()
        uploaded_file.save( fp )
        file_data = fp.getvalue()

        file_data = base64.b64encode( file_data )
        file_data = do_encrypt_dek( file_data, donor_uuid )

        sql = sql_insert_generate( "cnm_annotation", [ "uuid", "folder", "fpc", "annotation_data" ], "id" )
        config.db.query_fetchone( sql, ( str( uuid4() ), folder_id, fpc, file_data, ) )
        config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


@afis_view.route( "/admin/afis/<folder_id>/segment/<fpc>/remove/illustration", methods = [ "POST" ] )
@admin_required
def admin_remove_illustration( folder_id, fpc ):
    """Delete the annotation named by the `uuid` form field."""
    try:
        illustration_uuid = request.form.get( "uuid" )

        sql = "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s AND uuid = %s"
        config.db.query( sql, ( folder_id, fpc, illustration_uuid, ) )
        config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


@afis_view.route( "/admin/afis/<folder_id>/remove/segment", methods = [ "POST" ] )
@admin_required
def admin_remove_segment( folder_id ):
    """Remove the FPC given in the `fpc` form field, with its annotations."""
    try:
        fpc = request.form.get( "fpc" )

        sql = "DELETE FROM cnm_segments WHERE folder_uuid = %s AND fpc = %s"
        config.db.query( sql, ( folder_id, fpc, ) )

        sql = "DELETE FROM cnm_annotation WHERE folder = %s AND fpc = %s"
        config.db.query( sql, ( folder_id, fpc, ) )

        config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )


@afis_view.route( "/<u_type>/afis/<folder_id>/annotation/<a_uuid>" )
def get_annotation_image( folder_id, a_uuid, u_type = "afis" ):
    """Serve one annotation image as PNG.

    `u_type` is only part of the URL; it is not used by the view.
    """
    img = get_annotation_image_inner( folder_id, a_uuid )
    buff = utils.images.pil2buffer( img, "PNG" )
    return send_file( buff, mimetype = "image/png" )


def get_annotation_image_inner( folder_id, a_uuid ):
    """Decrypt the annotation `a_uuid` and return it as a PIL image.

    The decryption uses the donor uuid of `folder_id`.
    """
    sql = "SELECT donor FROM cnm_folder WHERE uuid = %s"
    donor_uuid = config.db.query_fetchone( sql, ( folder_id, ) )[ "donor" ]

    sql = """
        SELECT annotation_data
        FROM cnm_annotation
        WHERE uuid = %s
    """
    data = config.db.query_fetchone( sql, ( a_uuid, ) )[ "annotation_data" ]

    # Reverse of `admin_add_illustration`: decrypt, then base64-decode.
    data = do_decrypt_dek( data, donor_uuid )
    img = base64.b64decode( data )

    buff = StringIO()
    buff.write( img )
    buff.seek( 0 )

    return Image.open( buff )


################################################################################
#    Users assignment

@afis_view.route( "/admin/afis/<folder_id>/user/update", methods = [ "POST" ] )
@admin_required
def admin_update_users_in_afis_folder( folder_id ):
    """Replace the users assigned to the folder for one kind of image.

    The `users` form field is a JSON list of user ids; the `type` form field
    selects the references ( "ref", img_type 1 ) or, for any other value,
    the marks ( img_type 2 ).
    """
    try:
        users = json.loads( request.form.get( "users" ) )

        if request.form.get( "type" ) == "ref":
            img_type = 1
        else:
            img_type = 2

        # Drop the previous assignment, then insert the new one.
        sql = "DELETE FROM cnm_folder_users WHERE folder_id = %s AND img_type = %s"
        config.db.query( sql, ( folder_id, img_type, ) )

        sql = sql_insert_generate( "cnm_folder_users", [ "user_id", "folder_id", "img_type" ], "id" )
        for user in users:
            config.db.query_fetchone( sql, ( user, folder_id, img_type, ) )

        config.db.commit()

        return jsonify( { "error": False } )

    except Exception:
        return jsonify( { "error": True } )

    # NOTE(review): this statement is unreachable (both branches above
    # return) and `fpc` is not defined in this scope. It looks like the body
    # of a separate JSON view whose decorator / `def` lines were lost;
    # restore that header or delete the statement - confirm against version
    # control.
    return jsonify( view_segment_mark_ref_anno_lists( folder_id, fpc, False ) )


################################################################################
#    Download

@afis_view.route( "/afis/<folder_id>/download/<fpc>" )
@login_required
def download_folder_afis( folder_id, fpc ):
    """Download the images of one FPC, filtered by the user assignment."""
    return download_folder_inner( folder_id, fpc, False )


@afis_view.route( "/admin/afis/<folder_id>/download/<fpc>" )
@admin_required
def download_folder_admin( folder_id, fpc ):
    """Download all the images of one FPC.

    Reserved to administrators, like every other `/admin/afis/...` view:
    `isadmin = True` disables the per-user assignment filter.
    """
    return download_folder_inner( folder_id, fpc, True )


def download_folder_inner( folder_id, fpc, isadmin ):
    """Send a zip archive with the marks, references and annotations.

    Marks and references are stored as TIFF, annotations as PNG; every image
    goes through `image_tatoo` before being written.
    """
    lst = view_segment_mark_ref_anno_lists( folder_id, fpc, isadmin )

    zipbuffer = StringIO()

    with zipfile.ZipFile( zipbuffer, "w", zipfile.ZIP_DEFLATED ) as fp:
        for fid in lst[ "mark_list" ]:
            file_id = fid[ "uuid" ]
            submission_id = get_submission_uuid_for_file( file_id )

            img, _ = image_serve( "files", file_id, submission_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "TIFF" )
            fp.writestr( "mark_{}.tiff".format( file_id ), buff.getvalue() )

        for fid in lst[ "tenprints_list" ]:
            file_id = fid[ "tenprint" ]
            pc = fid[ "pc" ]
            submission_id = get_submission_uuid_for_file( file_id )

            img, _ = image_serve( "files_segments", ( file_id, pc ), submission_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "TIFF" )
            # The position code is part of the name: one tenprint can provide
            # two segments for the same finger ( e.g. pc 1 and pc 11 ), which
            # would otherwise produce two entries with the same name.
            fp.writestr( "reference_{}_{}.tiff".format( file_id, pc ), buff.getvalue() )

        for fid in lst[ "annotations" ]:
            file_id = fid[ "uuid" ]

            img = get_annotation_image_inner( folder_id, file_id )
            img = image_tatoo( img, file_id )

            buff = utils.images.pil2buffer( img, "PNG" )
            fp.writestr( "annotation_{}.png".format( file_id ), buff.getvalue() )

    # The archive is finalized when the `with` block exits; rewind before
    # handing the buffer to Flask.
    zipbuffer.seek( 0 )

    return send_file(
        zipbuffer,
        attachment_filename = "{}.zip".format( folder_id ),
        as_attachment = True
    )