Skip to content
__init__.py 13.4 KiB
Newer Older
# -*- coding: UTF-8 -*-

import base64
import time
from cStringIO import StringIO
from threading import Lock
from uuid import uuid4

from flask import Blueprint
from flask import current_app, abort, send_file, jsonify, session
from PIL import Image

import utils

from functions import no_preview_image

from NIST import NISTf

# Module-level lock, held in str2nist2img() while a NIST file is parsed and
# rendered (presumably because the NIST library is not thread-safe - confirm).
lock = Lock()

# Flask blueprint on which all the image routes of this module are registered.
image_view = Blueprint( "image", __name__, template_folder = "template" )

def get_submission_uuid_for_file( file_uuid ):
    """
        Look up the uuid of the submission that contains the file `file_uuid`.
        
        The fetched row is subscripted directly, so the call fails if the
        database layer returns no row for this file (query_fetchone() appears
        to return None in that case, see image_serve()).
    """
    query = """
        SELECT submissions.uuid
        FROM submissions
        LEFT JOIN files ON submissions.id = files.folder
        WHERE files.uuid = %s
    """
    params = ( file_uuid, )
    row = config.db.query_fetchone( query, params )
    return row[ "uuid" ]

@image_view.route( "/image/file/<file_id>/preview" )
@image_view.route( "/image/file/preview" )
@utils.decorator.login_required
def image_file_serve( file_id = None ):
    """
        Serve the PNG preview of a file stored in the database.
        
        The preview is read from the 'thumbnails' table. If it is not present,
        the image is loaded from the 'files' table and a thumbnail is created
        with utils.images.create_thumbnail().
        
        Every failure (no file id, unknown file, no image data, conversion
        error, ...) is answered with the generic 'no preview' image instead
        of an HTTP error.
        
        :param file_id: uuid of the file; None when the route without id is used.
        :return: Flask response with the mimetype image/png.
    """
    current_app.logger.info( "Serve a preview for the file '{}'".format( file_id ) )
    
    try:
        if file_id is None:
            raise Exception( "No file id" )
        
        submission_id = get_submission_uuid_for_file( file_id )
        current_app.logger.debug( "submission id: '{}'".format( submission_id ) )
        
        img, _ = image_serve( "thumbnails", file_id, submission_id )
        
        if img is None:
            current_app.logger.debug( "No image in the 'thumnnails' database. Recreating the thumbnail" )
            
            img, _ = image_serve( "files", file_id, submission_id )
            
            if img is None:
                # abort() raises an HTTP exception, which is caught by the
                # handler below: the client gets the 'no preview' image.
                return abort( 404 )
            
            current_app.logger.debug( "Image from the 'files' table: {}".format( img ) )
            img = utils.images.create_thumbnail( file_id, img, submission_id )
            current_app.logger.debug( "Thumbnail image: {}".format( img ) )
        
        buff = utils.images.pil2buffer( img, "PNG" )
        return send_file( buff, mimetype = "image/png" )
    
    except Exception:
        # Deliberate best-effort: a preview must never break the page.
        # The traceback is logged so the real cause is not lost.
        current_app.logger.error( "Error while creating the thumbnail. Serving a 'no preview' image", exc_info = True )
        return send_file( no_preview_image(), mimetype = "image/png" )

@image_view.route( "/image/file/<file_id>/full_resolution" )
@utils.decorator.admin_required
def admin_download_file_full_resolution( file_id ):
    """
        Send the image of a file, read from the 'files' table, as a TIFF
        attachment named '<file_id>.tiff'.
        
        :param file_id: uuid of the file to download.
        :return: Flask response with the mimetype image/tiff; the request is
            aborted with a 404 error if the file can not be served.
    """
    try:
        if file_id is None:
            raise Exception( "No file id" )
        
        submission_id = get_submission_uuid_for_file( file_id )
        current_app.logger.debug( "submission id: '{}'".format( submission_id ) )
        
        img, _ = image_serve( "files", file_id, submission_id )
        
        if img is None:
            return abort( 404 )
        
        buff = utils.images.pil2buffer( img, "TIFF" )
        return send_file(
            buff,
            mimetype = "image/tiff",
            as_attachment = True,
            attachment_filename = file_id + ".tiff",
        )
    
    except Exception:
        # NOTE(review): the status code for failures is an assumption; 404 is
        # used to match the 'no image' case handled above - confirm.
        current_app.logger.error( "Error while serving the full resolution file '{}'".format( file_id ), exc_info = True )
        return abort( 404 )

