Laravel + Swoole 协程实战:一个多租户采集系统的真实落地方案

作为 PHP 开发者,我们迟早都会遇到一个瞬间:

代码没有报错,逻辑也没问题,但系统就是——慢。

不是慢一点,是那种
「明明只是检查 1000 个账号状态,却要跑半个小时」
「服务器 CPU 不高,但请求就是排着队等」
的慢。

最近搞项目刚好遇到了一个业务场景,多租户系统中,根据账号采集数据,采集前先要确认账号是否有效(就像根据IP代理爬数据一样,爬数据前先确认代理是否有效),于是我按照以往的经验,很自然的写了这样一条链路:

  1. 循环取出一个账号

  2. 检测状态

  3. 拉数据

  4. 账号异常,再换下一个账号,重复执行1、2、3

逻辑看似没问题,代码也很优雅。直到我测试的时候,跑了一次完整的任务。1000个账号,跑了30分钟还没跑完。我查看了CPU,内存也不高,但就是很慢。为什么呢?

问题不在算力,而在于PHP在大量IO等待中,被迫排队!

这个时候就是协程(Coroutine)大显身手的时候了。下文将结合业务和实战(编码)深入理解协程,并展示如何在Laravel中构建基于协程的高性能多租户采集系统。

一、协程基础:重新认识PHP并发编程

在介绍协程前,咱们先得明白一件事儿:

协程不会使CPU算的更快!

它解决的是“等待问题”,而不是“计算问题”。

只要系统大量在做以下几件事:

  • 调第三方API

  • 查数据库

  • 读写文件

  • 等网络相应

协程就有发挥空间。废话不多说

1.1 什么是协程?

协程是用户态的轻量级线程,这句话可能有点抽象,让我们通过一个简单的比喻来理解:

想象一个餐厅厨房:

  • 传统多进程/多线程:雇佣多个厨师(进程/线程),每个厨师有自己的工作台和工具,切换成本高(内存开销大)

  • 协程:一个超级厨师(单线程),但可以同时照看多个锅(协程),当一个锅在炖煮时(等待IO),就切换到另一个锅翻炒

技术定义:协程是一种比线程更轻量级的存在,由程序自身控制调度,在用户态进行上下文切换,避免了内核态切换的开销。

协程不是让 PHP 变快,而是让 PHP 不再傻等 IO

1.2 为什么选择协程?三大核心优势

1. 极低的资源消耗

// 对比:创建1000个并发任务
$threads = [];
for ($i = 0; $i < 1000; $i++) {
    $threads[] = new Thread(fn() => processTask($i)); // 每个线程约2MB内存
}
// 总内存:1000 * 2MB = 2GB

// 协程方式
Coroutine\run(function () {
    for ($i = 0; $i < 1000; $i++) {
        Coroutine::create(fn() => processTask($i)); // 每个协程约2KB内存
    }
});
// 总内存:1000 * 2KB = 2MB

2. 高效的IO并发

传统同步代码在等待数据库查询、API响应时会阻塞整个进程。协程可以在等待时让出控制权,处理其他任务。

// 传统同步方式(串行执行,总耗时 = 各任务耗时之和)
function fetchUsers() {
    $result1 = $http->get('/api/users/1'); // 等待100ms
    $result2 = $http->get('/api/users/2'); // 等待100ms
    // 总耗时约200ms
}

// 协程方式(并发执行)
Coroutine\run(function () {
    $chan = new Channel(2);
    
    Coroutine::create(function () use ($chan) {
        $result1 = $http->get('/api/users/1');
        $chan->push($result1);
    });
    
    Coroutine::create(function () use ($chan) {
        $result2 = $http->get('/api/users/2');
        $chan->push($result2);
    });
    
    // 总耗时约100ms(同时发起请求)
});

3. 同步的编程体验,异步的执行效率

协程最大的魅力在于:用同步的代码风格,获得异步的执行性能。不需要复杂的回调地狱(Callback Hell),代码可读性大大提升。

1.3 协程 vs 其他并发方案对比

特性多进程多线程异步回调协程
并发能力极高
内存消耗极低
开发复杂度
调试难度
适用场景CPU密集型CPU密集型IO密集型IO密集型

二、Laravel中的协程集成:Swoole实战

2.1 环境配置:让Laravel飞起来

# 1. 安装Swoole扩展
pecl install swoole

# 2. 验证安装
php --ri swoole

# 3. 配置php.ini
extension=swoole.so

# 4. 安装Laravel Octane(可选,但推荐)
composer require laravel/octane
php artisan octane:install

2.2 协程核心概念:Channel与WaitGroup

在理解我们的采集系统之前,先掌握两个关键概念:

Channel(通道):协程间的通信管道

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;

// 创建容量为10的通道
$chan = new Channel(10);

