229 lines
8.2 KiB
Python
229 lines
8.2 KiB
Python
from pynput import keyboard
|
||
import time
|
||
import logging
|
||
from vision import Tracker
|
||
import cv2
|
||
from utils import Status
|
||
import pupil_apriltags as apriltag
|
||
from djitellopy import Tello
|
||
|
||
|
||
|
||
TICKS_PER_SEC = 30
|
||
|
||
|
||
|
||
|
||
class PIDController:
|
||
def __init__(self, kp, ki, kd, current_time):
|
||
self.kp = kp
|
||
self.ki = ki
|
||
self.kd = kd
|
||
self.previous_error = 0
|
||
self.integral = 0
|
||
self.last_time = current_time
|
||
|
||
def control(self, error, current_time) -> int:
|
||
current_time = current_time
|
||
delta_time = current_time - self.last_time
|
||
if delta_time == 0:
|
||
delta_time = 1 / TICKS_PER_SEC
|
||
derivative = (error - self.previous_error) / delta_time
|
||
self.integral += error * delta_time
|
||
|
||
output = self.kp * error + self.ki * self.integral + self.kd * derivative
|
||
|
||
self.previous_error = error
|
||
self.last_time = current_time
|
||
|
||
return int(output)
|
||
|
||
|
||
class ManualControl:
|
||
def __init__(self, uav):
|
||
self.uav = uav
|
||
self.listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)
|
||
|
||
def start(self):
|
||
self.listener.start()
|
||
|
||
def on_press(self, key):
|
||
try:
|
||
if key == keyboard.Key.up:
|
||
self.uav.for_back_velocity = 50
|
||
elif key == keyboard.Key.down:
|
||
self.uav.for_back_velocity = -50
|
||
elif key == keyboard.Key.left:
|
||
self.uav.left_right_velocity = -50
|
||
elif key == keyboard.Key.right:
|
||
self.uav.left_right_velocity = 50
|
||
elif key.char == 'w':
|
||
self.uav.up_down_velocity = 50
|
||
elif key.char == 's':
|
||
self.uav.up_down_velocity = -50
|
||
elif key.char == 'z':
|
||
self.uav.yaw_velocity = -50
|
||
elif key.char == 'c':
|
||
self.uav.yaw_velocity = 50
|
||
elif key.char == 'l':
|
||
self.uav.tello.land()
|
||
# print(f'left_right_velocity: {self.uav.left_right_velocity}, for_back_velocity: {self.uav.for_back_velocity}, up_down_velocity: {self.uav.up_down_velocity}, yaw_velocity: {self.uav.yaw_velocity}')
|
||
except AttributeError:
|
||
pass
|
||
|
||
def on_release(self, key):
|
||
try:
|
||
if key == keyboard.Key.up or key == keyboard.Key.down:
|
||
self.uav.for_back_velocity = 0
|
||
elif key == keyboard.Key.left or key == keyboard.Key.right:
|
||
self.uav.left_right_velocity = 0
|
||
elif key.char == 'w' or key.char == 's':
|
||
self.uav.up_down_velocity = 0
|
||
elif key.char == 'z' or key.char == 'c':
|
||
self.uav.yaw_velocity = 0
|
||
except AttributeError:
|
||
pass
|
||
|
||
|
||
class AutoControl:
|
||
def __init__(self, uav):
|
||
self.uav = uav
|
||
self.target = [0, 0, 0, 0]
|
||
self.tag_detector = apriltag.Detector(families='tag36h11')
|
||
|
||
self.yaw_controller = PIDController(0.3, 0, 0.1, time.time())
|
||
self.for_back_controler = PIDController(100, 0, 0.1, time.time())
|
||
self.left_right_controler = PIDController(50, 0, 0.1, time.time()) # 摄像头左右视野较小
|
||
self.loss_target_times = 0
|
||
self.is_get_tag = False
|
||
self.get_tag_times = 0
|
||
|
||
def detect(self, front_img):
|
||
cv2.imshow("ROI select", front_img)
|
||
|
||
# 在 selectROI 前等待按键(可选)
|
||
key = cv2.waitKey(10) # 等待按键
|
||
if key == ord('q'): # 如果按下 'q' 键
|
||
print("Quit selected before ROI.")
|
||
self.uav.tello.land()
|
||
cv2.destroyAllWindows()
|
||
exit()
|
||
|
||
self.gROI = cv2.selectROI("ROI select", front_img, False)
|
||
if (not self.gROI):
|
||
print("空框选,退出")
|
||
self.uav.tello.land()
|
||
self.gTracker = Tracker(tracker_type="KCF")
|
||
self.gTracker.initWorking(front_img, self.gROI)
|
||
logging.info("start tracking")
|
||
self.uav.status = Status.TRACK
|
||
cv2.destroyWindow("ROI select")
|
||
|
||
def track(self, front_img):
|
||
_item = self.gTracker.track(front_img)
|
||
if _item.getMessage():
|
||
self.loss_target_times = 0
|
||
print("tracking")
|
||
self.target = _item.getMessage()['target']
|
||
frame = _item.getFrame()
|
||
cv2.imshow("track result", frame)
|
||
self.uav.recorder.frame_record(frame, 'KCF_tracking')
|
||
|
||
# store the frame to video
|
||
|
||
_key = cv2.waitKey(1) & 0xFF
|
||
if (_key == ord('q')) | (_key == 27):
|
||
self.uav.status = Status.LANDING
|
||
|
||
if (_key == ord('r')) :
|
||
self.uav.status = Status.DETECT
|
||
# print(_item.getFrame())
|
||
# print(self.target)
|
||
yaw_vel = self.yaw_controller.control(self.target[0] - self.uav.front_cam_shape[0]/2, time.time())
|
||
self.uav.yaw_velocity = int(yaw_vel)
|
||
self.uav.for_back_velocity = 20
|
||
|
||
else:
|
||
self.loss_target_times += 1
|
||
if self.loss_target_times == 6:
|
||
# 等待6帧(0.2秒),仍丢失则重新检测并重置计数器
|
||
self.loss_target_times = 0 # 在函数头部还有一个
|
||
self.uav.status = Status.DETECT
|
||
self.uav.yaw_velocity = 0
|
||
self.uav.for_back_velocity = 0
|
||
|
||
if self.target[1] > self.uav.front_cam_shape[1] * 0.9:
|
||
print(self.target[1], self.uav.front_cam_shape[0]/10)
|
||
logging.warn("ready to land")
|
||
self.uav.yaw_velocity = 0
|
||
self.uav.for_back_velocity = 10
|
||
self.uav.status = Status.LANDING
|
||
self.uav.tello.set_video_direction(Tello.CAMERA_DOWNWARD)
|
||
|
||
def landing(self, bottom_img):
|
||
tag_large = self.tag_detector.detect(bottom_img[:, :, 0], estimate_tag_pose=True, camera_params=([353.836278849487, 353.077028029136, 163.745870517989, 115.130883974855]), tag_size=0.169)
|
||
tag_small = self.tag_detector.detect(bottom_img[:, :, 0], estimate_tag_pose=True, camera_params=([353.836278849487, 353.077028029136, 163.745870517989, 115.130883974855]), tag_size=0.018)
|
||
tag_list = []
|
||
for tag in tag_large:
|
||
if tag.tag_id == 1:
|
||
tag_list.append(tag)
|
||
for tag in tag_small:
|
||
if tag.tag_id == 0:
|
||
tag_list.append(tag)
|
||
|
||
if tag_list:
|
||
self.get_tag_times += 1
|
||
if self.get_tag_times == 5:
|
||
self.is_get_tag = True
|
||
for_vel = -1 * self.for_back_controler.control(tag_list[0].pose_t[0], time.time())
|
||
self.uav.for_back_velocity = int(for_vel)
|
||
|
||
left_vel = -1 * self.left_right_controler.control(tag_list[0].pose_t[1], time.time())
|
||
self.uav.left_right_velocity = int(left_vel)
|
||
# print(self.uav.for_back_velocity, self.uav.left_right_velocity)
|
||
if tag_list[0].pose_t[0] < 0.2 and tag_list[0].pose_t[1] < 0.2:
|
||
self.uav.up_down_velocity = -20
|
||
# print(tag_list[0].pose_t) # 后正左正高度
|
||
if tag_list[0].pose_t[2] < 0.3:
|
||
self.uav.tello.land()
|
||
else:
|
||
self.uav.up_down_velocity = 0
|
||
self.uav.left_right_velocity = 0
|
||
if self.is_get_tag:
|
||
self.uav.for_back_velocity = 0
|
||
logging.warn("no tag detected")
|
||
|
||
cv2.imshow('aa', bottom_img[:, :, 0])
|
||
cv2.waitKey(1)
|
||
|
||
|
||
# if tags:
|
||
# self.uav.status = Status.LANDED
|
||
# self.uav.tello.land()
|
||
# logging.warn('landed')
|
||
# else:
|
||
# self.uav.up_down_velocity = -20
|
||
|
||
|
||
|
||
|
||
class FakeUAV:
|
||
def __init__(self) -> None:
|
||
self.left_right_velocity = 0
|
||
self.for_back_velocity = 0
|
||
self.up_down_velocity = 0
|
||
self.yaw_velocity = 0
|
||
|
||
def send_rc_control(self, left_right_velocity, for_back_velocity, up_down_velocity, yaw_velocity):
|
||
print(f'left_right_velocity: {left_right_velocity}, for_back_velocity: {for_back_velocity}, up_down_velocity: {up_down_velocity}, yaw_velocity: {yaw_velocity}')
|
||
|
||
if __name__ == '__main__':
|
||
# uav = FakeUAV()
|
||
# manual_control=ManualControl(uav)
|
||
# manual_control.start()
|
||
# while True:
|
||
# time.sleep(0.05)
|
||
|
||
import logging
|
||
logging.info('aaa')
|