# __init__.py: Flask views for the "afis" blueprint.
import base64
import json
import zipfile
from cStringIO import StringIO
from uuid import uuid4

from flask import Blueprint
from flask import jsonify, request, send_file, current_app, session
from flask import url_for, abort, redirect
from NIST.fingerprint.labels import FINGER_POSITION_CODE, PALM_POSITION_CODE

from const import *

# NOTE(review): `config`, `utils` and `views` are used throughout this module
# but no import for them is visible here (`pfsp2fpc` presumably comes from
# `const`) -- confirm they are brought into scope.
# Lookup table merging the finger and palm position-code tables.
# Built with copy-then-update rather than dict( a, **b ): the keyword-expansion
# form only accepts non-string keys through a CPython 2 quirk and raises
# TypeError on Python 3.
# NOTE(review): keys are presumably integer position codes (the table is
# indexed with an int `pc` in show_folder_inner) -- confirm.
segments_position_code = dict( FINGER_POSITION_CODE )
segments_position_code.update( PALM_POSITION_CODE )

# Blueprint on which every route of this module is registered.
afis_view = Blueprint( "afis", __name__, template_folder = "templates" )

@afis_view.route( "/afis/list" )
@utils.decorator.login_required
def list_folders():
    """
    List the folders assigned to the logged-in user.
    
    Every row carries the assigned folder uuid together with one
    `donor_segments_v` row (DISTINCT ON donor and position code) matching the
    folder's donor and position code, ordered by donor then position code.
    """
    current_user = session.get( "user_id", None )
    
    query = """
        SELECT
            cnm_assignment.folder_uuid,
            tmp.*
        FROM cnm_assignment
        LEFT JOIN cnm_folder ON cnm_assignment.folder_uuid = cnm_folder.uuid
        LEFT JOIN (
            SELECT DISTINCT ON ( donor_id, pc ) *
            FROM donor_segments_v
        ) AS tmp ON cnm_folder.donor_id = tmp.donor_id AND cnm_folder.pc = tmp.pc
        
        WHERE cnm_assignment.user_id = %s
        
        ORDER BY
            tmp.donor_id ASC,
            tmp.pc ASC
    """
    assigned_folders = config.db.query_fetchall( query, ( current_user, ) )
    
    # NOTE(review): no template path is passed here, unlike the other views of
    # this module -- confirm which template this page is meant to render.
    return utils.template.my_render_template( folder_list = assigned_folders )
@afis_view.route( "/admin/target/<submission_id>/<pc>/new" )
@utils.decorator.admin_required
def admin_create_new_target_and_redirect( submission_id, pc ):
    """
    Create a target folder for a submission and position code, then redirect to it.
    
    :param submission_id: uuid of the submission whose donor owns the new folder.
    :param pc: position code of the target, as captured from the URL (a string).
    :return: redirect to the admin page of the newly created folder.
    
    Aborts with 404 when `pc` is not an integer or when no submission matches
    `submission_id`, instead of failing with an unhandled exception.
    """
    # Validate the URL segment before touching the database.
    try:
        pc = int( pc )
    except ValueError:
        abort( 404 )
    
    sql = "SELECT donor_id FROM submissions WHERE uuid = %s"
    submission = config.db.query_fetchone( sql, ( submission_id, ) )
    # NOTE(review): assumes query_fetchone returns None when no row matches.
    if submission is None:
        abort( 404 )
    donor_id = submission[ "donor_id" ]
    
    target_uuid = str( uuid4() )
    
    sql = utils.sql.sql_insert_generate( "cnm_folder", [ "uuid", "pc", "donor_id" ], "id" )
    config.db.query_fetchone( sql, ( target_uuid, pc, donor_id, ) )
    config.db.commit()
    
    return redirect( url_for( "afis.admin_show_target", uuid = target_uuid ) )

@afis_view.route( "/admin/target/<uuid>" )
@utils.decorator.admin_required
def admin_show_target( uuid ):
    """
    Admin entry point for a target folder page.
    
    Delegates to show_folder_inner with the admin flag set.
    """
    page = show_folder_inner( uuid, admin = True )
    return page

