В предыдущей публикации демонстрировалось использование HTMX для добавления интерактивности к элементам HTML. Таким образом, достигается эффект JavaScript без применения JavaScript. Для примера начиналась разработка простого чата, который возвращает имитированный ответ от модели машинного обучения. В этой статье расширяются возможности чатбота за счет добавления нескольких функций, включая потоковую передачу, которая существенно улучшает пользовательский опыт по сравнению с ранее созданным синхронным чатом.
- ✅ Потоковая передача в реальном времени с помощью SSE
- ✅ Архитектура на основе сессий для поддержки нескольких пользователей
- ✅ Асинхронная координация с использованием asyncio.Queue
- ✅ Чистые шаблоны HTMX с выделенной обработкой SSE
- ✅ Агент для поиска в Google, отвечающий на запросы с актуальными данными
- ✅ Практически нулевое использование JavaScript
Вот что будет создано в этой части:

Переход от синхронной к асинхронной связи
Предыдущая реализация опиралась на базовые возможности веб-форм. Обмен данными происходил синхронно, то есть ответ от сервера поступал только после полного завершения обработки. Запрос отправлялся, ожидался полный ответ, после чего он отображался. В промежутке между ними просто наступало ожидание.
Современные чатботы функционируют иначе, обеспечивая асинхронный обмен данными. Это достигается через потоковую передачу: обновления и частичные ответы приходят постепенно, без необходимости ждать полного результата. Такой подход особенно полезен, когда генерация ответа занимает время, что характерно для моделей машинного обучения при создании длинных текстов.
SSE против WebSockets
SSE (Server-Sent Events) и WebSockets представляют собой протоколы для обмена данными в реальном времени между клиентом и сервером.
WebSockets обеспечивают двустороннюю связь: браузер и сервер могут одновременно отправлять и получать данные. Это применяется в онлайн-играх, чатах и инструментах совместной работы, таких как Google Sheets.
SSE работает в одном направлении, позволяя передачу только от сервера к клиенту. Клиент не может отправлять данные по этому протоколу. Если WebSockets сравнить с двусторонним телефонным разговором, где собеседники говорят и слушают одновременно, то SSE похож на прослушивание радио. SSE часто используется для уведомлений, обновления графиков в финансовых приложениях или лент новостей.
Почему предпочтение отдается SSE? В данном сценарии не требуется двусторонняя связь, и стандартный HTTP (в отличие от WebSockets) полностью удовлетворяет потребности: отправка данных и получение ответа. SSE просто обеспечивает получение данных в виде потока, и ничего дополнительного не нужно.
Что предстоит реализовать
- Пользователь вводит запрос
- Сервер принимает запрос и передает его модели машинного обучения
- Модель начинает генерировать контент
- Сервер немедленно возвращает каждую порцию контента
- Браузер добавляет эту порцию в DOM
Работа разделяется на серверную и клиентскую части.
Серверная часть
Серверная логика состоит из двух этапов:
- POST-эндпоинт, который принимает сообщение и ничего не возвращает
- GET-эндпоинт, который читает очередь и генерирует потоковый вывод
Для демонстрации сначала создается имитация ответа модели путем повторения ввода пользователя, так что слова в потоке будут идентичны запросу.
Чтобы сохранить порядок, потоки сообщений (очереди) разделяются по сессиям пользователей, иначе разговоры перемешаются. Для этого вводится словарь сессий, содержащий очереди.
Далее серверу нужно указать ожидание заполнения очереди перед началом потоковой передачи ответа. Без этого возникнут проблемы с параллелизмом или таймингом: SSE запустится на клиенте, очередь окажется пустой, SSE закроется, а ввод сообщения произойдет слишком поздно.
Решение — асинхронные очереди! Использование асинхронных очередей дает несколько преимуществ:
- Если в очереди есть данные: возвращает их сразу
- Если очередь пуста: приостанавливает выполнение до вызова queue.put()
- Несколько потребителей: каждый получает свои данные
- Безопасность потоков: отсутствие гонок
Вот код для детального изучения:
from fastapi import FastAPI, Request, Form from fastapi.templating import Jinja2Templates from fastapi.responses import HTMLResponse, StreamingResponse import asyncio import time import uuid app = FastAPI() templates = Jinja2Templates("templates") # This object will store session id and their corresponding value, an async queue. sessions = dict() @app.get("/") async def root(request: Request): session_id = str(uuid.uuid4()) sessions[session_id] = asyncio.Queue() return templates.TemplateResponse(request, "index.html", context={"session_id": session_id}) @app.post("/chat") async def chat(request: Request, query: str=Form(...), session_id: str=Form(...)): """ Send message to session-based queue """ # Create the session if it does not exist if session_id not in sessions: sessions[session_id] = asyncio.Queue() # Put the message in the queue await sessions[session_id].put(query) return {"status": "queued", "session_id": session_id} @app.get("/stream/{session_id}") async def stream(session_id: str): async def response_stream(): if session_id not in sessions: print(f"Session {session_id} not found!") return queue = sessions[session_id] # This BLOCKS until data arrives print(f"Waiting for message in session {session_id}") data = await queue.get() print(f"Got message: {data}") message = "" await asyncio.sleep(1) for token in data.replace("\n", " ").split(" "): message += token + " " data = f"""data: <li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div>{message}</div></li>\n\n""" yield data await asyncio.sleep(0.03) queue.task_done() return StreamingResponse(response_stream(), media_type="text/event-stream") Разберем ключевые аспекты.
Изоляция сессий
Каждый пользователь должен иметь собственную очередь сообщений, чтобы избежать смешения диалогов. Это реализуется через словарь сессий. В производственных приложениях вместо этого обычно применяется Redis. В коде новый идентификатор сессии генерируется при загрузке страницы и сохраняется в словаре сессий. Перезагрузка страницы инициирует новую сессию, поскольку очереди сообщений не сохраняются, но это можно реализовать через базу данных, как описано в следующей части.
# This object will store session id and their corresponding value, an async queue. sessions = dict() @app.get("/") async def root(request: Request): session_id = str(uuid.uuid4()) sessions[session_id] = asyncio.Queue() return templates.TemplateResponse(request, "index.html", context={"session_id": session_id}) Блокирующая координация
Необходимо контролировать последовательность отправки SSE и получения запроса пользователя. На серверной стороне порядок следующий:
- Получить сообщение от пользователя
- Создать очередь сообщений и заполнить ее
- Отправить сообщения из очереди в потоковом ответе
Нарушение этого порядка может вызвать нежелательное поведение, например, чтение пустой очереди сообщений перед ее заполнением запросом пользователя.
Для контроля последовательности применяется asyncio.Queue. Этот объект используется дважды:
- При вставке новых сообщений в очередь. Вставка сообщений пробуждает опрос в эндпоинте SSE
await sessions[session_id].put(query)- При извлечении сообщений из очереди. В этой строке код блокируется до сигнала от очереди о наличии новых данных:
data = await queue.get()Этот шаблон обеспечивает несколько преимуществ:
- Каждый пользователь имеет свою очередь
- Отсутствует риск условий гонки
Имитация потоковой передачи
В этой статье ответ модели имитируется путем разделения запроса пользователя на слова и возврата их по одному. В следующей части подключится реальная модель машинного обучения.
Потоковая передача обрабатывается объектом StreamingResponse из FastAPI. Этот объект ожидает асинхронный генератор, который выдает данные до завершения. Используется ключевое слово yield вместо return, иначе генератор остановится после первой итерации.
Разберем функцию потоковой передачи:
Сначала проверяется наличие очереди для текущей сессии, из которой будут извлекаться сообщения:
if session_id not in sessions: print(f"Session {session_id} not found!") return queue = sessions[session_id]Затем, получив очередь, извлекаются сообщения, если они есть, иначе выполнение приостанавливается в ожидании. Это ключевая часть функции:
# This BLOCKS until data arrives print(f"Waiting for message in session {session_id}") data = await queue.get() print(f"Got message: {data}")Для имитации потока сообщение разбивается на слова (здесь называемые tokens), добавляются задержки с помощью asyncio.sleep для симуляции генерации текста моделью. Обратите внимание, что выдаваемые данные — это строки HTML, обернутые в строку, начинающуюся с "data:". Так отправляются сообщения SSE. Можно также использовать метаданные "event:" для пометки сообщений. Пример:
event: my_custom_event data: <div>Content to swap into your HTML page.</div>Реализация в Python (для строгих разработчиков рекомендуется использовать шаблоны Jinja для рендеринга HTML вместо строк):
message = "" # First pause to let the browser display "Thinking when the message is sent" await asyncio.sleep(1) # Simulate streaming by splitting message in words for token in data.replace("\n", " ").split(" "): # We append tokens to the message message += token + " " # We wrap the message in HTML tags with the "data" metadata data = f"""data: <li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div>{message}</div></li>\n\n""" yield data # Pause to simulate the LLM generation process await asyncio.sleep(0.03) queue.task_done()Клиентская часть
Клиентская сторона выполняет две задачи: отправку запросов пользователя на сервер и прослушивание сообщений SSE по конкретному каналу (идентификатору сессии). Для этого применяется принцип "разделения обязанностей", где каждый элемент HTMX отвечает за одну функцию.
- Форма отправляет ввод пользователя
- Прослушиватель SSE управляет потоком
- Список чата отображает сообщение
Для отправки сообщений используется стандартный элемент textarea в форме. Магия HTMX проявляется ниже:
<form hx-post="/chat" hx-swap="none" hx-trigger="click from:#submitButton" hx-on::before-request=" htmx.find('#chat').innerHTML += `<li class='mb-6 justify-start max-w-[80%]'><div class='font-bold'>Me</div><div>${htmx.find('#query').value}</div></li>`; htmx.find('#chat').innerHTML += `<li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div class='text-right'>Thinking...</div></li>`; htmx.find('#query').value = ''; " ><textarea name="query" placeholder="Write a message..." rows="4"></textarea><button type="submit" >Sends</button></form>Как упоминалось в первой части, несколько атрибутов HTMX требуют пояснений:
hx-post: Эндпоинт, на который подаются данные формы.hx-swap: Установлено в none, поскольку эндпоинт не возвращает данных.hx-trigger: Указывает событие, запускающее запрос.hx-on::before-request: Небольшой фрагмент JavaScript для добавления отзывчивости. Добавляется запрос пользователя в список чата и отображается сообщение "Thinking..." во время ожидания потока SSE. Это предпочтительнее пустой страницы.
На сервер отправляются два параметра: ввод пользователя и идентификатор сессии. Таким образом, сообщение вставляется в правильную очередь на сервере.
Затем определяется компонент, специально предназначенный для прослушивания сообщений SSE.
<!-- Messages will be added to this list--><div><ul></ul></div><!-- SSE listened (message buffer)--><div hx-ext="sse" sse-connect="/stream/{{ session_id }}" sse-swap="message" hx-swap="outerHTML scroll:bottom" hx-target="#chat>li:last-child" ></div>Этот компонент подключается к эндпоинту /stream и передает идентификатор сессии для прослушивания сообщений только этой сессии. Атрибут hx-target указывает добавлять данные в последний элемент li чата. hx-swap определяет замену всего текущего li. Так работает эффект потока: замена текущего сообщения на последнее.
Примечание: для замены конкретных элементов DOM можно использовать другие методы, такие как внеполосные (OOB) замены. Они требуют указания конкретного id в DOM. В данном случае id не присваиваются элементам списка намеренно.
Реальный чатбот с использованием Google Agent Development Kit
Теперь пора заменить имитацию потокового эндпоинта на настоящую модель машинного обучения. Для этого создается агент на базе Google ADK с инструментами и памятью для получения информации и запоминания деталей разговора.
Краткое введение в агенты
Большинство знает, что такое модель машинного обучения, по крайней мере предполагается. Основной недостаток современных моделей — отсутствие доступа к информации в реальном времени: знания фиксированы на момент обучения. Другой минус — невозможность работы с данными за пределами обучающего набора, например, внутренними данными компании.
Агенты — это вид приложений ИИ, способных рассуждать, действовать и наблюдать. Рассуждение выполняется моделью как "мозгом". "Руки" агента — это "инструменты", которые могут быть:
- Функцией Python, например, для вызова API
- Сервером MCP — стандартом для подключения агентов к API через унифицированный интерфейс (например, доступ ко всем инструментам GSuite без самостоятельной реализации коннекторов)
- Другими агентами (в этом случае применяется делегирование, где маршрутизатор или главный агент управляет подагентами)
В демонстрации используется простой агент с одним инструментом: поиском в Google. Это позволит получать свежие данные и обеспечивать их надежность (по крайней мере, надеется на качество результатов Google).
В экосистеме Google ADK агенты требуют базовой информации:
- Имя и описание, в основном для документации
- Инструкции: промпт, определяющий поведение агента (использование инструментов, формат вывода, шаги и т.д.)
- Инструменты: функции, серверы MCP или агенты, доступные для достижения цели
Есть также концепции памяти и управления сессиями, но они вне текущего обсуждения.
Переходим к определению агента!
Потоковый агент поиска в Google
from google.adk.agents import Agent from google.adk.agents.run_config import RunConfig, StreamingMode from google.adk.runners import Runner from google.adk.sessions import InMemorySessionService from google.genai import types from google.adk.tools import google_search # Define constants for the agent APP_NAME = "default" # Application USER_ID = "default" # User SESSION = "default" # Session MODEL_NAME = "gemini-2.5-flash-lite" # Step 1: Create the LLM Agent root_agent = Agent( model=MODEL_NAME, name="text_chat_bot", description="A text chatbot", instruction="You are a helpful assistant. Your goal is to answer questions based on your knowledge. Use your Google Search tool to provide the latest and most accurate information", tools=[google_search] ) # Step 2: Set up Session Management # InMemorySessionService stores conversations in RAM (temporary) session_service = InMemorySessionService() # Step 3: Create the Runner runner = Runner(agent=root_agent, app_name=APP_NAME, session_service=session_service)Объект Runner выступает оркестратором между пользователем и агентом.
Далее переопределяется эндпоинт /stream. Сначала проверяется существование сессии агента, если нет — создается:
# Attempt to create a new session or retrieve an existing one try: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) except: session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id )Затем запрос пользователя преобразуется и передается агенту асинхронно для получения потока:
# Convert the query string to the ADK Content format query = types.Content(role="user", parts=[types.Part(text=query)]) # Stream the agent's response asynchronously async for event in runner.run_async( user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE) ): Есть нюанс: при генерации ответа агент может выдать двойной перенос строки "\n\n". Это проблематично, поскольку события SSE завершаются этим символом. Двойной перенос в строке приведет к:
- Обрезке текущего сообщения
- Неправильному форматированию следующего, что остановит поток SSE
Это можно проверить самостоятельно. Для исправления применяется небольшой трюк, плюс другой для форматирования списков (используется Tailwind CSS, переопределяющий некоторые правила). Трюк:
if event.partial: message += event.content.parts[0].text # Hack here html_content = markdown.markdown(message, extensions=['fenced_code']).replace("\n", "<br/>").replace("<li>", "<li class='ml-4'>").replace("<ul>", "<ul class='list-disc'>") full_html = f"""data: <li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div>{html_content}</div></li>\n\n""" yield full_htmlТаким образом, двойные переносы не прервут поток SSE.
Полный код маршрута:
@app.get("/stream/{session_id}") async def stream(session_id: str): async def response_stream(): if session_id not in sessions: print(f"Session {session_id} not found!") return # Attempt to create a new session or retrieve an existing one try: session = await session_service.create_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) except: session = await session_service.get_session( app_name=APP_NAME, user_id=USER_ID, session_id=session_id ) queue = sessions[session_id] # This BLOCKS until data arrives print(f"Waiting for message in session {session_id}") query = await queue.get() print(f"Got message: {query}") message = "" # Convert the query string to the ADK Content format query = types.Content(role="user", parts=[types.Part(text=query)]) # Stream the agent's response asynchronously async for event in runner.run_async( user_id=USER_ID, session_id=session.id, new_message=query, run_config=RunConfig(streaming_mode=StreamingMode.SSE) ): if event.partial: message += event.content.parts[0].text html_content = markdown.markdown(message, extensions=['fenced_code']).replace("\n", "<br/>").replace("<li>", "<li class='ml-4'>").replace("<ul>", "<ul class='list-disc'>") full_html = f"""data: <li class='mb-6 ml-[20%]'><div class='font-bold text-right'>AI</div><div>{html_content}</div></li>\n\n""" yield full_html queue.task_done() return StreamingResponse(response_stream(), media_type="text/event-stream")На этом все! Теперь можно вести диалог с чатботом.
Добавляется фрагмент CSS для форматирования блоков кода. Если чатбот генерирует фрагменты кода, их нужно правильно отображать. Вот HTML:
pre, code { background-color: black; color: lightgrey; padding: 1%; border-radius: 10px; white-space: pre-wrap; font-size: 0.8rem; letter-spacing: -1px; }Теперь чатбот может генерировать фрагменты кода:

