| | |
| | |
| | """ |
| | cha_json.py — 將單一 CLAN .cha 轉成 JSON(強化 %mor/%wor/%gra 對齊) |
| | 用法: |
| | # CLI |
| | python3 cha_json.py --input /path/to/input.cha --output /path/to/output.json |
| | |
| | 程式化呼叫(供 pipeline 使用): |
| | from cha_json import cha_to_json_file, cha_to_dict |
| | out_path, data = cha_to_json_file("/path/in.cha", "/path/out.json") |
| | data2 = cha_to_dict("/path/in.cha") |
| | """ |
| |
|
| | from __future__ import annotations |
| | import re |
| | import json |
| | import sys |
| | import argparse |
| | from pathlib import Path |
| | from collections import defaultdict |
| | from typing import List, Dict, Any, Tuple, Optional |
| |
|
| | |
# Tier prefixes that terminate a multi-line %mor/%wor/%gra continuation block.
TAG_PREFIXES = ("*PAR", "*INV", "%mor:", "%gra:", "%wor:", "@")
# First run of ASCII letters/digits in a token (used by canonical()).
WORD_RE = re.compile(r"[A-Za-z0-9]+")

# Participant field inside an @ID header, e.g. "|PAR|" or "|PAR2|".
ID_PAR_RE = re.compile(r"\|PAR\d*\|")

# Utterance tiers for the investigator (*INV:) or a participant (*PAR:, *PAR2:).
# BUGFIX: was r"^\*(INV|PAR\d+):" — the \d+ required a digit after PAR, so a
# plain "*PAR:" tier never matched and its utterances were silently dropped.
# \d* accepts both forms, consistent with ID_PAR_RE above.
UTTER_RE = re.compile(r"^\*(INV|PAR\d*):")
| |
|
| | |
# Families of irregular inflected forms. Two canonical tokens that fall in the
# same family count as "the same word" when aligning %mor tokens against %wor
# timestamps (see same_syn below).
SYN_SETS = [
    {"be", "am", "is", "are", "was", "were", "been", "being"},
    {"have", "has", "had"},
    {"do", "does", "did", "done", "doing"},
    {"go", "goes", "going", "went", "gone"},
    {"run", "runs", "running", "ran"},
    {"see", "sees", "seeing", "saw", "seen"},
    {"get", "gets", "getting", "got", "gotten"},
    {"drop", "drops", "dropping", "dropped"},
    {"swim", "swims", "swimming", "swam", "swum"},
]


def same_syn(a: str, b: str) -> bool:
    """Return True when *a* and *b* belong to the same inflection family in SYN_SETS."""
    if not a or not b:
        return False
    return any(a in family and b in family for family in SYN_SETS)
| |
|
def canonical(txt: str) -> str:
    """
    Normalize a token/word for comparison.

    Cuts *txt* at the first occurrence of '~', '-', '&', or '|', then returns
    the first run of ASCII letters/digits in the remaining head, lower-cased.
    Returns "" when nothing alphanumeric remains.
    """
    # FIX: pass maxsplit as a keyword — passing it positionally to re.split
    # emits a DeprecationWarning since Python 3.13.
    head = re.split(r"[~\-\&|]", txt, maxsplit=1)[0]
    m = WORD_RE.search(head)
    return m.group(0).lower() if m else ""
| |
|
def merge_multiline(block_lines: List[str]) -> str:
    """
    Merge continuation lines of %mor/%wor/%gra tiers into single lines.

    A line whose stripped form starts with '%' and contains ':' opens a new
    tier; subsequent non-empty lines are appended to the open tier with a
    single space. Any other line (including a blank line while a tier is
    open) is emitted as-is, immediately — i.e. before the open tier itself
    is flushed. Control character \\x15 (CHAT bullet marker) is removed.
    """
    out: List[str] = []
    current: Optional[str] = None
    for raw in block_lines:
        text = raw.rstrip("\n").replace("\x15", "")
        opens_tier = text.lstrip().startswith("%") and ":" in text
        if opens_tier:
            if current:
                out.append(current)
            current = text
        elif current and text.strip():
            current = current + " " + text.strip()
        else:
            out.append(text)
    if current:
        out.append(current)
    return "\n".join(out)
