# __init__.py
# -*- coding: UTF-8 -*-

from flask import Blueprint
from flask import current_app, abort, send_file, jsonify, session
from cStringIO import StringIO
from threading import Lock
from uuid import uuid4

from PIL import Image
from PIL.TiffImagePlugin import TiffImageFile

import utils

from functions import no_preview_image

from NIST import NISTf

lock = Lock()

image_view = Blueprint( "image", __name__, template_folder = "template" )

@utils.redis.redis_cache( 300 )
def get_submission_uuid_for_file( file_uuid ):
    """
        Return the uuid of the submission the file `file_uuid` belongs to.
        
        The `files` table is joined to `submissions` through the
        `files.folder` column. The lookup goes through the
        `utils.redis.redis_cache` decorator (presumably a 300 seconds
        cache; to be confirmed against the decorator).
    """
    query = """
        SELECT submissions.uuid
        FROM submissions
        LEFT JOIN files ON submissions.id = files.folder
        WHERE files.uuid = %s
    """
    params = ( file_uuid, )
    
    row = config.db.query_fetchone( query, params )
    return row[ "uuid" ]

@image_view.route( "/image/file/<file_id>/preview" )
@image_view.route( "/image/file/preview" )
@utils.decorator.login_required
def image_file_serve( file_id = None ):
    """
        Route serving the preview of a file as a PNG image.
        
        The PIL image is provided by image_file_serve_inner(); a 404 is
        returned if no image is available.
    """
    preview = image_file_serve_inner( file_id )
    
    # Nothing to serve
    if preview is None:
        return abort( 404 )
    
    # TIFF images without the `use_load_libtiff` attribute get it set before
    # the conversion (presumably required by PIL while loading the data).
    if isinstance( preview, TiffImageFile ) and not hasattr( preview, "use_load_libtiff" ):
        preview.use_load_libtiff = True
    
    png_buffer = utils.images.pil2buffer( preview, "PNG" )
    return send_file( png_buffer, mimetype = "image/png" )

def image_file_serve_inner( file_id ):
    """
        Function to get an image from the database and return it as PNG preview image.
        
        The thumbnail is searched in the 'thumbnails' table first. If it is
        not present, the image is loaded from the 'files' table and the
        thumbnail is re-created with utils.images.create_thumbnail().
        
        Returns the PIL image of the preview, None if the file has no image
        in the 'files' table, or the 'no preview' PIL image if anything goes
        wrong (missing file id, database or decoding error).
    """
    current_app.logger.info( "Serve a preview for the file '{}'".format( file_id ) )
    
    try:
        if file_id is None:
            raise Exception( "No file id" )
        
        submission_id = get_submission_uuid_for_file( file_id )
        current_app.logger.debug( "submission id: '{}'".format( submission_id ) )
        
        img, _ = image_serve( "thumbnails", file_id, submission_id )
        
        if img is None:
            current_app.logger.debug( "No image in the 'thumnnails' database. Recreating the thumbnail" )
            
            img, _ = image_serve( "files", file_id, submission_id )
            
            # No source image either: let the caller decide (404 in the route)
            if img is None:
                return None
            
            current_app.logger.debug( "Image from the 'files' table: {}".format( img ) )
            img = utils.images.create_thumbnail( file_id, img, submission_id )
            current_app.logger.debug( "Thumbnail image: {}".format( img ) )
        
        return img
    
    except Exception:
        # Best effort: any failure ends with the placeholder image, but the
        # traceback is kept in the logs.
        current_app.logger.exception( "Error while creating the thumbnail. Serving a 'no preview' image" )
        return no_preview_image( return_pil = True )

