Controlling my program with TCP-commands

Hi! I’m still quite new to Python, so apologies if I’m missing something really basic…

I have this code, which has speech recognition and then sends recognized text to an API which will give a response (chatbot).
Now there are some things wrong in my flow. Where the biggest issue is that I want to remain listening to TCP commands while get_speech() is active.

As you can see in code, I’m listening to TCP-commands; with the “openMic” command you should be able to basically start the chatbot.
This will gives a welcome message from the API and then listens to your input. With the “closeMic” command I want to stop the chatbot ← which doesn’t work.

The get_speech(), function is running infinitely. The function it self runs perfect. However, I always want to be able to stop the function with the closeMic command.
It is also important that I don’t want to break the program, since the user should be able to start the function again with the “openMic” command.
I’ve tried several things like adding booleans, passing booleans… But I’m stuck.

Is there anyone who can help me out?
I’ve hidden my API and charID

import requests
import socket
import base64
import speech_recognition as sr
from playsound import playsound
 
TCP_IP = "127.0.0.1"
TCP_PORT = 9999
url = "https://api.convai.com/character/getResponse"
 
def get_speech(recognizer, microphone):
    with microphone as source:
        print("Adjusting for ambient noise...")
        recognizer.adjust_for_ambient_noise(source)
        print("Listening for your voice...")
        audio = recognizer.listen(source)
    try:
        print("Recognizing your speech...")
        return recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        print("Google Speech Recognition could not understand audio")
    except sr.RequestError as e:
        print(f"Could not request results from Google Speech Recognition service; {e}")
 
def get_response(sentence):
    payload = {
        'userText': [sentence],  
        'charID': '',
        'sessionID': '-1',
        'voiceResponse': 'True'
    }
    headers = {
        'CONVAI-API-KEY': ''
    }
 
    response = requests.request("POST", url, headers=headers, data=payload)
    data = response.json()
    character_response = data["text"]
    print("Text response: " + data["text"])
 
    decode_string = base64.b64decode(data["audio"])
 
    with open('audioResponse.wav', 'wb') as f:
        f.write(decode_string)
 
def main(): 
    recognizer = sr.Recognizer()
    microphone = sr.Microphone()
 
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind((TCP_IP, TCP_PORT))
    server_socket.listen(1)
    print(f"Server listening on {TCP_IP}:{TCP_PORT}")
 
    chatbotEnabled = False
    connected = False
 
    client_socket, addr = server_socket.accept()
    connected = True
    print(f"Client connected: {addr}")
 
    data = client_socket.recv(1024).decode().strip()
 
    while connected:
        if data == "openMic":
            chatbotEnabled = True
            # user_input = get_speech(recognizer, microphone, chatbotEnabled)
        elif data == "closeMic":
            chatbotEnabled = False
            print("chatbot disabled")
            get_response("Bye")
            playsound("audioResponse.wav")
            continue
        elif data == "stopProgram":
            connected = False
            break
 
        if chatbotEnabled:
            get_response("Please introduce yourself")
            playsound("audioResponse.wav")
            user_input = get_speech(recognizer, microphone)
 
        if user_input is None:
            continue
        elif user_input in ["quit", "exit", "bye"]:
            break
        elif not user_input is None:
            response = get_response(user_input)
            playsound("audioResponse.wav")
 
if __name__ == "__main__":
    main()

This is not reliable. You might get part of a command or more then one command.

You need to loop over the received data until you have all of a command.

To do that you need to design a protocol that knows what a command is.
For example the command might be terminated by a \n byte.
telnet would use \r\n as the line terminator (as it done with HTTP as well).

You also need to handle the case that more then one command is read at a time.

data is assigned once outside of the loop.
I expect you want to read a series of commands from the socket, not just one.

Hi Kasper, fun problem!

I’m not previously familiar with the speech recognition library you are using, but I believe I found the correct package and I took a look at the docs: avryhof/speech-recognition

In addition to what Barry mentioned about socket receive,

You indicated that your get_speech function never returns. Based on the print statements that run, how far does get_speech run before it “stops”? I am assuming either the “recognizer.listen” or perhaps “recognizer.recognize_google”?

Based on what I see in the library docs, “listen” should not block indefinitely. You are calling “adjust_for_ambient_noise”, which should set the recognizer’s energy_threshold that decides how loud the input audio has to be for it to start recording. It should stop recording after the audio source goes silent. I would add another print, after the adjustment and before the call to listen, that prints the properties of the recognizer object, especially “energy_threshold” and “pause_threshold”, to see if they are set to reasonable values. Another thing to look at is if there is a way to terminate your audio source, as the listen call should end of the audio source does.