@afis_view.route( "/afis/<uuid>" )
@utils.decorator.login_required
def show_folder( uuid ):
    """
    Logged-in user entry point for a target folder page.
    
    Delegates to show_folder_inner with the admin flag cleared.
    """
    page = show_folder_inner( uuid, admin = False )
    return page

def get_segment_list_for_target_folder( uuid, admin = True ):
    """
    Fetch the reference segments attached to a target folder.
    
    :param uuid: uuid of the target folder (`cnm_folder.uuid`).
    :param admin: when False, the session user must hold a 'reference'
        assignment on the folder.
    :return: the rows fetched from `donor_segments_v` (plus `folder_uuid`), or
        None when `admin` is False and the user has no 'reference' assignment.
    """
    if not admin:
        # Non-admin callers only get the segments if they are assigned to the
        # folder for the 'reference' assignment type.
        sql = """
            SELECT count(*) AS count
            FROM cnm_assignment
            LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id
            WHERE
                folder_uuid = %s AND
                user_id = %s AND
                cnm_assignment_type.name = 'reference'
        """
        if config.db.query_fetchone( sql, ( uuid, session.get( "user_id", False ), ) )[ "count" ] == 0:
            return None
        
    # NOTE(review): the FROM / WHERE clauses and the final return were absent
    # from this function; they are restored here to mirror
    # get_marks_list_for_target_folder and the way callers consume the result
    # (a list of rows) -- confirm against the intended query.
    sql = """
        SELECT
            cnm_folder.uuid AS folder_uuid,
            donor_segments_v.*
        FROM cnm_folder
        LEFT JOIN fingers_same ON cnm_folder.pc = fingers_same.base_finger
        RIGHT JOIN donor_segments_v ON
            cnm_folder.donor_id = donor_segments_v.donor_id AND
            fingers_same.target = donor_segments_v.pc
        WHERE cnm_folder.uuid = %s
        ORDER BY
            donor_segments_v.donor_id ASC,
            donor_segments_v.pc ASC
    """
    return config.db.query_fetchall( sql, ( uuid, ) )

def get_marks_list_for_target_folder( uuid, admin = True ):
    """
    Fetch the 'mark_target' files of the donor owning a target folder.
    
    :param uuid: uuid of the target folder (`cnm_folder.uuid`).
    :param admin: when False, the session user must hold a 'mark' assignment
        on the folder.
    :return: the fetched rows, or None when `admin` is False and the user has
        no 'mark' assignment.
    """
    if not admin:
        # Non-admin callers only get the marks if they are assigned to the
        # folder for the 'mark' assignment type.
        sql = """
            SELECT count(*) AS count
            FROM cnm_assignment
            LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id
            WHERE
                folder_uuid = %s AND
                user_id = %s AND
                cnm_assignment_type.name = 'mark'
        """
        if config.db.query_fetchone( sql, ( uuid, session.get( "user_id", False ), ) )[ "count" ] == 0:
            return None
        
    # NOTE(review): the SELECT list and the folder filter were absent from this
    # query. The columns below are the ones read by the consumers in this
    # module (`uuid` in download_target_folder, `pfsp` in show_folder_inner)
    # -- confirm nothing else is expected by the templates.
    sql = """
        SELECT
            files_v.uuid,
            mark_info.pfsp
        FROM cnm_folder
        LEFT JOIN submissions ON cnm_folder.donor_id = submissions.donor_id
        LEFT JOIN files_v ON submissions.id = files_v.folder
        LEFT JOIN files_type ON files_v.type = files_type.id
        LEFT JOIN mark_info ON files_v.uuid = mark_info.uuid
        WHERE
            cnm_folder.uuid = %s AND
            files_type.name = 'mark_target'
    """
    return config.db.query_fetchall( sql, ( uuid, ) )

