"""Class for decoding a project"""
import struct
from pathlib import Path, PurePath
from typing import List, TypeVar
import click
import magic
from rpgmaker_mv_decoder.callbacks import Callbacks
from rpgmaker_mv_decoder.clickdisplay import ClickDisplay
from rpgmaker_mv_decoder.constants import OCT_STREAM, RPG_MAKER_MV_MAGIC
from rpgmaker_mv_decoder.exceptions import FileFormatError, RPGMakerHeaderError
from rpgmaker_mv_decoder.project import Project
from rpgmaker_mv_decoder.utils import int_xor
_T = TypeVar("_T", bound="ProjectDecoder")
[docs]class ProjectDecoder(Project):
"""Handles a project and runs operations"""
def __init__(
self: _T,
source: PurePath,
destination: PurePath,
key: str,
callbacks: Callbacks = Callbacks(),
) -> _T:
"""`ProjectDecoder` constructor
Args:
- `source` (`PurePath`): Where to find the files to decode
- `destination` (`PurePath`): Where to save the files to decode
- `key` (`str`): Key to use when decoding
- `callbacks` (`Callback`, optional): Callbacks to run on events.\
Defaults to `Callback()`.
Returns:
- `ProjectDecoder`: object to run actions on
"""
Project.__init__(self, source, destination, key, callbacks)
def _get_output_filename(self: _T, filename: Path, data: bytes = None) -> str:
"""`_get_output_filename` Returns a file name for the specified file
If data is not `None`, uses libmagic to figure out the actual file type
and place a proper extension on the file. Otherwise it uses the
original name to generate the extension.
Args:
- `filename` (`Path`): Original file path.
- `data` (`bytes`, optional): File data (decoded) for libmagic. \
Defaults to `None`.
Raises:
- `FileFormatError`: If libmagic can't determine the file type\
or the existing file extension is unknown.
Returns:
- `str`: The decoded file extension
"""
output_file: PurePath = self.project_paths.output_directory.joinpath(
PurePath(filename).relative_to(self.project_paths.source)
)
if data:
filetype: str = magic.from_buffer(data, mime=True)
if filetype == OCT_STREAM:
raise FileFormatError(
f'"{filetype}" == "{OCT_STREAM}"',
"Found octlet stream, key is probably incorrect.",
)
return output_file.with_suffix("." + filetype.split("/")[-1])
if not filename:
raise ValueError("data and filename are both None")
if filename.suffix == ".rpgmvp":
return output_file.with_suffix(".png")
if filename.suffix == ".rpgmvo":
return output_file.with_suffix(".ogg")
raise FileFormatError(
f'"{filename.suffix}"',
f'Unknown extension "{filename.suffix}"',
)
[docs] def decode_file(self: _T, input_file: PurePath, detect_type: bool) -> bool:
"""`decode_file` Takes a path and decodes a file
Args:
- `input_file` (`PurePath`): File to read and modify
- `detect_type` (`bool`): True means generate file extensions based on\
file contents
Returns:
- `bool`: True if the operation should continue
"""
output_file = self._get_output_filename(input_file)
data: bytes
with click.open_file(input_file, "rb") as file:
data: bytes = self.decode_header(file.read(32))
data += file.read()
if detect_type:
output_file = self._get_output_filename(input_file, data)
return self._save_file(output_file, data)
[docs] def decode(
self: _T,
detect_type: bool,
) -> None:
"""`decode` Decodes a project
Args:
- `detect_type` (`bool`): True means generate file extensions based on\
file contents
"""
self._callbacks.info(f"Reading from: '{self.project_paths.source}'")
self._callbacks.info(f"Writing to: '{self.project_paths.output_directory}'")
files: List[Path] = self.project_paths.encoded_files
click_display = ClickDisplay(files)
with click.progressbar(
files,
label="Decoding files",
width=0,
item_show_func=click_display.show_item,
) as files_to_decode:
filename: Path
for filename in files_to_decode:
if self._callbacks.progressbar(files_to_decode):
break
try:
if not self.decode_file(filename, detect_type):
break
except RPGMakerHeaderError:
warning_text: str = f'Invalid header found on "{filename}", skipping.'
self._callbacks.warning(warning_text)
except FileFormatError:
self._callbacks.warning(
"Found octlet stream, key is probably incorrect, "
f"skipping {click.format_filename(str(filename))}"
)
self._callbacks.progressbar(None)