I also see that “listen” allows for a “timeout” parameter. It is good practice to always set a timeout for an operation that might block for a long time. This doesn’t actually fix your problem, it’s just a good idea. Pick a number of seconds longer than the longest time you’d expect to speak one phrase to the chatbot. (This goes for using sockets as well - you can set a timeout on the socket object that applies to all its functions, or pass a timeout to each function.)

Lastly, the general thing you are trying to do here falls under “concurrency”/“concurrent programming”. Generally a program can be doing exactly one thing at a time. If it is inside the recognizer “listen” call, it cannot also be listening for messages from the socket. There a number of ways to handle this but multithreading is probably the most straightforward. The speech recognizer includes a method that will manage a background thread for you, which may be a good place to start - “listen_in_background”.

I’m on my mobile phone right now so it is hard to give a detailed example, but I’m happy to talk more about multithreading if you decide to go that route.

Hi Barry, great advise for using carriage return, will implement this!

Basically it “stops” at either one of the except rules or at “recognizer.recognice_google”. In that last case it returns the value back to main() which then forwards the response back to get_response()
after get_response() is handled get_speech() comes back (see last lines in code elif not user_input is none: ...)

Based on your reply, I think I should multi-thread? I prefer that the mic is not constantly running in the background. But then I still have the issue on the moment when the mic or google is busy.
So maybe I should check if I can run it in the background and control the mic with boolean(s).

If you have any good examples/help, I would be very happy :slight_smile:

If you need the ability for a TCP socket command to interrupt the microphone, then yes, multithreading would be a great idea.

Though, I don’t think there’s currently anything inside your loop to update data with new commands from the socket, so maybe I’m misunderstanding your protocol plans.

That acutally is quite a good comment (also). I should store the last received data value.
A most basic way to explain the plan is probably that an other computer should be able to “open” or “close” the chatbot (which is the get_speech() and get_response() functions).

Making it “open” is not really an issue, closing once open is the issue.

Alright, so I’m still struggling.
It is hard to make this working while the progress of speech recognition as well as receiving a response is busy. Does anybody have an example maybe?

Hi Kasper,

Now that it’s the weekend I threw together a little example server showing how I would start something like this. It is deliberately not robust[1] but shows the general idea.

gitlab.com/flyinghyrax/pynng-example

I made two other major simplifications for the sake of the example:

  1. I’ve used PyNNG instead of plain sockets. NNG is a protocol library that handles some of the nuances of working with sockets for me (like what Barry Scott mentioned above). [2]
  2. I ignored everything to do with speech recognition and text-to-speech; my background thread just uses time.sleep to “pretend to do work.” This lets me show starting/stopping the thread without worrying about those details that are specific to your project.

There are 2 important parts to the server:

  1. The command loop, which is where the socket listens for commands and sends responses.
  2. The background thread, the important part of that being how to stop the thread with threading.Event

The server command loop follows this outline:

  1. Open the socket, listen, and accept a connection (NNG makes this look like 1 step, anyway)
  2. Use recv to wait for the client to send you something
    • If this times out, just go through the loop again, basically just restarts the recv
  3. When you get a command, branch (if/elif, or match) depending on what kind of command it is.
    • The key thing here is when handling the command, don’t block or take too long, because you want to be ready to receive the next command as soon as possible
  4. send a response to the client, if needed (depends on your protocol)
  5. Unless the command was to stop/exit, go through the loop again (repeat from step 2)

The way I did the background thread was to try to match how the speech recognition library’s listen_in_background function works, where it returns a function for you to call when you want it to stop. You don’t have to do thread cancelation like this. There are a lot of ways to tell another thread you’d like it to stop and the way I did it isn’t the most elegant by a long shot. The important thing about thread cancelation is that it must be cooperative. You can’t reliably force another thread to stop - instead if you have thread A that starts thread B:

  • thread B has a boolean or event that indicates another thread wanted it to stop. It never sets this flag itself, only checks it (it is “read only”). It checks periodically, like in between units of “work” or at the top of a loop.
  • thread A is given some way to safely set this flag. Maybe it is given a function, or it can access an Event directly that is shared by B. But it never checks this flag, only sets it (it is “write only”)
  • When thread A wants thread B to stop, it sets the Event or boolean or calls the cancel function it has, then (ideally) waits for B to finish by calling ‘join’.
  • Meanwhile in thread B, the next time it checks its cancelation flag it sees that it has been set, so instead of doing its next work item it cleans up any resources it is using and returns from its function. This ends the thread.

…that’s the world’s least thorough introduction to cooperative multithreading. I am certain there are much better blog posts or tutorials or what have you out there, but I wanted to at least cover the bits that I used in my little toy example server.


  1. Multithreading makes signal handling (like when you ctrl+c to force stop a script) and error handling (e.g. what happens when an exception gets thrown in a different thread) more complicated/nuanced and I’ve ignored all that to keep things small. ↩︎

  2. It can also do much fancier things, like pub/sub and automatic retries and transports other than TCP, but I don’t really need those for this. ↩︎

