"""Experiment data recording: per-camera video frames and sensor-state CSV rows."""

import csv
import logging
import time
from datetime import datetime
from enum import Enum
from pathlib import Path

import cv2


class Status(Enum):
    """Stages of the mission state machine."""

    # INIT = 0
    DETECT = 1
    TRACK = 2
    LANDING = 3


class DataRecorder:
    """Persist camera frames and sensor states for one experiment run.

    When recording is enabled, a fresh ``data/<timestamp>_<exp_note>/``
    folder is created holding ``sensor_data.csv`` and a ``video/`` folder
    with one sub-folder per camera. When recording is disabled, every
    recording method is a no-op.
    """

    # Column order of ``sensor_data.csv``. Defined once so that the header
    # written in ``__init__`` and the rows appended by ``state_record``
    # can never drift apart.
    FIELDNAMES = (
        'timestamp', 'mid', 'x', 'y', 'z', 'mpry', 'pitch', 'roll', 'yaw',
        'vgx', 'vgy', 'vgz', 'templ', 'temph', 'tof', 'h', 'bat', 'baro',
        'time', 'agx', 'agy', 'agz',
    )

    def __init__(self, exp_note='default', need_record=False):
        """Create the output folders and the CSV header if recording is on.

        Args:
            exp_note: Free-form label appended to the run folder name.
            need_record: When false, nothing is written to disk and only a
                warning is logged.

        Raises:
            FileExistsError: If the run folder already exists (a second
                recorder created within the same second with the same
                note).
        """
        self.need_record = need_record
        if not need_record:
            # Message text: "experiment data will NOT be recorded!"
            logging.warning("实验数据将不被记录!")
            return

        # Paths are managed with pathlib throughout.
        run_name = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}_{exp_note}"
        self.root_folder = Path('data') / run_name
        self.root_folder.mkdir(parents=True, exist_ok=False)
        self.video_folder = self.root_folder / 'video'
        self.video_folder.mkdir(parents=True, exist_ok=False)

        # Write the header with a short-lived writer. The writer is
        # deliberately not kept on the instance: it would be bound to a
        # file that is closed as soon as this ``with`` block exits.
        with open(self._csv_path, mode='a', newline='') as file:
            csv.DictWriter(file, fieldnames=self.FIELDNAMES).writeheader()

    @property
    def _csv_path(self):
        """Location of the sensor CSV inside the run folder."""
        return self.root_folder / 'sensor_data.csv'

    def frame_record(self, frame, cam_name: str):
        """Save one frame as ``video/<cam_name>/<unix time>.jpg``.

        Args:
            frame: Image passed straight to ``cv2.imwrite``.
            cam_name: Camera identifier; frames from different sources are
                kept in separate sub-folders.
        """
        if not self.need_record:
            return

        cam_folder = self.video_folder / cam_name
        if not cam_folder.exists():
            print('create folder:', str(cam_folder))
        # exist_ok=True closes the gap between the existence check above
        # and the creation here (e.g. two callers using the same camera).
        cam_folder.mkdir(parents=True, exist_ok=True)

        frame_name = cam_folder / f"{time.time()}.jpg"
        if not cv2.imwrite(str(frame_name), frame):
            print('write frame failed')

    def state_record(self, states):
        """Append one row of sensor readings to ``sensor_data.csv``.

        Args:
            states: Mapping passed to ``csv.DictWriter.writerow`` with
                ``FIELDNAMES`` as the columns (missing keys are written as
                empty cells).

        Raises:
            ValueError: If ``states`` contains a key that is not one of
                ``FIELDNAMES`` (``csv.DictWriter`` default behaviour).
        """
        if not self.need_record:
            return

        # The file is reopened per row so each record reaches the OS as
        # soon as it is written, even if the process is killed later.
        with open(self._csv_path, mode='a', newline='') as file:
            csv.DictWriter(file, fieldnames=self.FIELDNAMES).writerow(states)


if __name__ == '__main__':
    # Manual smoke test: preview camera 0 and feed the frames to a recorder.
    recorder = DataRecorder('aa')
    # print(recorder.root_folder)
    camera = cv2.VideoCapture(0)
    try:
        while True:
            ret, frame = camera.read()
            if ret:
                print(frame.shape)
                cv2.imshow('frame', frame)
                recorder.frame_record(frame, 'front')
            # Kept at loop level so a failed read is throttled too instead
            # of spinning at full speed.
            cv2.waitKey(1)
            time.sleep(0.1)
    except KeyboardInterrupt:
        # Ctrl-C is the only way out of the preview loop.
        pass
    finally:
        # Free the capture device and close the preview window on exit.
        camera.release()
        cv2.destroyAllWindows()