Это впечатляет.
Обзор рабочего процесса
С помощью менее 200 строк кода создался чат с следующим процессом: потоковый ответ от сервера и красивое отображение за счет SSE и HTMX.
User types "Hello World" → Submit ├── 1. Add "Me: Hello World" to chat ├── 2. Add "AI: Thinking..." to chat ├── 3. POST /chat with message ├── 4. Server queues message ├── 5. SSE stream produces a LLM response based on the query ├── 6. Stream "AI: This" (replaces "Thinking...") ├── 7. Stream "AI: This is the answer ..." └── 8. Complete Заключение
В этой серии публикаций показано, насколько просто разрабатывать приложение-чатбот практически без JavaScript и тяжелых фреймворков JS, используя только Python и HTML. Рассмотрены темы серверного рендеринга, событий от сервера (SSE), асинхронной потоковой передачи, агентов с помощью библиотеки HTMX.
Основная цель — продемонстрировать, что веб-приложения доступны разработчикам без опыта в JavaScript. Существует веский аргумент против постоянного использования JavaScript в веб-разработке, и хотя это мощный язык, часто он применяется излишне вместо более простых и надежных методов. Дебаты о серверных против клиентских приложениях продолжаются, но надеется, что эти материалы открыли глаза некоторым и чему-то научили.