Wow, @flyinghyrax that is a quite a complete help; so thanks!!!
I’ve edited the code to this for now (server-side is only edited):

import logging
import threading
import time
import speech_recognition as sr     ## added
from playsound import playsound     ## added
import requests                     ## added
import base64                       ## added

from typing import Callable, Optional
import pynng

log = logging.getLogger("server")
addr = "tcp://127.0.0.1:8765"


# def pretend_get_prompt_audio(prompt: str):
#     log.info(f"getting audio for {prompt}")
#     time.sleep(0.4)
#     log.info(f"playing audio for prompt {prompt}")

def get_prompt_audio(sentence):
    payload = {
        'userText': [sentence],  
        'charID': '',
        'sessionID': '-1',
        'voiceResponse': 'True'
    }
    headers = {
        'CONVAI-API-KEY': ''
    }
 
    response = requests.request("POST", "https://api.convai.com/character/getResponse", headers=headers, data=payload)
    data = response.json()
    character_response = data["text"]
    log.info("Text response: " + data["text"])
 
    decode_string = base64.b64decode(data["audio"])
 
    with open('audioResponse.wav', 'wb') as f:
        f.write(decode_string)
    playsound("audioResponse.wav")

# def pretend_get_user_speech(timeout: int):
#     log.info("listening for user speech...")
#     time.sleep(timeout)
#     log.info("finished listening for user speech")

def get_user_speech(recognizer,microphone):
    with microphone as source:
        log.info("Adjusting for ambient noise...")
        recognizer.adjust_for_ambient_noise(source)
        log.info("Listening for your voice...")
        audio = recognizer.listen(source)
    try:
        log.info("Recognizing your speech...")
        return recognizer.recognize_google(audio)
    except sr.UnknownValueError:
        log.info("Google Speech Recognition could not understand audio")
        return None
    except sr.RequestError as e:
        log.info(f"Could not request results from Google Speech Recognition service; {e}")


def start_background_chat_loop(listen_duration: int) -> Callable[[], None]:
    recognizer = sr.Recognizer() ## added
    microphone = sr.Microphone() ## added
    cancel_event = threading.Event()

    def work():
        start_time = time.time()
        while not cancel_event.is_set():
            duration = time.time() - start_time
            user_input = get_user_speech(recognizer, microphone)
            ## pretend_get_prompt_audio("example prompt")
            # could re-check cancel_event here
            # pretend_get_user_speech(listen_duration)
            if not user_input == None:
                get_prompt_audio(user_input)
                #log.info(f"I've been doing work for {duration:0.2d} seconds!") ## returns error

    background_thread = threading.Thread(target=work, name="worker")

    def do_cancel():
        log.info("Stopping?")
        cancel_event.set()
        background_thread.join()

    background_thread.start()

    return do_cancel


def command_loop():
    stop_callback: Optional[Callable[[], None]] = None
    log.info("server entered command loop")

    with pynng.Pair0(listen=addr, recv_timeout=3000, send_timeout=3000) as connection:
        log.info(f"server started listening ({addr})")

        while True:
            try:
                message = connection.recv(block=True)
            except pynng.exceptions.Timeout:
                continue

            match message:
                case b"open":
                    if stop_callback is None:
                        stop_callback = start_background_chat_loop(3)
                        log.info("started chat loop")
                    else:
                        log.warning(
                            "received 'open' command but chat loop already running"
                        )
                    connection.send(b"OK: open")

                case b"close":
                    if stop_callback is not None:
                        stop_callback()
                        stop_callback = None
                        log.info("stopped chat loop")
                    else:
                        log.warning(
                            "received 'close' command but chat loop not running"
                        )
                    connection.send(b"OK: close")

                case b"exit":
                    log.info("received 'exit' command")
                    if stop_callback is not None:
                        stop_callback()
                    connection.send(b"OK: exit")
                    return

                case unrecognized:
                    log.warning(f"received unrecognized command '{unrecognized!s}'")
                    connection.send(b"ERR: unrecognized command")
                    continue


def main():
    # can have argument parsing or other setup
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(threadName)s | %(message)s",
        datefmt="%H:%M:%S",
        level=logging.DEBUG,
    )
    command_loop()

main()


I’ve left all your code in. Where changes were necessary I’ve commented them out.
So there are still some issues. Let me first share the log (not 100% accurate, but same result - above code is updated):

