144 lines
4.7 KiB
Python
144 lines
4.7 KiB
Python
from threading import Thread
|
|
from queue import Queue
|
|
from djitellopy import Tello
|
|
import signal
|
|
import sys
|
|
import time
|
|
import logging
|
|
import cv2
|
|
import numpy as np
|
|
from control import ManualControl, AutoControl
|
|
from utils import Status, DataRecorder
|
|
|
|
|
|
|
|
|
|
class UAV:
|
|
def __init__(self) -> None:
|
|
self.tello = Tello()
|
|
signal.signal(signal.SIGINT, self.signal_handler)
|
|
self.tello.connect()
|
|
print('Current_battery: ' + str(self.tello.get_battery()))
|
|
self.tello.streamon()
|
|
self.tello.set_video_resolution(Tello.RESOLUTION_480P)
|
|
self.tello.set_video_fps(Tello.FPS_30)
|
|
self.tello.set_video_bitrate(Tello.BITRATE_1MBPS)
|
|
self.tello.set_video_direction(Tello.CAMERA_FORWARD)
|
|
|
|
self.frame_read = self.tello.get_frame_read()
|
|
|
|
self.left_right_velocity = 0
|
|
self.for_back_velocity = 0
|
|
self.up_down_velocity = 0
|
|
self.yaw_velocity = 0
|
|
|
|
self.send_rc_control = False
|
|
self.manual_control = ManualControl(self)
|
|
self.auto_control = AutoControl(self)
|
|
self.status = Status.DETECT
|
|
|
|
self.frame_queue = Queue(maxsize=1) # 用于存放视频帧, 1用于避免帧堆积
|
|
record_name = input("请输入数据备注(英文):")
|
|
if record_name:
|
|
self.recorder = DataRecorder(exp_note=record_name, need_record=True)
|
|
else:
|
|
self.recorder = DataRecorder()
|
|
|
|
self.front_cam_shape = (648, 478) # x, y
|
|
|
|
def update(self):
|
|
""" Update routine. Send velocities to Tello.
|
|
|
|
向Tello发送各方向速度信息
|
|
"""
|
|
if self.yaw_velocity > 50:
|
|
self.yaw_velocity = 50
|
|
elif self.yaw_velocity < -50:
|
|
self.yaw_velocity = -50
|
|
|
|
if self.send_rc_control:
|
|
self.tello.send_rc_control(self.left_right_velocity, self.for_back_velocity,
|
|
self.up_down_velocity, self.yaw_velocity)
|
|
print(self.left_right_velocity, self.for_back_velocity,
|
|
self.up_down_velocity, self.yaw_velocity)
|
|
|
|
def run(self):
|
|
""" 主程序 """
|
|
# self.init_flight()
|
|
# self.tello.turn_motor_on()
|
|
|
|
state_thread = Thread(target=self.video_stream)
|
|
state_thread.start()
|
|
|
|
self.manual_control.start() # 启动手动控制键盘监听
|
|
|
|
while True:
|
|
# self.up_down_velocity(10)
|
|
self.auto_track_land()
|
|
self.update()
|
|
# self.video_display()
|
|
time.sleep(0.05)
|
|
|
|
def signal_handler(self, signal, frame):
|
|
print ('\nSignal Catched! 执行急停!')
|
|
self.tello.emergency()
|
|
self.tello.streamoff()
|
|
sys.exit(0)
|
|
|
|
def init_flight(self):
|
|
self.tello.turn_motor_on()
|
|
time.sleep(5)
|
|
logging.warn('自检完毕,可以起飞')
|
|
self.tello.takeoff()
|
|
self.send_rc_control = True # 开启发送控制信号
|
|
self.update()
|
|
|
|
def video_stream(self):
|
|
# 独立线程,用于获取视频流并记录状态
|
|
height, width, _ = self.frame_read.frame.shape
|
|
while True:
|
|
frame = self.frame_read.frame
|
|
self.frame_queue.put(frame)
|
|
|
|
states = self.tello.get_current_state()
|
|
states['timestamp'] = time.time()
|
|
states['status'] = self.status
|
|
states['cmd_vel'] = [self.left_right_velocity, self.for_back_velocity, self.up_down_velocity, self.yaw_velocity]
|
|
|
|
self.recorder.state_record(states)
|
|
self.recorder.frame_record(frame, 'origin')
|
|
|
|
time.sleep(1 / 30)
|
|
|
|
def video_display(self):
|
|
""" 显示视频 """
|
|
if not self.frame_queue.empty():
|
|
frame = self.frame_queue.get()
|
|
cv2.imshow("Tello Camera", frame)
|
|
if cv2.waitKey(1) & 0xFF == ord('q'):
|
|
self.keep_recording = False
|
|
cv2.destroyAllWindows()
|
|
if cv2.waitKey(1) & 0xFF == ord('s'):
|
|
cv2.imwrite(str(time.time()) + '.jpg', frame)
|
|
|
|
def auto_track_land(self):
|
|
# print(self.status)
|
|
|
|
if self.status == Status.TRACK or self.status == Status.DETECT:
|
|
front_img = self.frame_queue.get()
|
|
front_img = cv2.cvtColor(front_img, cv2.COLOR_RGB2BGR)
|
|
if self.status == Status.DETECT:
|
|
self.auto_control.detect(front_img=front_img)
|
|
elif self.status == Status.TRACK:
|
|
self.auto_control.track(front_img=front_img)
|
|
|
|
elif self.status == Status.LANDING:
|
|
bottom_img = self.frame_queue.get()
|
|
self.auto_control.landing(bottom_img=bottom_img)
|
|
time.sleep(1 / 30)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
uav = UAV()
|
|
uav.run() |