onQueue('primary');
        $this->afterCommit();
        $this->message = $message;
        $this->question = $question;
    }

    /**
     * Execute the job.
     *
     * Posts the question (plus recent chat history) to the inference endpoint
     * with streaming enabled, reads the response line by line and decodes each
     * line as JSON. Text from 'AIMessageChunk' lines is buffered and flushed as
     * Streaming events; once the stream ends the accumulated content, thinking
     * text and metadata fields are written to the message.
     *
     * @throws \Exception
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            $this->status(Status::Canceled);

            return;
        }

        $this->message->load('chat');
        $this->status(Status::Sending);

        /** @var \Illuminate\Http\Client\Response $stream */
        $stream = Http::inference()
            ->withOptions(['stream' => true])
            ->post('/ask', ['message' => $this->question, 'history' => $this->history()]);

        $body = $stream->toPsrResponse()->getBody();
        $this->buffering = '';

        while (!$body->eof()) {
            $chunk = json_decode(Utils::readLine($body), true);

            // Skip blank/undecodable lines, non-array JSON values and any
            // line that is not an AI message chunk (including ones that
            // carry no 'type' key at all).
            if (!is_array($chunk) || ($chunk['type'] ?? null) !== 'AIMessageChunk') {
                continue;
            }

            $string = null;
            $status = null;

            $reasoning = $chunk['additional_kwargs']['reasoning_content'] ?? '';
            $content = $chunk['content'] ?? '';

            // Reasoning text takes precedence over regular content when a
            // chunk carries both.
            if ($reasoning !== '') {
                $string = $reasoning;
                $status = Status::Thinking;
            } elseif ($content !== '') {
                $string = $content;
                $status = Status::Typing;
            }

            if ($chunk['response_metadata'] ?? false) {
                $this->fields = Arr::only(
                    $chunk,
                    ['response_metadata', 'usage_metadata', 'tool_calls', 'invalid_tool_calls'],
                );
            }

            if ($status === null) {
                continue;
            }

            // First chunk that carries text: leave the pre-streaming status.
            if (!in_array($this->status, [Status::Thinking, Status::Typing], true)) {
                $this->status($status);
            }

            $this->iteration++;

            // Flush what has been buffered under the *current* status before
            // switching to the new one, and also on every 100th text chunk.
            if ($this->status !== $status || $this->iteration % 100 === 0) {
                $this->event();
                $this->status($status);
            }

            $this->buffering .= $string;
        }

        // Compare against '' explicitly: a plain truthiness check would drop
        // a trailing buffer consisting solely of "0".
        if ($this->buffering !== '') {
            $this->event();
        }

        $this->status(Status::Completed);

        $this->message->update([
            'content' => $this->content,
            'thinking' => $this->thinking,
            'fields' => $this->fields,
        ]);
    }

    /**
     * Record the new status on the job and on the message, then dispatch a
     * Wisper event carrying the message and the status.
     */
    private function status(Status $status): void
    {
        $this->status = $status;
        $this->message->update(['status' => $status]);

        event(new Wisper($this->message, $status));
    }

    /**
     * Build the conversation history sent along with the question.
     *
     * Selects messages of the same chat that have non-null content, excluding
     * this job's message, newest first; skips the first of those and keeps the
     * next five, then returns them ordered by ascending id.
     * NOTE(review): the skipped newest message is presumably the question
     * itself, which is sent separately - confirm against the caller.
     *
     * @return list<array{role: mixed, content: mixed}>
     */
    private function history(): array
    {
        return Message::where('chat_id', $this->message->chat_id)
            ->where('id', '!=', $this->message->id)
            ->whereNotNull('content')
            ->orderByDesc('id')
            ->offset(1)
            ->limit(5)
            ->get()
            ->sortBy('id')
            ->map(static fn ($message): array => ['role' => $message->role, 'content' => $message->content])
            ->values()
            ->toArray();
    }

    /**
     * Flush the buffer: append it to the thinking or content text according to
     * the current status and dispatch a Streaming event with the flushed text
     * and the next position counter value.
     *
     * @throws \Exception when the current status is neither Thinking nor Typing
     */
    private function event(): void
    {
        $string = $this->buffering;
        $this->buffering = '';

        match ($this->status) {
            Status::Thinking => $this->thinking .= $string,
            Status::Typing => $this->content .= $string,
            default => throw new \Exception('Unexpected match value'),
        };

        event(new Streaming($this->message, $this->position++, $string, $this->status));
    }

    /**
     * Mark the message as failed when the job fails.
     */
    public function failed(\Throwable $exception): void
    {
        $this->status(Status::Failed);
    }
}