| |
|
def cha_to_json(lines: List[str]) -> Dict[str, Any]:
    """
    Convert the lines of a CLAN .cha transcript into a JSON-ready structure.

    Returns:
        {
            "sentences": [...],        # one entry per @Begin/@End transcript
            "pos_mapping": {...},      # POS tag -> 1-based id
            "grammar_mapping": {...},  # grammatical relation -> 1-based id
            "aphasia_types": {...},    # aphasia type label -> 0-based id
            "text_all": "..."          # all PAR utterance texts, newline-joined,
                                       # for downstream model consumption
        }
    """
    # defaultdicts that hand out a fresh id on first access; merely indexing a
    # key (e.g. the bare `aphasia_map["UNKNOWN"]` below) registers it.
    pos_map: Dict[str, int] = defaultdict(lambda: len(pos_map) + 1)
    gra_map: Dict[str, int] = defaultdict(lambda: len(gra_map) + 1)
    aphasia_map: Dict[str, int] = defaultdict(lambda: len(aphasia_map))

    data: List[Dict[str, Any]] = []        # finished transcript records
    sent: Optional[Dict[str, Any]] = None  # transcript currently being built

    i = 0
    while i < len(lines):
        line = lines[i].rstrip("\n")

        # @Begin opens a new transcript record.
        if line.startswith("@Begin"):
            sent = {
                "sentence_id": f"S{len(data)+1}",
                "sentence_pid": None,
                "aphasia_type": None,
                "dialogues": []
            }
            i += 1
            continue

        # @End closes the transcript; keep it only if it holds any dialogue.
        if line.startswith("@End"):
            if sent and sent["dialogues"]:
                if not sent.get("aphasia_type"):
                    sent["aphasia_type"] = "UNKNOWN"
                    aphasia_map["UNKNOWN"]  # register key in the id map
                data.append(sent)
            sent = None
            i += 1
            continue

        # @PID header: the second tab-separated field is the transcript PID.
        if sent and line.startswith("@PID:"):
            parts = line.split("\t")
            if len(parts) > 1:
                sent["sentence_pid"] = parts[1].strip()
            i += 1
            continue

        # @ID header for a participant (contains a "|PAR...|" field).
        if sent and line.startswith("@ID:"):
            if ID_PAR_RE.search(line):
                aph = "UNKNOWN"
                # NOTE(review): the aphasia type is always recorded as
                # "UNKNOWN" here; whatever code parsed the real type out of
                # the @ID fields appears to have been removed — confirm.
                aph = aph.upper()
                aphasia_map[aph]  # register key in the id map
                sent["aphasia_type"] = aph
            i += 1
            continue

        # Utterance tier (*INV: or *PARn:): open a new utterance record.
        if sent and UTTER_RE.match(line):
            role_tag = UTTER_RE.match(line).group(1)
            role = "INV" if role_tag == "INV" else "PAR"

            # First utterance of the transcript starts the first turn.
            if not sent["dialogues"]:
                sent["dialogues"].append({"INV": [], "PAR": []})
            # An INV utterance after a PAR reply starts a new turn.
            if role == "INV" and sent["dialogues"][-1]["PAR"]:
                sent["dialogues"].append({"INV": [], "PAR": []})

            sent["dialogues"][-1][role].append(
                {"tokens": [], "word_pos_ids": [], "word_grammar_ids": [], "word_durations": [], "utterance_text": ""}
            )
            i += 1
            continue

        # %mor tier: tokens and POS ids for the most recent utterance.
        if sent and line.startswith("%mor:"):
            blk = [line]; i += 1
            # Absorb continuation lines until the next tier/header starts.
            while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
                blk.append(lines[i]); i += 1

            units = merge_multiline(blk).replace("%mor:", "").strip().split()
            toks, pos_ids = [], []
            for u in units:
                # Units look like "pos|word|..."; keep the word, map the POS.
                if "|" in u:
                    pos, rest = u.split("|", 1)
                    word = rest.split("|", 1)[0]
                    toks.append(word)
                    pos_ids.append(pos_map[pos])

            # Attach to the latest PAR utterance of the current turn if any,
            # otherwise to the latest INV utterance.
            dlg = sent["dialogues"][-1]
            tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]
            tgt["tokens"], tgt["word_pos_ids"] = toks, pos_ids
            tgt["utterance_text"] = " ".join(toks).strip()
            continue

        # %wor tier: word-level timestamps -> per-token durations.
        if sent and line.startswith("%wor:"):
            blk = [line]; i += 1
            while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
                blk.append(lines[i]); i += 1

            merged = merge_multiline(blk).replace("%wor:", "").strip()
            # Pairs of "word start_end" (presumably milliseconds, per CHAT
            # media bullets — TODO confirm the unit).
            raw_pairs = re.findall(r"(\S+)\s+(\d+)_(\d+)", merged)
            wor = [(w, int(s), int(e)) for (w, s, e) in raw_pairs]

            dlg = sent["dialogues"][-1]
            tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]

            # Greedy left-to-right alignment of %mor tokens against %wor
            # words: exact match, prefix match either way, or a shared
            # irregular-form family (same_syn). `aligned` holds
            # [token, duration] pairs; unmatched tokens get duration 0.
            aligned: List[List[Any]] = []
            j = 0
            for tok in tgt.get("tokens", []):
                c_tok = canonical(tok)
                match = None
                for k in range(j, len(wor)):
                    c_w = canonical(wor[k][0])
                    if (
                        c_tok == c_w
                        or c_w.startswith(c_tok)
                        or c_tok.startswith(c_w)
                        or same_syn(c_tok, c_w)
                    ):
                        match = wor[k]
                        j = k + 1
                        break
                dur = (match[2] - match[1]) if match else 0
                aligned.append([tok, dur])
            tgt["word_durations"] = aligned
            continue

        # %gra tier: dependency triples "index|head|relation".
        if sent and line.startswith("%gra:"):
            blk = [line]; i += 1
            while i < len(lines) and not lines[i].lstrip().startswith(TAG_PREFIXES):
                blk.append(lines[i]); i += 1

            units = merge_multiline(blk).replace("%gra:", "").strip().split()
            triples = []
            for u in units:
                # Keep only well-formed "digit|digit|relation" units.
                parts = u.split("|")
                if len(parts) == 3:
                    a, b, r = parts
                    if a.isdigit() and b.isdigit():
                        triples.append([int(a), int(b), gra_map[r]])

            dlg = sent["dialogues"][-1]
            tgt = dlg["PAR"][-1] if dlg["PAR"] else dlg["INV"][-1]
            tgt["word_grammar_ids"] = triples
            continue

        # Anything else (other headers/tiers) is skipped.
        i += 1

    # Flush a transcript left open by a missing @End.
    if sent and sent["dialogues"]:
        if not sent.get("aphasia_type"):
            sent["aphasia_type"] = "UNKNOWN"
            aphasia_map["UNKNOWN"]  # register key in the id map
        data.append(sent)

    # Concatenate every PAR utterance text for downstream models.
    par_texts: List[str] = []
    for s in data:
        for turn in s.get("dialogues", []):
            for par_ut in turn.get("PAR", []):
                if par_ut.get("utterance_text"):
                    par_texts.append(par_ut["utterance_text"])
    text_all = "\n".join(par_texts).strip()

    return {
        "sentences": data,
        "pos_mapping": dict(pos_map),
        "grammar_mapping": dict(gra_map),
        "aphasia_types": dict(aphasia_map),
        "text_all": text_all
    }
