When starting the workflow from the terminal, it's easy to see which step it's executing and the logging we put in those steps.
We can also enable human-in-the-loop interaction by simply using user_feedback = input() in the workflow. This will pause the workflow and wait for user input (see the human-in-the-loop example in this official LlamaIndex notebook). However, to achieve the same functionality in a user-friendly interface, we need additional modifications to the original workflow.
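As a minimal sketch of that terminal-based pattern (a hypothetical helper, not the exact code from the workflow), a step can simply block on input() and route on the answer:

```python
# Minimal sketch of terminal-based human-in-the-loop (hypothetical helper,
# not the exact code from the workflow): block on input() and route on the answer.
def gather_feedback(outline: str) -> str:
    print(f"Proposed outline:\n{outline}")
    # input() pauses the whole run until the user types a response in the terminal
    answer = input("Do you approve this outline? (yes/no): ")
    return "approved" if answer.strip().lower().startswith("y") else "rejected"
```

This works fine in a terminal, but in a web UI there is no stdin to block on, which is exactly why the rest of this section replaces input() with streamed events and an asyncio Future.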
A workflow can take a long time to execute, so for a better user experience, LlamaIndex provides a way to send streaming events to indicate the progress of the workflow, as shown in the notebook here. In my workflow, I define a WorkflowStreamingEvent class to include useful information about the event message, such as the type of the event and which step it is sent from:
class WorkflowStreamingEvent(BaseModel):
    event_type: Literal["server_message", "request_user_input"] = Field(
        ..., description="Type of the event"
    )
    event_sender: str = Field(
        ..., description="Sender (workflow step name) of the event"
    )
    event_content: Dict[str, Any] = Field(..., description="Content of the event")
To enable sending streaming events, the workflow step needs access to the shared context, which is done by adding the @step(pass_context=True) decorator to the step definition. Then in the step definition, we can send event messages about the progress through the context. For example, in the tavily_query() step:
@step(pass_context=True)
async def tavily_query(self, ctx: Context, ev: StartEvent) -> TavilyResultsEvent:
    ctx.data["research_topic"] = ev.user_query
    query = f"arxiv papers about the state-of-the-art of {ev.user_query}"
    ctx.write_event_to_stream(
        Event(
            msg=WorkflowStreamingEvent(
                event_type="server_message",
                event_sender=inspect.currentframe().f_code.co_name,
                event_content={"message": f"Querying Tavily with: '{query}'"},
            ).model_dump()
        )
    )
In this example, we set the event_type to "server_message". It means that it is an update message and no user action is required. We have another event type, "request_user_input", that indicates user input is required. For example, in the gather_feedback_outline() step in the workflow, after generating the slide text outlines from the original paper summary, a message is sent to prompt the user to provide approval and feedback on the outline text:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    """Present user the original paper summary and the outlines generated, gather feedback from user"""
    ...
    # Send a special event indicating that user input is needed
    ctx.write_event_to_stream(
        Event(
            msg=json.dumps(
                {
                    "event_type": "request_user_input",
                    "event_sender": inspect.currentframe().f_code.co_name,
                    "event_content": {
                        "summary": ev.summary,
                        "outline": ev.outline.dict(),
                        "message": "Do you approve this outline? If not, please provide feedback.",
                    },
                }
            )
        )
    )
    ...
These events are handled differently in the backend API and the frontend logic, which I'll describe in detail in the later sections of this article.
When sending a "request_user_input" event to the user, we only want to proceed to the next step after we have received the user input. As shown in the workflow diagram above, it either proceeds to the outlines_with_layout() step if the user approves the outline, or to the summary2outline() step again if the user doesn't approve.
This is achieved using the Future() object from Python's asyncio library. In the SlideGenerationWorkflow class, we set an attribute self.user_input_future = asyncio.Future() that can be waited on in the gather_feedback_outline() step. The subsequent execution of the workflow is conditioned on the content of the user feedback:
@step(pass_context=True)
async def gather_feedback_outline(
    self, ctx: Context, ev: OutlineEvent
) -> OutlineFeedbackEvent | OutlineOkEvent:
    ...
    # Wait for user input
    if not self.user_input_future.done():
        user_response = await self.user_input_future
        logger.info(f"gather_feedback_outline: Got user response: {user_response}")

        # Process user_response, which should be a JSON string
        try:
            response_data = json.loads(user_response)
            approval = response_data.get("approval", "").lower().strip()
            feedback = response_data.get("feedback", "").strip()
        except json.JSONDecodeError:
            # Handle invalid JSON
            logger.error("Invalid user response format")
            raise Exception("Invalid user response format")

        if approval == ":material/thumb_up:":
            return OutlineOkEvent(summary=ev.summary, outline=ev.outline)
        else:
            return OutlineFeedbackEvent(
                summary=ev.summary, outline=ev.outline, feedback=feedback
            )
We set up the backend using FastAPI, expose a POST endpoint to handle requests, and initiate the workflow run. The asynchronous function run_workflow_endpoint() takes ResearchTopic as input. In the function, an asynchronous generator event_generator() is defined, which creates a task to run the workflow and streams the events to the client as the workflow progresses. When the workflow finishes, it will also stream the final file results to the client.
class ResearchTopic(BaseModel):
    query: str = Field(..., example="example query")


@app.post("/run-slide-gen")
async def run_workflow_endpoint(topic: ResearchTopic):
    workflow_id = str(uuid.uuid4())

    wf = SummaryAndSlideGenerationWorkflow(wid=workflow_id, timeout=2000, verbose=True)
    wf.add_workflows(
        summary_gen_wf=SummaryGenerationWorkflow(
            wid=workflow_id, timeout=800, verbose=True
        )
    )
    wf.add_workflows(
        slide_gen_wf=SlideGenerationWorkflow(
            wid=workflow_id, timeout=1200, verbose=True
        )
    )
    # Register the workflow so /submit_user_input can look it up by ID
    workflows[workflow_id] = wf

    async def event_generator():
        loop = asyncio.get_running_loop()
        logger.debug(f"event_generator: loop id {id(loop)}")
        yield f"{json.dumps({'workflow_id': workflow_id})}\n\n"

        task = asyncio.create_task(wf.run(user_query=topic.query))
        logger.debug(f"event_generator: Created task {task}")
        try:
            async for ev in wf.stream_events():
                logger.info(f"Sending message to frontend: {ev.msg}")
                yield f"{ev.msg}\n\n"
                await asyncio.sleep(0.1)  # Small sleep to ensure proper chunking
            final_result = await task

            # Construct the download URLs
            download_pptx_url = f"http://backend:80/download_pptx/{workflow_id}"
            download_pdf_url = f"http://backend:80/download_pdf/{workflow_id}"

            final_result_with_url = {
                "result": final_result,
                "download_pptx_url": download_pptx_url,
                "download_pdf_url": download_pdf_url,
            }

            yield f"{json.dumps({'final_result': final_result_with_url})}\n\n"
        except Exception as e:
            error_message = f"Error in workflow: {str(e)}"
            logger.error(error_message)
            yield f"{json.dumps({'event': 'error', 'message': error_message})}\n\n"
        finally:
            # Clean up
            workflows.pop(workflow_id, None)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
In addition to this endpoint, there are endpoints for receiving user input from the client and handling file download requests. Since each workflow is assigned a unique workflow ID, we can map the user input received from the client to the correct workflow. By calling set_result() on the awaiting Future, the pending workflow can resume execution.
@app.post("/submit_user_input")
async def submit_user_input(data: dict = Body(...)):
    workflow_id = data.get("workflow_id")
    user_input = data.get("user_input")
    wf = workflows.get(workflow_id)
    if wf and wf.user_input_future:
        loop = wf.user_input_future.get_loop()  # Get the loop from the future
        logger.info(f"submit_user_input: wf.user_input_future loop id {id(loop)}")
        if not wf.user_input_future.done():
            loop.call_soon_threadsafe(wf.user_input_future.set_result, user_input)
            logger.info("submit_user_input: set_result called")
        else:
            logger.info("submit_user_input: future already done")
        return {"status": "input received"}
    else:
        raise HTTPException(
            status_code=404, detail="Workflow not found or future not initialized"
        )
The download endpoint also identifies where the final file is located based on the workflow ID.
@app.get("/download_pptx/{workflow_id}")
async def download_pptx(workflow_id: str):
    file_path = (
        Path(settings.WORKFLOW_ARTIFACTS_PATH)
        / "SlideGenerationWorkflow"
        / workflow_id
        / "final.pptx"
    )
    if file_path.exists():
        return FileResponse(
            path=file_path,
            media_type="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            filename="final.pptx",
        )
    else:
        raise HTTPException(status_code=404, detail="File not found")
On the frontend page, after the user submits the research topic through st.text_input(), a long-running process is started in a background thread with a new event loop for receiving the streamed events from the backend, without interfering with the rest of the page:
def start_long_running_task(url, payload, message_queue, user_input_event):
    try:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(
            get_stream_data(url, payload, message_queue, user_input_event)
        )
        loop.close()
    except Exception as e:
        message_queue.put(("error", f"Exception in background thread: {str(e)}"))

...

def main():
    ...
    with st.sidebar:
        with st.form(key="slide_gen_form"):
            query = st.text_input(
                "Enter the topic of your research:",
            )
            submit_button = st.form_submit_button(label="Submit")

        if submit_button:
            # Reset the workflow_complete flag for a new workflow
            st.session_state.workflow_complete = False
            # Start the long-running task in a separate thread
            if (
                st.session_state.workflow_thread is None
                or not st.session_state.workflow_thread.is_alive()
            ):
                st.write("Starting the background thread...")

                st.session_state.workflow_thread = threading.Thread(
                    target=start_long_running_task,
                    args=(
                        "http://backend:80/run-slide-gen",
                        {"query": query},
                        st.session_state.message_queue,
                        st.session_state.user_input_event,
                    ),
                )
                st.session_state.workflow_thread.start()
                st.session_state.received_lines = []
            else:
                st.write("Background thread is already running.")
The event data streamed from the backend is fetched by httpx.AsyncClient and put into a message queue for further processing. Different information is extracted depending on the event type. For the event type "request_user_input", the thread is also paused until the user input is provided.
async def fetch_streaming_data(url: str, payload: dict = None):
    async with httpx.AsyncClient(timeout=1200.0) as client:
        async with client.stream("POST", url=url, json=payload) as response:
            async for line in response.aiter_lines():
                if line:
                    yield line


async def get_stream_data(url, payload, message_queue, user_input_event):
    # message_queue.put(("message", "Starting to fetch streaming data..."))
    data_json = None
    async for data in fetch_streaming_data(url, payload):
        if data:
            try:
                data_json = json.loads(data)
                if "workflow_id" in data_json:
                    # Send workflow_id to main thread
                    message_queue.put(("workflow_id", data_json["workflow_id"]))
                    continue
                elif "final_result" in data_json:
                    # Send final_result to main thread
                    message_queue.put(("final_result", data_json["final_result"]))
                    continue
                event_type = data_json.get("event_type")
                event_sender = data_json.get("event_sender")
                event_content = data_json.get("event_content")
                if event_type in ["request_user_input"]:
                    # Send the message to the main thread
                    message_queue.put(("user_input_required", data_json))
                    # Wait until user input is provided
                    user_input_event.wait()
                    user_input_event.clear()
                    continue
                else:
                    # Send the line to the main thread
                    message_queue.put(("message", format_workflow_info(data_json)))
            except json.JSONDecodeError:  # todo: is this necessary?
                message_queue.put(("message", data))
        if data_json and "final_result" in data_json or "final_result" in str(data):
            break  # Stop processing after receiving the final result
We store the messages in st.session_state and use an st.expander() to display and update the streamed data.
if st.session_state.received_lines:
    with expander_placeholder.container():
        # Create or update the expander with the latest truncated line
        expander = st.expander(st.session_state.expander_label)
        for line in st.session_state.received_lines:
            expander.write(line)
            expander.divider()
To ensure the UI remains responsive and displays the event messages while they are being processed in a background thread, we use a custom autorefresh component to refresh the page at a set interval:
if not st.session_state.workflow_complete:
    st_autorefresh(interval=2000, limit=None, key="data_refresh")
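The bridge between the background thread and the Streamlit script is the queue itself. A helper like the following (an assumed sketch; the post does not show the exact draining code) can be called on each autorefresh rerun to pull all pending messages without blocking, before appending them to st.session_state.received_lines:

```python
import queue

# Assumed helper (the post does not show the exact draining code): pull every
# pending (msg_type, payload) tuple off the queue without blocking the rerun.
def drain_message_queue(message_queue: queue.Queue) -> list:
    drained = []
    while True:
        try:
            drained.append(message_queue.get_nowait())
        except queue.Empty:
            break
    return drained
```

Using get_nowait() rather than get() matters here: a blocking read would freeze the Streamlit script and defeat the purpose of running the fetch in a background thread.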
When the streamed event is of type "request_user_input", we display the related information in a separate container and gather user feedback. As there can be multiple events that require user input in one workflow run, we put them in a message queue and make sure to assign a unique key to the st.feedback(), st.text_area() and st.button() widgets that are linked to each event, so the widgets don't interfere with each other:
def gather_outline_feedback(placeholder):
    container = placeholder.container()
    with container:
        if st.session_state.user_input_required:
            data = st.session_state.user_input_prompt
            event_type = data.get("event_type")
            if event_type == "request_user_input":
                summary = data.get("event_content").get("summary")
                outline = data.get("event_content").get("outline")
                prompt_message = data.get("event_content").get(
                    "message", "Please review the outline."
                )

                # display the content for user input
                st.markdown("## Original Summary:")
                st.text_area("Summary", summary, disabled=True, height=400)
                st.divider()
                st.markdown("## Generated Slide Outline:")
                st.json(outline)
                st.write(prompt_message)

                # Define unique keys for widgets
                current_prompt = st.session_state.prompt_counter
                approval_key = f"approval_state_{current_prompt}"
                feedback_key = f"user_feedback_{current_prompt}"

                # Display the approval feedback widget
                approval = st.feedback("thumbs", key=approval_key)
                st.write(f"Current Approval state is: {approval}")
                logging.info(f"Current Approval state is: {approval}")

                # Display the feedback text area
                feedback = st.text_area(
                    "Please provide feedback if you have any:", key=feedback_key
                )

                # Handle the submission of user response
                if st.button(
                    "Submit Feedback", key=f"submit_response_{current_prompt}"
                ):
                    if not st.session_state.user_response_submitted:
                        # Retrieve approval and feedback using unique keys
                        approval_state = st.session_state.get(approval_key)
                        user_feedback = st.session_state.get(feedback_key, "")

                        # Ensure approval_state is valid
                        if approval_state not in [0, 1]:
                            st.error("Please select an approval option.")
                            return

                        user_response = {
                            "approval": (
                                ":material/thumb_down:"
                                if approval_state == 0
                                else ":material/thumb_up:"
                            ),
                            "feedback": user_feedback,
                        }
                        # Send the user's response to the backend
                        try:
                            response = requests.post(
                                "http://backend:80/submit_user_input",
                                json={
                                    "workflow_id": st.session_state.workflow_id,
                                    "user_input": json.dumps(user_response),
                                },
                            )
                            response.raise_for_status()
                            logging.info(
                                f"Backend response for submitting approval: {response.status_code}"
                            )
                        except requests.RequestException as e:
                            st.error(f"Failed to submit user input: {str(e)}")
                            return

                ...
In the end, when the workflow run finally finishes, the frontend client receives a response that contains the paths to the final generated files (the same slide deck in PDF format for rendering in the UI and in PPTX format for downloading as the final result). We display the PDF file and create a button for downloading the PPTX file:
if "download_url_pdf" in st.session_state and st.session_state.download_url_pdf:
    download_url_pdf = st.session_state.download_url_pdf
    try:
        # Fetch the PDF content
        pdf_response = requests.get(download_url_pdf)
        pdf_response.raise_for_status()
        st.session_state.pdf_data = pdf_response.content

        st.markdown("### Generated Slide Deck:")
        # Display the PDF using an iframe
        st.markdown(
            f'',
            unsafe_allow_html=True,
        )
    except Exception as e:
        st.error(f"Failed to load the PDF file: {str(e)}")

    # Show the download button for PPTX if available
    if (
        "download_url_pptx" in st.session_state
        and st.session_state.download_url_pptx
    ):
        download_url_pptx = st.session_state.download_url_pptx
        try:
            # Fetch the PPTX content
            pptx_response = requests.get(download_url_pptx)
            pptx_response.raise_for_status()
            pptx_data = pptx_response.content

            st.download_button(
                label="Download Generated PPTX",
                data=pptx_data,
                file_name="generated_slides.pptx",
                mime="application/vnd.openxmlformats-officedocument.presentationml.presentation",
            )
        except Exception as e:
            st.error(f"Failed to load the PPTX file: {str(e)}")
We create a multi-service Docker application with docker-compose to run the frontend and backend apps.
version: '3.8'

services:
  backend:
    build:
      context: ./backend
      args:
        - --no-cache
    ports:
      - "8000:80"
    networks:
      - app-network
    volumes:
      - .env:/app/.env
      - ./data:/app/data
      - ./workflow_artifacts:/app/workflow_artifacts
      - ~/.azure:/root/.azure

  frontend:
    build:
      context: ./frontend
      args:
        - --no-cache
    ports:
      - "8501:8501"
    networks:
      - app-network

networks:
  app-network:
That's it! Just run docker-compose up, and we have an app that can run a research workflow based on the user's input query, prompt the user for feedback during the execution, and display the final result to the user.
Lingzhen Chen
2024-09-24 05:32:19
Source link: https://towardsdatascience.com/building-an-interactive-ui-for-llamaindex-workflows-842dd7abedde?source=rss----7f60cf5620c9---4