09:22:53 | INFO | MainThread | server entered command loop
09:22:53 | INFO | MainThread | server started listening (tcp://127.0.0.1:8765)
09:22:56 | DEBUG | Dummy-1 | Pipe callback event 0
09:22:56 | DEBUG | Dummy-1 | Pipe callback event 1
09:22:59 | INFO | MainThread | started chat loop
Adjusting for ambient noise...
Listening for your voice...
Recognizing your speech...
09:23:04 | DEBUG | worker | Starting new HTTPS connection (1): api.convai.com:443
09:23:09 | DEBUG | worker | https://api.convai.com:443 "POST /character/getResponse HTTP/1.1" 200 292296
Text response: Hi there, I'm doing great, how are you?
Adjusting for ambient noise...
Listening for your voice...
Recognizing your speech...
09:23:20 | DEBUG | worker | Starting new HTTPS connection (1): api.convai.com:443
09:23:27 | DEBUG | worker | https://api.convai.com:443 "POST /character/getResponse HTTP/1.1" 200 2481280
Text response: Hi there! Company is a global leader in health technology, creating innovative solutions to help people lead healthier and more meaningful lives. We are focused on providing solutions to meet the needs of our customers from prevention and screening, diagnosis, therapy, and home care. We also provide connected healthcare solutions to enable better collaboration between healthcare providers.
09:23:49 | INFO | MainThread | stopped chat loop

Current issues;

  1. Would you mind to look at this?
    For each recognized text a new HTTPS connection is set up.
    That’s also why both responses start with a ‘Hi there’. It basically should start once and then remain in the ‘conversation’ as long as the chatbot is active.

  2. I can’t find what the do_cancel() function does? Can you explain? I do noticed it is triggered when exit is executed. But can’t call the function to “cancel”

  3. If speech recognizes for example “exit” I want to stop the program same way as “stop” command does. How would I be able to achieve this.

  4. As for the client side. In-the-end I want to be able to trigger server.py with packetSender for example.
    I did read the documentation, but could not find anything about possibilities… and at the moment-- it isn’t working.

Thanks again!

I’d be happy to look into this more when I have some free time! That can be irregular so I will give some quick answers now and then can fill in more detail later.

  1. This will be specific to the API being used. In general, this requires keeping some kind of state between POST requests, which means you’ll need some kind of session. Basically the server gives you something extra back in its response, and you keep that around until your next request and send it back to the server so it can associate your two HTTP requests. The requests library has very good support for this, but the exact details vary on what the service you are communicating with expects.

  2. This is a fun topic (IMO). This is using a capability called “closure.” You see how do_cancel has access to the stop_event variable? This makes sense because they are declared in the same scope - they both live inside the start_background_chat_loop function. Which at the bottom returns do_cancel - instead of returning something like an int, a string, or a list, start_background_chat_loop returns another function. “do_cancel” is its name inside before it is returned, but outside in the command loop where we call start_background_chat_loop, we take what it returns (that’s do_cancel!) and store it in the “stop_callback” variable. At that point, stop_callback is another name for the do_cancel function [1]

But wait! Variables declared inside a function are supposed to go out of scope when that function returns! stop_event is created inside the start_background_chat_loop function. That function has returned, so stop_event should no longer be accessible? In this case, what Python does is create a little wrapper around do_cancel, that holds the do_cancel function and references to any variables from outside itself that it uses. This wrapper is called a closure, and it is necessary because otherwise when we later call stop_callback a.k.a. do_cancel, variables it needs would no longer be accessible. The closure keeps them available for the function to use. [2]

This is much better explained with some visuals; if you can’t find a good diagram online I may have a try at making one. It’s also important to note that you can accomplish the same goal using classes and objects (and often that way is clearer - I was lazy and wanted to type less :wink:).

  1. Another fun topic! Right now the control flow is driven all by the main thread. It tells the other thread when to start and when to stop. This asymmetry is nice because it means the worker thread doesn’t need to know much about the main thread at all! It keeps to itself and doesn’t need to “talk” to the main thread. To change this, your threads will need some kind of shared state. Generally speaking, you want to keep the amount variables your threads can both change as low as possible. Shared state, where more than one thread can write to the same variable, is the cause of most of the difficulty with multithreaded programming. (Have a look at the search term “data race”).

That said, there are a number of ways to do this safely, but I’ll need to have a think about what might be the most straightforward. Others may have suggestions for that as well.


  1. more specifically, for the instance of the do_cancel function that was created when we called start_background_chat_loop - we could make more than one! ↩︎

  2. this technique isn’t specific to Python - it’s been added to lots of languages and is very commonly used in for example JavaScript and Swift. ↩︎

1 Like

It’s important to know that everything you can do with classes, you could do with closures (and sometimes that way is clearer, but we’re lazy and want to type less).

Enjoy the class/closure paradox :slight_smile: