76 lines
2.9 KiB
Python
76 lines
2.9 KiB
Python
from enum import Enum
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
import time
|
|
import cv2
|
|
import logging
|
|
import csv
|
|
|
|
|
|
class Status(Enum):
|
|
# INIT = 0
|
|
DETECT = 1
|
|
TRACK = 2
|
|
LANDING = 3
|
|
FALLING = 4
|
|
|
|
class DataRecorder:
|
|
def __init__(self, exp_note='default', need_record=False):
|
|
# use path lib to manage file path
|
|
self.need_record = need_record
|
|
if need_record:
|
|
self.root_folder = Path('data') / str(datetime.now().strftime('%Y-%m-%d_%H-%M-%S') + '_' + exp_note)
|
|
self.root_folder.mkdir(parents=True, exist_ok=False)
|
|
self.video_folder = self.root_folder / 'video'
|
|
self.video_folder.mkdir(parents=True, exist_ok=False)
|
|
|
|
with open(str(self.root_folder) + '/sensor_data.csv', mode='a', newline='') as file:
|
|
writer = csv.DictWriter(file, fieldnames=['timestamp', 'mid', 'x', 'y', 'z', 'mpry', 'pitch', 'roll', 'yaw', 'vgx', 'vgy', 'vgz', 'templ', 'temph', 'tof', 'h', 'bat', 'baro', 'time', 'agx', 'agy', 'agz', 'status', 'cmd_vel'])
|
|
writer.writeheader()
|
|
|
|
else:
|
|
logging.warn("实验数据将不被记录!")
|
|
|
|
def frame_record(self, frame, cam_name:str):
|
|
if self.need_record:
|
|
cam_folder = self.video_folder / cam_name # 分离不同来源的视频
|
|
if not cam_folder.exists():
|
|
# 如果文件夹不存在则创建
|
|
print('create folder:', str(cam_folder))
|
|
cam_folder.mkdir(parents=True, exist_ok=False)
|
|
|
|
frame_name = cam_folder / str(str(time.time()) + '.jpg')
|
|
if cv2.imwrite(str(frame_name), frame):
|
|
pass
|
|
else:
|
|
print('write frame failed')
|
|
|
|
def state_record(self, states):
|
|
if self.need_record:
|
|
with open(str(self.root_folder) + '/sensor_data.csv', mode='a', newline='') as file:
|
|
writer = csv.DictWriter(file, fieldnames=['timestamp', 'mid', 'x', 'y', 'z', 'mpry', 'pitch', 'roll', 'yaw', 'vgx', 'vgy', 'vgz', 'templ', 'temph', 'tof', 'h', 'bat', 'baro', 'time', 'agx', 'agy', 'agz', 'status', 'cmd_vel'])
|
|
# writer.writeheader()
|
|
writer.writerow(states)
|
|
|
|
def anything_record(self, data, data_name:str, header:list):
|
|
if self.need_record:
|
|
with open(str(self.root_folder) + '/' + data_name + '.csv', mode='a', newline='') as file:
|
|
writer = csv.DictWriter(file, fieldnames=header)
|
|
# writer.writeheader()
|
|
writer.writerow(data)
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
recorder = DataRecorder('aa')
|
|
# print(recorder.root_folder)
|
|
camera = cv2.VideoCapture(0)
|
|
while True:
|
|
ret, frame = camera.read()
|
|
if ret:
|
|
print(frame.shape)
|
|
cv2.imshow('frame', frame)
|
|
|
|
recorder.frame_record(frame, 'front')
|
|
cv2.waitKey(1)
|
|
time.sleep(0.1) |