PHP/Multithread

Un article de Le wiki de 2 noisettes - noisette.ch.

Sommaire

Introduction

PHP dispose de 2 modules qui permettent de faire de la programmation multithreadée : pcntl et sémaphore

Le module pcntl permet de créer des processus fils (fork), ainsi que d'autres fonctions de gestion de ces processus. Le module sémaphore est quant à lui composé en 3 parties distinctes qui sont la mémoire partagée, les sémaphores à proprement parler et les messages System V. Je vais dans cet article utiliser la mémoire partagée et les sémaphores pour gérer la synchronisation entre processus.

PHP doit être compilé avec les options --enable-pcntl, --enable-sysvsem (pour activer les sémaphores) et --enable-sysvshm (pour la mémoire partagée). A noter que ces options n'existent pas pour la version Windows.

Concepts

Je ne vais pas rappeler ici ce qu'est un sémaphore, un mutex ou un fork, mais bien les concepts à mettre en place pour utiliser tout ça et faire un démon PHP multithreadé. L'exemple sur lequel je me base pour illustré ce concept est un démon qui recherche dans une file d'attente (implémentée par une base de données simple) des entrées à traiter.

Le principe sera le suivant :

  • La queue est-elle vide ?
    • Si oui, on se met en attente un certain laps de temps avant de tester à nouveau la queue.
    • Si non, tant qu'il y a des données à traiter, on test si le nombre maximum de fils est atteint
      • Si non, on lance un nouveau fils qui traitera les informations.
      • Si oui, on attend qu'un fils termine son exécution.

Il y a donc 2 points sur lesquels une bonne synchronisation doit être mise en place :

* Le nombre maximum de processus fils à lancer en parallèle
* Le traitement fait par les processus fils (doit être thread safe...)

