Coverage for src / bob / io / base / __init__.py: 85%
118 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-24 16:14 +0100
1# import Libraries of other lib packages
2import logging
4import h5py
5import imageio
6import numpy as np
8logger = logging.getLogger(__name__)
9import os
11# Allowing the loading of truncated files in case PIL is used
12# https://github.com/kirumang/Pix2Pose/issues/2
13from PIL import ImageFile
15ImageFile.LOAD_TRUNCATED_IMAGES = True
# File extensions recognized as HDF5 container files (duplicates removed;
# membership tests only need each extension once).
hdf5_extensions = [".hdf5", ".h5", ".hdf"]
# File extensions recognized as image files, read/written through imageio.
image_extensions = [
    ".jpg",
    ".jpeg",
    ".png",
    ".bmp",
    ".gif",
    ".tif",
    ".tiff",
    ".pgm",
    ".pbm",
    ".pnm",
    ".ppm",
]
34def _is_string(s):
35 """Returns ``True`` if the given object is a string or bytes."""
36 return isinstance(s, (bytes, str))
def create_directories_safe(directory, dryrun=False):
    """Creates a directory if it does not exists, with concurrent access
    support. This function will also create any parent directories that might
    be required. If the dryrun option is selected, it does not actually create
    the directory, but just writes the (Linux) command that would have been
    executed.

    .. deprecated::
       Use ``os.makedirs(directory, exist_ok=True)`` instead.

    **Parameters:**

    ``directory`` : str
      The directory that you want to create.

    ``dryrun`` : bool
      Only ``print`` the command to console, but do not execute it.
    """
    # ``np.deprecate`` was removed in NumPy 2.0; emit the deprecation warning
    # explicitly so the module keeps importing on modern NumPy.
    import warnings

    warnings.warn(
        "create_directories_safe is deprecated; use "
        "os.makedirs(directory, exist_ok=True) instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    if dryrun:
        print("[dry-run] mkdir -p '%s'" % directory)
    else:
        os.makedirs(directory, exist_ok=True)
def open_file(filename) -> np.ndarray:
    """Reads the content of a file into a :py:class:`numpy.ndarray`.

    HDF5 files (see ``hdf5_extensions``) are read with :py:mod:`h5py`; image
    files (see ``image_extensions``) are read with :py:mod:`imageio` and
    returned in Bob's (channels-first) format for color images.

    Parameters
    ----------
    ``filename`` : str
      The name of the file to open.

    Returns
    -------
    numpy.ndarray
        The data loaded from the file.

    Raises
    ------
    RuntimeError
        If an HDF5 file contains several datasets but none named ``array``.
    ValueError
        If the file extension is not supported.
    """

    def check_gray(img):
        # Collapse 3-channel images whose channels are all identical to a
        # single-channel (gray) image.
        if (
            img.ndim > 2
            and np.array_equal(img[:, :, 0], img[:, :, 1])
            and np.array_equal(img[:, :, 0], img[:, :, 2])
        ):
            img = img[:, :, 0]
        return img

    # the (lowercased) extension decides which backend is used
    extension = os.path.splitext(filename)[1].lower()

    if extension in hdf5_extensions:
        with h5py.File(filename, "r") as f:
            keys = list(f.keys())
            if len(keys) == 1:
                key = keys[0]
            else:
                key = "array"
                if key not in keys:
                    # report the offending file, not a placeholder
                    raise RuntimeError(
                        f"The file {filename} does not contain the key {key}"
                    )
            dataset = f[key]
            # if the data was saved as a string, load it back as string
            string_dtype = h5py.check_string_dtype(dataset.dtype)
            if string_dtype is not None:
                dataset = dataset.asstr()
            return dataset[()]

    elif extension in image_extensions:
        from ..image import to_bob

        img = imageio.imread(filename)

        # PNGs may carry an additional alpha channel (LA or RGBA), which we
        # don't want; drop it before any further processing
        if img.ndim > 2:
            # extension is already lowercased above
            if extension == ".png" and img.shape[-1] in (2, 4):
                img = img[:, :, :-1]
            if img.shape[-1] == 1:
                img = img.squeeze(-1)

        # PBMs return a boolean array; Convert it to 0 or 255 values
        if extension == ".pbm" and img.dtype == bool:
            img = img.astype(np.uint8) * 255

        img = check_gray(img)
        return img if img.ndim == 2 else to_bob(img)
    else:
        raise ValueError(f"Unknown file extension: {extension}")
def write_file(filename, data, format="pillow") -> None:
    """Writes the contents of a :py:class:`numpy.ndarray` to a file.

    Parameters
    ----------
    ``filename`` : str
      The name of the file to write to.

    ``data`` : :py:class:`numpy.ndarray`
      The data to write to the file.

    ``format`` : str
      The format to use to write the file. Only used for image files; Pillow
      is selected by default as it has the best support for all image formats.

    Raises
    ------
    RuntimeError
        If the file extension is not supported.
    """
    # lowercase the extension so the match is case-insensitive, consistent
    # with open_file
    extension = os.path.splitext(filename)[1].lower()

    if extension in hdf5_extensions:
        with h5py.File(filename, "w") as f:
            f["array"] = data
    elif extension in image_extensions:
        # Pillow is the format with the best support for all image formats
        from ..image import to_matplotlib

        imageio.imwrite(filename, to_matplotlib(data), format=format)
    else:
        raise RuntimeError(f"Unknown file extension: {extension}")
def load(inputs) -> np.ndarray:
    """Loads the content of a file.

    Will take a filename (or an iterable of filenames) and put the content into a
    :py:class:`numpy.ndarray`.

    **Parameters:**

    ``inputs`` : various types

      This might represent several different entities:

      1. The name of a file (full path) from where to load the data. In this
         case, this assumes that the file contains an array and returns a loaded
         numpy ndarray.
      2. An iterable of filenames to be loaded in memory. In this case, this
         would assume that each file contains a single 1D sample or a set of 1D
         samples, load them in memory and concatenate them into a single and
         returned 2D :py:class:`numpy.ndarray`.

    **Returns:**

    ``data`` : :py:class:`numpy.ndarray`
      The data loaded from the given ``inputs``.

    **Raises:**

    ``RuntimeError`` if a filename does not exist or cannot be loaded;
    ``TypeError`` if ``inputs`` is neither a filename nor an iterable of
    filenames.
    """
    from collections.abc import Iterable

    if _is_string(inputs):
        if not os.path.exists(inputs):
            raise RuntimeError(f"`{inputs}' does not exist!")
        try:
            return open_file(inputs)
        except Exception as e:
            raise RuntimeError(f"Could not load `{inputs}'!") from e

    elif isinstance(inputs, Iterable):
        retval = []
        for obj in inputs:
            if _is_string(obj):
                retval.append(load(obj))
            else:
                raise TypeError(
                    "Iterable contains an object which is not a filename"
                )
        # use the module-level ``np`` alias; the previous local
        # ``import numpy`` was redundant
        return np.vstack(retval)
    else:
        raise TypeError(
            "Unexpected input object. This function is expecting a filename, "
            "or an iterable of filenames."
        )
def save(array, filename, create_directories=False):
    """Saves the contents of an array-like object to file.

    Effectively, this is the same as opening a file with the mode flag set to ``'w'``
    (write with truncation) and calling ``file.write`` passing ``array`` as parameter.

    Parameters:

    ``array`` : array_like
      The array-like object to be saved on the file

    ``filename`` : str
      The name of the file where you need the contents saved to

    ``create_directories`` : bool
      Automatically generate the directories if required (defaults to ``False``
      because of compatibility reasons; might change in future to default to
      ``True``)
    """
    # create directory if not existent yet; guard against a bare filename,
    # for which os.path.dirname() returns "" and os.makedirs("") would raise
    if create_directories:
        directory = os.path.dirname(filename)
        if directory:
            os.makedirs(directory, exist_ok=True)

    # if array is a string, don't create a numpy array
    if not isinstance(array, str):
        # requires data is c-contiguous and aligned, will create a copy otherwise
        array = np.require(array, requirements=("C_CONTIGUOUS", "ALIGNED"))

    write_file(filename, array)
# Aliases kept so the Python API mirrors the naming of the old C++ API.
write = save
read = load


# Keeps compatibility with the previously existing API, which exposed a
# ``File`` class under the name ``open`` (no longer provided here).
# open = File
248def _generate_features(reader, paths, same_size=False):
249 """Load and stack features in a memory efficient way. This function is
250 meant to be used inside :py:func:`vstack_features`.
252 Parameters
253 ----------
254 reader : ``collections.Callable``
255 See the documentation of :py:func:`vstack_features`.
256 paths : ``collections.Iterable``
257 See the documentation of :py:func:`vstack_features`.
258 same_size : :obj:`bool`, optional
259 See the documentation of :py:func:`vstack_features`.
261 Yields
262 ------
263 object
264 The first object returned is a tuple of :py:class:`numpy.dtype` of
265 features and the shape of the first feature. The rest of objects are
266 the actual values in features. The features are returned in C order.
267 """
268 shape_determined = False
269 for i, path in enumerate(paths):
271 feature = np.atleast_2d(reader(path))
272 feature = np.ascontiguousarray(feature)
273 if not shape_determined:
274 shape_determined = True
275 dtype = feature.dtype
276 shape = list(feature.shape)
277 yield (dtype, shape)
278 else:
279 # make sure all features have the same shape and dtype
280 if same_size:
281 assert shape == list(
282 feature.shape
283 ), f"Expected feature shape of {shape}, got {feature.shape}"
284 else:
285 assert shape[1:] == list(
286 feature.shape[1:]
287 ), f"Ignoring first dimension, expected feature shape of {shape}, got {feature.shape}"
288 assert dtype == feature.dtype
290 if same_size:
291 yield (feature.ravel(),)
292 else:
293 for feat in feature:
294 yield (feat.ravel(),)
def vstack_features(reader, paths, same_size=False, dtype=None):
    """Stacks all features in a memory efficient way.

    Parameters
    ----------
    reader : ``collections.Callable``
        The function to load the features. The function should only take one
        argument ``path`` and return loaded features. Use :any:`functools.partial`
        to accommodate your reader to this format.
        The features returned by ``reader`` are expected to have the same
        :py:class:`numpy.dtype` and the same shape except for their first
        dimension. First dimension should correspond to the number of samples.
    paths : ``collections.Iterable``
        An iterable of paths to iterate on. Whatever is inside path is given to
        ``reader`` so they do not need to be necessarily paths to actual files.
        If ``same_size`` is ``True``, ``len(paths)`` must be valid.
    same_size : :obj:`bool`, optional
        If ``True``, it assumes that arrays inside all the paths are the same
        shape. If you know the features are the same size in all paths, set this
        to ``True`` to improve the performance.
    dtype : :py:class:`numpy.dtype`, optional
        If provided, the data will be casted to this format.

    Returns
    -------
    numpy.ndarray
        The read features with the shape ``(n_samples, *features_shape[1:])``.

    Examples
    --------
    This function in a simple way is equivalent to calling
    ``numpy.vstack([reader(p) for p in paths])``.

    >>> import numpy
    >>> from bob.io.base import vstack_features
    >>> def reader(path):
    ...     # in each file, there are 5 samples and features are 2 dimensional.
    ...     return numpy.arange(10).reshape(5,2)
    >>> paths = ['path1', 'path2']
    >>> all_features = vstack_features(reader, paths)
    >>> numpy.allclose(all_features, numpy.array(
    ...     [[0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9],
    ...      [0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9]]))
    True
    >>> all_features_with_more_memory = numpy.vstack([reader(p) for p in paths])
    >>> numpy.allclose(all_features, all_features_with_more_memory)
    True

    You can allocate the array at once to improve the performance if you know
    that all features in paths have the same shape and you know the total number
    of the paths:

    >>> all_features = vstack_features(reader, paths, same_size=True)
    >>> numpy.allclose(all_features, numpy.array(
    ...     [[0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9],
    ...      [0, 1],
    ...      [2, 3],
    ...      [4, 5],
    ...      [6, 7],
    ...      [8, 9]]))
    True
    """
    feature_iter = _generate_features(reader, paths, same_size)
    # the first item yielded is metadata: (dtype, shape of first feature)
    data_dtype, first_shape = next(feature_iter)
    out_dtype = data_dtype if dtype is None else dtype

    # numpy black magic: https://stackoverflow.com/a/12473478/1286165
    # each yielded tuple holds one flat record; fromiter consumes them lazily
    if same_size:
        record_dtype = [("", (out_dtype, (np.prod(first_shape),)))]
        stacked = np.fromiter(feature_iter, record_dtype, len(paths))
    else:
        record_dtype = [("", (out_dtype, (np.prod(first_shape[1:]),)))]
        stacked = np.fromiter(feature_iter, record_dtype)

    # go from a field (record) array back to a plain ndarray
    stacked = stacked.view(out_dtype)

    # restore the trailing dimensions of the features; the first axis is
    # inferred and collects all samples, e.g. (n, 2) or (n, 3, 4)
    out_shape = list(first_shape)
    out_shape[0] = -1
    return np.reshape(stacked, out_shape, order="C")
# Export every public (non-underscore) name defined above; sphinx autodoc
# relies on ``__all__`` being present - don't remove it.
__all__ = [_ for _ in dir() if not _.startswith("_")]