Guía 77- Ejecución de tareas en colas en «background»

Estoy muy contento con este ejercicio, pues es algo que los sistemas disponían (Oracle Forms y Report, por ejemplo) y es muy habitual en sistemas grandes, pero no se ve en los sistemas Web Forms y se echa mucho de menos.

Estamos acostumbrados en las aplicaciones Web Forms (como las que realizamos en PHPRunner) a que todo se ejecute en el mismo momento que se solicita y esto hace :

  • Que los usuarios se queden «colgados» durante el tiempo que se completa la petición.
  • No tenemos forma de asegurar el correcto funcionamiento del sistema, pues esos procesos largos y pesados intentan capturar todos los recursos del sistema y no tenemos forma de limitar las ejecuciones pesadas, ni secuenciarlas para que no compitan todas las peticiones con todas las peticiones.
  • Aplicaciones para muchos usuarios, hace que estos sistemas sean inestables y que se puedan «colgar» con cierta facilidad.

Objetivo

Disponer de un sistema de diferentes colas de ejecución de tareas en «background» que nos permita regular la carga de los sistemas y asegurar que las tareas pesadas se ejecutan en orden y secuenciadas, para que no saturen el sistema.

DEMO: https://fhumanes.com/scheduler_queue

Usuarios: admin/admin y user1/user1

Solución Técnica

Lo que he hecho es dar la posibilidad al aplicativo de que las tareas largas y pesadas, se puedan ejecutar en background (segundo plano) para que no se quede «enganchada» al usuario. De esta forma, procesos de elaboración de informes, actualizaciones de muchos registros, comunicaciones con otros sistemas o simplemente, el envío de email o procesos similares, se ejecuten sin estar «enganchados» al suario. Cuando estas tareas terminan, a través del sistema de Notificaciones que dispone PHPRunner, avisa al usuario de su terminación y le facilita la información de la ejecución e incluso los ficheros que hayan podido producirse en esos procesos.

Todas las tareas que se puedan ejecutar en Background se tienen que definir.
(1).- Todas las tareas tienen un código (valor único en el sistema) y es el que se va a utilizar para solicitar la ejecución de la misma.
(2).- Se define la cola de ejecución. Las ejecuciones se agrupan por colas. En este ejemplo se han definido 2 colas. FAST (para tareas rápidas) y HEAVY (para tareas muy pesadas). Esto nos permite ejecutar en paralelo, pero secuenciadas, las tareas con coste menores de recursos y otras que tardan más en ejecutarse.
(3).- Se establece el comando que se va a ejecutar, incluyendo parámetros fijos. Se utilizan valores encerrados entre llaves «{}» para que en tiempo de ejecución se calculen los valores. Esto nos permite mover la información entre sistemas ( desarrollo y producción).
(4).- Sólo para el ejemplo, se ha programado este botón para solicitar la ejecución de la tarea. Se puede ver el código para que podáis apreciar lo sencillo es que es solicitar la ejecución.

Esta pantalla sólo la tiene el usuario «admin».

(1).- Este es el bloque de información que se actualiza en la petición de ejecución. Las tareas pasan por 4 estados.
(2).- Este es el bloque de información que se va completando en la planificación y ejecución de la tarea
(3).- Cuando se termina la tarea aparece una nueva notificación al usuario que hizo la solicitud.

(1).- Si se pulsa en el botón de las notificaciones muestra todas las que tenemos y la primero de la lista es la más reciente.
(2).- Información de la tarea completada y si hacemos clic en esta información facilita detalle de la ejecución.
(3).- Este icono muestra las notificaciones que el sistema nonos había mostrado.

Facilita el detalle de la ejecución de las tareas.
(1).- Muestra el resultado (mensaje que nos haya dejado la tarea) en el campo «Result». Y también muestra los ficheros  (informes, reports, etc.) que a tarea produzca.
(2).- El botón «Close» elimina esa pestaña del navegador y vuelve a dejarte en la pestaña en la que estabas trabajando.

El proceso que ejecuta estos conjuntos de tareas es uno que se lanza desde el CRONTAB (ejecuta el proceso cada minuto)  y que ejecuta el fichero «scheduler_queue/scheduler_queue_job.php» y su contenido es:

<?php
@ini_set("display_errors","1");
@ini_set("display_startup_errors","1");
require_once("include/dbcommon.php");