@image_view.route( "/image/segment/<tenprint_id>/<pc>" )
@utils.decorator.login_required
def image_segment_serve( tenprint_id, pc ):
    """
        Serve a PNG preview for a segment image.
        
        The segment is read from the 'files_segments' table with the couple
        ( tenprint_id, pc ) and passed to utils.images.create_thumbnail().
        Every failure is answered with the generic 'no preview' image.
        
        :param tenprint_id: uuid of the tenprint file the segment belongs to.
        :param pc: value of the 'pc' column identifying the segment.
        :return: Flask response with the mimetype image/png.
    """
    current_app.logger.info( "Serving a segment image for the tenprint '{}', pc '{}'".format( tenprint_id, pc ) )
    
    try:
        submission_id = get_submission_uuid_for_file( tenprint_id )
        
        current_app.logger.debug( "submission id: {}".format( submission_id ) )
        
        img, _ = image_serve( "files_segments", ( tenprint_id, pc ), submission_id )
        img = utils.images.create_thumbnail( None, img, submission_id )
        
        current_app.logger.debug( "image: {}".format( img ) )
        
        buff = utils.images.pil2buffer( img, "PNG" )
        return send_file( buff, mimetype = "image/png" )
    
    except Exception:
        # Deliberate best-effort: fall back to the placeholder image, but keep
        # the traceback in the logs.
        current_app.logger.error( "Error while creating the thumbnail. Serving a 'no preview' image", exc_info = True )
        return send_file( no_preview_image(), mimetype = "image/png" )

def image_serve( table, image_id, submission_id ):
    """
        Backend function to get an image from the database as a PIL image.
        
        Supported tables and the expected shape of `image_id`:
        
            files_segments: ( tenprint, pc ) tuple, or the uuid of the segment
            files:          uuid of the file
            thumbnails:     uuid of the file
            tenprint_cards: ( id, t ) couple, `t` being the suffix of the
                            'image_<t>' column to read
        
        The data of every table except 'tenprint_cards' is decrypted with
        utils.encryption.do_decrypt_dek() before being decoded.
        
        :param table: name of the table to read the image from.
        :param image_id: identifier of the image (see above).
        :param submission_id: submission uuid passed to the decryption function.
        :return: ( PIL image, uuid ) or ( None, None ) if there is no row or no data.
        :raise Exception: if `table` is not one of the supported tables.
    """
    current_app.logger.info( "Serve the image '{}' for submission '{}' from table '{}'".format( image_id, submission_id, table ) )
    
    need_to_decrypt = True
    
    if table == "files_segments":
        if isinstance( image_id, tuple ):
            tp, pc = image_id
            sql = "SELECT data, uuid FROM {} WHERE tenprint = %s AND pc = %s".format( table )
            p = ( tp, pc, )
        else:
            sql = "SELECT data, uuid FROM {} WHERE uuid = %s".format( table )
            p = ( image_id, )
    
    elif table == "files":
        sql = "SELECT data, uuid, resolution, format FROM {} WHERE uuid = %s".format( table )
        p = ( image_id, )
    
    elif table == "thumbnails":
        sql = "SELECT data, uuid, format FROM {} WHERE uuid = %s".format( table )
        p = ( image_id, )
    
    elif table == "tenprint_cards":
        image_id, t = image_id
        # NOTE(review): `t` is interpolated in the SQL text as a column name
        # suffix; callers must never pass a user-controlled value here.
        sql = "SELECT image_{} as data, id as uuid, image_resolution AS resolution FROM {} WHERE id = %s".format( t, table )
        p = ( image_id, )
        need_to_decrypt = False
    
    else:
        current_app.logger.error( "table '{}' not authorized".format( table ) )
        raise Exception( "table not authorized" )
    
    current_app.logger.debug( "sql:    {}".format( sql ) )
    current_app.logger.debug( "params: {}".format( p ) )
    
    data = config.db.query_fetchone( sql, p )
    
    if data is None:
        return None, None
    
    img = data[ "data" ]
    rid = data[ "uuid" ]
    
    if img is None:
        return None, None
    
    current_app.logger.debug( "image: {}...".format( img[ 0:20 ] ) )
    current_app.logger.debug( "need_to_decrypt: {}".format( need_to_decrypt ) )
    
    if "format" in data:
        current_app.logger.debug( "format: {}".format( data[ "format" ] ) )
    
    # The decryption depends on the table only, not on the presence of a
    # 'format' column: the segments have no format but are stored encrypted.
    if need_to_decrypt:
        img = utils.encryption.do_decrypt_dek( img, submission_id )
    
    if table == "files" and data[ "format" ].upper() == "NIST":
        img = str2nist2img( img )
    else:
        img = str2img( img )
    
    if "resolution" in data:
        img.info[ "dpi" ] = ( data[ "resolution" ], data[ "resolution" ] )
    
    current_app.logger.debug( "image: {}".format( img ) )
    
    return img, rid

