Hi, does anyone have ideas to improve this code by, for example speed and efficiency?
#File name: Main.py
from libraries import *
kit = ServoKit(channels=16)
now = datetime.now()
dt_string = now.strftime("%m-%d-%Y %H:%I%p")
location = '/home/pi/Desktop/' + dt_string + '.h264'
left_mor = 4 # GPIO4
right_mor = 17 # GPIO17
power_mgmt_1 = 0x6b
power_mgmt_2 = 0x6c
cs = digitalio.DigitalInOut(board.CE0)
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
camera = PiCamera(resolution='1280x720', framerate=60)
mcp = MCP.MCP3008(spi, cs)
volts = AnalogIn(mcp, MCP.P0)
smok = AnalogIn(mcp, MCP.P1)
pi = pigpio.pi()
logger = logging.getLogger()
bus = smbus.SMBus(1)
PAGE = ''' < html >
< head >
< title > AEROCK
Live
Stream < / title > < / head > < body >
< center > < h1 > AEROCK
Live
Stream < / h1 > < /
center >
< center > < img
src = "stream.mjpg" width = "1280" height = "720" > < / center > < / body > < / html >'''
address = 0x68
def delay(t):
time.sleep(t)
def init_servos():
logger.info("Initializing Servos")
kit.servo[0].angle = 90
kit.servo[1].angle = 90
kit.servo[2].angle = 90
kit.servo[3].angle = 90
kit.servo[4].angle = 90
kit.servo[5].angle = 90
kit.servo[6].angle = 90
kit.servo[7].angle = 90
logger.info("Servos Initialized")
def get_gps():
logger.info("Getting GPS Data")
port = '/dev/ttyAMAO'
ser = serial.Serial(port, baudrate=9600, timeout=0.5)
newdata = str(ser.readline())
chunks = newdata.split(',')
where = newdata.find('$GPRMC')
gprmcpart = newdata[where:]
morechunks = gprmcpart.split(',')
lat = str(morechunks[3]) + morechunks[4]
lng = str(morechunks[5]) + morechunks[6]
alt = chunks[9]
spd = morechunks[8]
logger.info("Got GPS Data")
logger.info("-- -- -" + lat + "-- -- -" + lng + "-- -- -" +alt + "-- -- -" + spd)
func()
def init_motors():
logger.info("Arming Motors")
pi.set_servo_pulsewidth(left_mor, 0)
pi.set_servo_pulsewidth(right_mor, 0)
delay(1)
pi.set_servo_pulsewidth(left_mor, 2000)
pi.set_servo_pulsewidth(right_mor, 2000)
delay(1)
pi.set_servo_pulsewidth(left_mor, 700)
pi.set_servo_pulsewidth(right_mor, 700)
delay(1)
logger.info("Motors Armed")
def init_cam():
logger.info("Initializing Camera")
camera.start_preview()
camera.start_recording(location)
logger.info("Camera Initialized and Recording Started")
def read_byte(adr):
return bus.read_byte_data(address, adr)
def read_word(adr):
high = bus.read_byte_data(address, adr)
low = bus.read_byte_data(address, adr + 1)
val = (high << 8) + low
return val
def read_word_2c(adr):
val = read_word(adr)
if val >= 0x8000:
return -((65535 - val) + 1)
else:
return val
def dist(a, b):
return math.sqrt((a * a) + (b * b))
def get_y_rotation(x, y, z):
radians = math.atan2(x, dist(y, z))
return -math.degrees(radians)
def get_x_rotation(x, y, z):
radians = math.atan2(y, dist(x, z))
return math.degrees(radians)
def get_gyro():
func.gyro_xout = read_word_2c(0x43)
func.gyro_yout = read_word_2c(0x45)
func.gyro_zout = read_word_2c(0x47)
logger.info("{}\t {}\t {}\t {}".format("X out: ", func.gyro_xout, "scaled: ", (func.gyro_xout / 131)))
print("{}\t {}\t {}\t {}".format("Y out: ", func.gyro_yout, "scaled: ", (func.gyro_yout / 131)))
print("{}\t {}\t {}\t {}".format("Z out: ", func.gyro_zout, "scaled: ", (func.gyro_zout / 131)))
accel_xout = read_word_2c(0x3b)
accel_yout = read_word_2c(0x3d)
accel_zout = read_word_2c(0x3f)
func.accel_xout_scaled = accel_xout / 16384.0
func.accel_yout_scaled = accel_yout / 16384.0
func.accel_zout_scaled = accel_zout / 16384.0
logger.info("{}\t {}\t {}\t {}".format("X out: ", accel_xout, "scaled: ", func.accel_xout_scaled))
logger.info("{}\t {}\t {}\t {}".format("Y out: ", accel_yout, "scaled: ", func.accel_yout_scaled))
logger.info("{}\t {}\t {}\t {}".format("Z out: ", accel_zout, "scaled: ", func.accel_zout_scaled))
def get_adc():
logger.info("Reading ADC")
multi = 25 / 3.3
voltage = volts.voltage * multi
smoke = smok.voltage
if smoke > 0.6:
logger.warning("More smoke detected than usual")
if smoke > 1:
logger.critical("Too much smoke, turning off motors and parachuting")
motor_sped(0)
if voltage < 9:
logger.critical("Battery levels too low, turning off motors and parachuting")
motor_sped(0)
if voltage < 10.6:
logger.critical("Battery levels low, decreasing motor speed")
motor_sped(1500)
def left_aileron(z):
if z == "down":
kit.servo[0].angle = 45
logger.info("Left aileron down")
elif z == "up":
kit.servo[0].angle = 135
logger.info("Left aileron up")
elif z == "middle":
kit.servo[0].angle = 135
logger.info("Left aileron middle")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def right_aileron(x):
if x == "down":
kit.servo[0].angle = 45
logger.info("Right aileron down")
elif x == "up":
kit.servo[0].angle = 135
logger.info("Right aileron up")
elif x == "middle":
kit.servo[0].angle = 135
logger.info("Right aileron middle")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def rudder(d):
if d == "left":
kit.servo[2].angle = 45
logger.info("Rudder left")
elif d == "right":
kit.servo[2].angle = 135
logger.info("Rudder right")
elif d == "middle":
kit.servo[2].angle = 90
logger.info("Rudder middle")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def elevators(b):
if b == "down":
kit.servo[3].angle = 45
logger.info("Elevators down")
elif b == "up":
kit.servo[3].angle = 135
logger.info("Elevators up")
elif b == "middle":
kit.servo[3].angle = 90
logger.info("Elevators middle")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def front_gear(o):
if o == "deployed":
kit.servo[4].angle = 90
logger.info("Front gear of the plane deployed")
elif o == "retracted":
kit.servo[4].angle = 0
logger.info("Front gear of the plane retracted")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def movecam(e):
if e > 180:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
else:
kit.servo[7].angle = e
logger.info("Moved camera to" + str(e) + "degree position")
def back_gears(f):
if f == "deployed":
kit.servo[5].angle = 90
kit.servo[6].angle = 90
logger.info("Back gears of the plane deployed")
elif f == "retracted":
kit.servo[5].angle = 0
kit.servo[6].angle = 0
logger.info("Back gear of the plane retracted")
else:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
def motor_sped(speed):
if speed > 2000:
logger.error("Incorrect parameters passed")
raise Exception("Incorrect parameters passed")
else:
pi.set_servo_pulsewidth(left_mor, speed)
pi.set_servo_pulsewidth(right_mor, speed)
logger.info("Motor Speed set to" + str(speed))
def take_off():
motor_sped(2000)
delay(5)
elevators("up")
delay(12)
elevators("middle")
front_gear("retracted")
back_gears("retracted")
def landing():
motor_sped(1345)
delay(5)
motor_sped(1090)
delay(3)
elevators("down")
front_gear("deployed")
back_gears("deployed")
delay(7)
elevators("middle")
delay(3)
elevators("up")
motor_sped(0)
delay(2)
elevators("middle")
class StreamingOutput(object):
def __init__(self):
self.frame = None
self.buffer = io.BytesIO()
self.condition = threading.Condition()
def write(self, buf):
if buf.startswith(b'\xff\xd8'):
# New frame, copy the existing buffer's content and notify all
# clients it's available
self.buffer.truncate()
with self.condition:
self.frame = self.buffer.getvalue()
self.condition.notify_all()
self.buffer.seek(0)
return self.buffer.write(buf)
class StreamingHandler(server.BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/':
self.send_response(301)
self.send_header('Location', '/index.html')
self.end_headers()
elif self.path == '/index.html':
content = PAGE.encode('utf-8')
self.send_response(200)
self.send_header('Content-Type', 'text/html')
self.send_header('Content-Length', len(content))
self.end_headers()
self.wfile.write(content)
elif self.path == '/stream.mjpg':
self.send_response(200)
self.send_header('Age', 0)
self.send_header('Altitude', func.alt)
self.send_header('Cache-Control', 'no-cache, private')
self.send_header('Pragma', 'no-cache')
self.send_header('Content-Type', 'multipart/x-mixed-replace; boundary=FRAME')
self.end_headers()
try:
while True:
with output.condition:
output.condition.wait()
frame = output.frame
self.wfile.write(b'--FRAME\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', len(frame))
self.end_headers()
self.wfile.write(frame)
self.wfile.write(b'\r\n')
except Exception as e:
logging.warning(
'Removed streaming client %s: %s',
self.client_address, str(e))
else:
self.send_error(404)
self.end_headers()
class StreamingServer(socketserver.ThreadingMixIn, server.HTTPServer):
allow_reuse_address = True
daemon_threads = True
output = StreamingOutput()
def get_stuff():
get_gyro()
get_gps()
get_adc()
def flight():
take_off()
delay(5)
landing()
#File name: code.py
from main import *
bus.write_byte_data(address, power_mgmt_1, 0) # initialize gyroscope
logger.setLevel(logging.DEBUG)
camera.start_recording(output, format='mjpeg')
logging.basicConfig(filename=dt_string + "logs", format='%(asctime)s %(message)s', filemode='w')
init_cam()
init_servos()
init_motors()
try:
address = ('', 8000)
server = StreamingServer(address, StreamingHandler)
server.serve_forever()
if __name__ == "__main__":
# creating thread
t1 = threading.Thread(target=flight())
t2 = threading.Thread(target=get_stuff())
while True:
t1.start()
t2.start()
t1.join()
t2.join()
finally:
pass
#File name: libraries.py
import board
import busio
import digitalio
import io
import logging
import math
import pigpio
import socketserver
import time
import serial
from datetime import datetime
from http import server
from Adafruit_PureIO import smbus
from adafruit_servokit import ServoKit
import threading
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
from picamera import PiCamera
from _ctypes_test import func