php—Swoole和Redis实现的并发队列处理系统_八重樱。的博客-程序员宅基地

技术标签: swoole  php  Redis  

由于PHP不支持多线程,但是作为一个完善的系统,有很多操作都是需要异步完成的。为了完成这些异步操作,我们做了一个基于Redis队列任务系统。

大家知道,一个消息队列处理系统主要分为两大部分:消费者和生产者。

在我们的系统中,主系统作为生产者,任务系统作为消费者。

 

具体的工作流程如下:

1、主系统将需要需要处理的任务名称+任务参数push到队列中。

2、任务系统实时的对任务队列进行pop,pop出来一个任务就fork一个子进程,由子进程完成具体的任务逻辑。

 

具体代码如下:

/**
 * 启动守护进程
 */
public function runAction() {
    Tools::log_message('ERROR', 'daemon/run' . ' | action: restart', 'daemon-');
    while (true) {
        $this->fork_process();
    }
    exit;
}

/**
 * 创建子进程
 */
private function fork_process() {
    $ppid = getmypid();
    $pid = pcntl_fork();
    if ($pid == 0) {//子进程
        $pid = posix_getpid();
        //echo "* Process {$pid} was created \n\n";
        $this->mq_process();
        exit;
    } else {//主进程
        $pid = pcntl_wait($status, WUNTRACED); //取得子进程结束状态
        if (pcntl_wifexited($status)) {
            //echo "\n\n* Sub process: {$pid} exited with {$status}";
            //Tools::log_message('INFO', 'daemon/run succ' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid );
        } else {
            Tools::log_message('ERROR', 'daemon/run fail' . '|status:' . $status . '|pid:' . $ppid . '|childpid:' . $pid, 'daemon-');
        }
    }
}

/**
 * 业务任务队列处理
 */
private function mq_process() {
    $data_pop = $this->masterRedis->rPop($this->redis_list_key);
    $data = json_decode($data_pop, 1);
    if (!$data) {
        return FALSE;
    }
    $worker = '_task_' . $data['worker'];
    $class_name = isset($data['class']) ? $data['class'] : 'TaskproModel';
    $params = $data['params'];
    $class = new $class_name();
    $class->$worker($params);
    return TRUE;
}

 

这是一个简单的任务处理系统。

通过这个任务系统帮助我们实现了异步,到目前为止已经稳定运行了将近一年。

但很可惜,它是一个单进程的系统。它是一直在不断的fork,如果有任务就处理,没有任务就跳过。

这样很稳定。

但问题有两个:一是不断地fork、pop会浪费服务器资源,二是不支持并发!

第一个问题还好,但第二个问题就很严重。

当主系统 同时 抛过来大量的任务时,任务的处理时间就会无限的拉长。

 

新的设计

为了解决并发的问题,我们计划做一个更加高效强壮的队里处理系统。

因为在PHP7之前不支持多线程,所以我们采用多进程。

从网上找了不少资料,大多所谓的多进程都是N个进程同时在后台运行。

显然这是不合适的。

我的预想是:每pop出一个任务就fork一个任务,任务执行完成后子进程结束。

 

遇到的问题

1、如何控制最大进程数

这个问题很简单,那就是每fork一个子进程就自增一次。而当子进程执行完成就自减一次。

自增没有问题,我们就在主进程中操作就完了。那么该如何自减呢?

可能你会说,当然是在子进程中啊。但这里你需要注意:当fork的时候是从主进程复制了一份资源给子进程,这就意味着你无法在子进程中操作主进程中的计数器!

所以,这里就需要了解一个知识点:信号。

具体的可以自行Google,这里直接看代码。

// install signal handler for dead kids
pcntl_signal(SIGCHLD, array($this, "sig_handler"));

 

这就安装了一个信号处理器。当然还缺少一点。

declare(ticks = 1);

 

declare是一个控制结构语句,具体的用法也请去Google。

这句代码的意思就是每执行一条低级语句就调用一次信号处理器。

这样,每当子进程结束的时候就会调用信号处理器,我们就可以在信号处理器中进行自减。

 

2、如何解决进程残留

在多进程开发中,如果处理不当就会导致进程残留。

为了解决进程残留,必须得将子进程回收。

