Source code for embdgen.core.utils.image

# SPDX-License-Identifier: GPL-3.0-only

"""
Utility functions for working with images
"""
from __future__ import annotations

import io
import os
import tempfile
import shutil

from pathlib import Path
from typing import Optional
from fallocate import fallocate, FALLOC_FL_PUNCH_HOLE, FALLOC_FL_KEEP_SIZE # type: ignore


class BuildLocation:
    """
    Temporary location of the builds

    This class is a singleton: every ``BuildLocation()`` call returns the same
    object until :meth:`remove` is called. On first use a fresh directory is
    created with ``tempfile.mkdtemp(prefix="embdgen-")``. :meth:`set_path`
    switches to another directory. Only directories that were created by this
    class are ever deleted by it.
    """

    __instance: BuildLocation | None = None
    _path: Path
    # True only while this object owns a directory that it created itself
    _was_created: bool

    def __new__(cls) -> BuildLocation:
        if cls.__instance is None:
            # Create the directory first and publish the instance last, so a
            # failing mkdtemp never leaves a half-initialised singleton behind.
            path = Path(tempfile.mkdtemp(prefix="embdgen-"))
            instance = super().__new__(cls)
            instance._path = path
            instance._was_created = True
            cls.__instance = instance
        return cls.__instance

    def __del__(self):
        self._remove()

    def set_path(self, path: Path) -> None:
        """
        Use ``path`` as build location from now on.

        The previous location is deleted first if it was created by this
        class. If ``path`` does not exist yet, it is created (including
        parents) and will be deleted again on removal; an already existing
        directory is used as is and never deleted.
        """
        self._remove()
        self._path = path
        self._was_created = False
        if not self._path.exists():
            self._path.mkdir(parents=True, exist_ok=True)
            self._was_created = True

    def remove(self) -> None:
        """
        Delete the build location (if it was created by this class) and
        reset the singleton, so the next ``BuildLocation()`` starts fresh.
        """
        self._remove()
        BuildLocation.__instance = None

    def _remove(self) -> None:
        """Delete the current directory, but only if this class created it."""
        if self._was_created and self._path.exists():
            shutil.rmtree(self._path)
        # Give up ownership: a stale instance that is finalised later must not
        # delete a directory that has been re-created at the same path since.
        self._was_created = False

    @property
    def path(self) -> Path:
        """Directory currently used as build location."""
        return self._path
def create_empty_image(filename: Path, size: int) -> None:
    """
    Create an empty sparse (if possible) file

    An existing file with the same name is replaced.

    :param filename: Path of the file to create
    :param size: Size of the resulting file in bytes
    """
    # Opening in "wb" mode creates the file or empties an existing one.
    # Growing it with truncate() writes no data, which is what allows the
    # file to end up sparse.
    with open(filename, mode="wb") as image:
        image.truncate(size)
[docs] def copy_sparse(out_file: io.BufferedIOBase, in_file: io.BufferedIOBase, size: Optional[int]=None) -> None: """ Copy sparse from in_file to out_file up to size bytes. This does not necessarily create the minimum sparse file. In the current implementation it only makes 4096 byte blocks of zeros sparse. If a block is shorter, it is actually written to the out_file. This is a time vs. space optimization. Checking if a fixed size block is empty can be implemented very fast in python, but checking if an arbitrarily long block is empty is not very efficient. """ cur_pos = in_file.tell() max_size = in_file.seek(0, io.SEEK_END) - cur_pos in_file.seek(cur_pos) if not size: size = max_size elif size > max_size: raise Exception(f"Trying to copy {size} B, but the file has only {max_size} B left") zero_block = b"\0" * 4096 def is_zero(block): """ Note: if the block size is less than 4096, this will return False, but it just makes a part of the file non-sparse, which does not matter """ return block == zero_block if size == 0: return to_copy = size while to_copy > 0: block_size = min(4096, to_copy) data = in_file.read(block_size) to_copy -= len(data) if not is_zero(data): out_file.write(data) else: # Deallocate blocks, to ensure they are 0 fallocate(out_file, out_file.tell(), len(data), FALLOC_FL_PUNCH_HOLE + FALLOC_FL_KEEP_SIZE) out_file.seek(len(data), io.SEEK_CUR) # If there is a hole at the end of the file, # allocate the remainder of the file as a whole cur_pos = out_file.tell() out_file.seek(0, io.SEEK_END) if cur_pos > out_file.tell(): out_file.seek(cur_pos - 1) out_file.write(b"\0") os.ftruncate(out_file.fileno(), cur_pos) else: out_file.seek(cur_pos)
def get_temp_file(ext: str="") -> Path:
    """
    Get the path for a new temporary file inside the build location

    Only a name is chosen, the file itself is not created.

    :param ext: Suffix of the file name (e.g. ".img")
    :return: Path of the temporary file
    """
    build_dir = BuildLocation().path
    # NOTE(review): tempfile.mktemp is deprecated, because another process
    # could create the file between choosing the name and using it.
    candidate = tempfile.mktemp(suffix=ext, dir=build_dir)
    return Path(candidate)