作为 PHP 开发者,我们迟早都会遇到一个瞬间:
代码没有报错,逻辑也没问题,但系统就是——慢。
不是慢一点,是那种
「明明只是检查 1000 个账号状态,却要跑半个小时」
「服务器 CPU 不高,但请求就是排着队等」
的慢。
最近搞项目刚好遇到了一个业务场景,多租户系统中,根据账号采集数据,采集前先要确认账号是否有效(就像根据IP代理爬数据一样,爬数据前先确认代理是否有效),于是我按照以往的经验,很自然的写了这样一条链路:
循环取出一个账号
检测状态
拉数据
账号异常,再换下一个账号,重复执行1、2、3
逻辑看似没问题,代码也很优雅。直到我测试的时候,跑了一次完整的任务。1000个账号,跑了30分钟还没跑完。我查看了CPU,内存也不高,但就是很慢。为什么呢?
问题不在算力,而在于PHP在大量IO等待中,被迫排队!
这个时候就是协程(Coroutine)大显身手的时候了。下文将结合业务和实战(编码)深入理解协程,并展示如何在Laravel中构建基于协程的高性能多租户采集系统。
一、协程基础:重新认识PHP并发编程
在介绍协程前,咱们先得明白一件事儿:
协程不会使CPU算的更快!
它解决的是“等待问题”,而不是“计算问题”。
只要系统大量在做以下几件事:
调第三方API
查数据库
读写文件
等网络相应
协程就有发挥空间。废话不多说
1.1 什么是协程?
协程是用户态的轻量级线程,这句话可能有点抽象,让我们通过一个简单的比喻来理解:
想象一个餐厅厨房:
传统多进程/多线程:雇佣多个厨师(进程/线程),每个厨师有自己的工作台和工具,切换成本高(内存开销大)
协程:一个超级厨师(单线程),但可以同时照看多个锅(协程),当一个锅在炖煮时(等待IO),就切换到另一个锅翻炒
技术定义:协程是一种比线程更轻量级的存在,由程序自身控制调度,在用户态进行上下文切换,避免了内核态切换的开销。
协程不是让 PHP 变快,而是让 PHP 不再傻等 IO
1.2 为什么选择协程?三大核心优势
1. 极低的资源消耗
// 对比:创建1000个并发任务
$threads = [];
for ($i = 0; $i < 1000; $i++) {
$threads[] = new Thread(fn() => processTask($i)); // 每个线程约2MB内存
}
// 总内存:1000 * 2MB = 2GB
// 协程方式
Coroutine\run(function () {
for ($i = 0; $i < 1000; $i++) {
Coroutine::create(fn() => processTask($i)); // 每个协程约2KB内存
}
});
// 总内存:1000 * 2KB = 2MB2. 高效的IO并发
传统同步代码在等待数据库查询、API响应时会阻塞整个进程。协程可以在等待时让出控制权,处理其他任务。
// 传统同步方式(串行执行,总耗时 = 各任务耗时之和)
function fetchUsers() {
$result1 = $http->get('/api/users/1'); // 等待100ms
$result2 = $http->get('/api/users/2'); // 等待100ms
// 总耗时约200ms
}
// 协程方式(并发执行)
Coroutine\run(function () {
$chan = new Channel(2);
Coroutine::create(function () use ($chan) {
$result1 = $http->get('/api/users/1');
$chan->push($result1);
});
Coroutine::create(function () use ($chan) {
$result2 = $http->get('/api/users/2');
$chan->push($result2);
});
// 总耗时约100ms(同时发起请求)
});3. 同步的编程体验,异步的执行效率
协程最大的魅力在于:用同步的代码风格,获得异步的执行性能。不需要复杂的回调地狱(Callback Hell),代码可读性大大提升。
1.3 协程 vs 其他并发方案对比
| 特性 | 多进程 | 多线程 | 异步回调 | 协程 |
|---|---|---|---|---|
| 并发能力 | 高 | 高 | 高 | 极高 |
| 内存消耗 | 高 | 中 | 低 | 极低 |
| 开发复杂度 | 低 | 中 | 高 | 低 |
| 调试难度 | 低 | 高 | 高 | 中 |
| 适用场景 | CPU密集型 | CPU密集型 | IO密集型 | IO密集型 |
二、Laravel中的协程集成:Swoole实战
2.1 环境配置:让Laravel飞起来
# 1. 安装Swoole扩展
pecl install swoole
# 2. 验证安装
php --ri swoole
# 3. 配置php.ini
extension=swoole.so
# 4. 安装Laravel Octane(可选,但推荐)
composer require laravel/octane
php artisan octane:install2.2 协程核心概念:Channel与WaitGroup
在理解我们的采集系统之前,先掌握两个关键概念:
Channel(通道):协程间的通信管道
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
// 创建容量为10的通道
$chan = new Channel(10);
// 生产者协程
Coroutine::create(function () use ($chan) {
for ($i = 0; $i < 100; $i++) {
$chan->push($i); // 生产数据
echo "生产: $i\n";
Coroutine::sleep(0.1); // 模拟耗时
}
$chan->close(); // 生产完毕,关闭通道
});
// 消费者协程
Coroutine::create(function () use ($chan) {
while ($data = $chan->pop()) { // 消费数据,通道空时自动挂起
echo "消费: $data\n";
}
echo "所有数据处理完毕\n";
});WaitGroup(等待组):协程同步工具
use Swoole\Coroutine\WaitGroup;
$wg = new WaitGroup();
$results = [];
for ($i = 0; $i < 10; $i++) {
$wg->add(); // 增加计数
Coroutine::create(function () use ($i, $wg, &$results) {
try {
// 模拟任务处理
Coroutine::sleep(0.1);
$results[$i] = "任务{$i}完成";
} finally {
$wg->done(); // 任务完成,减少计数
}
});
}
$wg->wait(); // 等待所有任务完成
print_r($results); // 所有任务完成后执行三、实战:多租户采集系统设计
3.1 业务场景解析
回到文章开头提到的业务场景:
多租户系统:每个租户独立,有不同的采集配置和限制
账号分组:每个租户下有多个账号分组,每组包含若干账号
状态检测:采集前需要验证账号有效性
并发控制:需要限制单个租户的并发数,避免API被限流
任务管理:需要跟踪采集任务状态、进度和结果
3.2 架构设计:协程驱动的四层架构
┌─────────────────────────────────────────────┐
│ Web层(控制器) │
│ - 接收采集请求 │
│ - 参数验证 │
│ - 返回任务ID │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ 任务调度层(Command) │
│ - 定时扫描待处理任务 │
│ - 动态创建协程池 │
│ - 预热账号数据 │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ 业务逻辑层(Service) │
│ - 账号状态检测 │
│ - 数据采集逻辑 │
│ - 数据存储 │
└─────────────────┬───────────────────────────┘
│
┌─────────────────▼───────────────────────────┐
│ 基础设施层 │
│ - 协程池(CoroutinePool) │
│ - 协程工具(CoroutineUtils) │
│ - 账号缓存管理 │
└─────────────────────────────────────────────┘3.3 关键代码解读:从理论到实现
3.3.1 协程池(CoroutinePool) - 资源控制器
协程池的核心作用是限制并发数,避免系统过载。以下是简化版的实现:
namespace App\Concurrency;
use Swoole\Coroutine;
use Swoole\Coroutine\Channel;
class CoroutinePool
{
private $size; // 池子大小(最大并发数)
private $channel; // 控制并发的通道
private $count = 0; // 当前活跃协程数
public function __construct(int $size)
{
$this->size = $size;
// 创建容量为$size的通道,实现并发控制
$this->channel = new Channel($size);
}
public function add(callable $task): void
{
// 关键:当并发数达到上限时,这里会阻塞,直到有协程完成
// 这里的 channel 不是用来传数据的,而是用来“占坑位”的
// 每 push 一次,代表占用一个并发名额
// 当 channel 满了,push 会阻塞,新的任务进不来
$this->channel->push(true);
Coroutine::create(function () use ($task) {
try {
$task(); // 执行用户任务
} finally {
// 任务完成,释放一个位置
$this->channel->pop();
$this->count--;
}
});
$this->count++;
}
public function close(): void
{
// 等待所有任务完成
while ($this->count > 0) {
Coroutine::sleep(0.1);
}
}
}设计要点:
通道作为信号量:
$channel->push()和$channel->pop()实现了经典的信号量模式优雅的资源释放:使用
try...finally确保任务异常时也能释放资源阻塞式等待:当并发数满时,新任务会阻塞而非拒绝,更适合队列场景
3.3.2 任务运行器(FetchBloggerTaskRunner) - 调度器
public function handle()
{
// 1. 设置定时器,定期检查数据库中的新任务
Timer::tick(self::CHECK_INTERVAL * 1000, function () {
Coroutine::create(function () {
$tasks = $this->getUnHandleTasks();
if ($tasks->isEmpty()) return;
// 2. 创建协程池,限制最大并发
$pool = new CoroutinePool(20);
$pool->run();
// 3. 异步预热账号(不阻塞主流程)
$accountGroupIds = $tasks->pluck('account_group_id')->unique()->toArray();
$manager = new AccountWarmupManager('', '', 10);
$manager->asyncWarmupByGroupIds($accountGroupIds);
// 4. 标记任务为处理中
YkCollectBloggerTaskModel::whereIn('id', $tasks->pluck('id'))
->update(['status' => 'handing']);
// 5. 为每个任务创建采集协程
foreach ($tasks as $task) {
// 分布式锁:避免重复处理
$lock = Cache::lock("lock:fetch_blogger_data_task:{$task->id}", 300);
if (!$lock->get()) continue;
$pool->add(function () use ($task, $lock) {
try {
$service = new FetchBloggerTaskService($task);
$service->handle();
} finally {
$lock->release(); // 释放锁
}
});
}
$pool->close(); // 等待所有任务完成
});
});
Event::wait(); // 关键:阻塞主进程,让协程有机会执行
}代码分析:
Timer::tick定时器:替代传统的while+sleep循环,更高效
协程内查询数据库:避免阻塞事件循环
异步预热:账号预热不阻塞任务分发
分布式锁:在多进程部署时避免任务重复执行
Event::wait():维持进程运行,让定时器持续工作
3.3.3 账号预热管理器(AccountWarmupManager) - 缓存优化
public function asyncWarmupByGroupIds(array $groupIds): void
{
Coroutine::create(function () use ($groupIds) {
// 1. 创建控制并发的通道
$chan = new Channel($this->maxConcurrency);
// 2. 分页查询账号,避免一次性加载过多数据
$page = 1;
while (true) {
$accounts = YkAccountModel::query()
->whereIn('group_id', $groupIds)
->forPage($page, $this->pageSize)
->get();
if ($accounts->isEmpty()) break;
// 3. 并发验证每个账号
foreach ($accounts as $account) {
// 跳过已缓存的账号
if (Redis::exists(self::VALID_ACCOUNT_CACHE_KEY . $account->id)) {
continue;
}
$chan->push(true); // 控制并发
Coroutine::create(function () use ($account, $chan) {
try {
$this->checkAndCacheAccount($account->toArray());
} finally {
$chan->pop(); // 释放并发位
}
});
}
$page++;
}
});
}优化技巧:
分页查询:避免一次性加载大量数据到内存
缓存跳过:已验证的账号跳过重复检查
并发控制:限制同时验证的账号数,保护API
异步执行:预热过程不阻塞主流程
3.3.4 抽象采集器(AbstractBloggerCollector) - 模板方法模式
abstract class AbstractBloggerCollector
{
// 模板方法:定义采集算法骨架
public function execute(): int
{
$accountIndex = 0;
$number = 0;
$maxId = '';
// 循环使用账号,直到采集完成
while ($accountIndex < count($this->accounts)) {
$account = $this->accounts[$accountIndex] ?? [];
// 1. 检查账号有效性
$checkAccount = $manager->getValidAccount($account['id']);
if (empty($checkAccount) || empty($checkAccount['valid'])) {
$accountIndex++; // 换下一个账号
continue;
}
// 2. 带重试的采集
try {
$data = retryCoroutine(2, function () use ($account, $maxId) {
return $this->fetchPage($account, $maxId);
});
} catch (\Throwable $e) {
$accountIndex++; // 当前账号失败,换账号
continue;
}
// 3. 处理采集到的数据
if (isset($data['data']['data'][$this->getDataKey()]['users'])) {
$result = $this->storeUsers($data['data']['data'][$this->getDataKey()]['users']);
$number += $result;
}
// 4. 处理分页
$maxId = $data['data']['data'][$this->getDataKey()]['next_max_id'] ?? '';
if (empty($maxId)) {
break; // 没有更多数据
}
}
return $number;
}
// 抽象方法:由子类实现具体逻辑
abstract protected function getApiRoute(): string;
abstract protected function getDataKey(): string;
abstract protected function getCollectType(): int;
}设计模式应用:
模板方法模式:固定算法骨架,子类实现具体步骤
重试机制:
retryCoroutine实现自动重试,提高成功率账号轮换:一个账号失败自动切换下一个
分页处理:支持大数据集的分页采集
四、协程使用场景与最佳实践
4.1 什么时候应该使用协程?
推荐使用协程的场景:
高并发IO操作:API调用、数据库查询、文件读写
定时任务处理:需要并发执行多个独立任务
实时数据处理:消息队列消费、实时监控
长连接服务:WebSocket、聊天室
不推荐使用协程的场景:
纯CPU密集型计算:协程不会提升性能,反而增加复杂度
简单的CRUD应用:如果没有高并发需求,传统方式更简单
对协程不熟悉的团队:学习成本需要考虑
4.2 Laravel协程开发最佳实践
实践1:数据库连接管理
// 错误的做法:在协程中直接使用Eloquent
Coroutine::create(function () {
$users = User::all(); // 可能导致连接混用
});
// 正确的做法:使用连接池或重新获取连接
Coroutine::create(function () {
DB::connection()->reconnect(); // 重新连接
$users = User::all();
});实践2:避免协程间共享状态
// 危险:协程间共享变量可能导致数据竞争
$count = 0;
for ($i = 0; $i < 1000; $i++) {
Coroutine::create(function () use (&$count) {
$count++; // 非原子操作,可能出错
});
}
// 安全:使用通道或原子操作
$chan = new Channel(1);
$count = 0;
for ($i = 0; $i < 1000; $i++) {
Coroutine::create(function () use ($chan, &$count) {
$chan->push(true);
$count++;
$chan->pop();
});
}实践3:合理的并发数控制
// 根据业务调整并发数
$concurrency = min(
$tenant->rate_limit, // 租户API限制
$this->getSystemMaxConcurrency(), // 系统最大并发
count($tasks) // 任务数量
);
$pool = new CoroutinePool($concurrency);4.3 调试与监控
协程的调试比传统代码复杂,需要专门的工具:
// 1. 添加协程ID到日志
Log::info('处理任务', [
'task_id' => $task->id,
'coroutine_id' => Coroutine::getCid(),
'memory_usage' => memory_get_usage(true)
]);
// 2. 协程异常捕获
Coroutine::create(function () {
try {
// 业务代码
} catch (\Throwable $e) {
Log::error('协程异常', [
'message' => $e->getMessage(),
'trace' => $e->getTraceAsString(),
'coroutine_id' => Coroutine::getCid()
]);
// 重新抛出或处理异常
throw $e;
}
});
// 3. 协程状态监控
$stats = Coroutine::stats();
Log::info('协程统计', [
'peak_num' => $stats['peak_num'], // 历史峰值
'coroutine_num' => $stats['coroutine_num'], // 当前数量
'coroutine_peak_num' => $stats['coroutine_peak_num']
]);五、性能对比与收益分析
5.1 采集系统优化效果
优化前(传统同步方式):
处理1000个账号:约1800秒(30分钟)
内存使用:约500MB
成功率:受单个账号失败影响大
优化后(协程方案):
处理1000个账号:约120秒(2分钟)提升15倍
内存使用:约100MB 减少80%
成功率:自动重试和账号轮换,提升明显
5.2 系统资源利用率对比
传统方案资源使用图:
CPU: ▁▁▃▅▇▇▇▅▃▁▁▁▁▁ (波动大,大量时间在等待IO)
内存: ██████████████ (持续较高,每个进程独立内存)
协程方案资源使用图:
CPU: ▃▅▇▇▇▇▇▇▇▅▃▃▃▃▃ (持续高效利用)
内存: ████▇▅▃▁▁▁▁▁▁▁ (峰值低,波动小)六、总结
6.1 协程带来的范式转变
通过这个多租户采集系统的实践,我们看到了协程如何改变PHP开发范式:
从"一个请求一个进程"到"一个进程处理所有请求"
从"同步阻塞等待"到"异步非阻塞切换"
从"面向流程编程"到"面向状态机编程"
6.2 使用建议
循序渐进:从简单的定时任务开始,逐步应用到核心业务
充分测试:协程的并发特性可能暴露隐藏的bug
监控先行:在上线前建立完善的监控体系
团队学习:协程需要团队共同学习和实践
七、结语
协程不是银弹,但在IO密集型场景下,它是PHP性能提升的利器。通过上文我关于协程的实战案例,我们可以大概了解如何在Laravel中合理运用协程,构建高性能、可扩展的多租户系统。
作者:命中水
版权声明:转载请注明出处,欢迎技术交流