Hướng dẫn dùng Phalcon với PHP-Resque

by Thien

on Nov 30/16 at 04:14

Hôm nay trong bài viết này tôi sẽ hướng dẫn bạn sử dụng một thư viện php-resque tương tác queue, như các bạn biết có khá nhiều dịch vụ như RabbitMQ, Beanstalk, etc, tại sao tôi lại dùng nó thứ nhất vì thư viện này viết bằng PHP, sau đó nó lại kết hợp với Redis để lưu trữ các thông tin của jobs queue đó, và tất nhiên nó khá là dễ sử dụng

Trước hết bạn cần hiểu khái niệm Message Queue là gì: Theo như sự hiểu biết của tôi thì Message Queue là một mô hình giao tiếp truyền tin bất động bộ. Có nghĩa trao đổi giữa người gửi và người nhận không cần xảy ra đồng thời, tại cùng 1 thời điểm. Người gửi có thể đẩy tin cần gửi vào hàng đợi (queue), và sau đó một số tiến trình độc lập (worker) sẽ đẩy tin từ hàng đợi đến người nhận.

Lấy một ví dụ cụ thể như thế này, chẵn hạn bạn có chức năng khôi phục mật khẩu khi người dùng nhập email vào bạn sẽ đặt một job vào queue đó, job này có nhiệm vụ gửi mật khẩu mới cho bạn có thể chạy sau một phút hoặc một giờ tùy bạn định nghĩa thời gian:

$object = User::findFirstByEmail('hello@phanbook.com')// giả sử đây là function lấy thông tin 1 user với email

//Do something
if (!$object) {
    $this->displayModelErrors($object);
} else {
    $params = [
        'name'  => $object->getName(),
        'email' => $object->getEmail(),
        'subject' => 'Phanbook - You have requested to change your password.',
        'body' => 'To reset your password, please click below.',
        'link'      => ($this->request->isSecure()
                ? 'https://' : 'http://') . $this->request->getHttpHost()
            . '/resetpassword/' . $passwordForgotHash
    ];
    $jobId = $this->queue->enqueue('lostpassword', "Lackky\\Queue\\LostPassword", $params, true);
    $this->flashSession->success(
        t('An email was sent to your address in order to continue with the reset password process.')
    );

    return $this->response->redirect();
}

Như trên đoạn code trên nếu tìm thấy user thì tôi sẽ đặt nó trong một Job với đoạn code

$jobId = $this->queue->enqueue('lostpassword', "Phanbook\\Queue\\LostPassword", $params, true);

Cài đặt PHP-resque

Như mọi khi bạn nên cài đặt thư viện PHP thông qua composer nếu bạn chưa biết nó là gì thì hãy search từ khóa đó với google để hiểu nó

Mở file composer.jon chèn nội dung giống như thế này

"require": {
        "chrisboulton/php-resque" : "dev-master"
}

Sau đó bạn chạy lện bên dưới để cài đặt nó

composer install

Tạo một file cấu hình

Tùy vào mỗi một framework PHP cụ thể mà bạn đặt cấu hình của PHP-Resque, để cho đơn giản tôi tạo một file settings.php với nội dung sau

require 'vendor/autoload.php';

$settings = [
    'REDIS_BACKEND'     => '127.0.0.1:6379',    // Set Redis Backend Info
    'REDIS_BACKEND_DB'  => '0',                 // Use Redis DB 0
    'COUNT'             => '1',                 // Run 1 worker
    'INTERVAL'          => '5',                 // Run every 5 seconds
    'QUEUE'             => '*',                 // Look in all queues
    'PREFIX'            => '__luckky',              // Prefix queues with __luckky
    'VVERBOSE'          => 1
];

foreach ($settings as $key => $value) {
    putenv(sprintf('%s=%s', $key, $value));
}

Còn trong dự án thực tế của tôi sẽ giống như thế này, chú ý đây là code khi tôi dùng Framework Phalcon PHP


namespace Lackky\Queue;

use Phalcon\Mvc\User\Component;

class Resque extends Component
{
    public function __construct()
    {

        $resque = (array) $this->config->resque;
        foreach ($resque as $key => $value) {
            putenv(sprintf('%s=%s', $key, $value));
        }
        //Set prefix
        \Resque_Redis::prefix(getenv('PREFIX'));
        \Resque::setBackend(getenv('REDIS_BACKEND'));
    }

