checkDatabaseToSeeIfJobNeedToBeDone($data); if(! $isJobStillNeedToBeDone){ $job->delete(); return; } // 执行逻辑处理(即:你需要该消息队列做什么) $isJobDone = $this->doHelloJob($data); if ($isJobDone) { // 如果任务执行成功,记得删除任务 $job->delete(); } else { // 通过这个方法可以检查这个任务已经重试了几次了 if ($job->attempts() > 3) { $job->delete(); // 也可以重新发布这个任务 //$job->release(2); // $delay为延迟时间,表示该任务延迟2秒后再执行 } } } /** * 有些消息在到达消费者时,可能已经不再需要执行了 * @param $data 发布任务时自定义的数据 * @return bool 任务执行的结果 */ private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } /** * 根据消息中的数据进行实际的业务处理... * @param $data * @return bool */ private function doHelloJob($data) { /* * "data": { "ts": 1713866309, "url": "http:\/\/console.zx2049.com\/api\/cms\/tagHandle", "taskid": 10, "area_id": 1, "area_name": "\u5e7f\u4e1c" }, */ //测试 print("------------------------------------------------ \n"); print("执行任务处理: " . $data['area_name'].$data['area_id'] . " \n"); // TODO 该处为实际业务逻辑,即:对消息中的数据进行处理 //拼装url $url = $data['url'].'?taskid='.$data['taskid'].'&diqu='.$data['area_id']; //发送执行 $content = file_get_contents($url); //{"code":1,"msg":"成功","time":1713883656,"data":null} //判断是否执行成功 $respon = json_decode($content,true); if((int)$respon['code'] === 1){ print("结果: " . $respon['msg']. " \n"); return true; }else{ //异常会一直重发 不会继续执行 并 删除任务 $info = isset($respon["data"])&&!empty($respon["data"])&&is_array($respon["data"])?json_encode($respon["data"],JSON_UNESCAPED_UNICODE):'未知'; print("结果: " .$respon['msg'].'('.$info.')'.$data['area_name'].$data['area_id'] . "失败 \n"); return false; } } }