Source code for rdata.unparser
"""Utilities for unparsing a rdata file."""
from __future__ import annotations
import io
from typing import TYPE_CHECKING
from rdata.parser import (
RData,
RObjectType,
)
from ._ascii import UnparserASCII
from ._binary import UnparserBinary
from ._xdr import UnparserXDR
if TYPE_CHECKING:
import os
from collections.abc import Callable
from typing import Any, Literal
from ._unparser import WriteableBinaryFile
FileFormat = Literal["xdr", "ascii", "binary"]
FileType = Literal["rds", "rda"]
Compression = Literal["gzip", "bzip2", "xz"] | None
[docs]
def unparse_file(
path: os.PathLike[Any] | str,
r_data: RData,
*,
file_format: FileFormat = "xdr",
file_type: FileType = "rds",
compression: Compression = "gzip",
) -> None:
"""
Unparse RData object to a file.
Args:
path: File path to be created.
r_data: RData object.
file_format: File format.
file_type: File type.
compression: Compression.
"""
open_with_compression: Callable[
[os.PathLike[Any] | str, Literal["wb"]],
WriteableBinaryFile,
]
if compression is None:
open_with_compression = open
elif compression == "bzip2":
from bz2 import open as open_with_compression # noqa: PLC0415
elif compression == "gzip":
from gzip import open as open_with_compression # noqa: PLC0415
elif compression == "xz":
from lzma import open as open_with_compression # noqa: PLC0415
else:
msg = f"Unknown compression: {compression}"
raise ValueError(msg)
with open_with_compression(path, "wb") as f:
unparse_fileobj(
f,
r_data,
file_format=file_format,
file_type=file_type,
)
def unparse_fileobj(
fileobj: WriteableBinaryFile,
r_data: RData,
*,
file_format: FileFormat = "xdr",
file_type: FileType = "rds",
) -> None:
"""
Unparse RData object to a file object.
Args:
fileobj: File object.
r_data: RData object.
file_format: File format.
file_type: File type.
"""
unparser_class: type[UnparserXDR | UnparserASCII | UnparserBinary]
if file_format == "ascii":
unparser_class = UnparserASCII
rda_magic = "RDA"
elif file_format == "xdr":
unparser_class = UnparserXDR
rda_magic = "RDX"
elif file_format == "binary":
unparser_class = UnparserBinary
rda_magic = "RDB"
else:
msg = f"Unknown file format: {file_format}"
raise ValueError(msg)
# Check that RData object for rda file is of correct kind
if file_type == "rda":
r_object = r_data.object
if not (
r_object.info.type is RObjectType.LIST
and r_object.tag is not None
and r_object.tag.info.type is RObjectType.SYM
):
msg = "r_data object must be dictionary-like for rda file"
raise ValueError(msg)
# Write rda-specific magic
if file_type == "rda":
fileobj.write(f"{rda_magic}{r_data.versions.format}\n".encode("ascii"))
unparser = unparser_class(fileobj)
unparser.unparse_r_data(r_data)
[docs]
def unparse_data(
r_data: RData,
*,
file_format: FileFormat = "xdr",
file_type: FileType = "rds",
) -> bytes:
"""
Unparse RData object to a bytestring.
Args:
r_data: RData object.
file_format: File format.
file_type: File type.
Returns:
Bytestring of data.
"""
fd = io.BytesIO()
unparse_fileobj(fd, r_data, file_format=file_format, file_type=file_type)
return fd.getvalue()