Integrating Gearman Into Zend Framework

by Mike Willbanks on October 17th, 2010

When scale and performance is important, you start to look into moving operations into synchronous or asynchronous tasks. This is important for any web application that must scale out and/or to add new ways of handling feedback to your users. While I will not be going into detail of how to operate Gearman or briefing you on ways of utilizing gearman, what I am going to show you is how to incorporate Gearman into being it’s own Zend Framework application.

This article will go into incorporating Gearman workers into Zend Framework (rather a lightweight approach that can and likely should be scaled out) as you work with “Workers”. I have implemented these much to be like “Controllers” in the current MVC model, however, they do lack quite a bit of functionality at this point in the process. Basically, they do not support any plugins or action helpers. However, if you are looking for some form of consistency in your Gearman workers, this is a fairly unique approach of getting them incorporated into your application.

Overview

When I started looking into how I could leverage gearman into a new suite of applications, I knew that I was going to need several workers, however, many of these workers would not require the same amount of configuration nor resources. I also knew that I wanted them to be fairly consistent and be able to take new functionality that might be added in the future. I really think of this as something like helpers that mimic the current Zend_Controller_Action_Helper area. However, I knew I would not get to this at the start. I also knew that there was quite a bit of functionality that I wanted to encapsulate such as allowing each individual worker to be able to bootstrap what it needed and do some form of initialization separate from anything else. Lastly, I also knew that I wanted it to be able to kill itself if it started to leak too much memory, be able to handle timeouts in the event that I had a database connection or any other connection that might need to be pinged, encapsulate error handling and last but not least take most of the methods of putting together the registration for the function into a simple constructor.

There is a custom dispatcher that has to be in place since this runs off of the CLI, it takes an argument of the worker name and the application environment. Since these can be exited out, you should utilize something to the degree of “Supervisor” in order to handle keeping the processes alive.

Remember that this is preliminary functionality that I plan to extend in the future but wanted to share the work that has been done. And since I have not been updating my blog often enough this may lack a bit of detail.

The Code

Gearman Worker

To be placed in library/ZF/Gearman/Worker.php

< ?php
 
/**
 * Gearman Worker
 *
 * @package    Gearman
 * @subpackage Worker
 * @version    $Id$
 */
class ZF_Gearman_Worker
{
 
    /** 
     * Register Function
     * @var string
     */
    protected $_registerFunction;
 
    /** 
     * Gearman Timeout
     * @var int
     */
    protected $_timeout = 60000;
 
    /** 
     * Alloted Memory Limit in MB
     * @var int
     */
    protected $_memory = 1024;
 
    /** 
     * Error Message
     * @var string
     */
    protected $_error = null;
 
    /** 
     * Gearman Worker
     * @var GearmanWorker
     */
    protected $_worker;
 
    /**
     * Bootstrap
     * @var Zend_Application_Bootstrap_BootstrapAbstract
     */
    protected $_bootstrap;
 
    /**
     * Constructor
     * Checks for the required gearman extension,
     * fetches the bootstrap and loads in the gearman worker
     *
     * @return Gearman_Worker
     */
    public function __construct(Zend_Application_Bootstrap_BootstrapAbstract $bootstrap)
    {
        if (!extension_loaded('gearman')) {
            throw new RuntimeException('The PECL::gearman extension is required.');
        }
        $this->_bootstrap = $bootstrap;
        $this->_worker = $this->_bootstrap->bootstrap('gearmanworker')
                              ->getResource('gearmanworker');
        if (empty($this->_registerFunction)) {
            throw new InvalidArgumentException(get_class($this) . ' must implement a registerFunction');
        }
        // allow for a small memory gap:
        $memoryLimit = ($this->_memory + 128) * 1024 * 1024;
        ini_set('memory_limit', $memoryLimit);
        $this->_worker->addFunction($this->_registerFunction, array(&$this, 'work'));
        $this->_worker->setTimeout($this->_timeout);
        $this->init();
 
        while ($this->_worker->work() || $this->_worker->returnCode() == GEARMAN_TIMEOUT) {
            if ($this->_worker->returnCode() == GEARMAN_TIMEOUT) {
                $this->timeout();
                continue;
            }
            if ($this->_worker->returnCode() != GEARMAN_SUCCESS) {
                $this->setError($this->_worker->returnCode() . ': ' . $this->_worker->getErrno() . ': ' . $this->_worker->error());
                break;
            }
        }
 
        $this->shutdown();
    }
 