def get_annotation_list_for_target_folder( uuid, admin = True ):
    """
    Fetch the uuids of the 'annotation' data entries stored for a target folder.
    
    :param uuid: uuid of the target folder (`cnm_data.folder_uuid`).
    :param admin: accepted for signature parity with the other list helpers;
        it is not used, so no assignment check is performed here.
    :return: the fetched rows, each exposing a `uuid` column.
    """
    sql = """
        SELECT uuid
        FROM cnm_data
        LEFT JOIN cnm_data_type ON cnm_data.type_id = cnm_data_type.id
        WHERE
            folder_uuid = %s AND
            cnm_data_type.name = 'annotation'
    """
    return config.db.query_fetchall( sql, ( uuid, ) )

def show_folder_inner( uuid, admin ):
    """
    Render the target-folder page shared by the admin and the user views.
    
    :param uuid: uuid of the target folder.
    :param admin: True when called from the admin route. Admins additionally
        get the submission, donor username, finger name, the marks matching
        the folder's position code, and the AFIS user assignment lists.
    :return: the rendered "afis/shared/segment.html" page, or a redirect to
        "base.home" when a non-admin user is not assigned to the folder.
    
    Aborts with 404 when an admin requests a folder that does not exist.
    """
    if not admin:
        # Users may only open folders they are assigned to (any assignment type).
        sql = "SELECT count(*) FROM cnm_assignment WHERE folder_uuid = %s AND user_id = %s"
        if config.db.query_fetchone( sql, ( uuid, session.get( "user_id" ), ) )[ "count" ] == 0:
            return redirect( url_for( "base.home" ) )
    
    segments_list = get_segment_list_for_target_folder( uuid, admin )
    annotation_list = get_annotation_list_for_target_folder( uuid, admin )
    marks_list_tmp = get_marks_list_for_target_folder( uuid, admin )
    
    if admin:
        # The folder filter was missing although a parameter was passed.
        sql = """
            SELECT
                submissions.uuid,
                cnm_folder.pc,
                users.username
            FROM cnm_folder
            INNER JOIN submissions ON cnm_folder.donor_id = submissions.donor_id
            INNER JOIN users ON cnm_folder.donor_id = users.id
            WHERE cnm_folder.uuid = %s
        """
        folder = config.db.query_fetchone( sql, ( uuid, ) )
        # NOTE(review): assumes query_fetchone returns None when no row matches.
        if folder is None:
            abort( 404 )
        submission_id, pc, username = folder
        finger_name = "{} (F{})".format( segments_position_code[ pc ], pc )
        
        # Keep the marks having at least one zone mapped to this position code.
        # any() lists each mark once, even when several of its zones match.
        marks_list = [
            m for m in marks_list_tmp
            if any( pc in pfsp2fpc[ z ] for z in m[ "pfsp" ].split( "," ) )
        ]
        
        sql = """
            SELECT
                users.id,
                users.username
            FROM users
            LEFT JOIN account_type ON users.type = account_type.id
            WHERE account_type.name = 'AFIS'
        """
        all_afis_users = config.db.query_fetchall( sql )
        
        def get_users_assigned( assignment_type ):
            # The assignment type is passed as a query parameter instead of
            # being formatted into the SQL text.
            sql = """
                SELECT
                    cnm_assignment.user_id AS id
                FROM cnm_assignment
                LEFT JOIN cnm_assignment_type ON cnm_assignment.assignment_type = cnm_assignment_type.id
                WHERE
                    cnm_assignment.folder_uuid = %s AND
                    cnm_assignment_type.name = %s
            """
            return config.db.query_fetchall( sql, ( uuid, assignment_type, ) )
        
        users_assigned_refs = get_users_assigned( "reference" )
        users_assigned_marks = get_users_assigned( "mark" )
    
    else:
        submission_id = None
        finger_name = None
        username = None
        # The marks helper returns None for users without a 'mark' assignment.
        marks_list = marks_list_tmp or []
        # Previously left undefined on this path, which raised NameError below.
        all_afis_users = []
        users_assigned_refs = []
        users_assigned_marks = []
    
    # NOTE(review): `marks_list` was computed but never handed to the template;
    # it is passed here on the assumption the template expects it -- confirm.
    return utils.template.my_render_template(
        "afis/shared/segment.html",
        target_uuid = uuid,
        segments_list = segments_list,
        annotation_list = annotation_list,
        marks_list = marks_list,
        submission_id = submission_id,
        username = username,
        finger_name = finger_name,
        all_afis_users = all_afis_users,
        users_assigned_refs = users_assigned_refs,
        users_assigned_marks = users_assigned_marks
    )

