from threading import Thread from queue import Queue from djitellopy import Tello import signal import sys import time import logging import cv2 from control import ManualControl, AutoControl from control import Status class UAV: def __init__(self) -> None: self.tello = Tello() signal.signal(signal.SIGINT, self.signal_handler) self.tello.connect() print('Current_battery: ' + str(self.tello.get_battery())) self.tello.streamon() self.frame_read = self.tello.get_frame_read() self.left_right_velocity = 0 self.for_back_velocity = 0 self.up_down_velocity = 0 self.yaw_velocity = 0 self.send_rc_control = False self.manual_control = ManualControl(self) self.auto_control = AutoControl(self) self.status = Status.DETECT self.frame_queue = Queue(maxsize=1) # 用于存放视频帧, 1用于避免帧堆积 def update(self): """ Update routine. Send velocities to Tello. 向Tello发送各方向速度信息 """ if self.send_rc_control: self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity, self.up_down_velocity, self.yaw_velocity) def run(self): """ 主程序 """ # self.init_flight() state_thread = Thread(target=self.video_stream) state_thread.start() # self.send_rc_control = True # self.manual_control.start() # 启动手动控制键盘监听 while True: # self.up_down_velocity(10) # self.send_rc_control = True self.auto_control() self.update() self.video_display() time.sleep(0.05) def signal_handler(self, signal, frame): print ('\nSignal Catched! 执行急停!') self.tello.emergency() sys.exit(0) def init_flight(self): self.tello.turn_motor_on() time.sleep(10) logging.warn('自检完毕,可以起飞') self.tello.takeoff() def video_stream(self): height, width, _ = self.frame_read.frame.shape while True: frame = self.frame_read.frame self.frame_queue.put(frame) time.sleep(1 / 30) def video_display(self): """ 显示视频 """ if not self.frame_queue.empty(): frame = self.frame_queue.get() cv2.imshow("Tello Camera", frame) if cv2.waitKey(1) & 0xFF == ord('q'): self.keep_recording = False cv2.destroyAllWindows() def auto_track_land(self): if self.status == Status.INIT or self.status == Status.TRACKING: front_img = self.frame_queue.get() if self.status == Status.DETECT: self.auto_control.detect(front_img=front_img) elif self.status == Status.TRACK: self.auto_control.track(front_img=front_img) if __name__ == '__main__': uav = UAV() # while True: # print(uav.tello.get_current_state()) uav.run()