    /**
     * Create a new job and save it to the specified queue.
     *
     * @param string $queue The name of the queue to place the job in.
     * @param string $class The name of the class that contains the code to execute the job.
     * @param array $args Any optional arguments that should be passed when the job is executed.
     * @param boolean $trackStatus Set to true to be able to monitor the status of a job.
     *
     * @return string|boolean Job ID when the job was created, false if creation was cancelled due to beforeEnqueue
     */
    public function enqueue($queue, $class, $args = null, $trackStatus = false)
    {
        return \Resque::enqueue($queue, $class, $args, $trackStatus);
    }
    public function blpop(array $queues, $timeout)
    {
        return \Resque::blpop($queues,$timeout);
    }
}

Và file config.php sẽ có dạng thế này:


'resque' => [
            'REDIS_BACKEND'     => 'localhost:6379',    // Set Redis Backend Info
            'REDIS_BACKEND_DB'  => '0',                 // Use Redis DB 0
            'COUNT'             => '1',                 // Run 1 worker
            'INTERVAL'          => '5',                 // Run every 5 seconds
            'QUEUE'             => '*',                 // Look in all queues
            'PREFIX'            => '__luckky',              // Prefix queues with __luckky
            'VVERBOSE'          => '1',
            'APP_INCLUDE'       => 'bootstrap.php'
        ],

Trong đó tham số

  • QUEUE = * có nghĩa là chạy toàn bộ jobs có hiện tại nếu bạn muốn chỉ chạy mỗi job khôi phục mật khẩu thì lúc này tham số QUEUE = 'lostpassword'

  • INTERVAL : có nghĩa là thời gian thực thi các jobs mặc định là 5 giây, dưới đây là một ví dụ khi tôi capture nó
# tail -f /tmp/resque/admin/resque
[info] [03:29:03 2016-11-30] Checking signupinitial for jobs
[info] [03:29:03 2016-11-30] Sleeping for 5
[info] [03:29:08 2016-11-30] Checking influxdb for jobs
[info] [03:29:08 2016-11-30] Checking lostpassword for jobs
[info] [03:29:08 2016-11-30] Checking signupinitial for jobs
[info] [03:29:08 2016-11-30] Sleeping for 5
[info] [03:29:13 2016-11-30] Checking influxdb for jobs
[info] [03:29:13 2016-11-30] Checking lostpassword for jobs
[info] [03:29:13 2016-11-30] Checking signupinitial for jobs
[info] [03:29:13 2016-11-30] Sleeping for 5
[info] [03:29:18 2016-11-30] Checking influxdb for jobs
[info] [03:29:18 2016-11-30] Checking lostpassword for jobs
[info] [03:29:18 2016-11-30] Checking signupinitial for jobs
[info] [03:29:18 2016-11-30] Sleeping for 5
  • PREFIX: thêm vào khi bạn có nhiều jobs tránh conflict nó cũng như giúp ta định danh jobs nó dễ hơn

  • APP_INCLUDE: là file dùng để load các thư viện, trong dự án của tôi thì có dạng như thế này
#!/usr/bin/env php

use Lackky\Common\Console;
use Lackky\Tools\Cli\PhpError;

// Register The Auto Loader
require __DIR__ . '/bootstrap/autoloader.php';

date_default_timezone_set('UTC');

// Capture runtime errors
register_shutdown_function([PhpError::class, 'runtimeShutdown']);

try {
    $app = new Console();
    // Record any php warnings/errors
    set_error_handler([PhpError::class, 'errorHandler']);

    // Check if only run single instance
    if ($key = array_search('--single', $argv)) {
        $app->setSingleInstance(true);
        // Ensure pid removes even on fatal error
        register_shutdown_function([$app, 'removeProcessInstance']);
    }

    // Check if logging to database
    if ($key = array_search('--record', $argv)) {
        $app->setRecording(true);
    }

    // Check if debug mode
    if ($key = array_search('--debug', $argv)) {
        $app->setDebug(true);
        // @TODO: later
        // Ensure debug display even on fatal error
        //register_shutdown_function([new Events\Cli\Debug(FALSE), 'display'], $app);
    }

    $app->setArgs($argv, $argc);

    // Boom, Run
    $app->run();
} catch (Exception $e) {
    fwrite(STDERR, $e->getMessage() . PHP_EOL);
    exit(1);
}
  • VVERBOSE: nếu bạn muốn bật chế độ debug thì gán cho 1 mặc định là 0

các tham số còn lại bạn có thể tham khảo trên trang chủ của nó https://github.com/chrisboulton/php-resque

Tạo một job

Sau khi bạn đã cấu hình xong thì bạn cần tạo job cho nó, ví dụ job lostpassword

namespace Lackky\Queue;