@afis_view.route( "/admin/target/<uuid>/new_illustration", methods = [ "POST" ] )
@utils.decorator.admin_required
def admin_add_illustration( uuid ):
    """
    Store an uploaded file as an 'annotation' data entry of a target folder.
    
    The upload (form field "file") is base64-encoded, passed through
    utils.encryption.do_encrypt_dek together with the uuid of the submission
    owning the folder, and inserted into `cnm_data`.
    
    :param uuid: uuid of the target folder.
    :return: JSON `{"error": False}` on success, `{"error": True}` on failure.
    """
    try:
        # The folder filter was missing although a parameter was passed.
        sql = """
            SELECT submissions.uuid
            FROM cnm_folder
            LEFT JOIN submissions ON cnm_folder.donor_id = submissions.donor_id
            WHERE cnm_folder.uuid = %s
        """
        submission_uuid = config.db.query_fetchone( sql, ( uuid, ) )[ "uuid" ]
        
        sql = "SELECT id FROM cnm_data_type WHERE name = 'annotation'"
        annotation_type_id = config.db.query_fetchone( sql )[ "id" ]
        
        uploaded_file = request.files[ "file" ]
        
        fp = StringIO()
        uploaded_file.save( fp )
        
        file_data = base64.b64encode( fp.getvalue() )
        file_data = utils.encryption.do_encrypt_dek( file_data, submission_uuid )
        
        sql = utils.sql.sql_insert_generate(
            "cnm_data",
            [ "uuid", "folder_uuid", "data", "type_id" ],
            "id"
        )
        data = ( str( uuid4() ), uuid, file_data, annotation_type_id, )
        config.db.query_fetchone( sql, data )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        # The failure branch used to report "error": False, hiding every
        # failed upload from the client.
        # NOTE(review): the transaction is not rolled back here -- confirm
        # whether config.db needs an explicit rollback after a failed query.
        current_app.logger.exception( "Could not add an illustration to target folder %s", uuid )
        return jsonify( {
            "error": True
        } )

@afis_view.route( "/admin/target/<folder_uuid>/target/annotation/delete", methods = [ "POST" ] )
@utils.decorator.admin_required
def admin_delete_annotation( folder_uuid ):
    """
    Delete one data entry of a target folder.
    
    The entry uuid is read from the "uuid" form field; the row is removed from
    `cnm_data` only when it belongs to `folder_uuid`.
    
    :param folder_uuid: uuid of the target folder owning the entry.
    :return: JSON `{"error": False}` on success, `{"error": True}` on failure.
    """
    try:
        illustration_uuid = request.form.get( "uuid" )
        sql = "DELETE FROM cnm_data WHERE folder_uuid = %s AND uuid = %s"
        config.db.query( sql, ( folder_uuid, illustration_uuid, ) )
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        # Catch Exception rather than everything (a bare except also traps
        # SystemExit / KeyboardInterrupt) and keep a trace of the failure.
        current_app.logger.exception( "Could not delete an annotation of target folder %s", folder_uuid )
        return jsonify( {
            "error": True
        } )

def _zip_add_tiff( archive, entry_name, table, key, submission_id, file_id ):
    """
    Serve one image, pass it through image_tatoo with `file_id`, and store it
    in `archive` as a TIFF under `entry_name`.
    """
    img, _ = views.images.image_serve( table, key, submission_id )
    img = views.images.image_tatoo( img, file_id )
    
    buff = utils.images.pil2buffer( img, "TIFF" )
    archive.writestr( entry_name, buff.getvalue() )