| |
|
| | |
def cha_to_dict(cha_path: str) -> Dict[str, Any]:
    """Read a .cha file and return the converted dict (nothing is written to disk)."""
    src = Path(cha_path)
    if not src.exists():
        raise FileNotFoundError(f"找不到檔案: {cha_path}")
    with src.open("r", encoding="utf-8") as handle:
        content = handle.readlines()
    return cha_to_json(content)
| |
|
def cha_to_json_file(cha_path: str, output_json: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """
    Convert a .cha file to JSON and write the result to disk.

    When *output_json* is falsy, the output path is the input path with its
    suffix replaced by ".json". Parent directories are created as needed.

    Returns:
        (output_json_path, data_dict)
    """
    result = cha_to_dict(cha_path)
    if output_json:
        target = Path(output_json)
    else:
        target = Path(cha_path).with_suffix(".json")
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(result, handle, ensure_ascii=False, indent=4)
    return str(target), result
| |
|
| | |
def parse_args():
    """Parse CLI arguments: a required input .cha path and output .json path."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", "-i", type=str, required=True, help="輸入 .cha 檔")
    parser.add_argument("--output", "-o", type=str, required=True, help="輸出 .json 檔")
    return parser.parse_args()
| |
|
def cha_to_json_path(cha_path: str, output_json: str | None = None) -> str:
    """Backward-compatible alias for old code: convert and return only the output path."""
    path, _data = cha_to_json_file(cha_path, output_json=output_json)
    return path
| |
|
def main():
    """CLI entry point: convert one .cha file to JSON and print a summary."""
    opts = parse_args()
    source = Path(opts.input)
    target = Path(opts.output)

    if not source.exists():
        sys.exit(f"❌ 找不到檔案: {source}")

    with source.open("r", encoding="utf-8") as handle:
        raw_lines = handle.readlines()

    dataset = cha_to_json(raw_lines)

    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(dataset, handle, ensure_ascii=False, indent=4)

    summary = (
        f"✅ 轉換完成 → {target}(句數 {len(dataset['sentences'])},"
        f"pos={len(dataset['pos_mapping'])},gra={len(dataset['grammar_mapping'])},"
        f"類型鍵={list(dataset['aphasia_types'].keys())})"
    )
    print(summary)
| |
|
# Allow direct execution: python3 cha_json.py --input in.cha --output out.json
if __name__ == "__main__":
    main()
| |
|