custom_error(2,"Actual directory: ".getcwd().' directory script: '.__DIR__); // To debug
chdir(__DIR__);                                                             // Put directory actual the directory script actual

require_once("My_Code/scheduler_queue_function.php");  // Function general project

$dir_path_base = __DIR__;  // Directory Path Base, Reference other process

$status_arr = single_execution( 'scheduler', 'My_Code/scheduler_queue_process.php'); // Execute Process Scheduler single

custom_error(20,"Control Scheduler: ".print_r($status_arr,true)); // To debug

$process_arr = $status_arr[2]; // All execution pending

foreach ($process_arr as &$row_process) {  // Execution al process by queue
    $queue = $row_process["code"]; 
    
    $status_queue = fifo_execution( $queue , "My_Code/scheduler_queue_execute.php" );

    custom_error(21,"status Queue: ".print_r($status_queue,true)); // To debug
}

Podéis ver que lleva las trazas de depuración (función de «custom_error») de la guía 34.

En el directorio «My_Code» está el código que ejecuta las tareas en Background.

El fichero «scheduler_queue_function.php» es el conjunto de funciones que utilizo en los otros códigos:

<?php
// Function general by project

// To ensure that only one process is validating if you have to run a batch
function single_execution( $semaphore, $process ) {
    $dir_path_base = __DIR__; 
    $path_semaphore = __DIR__."/../semaphores/".$semaphore.".lock";
    $fp = fopen($path_semaphore, "r+");
    if ($fp == false ) { // file not found
        return array(false ,"The semaphore file does not exist!");
    }    
    if (!flock($fp, LOCK_EX|LOCK_NB, $blocked)) {
        if ($blocked) {    
            // another process holds the lock
            return array(false, "Couldn't get the lock! Other script in run!");     
        }
        else {
            // couldn't lock for another reason, e.g. no such file
            return array(false , "Error! Nothing done.");
        }
    }
    else {
        // lock obtained
        ftruncate($fp, 0);      // truncate file        
        // execute Process
        $row_arr = array();
        include ($process);     // Execute Process
        fflush($fp);            // flush output before releasing the lock
        flock($fp, LOCK_UN);    // release the lock        
        return array(true, "Process executed correctly",$row_arr);    
    }
}


// Execution Process with stop/wait if running other process

function fifo_execution( $semaphore, $process ) {
    $path_semaphore = __DIR__."/../semaphores/".$semaphore.".lock";
    $fp = fopen($path_semaphore, "r+");    
    if ($fp == false ) { // file not found
        return array(false ,"The semaphore file does not exist!");
    }    
    if (flock($fp, LOCK_EX)) {  // acquire an exclusive lock
        ftruncate($fp, 0);      // truncate file    
        // execute Proccess
        include ($process);    
        fflush($fp);            // flush output before releasing the lock
        flock($fp, LOCK_UN);    // release the lock       
    }    
    fclose($fp);
    return array(true, "Process executed correctly");
}

/*execute program and write all output to $out
terminate program if it runs more than XXX seconds */

function execute($cmd, $stdin=null, &$stdout, &$stderr, $timeout=false)
{
    $pipes = array();
    $process = proc_open(
        $cmd,
        array(array('pipe','r'),array('pipe','w'),array('pipe','w')),
        $pipes
    );
    $start = time();
    $stdout = '';
    $stderr = '';

    if(is_resource($process))
    {
        stream_set_blocking($pipes[0], 0);
        stream_set_blocking($pipes[1], 0);
        stream_set_blocking($pipes[2], 0);
        fwrite($pipes[0], $stdin);
        fclose($pipes[0]);
    }

    while(is_resource($process))
    {
        //echo ".";
        $stdout .= stream_get_contents($pipes[1]);
        $stderr .= stream_get_contents($pipes[2]);

        if($timeout !== false && time() - $start > $timeout)
        {
            proc_terminate($process, 9);
            return 1;
        }

        $status = proc_get_status($process);
        if(!$status['running'])
        {
            fclose($pipes[1]);
            fclose($pipes[2]);
            proc_close($process);
            return $status['exitcode'];
        }

        usleep(1000000);
    }

    return 1;
}

El fichero «scheduler_queue_process.php» es el que identifica las tareas que están pendientes de ser ejecutadas:

