AskQuestionJob.php 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. <?php
  2. namespace App\Jobs;
  3. use App\Enums\Status;
  4. use App\Events\Streaming;
  5. use App\Events\Wisper;
  6. use App\Helpers\ChunkParser;
  7. use App\Models\Message;
  8. use GuzzleHttp\Psr7\Utils;
  9. use Illuminate\Bus\Batchable;
  10. use Illuminate\Contracts\Queue\ShouldQueue;
  11. use Illuminate\Foundation\Queue\Queueable;
  12. use Illuminate\Support\Facades\Http;
  13. class AskQuestionJob implements ShouldQueue
  14. {
  15. use Batchable;
  16. use Queueable;
  17. public int $tries = 5;
  18. private readonly Message $message;
  19. private readonly string $question;
  20. /**
  21. * Create a new job instance.
  22. */
  23. public function __construct(Message $message, string $question)
  24. {
  25. $this->onQueue('primary');
  26. $this->afterCommit();
  27. $this->message = $message;
  28. $this->question = $question;
  29. }
  30. /**
  31. * Execute the job.
  32. */
  33. public function handle(ChunkParser $response): void
  34. {
  35. //TODO: ; For Status
  36. if ($this->batch()->cancelled()) {
  37. $this->status(Status::Canceled);
  38. return;
  39. }
  40. $this->status(Status::Sending);
  41. /** @var \Illuminate\Http\Client\Response $stream */
  42. $stream = Http::inference()
  43. ->withOptions(['stream' => true])
  44. ->post('/answer', ['message' => $this->question, 'history' => []]);
  45. $body = $stream->toPsrResponse()->getBody();
  46. for ($i = 0; !$body->eof(); $i++) {
  47. $response->append(Utils::readLine($body))
  48. ->parse();
  49. if ($status = $response->status) {
  50. $this->status($status);
  51. }
  52. $this->chunk($i, $response);
  53. }
  54. $this->status(Status::Completed);
  55. $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]);
  56. }
  57. private function status(Status $status): void
  58. {
  59. $this->message->update(['status' => $status]);
  60. event(new Wisper($this->message, $status));
  61. }
  62. private function chunk(int $index, ChunkParser $response): void
  63. {
  64. try {
  65. event(new Streaming($this->message, $response->chunk, $index));
  66. // Model::withoutBroadcasting(fn() => $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]));
  67. } catch (\Exception $e) {
  68. report($e);
  69. }
  70. }
  71. public function failed(\Throwable $exception): void
  72. {
  73. $this->status(Status::Failed);
  74. }
  75. }