那么如何对子进程进行回收就是一个技术点了。

在pcntl的demo中,包括很多博文中都是说在主进程中回收子进程。

但我们是基于Redis的brpop的,而brpop是阻塞的。

这就导致一个问题:当执行N个任务之后,任务系统空闲的时候主进程是阻塞的,而在发生阻塞的时候子进程还在执行,所以就无法完成最后几个子进程的进程回收。。。

这里本来一直很纠结,但当我将信号处理器搞定之后就也很简单了。

进程回收也放到信号处理器中去。

 

新系统的评估

pcntl是一个进程处理的扩展,但很可惜它对多进程的支持非常乏力。

所以这里采用Swoole扩展中的Process。

具体代码如下:

declare(ticks = 1);
class JobDaemonController extends Yaf_Controller_Abstract{

    use Trait_Redis;

    private $maxProcesses = 800;
    private $child;
    private $masterRedis;
    private $redis_task_wing = 'task:wing'; //待处理队列

    public function init(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set('default_socket_timeout', -1); //队列处理不超时,解决redis报错:read error on connection
    }

    private function redis_client(){
        $rds = new Redis();
        $rds->connect('redis.master.host',6379);
        return $rds;
    }

    public function process(swoole_process $worker){// 第一个处理
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();            //send data to master

            sleep(rand(1, 3));
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }

    public function testAction(){
        for ($i = 0; $i < 10000; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }

    public function runAction(){
        while (1){
//            echo "\t now we de have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3);//无任务时,阻塞等待
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we de have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }

    private function sig_handler($signo) {
//        echo "Recive: $signo \r\n";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
//                    echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
        }
    }
}

最终,经过测试,单核1G的服务器执行1到3秒的任务可以做到800的并发。

phper在进阶的时候总会遇到一些问题和瓶颈,业务代码写多了没有方向感,不知道该从那里入手去提升,对此我整理了一些资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、服务器性能调优、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql优化、shell脚本、Docker、微服务、Nginx等多个知识点高级进阶干货需要的可以免费分享给大家需要的(点击→)我的官方群677079770

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_43814458/article/details/103181286

智能推荐

Element-UI+Vue改变单元格字体样式_qq_37607348的博客-程序员宅基地_element修改字体

&lt;template&gt; &lt;el-table v-loading="table_loading" element-loading-text="加载中..." max-height="800" border fit highlight-current-row :data="list" ref="multipleTable" @selection-change="handleSel.

业力管理 - 当和尚遇到钻石续集_简简单单OnlineZuozuo的博客-程序员宅基地

文章目录业力管理 - 当和尚遇到钻石续集1、业力管理的八个核心法则2、牢狱3、停止做决定4、装好订书针5、乘着问题飞跃巅峰6、业力再投资业力管理 - 当和尚遇到钻石续集1、业力管理的八个核心法则1、停止无用功2、找到原因背后的原因3、认定你的事‘业’伙伴4、从我做起5、停止做决定6、装好订书针7、乘着问题飞跃巅峰8、‘业力’再投资业力不是因为不好的因,带来不好的果,其实业...

CentOS 6 安装chromium_weixin_30545285的博客-程序员宅基地

由于centos 6对C++11支持不足的缘故,目前chromium已经不再支持CentOS 6系列。这里介绍如何在centos 6系列安装chromium。1、添加chromium源cd /etc/yum.repos.dwget http://people.centos.org/hughesjr/chromium/6/chromium-el6.repoyu...

用户、角色、权限管理_congfan7177的博客-程序员宅基地