<?php
// Main code
$rs = DB::Query(
"SELECT
task_execution.id_task_execution, task_execution.scheduler_queue_task_id, task_execution.status_id,  task_execution.param_special,
task_execution.user, task_execution.group,
task.id_task, task.code task_code, task.scheduler_queue_id, task.status, task.command, task.user_default, task.group_default, task.max_time_minutes,
queue.id_queue, queue.code, queue.last_control_date
FROM scheduler_queue_task_execution task_execution
JOIN scheduler_queue_task task on (task_execution.scheduler_queue_task_id = task.id_task)
JOIN scheduler_queue queue on (queue.id_queue = task.scheduler_queue_id)
WHERE task.status = 0 AND task_execution.status_id = 1
ORDER BY queue.last_control_date, task_execution.id_task_execution
"
);
// $row_arr = array(); // Process father

$queue_id = $rs->value("id_queue"); // Select Queue of firt record

while( $data = $rs->fetchAssoc() )
{
    if ($queue_id == $data["id_queue"]) { // Only reord of firt Queue
        custom_error(10,"Id Task execution: ".$data['id_task_execution']); // To debug

        // Update date in Queue
        $data2 = array();
        $keyvalues2 = array();
        $data2["last_control_date"] = now();
        $keyvalues2["id_queue"] = $data["id_queue"];
        DB::Update("scheduler_queue", $data2, $keyvalues2 );

        // Update status in Task Execution
        $data2 = array();
        $keyvalues2 = array();
        $data2["status_id"] = 2;
        $keyvalues2["id_task_execution"] = $data["id_task_execution"];
        DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 );

        $row_arr[]= $data;
   }
}

El fichero «scheduler_queue_execute.php» es el que se encarga de ejecutar cada una de las tareas y se encarga de actualizar en base de datos los datos de ejecución y de enviar el mensaje de notificación.

<?php
/*
 * In varibale $row_process, all data of execution
 */
// Launch independent process
//  $dir_path_base = __DIR__;  // In process father
global $row_process;
global $dir_path_base;
global $debugCode;  

$debugCode = true;  // Change to "false" to eliminate traces

$scheduler_task_execution =  $row_process['id_task_execution'];

// Update "Execution Init"
$data2 = array();
$keyvalues2 = array();
$data2["date_init"] = now();
$data2["status_id"] = 3;
$keyvalues2["id_task_execution"] = $scheduler_task_execution;
DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 );

$command = str_replace("{DIR}", $dir_path_base, $row_process['command']);
$command = str_replace("{ID_TASK}", $scheduler_task_execution, $command);
$command .= ' '.$row_process['param_special'];

custom_error(11,"Execute Command: ".$command ); // To debug

$max_time = $row_process['max_time_minutes']*60;

execute($command, null, $out, $err, $max_time);

// $out = shell_exec($command);

custom_error(12,"Execute Command -Result: ".$out ); // To debug
custom_error(13,"Execute Command -Error: ".$err ); // To debug

// Update "Execution End"
$data2 = array();
$keyvalues2 = array();
$data2["date_end"] = now();
$data2["status_id"] = 4;
$data2["result"]  = $out;
$keyvalues2["id_task_execution"] = $scheduler_task_execution;
DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 );

$keyvalue = array();
$keyvalue["id_task_execution"] = $scheduler_task_execution;
$special_param = 

// Notify API
addNotification( "Task Batch executed: ".$scheduler_task_execution." - Code: ".$row_process["task_code"] , "Batch Task", "fa-envelope", 
        makePageLink( "scheduler_queue_task_execution", "view", $keyvalue, array( "page" => "view_special")), null, $row_process["user"], true );  

En el fichero «function_petition_task.php» incorpora la función para dar de alta una nueva petición de ejecución de la tarea.

