浏览代码

Change Stream

Artem Kastrov 1 月之前
父节点
当前提交
a26a8cfdb6

+ 7 - 2
app/Events/Streaming.php

@@ -2,27 +2,32 @@
 
 namespace App\Events;
 
+use App\Enums\Status;
 use App\Models\Message;
 use Illuminate\Broadcasting\InteractsWithSockets;
 use Illuminate\Broadcasting\PrivateChannel;
 use Illuminate\Contracts\Broadcasting\ShouldBroadcastNow;
+//use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
 use Illuminate\Foundation\Events\Dispatchable;
 use Illuminate\Queue\SerializesModels;
 
+//class Streaming implements ShouldBroadcast
 class Streaming implements ShouldBroadcastNow
 {
     use Dispatchable, InteractsWithSockets, SerializesModels;
 
     public readonly string $chunk;
     public readonly int $index;
+    public readonly Status $status;
 
     /**
      * Create a new event instance.
      */
-    public function __construct(private readonly Message $message, string $chunk, int $index)
+    public function __construct(private readonly Message $message, int $index, string $chunk, Status $status)
     {
-        $this->chunk = $chunk;
         $this->index = $index;
+        $this->chunk = $chunk;
+        $this->status = $status;
     }
 
     /**

+ 87 - 17
app/Jobs/AskQuestionJob.php

@@ -5,12 +5,12 @@ namespace App\Jobs;
 use App\Enums\Status;
 use App\Events\Streaming;
 use App\Events\Wisper;
-use App\Helpers\ChunkParser;
 use App\Models\Message;
 use GuzzleHttp\Psr7\Utils;
 use Illuminate\Bus\Batchable;
 use Illuminate\Contracts\Queue\ShouldQueue;
 use Illuminate\Foundation\Queue\Queueable;
+use Illuminate\Support\Arr;
 use Illuminate\Support\Facades\Http;
 
 class AskQuestionJob implements ShouldQueue
@@ -23,6 +23,14 @@ class AskQuestionJob implements ShouldQueue
     private readonly Message $message;
     private readonly string $question;
 
+    private int $position = 0;
+    private int $iteration = 0;
+    private string $buffering;
+    private ?Status $status = null;
+    private string $thinking = '';
+    private string $content = '';
+    private array $fields = [];
+
     /**
      * Create a new job instance.
      */
@@ -37,52 +45,114 @@ class AskQuestionJob implements ShouldQueue
 
     /**
      * Execute the job.
+     * @throws \Exception
+     * @throws \Exception
      */
-    public function handle(ChunkParser $response): void
+    public function handle(): void
     {
-        //TODO: ; For Status
         if ($this->batch()->cancelled()) {
             $this->status(Status::Canceled);
             return;
         }
 
+        $this->message->load('chat');
         $this->status(Status::Sending);
 
         /** @var \Illuminate\Http\Client\Response $stream */
         $stream = Http::inference()
             ->withOptions(['stream' => true])
-            ->post('/answer', ['message' => $this->question, 'history' => []]);
+            ->post('/ask', ['message' => $this->question, 'history' => $this->history()]);
 
         $body = $stream->toPsrResponse()->getBody();
-        for ($i = 0; !$body->eof(); $i++) {
-            $response->append(Utils::readLine($body))
-                ->parse();
+        $this->buffering = '';
+        while (!$body->eof()) {
+            $string = null;
+            $status = null;
+
+            $read = Utils::readLine($body);
+            $chunk = json_decode($read, true);
+
+            if (!$chunk || $chunk['type'] !== 'AIMessageChunk') {
+                continue;
+            }
+
+            if (isset($chunk['additional_kwargs']['reasoning_content']) && $chunk['additional_kwargs']['reasoning_content'] !== '') {
+                $string = $chunk['additional_kwargs']['reasoning_content'];
+                $status = Status::Thinking;
+            } else if (isset($chunk['content']) && $chunk['content'] !== '') {
+                $string = $chunk['content'];
+                $status = Status::Typing;
+            }
 
-            if ($status = $response->status) {
+            if ($chunk['response_metadata'] ?? false) {
+                $this->fields = Arr::only($chunk, ['response_metadata', 'usage_metadata', 'tool_calls', 'invalid_tool_calls']);
+            }
+
+            if ($status && !in_array($this->status, [Status::Thinking, Status::Typing])) {
                 $this->status($status);
             }
 
-            $this->chunk($i, $response);
+            if ($status) {
+                $this->iteration++;
+
+                if ($this->status !== $status || $this->iteration % 100 === 0) {
+                    $this->event();
+                    $this->status($status);
+                }
+            }
+
+            $this->buffering .= $string;
+        }
+
+        if ($this->buffering) {
+            $this->event();
         }
 
         $this->status(Status::Completed);
-        $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]);
+        $this->message->update([
+            'content' => $this->content,
+            'thinking' => $this->thinking,
+            'fields' => $this->fields
+        ]);
     }
 
     private function status(Status $status): void
     {
+        $this->status = $status;
         $this->message->update(['status' => $status]);
         event(new Wisper($this->message, $status));
     }
 
-    private function chunk(int $index, ChunkParser $response): void
+    private function history(): array
     {
-        try {
-            event(new Streaming($this->message, $response->chunk, $index));
-//            Model::withoutBroadcasting(fn() => $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]));
-        } catch (\Exception $e) {
-            report($e);
-        }
+        return Message::where('chat_id', $this->message->chat_id)->where('id', '!=', $this->message->id)
+            ->whereNotNull('content')
+            ->orderByDesc('id')
+            ->offset(1)->limit(5)->get()
+            ->sortBy('id')
+            ->map(fn($message) => ['role' => $message->role, 'content' => $message->content])
+            ->values()
+            ->toArray();
+    }
+
+    /**
+     * @throws \Exception
+     */
+    private function event(): void
+    {
+        $string = $this->buffering;
+        $this->buffering = '';
+
+        match ($this->status) {
+            Status::Thinking => $this->thinking .= $string,
+            Status::Typing => $this->content .= $string,
+
+            default => throw new \Exception('Unexpected match value')
+        };
+
+        //Model::withoutBroadcasting(fn() => $this->message->update(['content' => $response->content, 'thinking' => $response->think, 'fields' => $response->fields]));
+
+        event(new Streaming($this->message, $this->position++, $string, $this->status));
     }
 
     public function failed(\Throwable $exception): void

+ 1 - 1
app/Jobs/SetChatTitleJob.php

@@ -42,6 +42,6 @@ class SetChatTitleJob implements ShouldQueue
         ]);
 
         $result = $response->json();