use Phalcon\Mvc\User\Component;
use Lackky\Models\Entities;

/**
 * Class LostPassword
 * @package Lackky\Queue
 * {code}
 * Removes job class 'LostPassword' of queue 'default'
 * \Resque::dequeue('default', ['LostPassword']);
 * {/code}
 */
class LostPassword extends Resque
{
    /**
     * Set up environment for this job
     *
     */
    public function setUp()
    {
    }

    /**
     * Run a job
     *
     */
    public function perform()
    {
        //@TODO remove when finish test
        $params = $this->args;
        $templateId = $this->config->mail->templateResetPassword ?: 'forgotpassword';
        if (!$this->mail->sendSingle($templateId,  $params)) {
            echo 'Error sending email';
        }
    }

    /**
     * Remove environment for this job
     *
     */
    public function tearDown()
    {
    }
}

Khi dùng Resques chúng ta có ba method đó là:

  • setUp: Phương thức này chạy trước khi bạn thực hiện job thường dùng đê kết nối redis

  • perform: Công việc cần làm như trong ví dụ trên tôi gửi email

  • tearDown: Làm những công việc sau khi xong job như xóa biến tạm , đối tượng, etc

Nó thực sự rất giống trong các hàm unit test nếu như bạn nào thường viết test case cho các ứng dụng PHP

Như ví dụ ở trên để đặt job này vào queue bạn chỉ cần dùng đoạn code như sau:

$this->queue->enqueue('lostpassword', "Lackky\\Queue\\LostPassword", $params, true);

sau đó bạn cần chạy worker để thực thi các jobs này với lệnh sau:

php vendor/bin/resque >> /tmp/resque/admin/resque 2>&1 &'

Kiểm tra

Để kiểm tra nó có làm việc hay không bạn chỉ cần nhập email vào sau đó bấm reset password, đây sẽ là kết quả bạn mong đợi khi chạy log

[info] [03:45:28 2016-11-30] Checking influxdb for jobs
[info] [03:45:28 2016-11-30] Checking lostpassword for jobs
[info] [03:45:28 2016-11-30] Checking signupinitial for jobs
[info] [03:45:28 2016-11-30] Sleeping for 5
[info] [03:45:33 2016-11-30] Checking influxdb for jobs
[info] [03:45:33 2016-11-30] Checking lostpassword for jobs
[info] [03:45:33 2016-11-30] Found job on lostpassword
[notice] [03:45:33 2016-11-30] Starting work on (Job{lostpassword} | ID: 4bffd39dbcd729fa60fcc22bf7ea7b24 | Lackky\Queue\LostPassword | [{"name":"user24","email":"hello@phanbook.com","subject":"Lackky - You have requested to change your password.","body":"To reset your password, please click below.","link":"http:\/\/lackky.phanbook.com\/resetpassword\/14d6ffcfa876b8312273a9e5354c906001266803"}])
[info] [03:45:33 2016-11-30] Forked 5681 at 2016-11-30 03:45:33
[info] [03:45:33 2016-11-30] Processing lostpassword since 2016-11-30 03:45:33
[notice] [03:45:35 2016-11-30] (Job{lostpassword} | ID: 4bffd39dbcd729fa60fcc22bf7ea7b24 | Lackky\Queue\LostPassword | [{"name":"user24","email":"hello@phanbook.com","subject":"Lackky - You have requested to change your password.","body":"To reset your password, please click below.","link":"http:\/\/lackky.phanbook.com\/resetpassword\/14d6ffcfa876b8312273a9e5354c906001266803"}]) has finished

còn trong redis-cli console sẽ có dạng như bên dưới

# redis-cli
127.0.0.1:6379> keys *
 1) "__luckky:workers"
 2) "__luckky:stat:processed"
 3) "__luckky:stat:failed"
 4) "__luckky:stat:failed:dev-startup:21845:*"
 5) "__luckky:stat:processed:dev-startup:21845:*"
 6) "__luckky:stat:failed:dev-startup:16691:*"
 7) "__luckky:worker:dev-startup:16691:*:started"
 8) "__luckky:failed"
 9) "__luckky:queues"
10) "_PHCM_MM"
11) "__luckky:stat:processed:dev-startup:16691:*"
127.0.0.1:6379>

Chúng ta có thể tóm gọn quá trình làm việc queue như sau:

put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

Kết luận

Trong bài viết này tôi đã hướng dẫn các bạn dùng PHP-Resque với Phalcon PHP, nếu bạn có dùng nó cho các FW khác thì để bình luận dưới đây, như mọi khi nếu thấy bài viết hay thì share!!!