    /**
     * Initialization
     *
     * @return void
     */
    protected function init()
    {
 
    }
 
    /**
     * Handle Timeout
     *
     * @return void
     */
    protected function timeout()
    {
    }
 
    /**
     * Handle Shutdown
     *
     * @return void
     */
    protected function shutdown()
    {
 
    }
 
    /**
     * Set Error Message
     *
     * @param string $error
     * @return void
     */
    public function setError($error)
    {
        $this->_error = $error;
    }
 
    /**
     * Get Error Message
     *
     * @return string|null
     */
    public function getError()
    {
        return $this->_error;
    }
 
    /**
     * Set Job Workload
     *
     * @param mixed
     * @return void
     */
    public function setWorkload($workload)
    {
        $this->_workload = $workload;
    }
 
    /**
     * Get Job Workload
     *
     * @return mixed
     */
    public function getWorkload()
    {
        return $this->_workload;
    }
 
    /**
     * Work, work, work
     *
     * @return void
     */
    public final function work($job)
    {
        $this->setWorkload($job->workload());
        $ret = $this->_work();
        $mem = memory_get_usage();
        if ($mem > ($this->_memory * 1024 * 1024)) {
            exit(1); // over memory limit;
        }
    }
}

Dispatch

To be placed in dispatch.php

< ?php
/**
 * Dispatch a Gearman Worker
 *
 * Handles the bootstrapping process and dispatches
 * a worker to handle the task at hand.
 *
 * @package    Gearman
 * @subpackage Application
 * @version    $Id$
 */
set_time_limit(0);
 
 
if (!isset($argv[1])) {
    throw new InvalidArgumentException('The worker name must be passed in as a parameter');
}
 
if (isset($argv[2])) {
    define('APPLICATION_ENV', $argv[2]);
} else {
    define('APPLICATION_ENV', (getenv('APPLICATION_ENV') ? getenv('APPLICATION_ENV') : 'production'));
}
 
// Define path to application directory
defined('APPLICATION_PATH')
    || define('APPLICATION_PATH', realpath(dirname(__FILE__) . '/application'));
 
// Ensure library/ is on include_path
set_include_path(implode(PATH_SEPARATOR, array(
    realpath(APPLICATION_PATH . '/../library'),
)));
 
require_once 'Zend/Application.php';
$app = new Zend_Application(
    APPLICATION_ENV,
    APPLICATION_PATH . '/configs/application.ini'
);
$app->run();

Bootstrap

To be placed in application/Bootstrap.php

< ?php
 
class Bootstrap extends Zend_Application_Bootstrap_Bootstrap
{
 
    protected function _initGearmanWorker()
    {
        $options = $this->getOptions();
        $gearmanworker = new GearmanWorker();
        if (isset($options['gearmanworker']) && isset($options['gearmanworker']['servers'])) {
            $gearmanworker->addServers($options['gearmanworker']['servers']);
        } else {
            $gearmanworker->addServer();
        }
        return $gearmanworker;
    }
 
    public function run()
    {
        global $argv;
        if (!isset($argv[1])) {
            throw new InvalidArgumentException('A Worker Name Must Be Passed In');
        }
        $worker = ucwords(basename($argv[1]));
        $workerName = $worker . 'Worker';
        $workerFile = APPLICATION_PATH . '/workers/' . $workerName . '.php';
 
        if (!file_exists($workerFile)) {
            throw new InvalidArgumentException('The worker file does not exist: ' . $workerFile);
        }
        require $workerFile;
        if (!class_exists($workerName)) {
            throw new InvalidArgumentException('The worker class: ' . $workerName . ' does not exist in file: ' . $workerFile);
        }
        $worker = new $workerName($this);
    }
}

Configuration