@image_view.route( "/image/file/<file_id>/full_resolution" )
@utils.decorator.admin_required
def admin_download_file_full_resolution( file_id ):
    """
        Download the full resolution of an image. This function is only
        accessible for admins. The downloaded image will not be tagged like the
        other images.
        
        The image is read from the 'files' table and sent as a TIFF
        attachment named '<file_id>.tiff'. A 404 is returned if the image is
        not available or can not be retrieved.
    """
    try:
        if file_id is None:
            raise Exception( "No file id" )
        
        submission_id = get_submission_uuid_for_file( file_id )
        current_app.logger.debug( "submission id: '{}'".format( submission_id ) )
        
        img, _ = image_serve( "files", file_id, submission_id )
        
        if img is None:
            buff = None
        else:
            buff = utils.images.pil2buffer( img, "TIFF" )
    
    except Exception:
        # NOTE(review): the original handler of this `try` is not visible;
        # logging the error and answering 404 is an assumption to confirm.
        current_app.logger.exception( "Error while retrieving the full resolution image '{}'".format( file_id ) )
        return abort( 404 )
    
    # Kept outside of the `try` so the 404 is not logged as an error
    if buff is None:
        return abort( 404 )
    
    return send_file(
        buff,
        mimetype = "image/tiff",
        as_attachment = True,
        attachment_filename = file_id + ".tiff",
    )

@image_view.route( "/image/segment/<tenprint_id>/<pc>" )
@utils.decorator.login_required
def image_segment_serve( tenprint_id, pc ):
    """
        Serve a preview for a segment image.
        
        The segment is identified by the tenprint uuid and the `pc` value,
        read from the 'files_segments' table and converted to a thumbnail.
        The 'no preview' image is served if the segment can not be produced.
    """
    current_app.logger.info( "Serving a segment image for the tenprint '{}', pc '{}'".format( tenprint_id, pc ) )
    
    try:
        submission_id = get_submission_uuid_for_file( tenprint_id )
        
        current_app.logger.debug( "submission id: {}".format( submission_id ) )
        
        img, _ = image_serve( "files_segments", ( tenprint_id, pc ), submission_id )
        img = utils.images.create_thumbnail( None, img, submission_id )
        
        current_app.logger.debug( "image: {}".format( img ) )
        
        buff = utils.images.pil2buffer( img, "PNG" )
        return send_file( buff, mimetype = "image/png" )
    
    except Exception:
        # Best effort: serve the placeholder, but keep the traceback in the logs
        current_app.logger.exception( "Error while creating the thumbnail. Serving a 'no preview' image" )
        return send_file( no_preview_image(), mimetype = "image/png" )

def image_serve( table, image_id, submission_id ):
    """
        Backend function to get the image from the database.
        
        Parameters:
            table:         one of 'files_segments', 'files', 'thumbnails' or
                           'tenprint_cards'.
            image_id:      uuid of the image. For 'files_segments', a tuple
                           ( tenprint, pc ) is also accepted. For
                           'tenprint_cards', a tuple ( id, t ) is expected,
                           where `t` selects the `image_<t>` column.
            submission_id: submission uuid passed to the DEK decryption.
        
        Returns a tuple ( PIL image, uuid of the row ), or ( None, None ) if
        the row or its data is missing.
        
        Raises an Exception if the table is not in the list above.
    """
    current_app.logger.info( "Serve the image '{}' for submission '{}' from table '{}'".format( image_id, submission_id, table ) )
    
    # Only the 'tenprint_cards' images are read without decryption
    need_to_decrypt = True
    
    if table == "files_segments":
        if isinstance( image_id, tuple ):
            tp, pc = image_id
            sql = "SELECT data, uuid FROM {} WHERE tenprint = %s AND pc = %s".format( table )
            p = ( tp, pc, )
        else:
            sql = "SELECT data, uuid FROM {} WHERE uuid = %s".format( table )
            p = ( image_id, )
    
    elif table == "files":
        sql = "SELECT data, uuid, resolution, format FROM {} WHERE uuid = %s".format( table )
        p = ( image_id, )
    
    elif table == "thumbnails":
        sql = "SELECT data, uuid, format FROM {} WHERE uuid = %s".format( table )
        p = ( image_id, )
    
    elif table == "tenprint_cards":
        image_id, t = image_id
        # NOTE(review): `t` is formatted into the column name; it must never
        # come from untrusted input. Verify against the callers.
        sql = "SELECT image_{} as data, id as uuid, image_resolution AS resolution FROM {} WHERE id = %s".format( t, table )
        p = ( image_id, )
        need_to_decrypt = False
    
    else:
        current_app.logger.error( "table '{}' not authorized".format( table ) )
        raise Exception( "table not authorized" )
    
    current_app.logger.debug( "sql:    {}".format( sql ) )
    current_app.logger.debug( "params: {}".format( p ) )
    
    data = config.db.query_fetchone( sql, p )
    
    if data is None:
        return None, None
    
    img = data[ "data" ]
    rid = data[ "uuid" ]
    
    if img is None:
        return None, None
    
    current_app.logger.debug( "image: {}...".format( img[ 0:20 ] ) )
    current_app.logger.debug( "need_to_decrypt: {}".format( need_to_decrypt ) )
    
    if "format" in data:
        current_app.logger.debug( "format: {}".format( data[ "format" ] ) )
    
    # The decryption depends on the table, not on the presence of the
    # 'format' column: the segments have no format but are encrypted.
    if need_to_decrypt:
        img = utils.encryption.do_decrypt_dek( img, submission_id )
    
    if table == "files" and data[ "format" ].upper() == "NIST":
        img = str2nist2img( img )
    else:
        img = str2img( img )
    
    if "resolution" in data:
        img.info[ "dpi" ] = ( data[ "resolution" ], data[ "resolution" ] )
    
    current_app.logger.debug( "image: {}".format( img ) )
    
    return img, rid

def str2img( data ):
    """
        Decode a base64 encoded image string and open it as a PIL image.
        
        Returns None if `data` is None.
    """
    current_app.logger.info( "Convert string image to PIL format" )
    
    if data is None:
        return None
    
    # In-memory file holding the decoded bytes, positioned at the start
    raw = base64.b64decode( data )
    stream = StringIO()
    stream.write( raw )
    stream.seek( 0 )
    
    opened = Image.open( stream )
    
    current_app.logger.debug( "string: {}".format( data[ 0:20 ] ) )
    current_app.logger.debug( "image: {}".format( opened ) )
    
    return opened

def str2nist2img( data ):
    """
        Convert a NIST string object to the tenprint card image.
        
        `data` is the base64 encoded content of the NIST file. The front
        tenprint card is generated with NISTf.get_tenprintcard_front( 1000 )
        (the 1000 is presumably the resolution in dpi; to be confirmed).
        
        Returns None if `data` is None.
    """
    current_app.logger.info( "Convert string NIST file to tenprint card image" )
    
    if data is None:
        return None
    
    buff = StringIO()
    buff.write( base64.b64decode( data ) )
    buff.seek( 0 )
    
    # The NIST parsing and the card generation are serialized with the
    # module level lock.
    with lock:
        n = NISTf( buff )
        img = n.get_tenprintcard_front( 1000 )
    
    current_app.logger.debug( "string: {}".format( data[ 0:20 ] ) )
    current_app.logger.debug( "image: {}".format( img ) )
    
    return img

def tag_bottom( img, data ):
    """
        Add fingerprinting to the image at the bottom. This will add the
        argument `data` to the image as a CODE128 barcode, without text,
        in a 10 pixels high band appended below the image. The barcode is
        aligned on the bottom right corner.
        
        Returns a new image; `img` is not modified.
    """
    data = str( data )
    
    # Vertical strip of the generated barcode kept in the tag: `h` pixels
    # high, starting `top` pixels below the top of the barcode image.
    top = 10
    h = 10
    
    fp = StringIO()
    options = {
        'module_width': 0.2,
        'module_height': 1.0,
        'quiet_zone': 0,
        'write_text': False,
    }
    barcode.generate( 'code128', data, writer = barcode.writer.ImageWriter(), output = fp, writer_options = options )
    
    # Rewind the buffer before reading it, as done for the other in-memory
    # images of this module.
    fp.seek( 0 )
    codebar_img = Image.open( fp )
    codebar_img = codebar_img.crop( ( 0, top, codebar_img.width, top + h ) )
    
    # The output is the input image extended by the height of the strip
    size = list( img.size )
    size[ 1 ] += h
    
    # White background, expressed in the mode of the input image
    if img.mode == "RGB":
        bg = ( 255, 255, 255 )
    else:
        bg = 255
    
    ret = Image.new( img.mode, size, bg )
    ret.paste( img, ( 0, 0 ) )
    ret.paste( codebar_img, ( size[ 0 ] - codebar_img.size[ 0 ], size[ 1 ] - codebar_img.size[ 1 ] ) )
    
    return ret