// 生产者协程
Coroutine::create(function () use ($chan) {
    for ($i = 0; $i < 100; $i++) {
        $chan->push($i); // 生产数据
        echo "生产: $i\n";
        Coroutine::sleep(0.1); // 模拟耗时
    }
    $chan->close(); // 生产完毕,关闭通道
});

// 消费者协程
Coroutine::create(function () use ($chan) {
    while ($data = $chan->pop()) { // 消费数据,通道空时自动挂起
        echo "消费: $data\n";
    }
    echo "所有数据处理完毕\n";
});

WaitGroup(等待组):协程同步工具

use Swoole\Coroutine\WaitGroup;

$wg = new WaitGroup();
$results = [];

for ($i = 0; $i < 10; $i++) {
    $wg->add(); // 增加计数
    Coroutine::create(function () use ($i, $wg, &$results) {
        try {
            // 模拟任务处理
            Coroutine::sleep(0.1);
            $results[$i] = "任务{$i}完成";
        } finally {
            $wg->done(); // 任务完成,减少计数
        }
    });
}

$wg->wait(); // 等待所有任务完成
print_r($results); // 所有任务完成后执行

三、实战:多租户采集系统设计

3.1 业务场景解析

回到文章开头提到的业务场景:

  1. 多租户系统:每个租户独立,有不同的采集配置和限制

  2. 账号分组:每个租户下有多个账号分组,每组包含若干账号

  3. 状态检测:采集前需要验证账号有效性

  4. 并发控制:需要限制单个租户的并发数,避免API被限流

  5. 任务管理:需要跟踪采集任务状态、进度和结果

3.2 架构设计:协程驱动的四层架构

┌─────────────────────────────────────────────┐
│                 Web层(控制器)              │
│  - 接收采集请求                             │
│  - 参数验证                                 │
│  - 返回任务ID                               │
└─────────────────┬───────────────────────────┘
                  │
┌─────────────────▼───────────────────────────┐
│             任务调度层(Command)            │
│  - 定时扫描待处理任务                        │
│  - 动态创建协程池                           │
│  - 预热账号数据                             │
└─────────────────┬───────────────────────────┘
                  │
┌─────────────────▼───────────────────────────┐
│             业务逻辑层(Service)            │
│  - 账号状态检测                             │
│  - 数据采集逻辑                             │
│  - 数据存储                                 │
└─────────────────┬───────────────────────────┘
                  │
┌─────────────────▼───────────────────────────┐
│             基础设施层                       │
│  - 协程池(CoroutinePool)                  │
│  - 协程工具(CoroutineUtils)               │
│  - 账号缓存管理                             │
└─────────────────────────────────────────────┘

3.3 关键代码解读:从理论到实现

3.3.1 协程池(CoroutinePool) - 资源控制器

协程池的核心作用是限制并发数,避免系统过载。以下是简化版的实现:

namespace App\Concurrency;

use Swoole\Coroutine;
use Swoole\Coroutine\Channel;

class CoroutinePool
{
    private $size;      // 池子大小(最大并发数)
    private $channel;   // 控制并发的通道
    private $count = 0; // 当前活跃协程数
    
    public function __construct(int $size)
    {
        $this->size = $size;
        // 创建容量为$size的通道,实现并发控制
        $this->channel = new Channel($size);
    }
    
    public function add(callable $task): void
    {
        // 关键:当并发数达到上限时,这里会阻塞,直到有协程完成
        // 这里的 channel 不是用来传数据的,而是用来“占坑位”的
        // 每 push 一次,代表占用一个并发名额
        // 当 channel 满了,push 会阻塞,新的任务进不来
        $this->channel->push(true);
        
        Coroutine::create(function () use ($task) {
            try {
                $task(); // 执行用户任务
            } finally {
                // 任务完成,释放一个位置
                $this->channel->pop();
                $this->count--;
            }
        });
        
        $this->count++;
    }
    
    public function close(): void
    {
        // 等待所有任务完成
        while ($this->count > 0) {
            Coroutine::sleep(0.1);
        }
    }
}

设计要点

  1. 通道作为信号量$channel->push()$channel->pop()实现了经典的信号量模式

  2. 优雅的资源释放:使用try...finally确保任务异常时也能释放资源

  3. 阻塞式等待:当并发数满时,新任务会阻塞而非拒绝,更适合队列场景

3.3.2 任务运行器(FetchBloggerTaskRunner) - 调度器

