USVLander/agent.py

92 lines
2.5 KiB
Python

from threading import Thread
from queue import Queue
from djitellopy import Tello
import signal
import sys
import time
import logging
import cv2
from control import ManualControl
class UAV:
def __init__(self) -> None:
self.tello = Tello()
signal.signal(signal.SIGINT, self.signal_handler)
self.tello.connect()
print('Current_battery: ' + str(self.tello.get_battery()))
self.tello.streamon()
self.frame_read = self.tello.get_frame_read()
self.left_right_velocity = 0
self.for_back_velocity = 0
self.up_down_velocity = 0
self.yaw_velocity = 0
self.send_rc_control = False
self.manual_control = ManualControl(self)
self.frame_queue = Queue(maxsize=1) # 用于存放视频帧, 1用于避免帧堆积
def update(self):
""" Update routine. Send velocities to Tello.
向Tello发送各方向速度信息
"""
if self.send_rc_control:
self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity,
self.up_down_velocity, self.yaw_velocity)
def run(self):
""" 主程序 """
# self.init_flight()
state_thread = Thread(target=self.video_stream)
state_thread.start()
# self.send_rc_control = True
# self.manual_control.start() # 启动手动控制键盘监听
while True:
# self.up_down_velocity(10)
# self.send_rc_control = True
self.update()
self.video_display()
time.sleep(0.05)
def signal_handler(self, signal, frame):
print ('\nSignal Catched! 执行急停!')
self.tello.emergency()
sys.exit(0)
def init_flight(self):
self.tello.turn_motor_on()
time.sleep(10)
logging.warn('自检完毕,可以起飞')
self.tello.takeoff()
def video_stream(self):
height, width, _ = self.frame_read.frame.shape
while True:
frame = self.frame_read.frame
self.frame_queue.put(frame)
time.sleep(1 / 30)
def video_display(self):
""" 显示视频 """
if not self.frame_queue.empty():
frame = self.frame_queue.get()
cv2.imshow("Tello Camera", frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
self.keep_recording = False
cv2.destroyAllWindows()
if __name__ == '__main__':
uav = UAV()
# while True:
# print(uav.tello.get_current_state())
uav.run()