-        $this->chat->update(['title' => $result['response']]);
+        $this->chat->update(['title' => $result['content']]);
     }
 }

+ 3 - 3
app/Models/Message.php

@@ -14,12 +14,12 @@ class Message extends Model
     use HasUuids;
     use BroadcastsEvents;
 
-    protected $fillable = ['content', 'thinking', 'from', 'status', 'fields'];
+    protected $fillable = ['content', 'thinking', 'role', 'status', 'fields'];
 
     public function casts(): array
     {
         return [
-            'feidls' => 'array',
+            'fields' => 'array',
             'status' => Status::class,
         ];
     }
@@ -41,6 +41,6 @@ class Message extends Model
 
     public function broadcastWith(string $event): array
     {
-        return ['id' => $this->id, 'from' => $this->from, 'status' => $this->status];
+        return ['id' => $this->id, 'role' => $this->role, 'status' => $this->status];
     }
 }

+ 3 - 3
app/Services/MessageService.php

@@ -13,12 +13,12 @@ class MessageService
     /**
      * @throws \Throwable
      */
-    public function create(Chat $chat, string $content, string $sender = 'user'): void
+    public function create(Chat $chat, string $content, string $sender = 'human'): void
     {
-        $message = new Message(['from' => $sender, 'content' => $content]);
+        $message = new Message(['role' => $sender, 'content' => $content]);
         $chat->messages()->save($message);
 
-        $answer = $chat->messages()->create(['from' => 'assistant', 'status' => Status::Loading]);
+        $answer = $chat->messages()->create(['role' => 'ai', 'status' => Status::Loading]);
         $batch = Bus::batch([new AskQuestionJob($answer, $content)])
             ->name('Ask Model')
             ->onQueue('primary')

+ 1 - 4
database/migrations/2025_11_09_131811_create_messages_table.php

@@ -20,10 +20,7 @@ return new class extends Migration {
             $table->longText('thinking')->nullable();
             $table->longText('content')->nullable();
 
-            $table->string('from')->default('user');
-
-            $table->integer('tokens_in')->default(0);
-            $table->integer('tokens_out')->default(0);
+            $table->string('role')->default('human');
 
             $table->jsonb('fields')->nullable();
 

+ 12 - 6
resources/js/Components/Message.vue

@@ -19,7 +19,7 @@ const {message} = defineProps({
 response.setStatus(message.status)
 
 useEchoModel('App.Models.Message', message.id, '.Wisper', e => response.setStatus(e.status))
-useEchoModel('App.Models.Message', message.id, '.Streaming', e => response.append(e.chunk, e.index))
+useEchoModel('App.Models.Message', message.id, '.Streaming', e => response.append(e.chunk, e.index, e.status))
 
 const stream = computed(() => ({
     think: response.think.value ? parse(response.think.value) : null,
@@ -35,19 +35,24 @@ const spoiler = ref(false)
 
 <template>
     <div>
-        <div>
+        <div class="space-y-2">
             <div v-if="stream.status === 0">
                 <Badge class="text-sm" variant="outline">
                     <Spinner class="!text-lg mr-1"/>
                     <span>Загрузка<AnimatedDots/></span>
                 </Badge>
             </div>
-            <div v-if="[1, 2].includes(stream.status) || stream.think || thinking">
+            <div v-else-if="[1, 2].includes(stream.status) || stream.think || thinking">
                 <Badge class="cursor-pointer text-sm" variant="outline" @click="spoiler = !spoiler">
                     <Spinner v-if="stream.status === 1" class="mr-1"/>
                     <span>Размышления<AnimatedDots :animating="stream.status === 2"/></span>
                 </Badge>
             </div>
+            <div v-if="stream.status === 3">
+                <Badge class="cursor-pointer p-1" variant="outline">
+                    <Spinner/>
+                </Badge>
+            </div>
         </div>
 
         <transition
@@ -58,12 +63,13 @@ const spoiler = ref(false)
             leave-from-class="scale-y-100"
             leave-to-class="scale-y-0"
         >
-            <div v-show="spoiler"
-                 class="h-[250px] origin-top transform overflow-auto bg-muted rounded-lg px-3 mt-2 border shadow-xs">
+            <div v-if="thinking ?? stream.think" v-show="spoiler"
+                 class="max-h-[250px] origin-top transform overflow-auto bg-muted rounded-lg px-3 py-2 mt-2 border shadow-xs">
                 <div class="max-w-full prose dark:prose-invert text-xs" v-html="thinking ?? stream.think"></div>
             </div>
         </transition>
-        <div class="max-w-full prose dark:prose-invert text-sm mt-2" v-html="content ?? stream.content"></div>
+        <div v-if="content ?? stream.content" class="max-w-full prose dark:prose-invert text-sm mt-2"
+             v-html="content ?? stream.content"></div>
     </div>
 </template>
 

+ 21 - 63
resources/js/Composables/useInferenceResponse.js

@@ -1,43 +1,35 @@
-import {ref} from 'vue'
+import {computed, reactive, ref} from 'vue'
 
-class ChunkParser {
-    constructor(tags, name = 'inference_rag_') {
-        this.name = name
-        this.tags = tags
-        this.chunks = []
-    }
-
-    append(chunk, index) {
-        this.chunks.push({index: index, chunk: chunk})
-    }
+export function useInferenceResponse() {
+    const status = ref(null)
+    const think = computed(() => buffer('think'))
+    const content = computed(() => buffer('content'))
 
-    parse(handlers) {
-        const buffer = this.buffer()
+    let chunks = reactive({'think': [], 'content': []})
 
-        this.tags.forEach(tag => {
-            const positions = this.position(buffer, tag)
-            let string = null
+    function append(chunk, index, type) {
+        setStatus(type)
+        type = type === 2 ? 'think' : 'content'
 
-            if(positions.length > 0) {
-                const length = `<|${this.name}${tag}|>`.length
-                string = buffer.substring(positions[0] + length, positions[1] || buffer.length)
-            }
+        chunks[type].push({index: index, chunk: chunk})
+    }
 
-            handlers[tag]?.(string)
-        })
+    function setStatus(value) {
+        status.value = value
     }
 
-    position(input, tag) {
-        const regex = new RegExp(`<\\|${this.name}${tag}\\|>`, 'g')
-        return [...input.matchAll(regex)].map(match => match.index)
+    function reset() {
+        status.value = null
+        chunks.think = []
+        chunks.content = []
     }
 
-    buffer() {
-        if (!this.chunks.length) return '';
+    function buffer(type) {
+        if (!chunks[type].length) return '';
 
-        const [{index: firstIndex}] = this.chunks.slice().sort((a, b) => a.index - b.index);
+        const [{index: firstIndex}] = chunks[type].slice().sort((a, b) => a.index - b.index);
 
-        return this.chunks
+        return chunks[type]
             .slice()
             .sort((a, b) => a.index - b.index)
             .filter(({index}, i) => index === firstIndex + i)
@@ -45,39 +37,5 @@ class ChunkParser {
             .join('');
     }
 
-    reset() {
-        this.chunks = []
-    }
-}
-
-export function useInferenceResponse() {
-    const status = ref(null)
-    const think = ref(null)
-    const content = ref(null)
-
-    const parser = new ChunkParser(['think', 'content', 'fields'])
-
-    function append(chunk, index) {
-        parser.append(chunk, index)
-        parser.parse({
-            think: text => think.value = text,
-            content: text => content.value = text,
-            fields: () => {
-            },
-        })
-    }
-
-    function setStatus(value) {
-        status.value = value
-    }
-
-    function reset() {
-        think.value = null
-        content.value = null
-        status.value = null
-
-        parser.reset()
-    }
-
     return {think, content, status, append, reset, setStatus}
 }

+ 3 - 5
resources/js/Pages/Chat/View.vue

@@ -20,9 +20,7 @@ const submit = () => {
 }
 
 useEcho(`App.Models.Chat.${chat.id}`, '.MessageCreated', (e) => {
-    if (e?.data?.from !== 'user') {
-        reload()
-    }
+    if (e?.data?.role !== 'human') reload()
 });
 
 useEcho(`App.Models.Chat.${chat.id}`, '.MessageUpdated', () => reload());
@@ -47,7 +45,7 @@ onMounted(() => nextTick(() => scroll()))
             <div class="flex-1 my-2">
                 <div class="space-y-4">
                     <div v-for="message in chat.messages">
-                        <div v-if="message.from === 'user'" class="flex">
+                        <div v-if="message.role === 'human'" class="flex">
                             <div class="ml-auto border bg-card text-card-foreground p-2 rounded-lg max-w-[75%] text-sm"
                                  v-html="message.content.replaceAll('\n', '<br />')"></div>
                         </div>
@@ -55,7 +53,7 @@ onMounted(() => nextTick(() => scroll()))
                     </div>
                 </div>
             </div>
-            <div class="shrink-0 sticky bottom-2">
+            <div class="shrink-0 sticky bottom-2 mt-100">
                 <!--                // TODO: If Last Message Not In Canceled Failed or Completed -->
                 <ChatInput v-model="form.message" :disabled="form.processing" @submit="submit"/>
             </div>