@afis_view.route( "/afis/<uuid>/download" )
@utils.decorator.login_required
def download_target_folder( uuid ):
    """
    Send a zip archive with the images of a target folder.
    
    The archive holds the annotations, the reference segments and the marks of
    the folder as TIFF files named
    "<short folder uuid>_<annotation|reference|mark>_<short file uuid>.tiff".
    References and marks are only included when the session user holds the
    matching assignment (their helpers return None otherwise).
    
    :param uuid: uuid of the target folder.
    :return: the zip file as an attachment named "<short folder uuid>.zip".
    """
    short_uuid = uuid[ 0:18 ]
    
    # NOTE(review): the annotation helper performs no assignment check, so any
    # logged-in user can download the annotations of any folder -- confirm
    # whether this route should enforce the same check as show_folder_inner.
    annotation_list = get_annotation_list_for_target_folder( uuid, False )
    segments_list = get_segment_list_for_target_folder( uuid, False )
    marks_list = get_marks_list_for_target_folder( uuid, False )
    
    zipbuffer = StringIO()
    
    with zipfile.ZipFile( zipbuffer, "w", zipfile.ZIP_DEFLATED ) as fp:
        if isinstance( annotation_list, list ):
            for fid in annotation_list:
                file_id = fid[ "uuid" ]
                submission_id = views.images.get_submission_uuid_for_annotation( file_id )
                _zip_add_tiff(
                    fp,
                    "{}_annotation_{}.tiff".format( short_uuid, file_id[ 0:18 ] ),
                    "cnm_data", file_id, submission_id, file_id
                )
        
        if isinstance( segments_list, list ):
            for sid in segments_list:
                file_id = sid[ "uuid" ]
                tenprint_id = sid[ "tenprint" ]
                pc = sid[ "pc" ]
                
                # Segments are served by ( tenprint, position code ), not by
                # their own uuid.
                submission_id = views.images.get_submission_uuid_for_file( tenprint_id )
                _zip_add_tiff(
                    fp,
                    "{}_reference_{}.tiff".format( short_uuid, file_id[ 0:18 ] ),
                    "files_segments", ( tenprint_id, pc ), submission_id, file_id
                )
        
        if isinstance( marks_list, list ):
            for mid in marks_list:
                file_id = mid[ "uuid" ]
                submission_id = views.images.get_submission_uuid_for_file( file_id )
                _zip_add_tiff(
                    fp,
                    "{}_mark_{}.tiff".format( short_uuid, file_id[ 0:18 ] ),
                    "files", file_id, submission_id, file_id
                )
    
    # Rewind so send_file streams the archive from its first byte.
    zipbuffer.seek( 0 )
    
    return send_file(
        zipbuffer,
        attachment_filename = "{}.zip".format( short_uuid ),
        as_attachment = True
    )

@afis_view.route( "/afis/<uuid>/upload/list" )
@utils.decorator.login_required
def upload_cnm_list_page( uuid ):
    """
    List the results the logged-in user uploaded for a target folder.
    
    The query filters on the session user, so the route requires a login like
    the other "/afis/..." routes of this module.
    
    :param uuid: uuid of the target folder (`cnm_result.cnm_folder`).
    :return: the rendered "afis/user/upload_cnm_list.html" page.
    """
    sql = """
        SELECT uuid
        FROM cnm_result
        WHERE
            cnm_folder = %s AND
            uploader = %s
    """
    cnm_result_list = config.db.query_fetchall( sql, ( uuid, session.get( "user_id" ), ) )
    return utils.template.my_render_template(
        "afis/user/upload_cnm_list.html",
        cnm_result_list = cnm_result_list,
    )
    
