
Python Implementation of Real-Time AI Conversation with RxPY #13

Open · wants to merge 4 commits into `main`
3 changes: 3 additions & 0 deletions .gitignore
@@ -398,3 +398,6 @@ FodyWeavers.xsd
*.sln.iml
bak/RealtimeConversationClientRX.cs
bak/RealtimeConversationClientRX.Observables.cs

# .env file for Python
.env
116 changes: 116 additions & 0 deletions src/python/README.md
@@ -0,0 +1,116 @@
# RxAI: Real-Time AI Conversation with Python and RxPY

RxAI is a real-time AI conversation system built with Python and RxPY. It combines OpenAI's language models with reactive programming to enable live, interactive, context-aware dialogue.

---

## Features

RxAI comes equipped with several powerful features that make it a versatile solution for real-time interactions:

- **Real-time AI conversation**: RxAI processes and responds to user inputs instantly, creating a seamless conversational experience similar to speaking with a human.
- **Function calling**: The system supports dynamic function calling, allowing it to execute specific actions during a conversation based on user intent. For example, it can initiate system commands or trigger other functionalities.
- **Reactive Python implementation**: RxAI leverages reactive programming paradigms using RxPY, making it highly responsive to changes in user input, updates, or any other asynchronous event. This ensures smooth handling of multiple streams of information in real time.

---

## Screenshots

![Screenshot showing a joke and a call to the 'goodbye' function.](example.jpg)

---

## Project Structure

- `rtclient`: Provided by Microsoft ([link](https://github.com/Azure-Samples/aoai-realtime-audio-sdk/tree/6779885d3aaa2ddbed2bbc5dbba74da8cddffca1/python/rtclient)), this package offers low-level components to support real-time AI conversations. It includes:
- **Session Handling**: Manages conversation sessions, including starting, updating, and ending sessions.
- **Message Management**: Defines and handles various message types exchanged during conversations.
- **Testing Utilities**: Includes `client_test.py` to ensure correct behavior of the real-time client.
- **Model Definitions**: Provides models such as `SessionUpdateMessage` to represent various data structures used in the system.

- `conversation_client.py`: Manages the conversation using `RTClient` and `RxPY` to facilitate real-time AI interactions. It handles:
- **Session Initialization**: Configures conversation parameters such as turn detection and input transcription.
- **Event Handling**: Processes various types of events including audio input and AI responses.
  - **Function Calls**: Manages function calls initiated during the conversation and signals when they start and finish.
- **Subscriptions**: Allows external subscriptions to events like transcription updates, audio updates, and error notifications.

- `microphone_stream.py`: Captures microphone input for real-time audio processing, providing an audio stream for interaction with the AI.

- `speaker_output.py`: Manages audio playback for the AI's response. It includes:
- **Audio Playback**: Buffers and plays back audio data.
- **Clearing Playback**: Clears the audio buffer and stops playback as needed.
- **Dispose Method**: Closes the audio stream to release resources when no longer needed.

- `main.py`: The main script that initializes and runs the conversation client, serving as the entry point for the project. It handles the overall flow of the AI conversation (a condensed wiring sketch follows this list).

- `main.ipynb`: A Jupyter Notebook version of the AI conversation demo, useful for interactive exploration, debugging, or showcasing features step-by-step.
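
For orientation, here is a condensed sketch of how these modules fit together, adapted from the code in `main.ipynb` (the tool definitions and error handling are elided):

```python
import asyncio
import os

from azure.core.credentials import AzureKeyCredential
from dotenv import load_dotenv
from rtclient import InputAudioTranscription, RTClient, ServerVAD
from rx.scheduler.eventloop import AsyncIOScheduler

from conversation_client import ConversationClient
from microphone_stream import MicrophoneAudioStream
from speaker_output import SpeakerOutput


async def run():
    load_dotenv()  # reads OPENAI_API_KEY (and optionally OPENAI_MODEL) from .env
    key = os.environ["OPENAI_API_KEY"]
    model = os.environ.get("OPENAI_MODEL")

    async with RTClient(key_credential=AzureKeyCredential(key), model=model) as client:
        conversation = ConversationClient(client)
        options = {
            'turn_detection': ServerVAD(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=200),
            'input_audio_transcription': InputAudioTranscription(model="whisper-1"),
        }
        await conversation.initialize_session(options, [])  # pass tools such as the 'goodbye' function here

        # Print events on the asyncio loop, and wire microphone in / speaker out.
        conversation.subscribe_events(AsyncIOScheduler(asyncio.get_event_loop()))
        speaker = SpeakerOutput()
        conversation.subscribe_audio(speaker)
        microphone = MicrophoneAudioStream()
        microphone.subscribe_audio(client)

        microphone.start()
        asyncio.create_task(conversation.start())
        await conversation.stop_event.wait()  # set when the conversation ends

        microphone.stop()
        speaker.dispose()


if __name__ == "__main__":
    asyncio.run(run())
```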

---

## Installation

1. **Create a virtual environment (e.g., `your_venv`)**:

```bash
python -m venv your_venv
```

2. **Activate the virtual environment**:

   - **Windows**: `your_venv\Scripts\activate`
- **Linux/Mac**: `source your_venv/bin/activate`

3. **Install requirements**:

```bash
pip install -r requirements.txt
```

4. **Install `rtclient`**:

```bash
pip install -e .
```

5. **Set API Key**

   Create a `.env` file in the project root and set `OPENAI_API_KEY` in it (the notebook also reads an optional `OPENAI_MODEL`).
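
   A minimal `.env` might look like the following (the model name is only an example; use a realtime-capable model available to your account):

   ```bash
   OPENAI_API_KEY=your-api-key-here
   OPENAI_MODEL=gpt-4o-realtime-preview
   ```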

---

## Running the Demo

- Run `main.py` with Python:
```bash
python main.py
```
- Alternatively, open and run `main.ipynb` in Jupyter Notebook.

---

## Tips

- To exit the conversation, simply say **"Goodbye"**. This triggers a function call that quits the application.
- Avoid feedback loops by muting your speakers or using headphones to prevent the AI's output from being picked up by the microphone.

---

## How It Works

RxAI operates by integrating real-time audio input with advanced language models to facilitate dynamic and context-aware conversations. Here's a step-by-step overview of how the system works:

1. **Audio Capture**: The `microphone_stream.py` module captures audio input from the user in real time. The audio is streamed to the `conversation_client.py` for processing.

2. **Audio Processing and Transcription**: Using `RTClient`, the captured audio is converted into text transcriptions. These transcriptions flow through RxPY reactive streams, so incremental updates are handled as they arrive.

3. **Real-Time AI Interaction**: Once the transcription is available, the input is passed to OpenAI's language model via the `conversation_client.py`. The AI processes the input to generate a response based on the current context of the conversation.

4. **Function Calling**: Depending on user intent, the system may identify the need to trigger specific actions. This is managed through dynamic function calling, where certain commands are executed automatically during the conversation. Examples include ending the conversation or interfacing with external systems.

5. **Audio Response Playback**: The generated AI response is either played back as audio using the `speaker_output.py` module or displayed as text. This ensures a seamless and interactive experience for the user.

6. **Reactive Updates**: The entire workflow leverages reactive programming to handle changes and updates in real time. As new input or events arrive, the system adapts immediately, keeping the conversation fluid and natural (a minimal sketch of this pattern follows the list).

7. **Session Management**: The `rtclient` module provided by Microsoft helps manage the session lifecycle, handling configurations, event subscriptions, and maintaining consistent communication throughout the conversation.
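
As a minimal, self-contained illustration of the reactive pattern behind step 6 (the same `Subject` / `observe_on` combination used in `conversation_client.py`):

```python
import asyncio

from rx import operators as ops
from rx.scheduler.eventloop import AsyncIOScheduler
from rx.subject import Subject


async def demo():
    scheduler = AsyncIOScheduler(asyncio.get_event_loop())

    # Same pattern as ConversationClient.output_transcription_updates:
    # a Subject is a push-based stream that producers write to ...
    transcription_updates = Subject()

    # ... and consumers observe on the asyncio scheduler, so the print
    # callback runs on the event loop rather than the producer's thread.
    transcription_updates.pipe(
        ops.observe_on(scheduler)
    ).subscribe(lambda text: print(f"Assistant: {text}"))

    # In the real system, the rtclient event loop is the producer.
    transcription_updates.on_next("Hello!")
    transcription_updates.on_next("How can I help?")

    await asyncio.sleep(0.1)  # give the scheduled callbacks time to run


asyncio.run(demo())
```

Because every event source is a stream, adding a new consumer (a logger, a UI, a metrics counter) is one more `subscribe` call rather than a change to the client itself.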

Overall, RxAI combines advanced language processing with real-time input/output handling and reactive programming to create a highly interactive and responsive conversational experience.
112 changes: 112 additions & 0 deletions src/python/conversation_client.py
@@ -0,0 +1,112 @@
import asyncio

from rtclient import RTInputAudioItem, RTResponse
from rx.subject import Subject
from rx import operators as ops


class ConversationClient:
    """Manages the conversation using RTClient and RxPY."""

    def __init__(self, client):
        self.client = client
        # Rx subjects: push-based streams that external code can subscribe to.
        self.input_transcription_updates = Subject()
        self.output_transcription_updates = Subject()
        self.output_transcription_delta_updates = Subject()
        self.function_call_started = Subject()
        self.function_call_finished = Subject()
        self.error_messages = Subject()
        self.audio_delta_updates = Subject()
        self.speech_started = Subject()
        self.stop_event = asyncio.Event()

    async def initialize_session(self, options, function_definitions):
        await self.client.configure(
            turn_detection=options['turn_detection'],
            input_audio_transcription=options['input_audio_transcription'],
            tools=function_definitions,
        )

    async def start(self):
        try:
            # Consume the server's event stream: each event is either user
            # speech (RTInputAudioItem) or an AI response (RTResponse).
            async for event in self.client.events():
                if isinstance(event, RTInputAudioItem):
                    self.speech_started.on_next(None)
                    await event
                    if event.transcript is not None:
                        self.input_transcription_updates.on_next(event)
                elif isinstance(event, RTResponse):
                    await self._handle_response(event)
        except Exception as e:
            self.error_messages.on_next(str(e))
            self.stop_event.set()
        finally:
            self.stop_event.set()

    async def _handle_response(self, response):
        async for item in response:
            if item.type == "message":
                await self._handle_message_item(item)
            elif item.type == "function_call":
                self.function_call_started.on_next(item)
                await item
                self.function_call_finished.on_next(item)

                if item.function_name == "user_wants_to_finish_conversation":
                    self.stop_event.set()

    async def _handle_message_item(self, item):
        async for content_part in item:
            if content_part.type == "audio":
                async for audio_chunk in content_part.audio_chunks():
                    self.audio_delta_updates.on_next(audio_chunk)

                transcript = ""
                async for transcript_chunk in content_part.transcript_chunks():
                    transcript += transcript_chunk
                self.output_transcription_updates.on_next(transcript)
            elif content_part.type == "text":
                text_data = ""
                async for text_chunk in content_part.text_chunks():
                    text_data += text_chunk
                    self.output_transcription_delta_updates.on_next(text_chunk)
                self.output_transcription_updates.on_next(text_data)

    def subscribe_events(self, scheduler):
        # .strip() instead of .strip('\n'): a backslash inside an f-string
        # expression is a SyntaxError before Python 3.12.
        self.input_transcription_updates.pipe(
            ops.observe_on(scheduler)
        ).subscribe(lambda t: print(f"User: {t.transcript.strip()}"))

        self.output_transcription_delta_updates.pipe(
            ops.observe_on(scheduler)
        ).subscribe(lambda t: print(t, end='', flush=True))

        self.output_transcription_updates.pipe(
            ops.observe_on(scheduler)
        ).subscribe(lambda t: print(f"Assistant: {t}\n"))

        self.function_call_started.pipe(
            ops.observe_on(scheduler)
        ).subscribe(lambda f: print(f"Function call started: {f.function_name}({f.arguments})"))

        self.function_call_finished.pipe(
            ops.observe_on(scheduler)
        ).subscribe(
            # Prefer the call result when one is set; fall back to the name.
            lambda f: print(f"Function call finished: {getattr(f, 'result', None) or f.function_name}")
        )

        self.error_messages.pipe(
            ops.observe_on(scheduler)
        ).subscribe(lambda msg: print(f"Error: {msg}"))

    def subscribe_audio(self, speaker_output):
        self.audio_delta_updates.subscribe(
            lambda audio_chunk: speaker_output.enqueue_for_playback(audio_chunk)
        )

        # Interrupt playback as soon as the user starts speaking again.
        self.speech_started.subscribe(
            lambda _: speaker_output.clear_playback()
        )
Binary file added src/python/example.jpg
102 changes: 102 additions & 0 deletions src/python/main.ipynb
@@ -0,0 +1,102 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import asyncio\n",
    "import os\n",
    "from dotenv import load_dotenv\n",
    "from rtclient import RTClient, ServerVAD, InputAudioTranscription\n",
    "from conversation_client import ConversationClient\n",
    "from microphone_stream import MicrophoneAudioStream\n",
    "from speaker_output import SpeakerOutput\n",
    "from azure.core.credentials import AzureKeyCredential\n",
    "from rx.scheduler.eventloop import AsyncIOScheduler\n",
    "import nest_asyncio\n",
    "\n",
    "nest_asyncio.apply()\n",
    "\n",
    "load_dotenv()\n",
    "\n",
    "async def main():\n",
    "    key = os.environ.get(\"OPENAI_API_KEY\")\n",
    "    if not key:\n",
    "        raise ValueError(\"Please set the OPENAI_API_KEY environment variable in your .env file.\")\n",
    "\n",
    "    model = os.environ.get(\"OPENAI_MODEL\")\n",
    "    async with RTClient(key_credential=AzureKeyCredential(key), model=model) as client:\n",
    "        print(\"Configuring Session...\")\n",
    "        await client.configure(instructions=\"You are a helpful and friendly AI assistant.\")\n",
    "        conversation_client = ConversationClient(client)\n",
    "\n",
    "        print(\" >>> Listening to microphone input\")\n",
    "        print(\" >>> (Say 'goodbye' to end the conversation)\\n\")\n",
    "\n",
    "        finish_conversation_tool = {\n",
    "            \"type\": \"function\",\n",
    "            \"name\": \"user_wants_to_finish_conversation\",\n",
    "            \"description\": \"Invoked when the user says 'goodbye' explicitly.\",\n",
    "            \"parameters\": {\n",
    "                \"type\": \"object\",\n",
    "                \"properties\": {},\n",
    "                \"required\": []\n",
    "            }\n",
    "        }\n",
    "\n",
    "        options = {\n",
    "            'turn_detection': ServerVAD(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=200),\n",
    "            'input_audio_transcription': InputAudioTranscription(model=\"whisper-1\"),\n",
    "        }\n",
    "        await conversation_client.initialize_session(options, [finish_conversation_tool])\n",
    "\n",
    "        loop = asyncio.get_event_loop()\n",
    "        asyncio_scheduler = AsyncIOScheduler(loop)\n",
    "\n",
    "        conversation_client.subscribe_events(asyncio_scheduler)\n",
    "\n",
    "        speaker_output = SpeakerOutput()\n",
    "        conversation_client.subscribe_audio(speaker_output)\n",
    "\n",
    "        microphone_stream = MicrophoneAudioStream()\n",
    "        microphone_stream.subscribe_audio(client)\n",
    "\n",
    "        microphone_stream.start()\n",
    "        asyncio.create_task(conversation_client.start())\n",
    "\n",
    "        await conversation_client.stop_event.wait()\n",
    "\n",
    "        microphone_stream.stop()\n",
    "        speaker_output.dispose()\n",
    "        await client.close()\n",
    "        print(\"Conversation ended.\")\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    await main()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "speech-to-speech",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}