onQueue('primary'); $this->afterCommit(); $this->message = $message; $this->question = $question; } /** * Execute the job. */ public function handle(ChunkParser $response): void { //TODO: ; For Status if ($this->batch()->cancelled()) { $this->status(Status::Canceled); return; } $this->status(Status::Sending); /** @var \Illuminate\Http\Client\Response $stream */ $stream = Http::inference() ->withOptions(['stream' => true]) ->post('/answer', ['message' => $this->question, 'history' => []]); $body = $stream->toPsrResponse()->getBody(); for ($i = 0; !$body->eof(); $i++) { $response->append(Utils::readLine($body)) ->parse(); if ($status = $response->status) { $this->status($status); } $this->chunk($i, $response); } $this->status(Status::Completed); $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]); } private function status(Status $status): void { $this->message->update(['status' => $status]); event(new Wisper($this->message, $status)); } private function chunk(int $index, ChunkParser $response): void { try { event(new Streaming($this->message, $response->chunk, $index)); // Model::withoutBroadcasting(fn() => $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields])); } catch (\Exception $e) { report($e); } } public function failed(\Throwable $exception): void { $this->status(Status::Failed); } }