<?php
// Function that facilitates the application for the execution of new tasks in background
function petition_task( $task, $param_special = null ) {
    $rs = DB::Query("select * from scheduler_queue_task WHERE code = '$task'");
    // Control of whether the task code exists
    if ( $rs->value("code") == null) {
        return (array(0,"Task code not found"));
    }
    $data1 = $rs->fetchAssoc();
    // Control that the task is active
    if ( $data1['status'] <> 0 ){
        return (array(0,"Non -active task"));
    }
    // Task control is on dates
    $now = now();
    if ( $data1['date_init'] > $now or ($data1['date_end'] <> null and $data1['date_end'] < $now) ){
        return (array(0,"The task is out of date to be executed"));
    }
    
    $user = Security::getUserName(); // User conected
    
    // `scheduler_queue_task_id`, `param_special`, `user`, `status_id`
    $data = array();
    $data["scheduler_queue_task_id"] = $data1['id_task'];
    $data["param_special"]  = $param_special;
    $data["user"] = $user;
    $data["status_id"] = 1;
    DB::Insert("scheduler_queue_task_execution", $data );
    $id = DB::LastId();
    return(array(1,$id));
    
}

A modo de ejemplo os facilito los códigos de las dos tareas que he programado en el ejemplo. Están en el directorio «Jobs».

«test1.php»

<?php
@ini_set("display_errors","1");
@ini_set("display_startup_errors","1");
require_once(__DIR__."/../../include/dbcommon.php");
custom_error(50,"Job Bacht Test1 in execution"); // To debug

// GET Parameter line Command
$param = getopt(null, ["id_task:","user:"]);
custom_error(52,"Job Param: ".print_r($param,true)); // To debug

// time initial
$time_initial = date('h:i:s');
// sleep 5 segundos
sleep(5);
// Time final
$time_final = date('h:i:s');

custom_error(51,"Time Initial: ".$time_initial." Time End :".$time_final); // To debug

echo "ok";

custom_error(60,"End Job test1.php"); // To debug

«test2.php»

<?php
@ini_set("display_errors","1");
@ini_set("display_startup_errors","1");
require_once(__DIR__."/../../include/dbcommon.php");

global $debugCode;  
$debugCode = true;  // Change to "false" to eliminate traces

custom_error(50,"Job Bacht Test2 in execution"); // To debug

// GET Parameter line Command
$param = getopt(null, ["id_task:"]);
$id_task = $param['id_task'];
custom_error(52,"Job Param: ".print_r($param,true)); // To debug

// time initial
$time_initial = date('h:i:s');
// sleep 5 segundos
sleep(5);
// Time final
$time_final = date('h:i:s');

custom_error(51,"Time Initial: ".$time_initial." Time End :".$time_final); // To debug

echo "ok"; // This is what will go to the field "result"

$temp_file = tempnam(sys_get_temp_dir(), 'txt');
$fp = fopen($temp_file, "w");
fwrite($fp, "This is an example to show how to record a file obtained from a report or any other process \n");
fclose($fp);
$filesize = filesize($temp_file);

// prepare image info to be saved in db
$files= array();
$files[] = array("name" => $temp_file,
"usrName" => 'example.txt', "size" => $filesize, "type" => "text/plain",
"searchStr" => 'example.txt'.":sStrEnd");

custom_error(52,"Preparate file :".print_r($files,true)); // To debug

// Update record by Execution with references by Files

// $rs = DB::Query("select * from scheduler_queue_task_execution where id_task_execution = $id_task");
// $data = $rs->fetchAssoc();
// custom_error(53,"Dara Record :".print_r($data,true)); // To debug

$data = array();
$keyvalues = array();
$data["files"] = my_json_encode($files);;
$keyvalues["id_task_execution"] = $id_task;
DB::Update("scheduler_queue_task_execution", $data, $keyvalues );

custom_error(60,"End Job test2.php"); // To debug

También existe el directorio «semaphores»

Que contiene unos ficheros para el control de las colas de procesos. Por cada cola que definamos, debe haber su correspondiente fichero en este directorio. El fichero «scheduler.lock» es fijo y es el que se utiliza para asegurarnos que sólo un proceso accede a la lista de tareas pendientes de ejecutarse.

Este no es un sistema sencillo por lo que los usuarios que tengan un desarrollo no complejo, no debería implementarlo, pero aquellos que tengan muchos informes y procesos pesados, seguro que verán en él muchas de las soluciones que buscaban.

Para cualquier duda, por favor, contactar conmigo a través de mi email [email protected]

Os dejo los fuentes para que lo podáis instalar en vuestros PC’s y podáis ver con detalle todo el sistema y , también, lo podáis modificar y mejorar.

Adjuntos

Archivo Tamaño de archivo Descargas
zip PHPRunner 10.91 y backup de base de datos 87 KB 509

Blog personal para facilitar soporte gratuito a usuarios de React y PHPRunner