#!/usr/bin/python # -*- coding: UTF-8 -*- from cStringIO import StringIO from flask import Blueprint, session, current_app, request, jsonify, send_file from uuid import uuid4 import base64 import json import zipfile import config from views.images import get_submission_uuid_for_annotation, get_submission_uuid_for_file from views.images import image_serve, image_tatoo from utils.decorator import login_required, admin_required from utils.template import my_render_template from utils.sql import sql_insert_generate import utils from functions import do_encrypt_dek from NIST.fingerprint.labels import FINGER_POSITION_CODE, PALM_POSITION_CODE segments_position_code = dict( FINGER_POSITION_CODE, **PALM_POSITION_CODE ) afis_view = Blueprint( "afis", __name__, template_folder = "templates" ) @afis_view.route( "/admin/afis/list" ) @admin_required def admin_list_folders(): return "ok" @afis_view.route( "/afis/list" ) @login_required def list_folders(): user_id = session.get( "user_id", None ) sql = """ SELECT cnm_assignment.folder_uuid, tmp.* FROM cnm_assignment LEFT JOIN cnm_folder ON cnm_assignment.folder_uuid = cnm_folder.folder_uuid LEFT JOIN ( SELECT DISTINCT ON ( donor_id, pc ) * FROM donor_segments_v ) AS tmp ON cnm_folder.donor_id = tmp.donor_id AND cnm_folder.pc = tmp.pc WHERE cnm_assignment.user_id = %s ORDER BY tmp.donor_id ASC, tmp.pc ASC """ folder_list = config.db.query_fetchall( sql, ( user_id, ) ) return my_render_template( "afis/folder_list.html", folder_list = folder_list ) @afis_view.route( "/admin/target/" ) @admin_required def admin_show_target( uuid ): return show_folder_inner( uuid, True ) @afis_view.route( "/afis/" ) @login_required def show_folder( uuid ): return show_folder_inner( uuid, False ) def get_segment_list_for_target_folder( uuid, admin = True ): if not admin: sql = """ SELECT count(*) AS count FROM cnm_assignment LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id WHERE folder_uuid = %s AND user_id = %s AND cnm_assignment_type.name = 'reference' """ if config.db.query_fetchone( sql, ( uuid, session.get( "user_id", False ), ) )[ "count" ] == 0: return None sql = """ SELECT cnm_folder.folder_uuid, donor_segments_v.* FROM cnm_folder LEFT JOIN fingers_same ON cnm_folder.pc = fingers_same.base_finger RIGHT JOIN donor_segments_v ON cnm_folder.donor_id = donor_segments_v.donor_id AND fingers_same.target = donor_segments_v.pc WHERE cnm_folder.folder_uuid = %s ORDER BY donor_segments_v.donor_id ASC, donor_segments_v.pc ASC """ return config.db.query_fetchall( sql, ( uuid, ) ) def get_marks_list_for_target_foler( uuid, admin = True ): if not admin: sql = """ SELECT count(*) AS count FROM cnm_assignment LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id WHERE folder_uuid = %s AND user_id = %s AND cnm_assignment_type.name = 'mark' """ if config.db.query_fetchone( sql, ( uuid, session.get( "user_id", False ), ) )[ "count" ] == 0: return None sql = """ SELECT files_v.uuid FROM cnm_folder LEFT JOIN submissions ON cnm_folder.donor_id = submissions.donor_id LEFT JOIN files_v ON submissions.id = files_v.folder LEFT JOIN files_type ON files_v.type = files_type.id WHERE cnm_folder.folder_uuid = %s AND files_type.name = 'mark_target' """ return config.db.query_fetchall( sql, ( uuid, ) ) def get_annotation_list_for_target_folder( uuid, admin = True ): sql = """ SELECT uuid FROM cnm_data LEFT JOIN cnm_data_type ON cnm_data.type_id = cnm_data_type.id WHERE folder_uuid = %s AND cnm_data_type.name = 'annotation' """ return config.db.query_fetchall( sql, ( uuid, ) ) def show_folder_inner( uuid, admin ): segments_list = get_segment_list_for_target_folder( uuid, admin ) annotation_list = get_annotation_list_for_target_folder( uuid, admin ) marks_list = get_marks_list_for_target_foler( uuid, admin ) if admin: sql = """ SELECT submissions.uuid, cnm_folder.pc, users.username FROM cnm_folder INNER JOIN submissions ON cnm_folder.donor_id = submissions.donor_id INNER JOIN users ON cnm_folder.donor_id = users.id WHERE cnm_folder.folder_uuid = %s """ submission_id, pc, username = config.db.query_fetchone( sql, ( uuid, ) ) finger_name = "{} (F{})".format( segments_position_code[ pc ], pc ) sql = """ SELECT users.id, users.username FROM users LEFT JOIN account_type ON users.type = account_type.id WHERE account_type.name = 'AFIS' """ all_afis_users = config.db.query_fetchall( sql ) def get_user_assined( assignment_type ): sql = """ SELECT cnm_assignment.user_id AS id FROM cnm_assignment LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id WHERE cnm_assignment.folder_uuid = %s AND cnm_assignment_type.name = '{}' """.format( assignment_type ) return config.db.query_fetchall( sql, ( uuid, ) ) users_assigned_refs = get_user_assined( "reference" ) users_assigned_marks = get_user_assined( "mark" ) else: submission_id = None finger_name = None username = None all_afis_users = None users_assigned_refs = [] users_assigned_marks = [] return my_render_template( "shared/segment.html", target_uuid = uuid, segments_list = segments_list, annotation_list = annotation_list, marks_list = marks_list, submission_id = submission_id, username = username, finger_name = finger_name, all_afis_users = all_afis_users, users_assigned_refs = users_assigned_refs, users_assigned_marks = users_assigned_marks ) @afis_view.route( "/admin/target//new_illustration", methods = [ "POST" ] ) @admin_required def admin_add_illustration( uuid ): try: sql = """ SELECT submissions.uuid FROM cnm_folder LEFT JOIN submissions ON cnm_folder.donor_id = submissions.donor_id WHERE cnm_folder.folder_uuid = %s """ donor_id = config.db.query_fetchone( sql, ( uuid, ) )[ "uuid" ] sql = "SELECT id FROM cnm_data_type WHERE name = 'annotation'" annotation_type_id = config.db.query_fetchone( sql )[ "id" ] uploaded_file = request.files[ "file" ] fp = StringIO() uploaded_file.save( fp ) fp.seek( 0 ) file_data = fp.getvalue() file_data = base64.b64encode( file_data ) file_data = do_encrypt_dek( file_data, donor_id ) sql = sql_insert_generate( "cnm_data", [ "uuid", "folder_uuid", "data", "type_id" ], "id" ) data = ( str( uuid4() ), uuid, file_data, annotation_type_id, ) annotation_id = config.db.query_fetchone( sql, data ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": False } ) @afis_view.route( "/afis//download" ) @login_required def download_target_folder( uuid ): short_uuid = uuid[ 0:18 ] annotation_list = get_annotation_list_for_target_folder( uuid, False ) segments_list = get_segment_list_for_target_folder( uuid, False ) marks_list = get_marks_list_for_target_foler( uuid, False ) zipbuffer = StringIO() with zipfile.ZipFile( zipbuffer, "w", zipfile.ZIP_DEFLATED ) as fp: if isinstance( annotation_list, list ): for fid in annotation_list: file_id = fid[ "uuid" ] short_file_id = file_id[ 0:18 ] submission_id = get_submission_uuid_for_annotation( file_id ) img, _ = image_serve( "cnm_data", file_id, submission_id ) img = image_tatoo( img, file_id ) buff = utils.images.pil2buffer( img, "TIFF" ) fp.writestr( "{}_annotation_{}.tiff".format( short_uuid, short_file_id ), buff.getvalue() ) if isinstance( segments_list, list ): for sid in segments_list: file_id = sid[ "uuid" ] short_file_id = file_id[ 0:18 ] tenprint_id = sid[ "tenprint" ] pc = sid[ "pc" ] submission_id = get_submission_uuid_for_file( tenprint_id ) img, _ = image_serve( "files_segments", ( tenprint_id, pc ), submission_id ) img = image_tatoo( img, file_id ) buff = utils.images.pil2buffer( img, "TIFF" ) fp.writestr( "{}_reference_{}.tiff".format( short_uuid, short_file_id ), buff.getvalue() ) if isinstance( marks_list, list ): for mid in marks_list: file_id = mid[ "uuid" ] short_file_id = file_id[ 0:18 ] submission_id = get_submission_uuid_for_file( file_id ) img, _ = image_serve( "files", file_id, submission_id ) img = image_tatoo( img, file_id ) buff = utils.images.pil2buffer( img, "TIFF" ) fp.writestr( "{}_mark_{}.tiff".format( short_uuid, short_file_id ), buff.getvalue() ) zipbuffer.seek( 0 ) return send_file( zipbuffer, attachment_filename = "{}.zip".format( short_uuid ), as_attachment = True ) @afis_view.route( "/admin//target/list" ) @admin_required def admin_show_target_list( uuid ): sql = """ SELECT DISTINCT ON ( donor_segments_v.pc ) donor_segments_v.*, cnm_folder.folder_uuid FROM donor_segments_v LEFT JOIN submissions ON submissions.donor_id = donor_segments_v.donor_id LEFT JOIN cnm_folder ON donor_segments_v.donor_id = cnm_folder.donor_id AND donor_segments_v.pc = cnm_folder.pc WHERE submissions.uuid = %s """ segments_list = config.db.query_fetchall( sql, ( uuid, ) ) sql = """ SELECT username FROM users LEFT JOIN submissions ON users.id = submissions.donor_id WHERE submissions.uuid = %s """ username = config.db.query_fetchone( sql, ( uuid, ) )[ "username" ] return my_render_template( "admin/folder_list.html", segments_list = segments_list, segments_position_code = segments_position_code, username = username, submission_uuid = uuid ) @afis_view.route( "/admin/afis//user/update", methods = [ "POST" ] ) @admin_required def admin_update_users_in_afis_folder( target_uuid ): try: users = request.form.get( "users" ) users = json.loads( users ) assignment_type = request.form.get( "type" ) sql = "SELECT id FROM cnm_assignment_type WHERE name = %s" assignment_type_id = config.db.query_fetchone( sql, ( assignment_type, ) )[ "id" ] sql = """ DELETE FROM cnm_assignment WHERE folder_uuid = %s AND assignment_type = %s """ config.db.query( sql, ( target_uuid, assignment_type_id, ) ) sql = sql_insert_generate( "cnm_assignment", [ "folder_uuid", "user_id", "assignment_type" ], "id" ) for user in users: config.db.query_fetchone( sql, ( target_uuid, user, assignment_type_id, ) ) config.db.commit() return jsonify( { "error": False } ) except: return jsonify( { "error": True } )