Pour mes besoins personnels, j'ai en plus souhaité que chaque fils s'exécute dans un slot particulier (c'est donc une affectation de ressource unique parmi plusieurs).

Attente de ressources

On remarquera aussi que j'ai utilisé un mutex (en fait un sémaphore avec une seule ressource disponible) avec un compteur plutôt qu'un sémaphore avec autant de ressources qu'on souhaite voir de processus en parallèle, et cela pour une raison bien particulière qui est la formation de zombie quand un fils meurt si on ne le termine pas correctement. En d'autres mots, plutôt que de rester simplement bloquer par un sémaphore à l'entrée de la section critique, j'utilise la fonction pcntl_wait(int &status) qui attend la fin d'un processus fils et retourne son status. Cette astuce permet d'éviter la formation de zombie, car dès qu'aucun slot n'est libre, on se met en attente d'une terminaison de processus fils (le fait de lire son status termine sa destruction).

Si $thread compte le nombre de processus fils en cours, _MAX_THREADS est le nombre maximal de threads exécutés simultanément, l'attente qu'une ressource se libère donne simplement quelque chose comme ça (j'ai volontairement omi les écritures en mémoire partagées, j'y reviendrai) :

while ($threads >= _MAX_THREADS) {
    pcntl_wait($status);
    $thread--;
}

Variables partagées

fork étant implementé en utilisant une méthode de copie à l'écriture, les variables globales doivent être donc placées dans la mémoire partagée, et protégée par un mutex les des lectures/écritures.

Dans l'exemple précédent, $thread est donc une variable partagée entre le processus père et ses fils, le code nous donne quelque chose comme ça :

sem_acquire($mutex);
$threads = shm_get_var($shm_id, _VAR_THREAD);
while ($threads >= _MAX_THREADS) {
    sem_release($mutex);
    pcntl_wait($status);
    sem_acquire($mutex);
    $threads = shm_get_var($shm_id, _VAR_THREAD);
}
$threads = shm_get_var($shm_id, _VAR_THREAD) + 1;
shm_put_var($shm_id, _VAR_THREAD, $threads);
sem_release($mutex);

sem_aquire permet d'acquérir une ressource, shm_get_var de lire une variable partagée, shm_put_var d'écrire une variable partagée et sem_release de relâcher une ressource.

Arrêt du démon

  • boucle principale while($run)
  • gestion des signaux

Exemple

<?php
 
 /**
  * Multithreaded PHP skeleton
  *
  * http://www.noisette.ch/wiki/index.php/PHP/Multithread
  *
  * Requirement : 
  *   php must be compiled with pcntl extension, 
  *   doesn't work on Windows plateform (because of pcntl)
  *
  * @author Benoit Perroud <perroud@omne.ch>
  * @copyright 2006 Benoit Perroud
  * @license GLPv2
  * @version 1.0
  * @example usage.php
  * @todo See todo file
  *
  * Version History:
  * 1.0 - 08/11/2006 - bperroud
  * Initial Release
  *
  */
 
 /*
  * Constants declaration
  */
 define(_DEBUG,             TRUE);
 define(_MAX_THREADS,       12);                                     // Number of concurrent childs
 define(_LOCK_PATH,         "locks");
 define(_LOCK_FILE,         _LOCK_PATH . "/" . $argv[0] . ".lock");  // file to ensure that a single instance of this script is running.
 define(_SLEEP_TIME,        10);                                     // Sleeping time when nothing to do (or too much sql errors
 define(_LOG_FILE,          $argv[0] . "mt-log");
 
 
 /*
  * Very simple logger
  */
 function mylog($msg) 
 {
         if ($log == FALSE) {
                 $log = fopen(_LOG_FILE, "a");
         }
         fprintf($log, date("Y-m-d H:i:s") . " " . ereg_replace("%", "%%", $msg) . "\n");
         if (_DEBUG) echo $msg . "\n";
 }
 
 /*
  * Environment initialization
  */
 if (!is_dir(_LOCK_PATH)) mkdir(_LOCK_PATH);
 
 /*
  * Single running instance checking and locking
  */
 if (file_exists(_LOCK_FILE)) {
         $run = false;
         mylog($argv[0] . " est deja lance, ou a mal ete arrete, et il faudrait supprimer " . _LOCK_FILE);
         exit(1);
 }
 touch(_LOCK_FILE);
 
 /*
  * Signal handling stuff (doesn't work yet...)
  */
 function sig_handler($signo) 
 {
         global $run;
         mylog("Signal " . $signo . " re<E7>u");
         switch($signo) {
                 case SIGTERM:
                         $run = FALSE;
                         break;
                 case SIGHUP:
                         break;
                 default:
                         mylog("Can someone explain me why I'm here ??");
         }
 }
 
 pcntl_signal(SIGTERM, "sig_handler");
 pcntl_signal(SIGHUP, "sig_handler");
 
 
 /*
  * Global variables
  */
 $run = TRUE;
 $log = FALSE;
 
 
 /*
  * Threads synchronization and shared memory stuff
  */
 $shm = ftok(__FILE__, 'c');
 $shm_id =  shm_attach($shm);
 $mutex = sem_get($shm);
 
 
 /*
  * Shared variables
  */
 // number of running threads, to avoid zombies
 define("_VAR_THREAD", 1);
 $threads = 0;
 shm_put_var($shm_id, _VAR_THREAD, $threads);
 
 // slots state handling
 define("_VAR_SLOTS", 2);
 $slots = array();
 for ($i = 0; $i < _THREADS; $i++) 
 {
         $slots[$i] = FALSE;
 }
 shm_put_var($shm_id, _VAR_SLOTS, $slots);
 
 /*
  * Critical section entry point
  * Return the slot which can be used
  * If no more slots are free, wait until a chil exit (pcntl_wait)
  * This tricks is used otherwhise the child hold in zombie state...
  */
 function lock() 
 {
         global $mutex, $shm_id, $semaphore, $slots;
 
         /*
          * Wait until a free slot
          */
         sem_acquire($mutex);
         $threads = shm_get_var($shm_id, _VAR_THREAD);
         while ($threads >= _MAX_THREADS) {
                 sem_release($mutex);
                 pcntl_wait($status);
                 sem_acquire($mutex);
                 $threads = shm_get_var($shm_id, _VAR_THREAD);
         }
         $threads = shm_get_var($shm_id, _VAR_THREAD) + 1;
         shm_put_var($shm_id, _VAR_THREAD, $threads);
 
         /*
          * Find which is the free slot
          */
         $slots = shm_get_var($shm_id, _VAR_SLOTS);
         for ($slot = 0; $slot < _MAX_THREADS; $slot++) {
                 if ($slots[$slot] == FALSE) {
                         $slots[$slot] = TRUE;
                         break;
                 }
         }
         shm_put_var($shm_id, _VAR_SLOTS, $slots);
         sem_release($mutex);
         return $slot;
 }
 
 /*
  * End of critical section
  */
 function unlock($slot) 
 {
         global $mutex, $shm_id, $semaphore, $slots;
 
         /*
          * Free slot
          */
         sem_acquire($mutex);
         $slots = shm_get_var($shm_id, _VAR_SLOTS);
         $slots[$slot] = FALSE;
         shm_put_var($shm_id, _VAR_SLOTS, $slots);
 
         /*
          * Release the thread
          */
         $threads = shm_get_var($shm_id, _VAR_THREAD) - 1;
         shm_put_var($shm_id, _VAR_THREAD, $threads);
         sem_release($mutex);
 }
 
 /*
  * Main loop
  */
 while ($run) {
 
   $slot = lock();
 
   $pid = pcntl_fork();
   if ($pid == -1) {
     mylog("Cannot fork. Exiting");
     $run = false;
     continue;
   } else if ($pid) { // father
     continue;
   } else { // child
 
     sleep(rand(1,5)); // do something useful
 
     unlock($slot);
 
     exit(0);
   } // if child
 
 }
 
 /* 
  * Wait until every children are complete
  */
 sem_acquire($mutex);
 $threads = shm_get_var($shm_id, _VAR_THREAD);
 while ($threads > 0) {
   sem_acquire($mutex);
   pcntl_wait($status);
   sem_acquire($mutex);
   $threads = shm_get_var($shm_id, _VAR_THREAD) - 1;
   shm_put_var($shm_id, _VAR_THREAD, $threads);
 }
 sem_release($mutex);
 
 /*
  * Closing logfile
  */
 if ($log != FALSE) fclose($log);
 
 /*
  * Freeing shared memory
  */
 shm_detach($shm_id);
 
 /*
  * Remove single running instance file lock
  */
 unlink(_LOCK_FILE);
 
 ?>

Conclusion