import re
import json
import logging
import os
[docs]
def clean_text(text):
    """Collapse every run of whitespace in ``text`` into a single space.

    Tabs, newlines and repeated spaces are all treated alike: each contiguous
    run is replaced by exactly one space character. Whitespace at the very
    start or end of the result is then removed.

    Args:
        text (str): The input text to be cleaned.

    Returns:
        str: The text with internal whitespace runs squeezed to single spaces
        and no leading or trailing whitespace.
    """
    # First squeeze the runs, then trim the (at most one) space left at each end.
    collapsed = re.sub(r"\s+", " ", text)
    return collapsed.strip()
[docs]
def write_response_to_file(response, output_file, json_format=False):
    """
    Write the generated response to the specified output file.

    The response is treated as a series of question-answer pairs separated by
    blank lines; within a pair the first line is the question and everything
    after it is the answer. The parsed pairs are appended to whatever the
    target file already holds and the file is then rewritten in full.
    Depending on the `json_format` flag, the pairs are stored either as a JSON
    list of ``{"question": ..., "answer": ...}`` objects or as plain text.

    Args:
        response (str): The response string to be written to the file.
        output_file (str): Path of the output file. In JSON mode a trailing
            ".txt" extension is swapped for ".json"; any other path is used
            unchanged.
        json_format (bool): Flag to indicate whether to write as JSON. Defaults to False.

    Side Effects:
        Logs the response at INFO level, logs a warning for every chunk that
        cannot be split into a question and an answer, and rewrites the
        output file.
    """

    def parse_response_into_qa_pairs(text):
        """
        Parse the response into question-answer pairs.

        Args:
            text (str): The response string to be parsed.

        Returns:
            list: A list of ``(question, answer)`` tuples, both stripped.
            Chunks without a line break are logged as malformed and skipped;
            blank chunks are skipped silently.
        """
        qa_pairs = []
        for chunk in text.strip().split("\n\n"):
            # Strip each chunk: a run of three or more newlines would otherwise
            # leave a leading "\n" that gets mistaken for an empty question.
            chunk = chunk.strip()
            if not chunk:
                continue
            question, separator, answer = chunk.partition("\n")
            if not separator:
                logging.warning("Malformed QA pair found: %s", chunk)
                continue
            qa_pairs.append((question.strip(), answer.strip()))
        return qa_pairs

    def load_existing_json_data(json_file_path):
        """
        Load existing data from a JSON file, or return an empty list if there are issues.

        Args:
            json_file_path (str): The path to the JSON file.

        Returns:
            list: Existing data from the JSON file, or an empty list if the file
            is missing, empty, not valid JSON, or does not hold a JSON list.
        """
        try:
            with open(json_file_path, "r", encoding="utf-8") as json_file:
                data = json.load(json_file)
        except FileNotFoundError:
            return []
        except json.JSONDecodeError:
            logging.warning("JSON decode error in %s, starting fresh.", json_file_path)
            return []
        if not isinstance(data, list):
            # New pairs are appended to this value, so anything but a list
            # (e.g. a top-level object) cannot be extended.
            logging.warning("Unexpected JSON content in %s, starting fresh.", json_file_path)
            return []
        return data

    def save_json_data(json_file_path, data):
        """
        Save the provided data to a JSON file.

        Args:
            json_file_path (str): The path to the JSON file.
            data (list): The list of question-answer pairs to be written to the file.

        Side Effects:
            Overwrites the content of the JSON file with the provided data.
        """
        with open(json_file_path, "w", encoding="utf-8") as json_file:
            json.dump(data, json_file, ensure_ascii=False, indent=4)

    def load_existing_text_data(text_file_path):
        """
        Load existing QA pairs from a text file.

        Args:
            text_file_path (str): The path to the text file.

        Returns:
            list: Existing blank-line-separated entries, or an empty list if the
            file does not exist or contains only whitespace.
        """
        try:
            with open(text_file_path, "r", encoding="utf-8") as text_file:
                content = text_file.read()
        except FileNotFoundError:
            return []
        # "".split("\n\n") yields [""]; drop blank entries so an empty file
        # does not turn into a stray blank block at the top of the output.
        return [entry for entry in content.strip().split("\n\n") if entry.strip()]

    def save_text_data(text_file_path, qa_pairs):
        """
        Save the provided QA pairs to a text file.

        Args:
            text_file_path (str): The path to the text file.
            qa_pairs (list): The list of question-answer entries to be written.

        Side Effects:
            Overwrites the content of the text file; every entry is followed
            by one blank line.
        """
        with open(text_file_path, "w", encoding="utf-8") as text_file:
            text_file.writelines(qa + "\n\n" for qa in qa_pairs)

    def write_to_json(text, json_file_path):
        """
        Append the parsed pairs to the JSON file, keeping any existing entries.

        Args:
            text (str): The response string to be written.
            json_file_path (str): The path to the JSON file.

        Side Effects:
            Rewrites the JSON file with the old and new question-answer pairs.
        """
        response_data = load_existing_json_data(json_file_path)
        response_data.extend(
            {"question": question, "answer": answer}
            for question, answer in parse_response_into_qa_pairs(text)
        )
        save_json_data(json_file_path, response_data)

    def write_to_text(text, text_file_path):
        """
        Append the parsed pairs to the text file, keeping any existing entries.

        Args:
            text (str): The response string to be written.
            text_file_path (str): The path to the text file.

        Side Effects:
            Rewrites the text file with the old and new question-answer pairs.
        """
        existing_qa_pairs = load_existing_text_data(text_file_path)
        existing_qa_pairs.extend(
            f"{question}\n{answer}"
            for question, answer in parse_response_into_qa_pairs(text)
        )
        save_text_data(text_file_path, existing_qa_pairs)

    # Log the response being processed
    logging.info(response)

    # Determine the format for writing the response
    if json_format:
        # Swap only a trailing ".txt"; str.replace would also rewrite ".txt"
        # occurring in directory names or in the middle of the file name.
        if output_file.endswith(".txt"):
            json_file_path = output_file[: -len(".txt")] + ".json"
        else:
            json_file_path = output_file
        write_to_json(response, json_file_path)
    else:
        write_to_text(response, output_file)