public function handle()
{
    // 1. 设置定时器,定期检查数据库中的新任务
    Timer::tick(self::CHECK_INTERVAL * 1000, function () {
        Coroutine::create(function () {
            $tasks = $this->getUnHandleTasks();
            if ($tasks->isEmpty()) return;
            
            // 2. 创建协程池,限制最大并发
            $pool = new CoroutinePool(20);
            $pool->run();
            
            // 3. 异步预热账号(不阻塞主流程)
            $accountGroupIds = $tasks->pluck('account_group_id')->unique()->toArray();
            $manager = new AccountWarmupManager('', '', 10);
            $manager->asyncWarmupByGroupIds($accountGroupIds);
            
            // 4. 标记任务为处理中
            YkCollectBloggerTaskModel::whereIn('id', $tasks->pluck('id'))
                ->update(['status' => 'handing']);
            
            // 5. 为每个任务创建采集协程
            foreach ($tasks as $task) {
                // 分布式锁:避免重复处理
                $lock = Cache::lock("lock:fetch_blogger_data_task:{$task->id}", 300);
                if (!$lock->get()) continue;
                
                $pool->add(function () use ($task, $lock) {
                    try {
                        $service = new FetchBloggerTaskService($task);
                        $service->handle();
                    } finally {
                        $lock->release(); // 释放锁
                    }
                });
            }
            
            $pool->close(); // 等待所有任务完成
        });
    });
    
    Event::wait(); // 关键:阻塞主进程,让协程有机会执行
}

代码分析

  1. Timer::tick定时器:替代传统的while+sleep循环,更高效

  2. 协程内查询数据库:避免阻塞事件循环

  3. 异步预热:账号预热不阻塞任务分发

  4. 分布式锁:在多进程部署时避免任务重复执行

  5. Event::wait():维持进程运行,让定时器持续工作

3.3.3 账号预热管理器(AccountWarmupManager) - 缓存优化

public function asyncWarmupByGroupIds(array $groupIds): void
{
    Coroutine::create(function () use ($groupIds) {
        // 1. 创建控制并发的通道
        $chan = new Channel($this->maxConcurrency);
        
        // 2. 分页查询账号,避免一次性加载过多数据
        $page = 1;
        while (true) {
            $accounts = YkAccountModel::query()
                ->whereIn('group_id', $groupIds)
                ->forPage($page, $this->pageSize)
                ->get();
            
            if ($accounts->isEmpty()) break;
            
            // 3. 并发验证每个账号
            foreach ($accounts as $account) {
                // 跳过已缓存的账号
                if (Redis::exists(self::VALID_ACCOUNT_CACHE_KEY . $account->id)) {
                    continue;
                }
                
                $chan->push(true); // 控制并发
                Coroutine::create(function () use ($account, $chan) {
                    try {
                        $this->checkAndCacheAccount($account->toArray());
                    } finally {
                        $chan->pop(); // 释放并发位
                    }
                });
            }
            
            $page++;
        }
    });
}

优化技巧

  1. 分页查询:避免一次性加载大量数据到内存

  2. 缓存跳过:已验证的账号跳过重复检查

  3. 并发控制:限制同时验证的账号数,保护API

  4. 异步执行:预热过程不阻塞主流程

3.3.4 抽象采集器(AbstractBloggerCollector) - 模板方法模式

abstract class AbstractBloggerCollector
{
    // 模板方法:定义采集算法骨架
    public function execute(): int
    {
        $accountIndex = 0;
        $number = 0;
        $maxId = '';
        
        // 循环使用账号,直到采集完成
        while ($accountIndex < count($this->accounts)) {
            $account = $this->accounts[$accountIndex] ?? [];
            
            // 1. 检查账号有效性
            $checkAccount = $manager->getValidAccount($account['id']);
            if (empty($checkAccount) || empty($checkAccount['valid'])) {
                $accountIndex++; // 换下一个账号
                continue;
            }
            
            // 2. 带重试的采集
            try {
                $data = retryCoroutine(2, function () use ($account, $maxId) {
                    return $this->fetchPage($account, $maxId);
                });
            } catch (\Throwable $e) {
                $accountIndex++; // 当前账号失败,换账号
                continue;
            }
            
            // 3. 处理采集到的数据
            if (isset($data['data']['data'][$this->getDataKey()]['users'])) {
                $result = $this->storeUsers($data['data']['data'][$this->getDataKey()]['users']);
                $number += $result;
            }
            
            // 4. 处理分页
            $maxId = $data['data']['data'][$this->getDataKey()]['next_max_id'] ?? '';
            if (empty($maxId)) {
                break; // 没有更多数据
            }
        }
        
        return $number;
    }
    
    // 抽象方法:由子类实现具体逻辑
    abstract protected function getApiRoute(): string;
    abstract protected function getDataKey(): string;
    abstract protected function getCollectType(): int;
}

设计模式应用

  1. 模板方法模式:固定算法骨架,子类实现具体步骤

  2. 重试机制retryCoroutine实现自动重试,提高成功率

  3. 账号轮换:一个账号失败自动切换下一个

  4. 分页处理:支持大数据集的分页采集

