From 8d2c8e3c867d20548f1cf8c5eb99d8d9586a6c3f Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 24 Apr 2026 20:51:09 -0700 Subject: [PATCH 1/5] allow compression extensions --- README.md | 5 ++ sigmf/archive.py | 168 +++++++++++++++++++++++++++++++------- sigmf/archivereader.py | 177 +++++++++++++++++++++++++++++++++-------- sigmf/hashing.py | 12 ++- sigmf/sigmffile.py | 26 ++++-- tests/test_archive.py | 100 +++++++++++++++++++++++ 6 files changed, 419 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index e2457cc..5292217 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,11 @@ samples = meta[0:1024] # get first 1024 samples sample_rate = meta.sample_rate # get sample rate +# read compressed SigMF archives +meta = sigmf.fromfile("recording.sigmf.gz") # gzip-compressed +meta = sigmf.fromfile("recording.sigmf.xz") # xz-compressed +meta = sigmf.fromfile("recording.sigmf.zip") # zip archive + # read other formats containing RF time series as SigMF meta = sigmf.fromfile("recording.wav") # WAV meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum diff --git a/sigmf/archive.py b/sigmf/archive.py index f0bef9a..0d4a22e 100644 --- a/sigmf/archive.py +++ b/sigmf/archive.py @@ -10,6 +10,7 @@ import shutil import tarfile import tempfile +import zipfile from pathlib import Path from .error import SigMFFileError, SigMFFileExistsError @@ -19,10 +20,72 @@ SIGMF_DATASET_EXT = ".sigmf-data" SIGMF_COLLECTION_EXT = ".sigmf-collection" +SIGMF_COMPRESSED_EXTS = { + # compression type -> unique compound extension + "gz": ".sigmf.gz", + "xz": ".sigmf.xz", + "zip": ".sigmf.zip", +} + +# all recognized archive extensions (uncompressed + compressed) +SIGMF_ARCHIVE_EXTS = {SIGMF_ARCHIVE_EXT} | set(SIGMF_COMPRESSED_EXTS.values()) + + +def _detect_compression(path): + """Detect compression type from a file path's extension(s). + + Parameters + ---------- + path : Path + Path to check. 
+ + Returns + ------- + str or None + Compression type ("gz", "xz", "zip") or None for uncompressed. + """ + name = str(path).lower() + for comp_type, ext in SIGMF_COMPRESSED_EXTS.items(): + if name.endswith(ext): + return comp_type + return None + + +def _get_archive_basename(path): + """Get the archive base name (without any sigmf archive extension). + + Parameters + ---------- + path : Path + Archive file path. + + Returns + ------- + str + Base name without sigmf extension. + + Examples + -------- + >>> _get_archive_basename(Path("recording.sigmf")) + 'recording' + >>> _get_archive_basename(Path("recording.sigmf.gz")) + 'recording' + >>> _get_archive_basename(Path("my.recording.sigmf.zip")) + 'my.recording' + """ + name = path.name + # check compound extensions first (longest match) + for ext in sorted(SIGMF_COMPRESSED_EXTS.values(), key=len, reverse=True): + if name.endswith(ext): + return name[: -len(ext)] + if name.endswith(SIGMF_ARCHIVE_EXT): + return name[: -len(SIGMF_ARCHIVE_EXT)] + return path.stem + class SigMFArchive: """ - Archive a SigMFFile into a tar file. + Archive a SigMFFile into a tar or zip file, optionally with compression. Parameters ---------- @@ -32,7 +95,9 @@ class SigMFArchive: name : PathLike | str | bytes Path to archive file to create. - If `name` doesn't end in .sigmf, it will be appended. + If `name` doesn't end in a recognized sigmf archive extension, + .sigmf will be appended. Recognized extensions: + .sigmf, .sigmf.gz, .sigmf.xz, .sigmf.zip For example: if `name` == "/tmp/archive1", then the following archive will be created: /tmp/archive1.sigmf @@ -53,48 +118,70 @@ class SigMFArchive: - archive1.sigmf-meta - archive1.sigmf-data + compression : str, optional + Compression type: "gz", "xz", "zip", or None (default). + If None and `name` has a recognized compressed extension, + compression is auto-detected from the extension. + overwrite : bool, default False If False, raise exception if archive file already exists. 
Raises ------ SigMFFileError - If `sigmffile` has no data_file set, or if `name` is not writable. + If `sigmffile` has no data_file set, or if `name` is not writable, + or if an invalid compression type is given. """ - def __init__(self, sigmffile, name=None, fileobj=None, overwrite=False): + VALID_COMPRESSIONS = {None, "gz", "xz", "zip"} + + def __init__(self, sigmffile, name=None, fileobj=None, compression=None, overwrite=False): is_buffer = fileobj is not None self.sigmffile = sigmffile - self.path, arcname, fileobj = self._resolve(name, fileobj, overwrite) + self.path, arcname, fileobj, compression = self._resolve(name, fileobj, compression, overwrite) self._ensure_data_file_set() self._validate() - tar = tarfile.TarFile(mode="w", fileobj=fileobj, format=tarfile.PAX_FORMAT) + # prepare temp files with metadata and data tmpdir = Path(tempfile.mkdtemp()) meta_path = tmpdir / (arcname + SIGMF_METADATA_EXT) data_path = tmpdir / (arcname + SIGMF_DATASET_EXT) - # write files with open(meta_path, "w") as handle: self.sigmffile.dump(handle) if isinstance(self.sigmffile.data_buffer, io.BytesIO): - # write data buffer to archive self.sigmffile.data_file = data_path with open(data_path, "wb") as handle: handle.write(self.sigmffile.data_buffer.getbuffer()) else: - # copy data to archive shutil.copy(self.sigmffile.data_file, data_path) - tar.add(tmpdir, arcname=arcname, filter=self.chmod) - # close files & remove tmpdir - tar.close() + + if compression == "zip": + self._write_zip(fileobj, arcname, tmpdir, meta_path, data_path) + else: + self._write_tar(fileobj, arcname, tmpdir, compression) + if not is_buffer: # only close fileobj if we aren't working w/a buffer fileobj.close() shutil.rmtree(tmpdir) + def _write_tar(self, fileobj, arcname, tmpdir, compression): + """Write archive as tar (optionally compressed).""" + mode = "w" if compression is None else f"w:{compression}" + tar = tarfile.open(mode=mode, fileobj=fileobj, format=tarfile.PAX_FORMAT) + tar.add(tmpdir, 
arcname=arcname, filter=self.chmod) + tar.close() + + def _write_zip(self, fileobj, arcname, tmpdir, meta_path, data_path): + """Write archive as zip.""" + with zipfile.ZipFile(fileobj, mode="w", compression=zipfile.ZIP_DEFLATED) as zf: + # add data file first (matches tar convention for faster metadata updates) + zf.write(data_path, arcname=f"{arcname}/{arcname}{SIGMF_DATASET_EXT}") + zf.write(meta_path, arcname=f"{arcname}/{arcname}{SIGMF_METADATA_EXT}") + @staticmethod def chmod(tarinfo: tarfile.TarInfo): """permission filter for writing tar files""" @@ -111,9 +198,9 @@ def _ensure_data_file_set(self): def _validate(self): self.sigmffile.validate() - def _resolve(self, name, fileobj, overwrite=False): + def _resolve(self, name, fileobj, compression, overwrite=False): """ - Resolve both (name, fileobj) into (path, arcname, fileobj) given either or both. + Resolve both (name, fileobj) into (path, arcname, fileobj, compression) given either or both. Parameters ---------- @@ -121,6 +208,8 @@ def _resolve(self, name, fileobj, overwrite=False): Path to archive file to create. fileobj : BufferedWriter | None Open file handle object. + compression : str | None + Compression type or None. overwrite : bool, default False If False, raise exception if archive file already exists. @@ -132,15 +221,18 @@ def _resolve(self, name, fileobj, overwrite=False): Name of the sigmf object within the archive. fileobj : BufferedWriter Open file handle object. + compression : str | None + Resolved compression type. """ + if compression not in self.VALID_COMPRESSIONS: + raise SigMFFileError(f"Invalid compression type '{compression}'. 
Must be one of: {self.VALID_COMPRESSIONS}") + if fileobj: try: - # exception if not byte-writable fileobj.write(bytes()) - # exception if no name property of handle path = Path(fileobj.name) if not name: - arcname = path.stem + arcname = _get_archive_basename(path) else: arcname = name except io.UnsupportedOperation as exc: @@ -149,16 +241,36 @@ def _resolve(self, name, fileobj, overwrite=False): raise SigMFFileError(f"fileobj {fileobj} is invalid.") from exc elif name: path = Path(name) - # ensure name has correct suffix if it exists - if path.suffix == "": - # add extension if none was given - path = path.with_suffix(SIGMF_ARCHIVE_EXT) - elif path.suffix != SIGMF_ARCHIVE_EXT: - # ensure suffix is correct - raise SigMFFileError(f"Invalid extension ({path.suffix} != {SIGMF_ARCHIVE_EXT}).") - arcname = path.stem - - # check if file exists and overwrite is disabled + name_str = str(path).lower() + + # auto-detect compression from extension if not explicitly set + detected = _detect_compression(path) + if compression is None and detected is not None: + compression = detected + + # check if path has a recognized archive extension + has_archive_ext = any(name_str.endswith(ext) for ext in SIGMF_ARCHIVE_EXTS) + + if not has_archive_ext: + if path.suffix == "": + # no extension — append the appropriate one + if compression is not None: + path = Path(str(path) + SIGMF_COMPRESSED_EXTS[compression]) + else: + path = path.with_suffix(SIGMF_ARCHIVE_EXT) + else: + # has an unrecognized extension + raise SigMFFileError( + f"Unrecognized archive extension for '{path.name}'. " + f"Recognized extensions: {sorted(SIGMF_ARCHIVE_EXTS)}" + ) + elif detected is not None and compression is not None and detected != compression: + raise SigMFFileError( + f"Extension implies '{detected}' compression but compression='{compression}' was specified." 
+ ) + + arcname = _get_archive_basename(path) + if not overwrite and path.exists(): raise SigMFFileExistsError(path, "Archive file") @@ -169,4 +281,4 @@ def _resolve(self, name, fileobj, overwrite=False): else: raise SigMFFileError("Either `name` or `fileobj` needs to be defined.") - return path, arcname, fileobj + return path, arcname, fileobj, compression diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 25bac69..5f13c63 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -8,22 +8,35 @@ import io import tarfile +import zipfile from pathlib import Path from . import __version__ -from .archive import SIGMF_ARCHIVE_EXT, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +from .archive import ( + SIGMF_ARCHIVE_EXT, + SIGMF_ARCHIVE_EXTS, + SIGMF_DATASET_EXT, + SIGMF_METADATA_EXT, + _detect_compression, +) from .error import SigMFFileError +from .hashing import calculate_sha512 from .sigmffile import SigMFFile class SigMFArchiveReader: """ - Access data within SigMF archive tarball in-place without extracting. + Access data within SigMF archive (tar, tar.gz, tar.xz, or zip) in-place. + + For uncompressed tar archives opened by path, data is memory-mapped + directly from the archive file for efficient access. Compressed archives + and buffer-based reading load data into memory. Parameters ---------- name : str | bytes | PathLike, optional - Optional path to archive file to access. + Path to archive file to access. Recognized extensions: + .sigmf, .sigmf.gz, .sigmf.xz, .sigmf.zip skip_checksum : bool, optional Skip dataset checksum calculation. map_readonly : bool, optional @@ -35,7 +48,7 @@ class SigMFArchiveReader: Raises ------ - SigMFError + SigMFFileError Archive file does not exist or is improperly formatted. ValueError If invalid arguments. 
@@ -46,48 +59,134 @@ class SigMFArchiveReader: def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None, autoscale=True): if name is not None: path = Path(name) - if path.suffix != SIGMF_ARCHIVE_EXT: - err = "archive extension != {}".format(SIGMF_ARCHIVE_EXT) - raise SigMFFileError(err) - - tar_obj = tarfile.open(path) + compression = _detect_compression(path) + + # validate extension + name_str = str(path).lower() + if not any(name_str.endswith(ext) for ext in SIGMF_ARCHIVE_EXTS): + raise SigMFFileError( + f"Unrecognized archive extension for '{path.name}'. " + f"Recognized extensions: {sorted(SIGMF_ARCHIVE_EXTS)}" + ) + + if compression == "zip": + json_contents, data_buffer, data_size_bytes = self._read_zip(path) + self._init_from_buffer( + json_contents, data_buffer, data_size_bytes, skip_checksum, map_readonly, autoscale + ) + elif compression is not None: + # compressed tar (gz, xz) — must decompress to ram + json_contents, data_buffer, data_size_bytes = self._read_tar(path) + self._init_from_buffer( + json_contents, data_buffer, data_size_bytes, skip_checksum, map_readonly, autoscale + ) + else: + # uncompressed tar — memmap directly + self._init_from_tar_memmap(path, skip_checksum, map_readonly, autoscale) elif archive_buffer is not None: - tar_obj = tarfile.open(fileobj=archive_buffer, mode="r:") + # try tar first, fall back to zip + try: + tar_obj = tarfile.open(fileobj=archive_buffer, mode="r:*") + json_contents, data_buffer, data_size_bytes = self._read_tar_obj(tar_obj) + tar_obj.close() + except tarfile.TarError: + archive_buffer.seek(0) + json_contents, data_buffer, data_size_bytes = self._read_zip_fileobj(archive_buffer) + self._init_from_buffer(json_contents, data_buffer, data_size_bytes, skip_checksum, map_readonly, autoscale) else: raise ValueError("Either `name` or `archive_buffer` must be not None.") + def _read_tar_obj(self, tar_obj): + """Extract metadata and data from an open tar object.""" json_contents = 
None - data_offset = None + data_buffer = None data_size_bytes = None for memb in tar_obj.getmembers(): - if memb.isdir(): # memb.type == tarfile.DIRTYPE: - # the directory structure will be reflected in the member name + if memb.isdir(): continue - - elif memb.isfile(): # memb.type == tarfile.REGTYPE: + elif memb.isfile(): if memb.name.endswith(SIGMF_METADATA_EXT): - json_contents = memb.name - if data_offset is None: - # consider a warnings.warn() here; the datafile should be earlier in the - # archive than the metadata, so that updating it (like, adding an annotation) - # is fast. - pass - with tar_obj.extractfile(memb) as memb_fid: - json_contents = memb_fid.read() + with tar_obj.extractfile(memb) as fid: + json_contents = fid.read() + elif memb.name.endswith(SIGMF_DATASET_EXT): + data_size_bytes = memb.size + with tar_obj.extractfile(memb) as fid: + data_buffer = io.BytesIO(fid.read()) + + if data_buffer is None: + raise SigMFFileError("No .sigmf-data file found in archive!") + return json_contents, data_buffer, data_size_bytes + + def _read_tar(self, path): + """Read a tar archive (possibly compressed) from disk.""" + tar_obj = tarfile.open(path) + result = self._read_tar_obj(tar_obj) + tar_obj.close() + return result + + def _read_zip(self, path): + """Read a zip archive from disk.""" + with zipfile.ZipFile(path, "r") as zf: + return self._read_zip_obj(zf) + + def _read_zip_fileobj(self, fileobj): + """Read a zip archive from a buffer.""" + with zipfile.ZipFile(fileobj, "r") as zf: + return self._read_zip_obj(zf) + + def _read_zip_obj(self, zf): + """Extract metadata and data from an open ZipFile object.""" + json_contents = None + data_buffer = None + data_size_bytes = None + + for member_name in zf.namelist(): + if member_name.endswith(SIGMF_METADATA_EXT): + json_contents = zf.read(member_name) + elif member_name.endswith(SIGMF_DATASET_EXT): + raw = zf.read(member_name) + data_size_bytes = len(raw) + data_buffer = io.BytesIO(raw) + + if data_buffer is 
None: + raise SigMFFileError("No .sigmf-data file found in archive!") + return json_contents, data_buffer, data_size_bytes + + def _init_from_buffer(self, json_contents, data_buffer, data_size_bytes, skip_checksum, map_readonly, autoscale): + """Initialize sigmffile from in-memory data.""" + self.sigmffile = SigMFFile(metadata=json_contents, autoscale=autoscale) + self.sigmffile.validate() + self.sigmffile.set_data_file( + data_buffer=data_buffer, + skip_checksum=skip_checksum, + size_bytes=data_size_bytes, + map_readonly=map_readonly, + ) + self.ndim = self.sigmffile.ndim + self.shape = self.sigmffile.shape + + def _init_from_tar_memmap(self, path, skip_checksum, map_readonly, autoscale): + """Initialize sigmffile with memmap into uncompressed tar.""" + tar_obj = tarfile.open(path) + json_contents = None + data_offset = None + data_size_bytes = None + for memb in tar_obj.getmembers(): + if memb.isdir(): + continue + elif memb.isfile(): + if memb.name.endswith(SIGMF_METADATA_EXT): + with tar_obj.extractfile(memb) as fid: + json_contents = fid.read() elif memb.name.endswith(SIGMF_DATASET_EXT): data_offset = memb.offset_data data_size_bytes = memb.size - with tar_obj.extractfile(memb) as memb_fid: - data_buffer = io.BytesIO(memb_fid.read()) - else: - print(f"A regular file {memb.name} was found but ignored in the archive") - else: - print(f"A member of type {memb.type} and name {memb.name} was found but not handled, just FYI.") + tar_obj.close() if data_offset is None: raise SigMFFileError("No .sigmf-data file found in archive!") @@ -95,18 +194,30 @@ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_bu self.sigmffile = SigMFFile(metadata=json_contents, autoscale=autoscale) self.sigmffile.validate() + # compute hash of data portion only (not full tar file) + if not skip_checksum: + data_hash = calculate_sha512(filename=path, offset=data_offset, size=data_size_bytes) + old_hash = self.sigmffile.get_global_field(SigMFFile.HASH_KEY) + if 
old_hash is not None and old_hash != data_hash: + raise SigMFFileError("Calculated file hash does not match associated metadata.") + self.sigmffile.set_global_field(SigMFFile.HASH_KEY, data_hash) + + # memmap directly into the tar file at the data offset self.sigmffile.set_data_file( - data_buffer=data_buffer, - skip_checksum=skip_checksum, + data_file=path, + skip_checksum=True, + offset=data_offset, size_bytes=data_size_bytes, map_readonly=map_readonly, ) + # set_data_file sets DATASET_KEY for non-.sigmf-data files (NCD), + # but the tar archive path is not a dataset — clear it + if SigMFFile.DATASET_KEY in self.sigmffile.get_global_info(): + del self.sigmffile._metadata[SigMFFile.GLOBAL_KEY][SigMFFile.DATASET_KEY] self.ndim = self.sigmffile.ndim self.shape = self.sigmffile.shape - tar_obj.close() - def __len__(self): return self.sigmffile.__len__() diff --git a/sigmf/hashing.py b/sigmf/hashing.py index 3874729..17dfa44 100644 --- a/sigmf/hashing.py +++ b/sigmf/hashing.py @@ -10,7 +10,7 @@ from pathlib import Path -def calculate_sha512(filename=None, fileobj=None): +def calculate_sha512(filename=None, fileobj=None, offset=0, size=None): """ Calculate SHA512 hash of a dataset for integrity verification. @@ -24,6 +24,10 @@ def calculate_sha512(filename=None, fileobj=None): fileobj : file-like object, optional An open file-like object (e.g., BytesIO) to hash. Must have read() and seek() methods. Cannot be used together with filename. + offset : int, optional + Byte offset into the file to start hashing from. Default is 0. + size : int, optional + Number of bytes to hash. If None, hash from offset to end of file. 
Returns ------- @@ -40,7 +44,11 @@ def calculate_sha512(filename=None, fileobj=None): if filename is not None: fileobj = open(filename, "rb") - bytes_to_hash = Path(filename).stat().st_size + if size is not None: + bytes_to_hash = size + else: + bytes_to_hash = Path(filename).stat().st_size + fileobj.seek(offset) elif fileobj is not None: current_pos = fileobj.tell() # seek to end diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 633d46f..c58af69 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -19,6 +19,7 @@ from .archive import ( SIGMF_ARCHIVE_EXT, SIGMF_COLLECTION_EXT, + SIGMF_COMPRESSED_EXTS, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SigMFArchive, @@ -573,7 +574,9 @@ def get_capture_byte_boundaries(self, index): end_byte = start_byte if index == len(self.get_captures()) - 1: # last captures...data is the rest of the file - if self.data_file is not None: + if self.data_size_bytes is not None: + file_size = self.data_size_bytes + elif self.data_file is not None: file_size = self.data_file.stat().st_size elif self.data_buffer is not None: file_size = len(self.data_buffer.getbuffer()) @@ -796,7 +799,7 @@ def validate(self): """ validate.validate(self._metadata, self.get_schema()) - def archive(self, name=None, fileobj=None, overwrite=False): + def archive(self, name=None, fileobj=None, compression=None, overwrite=False): """Dump contents to SigMF archive format. `name` and `fileobj` are passed to SigMFArchive and are defined there. @@ -807,13 +810,17 @@ def archive(self, name=None, fileobj=None, overwrite=False): Name of the archive file to create. If None, a temporary file will be created. fileobj : file-like object, optional A file-like object to write the archive to. If None, a file will be created at `name`. + compression : str, optional + Compression type: "gz", "xz", "zip", or None (default). + If None and `name` has a recognized compressed extension, + compression is auto-detected from the extension. 
overwrite : bool, default False If False, raise exception if archive file already exists. """ - archive = SigMFArchive(self, name, fileobj, overwrite=overwrite) + archive = SigMFArchive(self, name, fileobj, compression=compression, overwrite=overwrite) return archive.path - def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False, overwrite=False): + def tofile(self, file_path, pretty=True, toarchive=False, compression=None, skip_validate=False, overwrite=False): """ Write metadata file or full archive containing metadata & dataset. @@ -824,8 +831,10 @@ def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False, o pretty : bool, default True When True will write more human-readable output, otherwise will be flat JSON. toarchive : bool, default False - If True will write both dataset & metadata into SigMF archive format as a single `tar` file. + If True will write both dataset & metadata into SigMF archive format. If False will only write metadata to `sigmf-meta`. + compression : str, optional + Compression type when toarchive=True: "gz", "xz", "zip", or None. skip_validate : bool, default False Skip validation of metadata before writing. 
overwrite : bool, default False @@ -836,7 +845,7 @@ def tofile(self, file_path, pretty=True, toarchive=False, skip_validate=False, o fns = get_sigmf_filenames(file_path) if toarchive: - self.archive(fns["archive_fn"], overwrite=overwrite) + self.archive(fns["archive_fn"], compression=compression, overwrite=overwrite) else: # check if metadata file exists if not overwrite and fns["meta_fn"].exists(): @@ -1320,6 +1329,11 @@ def fromfile(filename, skip_checksum=False, autoscale=True): # group SigMF extensions for cleaner checking sigmf_extensions = (SIGMF_METADATA_EXT, SIGMF_DATASET_EXT, SIGMF_COLLECTION_EXT, SIGMF_ARCHIVE_EXT) + # try compressed SigMF archive (.sigmf.gz, .sigmf.xz, .sigmf.zip) + for comp_ext in SIGMF_COMPRESSED_EXTS.values(): + if file_path.name.lower().endswith(comp_ext) and Path.is_file(file_path): + return fromarchive(file_path, skip_checksum=skip_checksum, autoscale=autoscale) + # try SigMF archive if (ext.endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): return fromarchive(archive_fn, skip_checksum=skip_checksum, autoscale=autoscale) diff --git a/tests/test_archive.py b/tests/test_archive.py index 36abfa8..ccbf9f0 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -20,6 +20,7 @@ from sigmf import SigMFFile, __specification__, error, fromfile from sigmf.archive import SIGMF_DATASET_EXT, SIGMF_METADATA_EXT +from sigmf.archivereader import SigMFArchiveReader from .testdata import TEST_FLOAT32_DATA, TEST_METADATA @@ -178,3 +179,102 @@ def test_archive_read_samples_beyond_end(self): # FIXME: Should this raise a SigMFFileError instead? 
with self.assertRaises(OSError): meta.read_samples(start_index=meta.sample_count + 10, count=5) + + +class TestCompressedArchive(unittest.TestCase): + """Tests for compressed SigMF archive support.""" + + def setUp(self): + """create test data and sigmf object""" + self.temp_dir = Path(tempfile.mkdtemp()) + self.temp_path_data = self.temp_dir / "test.sigmf-data" + TEST_FLOAT32_DATA.tofile(self.temp_path_data) + self.sigmf_object = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=self.temp_path_data) + self.original_samples = self.sigmf_object.read_samples() + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def _roundtrip(self, archive_path, compression=None): + """write archive, read it back, verify samples match""" + self.sigmf_object.archive(name=archive_path, compression=compression, overwrite=True) + self.assertTrue(archive_path.exists()) + readback = fromfile(str(archive_path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + return readback + + def test_roundtrip_gz(self): + """test gz compressed archive round-trip""" + self._roundtrip(self.temp_dir / "test.sigmf.gz") + + def test_roundtrip_xz(self): + """test xz compressed archive round-trip""" + self._roundtrip(self.temp_dir / "test.sigmf.xz") + + def test_roundtrip_zip(self): + """test zip compressed archive round-trip""" + self._roundtrip(self.temp_dir / "test.sigmf.zip") + + def test_explicit_compression_parameter(self): + """test explicit compression= parameter without matching extension""" + path = self.temp_dir / "test_explicit" + self.sigmf_object.archive(name=path, compression="gz", overwrite=True) + actual_path = self.temp_dir / "test_explicit.sigmf.gz" + self.assertTrue(actual_path.exists()) + readback = fromfile(str(actual_path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_extension_autodetect(self): + """test that compression is auto-detected from extension""" + path = self.temp_dir / "test_auto.sigmf.xz" + 
self.sigmf_object.archive(name=path, overwrite=True) + self.assertTrue(path.exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_metadata_preserved(self): + """test that metadata survives compression round-trip""" + for ext in ["sigmf.gz", "sigmf.xz", "sigmf.zip"]: + path = self.temp_dir / f"meta_test.{ext}" + readback = self._roundtrip(path) + self.assertEqual( + self.sigmf_object.get_global_field(SigMFFile.DATATYPE_KEY), + readback.get_global_field(SigMFFile.DATATYPE_KEY), + ) + self.assertEqual(len(self.sigmf_object.get_annotations()), len(readback.get_annotations())) + + def test_compressed_smaller_than_uncompressed(self): + """test that compressed archives are smaller than uncompressed""" + uncompressed_path = self.temp_dir / "size_test.sigmf" + gz_path = self.temp_dir / "size_test.sigmf.gz" + xz_path = self.temp_dir / "size_test.sigmf.xz" + zip_path = self.temp_dir / "size_test.sigmf.zip" + + self.sigmf_object.archive(name=uncompressed_path, overwrite=True) + self.sigmf_object.archive(name=gz_path, overwrite=True) + self.sigmf_object.archive(name=xz_path, overwrite=True) + self.sigmf_object.archive(name=zip_path, overwrite=True) + + uncompressed_size = uncompressed_path.stat().st_size + for compressed_path in [gz_path, xz_path, zip_path]: + self.assertLess(compressed_path.stat().st_size, uncompressed_size) + + def test_invalid_compression_raises_error(self): + """test that invalid compression type raises error""" + path = self.temp_dir / "bad.sigmf" + for unsupported in ["bz2", "7z"]: + with self.assertRaises(error.SigMFFileError, msg=f"{unsupported} is not yet supported"): + self.sigmf_object.archive(name=path, compression=unsupported, overwrite=True) + + def test_mismatched_extension_and_compression_raises_error(self): + """test that mismatched extension and compression raises error""" + path = self.temp_dir / "mismatch.sigmf.gz" + with self.assertRaises(error.SigMFFileError): + 
self.sigmf_object.archive(name=path, compression="xz", overwrite=True) + + def test_uncompressed_archive_uses_memmap(self): + """test that uncompressed archives use memmap for data access""" + path = self.temp_dir / "memmap_test.sigmf" + self.sigmf_object.archive(name=path, overwrite=True) + reader = SigMFArchiveReader(path) + self.assertIsInstance(reader.sigmffile._memmap, np.memmap) From c3ccd71de359e8624cf3a4123e99f50723298d9a Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 24 Apr 2026 21:23:17 -0700 Subject: [PATCH 2/5] add new method to allow easy sigmf write directly from numpy array --- README.md | 12 ++++- docs/source/advanced.rst | 50 ++++++++++++++++++++- docs/source/quickstart.rst | 26 +++++++++++ sigmf/__init__.py | 2 +- sigmf/sigmffile.py | 91 +++++++++++++++++++++++++++++++++++++- tests/test_sigmffile.py | 74 +++++++++++++++++++++++++++++++ 6 files changed, 251 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 5292217..5e80d49 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,6 @@ meta = sigmf.fromfile("recording.sigmf-meta") samples = meta[0:1024] # get first 1024 samples sample_rate = meta.sample_rate # get sample rate - # read compressed SigMF archives meta = sigmf.fromfile("recording.sigmf.gz") # gzip-compressed meta = sigmf.fromfile("recording.sigmf.xz") # xz-compressed @@ -40,6 +39,17 @@ meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum meta = sigmf.fromfile("recording.xml") # Signal Hound Spike ``` +### Write SigMF + +```python +import numpy as np +import sigmf + +data = np.array([0.1 + 0.2j, 0.3 + 0.4j], dtype=np.complex64) +# creates recording.sigmf-data and recording.sigmf-meta +meta = sigmf.tofile("recording", data, sample_rate=48000) +``` + ### Docs **[Please visit our documentation for full API reference and more info.](https://sigmf.readthedocs.io/en/latest/)** diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index 4a050c4..f45c330 100644 --- a/docs/source/advanced.rst +++ 
b/docs/source/advanced.rst @@ -143,7 +143,7 @@ The SigMF Collection and its associated Recordings can now be loaded like this: Load a SigMF Archive and slice without untaring ----------------------------------------------- -Since an *archive* is merely a tarball (uncompressed), and since there any many +Since an *archive* is a tarball (uncompressed by default), and since there are many excellent tools for manipulating tar files, it's fairly straightforward to access the *data* part of a SigMF archive without un-taring it. This is a compelling feature because **1** archives make it harder for the ``-data`` and @@ -195,3 +195,51 @@ read it, this can be done "in mid air" or "without touching the ground (disk)". >>> arc[:10] array([-20.+11.j, -21. -6.j, -17.-20.j, -13.-52.j, 0.-75.j, 22.-58.j, 48.-44.j, 49.-60.j, 31.-56.j, 23.-47.j], dtype=complex64) + +------------------------------ +Compressed SigMF Archives +------------------------------ + +SigMF archives can be compressed using gzip, xz, or zip. 
The compression format +is determined by the file extension: + ++---------------------+-------------+ +| Extension | Format | ++=====================+=============+ +| ``.sigmf`` | uncompressed| ++---------------------+-------------+ +| ``.sigmf.gz`` | gzip tar | ++---------------------+-------------+ +| ``.sigmf.xz`` | xz tar | ++---------------------+-------------+ +| ``.sigmf.zip`` | zip archive | ++---------------------+-------------+ + +**Writing compressed archives:** + +:: + + >>> import sigmf + >>> signal = sigmf.sigmffile.fromfile('recording.sigmf-meta') + + # compress by extension + >>> signal.archive('recording.sigmf.xz') + + # or specify compression explicitly (writes recording.sigmf.gz) + >>> signal.archive('recording', compression='gz') + +**Reading compressed archives:** + +:: + + >>> arc = sigmf.SigMFArchiveReader('recording.sigmf.xz') + >>> arc[:10] + array([-20.+11.j, ...], dtype=complex64) + +**Memory behavior:** + +Uncompressed ``.sigmf`` archives use ``numpy.memmap`` to access the data +directly inside the tar file — no extra memory is needed, even for very large +recordings. Compressed archives (``.sigmf.gz``, ``.sigmf.xz``, ``.sigmf.zip``) +must decompress the data into RAM before it can be accessed. Keep this in mind +when working with large compressed recordings. diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 9c058d2..4927f99 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -45,6 +45,32 @@ Verify SigMF Integrity & Compliance Save a Numpy array as a SigMF Recording --------------------------------------- +..
code-block:: python + + import numpy as np + import sigmf + + # suppose we have a complex timeseries signal + data = np.zeros(1024, dtype=np.complex64) + + # write to disk — datatype is inferred from the numpy array + meta = sigmf.tofile("example", data, sample_rate=48000, frequency=915e6) + + # or write to a SigMF archive (example.sigmf) + meta = sigmf.tofile("example.sigmf", data, sample_rate=48000, frequency=915e6) + + # or write directly to a compressed archive (example.sigmf.xz) + meta = sigmf.tofile("example", data, sample_rate=48000, compression="xz") + +The returned ``SigMFFile`` object can be used to add captures, annotations, +or archive the recording. + +--------------------------------------------------- +Save a Numpy array with Full Metadata (Advanced) +--------------------------------------------------- + +For full control over global fields, captures, and annotations: + .. code-block:: python import numpy as np diff --git a/sigmf/__init__.py b/sigmf/__init__.py index d481f0d..e66a84f 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -22,4 +22,4 @@ from .archive import SigMFArchive from .archivereader import SigMFArchiveReader from .siggen import SigMFGenerator -from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromfile +from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromfile, tofile diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index c58af69..5eb28d1 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -23,6 +23,8 @@ SIGMF_DATASET_EXT, SIGMF_METADATA_EXT, SigMFArchive, + _detect_compression, + _get_archive_basename, ) from .error import ( SigMFAccessError, @@ -31,7 +33,7 @@ SigMFFileError, SigMFFileExistsError, ) -from .utils import dict_merge +from .utils import dict_merge, get_data_type_str class SigMFMetafile: @@ -1258,6 +1260,93 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): return None +def tofile(filename, data, sample_rate, frequency=None, toarchive=False, 
compression=None, global_info=None): + """ + Convenience method to write a numpy array to a SigMF recording. + + For quick saves — infers the SigMF datatype from the numpy dtype, writes + the data file, creates metadata with a single capture at index 0, and + saves to disk. For full control over captures, annotations, and global + fields, use ``SigMFFile`` directly. + + Parameters + ---------- + filename : str | PathLike + Base filename or archive path. Accepts: + - ``"recording"`` — produces ``recording.sigmf-data`` and ``recording.sigmf-meta`` + - ``"recording.sigmf"`` — produces uncompressed archive (auto-detects toarchive) + - ``"recording.sigmf.xz"`` — produces compressed archive (auto-detects compression) + data : np.ndarray + Signal samples to write. + sample_rate : float + Sample rate in Hz. + frequency : float, optional + Center frequency in Hz for the capture. + toarchive : bool, default False + If True, produce a ``.sigmf`` archive instead of loose data/meta files. + Auto-detected from filename extension if not specified. + compression : str, optional + If set, also creates a compressed archive. One of "gz", "xz", "zip". + Auto-detected from filename extension if not specified. Implies toarchive. + global_info : dict, optional + Additional global metadata fields to include. + + Returns + ------- + SigMFFile + The SigMFFile object with data and metadata. + """ + file_path = Path(filename) + + # detect compressed extension and extract base name + detected = _detect_compression(file_path) + if detected is not None: + if compression is not None and compression != detected: + raise SigMFFileError( + f"Extension implies '{detected}' compression but compression='{compression}' was specified." 
+ ) + compression = detected + base_name = _get_archive_basename(file_path) + base_path = file_path.parent / base_name + elif file_path.name.endswith(SIGMF_ARCHIVE_EXT): + toarchive = True + base_path = file_path.parent / file_path.stem + else: + base_path = file_path + + # compression implies archive + if compression is not None: + toarchive = True + + fns = get_sigmf_filenames(base_path) + data_path = fns["data_fn"] + + data.tofile(data_path) + + info = { + SigMFFile.DATATYPE_KEY: get_data_type_str(data), + SigMFFile.SAMPLE_RATE_KEY: sample_rate, + } + if global_info is not None: + info.update(global_info) + + capture_meta = None + if frequency is not None: + capture_meta = {SigMFFile.FREQUENCY_KEY: frequency} + + meta = SigMFFile(data_file=data_path, global_info=info) + meta.add_capture(0, metadata=capture_meta) + + if toarchive: + # create archive only — no loose files + meta.archive(str(fns["base_fn"]), compression=compression) + data_path.unlink() + else: + meta.tofile(base_path) + + return meta + + def fromarchive(archive_path, dir=None, skip_checksum=False, autoscale=True): """Extract an archive and return a SigMFFile. 
diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index 6da3776..e744137 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -519,3 +519,77 @@ def test_default_behavior(self): with self.assertRaises(error.SigMFFileError): self.sigmf_obj.tofile(self.test_archive_path, toarchive=True) + + +class TestTofileConvenience(unittest.TestCase): + """Tests for the sigmf.tofile() convenience function.""" + + def setUp(self): + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + shutil.rmtree(self.temp_dir) + + def test_basic_write(self): + """test writing with a bare filename""" + path = self.temp_dir / "basic" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) + self.assertTrue((self.temp_dir / "basic.sigmf-data").exists()) + self.assertTrue((self.temp_dir / "basic.sigmf-meta").exists()) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + + def test_write_with_frequency(self): + """test that frequency kwarg populates capture metadata""" + path = self.temp_dir / "freq" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, frequency=915e6) + self.assertEqual(meta.get_capture_info(0).get("core:frequency"), 915e6) + + def test_write_compressed_by_extension(self): + """test that .sigmf.xz extension creates archive only""" + path = self.temp_dir / "comp.sigmf.xz" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=100) + self.assertTrue((self.temp_dir / "comp.sigmf.xz").exists()) + self.assertFalse((self.temp_dir / "comp.sigmf-data").exists()) + self.assertFalse((self.temp_dir / "comp.sigmf-meta").exists()) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + + def test_write_compressed_by_kwarg(self): + """test that compression kwarg creates archive only""" + path = self.temp_dir / "comp2" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=100, compression="gz") + self.assertTrue((self.temp_dir / "comp2.sigmf.gz").exists()) + self.assertFalse((self.temp_dir / 
"comp2.sigmf-data").exists()) + self.assertFalse((self.temp_dir / "comp2.sigmf-meta").exists()) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + + def test_roundtrip_through_compressed_archive(self): + """test write then read via compressed archive""" + path = self.temp_dir / "rt.sigmf.zip" + sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) + readback = sigmf.fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + + def test_write_toarchive(self): + """test that toarchive=True creates .sigmf archive only""" + path = self.temp_dir / "archived" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, toarchive=True) + self.assertTrue((self.temp_dir / "archived.sigmf").exists()) + self.assertFalse((self.temp_dir / "archived.sigmf-data").exists()) + self.assertFalse((self.temp_dir / "archived.sigmf-meta").exists()) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + + def test_write_toarchive_by_extension(self): + """test that .sigmf extension auto-detects toarchive""" + path = self.temp_dir / "autoarch.sigmf" + meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) + self.assertTrue((self.temp_dir / "autoarch.sigmf").exists()) + self.assertFalse((self.temp_dir / "autoarch.sigmf-data").exists()) + self.assertFalse((self.temp_dir / "autoarch.sigmf-meta").exists()) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + + def test_roundtrip_through_archive(self): + """test write then read via uncompressed archive""" + path = self.temp_dir / "rt_arch" + sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, toarchive=True) + readback = sigmf.fromfile(str(self.temp_dir / "rt_arch.sigmf")) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) From 05ac8068c23b864661adcfdaa1a54b797fe3a773 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Wed, 29 Apr 2026 10:57:22 -0700 Subject: [PATCH 3/5] increment to v1.10.0 --- sigmf/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/sigmf/__init__.py b/sigmf/__init__.py index e66a84f..f78ea6e 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.9.1" +__version__ = "1.10.0" # matching version of the SigMF specification __specification__ = "1.2.6" From 1435246a47d95cf22fa22d4b00fc0b41bf00a655 Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 1 May 2026 11:04:10 -0700 Subject: [PATCH 4/5] polish implementation, new fromarray() method * fromarray() only creates SigMFFile object from numpy array, doesn't write files * SigMFFile.tofile() auto-detects archive/compression from extension * when data_buffer exists, will also write `.sigmf-data`, like when using (SigMFGenerator) * added more tests --- README.md | 3 +- docs/source/advanced.rst | 25 +++-- docs/source/quickstart.rst | 17 ++-- sigmf/__init__.py | 2 +- sigmf/convert/blue.py | 4 +- sigmf/convert/signalhound.py | 10 +- sigmf/convert/wav.py | 6 +- sigmf/sigmffile.py | 135 ++++++++++++++------------ tests/test_archive.py | 183 +++++++++++++++++++++++------------ tests/test_archivereader.py | 2 +- tests/test_sigmffile.py | 106 +++++++++----------- tests/test_validation.py | 2 +- 12 files changed, 279 insertions(+), 216 deletions(-) diff --git a/README.md b/README.md index 5e80d49..defd4ac 100644 --- a/README.md +++ b/README.md @@ -46,8 +46,9 @@ import numpy as np import sigmf data = np.array([0.1 + 0.2j, 0.3 + 0.4j], dtype=np.complex64) +meta = sigmf.fromarray(data, sample_rate=48000) # creates recording.sigmf-data and recording.sigmf-meta -meta = sigmf.tofile("recording", data, sample_rate=48000) +meta.tofile("recording") ``` ### Docs diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index f45c330..683778d 100644 --- a/docs/source/advanced.rst +++ b/docs/source/advanced.rst @@ -200,8 +200,8 @@ read it, this can be done "in mid air" or "without touching the ground (disk)". 
Compressed SigMF Archives ------------------------------ -SigMF archives can be compressed using gzip, xz, or zip. The compression format -is determined by the file extension: +SigMF archives can be compressed using gzip, xz, or zip. +The file extension determines the archive format: +---------------------+-------------+ | Extension | Format | @@ -222,24 +222,23 @@ is determined by the file extension: >>> import sigmf >>> signal = sigmf.sigmffile.fromfile('recording.sigmf-meta') - # compress by extension - >>> signal.archive('recording.sigmf.xz') + # extension determines format + >>> signal.tofile('recording.sigmf.xz') + >>> signal.archive('recording.sigmf.gz') - # or specify compression explicitly - >>> signal.archive('recording.sigmf', compression='gz') + # compression parameter creates archive with correct extension + >>> signal.tofile('recording', compression='xz') # → recording.sigmf.xz + >>> signal.archive('recording', compression='gz') # → recording.sigmf.gz **Reading compressed archives:** :: - >>> arc = sigmf.SigMFArchiveReader('recording.sigmf.xz') - >>> arc[:10] + >>> signal = sigmf.fromfile('recording.sigmf.xz') + >>> signal[:10] array([-20.+11.j, ...], dtype=complex64) **Memory behavior:** -Uncompressed ``.sigmf`` archives use ``numpy.memmap`` to access the data -directly inside the tar file — no extra memory is needed, even for very large -recordings. Compressed archives (``.sigmf.gz``, ``.sigmf.xz``, ``.sigmf.zip``) -must decompress the data into RAM before it can be accessed. Keep this in mind -when working with large compressed recordings. +Uncompressed ``.sigmf`` archives use ``numpy.memmap`` for zero-copy access. +Compressed archives must decompress into RAM before access. 
diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 4927f99..5345590 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -53,17 +53,20 @@ Save a Numpy array as a SigMF Recording # suppose we have a complex timeseries signal data = np.zeros(1024, dtype=np.complex64) - # write to disk — datatype is inferred from the numpy array - meta = sigmf.tofile("example", data, sample_rate=48000, frequency=915e6) + # create SigMFFile from array — datatype is inferred from the numpy array + meta = sigmf.fromarray(data, sample_rate=48000, frequency=915e6) + + # write to separate .sigmf-meta and .sigmf-data files + meta.tofile("example") # or write to a SigMF archive (example.sigmf) - meta = sigmf.tofile("example.sigmf", data, sample_rate=48000, frequency=915e6) + meta.tofile("example.sigmf") - # or write directly to a compressed archive (example.sigmf.xz) - meta = sigmf.tofile("example", data, sample_rate=48000, compression="xz") + # or write to a compressed archive (example.sigmf.xz) + meta.tofile("example.sigmf.xz") -The returned ``SigMFFile`` object can be used to add captures, annotations, -or archive the recording. +The ``SigMFFile`` object can be modified before writing to add additional +captures, annotations, or global metadata fields. 
--------------------------------------------------- Save a Numpy array with Full Metadata (Advanced) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index f78ea6e..fca8ff4 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -22,4 +22,4 @@ from .archive import SigMFArchive from .archivereader import SigMFArchiveReader from .siggen import SigMFGenerator -from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromfile, tofile +from .sigmffile import SigMFCollection, SigMFFile, fromarchive, fromarray, fromfile diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py index c988542..c92650a 100644 --- a/sigmf/convert/blue.py +++ b/sigmf/convert/blue.py @@ -726,12 +726,12 @@ def construct_sigmf( meta.add_capture(0, metadata=capture_info) if create_archive: - meta.tofile(filenames["archive_fn"], toarchive=True, overwrite=overwrite) + meta.tofile(filenames["archive_fn"], overwrite=overwrite) log.info("wrote SigMF archive to %s", filenames["archive_fn"]) # metadata returned should be for this archive meta = fromfile(filenames["archive_fn"]) else: - meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + meta.tofile(filenames["meta_fn"], overwrite=overwrite) log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) log.debug("created %r", meta) diff --git a/sigmf/convert/signalhound.py b/sigmf/convert/signalhound.py index 8fa163f..95ffb0c 100644 --- a/sigmf/convert/signalhound.py +++ b/sigmf/convert/signalhound.py @@ -10,12 +10,12 @@ import io import logging import tempfile -import defusedxml.ElementTree as ET -from xml.etree.ElementTree import Element from datetime import datetime, timedelta, timezone from pathlib import Path from typing import List, Optional, Tuple +from xml.etree.ElementTree import Element +import defusedxml.ElementTree as ET import numpy as np from .. 
import SigMFFile, fromfile @@ -407,7 +407,7 @@ def signalhound_to_sigmf( if out_path is not None: output_dir = filenames["meta_fn"].parent output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + meta.tofile(filenames["meta_fn"], overwrite=overwrite) log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) log.debug("created %r", meta) @@ -435,7 +435,7 @@ def signalhound_to_sigmf( output_dir = filenames["archive_fn"].parent output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["archive_fn"], toarchive=True, overwrite=overwrite) + meta.tofile(filenames["archive_fn"], overwrite=overwrite) log.info("wrote SigMF archive to %s", filenames["archive_fn"]) # metadata returned should be for this archive meta = fromfile(filenames["archive_fn"]) @@ -460,7 +460,7 @@ def signalhound_to_sigmf( _add_annotations(meta, annotations) # write metadata file - meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + meta.tofile(filenames["meta_fn"], overwrite=overwrite) log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) log.debug("created %r", meta) diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py index c298b0a..2b715e1 100644 --- a/sigmf/convert/wav.py +++ b/sigmf/convert/wav.py @@ -176,7 +176,7 @@ def wav_to_sigmf( filenames = get_sigmf_filenames(out_path) output_dir = filenames["meta_fn"].parent output_dir.mkdir(parents=True, exist_ok=True) - meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + meta.tofile(filenames["meta_fn"], overwrite=overwrite) log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) log.debug("created %r", meta) @@ -201,7 +201,7 @@ def wav_to_sigmf( meta = SigMFFile(data_file=data_path, global_info=global_info) meta.add_capture(0, metadata=capture_info) - meta.tofile(filenames["archive_fn"], toarchive=True, overwrite=overwrite) + meta.tofile(filenames["archive_fn"], overwrite=overwrite) 
log.info("wrote SigMF archive to %s", filenames["archive_fn"]) # metadata returned should be for this archive meta = fromfile(filenames["archive_fn"]) @@ -219,7 +219,7 @@ def wav_to_sigmf( meta = SigMFFile(data_file=data_path, global_info=global_info) meta.add_capture(0, metadata=capture_info) - meta.tofile(filenames["meta_fn"], toarchive=False, overwrite=overwrite) + meta.tofile(filenames["meta_fn"], overwrite=overwrite) log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) log.debug("created %r", meta) diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 5eb28d1..35abe28 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -824,38 +824,79 @@ def archive(self, name=None, fileobj=None, compression=None, overwrite=False): def tofile(self, file_path, pretty=True, toarchive=False, compression=None, skip_validate=False, overwrite=False): """ - Write metadata file or full archive containing metadata & dataset. + Write metadata file or archive based on file extension. + + The file extension determines the output format: + - No extension or other extension → `.sigmf-meta` file (and `.sigmf-data` if data_buffer exists) + - `.sigmf` → uncompressed archive + - `.sigmf.gz`, `.sigmf.xz`, `.sigmf.zip` → compressed archive Parameters ---------- file_path : string - Location to save. + Location to save. Extension determines output format. pretty : bool, default True - When True will write more human-readable output, otherwise will be flat JSON. + When True will write human-readable JSON, otherwise flat JSON. toarchive : bool, default False - If True will write both dataset & metadata into SigMF archive format. - If False will only write metadata to `sigmf-meta`. + If True, forces archive creation (writes metadata and data to archive) regardless of file extension. compression : str, optional - Compression type when toarchive=True: "gz", "xz", "zip", or None. + Compression type: "gz", "xz", "zip", or None. 
+ If specified, must match file extension if extension implies compression. + If no archive extension is present, creates a compressed archive. skip_validate : bool, default False Skip validation of metadata before writing. overwrite : bool, default False If False, raise exception if output file already exists. + + Examples + -------- + >>> meta.tofile('recording') # creates recording.sigmf-meta + >>> meta.tofile('recording.sigmf') # creates recording.sigmf (archive) + >>> meta.tofile('recording.sigmf.gz') # creates recording.sigmf.gz (compressed) + >>> meta.tofile('recording', compression='xz') # creates recording.sigmf.xz """ if not skip_validate: self.validate() - fns = get_sigmf_filenames(file_path) + + path = Path(file_path) + + # auto-detect compression from extension + detected_compression = _detect_compression(path) + if detected_compression is not None: + if compression is not None and compression != detected_compression: + raise SigMFFileError( + f"Extension implies '{detected_compression}' compression but compression='{compression}' was specified." 
+ ) + compression = detected_compression + toarchive = True + + # auto-detect archive from .sigmf extension + if path.name.lower().endswith(SIGMF_ARCHIVE_EXT): + toarchive = True + + # compression implies archive + if compression is not None: + toarchive = True if toarchive: - self.archive(fns["archive_fn"], compression=compression, overwrite=overwrite) + # pass the original file_path to archive() so it handles extension properly + self.archive(file_path, compression=compression, overwrite=overwrite) else: - # check if metadata file exists + # write metadata file (and data file if data_buffer exists) + fns = get_sigmf_filenames(file_path) if not overwrite and fns["meta_fn"].exists(): raise SigMFFileExistsError(fns["meta_fn"], "Metadata file") with open(fns["meta_fn"], "w") as fp: self.dump(fp, pretty=pretty) fp.write("\n") # text files should end in carriage return + # write data file if data_buffer exists + if self.data_buffer is not None: + if not overwrite and fns["data_fn"].exists(): + raise SigMFFileExistsError(fns["data_fn"], "Data file") + with open(fns["data_fn"], "wb") as fp: + fp.write(self.data_buffer.getbuffer()) + def read_samples_in_capture(self, index=0): """ Reads samples from the specified captures segment in its entirety. @@ -1260,69 +1301,48 @@ def get_dataset_filename_from_metadata(meta_fn, metadata=None): return None -def tofile(filename, data, sample_rate, frequency=None, toarchive=False, compression=None, global_info=None): +def fromarray(data, sample_rate, frequency=None, global_info=None): """ - Convenience method to write a numpy array to a SigMF recording. + Create a SigMFFile from a numpy array. - For quick saves — infers the SigMF datatype from the numpy dtype, writes - the data file, creates metadata with a single capture at index 0, and - saves to disk. 
For full control over captures, annotations, and global + Convenience function that infers the SigMF datatype from the numpy dtype, + creates an in-memory SigMFFile with a single capture at index 0. The + returned object can then be written to disk using ``tofile()`` or + ``archive()``. For full control over captures, annotations, and global fields, use ``SigMFFile`` directly. Parameters ---------- - filename : str | PathLike - Base filename or archive path. Accepts: - - ``"recording"`` — produces ``recording.sigmf-data`` and ``recording.sigmf-meta`` - - ``"recording.sigmf"`` — produces uncompressed archive (auto-detects toarchive) - - ``"recording.sigmf.xz"`` — produces compressed archive (auto-detects compression) data : np.ndarray - Signal samples to write. + Signal samples. sample_rate : float Sample rate in Hz. frequency : float, optional Center frequency in Hz for the capture. - toarchive : bool, default False - If True, produce a ``.sigmf`` archive instead of loose data/meta files. - Auto-detected from filename extension if not specified. - compression : str, optional - If set, also creates a compressed archive. One of "gz", "xz", "zip". - Auto-detected from filename extension if not specified. Implies toarchive. global_info : dict, optional Additional global metadata fields to include. Returns ------- SigMFFile - The SigMFFile object with data and metadata. + The SigMFFile object with in-memory data and metadata. 
+ + Examples + -------- + >>> import numpy as np + >>> data = np.random.randn(1000) + 1j * np.random.randn(1000) + >>> meta = fromarray(data, sample_rate=1e6, frequency=915e6) + >>> meta.tofile('recording') # creates recording.sigmf-meta and recording.sigmf-data + >>> meta.tofile('recording.sigmf') # creates recording.sigmf archive """ - file_path = Path(filename) - - # detect compressed extension and extract base name - detected = _detect_compression(file_path) - if detected is not None: - if compression is not None and compression != detected: - raise SigMFFileError( - f"Extension implies '{detected}' compression but compression='{compression}' was specified." - ) - compression = detected - base_name = _get_archive_basename(file_path) - base_path = file_path.parent / base_name - elif file_path.name.endswith(SIGMF_ARCHIVE_EXT): - toarchive = True - base_path = file_path.parent / file_path.stem - else: - base_path = file_path - - # compression implies archive - if compression is not None: - toarchive = True - - fns = get_sigmf_filenames(base_path) - data_path = fns["data_fn"] + import io - data.tofile(data_path) + # create in-memory data buffer + data_buffer = io.BytesIO() + data_buffer.write(data.tobytes()) + data_buffer.seek(0) + # build metadata info = { SigMFFile.DATATYPE_KEY: get_data_type_str(data), SigMFFile.SAMPLE_RATE_KEY: sample_rate, @@ -1334,16 +1354,11 @@ def tofile(filename, data, sample_rate, frequency=None, toarchive=False, compres if frequency is not None: capture_meta = {SigMFFile.FREQUENCY_KEY: frequency} - meta = SigMFFile(data_file=data_path, global_info=info) + # create sigmffile object with in-memory buffer + meta = SigMFFile(global_info=info) + meta.set_data_file(data_buffer=data_buffer) meta.add_capture(0, metadata=capture_meta) - if toarchive: - # create archive only — no loose files - meta.archive(str(fns["base_fn"]), compression=compression) - data_path.unlink() - else: - meta.tofile(base_path) - return meta diff --git 
a/tests/test_archive.py b/tests/test_archive.py index ccbf9f0..547a5c9 100644 --- a/tests/test_archive.py +++ b/tests/test_archive.py @@ -37,7 +37,7 @@ def setUp(self): TEST_FLOAT32_DATA.tofile(self.temp_path_data) self.sigmf_object = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=self.temp_path_data) self.sigmf_object.tofile(self.temp_path_meta) - self.sigmf_object.tofile(self.temp_path_archive, toarchive=True) + self.sigmf_object.tofile(self.temp_path_archive) self.sigmf_tarfile = tarfile.open(self.temp_path_archive, mode="r", format=tarfile.PAX_FORMAT) def tearDown(self): @@ -195,86 +195,143 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.temp_dir) - def _roundtrip(self, archive_path, compression=None): - """write archive, read it back, verify samples match""" - self.sigmf_object.archive(name=archive_path, compression=compression, overwrite=True) - self.assertTrue(archive_path.exists()) - readback = fromfile(str(archive_path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) - return readback - - def test_roundtrip_gz(self): - """test gz compressed archive round-trip""" - self._roundtrip(self.temp_dir / "test.sigmf.gz") - - def test_roundtrip_xz(self): - """test xz compressed archive round-trip""" - self._roundtrip(self.temp_dir / "test.sigmf.xz") - - def test_roundtrip_zip(self): - """test zip compressed archive round-trip""" - self._roundtrip(self.temp_dir / "test.sigmf.zip") - - def test_explicit_compression_parameter(self): - """test explicit compression= parameter without matching extension""" - path = self.temp_dir / "test_explicit" - self.sigmf_object.archive(name=path, compression="gz", overwrite=True) - actual_path = self.temp_dir / "test_explicit.sigmf.gz" - self.assertTrue(actual_path.exists()) - readback = fromfile(str(actual_path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) - - def test_extension_autodetect(self): - """test that compression is auto-detected from extension""" - path = 
self.temp_dir / "test_auto.sigmf.xz" - self.sigmf_object.archive(name=path, overwrite=True) - self.assertTrue(path.exists()) - readback = fromfile(str(path)) - np.testing.assert_array_equal(self.original_samples, readback[:]) - - def test_metadata_preserved(self): - """test that metadata survives compression round-trip""" + def test_roundtrip_all_formats(self): + """compressed archives roundtrip with data intact""" for ext in ["sigmf.gz", "sigmf.xz", "sigmf.zip"]: - path = self.temp_dir / f"meta_test.{ext}" - readback = self._roundtrip(path) + path = self.temp_dir / f"test.{ext}" + self.sigmf_object.archive(name=path, overwrite=True) + self.assertTrue(path.exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + # verify metadata preserved self.assertEqual( self.sigmf_object.get_global_field(SigMFFile.DATATYPE_KEY), readback.get_global_field(SigMFFile.DATATYPE_KEY), ) - self.assertEqual(len(self.sigmf_object.get_annotations()), len(readback.get_annotations())) def test_compressed_smaller_than_uncompressed(self): - """test that compressed archives are smaller than uncompressed""" - uncompressed_path = self.temp_dir / "size_test.sigmf" - gz_path = self.temp_dir / "size_test.sigmf.gz" - xz_path = self.temp_dir / "size_test.sigmf.xz" - zip_path = self.temp_dir / "size_test.sigmf.zip" - - self.sigmf_object.archive(name=uncompressed_path, overwrite=True) - self.sigmf_object.archive(name=gz_path, overwrite=True) - self.sigmf_object.archive(name=xz_path, overwrite=True) - self.sigmf_object.archive(name=zip_path, overwrite=True) - - uncompressed_size = uncompressed_path.stat().st_size - for compressed_path in [gz_path, xz_path, zip_path]: - self.assertLess(compressed_path.stat().st_size, uncompressed_size) + """compressed archives are smaller than uncompressed""" + paths = { + "sigmf": self.temp_dir / "test.sigmf", + "gz": self.temp_dir / "test.sigmf.gz", + "xz": self.temp_dir / "test.sigmf.xz", + "zip": self.temp_dir 
/ "test.sigmf.zip", + } + for path in paths.values(): + self.sigmf_object.archive(name=path, overwrite=True) + + uncompressed_size = paths["sigmf"].stat().st_size + for key in ["gz", "xz", "zip"]: + self.assertLess(paths[key].stat().st_size, uncompressed_size) + + def test_explicit_compression_param(self): + """explicit compression parameter adds correct extension""" + path = self.temp_dir / "foo" + self.sigmf_object.archive(name=path, compression="gz", overwrite=True) + expected = self.temp_dir / "foo.sigmf.gz" + self.assertTrue(expected.exists()) + readback = fromfile(str(expected)) + np.testing.assert_array_equal(self.original_samples, readback[:]) def test_invalid_compression_raises_error(self): - """test that invalid compression type raises error""" + """invalid compression type raises error""" path = self.temp_dir / "bad.sigmf" for unsupported in ["bz2", "7z"]: - with self.assertRaises(error.SigMFFileError, msg=f"{unsupported} is not yet supported"): + with self.assertRaises(error.SigMFFileError): self.sigmf_object.archive(name=path, compression=unsupported, overwrite=True) def test_mismatched_extension_and_compression_raises_error(self): - """test that mismatched extension and compression raises error""" - path = self.temp_dir / "mismatch.sigmf.gz" + """mismatched extension and compression parameter raises error""" + path = self.temp_dir / "foo.sigmf.gz" with self.assertRaises(error.SigMFFileError): self.sigmf_object.archive(name=path, compression="xz", overwrite=True) + with self.assertRaises(error.SigMFFileError): + self.sigmf_object.tofile(str(path), compression="xz", overwrite=True) def test_uncompressed_archive_uses_memmap(self): - """test that uncompressed archives use memmap for data access""" - path = self.temp_dir / "memmap_test.sigmf" + """uncompressed archives use memmap for data access""" + path = self.temp_dir / "foo.sigmf" self.sigmf_object.archive(name=path, overwrite=True) reader = SigMFArchiveReader(path) 
self.assertIsInstance(reader.sigmffile._memmap, np.memmap) + + def test_tofile_sigmf_ext(self): + """tofile() with .sigmf extension creates archive""" + path = self.temp_dir / "foo.sigmf" + self.sigmf_object.tofile(str(path), overwrite=True) + self.assertTrue(path.exists()) + self.assertFalse((self.temp_dir / "foo.sigmf-meta").exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_tofile_compressed_ext(self): + """tofile() with compressed extensions creates compressed archives""" + for ext, name in [("gz", "bar"), ("xz", "baz"), ("zip", "qux")]: + path = self.temp_dir / f"{name}.sigmf.{ext}" + self.sigmf_object.tofile(str(path), overwrite=True) + self.assertTrue(path.exists()) + self.assertFalse((self.temp_dir / f"{name}.sigmf.{ext}.sigmf-meta").exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_tofile_explicit_compression(self): + """tofile() with explicit compression parameter adds correct extension""" + path = self.temp_dir / "foo" + self.sigmf_object.tofile(str(path), compression="xz", overwrite=True) + expected = self.temp_dir / "foo.sigmf.xz" + self.assertTrue(expected.exists()) + self.assertFalse((self.temp_dir / "foo.sigmf").exists()) + readback = fromfile(str(expected)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_archive_sigmf_ext(self): + """archive() with .sigmf extension creates archive""" + path = self.temp_dir / "bar.sigmf" + self.sigmf_object.archive(name=path, overwrite=True) + self.assertTrue(path.exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_archive_compressed_ext(self): + """archive() with compressed extensions creates compressed archives""" + for ext, name in [("gz", "foo"), ("xz", "bar"), ("zip", "baz")]: + path = self.temp_dir / f"{name}.sigmf.{ext}" + self.sigmf_object.archive(name=path, 
overwrite=True) + self.assertTrue(path.exists()) + readback = fromfile(str(path)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_archive_explicit_compression(self): + """archive() with explicit compression parameter adds correct extension""" + path = self.temp_dir / "qux" + self.sigmf_object.archive(name=path, compression="xz", overwrite=True) + expected = self.temp_dir / "qux.sigmf.xz" + self.assertTrue(expected.exists()) + readback = fromfile(str(expected)) + np.testing.assert_array_equal(self.original_samples, readback[:]) + + def test_data_buffer_writes_data_file(self): + """tofile() with data_buffer writes both metadata and data files""" + # create sigmffile with data_buffer (like SigMFGenerator does) + import io + + data_buffer = io.BytesIO() + data_buffer.write(TEST_FLOAT32_DATA.tobytes()) + data_buffer.seek(0) + + meta = SigMFFile(copy.deepcopy(TEST_METADATA)) + meta.set_data_file(data_buffer=data_buffer) + + # tofile without archive extension should create separate files + path = self.temp_dir / "generated" + meta.tofile(str(path), overwrite=True) + + # should create both .sigmf-meta and .sigmf-data + expected_meta = self.temp_dir / "generated.sigmf-meta" + expected_data = self.temp_dir / "generated.sigmf-data" + self.assertTrue(expected_meta.exists()) + self.assertTrue(expected_data.exists()) + + # verify data roundtrips correctly + readback = fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index 80552b3..fc945ec 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -52,7 +52,7 @@ def test_access_data_without_untar(self): SigMFFile.NUM_CHANNELS_KEY: num_channels, }, ) - temp_meta.tofile(temp_archive.name, toarchive=True, overwrite=True) + temp_meta.tofile(temp_archive.name, overwrite=True) readback = SigMFArchiveReader(temp_archive.name) readback_samples = readback[:] diff --git 
a/tests/test_sigmffile.py b/tests/test_sigmffile.py index e744137..bbfd0cd 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -330,7 +330,7 @@ def test_capture_byte_boundaries(self) -> None: # get a meta pair and archive meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8) arc_path = self.temp_dir / "arc.sigmf" - meta.tofile(arc_path, toarchive=True) + meta.tofile(arc_path) arc = sigmf.fromfile(arc_path) for bdx in range(3): self.assertEqual(meta.get_capture_byte_boundaries(bdx), arc.get_capture_byte_boundaries(bdx)) @@ -478,15 +478,15 @@ def test_metadata_overwrite_works(self): def test_prevent_archive_overwrite(self): """tofile archive raises exception when archive exists and overwrite=False""" # create existing archive - self.sigmf_obj.tofile(self.test_archive_path, toarchive=True) + self.sigmf_obj.tofile(self.test_archive_path) with self.assertRaises(error.SigMFFileError) as context: - self.sigmf_obj.tofile(self.test_archive_path, toarchive=True, overwrite=False) + self.sigmf_obj.tofile(self.test_archive_path, overwrite=False) self.assertIn("already exists", str(context.exception)) def test_archive_overwrite_works(self): """tofile archive succeeds when archive exists and overwrite=True""" # create existing archive - self.sigmf_obj.tofile(self.test_archive_path, toarchive=True) + self.sigmf_obj.tofile(self.test_archive_path) self.assertTrue(self.test_archive_path.exists()) original_checksum = self.sigmf_obj.get_global_field("core:sha512") @@ -497,7 +497,7 @@ def test_archive_overwrite_works(self): alt_sigmf.set_data_file(self.alt_data_path) # should succeed with overwrite=True and content should change - alt_sigmf.tofile(self.test_archive_path, toarchive=True, overwrite=True) + alt_sigmf.tofile(self.test_archive_path, overwrite=True) self.assertTrue(self.test_archive_path.exists()) # verify by reading the archive content back @@ -511,18 +511,18 @@ def test_default_behavior(self): """overwrite defaults to False for safety""" # create 
existing files self.sigmf_obj.tofile(self.test_meta_path) - self.sigmf_obj.tofile(self.test_archive_path, toarchive=True) + self.sigmf_obj.tofile(self.test_archive_path) # should raise exceptions with default overwrite=False with self.assertRaises(error.SigMFFileError): self.sigmf_obj.tofile(self.test_meta_path) with self.assertRaises(error.SigMFFileError): - self.sigmf_obj.tofile(self.test_archive_path, toarchive=True) + self.sigmf_obj.tofile(self.test_archive_path) -class TestTofileConvenience(unittest.TestCase): - """Tests for the sigmf.tofile() convenience function.""" +class TestFromarrayConvenience(unittest.TestCase): + """Tests for the sigmf.fromarray() convenience function.""" def setUp(self): self.temp_dir = Path(tempfile.mkdtemp()) @@ -530,66 +530,54 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.temp_dir) - def test_basic_write(self): - """test writing with a bare filename""" - path = self.temp_dir / "basic" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) - self.assertTrue((self.temp_dir / "basic.sigmf-data").exists()) - self.assertTrue((self.temp_dir / "basic.sigmf-meta").exists()) + def test_basic_creation(self): + """test creating SigMFFile from array""" + meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000) + self.assertEqual(meta.get_global_field(SigMFFile.SAMPLE_RATE_KEY), 4000) + self.assertEqual(meta.get_global_field(SigMFFile.DATATYPE_KEY), "rf32_le") np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) - def test_write_with_frequency(self): + def test_with_frequency(self): """test that frequency kwarg populates capture metadata""" - path = self.temp_dir / "freq" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, frequency=915e6) + meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000, frequency=915e6) self.assertEqual(meta.get_capture_info(0).get("core:frequency"), 915e6) - def test_write_compressed_by_extension(self): - """test that .sigmf.xz extension creates archive only""" - path 
= self.temp_dir / "comp.sigmf.xz" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=100) - self.assertTrue((self.temp_dir / "comp.sigmf.xz").exists()) - self.assertFalse((self.temp_dir / "comp.sigmf-data").exists()) - self.assertFalse((self.temp_dir / "comp.sigmf-meta").exists()) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) - - def test_write_compressed_by_kwarg(self): - """test that compression kwarg creates archive only""" - path = self.temp_dir / "comp2" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=100, compression="gz") - self.assertTrue((self.temp_dir / "comp2.sigmf.gz").exists()) - self.assertFalse((self.temp_dir / "comp2.sigmf-data").exists()) - self.assertFalse((self.temp_dir / "comp2.sigmf-meta").exists()) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) - - def test_roundtrip_through_compressed_archive(self): - """test write then read via compressed archive""" - path = self.temp_dir / "rt.sigmf.zip" - sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) + def test_write_separate_files(self): + """test writing to separate meta and data files""" + meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000) + path = self.temp_dir / "basic" + meta.tofile(str(path)) + self.assertTrue((self.temp_dir / "basic.sigmf-data").exists()) + self.assertTrue((self.temp_dir / "basic.sigmf-meta").exists()) readback = sigmf.fromfile(str(path)) np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) - def test_write_toarchive(self): - """test that toarchive=True creates .sigmf archive only""" - path = self.temp_dir / "archived" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, toarchive=True) + def test_write_archive(self): + """test writing to uncompressed archive""" + meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000) + path = self.temp_dir / "archived.sigmf" + meta.tofile(str(path)) self.assertTrue((self.temp_dir / "archived.sigmf").exists()) self.assertFalse((self.temp_dir / 
"archived.sigmf-data").exists()) self.assertFalse((self.temp_dir / "archived.sigmf-meta").exists()) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) - - def test_write_toarchive_by_extension(self): - """test that .sigmf extension auto-detects toarchive""" - path = self.temp_dir / "autoarch.sigmf" - meta = sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000) - self.assertTrue((self.temp_dir / "autoarch.sigmf").exists()) - self.assertFalse((self.temp_dir / "autoarch.sigmf-data").exists()) - self.assertFalse((self.temp_dir / "autoarch.sigmf-meta").exists()) - np.testing.assert_array_equal(TEST_FLOAT32_DATA, meta[:]) + readback = sigmf.fromfile(str(path)) + np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) - def test_roundtrip_through_archive(self): - """test write then read via uncompressed archive""" - path = self.temp_dir / "rt_arch" - sigmf.tofile(path, TEST_FLOAT32_DATA, sample_rate=48000, toarchive=True) - readback = sigmf.fromfile(str(self.temp_dir / "rt_arch.sigmf")) + def test_write_compressed_archive(self): + """test writing to compressed archive""" + meta = sigmf.fromarray(TEST_FLOAT32_DATA, sample_rate=4000) + path = self.temp_dir / "comp.sigmf.xz" + meta.tofile(str(path)) + self.assertTrue((self.temp_dir / "comp.sigmf.xz").exists()) + self.assertFalse((self.temp_dir / "comp.sigmf-data").exists()) + self.assertFalse((self.temp_dir / "comp.sigmf-meta").exists()) + readback = sigmf.fromfile(str(path)) np.testing.assert_array_equal(TEST_FLOAT32_DATA, readback[:]) + + def test_with_global_info(self): + """test that global_info dict is merged into metadata""" + meta = sigmf.fromarray( + TEST_FLOAT32_DATA, sample_rate=4000, global_info={"core:author": "test_author", "core:description": "test"} + ) + self.assertEqual(meta.get_global_field("core:author"), "test_author") + self.assertEqual(meta.get_global_field("core:description"), "test") diff --git a/tests/test_validation.py b/tests/test_validation.py index eaff000..fe6d278 100644 --- 
a/tests/test_validation.py +++ b/tests/test_validation.py @@ -39,7 +39,7 @@ def setUp(self): some_meta = SigMFFile(copy.deepcopy(TEST_METADATA), data_file=junk_path) some_meta.tofile(tmp_path / "a") some_meta.tofile(tmp_path / "b") - some_meta.tofile(tmp_path / "c", toarchive=True) + some_meta.tofile(tmp_path / "c.sigmf") def tearDown(self): """cleanup""" From 2d0fb8387e1c4e8ea9255613bd41327dd371b62a Mon Sep 17 00:00:00 2001 From: Teque5 Date: Fri, 1 May 2026 11:25:00 -0700 Subject: [PATCH 5/5] fix docstring & blacken-docs on RST --- docs/source/advanced.rst | 88 ++++++++++++++++++++++---------------- docs/source/converters.rst | 4 +- docs/source/quickstart.rst | 46 ++++++++++++-------- docs/source/siggen.rst | 4 +- sigmf/sigmffile.py | 22 +++++++--- 5 files changed, 96 insertions(+), 68 deletions(-) diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index 683778d..3cbee2b 100644 --- a/docs/source/advanced.rst +++ b/docs/source/advanced.rst @@ -18,7 +18,7 @@ the recording of the SigMF logo used in this example `from the specification from sigmf import SigMFFile, sigmffile # Load a dataset - path = 'logo/sigmf_logo' # extension is optional + path = "logo/sigmf_logo" # extension is optional signal = sigmffile.fromfile(path) # Get some metadata and all annotations @@ -31,13 +31,15 @@ the recording of the SigMF logo used in this example `from the specification for adx, annotation in enumerate(annotations): annotation_start_idx = annotation[SigMFFile.START_INDEX_KEY] annotation_length = annotation[SigMFFile.LENGTH_INDEX_KEY] - annotation_comment = annotation.get(SigMFFile.COMMENT_KEY, "[annotation {}]".format(adx)) + annotation_comment = annotation.get( + SigMFFile.COMMENT_KEY, "[annotation {}]".format(adx) + ) # Get capture info associated with the start of annotation capture = signal.get_capture_info(annotation_start_idx) freq_center = capture.get(SigMFFile.FREQUENCY_KEY, 0) - freq_min = freq_center - 0.5*sample_rate - freq_max = freq_center + 
0.5*sample_rate + freq_min = freq_center - 0.5 * sample_rate + freq_max = freq_center + 0.5 * sample_rate # Get frequency edges of annotation (default to edges of capture) freq_start = annotation.get(SigMFFile.FLO_KEY) @@ -66,34 +68,41 @@ First, create a single SigMF Recording and save it to disk: data = np.zeros(1024, dtype=np.complex64) # write those samples to file in cf32_le - data.tofile('example_cf32.sigmf-data') + data.tofile("example_cf32.sigmf-data") # create the metadata meta = SigMFFile( - data_file='example_cf32.sigmf-data', # extension is optional - global_info = { + data_file="example_cf32.sigmf-data", # extension is optional + global_info={ SigMFFile.DATATYPE_KEY: get_data_type_str(data), # in this case, 'cf32_le' SigMFFile.SAMPLE_RATE_KEY: 48000, - SigMFFile.AUTHOR_KEY: 'jane.doe@domain.org', - SigMFFile.DESCRIPTION_KEY: 'All zero complex float32 example file.', - } + SigMFFile.AUTHOR_KEY: "jane.doe@domain.org", + SigMFFile.DESCRIPTION_KEY: "All zero complex float32 example file.", + }, ) # create a capture key at time index 0 - meta.add_capture(0, metadata={ - SigMFFile.FREQUENCY_KEY: 915000000, - SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), - }) + meta.add_capture( + 0, + metadata={ + SigMFFile.FREQUENCY_KEY: 915000000, + SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + }, + ) # add an annotation at sample 100 with length 200 & 10 KHz width - meta.add_annotation(100, 200, metadata = { - SigMFFile.FLO_KEY: 914995000.0, - SigMFFile.FHI_KEY: 915005000.0, - SigMFFile.COMMENT_KEY: 'example annotation', - }) + meta.add_annotation( + 100, + 200, + metadata={ + SigMFFile.FLO_KEY: 914995000.0, + SigMFFile.FHI_KEY: 915005000.0, + SigMFFile.COMMENT_KEY: "example annotation", + }, + ) # check for mistakes & write to disk - meta.tofile('example_cf32.sigmf-meta') # extension is optional + meta.tofile("example_cf32.sigmf-meta") # extension is optional Now lets add another SigMF Recording and associate them with a SigMF Collection: @@ 
-103,41 +112,44 @@ Now lets add another SigMF Recording and associate them with a SigMF Collection: data_ci16 = np.zeros(1024, dtype=np.complex64) - #rescale and save as a complex int16 file: + # rescale and save as a complex int16 file: data_ci16 *= pow(2, 15) - data_ci16.view(np.float32).astype(np.int16).tofile('example_ci16.sigmf-data') + data_ci16.view(np.float32).astype(np.int16).tofile("example_ci16.sigmf-data") # create the metadata for the second file meta_ci16 = SigMFFile( - data_file='example_ci16.sigmf-data', # extension is optional - global_info = { - SigMFFile.DATATYPE_KEY: 'ci16_le', # get_data_type_str() is only valid for numpy types + data_file="example_ci16.sigmf-data", # extension is optional + global_info={ + SigMFFile.DATATYPE_KEY: "ci16_le", # get_data_type_str() is only valid for numpy types SigMFFile.SAMPLE_RATE_KEY: 48000, - SigMFFile.DESCRIPTION_KEY: 'All zero complex int16 file.', - } + SigMFFile.DESCRIPTION_KEY: "All zero complex int16 file.", + }, ) meta_ci16.add_capture(0, metadata=meta.get_capture_info(0)) - meta_ci16.tofile('example_ci16.sigmf-meta') - - collection = SigMFCollection(['example_cf32.sigmf-meta', 'example_ci16.sigmf-meta'], - metadata = {'collection': { - SigMFCollection.AUTHOR_KEY: 'sigmf@sigmf.org', - SigMFCollection.DESCRIPTION_KEY: 'Collection of two all zero files.', + meta_ci16.tofile("example_ci16.sigmf-meta") + + collection = SigMFCollection( + ["example_cf32.sigmf-meta", "example_ci16.sigmf-meta"], + metadata={ + "collection": { + SigMFCollection.AUTHOR_KEY: "sigmf@sigmf.org", + SigMFCollection.DESCRIPTION_KEY: "Collection of two all zero files.", } - } + }, ) streams = collection.get_stream_names() sigmf = [collection.get_SigMFFile(stream) for stream in streams] - collection.tofile('example_zeros.sigmf-collection') + collection.tofile("example_zeros.sigmf-collection") The SigMF Collection and its associated Recordings can now be loaded like this: .. 
code-block:: python import sigmf - collection = sigmf.fromfile('example_zeros') - ci16_sigmffile = collection.get_SigMFFile(stream_name='example_ci16') - cf32_sigmffile = collection.get_SigMFFile(stream_name='example_cf32') + + collection = sigmf.fromfile("example_zeros") + ci16_sigmffile = collection.get_SigMFFile(stream_name="example_ci16") + cf32_sigmffile = collection.get_SigMFFile(stream_name="example_cf32") ----------------------------------------------- Load a SigMF Archive and slice without untaring diff --git a/docs/source/converters.rst b/docs/source/converters.rst index 43e2770..3dbe84c 100644 --- a/docs/source/converters.rst +++ b/docs/source/converters.rst @@ -29,8 +29,8 @@ formats and reads without writing any output files: # auto-detect and create NCD for any supported format meta = sigmf.fromfile("recording.cdif") # BLUE file - meta = sigmf.fromfile("recording.wav") # WAV file - meta = sigmf.fromfile("recording.xml") # Signal Hound Spike file + meta = sigmf.fromfile("recording.wav") # WAV file + meta = sigmf.fromfile("recording.xml") # Signal Hound Spike file meta = sigmf.fromfile("recording.sigmf") # SigMF archive all_samples = meta.read_samples() diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 5345590..6d9a292 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -23,15 +23,16 @@ Read a SigMF Recording .. 
code-block:: python import sigmf + handle = sigmf.fromfile("example.sigmf") # reading data - handle.read_samples() # read all timeseries data - handle[10:50] # read memory slice of samples 10 through 50 + handle.read_samples() # read all timeseries data + handle[10:50] # read memory slice of samples 10 through 50 # accessing metadata - handle.sample_rate # get sample rate attribute - handle.get_global_info() # returns 'global' dictionary - handle.get_captures() # returns list of 'captures' dictionaries - handle.get_annotations() # returns list of all annotations + handle.sample_rate # get sample rate attribute + handle.get_global_info() # returns 'global' dictionary + handle.get_captures() # returns list of 'captures' dictionaries + handle.get_annotations() # returns list of all annotations ----------------------------------- Verify SigMF Integrity & Compliance @@ -88,30 +89,37 @@ For full control over global fields, captures, and annotations: # create the metadata meta = SigMFFile( - data_file="example.sigmf-data", # extension is optional - global_info = { + data_file="example.sigmf-data", # extension is optional + global_info={ SigMFFile.DATATYPE_KEY: get_data_type_str(data), # in this case, "cf32_le" SigMFFile.SAMPLE_RATE_KEY: 48000, SigMFFile.AUTHOR_KEY: "jane.doe@domain.org", SigMFFile.DESCRIPTION_KEY: "All zero complex float32 example file.", - } + }, ) # create a capture key at time index 0 - meta.add_capture(0, metadata={ - SigMFFile.FREQUENCY_KEY: 915000000, - SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), - }) + meta.add_capture( + 0, + metadata={ + SigMFFile.FREQUENCY_KEY: 915000000, + SigMFFile.DATETIME_KEY: get_sigmf_iso8601_datetime_now(), + }, + ) # add an annotation at sample 100 with length 200 & 10 KHz width - meta.add_annotation(100, 200, metadata = { - SigMFFile.FLO_KEY: 914995000.0, - SigMFFile.FHI_KEY: 915005000.0, - SigMFFile.COMMENT_KEY: "example annotation", - }) + meta.add_annotation( + 100, + 200, + metadata={ + 
SigMFFile.FLO_KEY: 914995000.0, + SigMFFile.FHI_KEY: 915005000.0, + SigMFFile.COMMENT_KEY: "example annotation", + }, + ) # validate & write to disk - meta.tofile("example.sigmf-meta") # extension is optional + meta.tofile("example.sigmf-meta") # extension is optional ---------------------------------- Attribute Access for Global Fields diff --git a/docs/source/siggen.rst b/docs/source/siggen.rst index 9d2c022..6f0b6b3 100644 --- a/docs/source/siggen.rst +++ b/docs/source/siggen.rst @@ -64,8 +64,8 @@ A seed ensures reproducibility across runs. signal = SigMFGenerator(seed=0xDEADBEEF).generate() # the number and type of components are randomly chosen - print(signal.description) # e.g. "synthetic signal with 3 tones and 2 sweeps" - print(signal.get_annotations()) # one annotation per component + print(signal.description) # e.g. "synthetic signal with 3 tones and 2 sweeps" + print(signal.get_annotations()) # one annotation per component Without a seed, each call produces a different signal. 
diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 35abe28..4077422 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -850,10 +850,15 @@ def tofile(self, file_path, pretty=True, toarchive=False, compression=None, skip Examples -------- - >>> meta.tofile('recording') # creates recording.sigmf-meta - >>> meta.tofile('recording.sigmf') # creates recording.sigmf (archive) - >>> meta.tofile('recording.sigmf.gz') # creates recording.sigmf.gz (compressed) - >>> meta.tofile('recording', compression='xz') # creates recording.sigmf.xz + >>> from sigmf.siggen import SigMFGenerator + >>> import tempfile + >>> from pathlib import Path + >>> meta = SigMFGenerator().generate() + >>> tmpdir = Path(tempfile.mkdtemp()) + >>> meta.tofile(tmpdir / 'recording') # creates recording.sigmf-meta and recording.sigmf-data pair + >>> meta.tofile(tmpdir / 'recording.sigmf') # creates recording.sigmf (archive) + >>> meta.tofile(tmpdir / 'recording.sigmf.gz') # creates recording.sigmf.gz (compressed) + >>> meta.tofile(tmpdir / 'other', compression='xz') # creates other.sigmf.xz """ if not skip_validate: self.validate() @@ -1330,10 +1335,13 @@ def fromarray(data, sample_rate, frequency=None, global_info=None): Examples -------- >>> import numpy as np + >>> import tempfile + >>> from pathlib import Path >>> data = np.random.randn(1000) + 1j * np.random.randn(1000) - >>> meta = fromarray(data, sample_rate=1e6, frequency=915e6) - >>> meta.tofile('recording') # creates recording.sigmf-meta and recording.sigmf-data - >>> meta.tofile('recording.sigmf') # creates recording.sigmf archive + >>> meta = fromarray(data, sample_rate=1e6, frequency=915e6) # returns SigMFFile + >>> tmpdir = Path(tempfile.mkdtemp()) + >>> meta.tofile(tmpdir / 'recording') # creates recording.sigmf-meta and recording.sigmf-data + >>> meta.tofile(tmpdir / 'recording.sigmf') # creates recording.sigmf archive """ import io