def str2img( data ):
    """
        Decode a base64 encoded image and open it as a PIL image.
        
        :param data: base64 string of the image file, or None.
        :return: PIL image, or None if `data` is None.
    """
    current_app.logger.info( "Convert string image to PIL format" )
    
    if data == None:
        return None
    
    raw = base64.b64decode( data )
    
    stream = StringIO()
    stream.write( raw )
    stream.seek( 0 )
    
    pil_image = Image.open( stream )
    
    current_app.logger.debug( "string: {}".format( data[ 0:20 ] ) )
    current_app.logger.debug( "image: {}".format( pil_image ) )
    
    return pil_image

def str2nist2img( data ):
    """
        Decode a base64 encoded NIST file and return the front tenprint card
        image generated from it.
        
        :param data: base64 string of the NIST file, or None.
        :return: result of NISTf.get_tenprintcard_front( 1000 ), or None if
            `data` is None.
    """
    current_app.logger.info( "Convert string NIST file to tenprint card image" )
    
    if data == None:
        return None
    
    raw = base64.b64decode( data )
    
    stream = StringIO()
    stream.write( raw )
    stream.seek( 0 )
    
    # The parsing and the rendering are serialized with the module-level lock.
    with lock:
        nist_file = NISTf( stream )
        # presumably 1000 is the resolution of the generated card - confirm
        card = nist_file.get_tenprintcard_front( 1000 )
    
    current_app.logger.debug( "string: {}".format( data[ 0:20 ] ) )
    current_app.logger.debug( "image: {}".format( card ) )
    
    return card

def tag_bottom( img, data ):
    """
        Return a new image made of `img` with a 10 pixels high strip added at
        the bottom, containing a Code 128 barcode of `data` (without text)
        aligned on the bottom-right corner.
        
        :param img: PIL image to tag.
        :param data: value to encode; converted with str().
        :return: new PIL image, same mode as `img`, 10 pixels higher.
    """
    payload = str( data )
    
    # Rows [ strip_top, strip_top + strip_height ) of the generated barcode are kept.
    strip_top = 10
    strip_height = 10
    
    writer_options = {
        'module_width': 0.2,
        'module_height': 1.0,
        'quiet_zone': 0,
        'write_text': False,
    }
    barcode_buffer = StringIO()
    barcode.generate( 'code128', payload, writer = barcode.writer.ImageWriter(), output = barcode_buffer, writer_options = writer_options )
    
    strip = Image.open( barcode_buffer )
    strip = strip.crop( ( 0, strip_top, strip.width, strip_top + strip_height ) )
    
    width, height = img.size
    canvas_size = [ width, height + strip_height ]
    
    # White background, expressed in the mode of the source image.
    background = ( 255, 255, 255 ) if img.mode == "RGB" else 255
    
    tagged = Image.new( img.mode, canvas_size, background )
    tagged.paste( img, ( 0, 0 ) )
    
    strip_width, strip_rows = strip.size
    tagged.paste( strip, ( canvas_size[ 0 ] - strip_width, canvas_size[ 1 ] - strip_rows ) )
    
    return tagged

