From d0595851802f813af5cc34d9019dbd8e6be3ff92 Mon Sep 17 00:00:00 2001
From: raiot
Date: Thu, 24 Aug 2023 23:16:56 +0800
Subject: [PATCH] feat: sensitive-keyword search for ppt, excel, pdf, txt files
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 retriever/tools/keyword_find.py | 162 +++++++++++++++++++++++++++++++-
 1 file changed, 158 insertions(+), 4 deletions(-)

diff --git a/retriever/tools/keyword_find.py b/retriever/tools/keyword_find.py
index f5122e1..02c7acb 100644
--- a/retriever/tools/keyword_find.py
+++ b/retriever/tools/keyword_find.py
@@ -1,6 +1,10 @@
 import docx2txt
+from pptx import Presentation
 import re
+import PyPDF2
+import pandas as pd
+
 
 
 def insert_html_tag(paragraph: str, keyword_list: list) -> str:
     """
@@ -18,6 +22,8 @@ def insert_html_tag(paragraph: str, keyword_list: list) -> str:
 
 def docx_find(file_path: str, keyword_list: list) -> dict:
     """
+    Search a docx file.
+    Scope: all content, including every paragraph, header/footer, table, text box, etc.
     :param file_path: path to the file
     :param keyword_list: sensitive keywords, as a list
     :return: a dict with the file name and the paragraphs that contain keywords, e.g. {'file_name': 'xxx.docx', 'find_list': ['paragraph 1', 'paragraph 2', ...]}
@@ -37,13 +43,132 @@ def docx_find(file_path: str, keyword_list: list) -> dict:
             paragraph_keyword.append(this_para_keyword)
             find_list.append(para)
-    print(len(find_list))
+    # print(len(find_list))
 
     find_dict['find_list'] = find_list
     find_dict['paragraph_keyword'] = paragraph_keyword
     return find_dict
 
 
-# print(docx_find('浅水海底电缆打捞大作业.docx', ['机密', '铲出']))
+def pptx_find(file_path: str, keyword_list: list) -> dict:
+    """
+    Search a ppt file.
+    Scope: every text frame on every slide
+    :param file_path:
+    :param keyword_list:
+    :return:
+    """
+    prs = Presentation(file_path)  # open the PPT file
+
+    text_content = ""
+    find_list = []
+    paragraph_keyword = []
+
+    for slide in prs.slides:
+        for shape in slide.shapes:
+            if shape.has_text_frame:
+                for paragraph in shape.text_frame.paragraphs:
+                    for run in paragraph.runs:
+                        text_content += run.text + "\n"
+    for para in text_content.split('\n'):
+        para = re.sub(r'\s+', '', para)
+        this_para_keyword = [keyword for keyword in keyword_list if keyword in para]  # sensitive keywords found in this paragraph
+
+        if this_para_keyword:
+            paragraph_keyword.append(this_para_keyword)
+            find_list.append(para)
+
+    find_dict = {'file_name': file_path, 'find_list': find_list, 'paragraph_keyword': paragraph_keyword}
+    return find_dict
+
+
+def pdf_find(file_path: str, keyword_list: list) -> dict:
+    """
+    Search a pdf file.
+    Scope: every paragraph on every page; a keyword split across a page break may not be found
+    :param file_path:
+    :param keyword_list:
+    :return:
+    """
+    find_list = []
+    paragraph_keyword = []
+    with open(file_path, 'rb') as pdf_file:
+        pdf_reader = PyPDF2.PdfReader(pdf_file)
+        text_content = ""
+
+        for page_num in range(len(pdf_reader.pages)):
+            page = pdf_reader.pages[page_num]
+            page_text = page.extract_text().replace("\n", "").replace(" ", "")
+            text_content += page_text + "\n"
+
+    for para in text_content.split('\n'):
+        para = re.sub(r'\s+', '', para)
+        this_para_keyword = [keyword for keyword in keyword_list if keyword in para]  # sensitive keywords found in this paragraph
+
+        if this_para_keyword:
+            paragraph_keyword.append(this_para_keyword)
+            find_list.append(para)
+
+    find_dict = {'file_name': file_path, 'find_list': find_list, 'paragraph_keyword': paragraph_keyword}
+    return find_dict
+
+
+def excel_find(file_path: str, keyword_list: list) -> dict:
+    """
+    Search an excel file.
+    Scope: every cell of every worksheet; headers/footers and text boxes cannot be searched
+    :param file_path:
+    :param keyword_list:
+    :return:
+    """
+    find_list = []  # cells that contain keywords, stored as "sheet, row, column, value"
+    paragraph_keyword = []  # keywords found in each matching cell
+
+    # read every worksheet of the Excel file
+    xls = pd.ExcelFile(file_path)
+
+    for sheet_name in xls.sheet_names:
+        df = pd.read_excel(file_path, sheet_name=sheet_name, header=None)
+
+        # iterate over every cell of each worksheet
+        for index, row in df.iterrows():
+            for col_name, cell_value in row.items():
+                if isinstance(cell_value, str):
+
+                    this_para_keyword = [keyword for keyword in keyword_list if keyword in cell_value]  # sensitive keywords found in this cell
+
+                    if this_para_keyword:
+                        paragraph_keyword.append(this_para_keyword)
+                        find_list.append(f"{sheet_name}, {index + 1}, {col_name}, {cell_value}")
+    find_dict = {'file_name': file_path, 'find_list': find_list, 'paragraph_keyword': paragraph_keyword}
+
+    return find_dict
+
+
+def txt_find(file_path: str, keyword_list: list) -> dict:
+    """
+    Search a txt file.
+    Scope: every paragraph
+    :param file_path:
+    :param keyword_list:
+    :return:
+    """
+    find_list = []
+    paragraph_keyword = []
+    with open(file_path, 'r') as txt_file:
+        text_content = txt_file.read()
+
+    for para in text_content.split('\n'):
+        para = re.sub(r'\s+', '', para)
+        this_para_keyword = [keyword for keyword in keyword_list if keyword in para]  # sensitive keywords found in this paragraph
+
+        if this_para_keyword:
+            paragraph_keyword.append(this_para_keyword)
+            find_list.append(para)
+
+    find_dict = {'file_name': file_path, 'find_list': find_list, 'paragraph_keyword': paragraph_keyword}
+    return find_dict
+
+
 def util_keyword_find(file_path: str, keyword_list: list) -> dict:
     """
     Search a single specified file for sensitive keywords
@@ -52,7 +177,7 @@ def util_keyword_find(file_path: str, keyword_list: list) -> dict:
     :return: a dict with the file name and the paragraphs that contain keywords, e.g. {'file_name': 'xxx.docx', 'find_list': ['paragraph 1', 'paragraph 2', ...]}
     """
     # check file type
-    if file_path.endswith('.docx'):
+    if file_path.endswith(('.docx', '.doc', '.dot')):
         find_dict = docx_find(file_path, keyword_list)  # call the docx_find function
         for para, this_para_keyword in zip(find_dict['find_list'], find_dict['paragraph_keyword']):
             marked_para = insert_html_tag(para, keyword_list)
@@ -60,9 +185,38 @@ def util_keyword_find(file_path: str, keyword_list: list) -> dict:
         # print(find_dict)
         return find_dict
 
+    elif file_path.endswith(('.pptx', '.ppt', '.pot')):
+        find_dict = pptx_find(file_path, keyword_list)
+        for para, this_para_keyword in zip(find_dict['find_list'], find_dict['paragraph_keyword']):
+            marked_para = insert_html_tag(para, keyword_list)
+            find_dict['find_list'][find_dict['find_list'].index(para)] = marked_para
+        return find_dict
+
+    elif file_path.endswith(('.xlsx', '.xls', '.xlsm', '.xlsb', '.xltm', '.xltx')):
+        find_dict = excel_find(file_path, keyword_list)
+        for para, this_para_keyword in zip(find_dict['find_list'], find_dict['paragraph_keyword']):
+            marked_para = insert_html_tag(para, keyword_list)
+            find_dict['find_list'][find_dict['find_list'].index(para)] = marked_para
+        return find_dict
+
+    elif file_path.endswith('.pdf'):
+        find_dict = pdf_find(file_path, keyword_list)
+        for para, this_para_keyword in zip(find_dict['find_list'], find_dict['paragraph_keyword']):
+            marked_para = insert_html_tag(para, keyword_list)
+            find_dict['find_list'][find_dict['find_list'].index(para)] = marked_para
+        return find_dict
+
+    elif file_path.endswith('.txt'):
+        find_dict = txt_find(file_path, keyword_list)
+        for para, this_para_keyword in zip(find_dict['find_list'], find_dict['paragraph_keyword']):
+            marked_para = insert_html_tag(para, keyword_list)
+            find_dict['find_list'][find_dict['find_list'].index(para)] = marked_para
+        return find_dict
+
     else:
         return {'file_name': file_path, 'find_list': []}
 
 
 if __name__ == '__main__':
-    print(util_keyword_find('浅水海底电缆打捞大作业.docx', ['打捞', '浅水']))
+    # print(util_keyword_find('浅水海底电缆打捞大作业.docx', ['打捞', '浅水']))
+    print(util_keyword_find('浅水.xlsx', ['打捞', '声呐', '浅水']))
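
Usage sketch (not part of the patch): the dispatcher can be exercised on each newly
supported type roughly as below. The file names are placeholders, and the import path
assumes retriever/tools is importable as a package.

    from retriever.tools.keyword_find import util_keyword_find

    # each supported extension routes to its own *_find() helper
    for path in ('demo.docx', 'demo.pptx', 'demo.xlsx', 'demo.pdf', 'demo.txt'):
        result = util_keyword_find(path, ['打捞', '浅水'])
        print(result['file_name'], result['find_list'], result['paragraph_keyword'])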