from __future__ import annotations import uuid import wave from pathlib import Path from backend.app.config import AppSettings, get_settings, project_root from backend.app.text_preprocess import split_sentences from backend.app.tts.engines_subprocess import create_engine ROOT = project_root() class TTSService: def __init__(self, settings: AppSettings | None = None) -> None: self.settings = settings or get_settings() self.engine = create_engine( self.settings.tts_model, self.settings.cosyvoice_model_dir, self.settings.cosyvoice_prompt_prefix, ) self.settings.outputs_dir.mkdir(parents=True, exist_ok=True) self.settings.uploads_dir.mkdir(parents=True, exist_ok=True) def resolve_reference( self, ref_audio: Path | None = None, ref_text: str | None = None, ) -> tuple[Path, str]: if ref_audio and ref_audio.is_file(): audio = ref_audio elif self.settings.default_ref_audio: audio = Path(self.settings.default_ref_audio) else: samples = sorted(self.settings.samples_dir.glob("*.wav")) if not samples: raise FileNotFoundError( "reference WAV 없음. samples/에 녹음하거나 TTS_REF_AUDIO 설정" ) audio = samples[0] text = ref_text or self.settings.default_ref_text or "" if not text: for candidate in ( audio.with_suffix(".txt"), self.settings.samples_dir / "my_voice_ref.txt", ): if candidate.is_file(): text = candidate.read_text(encoding="utf-8").strip() break if not text and self.settings.tts_model == "f5_tts": text = "참조 음성의 대본을 samples/my_voice_ref.txt 에 저장하세요." return audio, text def synthesize_to_file( self, text: str, ref_audio: Path | None = None, ref_text: str | None = None, job_id: str | None = None, ) -> tuple[str, Path]: ref_path, ref_txt = self.resolve_reference(ref_audio, ref_text) chunks = split_sentences(text, self.settings.chunk_max_chars) job_id = job_id or uuid.uuid4().hex[:12] job_dir = self.settings.outputs_dir / job_id job_dir.mkdir(parents=True, exist_ok=True) chunk_paths: list[Path] = [] for i, chunk in enumerate(chunks): out = job_dir / f"part_{i:03d}.wav" self.engine.synthesize(chunk, ref_path, ref_txt, out) chunk_paths.append(out) final = job_dir / "output.wav" if len(chunk_paths) == 1: chunk_paths[0].replace(final) else: _concat_wav(chunk_paths, final) return job_id, final def _concat_wav(paths: list[Path], out: Path) -> None: """동일 포맷 WAV 단순 연결.""" with wave.open(str(paths[0]), "rb") as w0: params = w0.getparams() frames = [w0.readframes(w0.getnframes())] for p in paths[1:]: with wave.open(str(p), "rb") as w: if w.getparams() != params: raise ValueError(f"WAV format mismatch: {p}") frames.append(w.readframes(w.getframes())) out.parent.mkdir(parents=True, exist_ok=True) with wave.open(str(out), "wb") as wo: wo.setparams(params) for f in frames: wo.writeframes(f)