| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- <?php
- namespace App\Jobs;
- use App\Enums\Status;
- use App\Events\Streaming;
- use App\Events\Wisper;
- use App\Models\Message;
- use GuzzleHttp\Psr7\Utils;
- use Illuminate\Bus\Batchable;
- use Illuminate\Contracts\Queue\ShouldQueue;
- use Illuminate\Foundation\Queue\Queueable;
- use Illuminate\Support\Arr;
- use Illuminate\Support\Facades\Http;
/**
 * Queued job that sends a question to the inference endpoint, relays the
 * streamed reply to listeners in batches, and stores the final result on the
 * given message.
 *
 * The reply is read line by line; each line is expected to be a JSON object.
 * Only objects whose `type` is `AIMessageChunk` are processed. Text found in
 * `additional_kwargs.reasoning_content` is accumulated as "thinking", text
 * found in `content` is accumulated as the answer.
 */
class AskQuestionJob implements ShouldQueue
{
    use Batchable;
    use Queueable;

    /** Number of attempts the queue worker may make for this job. */
    public int $tries = 5;

    /** Sequence number passed to each Streaming event; incremented per event. */
    private int $position = 0;

    /** Number of text-carrying chunks seen so far; drives the periodic flush. */
    private int $iteration = 0;

    /** Text received since the last Streaming event was dispatched. */
    private string $buffering = '';

    /** Last status set through status(); null until the first call. */
    private ?Status $status = null;

    /** Accumulated reasoning text that has already been flushed. */
    private string $thinking = '';

    /** Accumulated answer text that has already been flushed. */
    private string $content = '';

    /** Metadata keys copied from the latest chunk that carried `response_metadata`. */
    private array $fields = [];

    /**
     * Create a new job instance.
     *
     * The job is placed on the `primary` queue and dispatched after commit.
     *
     * @param Message $message  Message that receives status updates and the final answer.
     * @param string  $question Text sent to the inference endpoint as `message`.
     */
    public function __construct(
        private readonly Message $message,
        private readonly string $question,
    ) {
        $this->onQueue('primary');
        $this->afterCommit();
    }

    /**
     * Execute the job.
     *
     * @throws \Illuminate\Http\Client\RequestException When the endpoint answers with a 4xx/5xx status.
     * @throws \Exception When buffered text has to be flushed while the status is neither Thinking nor Typing.
     */
    public function handle(): void
    {
        // batch() yields null when the job is not part of a batch, hence the nullsafe call.
        if ($this->batch()?->cancelled()) {
            $this->status(Status::Canceled);

            return;
        }

        $this->message->load('chat');
        $this->status(Status::Sending);

        /** @var \Illuminate\Http\Client\Response $stream */
        $stream = Http::inference()
            ->withOptions(['stream' => true])
            ->post('/ask', ['message' => $this->question, 'history' => $this->history()]);

        // An error response carries no chunks; without this the message would be
        // marked Completed with empty content instead of the job failing/retrying.
        $stream->throw();

        $body = $stream->toPsrResponse()->getBody();
        $this->buffering = '';

        while (!$body->eof()) {
            $chunk = json_decode(Utils::readLine($body), true);

            // Skip blank/undecodable lines and anything that is not a message chunk.
            if (!is_array($chunk) || ($chunk['type'] ?? null) !== 'AIMessageChunk') {
                continue;
            }

            [$text, $status] = $this->segment($chunk);

            if ($chunk['response_metadata'] ?? false) {
                $this->fields = Arr::only(
                    $chunk,
                    ['response_metadata', 'usage_metadata', 'tool_calls', 'invalid_tool_calls'],
                );
            }

            // Chunks without text have nothing to buffer or announce.
            if ($status === null) {
                continue;
            }

            // First text chunk: leave the pre-streaming status.
            if (!in_array($this->status, [Status::Thinking, Status::Typing], true)) {
                $this->status($status);
            }

            $this->iteration++;

            // Flush what was buffered under the previous status before switching,
            // and additionally after every 100th text chunk.
            if ($this->status !== $status || $this->iteration % 100 === 0) {
                $this->event();
                $this->status($status);
            }

            $this->buffering .= $text;
        }

        // Explicit comparison: a plain truthiness check would discard a trailing "0".
        if ($this->buffering !== '') {
            $this->event();
        }

        $this->status(Status::Completed);
        $this->message->update([
            'content' => $this->content,
            'thinking' => $this->thinking,
            'fields' => $this->fields,
        ]);
    }

    /**
     * Pick the text carried by a chunk and the status that text belongs to.
     *
     * Non-empty `additional_kwargs.reasoning_content` takes precedence over
     * `content`.
     *
     * @param array $chunk Decoded chunk.
     *
     * @return array{0: mixed, 1: ?Status} Text and status, or two nulls when the chunk carries no text.
     */
    private function segment(array $chunk): array
    {
        $reasoning = $chunk['additional_kwargs']['reasoning_content'] ?? null;
        if ($reasoning !== null && $reasoning !== '') {
            return [$reasoning, Status::Thinking];
        }

        $answer = $chunk['content'] ?? null;
        if ($answer !== null && $answer !== '') {
            return [$answer, Status::Typing];
        }

        return [null, null];
    }

    /**
     * Remember the status, store it on the message and dispatch a Wisper event.
     */
    private function status(Status $status): void
    {
        $this->status = $status;
        $this->message->update(['status' => $status]);
        event(new Wisper($this->message, $status));
    }

    /**
     * Build the conversation history sent along with the question.
     *
     * Selects messages of the same chat that have content, excluding this
     * job's message, newest first; skips the newest match and takes the next
     * five, then returns them oldest first.
     * NOTE(review): the skipped message is presumably the question itself,
     * which is sent separately — confirm.
     *
     * @return array<int, array{role: mixed, content: mixed}>
     */
    private function history(): array
    {
        return Message::where('chat_id', $this->message->chat_id)
            ->where('id', '!=', $this->message->id)
            ->whereNotNull('content')
            ->orderByDesc('id')
            ->offset(1)
            ->limit(5)
            ->get()
            ->sortBy('id')
            ->map(static fn (Message $message): array => [
                'role' => $message->role,
                'content' => $message->content,
            ])
            ->values()
            ->toArray();
    }

    /**
     * Flush the buffer: append it to the thinking or answer text according to
     * the current status and dispatch a Streaming event carrying the flushed
     * text and the next position.
     *
     * @throws \Exception When the current status is neither Thinking nor Typing.
     */
    private function event(): void
    {
        $string = $this->buffering;
        $this->buffering = '';

        match ($this->status) {
            Status::Thinking => $this->thinking .= $string,
            Status::Typing => $this->content .= $string,
            default => throw new \Exception('Unexpected match value'),
        };

        event(new Streaming($this->message, $this->position++, $string, $this->status));
    }

    /**
     * Mark the message as failed once the job has failed.
     *
     * @param \Throwable|null $exception Cause of the failure, if any; not inspected here.
     */
    public function failed(?\Throwable $exception): void
    {
        $this->status(Status::Failed);
    }
}
|