def on_new_com_data(self, *args, **kwargs):
"""
To handle mental command data emitted from Cortex
Returns
-------
data: dictionary
the format such as {'action': 'neutral', 'power': 0.0, 'time': 1590736942.8479}
"""
data = kwargs.get('data')
#print(data['mc data: {}'.format(data)])
if data:
# self.guardar_datos_en_csv(data, 'C:/Users/James/Desktop/cortex-v2-example-master/datos.txt')
# Agregar el comando actual al historial
command = data.get('action', '').lower() #
self.command_history.append(command)
# Check if it has been at least 8 segueing since the last impression
current_time = time.time()
if current_time - self.last_print_time >= 8:
counts = {word: self.command_history.count(word) for word in self.target_words}
# Check if most of the commands are any of the target words
majority_word = max(counts, key=counts.get)
majority_count = counts[majority_word]
if majority_count >= 20: # If 20 or more of the last 40 samples are of any target word
self.guardar_datos_en_csv(majority_word, 'C:/Users/James/Desktop/cortex-v2-example-master/datos.txt')
print(f"Mayority of command are: {majority_word}")
comando = majority_word
comando_encoded = comando.encode('utf-8')
self.envioporsocket(comando_encoded)
self.last_print_time = current_time # Update the record of the last moment of printing
def envioporsocket(self,comando): #here we will be the client, the server will be the VM
host = '192.168.1.103'
port = 5000
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((host, port))
s.sendall(comando)
Hi guys!, im working on a bci interface to drive a drone with mental commands. this is part of my code(Client Side), im having some troubles… if i send the commands to the virtual machine using static commands like in the next code, it works good and the drone keeps flying waiting for the commands
import socket
import time
# Configura el servidor y el puerto al que te estás conectando
server_ip = '192.168.1.103' # Reemplaza con la direcciĂłn IP del servidor
server_port = 5000
# Crea un socket
client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Conéctate al servidor
client_socket.connect((server_ip, server_port))
# Define los comandos que deseas enviar al servidor
comandos = ["neutral", "right", "left","right", "land"]
for comando in comandos:
# EnvĂa el comando al servidor
client_socket.send(comando.encode('utf-8'))
print(f"Comando enviado: {comando}")
time.sleep(15) # Espera 8 segundos antes de enviar el siguiente comando
if comando == 'land':
break
But… if i send them using the client side(def on _new_com) drone doesnt keeps in the air and lands by itself and takeoff to execute the command and lands again, and i expected a big delay
here is the server side code(Drone API):
def manejar_comandos():
# Configura el servidor socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(('0.0.0.0', 5000))
server_socket.listen(1)
print("Servidor socket activo. Esperando comandos...")
# Inicializa el Crazyflie
with SyncCrazyflie(URI, cf=Crazyflie(rw_cache='./cache')) as scf:
mc = MotionCommander(scf)
while True:
client_socket, client_address = server_socket.accept()
print("Cliente conectado desde:", client_address)
mc.take_off()
print("Despegando")
while True:
command_bytes = client_socket.recv(1024)
if not command_bytes:
break # Si no se reciben datos, salimos del bucle interno
command = command_bytes.decode('utf-8')
print(f"Comando recibido: {command}")
# Procesa y ejecuta el comando
if command == "neutral":
print("NNo hago nada")
time.sleep(3)
elif command == "right":
mc.right(0.3)
print("Girando a la derecha")
time.sleep(3)
elif command == "left":
mc.left(0.3)
print("Girando a la izquierda")
time.sleep(3)
elif command == "up":
#mc.up(0.2)
print("Movimiento hacia arriba")
time.sleep(1)
mc.stop()
elif command == "down":
#mc.down(0.2)
print("Movimiento hacia abajo")
time.sleep(1)
mc.stop()
else:
mc.stop()
mc.land()
print("Aterrizando por comando desconocido")
break
print("Cliente desconectado.")
mc.stop()
mc.land()
print("Aterrizando el Crazyflie por romper el try")
if __name__ == '__main__':
cflib.crtp.init_drivers()
# Inicia un hilo para manejar los comandos
thread = threading.Thread(target=manejar_comandos)
thread.start()
thread.join()
Im open to change things to optimize my code, im not very skilled programming but i try