def tag_visible( img, data ):
    """
        Return a new image made of a Code 128 barcode of `data` (with its
        text) placed in a band above `img`, aligned on the top-right corner.
        
        :param img: PIL image to tag.
        :param data: value to encode; converted with str().
        :return: new PIL image, in mode "L" if `img` is in mode "L", in mode
            "RGB" otherwise; its height is the height of `img` plus the height
            of the barcode.
    """
    data = str( data )
    
    fp = StringIO()
    options = {
        'module_width': 0.2,
        'module_height': 4,
        'quiet_zone': 2,
        'font_size': 10,
        'text_distance': 1,
    }
    barcode.generate( 'code128', data, writer = barcode.writer.ImageWriter(), output = fp, writer_options = options )
    codebar_img = Image.open( fp )
    
    size = list( img.size )
    size[ 1 ] += codebar_img.size[ 1 ]
    
    # White background; greyscale images stay greyscale, everything else is
    # rendered on a RGB canvas.
    if img.mode == "L":
        bg = 255
        mode = "L"
    else:
        bg = ( 255, 255, 255 )
        mode = "RGB"
    
    ret = Image.new( mode, size, bg )
    ret.paste( img, ( 0, codebar_img.size[ 1 ] ) )
    ret.paste( codebar_img, ( size[ 0 ] - codebar_img.size[ 0 ], 0 ) )
    
    return ret

def image_tatoo( img, image_id ):
    """
        Add the user and download information on the provided image.
        This information is generated automatically for every download of an image.
        
        A visible barcode of the first 18 characters of `image_id` is added on
        top of the image with tag_visible(), and a barcode of the string
        "<session user_id> <current timestamp>" is added at the bottom with
        tag_bottom().
        
        :param img: PIL image to tag.
        :param image_id: identifier of the image; only the first 18 characters are used.
        :return: the tagged PIL image.
    """
    image_id = image_id[ 0:18 ]
    
    img = tag_visible( img, image_id )
    img = tag_bottom( img, "{} {}".format( session[ "user_id" ], time.time() ) )
    
    # Without this return the tagged image would be discarded.
    return img

@image_view.route( "/image/file/<image_id>/info" )
@utils.decorator.login_required
def img_info( image_id ):
    """
        Return, as JSON, the metadata stored for the image `image_id`.
        The metadata are retrieved with do_img_info(); the request is aborted
        with a 404 error if that function returns None.
    """
    current_app.logger.info( "Serve image informations for image '{}'".format( image_id ) )
    
    metadata = do_img_info( image_id )
    
    if metadata is None:
        return abort( 404 )
    
    return jsonify( metadata )

def do_img_info( image_id ):
    """
        Retrieve the metadata for a particular image from the database.
        
        :param image_id: uuid of the file in the 'files' table.
        :return: dict with the keys 'size', 'width', 'height', 'resolution'
            and 'format', or None if no row is found for this uuid.
    """
    current_app.logger.debug( "Get image information from database for '{}'".format( image_id ) )
    
    sql = """
        SELECT
            size, width, height, resolution, format
        FROM files
        WHERE uuid = %s
    """
    d = config.db.query_fetchone( sql, ( image_id, ) )
    
    # The check has to be done before iterating over the row, otherwise an
    # unknown image raises instead of returning None.
    if d is None:
        return None
    
    info = dict( d )
    
    for key, value in info.items():
        current_app.logger.debug( "{}: {}".format( key, value ) )
    
    return info

