AskQuestionJob.php 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. <?php
  2. namespace App\Jobs;
  3. use App\Enums\Status;
  4. use App\Events\Streaming;
  5. use App\Events\Wisper;
  6. use App\Models\Message;
  7. use GuzzleHttp\Psr7\Utils;
  8. use Illuminate\Bus\Batchable;
  9. use Illuminate\Contracts\Queue\ShouldQueue;
  10. use Illuminate\Foundation\Queue\Queueable;
  11. use Illuminate\Support\Arr;
  12. use Illuminate\Support\Facades\Http;
  13. class AskQuestionJob implements ShouldQueue
  14. {
  15. use Batchable;
  16. use Queueable;
  17. public int $tries = 5;
  18. private readonly Message $message;
  19. private readonly string $question;
  20. private int $position = 0;
  21. private int $iteration = 0;
  22. private string $buffering;
  23. private ?Status $status = null;
  24. private string $thinking = '';
  25. private string $content = '';
  26. private array $fields = [];
  27. /**
  28. * Create a new job instance.
  29. */
  30. public function __construct(Message $message, string $question)
  31. {
  32. $this->onQueue('primary');
  33. $this->afterCommit();
  34. $this->message = $message;
  35. $this->question = $question;
  36. }
  37. /**
  38. * Execute the job.
  39. * @throws \Exception
  40. * @throws \Exception
  41. */
  42. public function handle(): void
  43. {
  44. if ($this->batch()->cancelled()) {
  45. $this->status(Status::Canceled);
  46. return;
  47. }
  48. $this->message->load('chat');
  49. $this->status(Status::Sending);
  50. /** @var \Illuminate\Http\Client\Response $stream */
  51. $stream = Http::inference()
  52. ->withOptions(['stream' => true])
  53. ->post('/ask', ['message' => $this->question, 'history' => $this->history()]);
  54. $body = $stream->toPsrResponse()->getBody();
  55. $this->buffering = '';
  56. while (!$body->eof()) {
  57. $string = null;
  58. $status = null;
  59. $read = Utils::readLine($body);
  60. $chunk = json_decode($read, true);
  61. if (!$chunk || $chunk['type'] !== 'AIMessageChunk') {
  62. continue;
  63. }
  64. if (isset($chunk['additional_kwargs']['reasoning_content']) && $chunk['additional_kwargs']['reasoning_content'] !== '') {
  65. $string = $chunk['additional_kwargs']['reasoning_content'];
  66. $status = Status::Thinking;
  67. } else if (isset($chunk['content']) && $chunk['content'] !== '') {
  68. $string = $chunk['content'];
  69. $status = Status::Typing;
  70. }
  71. if ($chunk['response_metadata'] ?? false) {
  72. $this->fields = Arr::only($chunk, ['response_metadata', 'usage_metadata', 'tool_calls', 'invalid_tool_calls']);
  73. }
  74. if ($status && !in_array($this->status, [Status::Thinking, Status::Typing])) {
  75. $this->status($status);
  76. }
  77. if ($status) {
  78. $this->iteration++;
  79. if ($this->status !== $status || $this->iteration % 100 === 0) {
  80. $this->event();
  81. $this->status($status);
  82. }
  83. }
  84. $this->buffering .= $string;
  85. }
  86. if ($this->buffering) {
  87. $this->event();
  88. }
  89. $this->status(Status::Completed);
  90. $this->message->update([
  91. 'content' => $this->content,
  92. 'thinking' => $this->thinking,
  93. 'fields' => $this->fields
  94. ]);
  95. }
  96. private function status(Status $status): void
  97. {
  98. $this->status = $status;
  99. $this->message->update(['status' => $status]);
  100. event(new Wisper($this->message, $status));
  101. }
  102. private function history(): array
  103. {
  104. return Message::where('chat_id', $this->message->chat_id)->where('id', '!=', $this->message->id)
  105. ->whereNotNull('content')
  106. ->orderByDesc('id')
  107. ->offset(1)->limit(5)->get()
  108. ->sortBy('id')
  109. ->map(fn($message) => ['role' => $message->role, 'content' => $message->content])
  110. ->values()
  111. ->toArray();
  112. }
  113. /**
  114. * @throws \Exception
  115. */
  116. private function event(): void
  117. {
  118. $string = $this->buffering;
  119. $this->buffering = '';
  120. match ($this->status) {
  121. Status::Thinking => $this->thinking .= $string,
  122. Status::Typing => $this->content .= $string,
  123. default => throw new \Exception('Unexpected match value')
  124. };
  125. //Model::withoutBroadcasting(fn() => $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]));
  126. event(new Streaming($this->message, $this->position++, $string, $this->status));
  127. }
  128. public function failed(\Throwable $exception): void
  129. {
  130. $this->status(Status::Failed);
  131. }
  132. }