To be placed in application/configs/application.ini – this is still a normal configuration file, the only difference is, is that you need a gearman server!

 
[base]
phpSettings.display_startup_errors = 1
phpSettings.display_errors = 1
phpSettings.log_errors                 = 1
phpSettings.error_log                  = APPLICATION_PATH "/../logs/php.log"
includePaths.library = APPLICATION_PATH "/../library"
bootstrap.path = APPLICATION_PATH "/Bootstrap.php"
bootstrap.class = "Bootstrap"
appnamespace = "Application"
autoloaderNamespaces.zf  = ZF
 
[production : base]
gearmanclient.servers     = "localhost"

Creating your First Worker

Now that we have the basics setup, we can now create our first worker. I will go through this step by step so you understand the options that are available for your worker and the initialization towards the end of the worker. This first worker we will be creating will utilize all of the features available.

Before you begin, you will need to create the directory: “application/workers”. This directory will house all of your workers that you will be creating.

Demo Worker

To be placed in application/workers/DemoWorker.php

class DemoWorker extends ZF_Gearman_Worker
{
    protected $_registerFunction = 'demowork';
    protected $_timeout = 10000; // 10 seconds
    protected $_memory = 10; // 10 MB
 
    protected function init()
    {
        // if you were to bootstrap something...
        // you would use:
        // $this->_bootstrap->bootstrap('name');
 
        // if you are going to utilize a class for each
        // worker, you would initialize it here as well.
    }
 
    protected function timeout()
    {
        // this function will be called whenever the
        // timeout limit has been reached.  this means
        // that there has been no jobs worked on for this
        // particular amount of time.  this is extremely
        // useful for database connections that do not
        // automatically restart.
    }
 
    protected function shutdown()
    {
        // if you have any open connections
        // these should be closed down here.
        // otherwise you can also destruct any
        // other objects you might have.
    }
 
    protected function _work()
    {
        $workload = $this->getWorkload();
        echo 'The workload is: ' . $workload;
    }
}

Running Your New Worker

To run your new worker, assuming you have gearman running, all you need to do is go to the application directory and execute:

php dispatch.php DemoWorker production

Assuming all went well, your worker will now be running. Now all you need to do is add in a job for it to be worked on. A quick and simple way to do this would be to execute a command such as:

gearman -f demowork "demo workload"

The output of the worker will be:

The workload is: demo workload

Conclusion

I hope that this simple entry was helpful to some one out there. In short, this is a simple way to encapsulate your workers. If you have any feedback let me know!

From PHP

7 Comments
  1. Nice work.

    We’ve been doing a lot of Gearman and Zend Framework as well. Took a different approach, but I also like your’s! Good work!

    I’d probably refactor the code so less is done in __construct() and try to invoke things such as shutdown() in a shutdown function or from a __destruct()? But regardless, I like that timeout() stuff and so on – looks very useful.

  2. @till -
    I have thought about moving quite a bit of the work outside of the constructor actually, mostly just so that it would be a little more extensible and could be extended easier. I agree about the destructor but my fear is that at the end of the request if it is being called the bootstrap might no longer be available and if there was a case to be able to restart the worker within the shutdown method you could encapsulate more of that work. Anyhow, it has been working very nicely and I plan to make it a bit more extensible in the future as I find better working models for it.

    Once nice thing about the timeout() function is that it has certainly helped us since PHP doesn’t exactly have connection pooling for database connections. It was a rather ugly hack but it made things really nice.

  3. Maksim permalink

    Hello. Thank you. But I don’t get where _initGearmanWorker method should be called?

  4. Maksim,

    This is bootstrap functionality that is built-in for zend framework when running zend application. If you look at the custom worker class, it automatically handles bootstrapping the _initGearmanWorker.

  5. solidusse permalink

    hey thanks for your wonderful example
    i have a question here:
    i would like to make my workers shut down as soon as the work is done
    i would like to my workers be a one time use
    thanks

  6. Matteo monti permalink

    Hello, thanks for sharing!,

    I guess there is an error in application ini. the servers key should be

    gearmanworker.server

    instead of gearmanclient.servers

    Am i right?

Trackbacks & Pingbacks

  1. Emagister Engineering » Tecnolog√≠a al servicio de la Formaci√≥n » Primeros pasos con Gearman (II)

Leave a Reply

Note: XHTML is allowed. Your email address will never be published.

Subscribe to this comment feed via RSS