@image_view.route( "/image/segment/<tenprint_id>/start" )
@utils.decorator.login_required
def image_tenprint_segmentation( tenprint_id ):
    """
        Route starting the segmentation of the tenprint image `tenprint_id`.
        The work is done by do_image_tenprint_segmentation(); its return value
        is sent back in the 'data' field of the JSON response.
    """
    current_app.logger.info( "Start segmentations for '{}'".format( tenprint_id ) )
    
    segmentation_result = do_image_tenprint_segmentation( tenprint_id )
    
    payload = {
        "error": False,
        "data": segmentation_result
    }
    return jsonify( payload )

def do_image_tenprint_segmentation( tenprint_id ):
    """
        Backend function to create all the segments images for a tenprint source image.
        
        Steps:
            1. load the file row from 'files' and decrypt / decode its image;
            2. read the zones stored in 'segments_locations' for this tenprint;
            3. delete the existing rows of 'files_segments' for this tenprint;
            4. for each zone: crop the image, rotate it if the zone has an
               orientation, encode, encrypt and insert it in 'files_segments'.
        
        :param tenprint_id: uuid of the tenprint file in the 'files' table.
        :raise KeyError: if the 'type' of the file is neither 1 (front) nor 2 (back).
    """
    sql = "SELECT size, resolution, type, format, data FROM files WHERE uuid = %s"
    file_row = config.db.query_fetchone( sql, ( tenprint_id, ) )
    
    for key, value in file_row.items():
        # Do not dump the whole (encrypted) image data in the logs.
        if isinstance( value, str ) and len( value ) > 20:
            value = "{}...".format( value[ 0:20 ] )
        current_app.logger.debug( "{}: {}".format( key, value ) )
    
    img_format = file_row[ "format" ]
    side = {
        1: "front",
        2: "back"
    }[ file_row[ "type" ] ]
    
    current_app.logger.debug( "side: {}".format( side ) )
    
    submission_id = get_submission_uuid_for_file( tenprint_id )
    
    current_app.logger.debug( "Decrypt data with DEK" )
    raw = utils.encryption.do_decrypt_dek( file_row[ "data" ], submission_id )
    raw = base64.b64decode( raw )
    
    buff = StringIO()
    buff.write( raw )
    buff.seek( 0 )
    img = Image.open( buff )
    
    current_app.logger.debug( "image: {}".format( img ) )
    
    sql = "SELECT * FROM segments_locations WHERE tenprint_id = %s"
    data = ( tenprint_id, )
    zones = config.db.query_fetchall( sql, data )
    
    # The previous segments are dropped before being regenerated.
    sql = "DELETE FROM files_segments WHERE tenprint = %s"
    config.db.query( sql, ( tenprint_id, ) )
    
    # The insert statement is the same for every segment.
    insert_sql = utils.sql.sql_insert_generate( "files_segments", [ "tenprint", "uuid", "pc", "data" ] )
    
    for z in zones:
        current_app.logger.debug( "Segmenting fpc '{}'".format( z[ "fpc" ] ) )
        
        # Bounding box of the zone: top-left and bottom-right corners.
        tl_x, tl_y = z[ "x" ], z[ "y" ]
        br_x, br_y = z[ "x" ] + z[ "width" ], z[ "y" ] + z[ "height" ]
        tmp = img.crop( ( tl_x, tl_y, br_x, br_y ) )
        
        if z[ "orientation" ] != 0:
            # Rotate by the opposite of the stored orientation; the canvas is
            # expanded (third argument) to hold the whole rotated segment.
            tmp = tmp.rotate( -z[ "orientation" ], Image.BICUBIC, True )
        
        buff = StringIO()
        tmp.save( buff, format = img_format )
        
        current_app.logger.debug( "Encrypting segment image with DEK" )
        file_data = base64.b64encode( buff.getvalue() )
        file_data = utils.encryption.do_encrypt_dek( file_data, submission_id )
        
        current_app.logger.debug( "Inserting to the database" )
        data = ( tenprint_id, str( uuid4() ), z[ "fpc" ], file_data )
        config.db.query( insert_sql, data )