USVLander/utils.py

76 lines
2.9 KiB
Python

from enum import Enum
from pathlib import Path
from datetime import datetime
import time
import cv2
import logging
import csv
class Status(Enum):
# INIT = 0
DETECT = 1
TRACK = 2
LANDING = 3
FALLING = 4
class DataRecorder:
def __init__(self, exp_note='default', need_record=False):
# use path lib to manage file path
self.need_record = need_record
if need_record:
self.root_folder = Path('data') / str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '_' + exp_note)
self.root_folder.mkdir(parents=True, exist_ok=False)
self.video_folder = self.root_folder / 'video'
self.video_folder.mkdir(parents=True, exist_ok=False)
with open(str(self.root_folder) + '/sensor_data.csv', mode='a', newline='') as file:
writer = csv.DictWriter(file, fieldnames=['timestamp', 'mid', 'x', 'y', 'z', 'mpry', 'pitch', 'roll', 'yaw', 'vgx', 'vgy', 'vgz', 'templ', 'temph', 'tof', 'h', 'bat', 'baro', 'time', 'agx', 'agy', 'agz', 'status', 'cmd_vel'])
writer.writeheader()
else:
logging.warn("实验数据将不被记录!")
def frame_record(self, frame, cam_name:str):
if self.need_record:
cam_folder = self.video_folder / cam_name # 分离不同来源的视频
if not cam_folder.exists():
# 如果文件夹不存在则创建
print('create folder:', str(cam_folder))
cam_folder.mkdir(parents=True, exist_ok=False)
frame_name = cam_folder / str(str(time.time()) + '.jpg')
if cv2.imwrite(str(frame_name), frame):
pass
else:
print('write frame failed')
def state_record(self, states):
if self.need_record:
with open(str(self.root_folder) + '/sensor_data.csv', mode='a', newline='') as file:
writer = csv.DictWriter(file, fieldnames=['timestamp', 'mid', 'x', 'y', 'z', 'mpry', 'pitch', 'roll', 'yaw', 'vgx', 'vgy', 'vgz', 'templ', 'temph', 'tof', 'h', 'bat', 'baro', 'time', 'agx', 'agy', 'agz', 'status', 'cmd_vel'])
# writer.writeheader()
writer.writerow(states)
def anything_record(self, data, data_name:str, header:list):
if self.need_record:
with open(str(self.root_folder) + '/' + data_name + '.csv', mode='a', newline='') as file:
writer = csv.DictWriter(file, fieldnames=header)
# writer.writeheader()
writer.writerow(data)
if __name__ == '__main__':
recorder = DataRecorder('aa')
# print(recorder.root_folder)
camera = cv2.VideoCapture(0)
while True:
ret, frame = camera.read()
if ret:
print(frame.shape)
cv2.imshow('frame', frame)
recorder.frame_record(frame, 'front')
cv2.waitKey(1)
time.sleep(0.1)