diff --git a/agent.py b/agent.py new file mode 100644 index 0000000..3349f83 --- /dev/null +++ b/agent.py @@ -0,0 +1,92 @@ +from threading import Thread +from queue import Queue +from djitellopy import Tello +import signal +import sys +import time +import logging +import cv2 + +from control import ManualControl + + +class UAV: + def __init__(self) -> None: + self.tello = Tello() + signal.signal(signal.SIGINT, self.signal_handler) + self.tello.connect() + print('Current_battery: ' + str(self.tello.get_battery())) + self.tello.streamon() + self.frame_read = self.tello.get_frame_read() + + self.left_right_velocity = 0 + self.for_back_velocity = 0 + self.up_down_velocity = 0 + self.yaw_velocity = 0 + + self.send_rc_control = False + self.manual_control = ManualControl(self) + + self.frame_queue = Queue(maxsize=1) # 用于存放视频帧, 1用于避免帧堆积 + + def update(self): + """ Update routine. Send velocities to Tello. + + 向Tello发送各方向速度信息 + """ + if self.send_rc_control: + self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity, + self.up_down_velocity, self.yaw_velocity) + + def run(self): + """ 主程序 """ + # self.init_flight() + + state_thread = Thread(target=self.video_stream) + state_thread.start() + + # self.send_rc_control = True + # self.manual_control.start() # 启动手动控制键盘监听 + + while True: + # self.up_down_velocity(10) + # self.send_rc_control = True + self.update() + self.video_display() + time.sleep(0.05) + + def signal_handler(self, signal, frame): + print ('\nSignal Catched! 执行急停!') + self.tello.emergency() + sys.exit(0) + + def init_flight(self): + self.tello.turn_motor_on() + time.sleep(10) + logging.warn('自检完毕,可以起飞') + self.tello.takeoff() + + def video_stream(self): + height, width, _ = self.frame_read.frame.shape + while True: + frame = self.frame_read.frame + self.frame_queue.put(frame) + + time.sleep(1 / 30) + + def video_display(self): + """ 显示视频 """ + if not self.frame_queue.empty(): + frame = self.frame_queue.get() + cv2.imshow("Tello Camera", frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + self.keep_recording = False + cv2.destroyAllWindows() + + + +if __name__ == '__main__': + uav = UAV() + # while True: + # print(uav.tello.get_current_state()) + uav.run() \ No newline at end of file diff --git a/control.py b/control.py new file mode 100644 index 0000000..4e8ddc0 --- /dev/null +++ b/control.py @@ -0,0 +1,66 @@ +from pynput import keyboard +import time + + +class ManualControl: + def __init__(self, uav): + self.uav = uav + self.listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release) + + def start(self): + self.listener.start() + + def on_press(self, key): + try: + if key == keyboard.Key.up: + self.uav.for_back_velocity = 50 + elif key == keyboard.Key.down: + self.uav.for_back_velocity = -50 + elif key == keyboard.Key.left: + self.uav.left_right_velocity = -50 + elif key == keyboard.Key.right: + self.uav.left_right_velocity = 50 + elif key.char == 'w': + self.uav.up_down_velocity = 50 + elif key.char == 's': + self.uav.up_down_velocity = -50 + elif key.char == 'q': + self.uav.yaw_velocity = -50 + elif key.char == 'e': + self.uav.yaw_velocity = 50 + # print(f'left_right_velocity: {self.uav.left_right_velocity}, for_back_velocity: {self.uav.for_back_velocity}, up_down_velocity: {self.uav.up_down_velocity}, yaw_velocity: {self.uav.yaw_velocity}') + except AttributeError: + pass + + def on_release(self, key): + try: + if key == keyboard.Key.up or key == keyboard.Key.down: + self.uav.for_back_velocity = 0 + elif key == keyboard.Key.left or key == keyboard.Key.right: + self.uav.left_right_velocity = 0 + elif key.char == 'w' or key.char == 's': + self.uav.up_down_velocity = 0 + elif key.char == 'q' or key.char == 'e': + self.uav.yaw_velocity = 0 + except AttributeError: + pass + +class FakeUAV: + def __init__(self) -> None: + self.left_right_velocity = 0 + self.for_back_velocity = 0 + self.up_down_velocity = 0 + self.yaw_velocity = 0 + + def send_rc_control(self, left_right_velocity, for_back_velocity, up_down_velocity, yaw_velocity): + print(f'left_right_velocity: {left_right_velocity}, for_back_velocity: {for_back_velocity}, up_down_velocity: {up_down_velocity}, yaw_velocity: {yaw_velocity}') + +if __name__ == '__main__': + # uav = FakeUAV() + # manual_control=ManualControl(uav) + # manual_control.start() + # while True: + # time.sleep(0.05) + + import logging + logging.info('aaa') diff --git a/main.py b/main.py index 98a00e0..b3c00c9 100644 --- a/main.py +++ b/main.py @@ -5,7 +5,7 @@ my_drone = Tello() my_drone.connect() print(my_drone.get_battery()) my_drone.streamon() -my_drone.set_video_direction(Tello.CAMERA_DOWNWARD) +my_drone.set_video_direction(Tello.CAMERA_FORWARD) while True: # Read a frame from the video stream img = my_drone.get_frame_read().frame