def tag_visible( img, data ):
    """
        Add a visible fingerprinting to the image. This will add the
        argument `data` to the image as a CODE128 barcode, with its text,
        in a band added above the image. The barcode is aligned on the top
        right corner and the original image is pasted below it.
        
        Returns a new image ('L' mode if the input is 'L', 'RGB' otherwise);
        `img` is not modified.
    """
    data = str( data )
    
    fp = StringIO()
    options = {
        'module_width': 0.2,
        'module_height': 4,
        'quiet_zone': 2,
        'font_size': 10,
        'text_distance': 1,
    }
    barcode.generate( 'code128', data, writer = barcode.writer.ImageWriter(), output = fp, writer_options = options )
    
    # Rewind the buffer before reading it, as done for the other in-memory
    # images of this module.
    fp.seek( 0 )
    codebar_img = Image.open( fp )
    
    # The output is the input image extended by the height of the barcode
    size = list( img.size )
    size[ 1 ] += codebar_img.size[ 1 ]
    
    # White background; grayscale images stay grayscale, everything else is
    # produced as RGB.
    if img.mode == "L":
        bg = 255
        mode = "L"
    else:
        bg = ( 255, 255, 255 )
        mode = "RGB"
    
    ret = Image.new( mode, size, bg )
    ret.paste( img, ( 0, codebar_img.size[ 1 ] ) )
    ret.paste( codebar_img, ( size[ 0 ] - codebar_img.size[ 0 ], 0 ) )
    
    return ret

def image_tatoo( img, image_id ):
    """
        Add the user and download information on the provided image. This
        information is generated automatically for every download of an image.
        
        The first 18 characters of `image_id` are added as a visible barcode
        (see tag_visible()), and the id of the user stored in the session
        with the current timestamp are added as a barcode at the bottom (see
        tag_bottom()).
        
        Returns the tagged image.
    """
    image_id = image_id[ 0:18 ]
    
    img = tag_visible( img, image_id )
    img = tag_bottom( img, "{} {}".format( session[ "user_id" ], time.time() ) )
    
    return img

@image_view.route( "/image/file/<image_id>/info" )
@utils.decorator.login_required
def img_info( image_id ):
    """
        Get and return the metadata for a particular image. See do_img_info()
        for more informations.
        
        The metadata is returned as JSON; a 404 is returned if the image is
        not found.
    """
    # NOTE(review): the `def` line of this route is not visible in the
    # reviewed source; the function name has to be checked against the
    # `url_for()` calls using this endpoint.
    current_app.logger.info( "Serve image informations for image '{}'".format( image_id ) )
    
    d = do_img_info( image_id )
    
    if d is None:
        return abort( 404 )
    
    return jsonify( d )

@utils.redis.redis_cache( 300 )
def do_img_info( image_id ):
    """
        Retrieve the metadata for a particular image from the database.
        
        Returns a dict with the 'size', 'width', 'height', 'resolution' and
        'format' of the file `image_id`, or None if the file is not in the
        'files' table.
    """
    current_app.logger.debug( "Get image information from database for '{}'".format( image_id ) )
    
    sql = """
        SELECT
            size, width, height, resolution, format
        FROM files
        WHERE uuid = %s
    """
    d = config.db.query_fetchone( sql, ( image_id, ) )
    
    # Unknown image: must be checked before iterating over the row
    if d is None:
        return None
    
    for key, value in d.iteritems():
        current_app.logger.debug( "{}: {}".format( key, value ) )
    
    return dict( d )

