
Live audio stream #2526

Open
xXxNIKIxXx opened this issue Oct 19, 2024 · 10 comments

@xXxNIKIxXx

What do you need help with?

Is there, or will there be, a way to feed a pyaudio stream directly to stream.stream_file?
This is how I open the stream:

import pyaudio

# Parameters
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024

# Initialize PyAudio
p = pyaudio.PyAudio()

# Open stream
stream = p.open(format=FORMAT,
                channels=CHANNELS,
                rate=RATE,
                input=True,
                frames_per_buffer=CHUNK)
@postlund
Owner

That's not something I have considered implementing. But you can pass an io.BufferedIOBase, so maybe you can write an adapter that supports pyaudio through that?

@xXxNIKIxXx
Author

I had the same idea, but I always get mediafile.FileTypeError: 'stream': not in a recognized format.
Here is the code I use:

import pyaudio
import threading
import io

# Parameters
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 1024

class BufferedAudioStream(io.BufferedIOBase):
    def __init__(self, format, channels, rate, chunk):
        self.p = pyaudio.PyAudio()
        self.stream = self.p.open(format=format,
                                  channels=channels,
                                  rate=rate,
                                  input=True,
                                  frames_per_buffer=chunk)
        self.chunk = chunk
        self.buffer = bytearray()
        self.lock = threading.Lock()
        self.running = True

    def read(self, size=-1):
        with self.lock:
            if size == -1:
                size = len(self.buffer)
            data = self.buffer[:size]
            self.buffer = self.buffer[size:]
            return bytes(data)

    def update_buffer(self):
        while self.running:
            data = self.stream.read(self.chunk)
            with self.lock:
                self.buffer.extend(data)

    def close(self):
        self.running = False
        self.stream.stop_stream()
        self.stream.close()
        self.p.terminate()



# Initialize BufferedAudioStream
audio_stream = BufferedAudioStream(FORMAT, CHANNELS, RATE, CHUNK)

# Start the buffer update thread
buffer_thread = threading.Thread(target=audio_stream.update_buffer)
buffer_thread.start()

# atv is an already connected pyatv device; this call runs inside a coroutine
await atv.stream.stream_file(audio_stream)

@xXxNIKIxXx
Author

I have also tried using lameenc to convert the stream into MP3 format, but this didn't work either.

@postlund
Owner

Metadata detection is failing (for some reason). Try passing some bogus metadata to see if that helps (it will disable auto detection).

https://pyatv.dev/development/stream/#custom-metadata
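
A minimal sketch of the suggestion above, assuming the `MediaMetadata` class from `pyatv.interface` and the `metadata` keyword of `stream_file` described on the linked page. The helper name `stream_with_placeholder_metadata`, the title, and the artist are made up for illustration, and `atv` is assumed to be an already connected device.

```python
async def stream_with_placeholder_metadata(atv, source):
    """Stream `source` with explicit metadata instead of auto-detected tags."""
    # Imported here so the sketch can be read without pyatv installed
    from pyatv.interface import MediaMetadata

    # Placeholder values; any non-empty metadata should skip auto detection
    metadata = MediaMetadata(title="Live audio", artist="Microphone")
    await atv.stream.stream_file(source, metadata=metadata)
```

It would be awaited from a coroutine, for example `await stream_with_placeholder_metadata(atv, audio_stream)`.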

@xXxNIKIxXx
Author

Unfortunately, this didn't work for me.

@postlund
Owner

Can you provide more details about the exception you are getting?

@xXxNIKIxXx
Author

Yes. When I use the code above, I get the following error:
mediafile.FileTypeError: 'stream': not in a recognized format.

This is probably because the stream doesn't have any header information to indicate what format it is. Then I tried using it as a stream reader, but I got the same error.

With the library lameenc, I tried to convert the stream into an MP3 stream, but it didn't work because the MP3 information was provided in every frame, not just once. I also tried using the metadata to set the information, but that didn’t work either.
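
The missing-header point can be illustrated with the standard library alone. This is a small sketch, not pyatv code: it wraps one chunk of raw 16-bit PCM in a WAV container using the `wave` module and compares the first bytes. The helper name `pcm_to_wav_bytes` and the silent test chunk are assumptions for illustration.

```python
import io
import wave

SAMPLE_RATE = 44100
CHANNELS = 1
SAMPLE_WIDTH = 2  # bytes per sample, matching pyaudio.paInt16


def pcm_to_wav_bytes(pcm: bytes) -> bytes:
    """Wrap raw 16-bit PCM frames in a WAV (RIFF) container in memory."""
    memfile = io.BytesIO()
    with wave.open(memfile, "wb") as wav:
        wav.setnchannels(CHANNELS)
        wav.setsampwidth(SAMPLE_WIDTH)
        wav.setframerate(SAMPLE_RATE)
        wav.writeframes(pcm)
    return memfile.getvalue()


# One chunk of silence standing in for data read from the microphone
raw_pcm = bytes(1024 * SAMPLE_WIDTH * CHANNELS)
wav_bytes = pcm_to_wav_bytes(raw_pcm)

print(raw_pcm[:4])    # raw samples only, nothing identifies the format
print(wav_bytes[:4])  # container magic that a format detector can recognize
```

The raw chunk starts directly with sample data, while the WAV version starts with the `RIFF` marker followed by the sample rate, channel count, and sample width, which is the information a format detector needs.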

@xXxNIKIxXx
Author

Do you have an idea of how to implement this, or of what the problem might be?

@postlund
Owner

I believe you are on to the problem already. pyatv expects a proper container format (e.g. mp3 or ogg) and cannot decode raw PCM frames at the moment. That could probably be implemented in some intricate way, but I can't say how I would do it right now. I will have to think about it a bit.
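
One possible direction, sketched under assumptions: emit a WAV header once and then pass raw PCM through behind it, so the stream looks like a container to the reader. The names `wav_header` and `HeaderThenPcm` are hypothetical, the size fields use a placeholder because the length of a live stream is unknown, and whether pyatv's decoder accepts such a header is untested here.

```python
import io
import struct


def wav_header(sample_rate: int, channels: int, bits_per_sample: int = 16) -> bytes:
    """Build a 44-byte PCM WAV header with placeholder sizes for an open-ended stream."""
    byte_rate = sample_rate * channels * bits_per_sample // 8
    block_align = channels * bits_per_sample // 8
    unknown_size = 0xFFFFFFFF  # placeholder: the real length is not known up front
    return struct.pack(
        "<4sI4s4sIHHIIHH4sI",
        b"RIFF", unknown_size, b"WAVE",
        b"fmt ", 16, 1, channels, sample_rate, byte_rate, block_align, bits_per_sample,
        b"data", unknown_size,
    )


class HeaderThenPcm(io.BufferedIOBase):
    """File-like object that yields a header first, then PCM from a callable."""

    def __init__(self, header: bytes, read_pcm):
        super().__init__()
        self._pending = bytearray(header)
        self._read_pcm = read_pcm  # callable returning bytes; b"" means end of stream

    def readable(self) -> bool:
        return True

    def read(self, size=-1) -> bytes:
        if size is None:
            size = -1
        # Refill until the request can be served or the source ends.
        # Note: read(-1) on an endless source never returns.
        while size < 0 or len(self._pending) < size:
            chunk = self._read_pcm()
            if not chunk:
                break
            self._pending.extend(chunk)
        if size < 0:
            size = len(self._pending)
        data = bytes(self._pending[:size])
        del self._pending[:size]
        return data


# Two fake PCM chunks stand in for a microphone callback
chunks = iter([b"\x01\x02" * 4, b"\x03\x04" * 4])
stream = HeaderThenPcm(wav_header(44100, 1), lambda: next(chunks, b""))
first = stream.read(44)
```

Unlike the earlier adapter, `read` here waits for the source instead of returning an empty result when the buffer is momentarily empty, since an empty read is normally interpreted as end of file.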

@xXxNIKIxXx
Author

I found an approach. It still needs refining to work fully live; for now it can only stream in intervals of x seconds.

import asyncio
import threading
import io
import numpy as np
import sounddevice as sd
import soundfile as sf
from pyatv import scan, connect, exceptions

# ----- Parameters -----
sample_rate = 44100
channels = 1
buffer_duration = 3  # seconds
max_samples = sample_rate * buffer_duration

# ----- A simple ring buffer for audio -----
class RingBuffer:
    def __init__(self, max_samples):
        self.max_samples = max_samples
        self.buffer = []  # stores numpy arrays (chunks of audio data)
        self.total_samples = 0
        self.lock = threading.Lock()

    def append(self, data):
        with self.lock:
            self.buffer.append(data.copy())
            self.total_samples += data.shape[0]
            # Remove older chunks if we exceed max_samples
            while self.total_samples > self.max_samples:
                removed = self.buffer.pop(0)
                self.total_samples -= removed.shape[0]

    def get_buffer(self):
        with self.lock:
            if self.buffer:
                return np.concatenate(self.buffer, axis=0)
            else:
                # Return an empty array with the correct shape
                return np.empty((0, channels))

# Global ring buffer instance
ring_buffer = RingBuffer(max_samples)

# ----- Audio recording in a background thread -----
def record_audio(ring_buffer, sample_rate, channels, stop_event):
    def callback(indata, frames, time, status):
        if status:
            print(status, flush=True)
        ring_buffer.append(indata)

    with sd.InputStream(samplerate=sample_rate, channels=channels, callback=callback):
        print(f"Recording audio continuously (keeping the last {buffer_duration} seconds)...")
        while not stop_event.is_set():
            sd.sleep(100)
    print("Audio recording stopped.")

# ----- Function to encode raw PCM data to WAV in memory -----
def encode_audio_to_wav(audio_data, sample_rate, channels):
    memfile = io.BytesIO()
    # Write to an in-memory WAV file (PCM 16-bit)
    with sf.SoundFile(memfile, mode='w', samplerate=sample_rate, channels=channels,
                      format='WAV', subtype='PCM_16') as f:
        f.write(audio_data)
    return memfile.getvalue()

# ----- Async function to scan, select a device, and stream the audio -----
async def main():
    print("Scanning for devices...")
    # Scan for devices using the current running loop
    atvs = await scan(asyncio.get_running_loop(), timeout=5)
    if not atvs:
        print("No devices found.")
        return

    # Sort and list devices
    atvs = sorted(atvs, key=lambda x: x.name)
    for index, device in enumerate(atvs):
        print(f"{index}: {device.name}")

    selection = int(input("Select a device by number: "))
    selected_device = atvs[selection]

    atv = None  # So we can safely call close() later
    try:
        atv = await connect(selected_device, asyncio.get_running_loop())
        print(f"Connected to {selected_device.name}")

        print("Starting continuous audio streaming...")
        while True:
            # Get the most recent buffer_duration seconds of audio from the ring buffer
            audio_data = ring_buffer.get_buffer()
            if audio_data.size == 0:
                print("No audio data recorded yet!")
                await asyncio.sleep(1)
                continue

            # Ensure audio_data is 2D (frames x channels)
            if audio_data.ndim == 1:
                audio_data = audio_data.reshape(-1, channels)

            # Encode the audio data into an in-memory WAV file
            wav_data = encode_audio_to_wav(audio_data, sample_rate, channels)

            # Wrap wav_data in a BytesIO so stream_file gets a file-like object
            await atv.stream.stream_file(io.BytesIO(wav_data))
            print("Chunk streamed. Waiting for the next chunk...")

            # Wait before streaming the next chunk
            await asyncio.sleep(buffer_duration)
    except exceptions.AuthenticationError:
        print("Authentication error")
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        if atv is not None:
            await atv.close()

# ----- Main entry point -----
if __name__ == "__main__":
    # An event to signal the recording thread to stop
    stop_event = threading.Event()
    record_thread = threading.Thread(target=record_audio, args=(ring_buffer, sample_rate, channels, stop_event))
    record_thread.start()

    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Interrupted by user.")
    finally:
        stop_event.set()
        record_thread.join()