 Oracle 权限设置  一、权限分类:  系统权限:系统规定用户使用数据库的权限。(系统权限是对用户而言)。  实体权限:某种权限用户对其它用户的表或视图的存取权限。(是针对表或视图而言的)。  二、系统...

android 抽象硬件,Android硬件抽象层(HAL)深入剖析(二)_weixin_39665762的博客-程序员宅基地

上一篇我们分析了androidHAL层的主要的两个结构体hw_module_t(硬件模块)和hw_device_t(硬件设备)的成员,下面我们来具体看看上层app到底是怎么实现操作硬件的?我们知道,一些硬件厂商不愿意将自己的一些核心代码开放出去,所以将这些代码放到HAL层,但是怎么保证它不开放呢?HAL层代码不是也让大家知道下载吗?其实硬件厂商的HAL核心代码是以共享库的形式出现的,每次在需要的...

AlertDialog自定义_xiaoyangxavier的博客-程序员宅基地

收藏(19)Android 提供了 AlertDialog 类可通过其内部类 Builder 轻松创建对话框窗口,但是没法对这个对话框窗口进行定制,为了修改 AlertDialog 窗口显示的外观,解决的办法就是创建一个指定的 AlertDialog 和 AlertDialog.Builder 类。定义外观我们希望将上面默认的对话框外观修改为如下图所示的新对话框风格:该

随便推点

java nio 连接数_突破netty单机最大连接数_长弓手地鼠的博客-程序员宅基地

实现单机的百万连接,瓶颈有以下几点:1、如何模拟百万连接2、突破局部文件句柄的限制3、突破全局文件句柄的限制在linux系统里面,单个进程打开的句柄数是非常有限的,一条TCP连接就对应一个文件句柄,而对于我们应用程序来说,一个服务端默认建立的连接数是有限制的。如下图所示,通常一个客户端去除一些被占用的端口之后,可用的端口大于只有6w个左右,要想模拟百万连接要起比较多的客户端,而且比较麻烦,所以这种...

BOG(Bag-of-Graphs)学习(一)_Ready渣的博客-程序员宅基地

BOG概述BoG是从对象内部的本地关系创建向量表示的过程。它的模型是由一个复合函数定义的,指定的包提取,它结合了图提取,兴趣图检测器(GoI检测器),分配,池,以及顶点,边缘和图描述符的特征提取函数。图形提取功能提取数字对象的内在结构。该结构的描述是一个对数字对象元素(对象组件)之间的关系建模的图。一个对象的所有组成部分的集合称为功率数字对象。然后使用一个GoI检测器函数来检测对象对应图的所有可能子图(幂图)中的感兴趣图,此函数选择表示对象内相关局部结构的子图。属性图对应的是顶点和边由简单和

附录1 比特币:一种点对点的电子现金系统_xiaohuanglv的博客-程序员宅基地

中本聪 著李志阔(网名:面神护法) 赵海涛 焦锋 译[email protected]摘要完全点对点的电子现金系统可以不通过金融机构,由一方直接发送在线支付给另一方。虽然数字签名为此提供了大部分解决方案,但是如果这个系统(电子现金系统)仍然需要可信的第三方来阻止双重花费(Double-Spending),那么它的价值就会大打折扣。本文提出了一种使用点对点网络...

Android布局2(相对布局和网格布局)_阿圆啊哈的博客-程序员宅基地

1. 相对布局(RelativeLayout 重点:)  1.1 相对布局窗口内子组件的位置总是相对兄弟组件、父容器来决定的(就是根据旁边的足迹来设置位置),因此叫相对布局  1.2 如果A组件位置是由B组件的位置决定的,Android要求先定B组件,再定义A组件      如果A组件位置是由B组件的位置决定的,Android要求先定B组件,再定义A组件      如果A组件位置是由B...

expdp时出现错误:ORA-39006: internal error_来了个土豆的博客-程序员宅基地

expdp时出现错误:ORA-39006: internal error分类: ORACLE2011-11-24 11:08 1066人阅读 评论(0) 收藏 举报oracledatabaseprocessingxsl2010up 原文地址:http://blog.csdn.net/yfleng2002/article/details/7007212[ora

如何在pycharm中运行/调试torch分布式训练_sunmenmian的博客-程序员宅基地_pycharm配置torch

转自https://zhuanlan.zhihu.com/p/144815822现在很多深度学习研究开源代码都会使用pytorch框架,原因之一就是在torch中,只要你定义好了一个module,你就可以轻易的用torch.distributed将其应用到单机多GPU或者多机多GPU的场景中,加速模型的收敛速度。但是在所有github项目的readme中,都是仅给出了如何在命令行模式下使用分布式的方法。对于需要在Pycharm或其他IDE进行调试的研究者就不太适用。环境PyTorch 1.6PyC

推荐文章

热门文章

相关标签