四、协程使用场景与最佳实践

4.1 什么时候应该使用协程?

推荐使用协程的场景

  1. 高并发IO操作:API调用、数据库查询、文件读写

  2. 定时任务处理:需要并发执行多个独立任务

  3. 实时数据处理:消息队列消费、实时监控

  4. 长连接服务:WebSocket、聊天室

不推荐使用协程的场景

  1. 纯CPU密集型计算:协程不会提升性能,反而增加复杂度

  2. 简单的CRUD应用:如果没有高并发需求,传统方式更简单

  3. 对协程不熟悉的团队:学习成本需要考虑

4.2 Laravel协程开发最佳实践

实践1:数据库连接管理

// 错误的做法:在协程中直接使用Eloquent
Coroutine::create(function () {
    $users = User::all(); // 可能导致连接混用
});

// 正确的做法:使用连接池或重新获取连接
Coroutine::create(function () {
    DB::connection()->reconnect(); // 重新连接
    $users = User::all();
});

实践2:避免协程间共享状态

// 危险:协程间共享变量可能导致数据竞争
$count = 0;
for ($i = 0; $i < 1000; $i++) {
    Coroutine::create(function () use (&$count) {
        $count++; // 非原子操作,可能出错
    });
}

// 安全:使用通道或原子操作
$chan = new Channel(1);
$count = 0;
for ($i = 0; $i < 1000; $i++) {
    Coroutine::create(function () use ($chan, &$count) {
        $chan->push(true);
        $count++;
        $chan->pop();
    });
}

实践3:合理的并发数控制

// 根据业务调整并发数
$concurrency = min(
    $tenant->rate_limit,          // 租户API限制
    $this->getSystemMaxConcurrency(), // 系统最大并发
    count($tasks)                 // 任务数量
);

$pool = new CoroutinePool($concurrency);

4.3 调试与监控

协程的调试比传统代码复杂,需要专门的工具:

// 1. 添加协程ID到日志
Log::info('处理任务', [
    'task_id' => $task->id,
    'coroutine_id' => Coroutine::getCid(),
    'memory_usage' => memory_get_usage(true)
]);

// 2. 协程异常捕获
Coroutine::create(function () {
    try {
        // 业务代码
    } catch (\Throwable $e) {
        Log::error('协程异常', [
            'message' => $e->getMessage(),
            'trace' => $e->getTraceAsString(),
            'coroutine_id' => Coroutine::getCid()
        ]);
        
        // 重新抛出或处理异常
        throw $e;
    }
});

// 3. 协程状态监控
$stats = Coroutine::stats();
Log::info('协程统计', [
    'peak_num' => $stats['peak_num'],     // 历史峰值
    'coroutine_num' => $stats['coroutine_num'], // 当前数量
    'coroutine_peak_num' => $stats['coroutine_peak_num']
]);

五、性能对比与收益分析

5.1 采集系统优化效果

优化前(传统同步方式)

  • 处理1000个账号:约1800秒(30分钟)

  • 内存使用:约500MB

  • 成功率:受单个账号失败影响大

优化后(协程方案)

  • 处理1000个账号:约120秒(2分钟)提升15倍

  • 内存使用:约100MB 减少80%

  • 成功率:自动重试和账号轮换,提升明显

5.2 系统资源利用率对比

传统方案资源使用图:
CPU: ▁▁▃▅▇▇▇▅▃▁▁▁▁▁ (波动大,大量时间在等待IO)
内存: ██████████████ (持续较高,每个进程独立内存)

协程方案资源使用图:
CPU: ▃▅▇▇▇▇▇▇▇▅▃▃▃▃▃ (持续高效利用)
内存: ████▇▅▃▁▁▁▁▁▁▁ (峰值低,波动小)

六、总结

6.1 协程带来的范式转变

通过这个多租户采集系统的实践,我们看到了协程如何改变PHP开发范式:

  1. 从"一个请求一个进程"到"一个进程处理所有请求"

  2. 从"同步阻塞等待"到"异步非阻塞切换"

  3. 从"面向流程编程"到"面向状态机编程"

6.2 使用建议

  1. 循序渐进:从简单的定时任务开始,逐步应用到核心业务

  2. 充分测试:协程的并发特性可能暴露隐藏的bug

  3. 监控先行:在上线前建立完善的监控体系

  4. 团队学习:协程需要团队共同学习和实践

七、结语

协程不是银弹,但在IO密集型场景下,它是PHP性能提升的利器。通过上文我关于协程的实战案例,我们可以大概了解如何在Laravel中合理运用协程,构建高性能、可扩展的多租户系统。


作者:命中水
版权声明:转载请注明出处,欢迎技术交流

添加新评论