@afis_view.route( "/admin/<uuid>/target/list" )
@utils.decorator.admin_required
def admin_show_target_list( uuid ):
    """
    List, for one submission, the donor segments and their target folders.
    
    :param uuid: uuid of the submission.
    :return: the rendered "afis/admin/folder_list.html" page.
    
    Aborts with 404 when no user is found for the submission.
    """
    # One row per position code, with the folder uuid when a folder exists.
    sql = """
        SELECT DISTINCT ON ( donor_segments_v.pc )
            donor_segments_v.*,
            cnm_folder.uuid AS folder_uuid
        FROM donor_segments_v
        LEFT JOIN submissions ON submissions.donor_id = donor_segments_v.donor_id
        LEFT JOIN cnm_folder ON
            donor_segments_v.donor_id = cnm_folder.donor_id AND
            donor_segments_v.pc = cnm_folder.pc
        WHERE submissions.uuid = %s
    """
    segments_list = config.db.query_fetchall( sql, ( uuid, ) )
    
    # Position codes of the folders holding at least one annotation.
    sql = """
        SELECT
            cnm_folder.pc
        FROM submissions
        LEFT JOIN cnm_folder ON submissions.donor_id = cnm_folder.donor_id
        LEFT JOIN cnm_data ON cnm_folder.uuid = cnm_data.folder_uuid
        LEFT JOIN cnm_data_type ON cnm_data.type_id = cnm_data_type.id
        WHERE
            submissions.uuid = %s AND
            cnm_data_type.name = 'annotation'
        GROUP BY pc
    """
    annotations = config.db.query_fetchall( sql, ( uuid, ) )
    annotations = [ a[ 'pc' ] for a in annotations ]
    
    sql = """
        SELECT username
        FROM users
        LEFT JOIN submissions ON users.id = submissions.donor_id
        WHERE submissions.uuid = %s
    """
    user = config.db.query_fetchone( sql, ( uuid, ) )
    # NOTE(review): assumes query_fetchone returns None when no row matches.
    if user is None:
        abort( 404 )
    username = user[ "username" ]
    
    # NOTE(review): `annotations` was computed but never handed to the
    # template; it is passed here on the assumption the template expects it
    # -- confirm.
    return utils.template.my_render_template(
        "afis/admin/folder_list.html",
        segments_list = segments_list,
        annotations = annotations,
        segments_position_code = segments_position_code,
        username = username,
        submission_uuid = uuid
    )

@afis_view.route( "/admin/afis/<target_uuid>/user/update", methods = [ "POST" ] )
@utils.decorator.admin_required
def admin_update_users_in_afis_folder( target_uuid ):
    """
    Replace the users assigned to a target folder for one assignment type.
    
    Form fields: "users" (JSON list of user ids) and "type" (name of the
    assignment type). The existing assignments of that type are deleted and
    the given users inserted, with a single commit at the end.
    
    :param target_uuid: uuid of the target folder.
    :return: JSON `{"error": False}` on success, `{"error": True}` on failure.
    """
    try:
        users = json.loads( request.form.get( "users" ) )
        # A JSON string or object would otherwise be iterated element by
        # element and inserted as bogus user ids.
        if not isinstance( users, list ):
            raise ValueError( "'users' must be a JSON list" )
        
        assignment_type = request.form.get( "type" )
        sql = "SELECT id FROM cnm_assignment_type WHERE name = %s"
        assignment_type_id = config.db.query_fetchone( sql, ( assignment_type, ) )[ "id" ]
        
        sql = """
            DELETE FROM cnm_assignment
            WHERE
                folder_uuid = %s AND
                assignment_type = %s
        """
        config.db.query( sql, ( target_uuid, assignment_type_id, ) )
        
        sql = utils.sql.sql_insert_generate(
            "cnm_assignment",
            [ "folder_uuid", "user_id", "assignment_type" ],
            "id"
        )
        for user in users:
            config.db.query_fetchone( sql, ( target_uuid, user, assignment_type_id, ) )
        
        config.db.commit()
        
        return jsonify( {
            "error": False
        } )
    
    except Exception:
        # NOTE(review): the DELETE may already have run without being
        # committed -- confirm whether config.db needs an explicit rollback so
        # a later commit on the same connection cannot persist it.
        current_app.logger.exception( "Could not update the users assigned to target folder %s", target_uuid )
        return jsonify( {
            "error": True
        } )