@image_view.route( "/image/segment/<tenprint_id>/start" )
@utils.decorator.login_required
def image_tenprint_segmentation( tenprint_id ):
    """
        Route starting the segmentation of a tenprint image into segments
        (fingers or palm images).
        
        The work is done by do_image_tenprint_segmentation(); its return
        value is sent back in the 'data' field of the JSON response.
    """
    current_app.logger.info( "Start segmentations for '{}'".format( tenprint_id ) )
    
    response = {
        "error": False,
        "data": do_image_tenprint_segmentation( tenprint_id ),
    }
    
    return jsonify( response )

def do_image_tenprint_segmentation( tenprint_id ):
    """
        Backend function to create all the segments images for a tenprint source image.
        
        The tenprint image is read from the 'files' table and decrypted. For
        every zone stored in 'segments_locations' for this tenprint, the
        image is cropped (and rotated if the zone has an orientation),
        encrypted and inserted in the 'files_segments' table. The segments
        previously stored for this tenprint are deleted first.
    """
    sql = "SELECT size, resolution, type, format, data FROM files WHERE uuid = %s"
    img = config.db.query_fetchone( sql, ( tenprint_id, ) )
    
    # Log the row, without dumping the whole image data
    for key, value in img.iteritems():
        if isinstance( value, str ) and len( value ) > 20:
            value = "{}...".format( value[ 0:20 ] )
        current_app.logger.debug( "{}: {}".format( key, value ) )
    
    img_format = img[ "format" ]
    side = {
        1: "front",
        2: "back"
    }[ img[ "type" ] ]
    
    current_app.logger.debug( "side: {}".format( side ) )
    
    submission_id = get_submission_uuid_for_file( tenprint_id )
    
    current_app.logger.debug( "Decrypt data with DEK" )
    img = utils.encryption.do_decrypt_dek( img[ "data" ], submission_id )
    img = base64.b64decode( img )
    
    buff = StringIO()
    buff.write( img )
    buff.seek( 0 )
    img = Image.open( buff )
    
    current_app.logger.debug( "image: {}".format( img ) )
    
    sql = "SELECT * FROM segments_locations WHERE tenprint_id = %s"
    data = ( tenprint_id, )
    zones = config.db.query_fetchall( sql, data )
    
    # Remove the segments of a previous segmentation of this tenprint
    sql = "DELETE FROM files_segments WHERE tenprint = %s"
    config.db.query( sql, ( tenprint_id, ) )
    
    for z in zones:
        current_app.logger.debug( "Segmenting fpc '{}'".format( z[ "fpc" ] ) )
        
        # Bounding box of the zone: top-left and bottom-right corners
        tl_x, tl_y, br_x, br_y = [ z[ "x" ], z[ "y" ], z[ "x" ] + z[ "width" ], z[ "y" ] + z[ "height" ] ]
        tmp = img.crop( ( tl_x, tl_y, br_x, br_y ) )
        
        if z[ "orientation" ] != 0:
            # Rotate by the opposite of the stored angle, expanding the
            # output so the rotated segment is not cropped.
            tmp = tmp.rotate( -z[ "orientation" ], Image.BICUBIC, True )
        
        # Segments are stored in the same format as the source image
        buff = StringIO()
        tmp.save( buff, format = img_format )
        buff.seek( 0 )
        
        current_app.logger.debug( "Encrypting segment image with DEK" )
        file_data = buff.getvalue()
        file_data = base64.b64encode( file_data )
        file_data = utils.encryption.do_encrypt_dek( file_data, submission_id )
        
        current_app.logger.debug( "Inserting to the database" )
        sql = utils.sql.sql_insert_generate( "files_segments", [ "tenprint", "uuid", "pc", "data" ] )
        data = ( tenprint_id, str( uuid4() ), z[ "fpc" ], file_data )
        config.db.query( sql, data )