Estoy muy contento con este ejercicio, pues es algo que los sistemas disponían (Oracle Forms y Report, por ejemplo) y es muy habitual en sistemas grandes, pero no se ve en los sistemas Web Forms y se echa mucho de menos.
Estamos acostumbrados en las aplicaciones Web Forms (como las que realizamos en PHPRunner) a que todo se ejecute en el mismo momento que se solicita y esto hace :
- Que los usuarios se queden «colgados» durante el tiempo que se completa la petición.
- No tenemos forma de asegurar el correcto funcionamiento del sistema, pues esos procesos largos y pesados intentan capturar todos los recursos del sistema y no tenemos forma de limitar las ejecuciones pesadas, ni secuenciarlas para que no compitan todas las peticiones con todas las peticiones.
- Aplicaciones para muchos usuarios, hace que estos sistemas sean inestables y que se puedan «colgar» con cierta facilidad.
Objetivo
Disponer de un sistema de diferentes colas de ejecución de tareas en «background» que nos permita regular la carga de los sistemas y asegurar que las tareas pesadas se ejecutan en orden y secuenciadas, para que no saturen el sistema.
DEMO: https://fhumanes.com/scheduler_queue
Usuarios: admin/admin y user1/user1
Solución Técnica
Lo que he hecho es dar la posibilidad al aplicativo de que las tareas largas y pesadas, se puedan ejecutar en background (segundo plano) para que no se quede «enganchada» al usuario. De esta forma, procesos de elaboración de informes, actualizaciones de muchos registros, comunicaciones con otros sistemas o simplemente, el envío de email o procesos similares, se ejecuten sin estar «enganchados» al suario. Cuando estas tareas terminan, a través del sistema de Notificaciones que dispone PHPRunner, avisa al usuario de su terminación y le facilita la información de la ejecución e incluso los ficheros que hayan podido producirse en esos procesos.
Todas las tareas que se puedan ejecutar en Background se tienen que definir.
(1).- Todas las tareas tienen un código (valor único en el sistema) y es el que se va a utilizar para solicitar la ejecución de la misma.
(2).- Se define la cola de ejecución. Las ejecuciones se agrupan por colas. En este ejemplo se han definido 2 colas. FAST (para tareas rápidas) y HEAVY (para tareas muy pesadas). Esto nos permite ejecutar en paralelo, pero secuenciadas, las tareas con coste menores de recursos y otras que tardan más en ejecutarse.
(3).- Se establece el comando que se va a ejecutar, incluyendo parámetros fijos. Se utilizan valores encerrados entre llaves «{}» para que en tiempo de ejecución se calculen los valores. Esto nos permite mover la información entre sistemas ( desarrollo y producción).
(4).- Sólo para el ejemplo, se ha programado este botón para solicitar la ejecución de la tarea. Se puede ver el código para que podáis apreciar lo sencillo es que es solicitar la ejecución.
Esta pantalla sólo la tiene el usuario «admin».
(1).- Este es el bloque de información que se actualiza en la petición de ejecución. Las tareas pasan por 4 estados.
(2).- Este es el bloque de información que se va completando en la planificación y ejecución de la tarea
(3).- Cuando se termina la tarea aparece una nueva notificación al usuario que hizo la solicitud.
(1).- Si se pulsa en el botón de las notificaciones muestra todas las que tenemos y la primero de la lista es la más reciente.
(2).- Información de la tarea completada y si hacemos clic en esta información facilita detalle de la ejecución.
(3).- Este icono muestra las notificaciones que el sistema nonos había mostrado.
Facilita el detalle de la ejecución de las tareas.
(1).- Muestra el resultado (mensaje que nos haya dejado la tarea) en el campo «Result». Y también muestra los ficheros (informes, reports, etc.) que a tarea produzca.
(2).- El botón «Close» elimina esa pestaña del navegador y vuelve a dejarte en la pestaña en la que estabas trabajando.
El proceso que ejecuta estos conjuntos de tareas es uno que se lanza desde el CRONTAB (ejecuta el proceso cada minuto) y que ejecuta el fichero «scheduler_queue/scheduler_queue_job.php» y su contenido es:
<?php @ini_set("display_errors","1"); @ini_set("display_startup_errors","1"); require_once("include/dbcommon.php"); custom_error(2,"Actual directory: ".getcwd().' directory script: '.__DIR__); // To debug chdir(__DIR__); // Put directory actual the directory script actual require_once("My_Code/scheduler_queue_function.php"); // Function general project $dir_path_base = __DIR__; // Directory Path Base, Reference other process $status_arr = single_execution( 'scheduler', 'My_Code/scheduler_queue_process.php'); // Execute Process Scheduler single custom_error(20,"Control Scheduler: ".print_r($status_arr,true)); // To debug $process_arr = $status_arr[2]; // All execution pending foreach ($process_arr as &$row_process) { // Execution al process by queue $queue = $row_process["code"]; $status_queue = fifo_execution( $queue , "My_Code/scheduler_queue_execute.php" ); custom_error(21,"status Queue: ".print_r($status_queue,true)); // To debug }
Podéis ver que lleva las trazas de depuración (función de «custom_error») de la guía 34.
En el directorio «My_Code» está el código que ejecuta las tareas en Background.
El fichero «scheduler_queue_function.php» es el conjunto de funciones que utilizo en los otros códigos:
<?php // Function general by project // To ensure that only one process is validating if you have to run a batch function single_execution( $semaphore, $process ) { $dir_path_base = __DIR__; $path_semaphore = __DIR__."/../semaphores/".$semaphore.".lock"; $fp = fopen($path_semaphore, "r+"); if ($fp == false ) { // file not found return array(false ,"The semaphore file does not exist!"); } if (!flock($fp, LOCK_EX|LOCK_NB, $blocked)) { if ($blocked) { // another process holds the lock return array(false, "Couldn't get the lock! Other script in run!"); } else { // couldn't lock for another reason, e.g. no such file return array(false , "Error! Nothing done."); } } else { // lock obtained ftruncate($fp, 0); // truncate file // execute Process $row_arr = array(); include ($process); // Execute Process fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // release the lock return array(true, "Process executed correctly",$row_arr); } } // Execution Process with stop/wait if running other process function fifo_execution( $semaphore, $process ) { $path_semaphore = __DIR__."/../semaphores/".$semaphore.".lock"; $fp = fopen($path_semaphore, "r+"); if ($fp == false ) { // file not found return array(false ,"The semaphore file does not exist!"); } if (flock($fp, LOCK_EX)) { // acquire an exclusive lock ftruncate($fp, 0); // truncate file // execute Proccess include ($process); fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // release the lock } fclose($fp); return array(true, "Process executed correctly"); } /*execute program and write all output to $out terminate program if it runs more than XXX seconds */ function execute($cmd, $stdin=null, &$stdout, &$stderr, $timeout=false) { $pipes = array(); $process = proc_open( $cmd, array(array('pipe','r'),array('pipe','w'),array('pipe','w')), $pipes ); $start = time(); $stdout = ''; $stderr = ''; if(is_resource($process)) { stream_set_blocking($pipes[0], 0); stream_set_blocking($pipes[1], 0); stream_set_blocking($pipes[2], 0); fwrite($pipes[0], $stdin); fclose($pipes[0]); } while(is_resource($process)) { //echo "."; $stdout .= stream_get_contents($pipes[1]); $stderr .= stream_get_contents($pipes[2]); if($timeout !== false && time() - $start > $timeout) { proc_terminate($process, 9); return 1; } $status = proc_get_status($process); if(!$status['running']) { fclose($pipes[1]); fclose($pipes[2]); proc_close($process); return $status['exitcode']; } usleep(1000000); } return 1; }
El fichero «scheduler_queue_process.php» es el que identifica las tareas que están pendientes de ser ejecutadas:
<?php // Main code $rs = DB::Query( "SELECT task_execution.id_task_execution, task_execution.scheduler_queue_task_id, task_execution.status_id, task_execution.param_special, task_execution.user, task_execution.group, task.id_task, task.code task_code, task.scheduler_queue_id, task.status, task.command, task.user_default, task.group_default, task.max_time_minutes, queue.id_queue, queue.code, queue.last_control_date FROM scheduler_queue_task_execution task_execution JOIN scheduler_queue_task task on (task_execution.scheduler_queue_task_id = task.id_task) JOIN scheduler_queue queue on (queue.id_queue = task.scheduler_queue_id) WHERE task.status = 0 AND task_execution.status_id = 1 ORDER BY queue.last_control_date, task_execution.id_task_execution " ); // $row_arr = array(); // Process father $queue_id = $rs->value("id_queue"); // Select Queue of firt record while( $data = $rs->fetchAssoc() ) { if ($queue_id == $data["id_queue"]) { // Only reord of firt Queue custom_error(10,"Id Task execution: ".$data['id_task_execution']); // To debug // Update date in Queue $data2 = array(); $keyvalues2 = array(); $data2["last_control_date"] = now(); $keyvalues2["id_queue"] = $data["id_queue"]; DB::Update("scheduler_queue", $data2, $keyvalues2 ); // Update status in Task Execution $data2 = array(); $keyvalues2 = array(); $data2["status_id"] = 2; $keyvalues2["id_task_execution"] = $data["id_task_execution"]; DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 ); $row_arr[]= $data; } }
El fichero «scheduler_queue_execute.php» es el que se encarga de ejecutar cada una de las tareas y se encarga de actualizar en base de datos los datos de ejecución y de enviar el mensaje de notificación.
<?php /* * In varibale $row_process, all data of execution */ // Launch independent process // $dir_path_base = __DIR__; // In process father global $row_process; global $dir_path_base; global $debugCode; $debugCode = true; // Change to "false" to eliminate traces $scheduler_task_execution = $row_process['id_task_execution']; // Update "Execution Init" $data2 = array(); $keyvalues2 = array(); $data2["date_init"] = now(); $data2["status_id"] = 3; $keyvalues2["id_task_execution"] = $scheduler_task_execution; DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 ); $command = str_replace("{DIR}", $dir_path_base, $row_process['command']); $command = str_replace("{ID_TASK}", $scheduler_task_execution, $command); $command .= ' '.$row_process['param_special']; custom_error(11,"Execute Command: ".$command ); // To debug $max_time = $row_process['max_time_minutes']*60; execute($command, null, $out, $err, $max_time); // $out = shell_exec($command); custom_error(12,"Execute Command -Result: ".$out ); // To debug custom_error(13,"Execute Command -Error: ".$err ); // To debug // Update "Execution End" $data2 = array(); $keyvalues2 = array(); $data2["date_end"] = now(); $data2["status_id"] = 4; $data2["result"] = $out; $keyvalues2["id_task_execution"] = $scheduler_task_execution; DB::Update("scheduler_queue_task_execution", $data2, $keyvalues2 ); $keyvalue = array(); $keyvalue["id_task_execution"] = $scheduler_task_execution; $special_param = // Notify API addNotification( "Task Batch executed: ".$scheduler_task_execution." - Code: ".$row_process["task_code"] , "Batch Task", "fa-envelope", makePageLink( "scheduler_queue_task_execution", "view", $keyvalue, array( "page" => "view_special")), null, $row_process["user"], true );
En el fichero «function_petition_task.php» incorpora la función para dar de alta una nueva petición de ejecución de la tarea.
<?php // Function that facilitates the application for the execution of new tasks in background function petition_task( $task, $param_special = null ) { $rs = DB::Query("select * from scheduler_queue_task WHERE code = '$task'"); // Control of whether the task code exists if ( $rs->value("code") == null) { return (array(0,"Task code not found")); } $data1 = $rs->fetchAssoc(); // Control that the task is active if ( $data1['status'] <> 0 ){ return (array(0,"Non -active task")); } // Task control is on dates $now = now(); if ( $data1['date_init'] > $now or ($data1['date_end'] <> null and $data1['date_end'] < $now) ){ return (array(0,"The task is out of date to be executed")); } $user = Security::getUserName(); // User conected // `scheduler_queue_task_id`, `param_special`, `user`, `status_id` $data = array(); $data["scheduler_queue_task_id"] = $data1['id_task']; $data["param_special"] = $param_special; $data["user"] = $user; $data["status_id"] = 1; DB::Insert("scheduler_queue_task_execution", $data ); $id = DB::LastId(); return(array(1,$id)); }
A modo de ejemplo os facilito los códigos de las dos tareas que he programado en el ejemplo. Están en el directorio «Jobs».
«test1.php»
<?php @ini_set("display_errors","1"); @ini_set("display_startup_errors","1"); require_once(__DIR__."/../../include/dbcommon.php"); custom_error(50,"Job Bacht Test1 in execution"); // To debug // GET Parameter line Command $param = getopt(null, ["id_task:","user:"]); custom_error(52,"Job Param: ".print_r($param,true)); // To debug // time initial $time_initial = date('h:i:s'); // sleep 5 segundos sleep(5); // Time final $time_final = date('h:i:s'); custom_error(51,"Time Initial: ".$time_initial." Time End :".$time_final); // To debug echo "ok"; custom_error(60,"End Job test1.php"); // To debug
«test2.php»
<?php @ini_set("display_errors","1"); @ini_set("display_startup_errors","1"); require_once(__DIR__."/../../include/dbcommon.php"); global $debugCode; $debugCode = true; // Change to "false" to eliminate traces custom_error(50,"Job Bacht Test2 in execution"); // To debug // GET Parameter line Command $param = getopt(null, ["id_task:"]); $id_task = $param['id_task']; custom_error(52,"Job Param: ".print_r($param,true)); // To debug // time initial $time_initial = date('h:i:s'); // sleep 5 segundos sleep(5); // Time final $time_final = date('h:i:s'); custom_error(51,"Time Initial: ".$time_initial." Time End :".$time_final); // To debug echo "ok"; // This is what will go to the field "result" $temp_file = tempnam(sys_get_temp_dir(), 'txt'); $fp = fopen($temp_file, "w"); fwrite($fp, "This is an example to show how to record a file obtained from a report or any other process \n"); fclose($fp); $filesize = filesize($temp_file); // prepare image info to be saved in db $files= array(); $files[] = array("name" => $temp_file, "usrName" => 'example.txt', "size" => $filesize, "type" => "text/plain", "searchStr" => 'example.txt'.":sStrEnd"); custom_error(52,"Preparate file :".print_r($files,true)); // To debug // Update record by Execution with references by Files // $rs = DB::Query("select * from scheduler_queue_task_execution where id_task_execution = $id_task"); // $data = $rs->fetchAssoc(); // custom_error(53,"Dara Record :".print_r($data,true)); // To debug $data = array(); $keyvalues = array(); $data["files"] = my_json_encode($files);; $keyvalues["id_task_execution"] = $id_task; DB::Update("scheduler_queue_task_execution", $data, $keyvalues ); custom_error(60,"End Job test2.php"); // To debug
También existe el directorio «semaphores»
Que contiene unos ficheros para el control de las colas de procesos. Por cada cola que definamos, debe haber su correspondiente fichero en este directorio. El fichero «scheduler.lock» es fijo y es el que se utiliza para asegurarnos que sólo un proceso accede a la lista de tareas pendientes de ejecutarse.
Este no es un sistema sencillo por lo que los usuarios que tengan un desarrollo no complejo, no debería implementarlo, pero aquellos que tengan muchos informes y procesos pesados, seguro que verán en él muchas de las soluciones que buscaban.
Para cualquier duda, por favor, contactar conmigo a través de mi email [email protected]
Os dejo los fuentes para que lo podáis instalar en vuestros PC’s y podáis ver con detalle todo el sistema y , también, lo podáis modificar y mejorar.