Python API Reference

Algorithm API Example: dstream.steady_algo

Implements the steady downsampling algorithm, which retains data items spaced evenly across elapsed stream history.

Import as

from downstream.dstream import steady_algo

Or, alternatively

from downstream import dstream
steady_algo = dstream.steady_algo
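
The following sketch (illustrative only; the buffer size and stream length are arbitrary example choices) shows the basic usage pattern: call assign_storage_site for each arriving item, then use lookup_ingest_times to recover which ingest time each retained site holds, assuming the documented correspondence between site assignment and lookup.

from downstream.dstream import steady_algo

S = 8  # buffer size; must be a power of two
buffer = [None] * S  # fixed-size storage for retained items

for T in range(100):  # T is the logical time of each arriving item
    site = steady_algo.assign_storage_site(S, T)
    if site is not None:  # None means the item is discarded
        buffer[site] = T  # store the item (here, just its ingest time)

# Recover the ingest time of the item retained at each buffer site.
ingest_times = list(steady_algo.lookup_ingest_times(S, 100))
assert ingest_times == buffer  # lookup reconstructs what the loop stored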

steady_algo.assign_storage_site

steady_algo._steady_assign_storage_site.steady_assign_storage_site(S, T)

Site selection algorithm for steady curation.

Parameters:

  • S (int, required) : Buffer size. Must be a power of two.
  • T (int, required) : Current logical time.

Returns:

  • Optional[int] : Selected site, if any.

Raises:

  • ValueError : If insufficient ingest capacity is available. See steady_algo.has_ingest_capacity for details.

Source code in steady_algo/_steady_assign_storage_site.py
def steady_assign_storage_site(S: int, T: int) -> typing.Optional[int]:
    """Site selection algorithm for steady curation.

    Parameters
    ----------
    S : int
        Buffer size. Must be a power of two.
    T : int
        Current logical time.

    Returns
    -------
    typing.Optional[int]
        Selected site, if any.

    Raises
    ------
    ValueError
        If insufficient ingest capacity is available.

        See `steady_algo.has_ingest_capacity` for details.
    """
    if not steady_has_ingest_capacity(S, T):
        raise ValueError(f"Insufficient ingest capacity for {S=}, {T=}")

    s = S.bit_length() - 1
    t = T.bit_length() - s  # Current epoch (or negative)
    h = ctz(T + 1)  # Current hanoi value
    if h < t:  # If not a top n(T) hanoi value...
        return None  # ...discard without storing

    i = T >> (h + 1)  # Hanoi value incidence (i.e., num seen)
    if i == 0:  # Special case the 0th bunch
        k_b = 0  # Bunch position
        o = 0  # Within-bunch offset
        w = s + 1  # Segment width
    else:
        j = bit_floor(i) - 1  # Num full-bunch segments
        B = j.bit_length()  # Num full bunches
        k_b = (1 << B) * (s - B + 1)  # Bunch position
        w = h - t + 1  # Segment width
        assert w > 0
        o = w * (i - j - 1)  # Within-bunch offset

    p = h % w  # Within-segment offset
    return k_b + o + p  # Calculate placement site

steady_algo.assign_storage_site_batched

steady_algo._steady_assign_storage_site_batched.steady_assign_storage_site_batched(S, T)

Site selection algorithm for steady curation.

Vectorized implementation for bulk calculations.

Parameters:

  • S (Union[np.ndarray, int], required) : Buffer size. Must be a power of two, <= 2**52.
  • T (Union[np.ndarray, int], required) : Current logical time. Must be <= 2**52.

Returns:

  • np.ndarray : Selected site, if any. Otherwise, S.

Source code in steady_algo/_steady_assign_storage_site_batched.py
def steady_assign_storage_site_batched(
    S: typing.Union[np.ndarray, int], T: typing.Union[np.ndarray, int]
) -> np.ndarray:
    """Site selection algorithm for steady curation.

    Vectorized implementation for bulk calculations.

    Parameters
    ----------
    S : Union[np.ndarray, int]
        Buffer size. Must be a power of two, <= 2**52.
    T : Union[np.ndarray, int]
        Current logical time. Must be <= 2**52.

    Returns
    -------
    np.array
        Selected site, if any. Otherwise, S.
    """
    S, T = np.atleast_1d(S).astype(np.int64), np.atleast_1d(T).astype(np.int64)
    assert np.logical_and(S > 1, np.bitwise_count(S) == 1).all()
    # restriction <= 2 ** 52 might be overly conservative
    assert (np.maximum(S, T) <= 2**52).all()

    s = bitlen32(S) - 1
    t = bitlen32(T) - s  # Current epoch (or negative)
    h = ctz32(T + 1)  # Current hanoi value

    i = T >> (h + 1)  # Hanoi value incidence (i.e., num seen)

    j = bit_floor32(i) - 1  # Num full-bunch segments
    B = bitlen32(j)  # Num full bunches
    k_b = (1 << B) * (s - B + 1)  # Bunch position
    w = h - t + 1  # Segment width
    assert np.where((h >= t) & (i != 0), w > 0, True).all()
    o = w * (i - j - 1)  # Within-bunch offset

    # Special case the 0th bunch...
    zeroth_bunch = i == 0
    k_b[zeroth_bunch] = 0
    o[zeroth_bunch] = 0
    w[zeroth_bunch] = np.broadcast_to(s, w.shape)[zeroth_bunch] + 1

    with np.errstate(divide="ignore"):
        p = h % w  # Within-segment offset

    # handle discard without storing for non-top n(T) hanoi value...
    return np.where(h >= t, k_b + o + p, S)
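
A minimal sketch of bulk site assignment with NumPy arrays; the values of S and T below are arbitrary example inputs, and has_ingest_capacity_batched is used as an optional up-front check.

import numpy as np

from downstream.dstream import steady_algo

S = 16  # buffer size, power of two
T = np.arange(32)  # logical times 0..31

assert steady_algo.has_ingest_capacity_batched(S, T).all()

sites = steady_algo.assign_storage_site_batched(S, T)
# Entries equal to S mark items that would be discarded rather than stored.
print(sites)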

steady_algo.get_ingest_capacity

steady_algo._steady_get_ingest_capacity.steady_get_ingest_capacity(S)

How many data item ingestions does this algorithm support?

Returns None if the number of supported ingestions is unlimited.

See Also

has_ingest_capacity : Does this algorithm have the capacity to ingest n data items?

Source code in steady_algo/_steady_get_ingest_capacity.py
def steady_get_ingest_capacity(S: int) -> typing.Optional[int]:
    """How many data item ingestions does this algorithm support?

    Returns None if the number of supported ingestions is unlimited.

    See Also
    --------
    has_ingest_capacity : Does this algorithm have the capacity to ingest `n`
    data items?
    """
    surface_size_ok = S.bit_count() == 1 and S > 1
    return None if surface_size_ok else 0

steady_algo.has_ingest_capacity

steady_algo._steady_has_ingest_capacity.steady_has_ingest_capacity(S, T)

Does this algorithm have the capacity to ingest a data item at logical time T?

Parameters:

  • S (int, required) : The number of buffer sites available.
  • T (int, required) : Queried logical time.

Returns:

  • bool

See Also

get_ingest_capacity : How many data item ingestions does this algorithm support?
has_ingest_capacity_batched : Numpy-friendly implementation.

Source code in steady_algo/_steady_has_ingest_capacity.py
def steady_has_ingest_capacity(S: int, T: int) -> bool:
    """Does this algorithm have the capacity to ingest a data item at logical
    time T?

    Parameters
    ----------
    S : int
        The number of buffer sites available.
    T : int
        Queried logical time.

    Returns
    -------
    bool

    See Also
    --------
    get_ingest_capacity : How many data item ingestions does this algorithm
    support?
    has_ingest_capacity_batched : Numpy-friendly implementation.
    """
    assert T >= 0
    ingest_capacity = steady_get_ingest_capacity(S)
    return ingest_capacity is None or T < ingest_capacity
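
For the steady algorithm, get_ingest_capacity returns None (unlimited) whenever S is a valid buffer size, so these checks mainly guard against invalid S; other dstream algorithms may report finite capacities. A brief illustrative sketch:

from downstream.dstream import steady_algo

print(steady_algo.get_ingest_capacity(32))  # None: unlimited ingests
print(steady_algo.has_ingest_capacity(32, 10**9))  # True

# A buffer size that is not a power of two has zero capacity.
print(steady_algo.get_ingest_capacity(33))  # 0
print(steady_algo.has_ingest_capacity(33, 0))  # False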

steady_algo.has_ingest_capacity_batched

steady_algo._steady_has_ingest_capacity_batched.steady_has_ingest_capacity_batched(S, T)

Does this algorithm have the capacity to ingest a data item at logical time T?

Vectorized implementation for bulk calculations.

Parameters:

  • S (int or np.ndarray, required) : The number of buffer sites available.
  • T (int or np.ndarray, required) : Queried logical time.

Returns:

  • np.ndarray of bool : True if ingest capacity is sufficient, False otherwise.

See Also

get_ingest_capacity : How many data item ingestions does this algorithm support?

Source code in steady_algo/_steady_has_ingest_capacity_batched.py
def steady_has_ingest_capacity_batched(
    S: typing.Union[int, np.ndarray],
    T: typing.Union[int, np.ndarray],
) -> np.ndarray:
    """Does this algorithm have the capacity to ingest a data item at logical
    time T?

    Vectorized implementation for bulk calculations.

    Parameters
    ----------
    S : int or np.ndarray
        The number of buffer sites available.
    T : int or np.ndarray
        Queried logical time.

    Returns
    -------
    np.ndarray of bool
        True if ingest capacity is sufficient, False otherwise.

    See Also
    --------
    get_ingest_capacity : How many data item ingestions does this algorithm
    support?
    """
    S, T = np.asarray(S), np.asarray(T)
    assert (T >= 0).all()

    surface_size_ok = np.logical_and(np.bitwise_count(S) == 1, S > 1)
    return surface_size_ok + np.zeros_like(T, dtype=bool)  # Broadcast T.size

steady_algo.lookup_ingest_times

steady_algo._steady_lookup_ingest_times.steady_lookup_ingest_times(S, T)

Ingest time lookup algorithm for steady curation.

Lazy implementation.

Parameters:

  • S (int, required) : Buffer size. Must be a power of two.
  • T (int, required) : Current logical time.

Yields:

  • Optional[int] : Ingest time of stored item at buffer sites in index order.

Source code in steady_algo/_steady_lookup_ingest_times.py
def steady_lookup_ingest_times(
    S: int, T: int
) -> typing.Iterable[typing.Optional[int]]:
    """Ingest time lookup algorithm for steady curation.

    Lazy implementation.

    Parameters
    ----------
    S : int
        Buffer size. Must be a power of two.
    T : int
        Current logical time.

    Yields
    ------
    typing.Optional[int]
        Ingest time of stored item at buffer sites in index order.
    """
    assert T >= 0
    if T < S:  # Patch for before buffer is filled...
        return (v if v < T else None for v in steady_lookup_impl(S, S))
    else:  # ... assume buffer has been filled
        return steady_lookup_impl(S, T)
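
A short sketch contrasting lookups before and after the buffer fills; sites not yet written report None. The particular values of S and T are arbitrary example choices.

from downstream.dstream import steady_algo

S = 8
for T in (4, 8, 100):  # before fill, at fill, and well after
    times = list(steady_algo.lookup_ingest_times(S, T))
    print(T, times)  # ingest times per site, or None for unwritten sites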

steady_algo.lookup_ingest_times_batched

steady_algo._steady_lookup_ingest_times_batched.steady_lookup_ingest_times_batched(S, T, parallel=True)

Ingest time lookup algorithm for steady curation.

Vectorized implementation for bulk calculations.

Parameters:

  • S (int, required) : Buffer size. Must be a power of two.
  • T (np.ndarray, required) : One-dimensional array of current logical times.
  • parallel (bool, default True) : Should numba be applied to parallelize operations?

Returns:

  • np.ndarray : Ingest times of stored items at buffer sites in index order. Two-dimensional array; each row corresponds to an entry in T and contains S columns, one per buffer site.

Source code in steady_algo/_steady_lookup_ingest_times_batched.py
def steady_lookup_ingest_times_batched(
    S: int,
    T: np.ndarray,
    parallel: bool = True,
) -> np.ndarray:
    """Ingest time lookup algorithm for steady curation.

    Vectorized implementation for bulk calculations.

    Parameters
    ----------
    S : int
        Buffer size. Must be a power of two.
    T : np.ndarray
        One-dimensional array of current logical times.
    parallel : bool, default True
        Should numba be applied to parallelize operations?

    Returns
    -------
    np.ndarray
        Ingest time of stored items at buffer sites in index order.

        Two-dimensional array. Each row corresponds to an entry in T. Contains
        S columns, each corresponding to buffer sites.
    """
    assert np.issubdtype(np.asarray(S).dtype, np.integer), S
    assert np.issubdtype(T.dtype, np.integer), T

    if (T < S).any():
        raise ValueError("T < S not supported for batched lookup")

    return [
        _steady_lookup_ingest_times_batched,
        _steady_lookup_ingest_times_batched_jit,
    ][bool(parallel)](np.int64(S), T.astype(np.int64))
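
An illustrative sketch of batched lookup; note that every entry of T must be at least S, and passing parallel=False sidesteps the numba-compiled path for a quick interactive check.

import numpy as np

from downstream.dstream import steady_algo

S = 8
T = np.array([8, 9, 100])  # batched lookup requires T >= S
rows = steady_algo.lookup_ingest_times_batched(S, T, parallel=False)
print(rows.shape)  # (3, 8): one row per entry of T, one column per buffer site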

steady_algo.lookup_ingest_times_eager

steady_algo._steady_lookup_ingest_times_eager.steady_lookup_ingest_times_eager(S, T)

Ingest time lookup algorithm for steady curation.

Eager implementation.

Parameters:

  • S (int, required) : Buffer size. Must be a power of two.
  • T (int, required) : Current logical time.

Returns:

  • List[int] : Ingest time of stored item at buffer sites in index order.

Source code in steady_algo/_steady_lookup_ingest_times_eager.py
def steady_lookup_ingest_times_eager(S: int, T: int) -> typing.List[int]:
    """Ingest time lookup algorithm for steady curation.

    Eager implementation.

    Parameters
    ----------
    S : int
        Buffer size. Must be a power of two.
    T : int
        Current logical time.

    Returns
    -------
    typing.List[int]
        Ingest time of stored item at buffer sites in index order.
    """
    if T < S:
        raise ValueError("T < S not supported for eager lookup")
    return list(steady_lookup_impl(S, T))

Additional Algorithms in dstream

The following additional algorithms are available in the dstream module:

  • circular_algo implements a simple ring buffer for last n sampling.
  • compressing_algo implements steady curation via Gunther's compressing circular buffer algorithm.
  • hybrid_0_steady_1_stretched_2_algo for hybrid downsampling combining steady and stretched algorithms.
  • hybrid_0_steady_1_stretchedxtc_2_algo for hybrid downsampling combining steady and stretchedxtc algorithms.
  • hybrid_0_steady_1_tilted_2_algo for hybrid downsampling combining steady and tilted algorithms.
  • hybrid_0_steady_1_tiltedxtc_2_algo for hybrid downsampling combining steady and tiltedxtc algorithms.
  • primed_algo_* which fills surface left-to-right before delegating to another algorithm.
  • stretched_algo for stretched downsampling.
  • stretchedxtc_algo for stretched downsampling, with infinite T domain extension.
  • tilted_algo for tilted downsampling.
  • tiltedxtc_algo for tilted downsampling, with infinite T domain extension.

These algorithms follow the same API conventions as steady_algo, demonstrated above.


Container API: dsurf.Surface

Import as

from downstream.dsurf import Surface

Or, alternatively

from downstream import dsurf
Surface = dsurf.Surface
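
A short, illustrative sketch of typical Surface usage with the steady algorithm; the buffer size and string payloads are arbitrary example choices.

from downstream import dstream
from downstream.dsurf import Surface

surface = Surface(dstream.steady_algo, 16)  # 16-site buffer backed by a list

for T in range(50):
    surface.ingest_one(f"item {T}")  # returns the storage site used, or None

# Pair each retained value with its ingest time, skipping unwritten sites.
for Tbar, value in surface.lookup_zip_items(include_empty=False):
    print(Tbar, value)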

dsurf.Surface

dsurf.Surface

Bases: Generic[_DSurfDataItem]

Container orchestrating downstream curation over a fixed-size buffer.

Source code in dsurf/_Surface.py
class Surface(typing.Generic[_DSurfDataItem]):
    """Container orchestrating downstream curation over a fixed-size buffer."""

    __slots__ = ("_storage", "algo", "T")

    algo: types.ModuleType
    _storage: typing.MutableSequence[_DSurfDataItem]
    T: int  # current logical time

    @staticmethod
    def from_hex(
        hex_string: str,
        algo: types.ModuleType,
        *,
        S: int,
        storage_bitoffset: typing.Optional[int] = None,
        storage_bitwidth: int,
        T_bitoffset: int = 0,
        T_bitwidth: int = 32,
    ) -> "Surface":
        """
        Deserialize a Surface object from a hex string representation.

        Hex string representation needs exactly two contiguous parts:
        1. dstream_T (which is the number of ingested data items), and
        2. dstream_storage (which holds all the stored data items).

        Data items are deserialized as unsigned integers.

        Data in hex string representation should use big-endian byte order.

        Parameters
        ----------
        hex_string: str
            Hex string to be parsed, which can be uppercase or lowercase.
        algo: module
            Dstream algorithm to use to create the new Surface object.
        storage_bitoffset: int, default T_bitwidth
            Number of bits before the storage.
        storage_bitwidth: int
            Number of bits used for storage.
        T_bitoffset: int, default 0
            Number of bits before dstream_T.
        T_bitwidth: int, default 32
            Number of bits used to store dstream_T.
        S: int
            Number of buffer sites used to store data items.

            Determines how many items are unpacked from storage.

        See Also
        --------
        Surface.to_hex()
            Serialize a Surface object into a hex string.
        """
        if storage_bitoffset is None:
            storage_bitoffset = T_bitwidth

        for arg in (
            "storage_bitoffset",
            "storage_bitwidth",
            "T_bitoffset",
            "T_bitwidth",
        ):
            if locals()[arg] % 4:
                msg = f"Hex-unaligned `{arg}` not yet supported"
                raise NotImplementedError(msg)

        storage_hexoffset = storage_bitoffset // 4
        storage_hexwidth = storage_bitwidth // 4
        storage_hex = hex_string[
            storage_hexoffset : storage_hexoffset + storage_hexwidth
        ]
        storage = unpack_hex(storage_hex, S)

        T_hexoffset = T_bitoffset // 4
        T_hexwidth = T_bitwidth // 4
        T_hex = hex_string[T_hexoffset : T_hexoffset + T_hexwidth]
        T = int(T_hex, base=16)

        return Surface(algo, storage, T)

    def to_hex(
        self: "Surface", *, item_bitwidth: int, T_bitwidth: int = 32
    ) -> str:
        """
        Serialize a Surface object into a hex string representation.

        Serialized data comprises two components:
            1. dstream_T (the number of data items ingested) and
            2. dstream_storage (binary data of data item values).

        The hex layout used is:

            0x...
              ########**************************************************
              ^                                                     ^
              T, length = `T_bitwidth` / 4                          |
                                                                    |
                            storage, length = `item_bitwidth` / 4 * S

        This hex string can be reconstituted by calling `Surface.from_hex()`
        with the following parameters:
            - `T_bitoffset` = 0
            - `T_bitwidth` = `T_bitwidth`
            - `storage_bitoffset` = `storage_bitoffset`
            - `storage_bitwidth` = `self.S * item_bitwidth`

        Parameters
        ----------
        item_bitwidth: int
            Number of storage bits used per data item.
        T_bitwidth: int, default 32
            Number of bits used to store ingested items count (`self.T`).

        See Also
        --------
        Surface.from_hex()
            Deserialize a Surface object from a hex string.
        """
        if T_bitwidth % 4:
            raise NotImplementedError(
                "Hex-unaligned `T_bitwidth` not yet supported"
            )

        if not all(isinstance(x, typing.SupportsInt) for x in self._storage):
            raise NotImplementedError(
                "Non-integer hex serialization not yet implemented"
            )
        if np.asarray(self._storage).min() < 0:
            raise NotImplementedError(
                "Negative integer hex serialization not yet implemented"
            )

        assert self.T >= 0
        if int(self.T).bit_length() > T_bitwidth:
            raise ValueError(
                f"{self.T}= not representable in {T_bitwidth=} bits",
            )
        T_arr = np.asarray(self.T, dtype=np.uint64)
        T_bytes = T_arr.astype(">u8").tobytes()  # big-endian u64
        T_hexwidth = T_bitwidth >> 2
        T_hex = T_bytes.hex()[-T_hexwidth:]

        surface_hex = pack_hex(self._storage, item_bitwidth)

        return T_hex + surface_hex

    def __init__(
        self: "Surface",
        algo: types.ModuleType,
        storage: typing.Union[typing.MutableSequence[_DSurfDataItem], int],
        T: int = 0,
    ) -> None:
        """Initialize a downstream Surface object, which stores hereditary
        stratigraphic annotations using a provided algorithm.

        Parameters
        ----------
        algo: module
            The algorithm used by the surface to determine the placements
            of data items. Should be one of the modules in `downstream.dstream`.
        storage: int or MutableSequence
            The object used to hold any ingested data. If an integer is
            passed in, a list of length `storage` is used. Otherwise, the
            `storage` is used directly. Random access and `__len__` must be
            supported. For example, for efficient storage, a user may pass
            in a NumPy array.
        T: int, default 0
            The initial logical time (i.e. how many items have been ingested)
        """
        self.T = T
        if isinstance(storage, int):
            self._storage = [None] * storage
        else:
            self._storage = storage
        self.algo = algo

    def __repr__(self) -> str:
        return f"Surface(algo={self.algo}, storage={self._storage})"

    def __eq__(self: "Surface", other: typing.Any) -> bool:
        if not isinstance(other, Surface):
            return False
        return (
            self.T == other.T
            and self.algo is other.algo
            and [*self.lookup_zip_items()] == [*other.lookup_zip_items()]
        )

    def __iter__(
        self: "Surface",
    ) -> typing.Iterator[typing.Optional[_DSurfDataItem]]:
        return iter(self._storage)

    def __getitem__(
        self: "Surface", site: int
    ) -> typing.Optional[_DSurfDataItem]:
        return self._storage[site]

    def __deepcopy__(self: "Surface", memo: dict) -> "Surface":
        """Ensure pickle compatibility when algo is a module."""
        new_surf = Surface(self.algo, deepcopy(self._storage, memo))
        new_surf.T = self.T
        return new_surf

    @property
    def S(self: "Surface") -> int:
        return len(self._storage)

    @typing.overload
    def lookup_zip_items(
        self: "Surface",
    ) -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]:
        ...

    @typing.overload
    def lookup_zip_items(
        self: "Surface", include_empty: typing.Literal[False]
    ) -> typing.Iterable[typing.Tuple[int, _DSurfDataItem]]:
        ...

    @typing.overload
    def lookup_zip_items(
        self: "Surface", include_empty: bool
    ) -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]:
        ...

    def lookup_zip_items(
        self: "Surface", include_empty: bool = True
    ) -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]:
        """
        Iterate over ingest times and values of data items in the order they
        appear on the downstream storage, including sites not yet written to.
        """
        res = zip(
            self.lookup(include_empty=True),
            self._storage,
        )
        if not include_empty:
            return ((T, v) for T, v in res if T is not None)
        return res

    def ingest_many(
        self: "Surface",
        n_ingests: int,
        item_getter: typing.Callable[[int], _DSurfDataItem],
        use_relative_time: bool = False,
    ) -> None:
        """Ingest multiple data items.

        Optimizes for the case where a large amount of data is ready to be
        ingested. In such a scenario, we can avoid assigning multiple objects
        to the same site, and simply iterate through sites that would be
        updated after items were ingested.

        Parameters
        ----------
        n_ingests : int
            The number of data items to ingest.
        item_getter : int -> object
            For a given ingest time within the n_ingests window, should
            return the associated data item.
        use_relative_time : bool, default False
            Use the relative time (i.e. timesteps since current self.T)
            instead of the absolute time as input to `item_getter`
        """

        assert n_ingests >= 0
        if n_ingests == 0:
            return

        assert self.algo.has_ingest_capacity(self.S, self.T + n_ingests - 1)
        for site, (T_1, T_2) in enumerate(
            zip(
                self.lookup(),
                self.algo.lookup_ingest_times(self.S, self.T + n_ingests),
            )
        ):
            if T_1 != T_2 and T_2 is not None:
                self._storage[site] = item_getter(
                    T_2 - self.T if use_relative_time else T_2
                )
        self.T += n_ingests

    def ingest_one(
        self: "Surface", item: _DSurfDataItem
    ) -> typing.Optional[int]:
        """Ingest data item.

        Returns the storage site of the data item, or None if the data item is
        not retained.
        """
        assert self.algo.has_ingest_capacity(self.S, self.T)

        site = self.algo.assign_storage_site(self.S, self.T)
        if site is not None:
            self._storage[site] = item
        self.T += 1
        return site

    @typing.overload
    def lookup(
        self: "Surface",
    ) -> typing.Iterable[typing.Optional[int]]:
        ...

    @typing.overload
    def lookup(
        self: "Surface", include_empty: typing.Literal[False]
    ) -> typing.Iterable[int]:
        ...

    @typing.overload
    def lookup(
        self: "Surface", include_empty: bool
    ) -> typing.Iterable[typing.Optional[int]]:
        ...

    def lookup(
        self: "Surface", include_empty: bool = True
    ) -> typing.Union[
        typing.Iterable[typing.Optional[int]], typing.Iterable[int]
    ]:
        """Iterate over data item ingest times, including null values for
        uninitialized sites."""
        assert len(self._storage) == self.S
        return (
            T
            for T in self.algo.lookup_ingest_times(self.S, self.T)
            if include_empty or T is not None
        )

__deepcopy__(memo)

Ensure pickle compatibility when algo is a module.

Source code in dsurf/_Surface.py
def __deepcopy__(self: "Surface", memo: dict) -> "Surface":
    """Ensure pickle compatibility when algo is a module."""
    new_surf = Surface(self.algo, deepcopy(self._storage, memo))
    new_surf.T = self.T
    return new_surf

__init__(algo, storage, T=0)

Initialize a downstream Surface object, which stores hereditary stratigraphic annotations using a provided algorithm.

Parameters:

  • algo (ModuleType, required) : The algorithm used by the surface to determine the placements of data items. Should be one of the modules in downstream.dstream.
  • storage (Union[MutableSequence[_DSurfDataItem], int], required) : The object used to hold any ingested data. If an integer is passed in, a list of length storage is used. Otherwise, the storage is used directly. Random access and __len__ must be supported. For example, for efficient storage, a user may pass in a NumPy array.
  • T (int, default 0) : The initial logical time (i.e., how many items have been ingested).

Source code in dsurf/_Surface.py
def __init__(
    self: "Surface",
    algo: types.ModuleType,
    storage: typing.Union[typing.MutableSequence[_DSurfDataItem], int],
    T: int = 0,
) -> None:
    """Initialize a downstream Surface object, which stores hereditary
    stratigraphic annotations using a provided algorithm.

    Parameters
    ----------
    algo: module
        The algorithm used by the surface to determine the placements
        of data items. Should be one of the modules in `downstream.dstream`.
    storage: int or MutableSequence
        The object used to hold any ingested data. If an integer is
        passed in, a list of length `storage` is used. Otherwise, the
        `storage` is used directly. Random access and `__len__` must be
        supported. For example, for efficient storage, a user may pass
        in a NumPy array.
    T: int, default 0
        The initial logical time (i.e. how many items have been ingested)
    """
    self.T = T
    if isinstance(storage, int):
        self._storage = [None] * storage
    else:
        self._storage = storage
    self.algo = algo

from_hex(hex_string, algo, *, S, storage_bitoffset=None, storage_bitwidth, T_bitoffset=0, T_bitwidth=32) staticmethod

Deserialize a Surface object from a hex string representation.

Hex string representation needs exactly two contiguous parts: 1. dstream_T (which is the number of ingested data items), and 2. dstream_storage (which holds all the stored data items).

Data items are deserialized as unsigned integers.

Data in hex string representation should use big-endian byte order.

Parameters:

  • hex_string (str, required) : Hex string to be parsed, which can be uppercase or lowercase.
  • algo (ModuleType, required) : Dstream algorithm to use to create the new Surface object.
  • storage_bitoffset (Optional[int], default None) : Number of bits before the storage; defaults to T_bitwidth when None.
  • storage_bitwidth (int, required) : Number of bits used for storage.
  • T_bitoffset (int, default 0) : Number of bits before dstream_T.
  • T_bitwidth (int, default 32) : Number of bits used to store dstream_T.
  • S (int, required) : Number of buffer sites used to store data items. Determines how many items are unpacked from storage.

See Also

Surface.to_hex() : Serialize a Surface object into a hex string.

Source code in dsurf/_Surface.py
@staticmethod
def from_hex(
    hex_string: str,
    algo: types.ModuleType,
    *,
    S: int,
    storage_bitoffset: typing.Optional[int] = None,
    storage_bitwidth: int,
    T_bitoffset: int = 0,
    T_bitwidth: int = 32,
) -> "Surface":
    """
    Deserialize a Surface object from a hex string representation.

    Hex string representation needs exactly two contiguous parts:
    1. dstream_T (which is the number of ingested data items), and
    2. dstream_storage (which holds all the stored data items).

    Data items are deserialized as unsigned integers.

    Data in hex string representation should use big-endian byte order.

    Parameters
    ----------
    hex_string: str
        Hex string to be parsed, which can be uppercase or lowercase.
    algo: module
        Dstream algorithm to use to create the new Surface object.
    storage_bitoffset: int, default T_bitwidth
        Number of bits before the storage.
    storage_bitwidth: int
        Number of bits used for storage.
    T_bitoffset: int, default 0
        Number of bits before dstream_T.
    T_bitwidth: int, default 32
        Number of bits used to store dstream_T.
    S: int
        Number of buffer sites used to store data items.

        Determines how many items are unpacked from storage.

    See Also
    --------
    Surface.to_hex()
        Serialize a Surface object into a hex string.
    """
    if storage_bitoffset is None:
        storage_bitoffset = T_bitwidth

    for arg in (
        "storage_bitoffset",
        "storage_bitwidth",
        "T_bitoffset",
        "T_bitwidth",
    ):
        if locals()[arg] % 4:
            msg = f"Hex-unaligned `{arg}` not yet supported"
            raise NotImplementedError(msg)

    storage_hexoffset = storage_bitoffset // 4
    storage_hexwidth = storage_bitwidth // 4
    storage_hex = hex_string[
        storage_hexoffset : storage_hexoffset + storage_hexwidth
    ]
    storage = unpack_hex(storage_hex, S)

    T_hexoffset = T_bitoffset // 4
    T_hexwidth = T_bitwidth // 4
    T_hex = hex_string[T_hexoffset : T_hexoffset + T_hexwidth]
    T = int(T_hex, base=16)

    return Surface(algo, storage, T)

ingest_many(n_ingests, item_getter, use_relative_time=False)

Ingest multiple data items.

Optimizes for the case where a large amount of data is ready to be ingested. In such a scenario, we can avoid assigning multiple objects to the same site and simply iterate through the sites that would be updated after all items were ingested.

Parameters:

  • n_ingests (int, required) : The number of data items to ingest.
  • item_getter (int -> object, required) : For a given ingest time within the n_ingests window, should return the associated data item.
  • use_relative_time (bool, default False) : Use the relative time (i.e., timesteps since current self.T) instead of the absolute time as input to item_getter.

Source code in dsurf/_Surface.py
def ingest_many(
    self: "Surface",
    n_ingests: int,
    item_getter: typing.Callable[[int], _DSurfDataItem],
    use_relative_time: bool = False,
) -> None:
    """Ingest multiple data items.

    Optimizes for the case where a large amount of data is ready to be
    ingested. In such a scenario, we can avoid assigning multiple objects
    to the same site, and simply iterate through sites that would be
    updated after items were ingested.

    Parameters
    ----------
    n_ingests : int
        The number of data items to ingest.
    item_getter : int -> object
        For a given ingest time within the n_ingests window, should
        return the associated data item.
    use_relative_time : bool, default False
        Use the relative time (i.e. timesteps since current self.T)
        instead of the absolute time as input to `item_getter`
    """

    assert n_ingests >= 0
    if n_ingests == 0:
        return

    assert self.algo.has_ingest_capacity(self.S, self.T + n_ingests - 1)
    for site, (T_1, T_2) in enumerate(
        zip(
            self.lookup(),
            self.algo.lookup_ingest_times(self.S, self.T + n_ingests),
        )
    ):
        if T_1 != T_2 and T_2 is not None:
            self._storage[site] = item_getter(
                T_2 - self.T if use_relative_time else T_2
            )
    self.T += n_ingests
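
An illustrative sketch of bulk ingestion; here item_getter simply indexes into a preloaded batch by absolute ingest time, which works because the Surface starts at T = 0 in this example.

from downstream import dstream
from downstream.dsurf import Surface

surface = Surface(dstream.steady_algo, 32)

batch = list(range(1000))  # pretend these values arrived all at once
surface.ingest_many(len(batch), lambda T: batch[T])

assert surface.T == 1000
print([*surface.lookup(include_empty=False)][:5])  # ingest times of some retained items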

ingest_one(item)

Ingest data item.

Returns the storage site of the data item, or None if the data item is not retained.

Source code in dsurf/_Surface.py
def ingest_one(
    self: "Surface", item: _DSurfDataItem
) -> typing.Optional[int]:
    """Ingest data item.

    Returns the storage site of the data item, or None if the data item is
    not retained.
    """
    assert self.algo.has_ingest_capacity(self.S, self.T)

    site = self.algo.assign_storage_site(self.S, self.T)
    if site is not None:
        self._storage[site] = item
    self.T += 1
    return site

lookup(include_empty=True)

lookup() -> typing.Iterable[typing.Optional[int]]
lookup(include_empty: typing.Literal[False]) -> typing.Iterable[int]
lookup(include_empty: bool) -> typing.Iterable[typing.Optional[int]]

Iterate over data item ingest times, including null values for uninitialized sites.

Source code in dsurf/_Surface.py
def lookup(
    self: "Surface", include_empty: bool = True
) -> typing.Union[
    typing.Iterable[typing.Optional[int]], typing.Iterable[int]
]:
    """Iterate over data item ingest times, including null values for
    uninitialized sites."""
    assert len(self._storage) == self.S
    return (
        T
        for T in self.algo.lookup_ingest_times(self.S, self.T)
        if include_empty or T is not None
    )

lookup_zip_items(include_empty=True)

lookup_zip_items() -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]
lookup_zip_items(include_empty: typing.Literal[False]) -> typing.Iterable[typing.Tuple[int, _DSurfDataItem]]
lookup_zip_items(include_empty: bool) -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]

Iterate over ingest times and values of data items in the order they appear on the downstream storage, including sites not yet written to.

Source code in dsurf/_Surface.py
def lookup_zip_items(
    self: "Surface", include_empty: bool = True
) -> typing.Iterable[typing.Tuple[typing.Optional[int], _DSurfDataItem]]:
    """
    Iterate over ingest times and values of data items in the order they
    appear on the downstream storage, including sites not yet written to.
    """
    res = zip(
        self.lookup(include_empty=True),
        self._storage,
    )
    if not include_empty:
        return ((T, v) for T, v in res if T is not None)
    return res

to_hex(*, item_bitwidth, T_bitwidth=32)

Serialize a Surface object into a hex string representation.

Serialized data comprises two components: 1. dstream_T (the number of data items ingested) and 2. dstream_storage (binary data of data item values).

The hex layout used is:

0x...
  ########**************************************************
  ^                                                     ^
  T, length = `T_bitwidth` / 4                          |
                                                        |
                storage, length = `item_bitwidth` / 4 * S

This hex string can be reconstituted by calling Surface.from_hex() with the following parameters:

  • T_bitoffset = 0
  • T_bitwidth = T_bitwidth
  • storage_bitoffset = storage_bitoffset
  • storage_bitwidth = self.S * item_bitwidth

Parameters:

  • item_bitwidth (int, required) : Number of storage bits used per data item.
  • T_bitwidth (int, default 32) : Number of bits used to store ingested items count (self.T).

See Also

Surface.from_hex() : Deserialize a Surface object from a hex string.

Source code in dsurf/_Surface.py
def to_hex(
    self: "Surface", *, item_bitwidth: int, T_bitwidth: int = 32
) -> str:
    """
    Serialize a Surface object into a hex string representation.

    Serialized data comprises two components:
        1. dstream_T (the number of data items ingested) and
        2. dstream_storage (binary data of data item values).

    The hex layout used is:

        0x...
          ########**************************************************
          ^                                                     ^
          T, length = `T_bitwidth` / 4                          |
                                                                |
                        storage, length = `item_bitwidth` / 4 * S

    This hex string can be reconstituted by calling `Surface.from_hex()`
    with the following parameters:
        - `T_bitoffset` = 0
        - `T_bitwidth` = `T_bitwidth`
        - `storage_bitoffset` = `storage_bitoffset`
        - `storage_bitwidth` = `self.S * item_bitwidth`

    Parameters
    ----------
    item_bitwidth: int
        Number of storage bits used per data item.
    T_bitwidth: int, default 32
        Number of bits used to store ingested items count (`self.T`).

    See Also
    --------
    Surface.from_hex()
        Deserialize a Surface object from a hex string.
    """
    if T_bitwidth % 4:
        raise NotImplementedError(
            "Hex-unaligned `T_bitwidth` not yet supported"
        )

    if not all(isinstance(x, typing.SupportsInt) for x in self._storage):
        raise NotImplementedError(
            "Non-integer hex serialization not yet implemented"
        )
    if np.asarray(self._storage).min() < 0:
        raise NotImplementedError(
            "Negative integer hex serialization not yet implemented"
        )

    assert self.T >= 0
    if int(self.T).bit_length() > T_bitwidth:
        raise ValueError(
            f"{self.T}= not representable in {T_bitwidth=} bits",
        )
    T_arr = np.asarray(self.T, dtype=np.uint64)
    T_bytes = T_arr.astype(">u8").tobytes()  # big-endian u64
    T_hexwidth = T_bitwidth >> 2
    T_hex = T_bytes.hex()[-T_hexwidth:]

    surface_hex = pack_hex(self._storage, item_bitwidth)

    return T_hex + surface_hex
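
A hedged round-trip sketch: item_bitwidth=8 and the 64-site buffer are example choices, values must be non-negative integers that fit the chosen bit width, and to_hex requires that every site has been written (no None entries remain).

from downstream import dstream
from downstream.dsurf import Surface

surface = Surface(dstream.steady_algo, 64)
for T in range(256):
    surface.ingest_one(T % 256)  # small non-negative ints fit in 8 bits

hex_string = surface.to_hex(item_bitwidth=8)  # 32-bit T header, then 64 packed items

restored = Surface.from_hex(
    hex_string,
    dstream.steady_algo,
    S=64,
    storage_bitwidth=64 * 8,  # self.S * item_bitwidth
)
print(restored == surface)  # expected True if the round trip preserves T and items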

Dataframe API

Import as

from downstream import dataframe

dataframe.explode_lookup_packed

dataframe._explode_lookup_packed.explode_lookup_packed(df, *, calc_Tbar_argv=False, value_type, result_schema='coerce')

Explode downstream-curated data from hexadecimal serialization of downstream buffers and counters to one-data-item-per-row, applying downstream lookup to identify origin time Tbar of each item.

Parameters:

  • df (pl.DataFrame, required) : DataFrame containing packed data to be exploded.
  • calc_Tbar_argv (bool, default False) : Include column indicating sorted order of Tbar values within each buffer.
  • value_type ({'hex', 'uint64', 'uint32', 'uint16', 'uint8'}, required) : Type of the packed data values. Determines how the packed data is interpreted.
  • result_schema ({'coerce', 'relax', 'shrink'}, default 'coerce') : Schema for the resulting DataFrame. Determines how the output DataFrame is structured and what types are used.

Returns:

  • pl.DataFrame : DataFrame with one row per data item, containing the original data and the corresponding Tbar value.

Source code in dataframe/_explode_lookup_packed.py
def explode_lookup_packed(
    df: pl.DataFrame,
    *,
    calc_Tbar_argv: bool = False,
    value_type: typing.Literal["hex", "uint64", "uint32", "uint16", "uint8"],
    result_schema: typing.Literal["coerce", "relax", "shrink"] = "coerce",
) -> pl.DataFrame:
    """Explode downstream-curated data from hexidecimal serialization of
    downstream buffers and counters to one-data-item-per-row, applying
    downstream lookup to identify origin time `Tbar` of each item.

    Parameters
    ----------
    df : pl.DataFrame
        DataFrame containing packed data to be exploded.
    calc_Tbar_argv : bool, default False
        Include column indicating sorted order of `Tbar` values within each
        buffer.
    value_type : {'hex', 'uint64', 'uint32', 'uint16', 'uint8'}
        Type of the packed data values. Determines how the packed data is
        interpreted.
    result_schema : {'coerce', 'relax', 'shrink'}, default 'coerce'
        Schema for the resulting DataFrame. Determines how the output DataFrame
        is structured and what types are used.

    Returns
    -------
    pl.DataFrame
        DataFrame with one row per data item, containing the original data and
        the corresponding `Tbar` value.
    """
    df = unpack_data_packed(df, result_schema=result_schema)
    return explode_lookup_unpacked(
        df,
        calc_Tbar_argv=calc_Tbar_argv,
        result_schema=result_schema,
        value_type=value_type,
    )
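
An illustrative sketch connecting the Surface serialization shown above to the dataframe API. The bit offsets and widths mirror the layout produced by Surface.to_hex (a 32-bit T header followed by S packed items), and value_type="uint8" matches the item_bitwidth used here; all concrete values are example choices.

import polars as pl

from downstream import dataframe, dstream
from downstream.dsurf import Surface

surface = Surface(dstream.steady_algo, 16)
for T in range(64):
    surface.ingest_one(T % 256)

df = pl.DataFrame(
    {
        "data_hex": [surface.to_hex(item_bitwidth=8)],
        "dstream_algo": ["dstream.steady_algo"],
        "dstream_S": [16],
        "dstream_storage_bitoffset": [32],  # storage begins after the T header
        "dstream_storage_bitwidth": [16 * 8],  # S items of 8 bits each
        "dstream_T_bitoffset": [0],
        "dstream_T_bitwidth": [32],
    }
)

long_df = dataframe.explode_lookup_packed(df, value_type="uint8")
print(long_df)  # one row per retained data item, with its Tbar ingest time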

dataframe.explode_lookup_unpacked

dataframe._explode_lookup_unpacked.explode_lookup_unpacked(df, *, calc_Tbar_argv=False, value_type, result_schema='coerce')

Explode downstream-curated data from one-buffer-per-row (with each buffer containing multiple data items) to one-data-item-per-row, applying downstream lookup to identify origin time Tbar of each item.

Parameters:

  • df (pl.DataFrame, required) : The input DataFrame containing unpacked data with required columns, one row per dstream buffer.

Required schema:

  • 'dstream_algo' : pl.Categorical
    • Name of downstream curation algorithm used
    • e.g., 'dstream.steady_algo'
  • 'dstream_S' : pl.UInt32
    • Capacity of dstream buffer, in number of data items.
  • 'dstream_T' : pl.UInt64
    • Logical time elapsed (number of elapsed data items in stream).
  • 'dstream_storage_hex' : pl.String
    • Raw dstream buffer binary data, containing packed data items.
    • Represented as a hexadecimal string.

Optional schema:

  • 'downstream_version' : pl.Categorical
    • Version of downstream library used to curate data items.
  • 'dstream_data_id' : pl.UInt64
    • Unique identifier for each data item.
    • If not provided, row number will be used as identifier.
  • 'downstream_exclude_exploded' : pl.Boolean
    • Should row be dropped after exploding unpacked data?
  • 'downstream_validate_exploded' : pl.String, polars expression
    • Polars expression to validate exploded data.

Additional user-defined columns will be forwarded to the output DataFrame.

  • calc_Tbar_argv (bool, default False) : Should the algorithm calculate the argsort of ingest times for each buffer?
  • value_type ({'hex', 'uint64', 'uint32', 'uint16', 'uint8'}, required) : The desired data type for the 'dstream_value' column in the output DataFrame. Note that 'hex' is not yet supported.
  • result_schema ({'coerce', 'relax', 'shrink'}, default 'coerce') : How should dtypes in the output DataFrame be handled?
    • 'coerce' : cast all columns to output schema.
    • 'relax' : keep all columns as-is.
    • 'shrink' : cast columns to smallest possible types.

Returns:

  • pl.DataFrame : A DataFrame with exploded data and extracted values, one row per data item from the input dstream buffers.

Output schema:

  • 'dstream_data_id' : pl.UInt64
    • Identifier for dstream buffer that data item is from.
  • 'dstream_Tbar' : pl.UInt64
    • Logical position of data item in stream (number of prior data items).
  • 'dstream_T' : pl.UInt64
    • Logical time elapsed (number of elapsed data items in stream).
  • 'dstream_value' : pl.String or specified numeric type
    • Data item content, format depends on 'value_type' argument.
  • 'dstream_value_bitwidth' : pl.UInt32
    • Size of 'dstream_value' in bits.

User-defined columns are NOT forwarded from the unpacked dataframe. To include additional columns, join the output DataFrame with the original input DataFrame.

Raises:

  • NotImplementedError :
    • If 'dstream_value_bitwidth' is greater than 64 or equal to 2 or 3.
    • If 'dstream_value_bitwidth' is not consistent across all data items.
    • If 'dstream_S' is not consistent across all dstream buffers.
    • If buffers aren't filled (i.e., 'dstream_T' < 'dstream_S').
    • If multiple dstream algorithms are present in the input DataFrame.
    • If 'value_type' is 'hex'.
  • ValueError : If any of the required columns are missing from the input DataFrame.

See Also

unpack_data_packed : Preprocessing step; converts data with downstream buffer and counter serialized into a single hexadecimal string into the input format for this function.

Source code in dataframe/_explode_lookup_unpacked.py
def explode_lookup_unpacked(
    df: pl.DataFrame,
    *,
    calc_Tbar_argv: bool = False,
    value_type: typing.Literal["hex", "uint64", "uint32", "uint16", "uint8"],
    result_schema: typing.Literal["coerce", "relax", "shrink"] = "coerce",
) -> pl.DataFrame:
    """Explode downstream-curated data from one-buffer-per-row (with each
    buffer containing multiple data items) to one-data-item-per-row, applying
    downstream lookup to identify origin time `Tbar` of each item.

    Parameters
    ----------
    df : pl.DataFrame
        The input DataFrame containing unpacked data with required columns, one
        row per dstream buffer.

        Required schema:

        - 'dstream_algo' : pl.Categorical
            - Name of downstream curation algorithm used
            - e.g., 'dstream.steady_algo'
        - 'dstream_S' : pl.UInt32
            - Capacity of dstream buffer, in number of data items.
        - 'dstream_T' : pl.UInt64
            - Logical time elapsed (number of elapsed data items in stream).
        - 'dstream_storage_hex' : pl.String
            - Raw dstream buffer binary data, containing packed data items.
            - Represented as a hexadecimal string.

        Optional schema:

        - 'downstream_version' : pl.Categorical
            - Version of downstream library used to curate data items.
        - 'dstream_data_id' : pl.UInt64
            - Unique identifier for each data item.
            - If not provided, row number will be used as identifier.
        - 'downstream_exclude_exploded' : pl.Boolean
            - Should row be dropped after exploding unpacked data?
        - 'downstream_validate_exploded' : pl.String, polars expression
            - Polars expression to validate exploded data.

        Additional user-defined columns will be forwarded to the output
        DataFrame.

    calc_Tbar_argv : bool, default False
        Should the algorithm calculate the argsort of ingest times for each
        buffer?

    value_type : {'hex', 'uint64', 'uint32', 'uint16', 'uint8'}
        The desired data type for the 'dstream_value' column in the output
        DataFrame.

        Note that 'hex' is not yet supported.

    result_schema : Literal['coerce', 'relax', 'shrink'], default 'coerce'
        How should dtypes in the output DataFrame be handled?
        - 'coerce' : cast all columns to output schema.
        - 'relax' : keep all columns as-is.
        - 'shrink' : cast columns to smallest possible types.

    Returns
    -------
    pl.DataFrame
        A DataFrame with exploded data and extracted values, one row per data
        item from the input dstream buffers.

        Output schema:

        - 'dstream_data_id' : pl.UInt64
            - Identifier for dstream buffer that data item is from.
        - 'dstream_Tbar' : pl.UInt64
            - Logical position of data item in stream (number of prior data
              items).
        - 'dstream_T' : pl.UInt64
            - Logical time elapsed (number of elapsed data items in stream).
        - 'dstream_value' : pl.String or specified numeric type
            - Data item content, format depends on 'value_type' argument.
        - 'dstream_value_bitwidth' : pl.UInt32
            - Size of 'dstream_value' in bits.

        User-defined columns are NOT forwarded from the unpacked dataframe. To
        include additional columns, join the output DataFrame with the original
        input DataFrame.

    Raises
    ------
    NotImplementedError
        - If 'dstream_value_bitwidth' is greater than 64 or equal to 2 or 3.
        - If 'dstream_value_bitwidth' is not consistent across all data items.
        - If 'dstream_S' is not consistent across all dstream buffers.
        - If buffers aren't filled (i.e., 'dstream_T' < 'dstream_S').
        - If multiple dstream algorithms are present in the input DataFrame.
        - If 'value_type' is 'hex'.
    ValueError
        If any of the required columns are missing from the input DataFrame.

    See Also
    --------
    unpack_data_packed :
        Preprocessing step, converts data with downstream buffer and counter
        serialized into a single hexadecimal string into input format for this
        function.
    """
    _check_df(df)
    value_dtype = _get_value_dtype(value_type)

    if df.lazy().limit(1).collect().is_empty():
        return _make_empty(value_dtype)

    dstream_S = df.lazy().select("dstream_S").limit(1).collect().item()
    dstream_algo = df.lazy().select("dstream_algo").limit(1).collect().item()
    dstream_algo = eval(dstream_algo, {"dstream": dstream})
    num_records = df.lazy().select(pl.len()).collect().item()
    num_items = num_records * dstream_S

    logging.info("begin explode_lookup_unpacked")
    logging.info(" - prepping data...")
    df = _prep_data(df, num_records=num_records, dstream_S=dstream_S)
    _check_bitwidths(df)

    logging.info(" - exploding dataframe...")
    df_long = df.drop("dstream_storage_hex").select(
        pl.all().gather(np.repeat(np.arange(num_records), dstream_S)),
    )

    logging.info(" - unpacking hex strings...")
    df_long = _unpack_hex_strings(
        df, df_long, num_items=num_items, value_dtype=value_dtype
    )

    logging.info(" - looking up ingest times...")
    df_long = _lookup_ingest_times(
        df,
        df_long,
        calc_Tbar_argv=calc_Tbar_argv,
        lookup_op=dstream_algo.lookup_ingest_times_batched,
        dstream_S=dstream_S,
    )

    if "downstream_validate_exploded" in df_long:
        logging.info(" - evaluating `downstream_validate_unpacked` exprs...")
        df_long = _perform_validation(df_long)

    if "downstream_exclude_exploded" in df_long:
        logging.info(" - dropping excluded rows...")
        df_long = _drop_excluded_rows(df_long)

    logging.info(" - finalizing result schema")
    df_long = _finalize_result_schema(
        df_long, result_schema=result_schema, value_dtype=value_dtype
    )

    logging.info("explode_lookup_unpacked complete")
    return df_long

dataframe.unpack_data_packed

dataframe._unpack_data_packed.unpack_data_packed(df, *, result_schema='coerce')

Unpack data with dstream buffer and counter serialized into a single hexadecimal data field.

Parameters:

  • df (pl.DataFrame, required) : The input DataFrame containing packed data with required columns, one row per dstream buffer.

Required schema:

  • 'data_hex' : pl.String
    • Raw binary data, with serialized dstream buffer and counter.
    • Represented as a hexadecimal string.
  • 'dstream_algo' : pl.Categorical
    • Name of downstream curation algorithm used.
    • e.g., 'dstream.steady_algo'
  • 'dstream_storage_bitoffset' : pl.UInt64
    • Position of dstream buffer field in 'data_hex'.
  • 'dstream_storage_bitwidth' : pl.UInt64
    • Size of dstream buffer field in 'data_hex'.
  • 'dstream_T_bitoffset' : pl.UInt64
    • Position of dstream counter field in 'data_hex'.
  • 'dstream_T_bitwidth' : pl.UInt64
    • Size of dstream counter field in 'data_hex'.
  • 'dstream_S' : pl.UInt32
    • Capacity of dstream buffer, in number of data items.

Optional schema:

  • 'downstream_version' : pl.Categorical
    • Version of downstream library used to curate data items.
  • 'downstream_exclude_exploded' : pl.Boolean
    • Should row be dropped after exploding unpacked data?
  • 'downstream_exclude_unpacked' : pl.Boolean
    • Should row be dropped after unpacking packed data?
  • 'downstream_validate_exploded' : pl.String, polars expression
    • Polars expression to validate exploded data.
  • 'downstream_validate_unpacked' : pl.String, polars expression
    • Polars expression to validate unpacked data.
  • result_schema ({'coerce', 'relax', 'shrink'}, default 'coerce') : How should dtypes in the output DataFrame be handled?
    • 'coerce' : cast all columns to output schema.
    • 'relax' : keep all columns as-is.
    • 'shrink' : cast columns to smallest possible types.

Returns:

  • pl.DataFrame : Processed DataFrame with unpacked and decoded data fields, one row per dstream buffer.

Output schema:

  • 'dstream_algo' : pl.Categorical
    • Name of downstream curation algorithm used.
    • e.g., 'dstream.steady_algo'
  • 'dstream_data_id' : pl.UInt64
    • Row index identifier for dstream buffer.
  • 'dstream_S' : pl.UInt32
    • Capacity of dstream buffer, in number of data items.
  • 'dstream_T' : pl.UInt64
    • Logical time elapsed (number of elapsed data items in stream).
  • 'dstream_storage_hex' : pl.String
    • Raw dstream buffer binary data, containing packed data items.
    • Represented as a hexadecimal string.

User-defined columns and 'downstream_version' will be forwarded from the input DataFrame.

Raises:

  • NotImplementedError : If any of the bit offset or bit width columns are not hex-aligned (i.e., not multiples of 4 bits).
  • ValueError : If any of the required columns are missing from the input DataFrame.

See Also

downstream.dataframe.explode_lookup_unpacked : Explodes unpacked buffers into individual constituent data items.

Source code in dataframe/_unpack_data_packed.py
def unpack_data_packed(
    df: pl.DataFrame,
    *,
    result_schema: typing.Literal["coerce", "relax", "shrink"] = "coerce",
) -> pl.DataFrame:
    """Unpack data with dstream buffer and counter serialized into a single
    hexadecimal data field.

    Parameters
    ----------
    df : pl.DataFrame
        The input DataFrame containing packed data with required columns, one
        row per dstream buffer.

        Required schema:

        - 'data_hex' : pl.String
            - Raw binary data, with serialized dstream buffer and counter.
            - Represented as a hexadecimal string.
        - 'dstream_algo' : pl.Categorical
            - Name of downstream curation algorithm used.
            - e.g., 'dstream.steady_algo'
        - 'dstream_storage_bitoffset' : pl.UInt64
            - Position of dstream buffer field in 'data_hex'.
        - 'dstream_storage_bitwidth' : pl.UInt64
            - Size of dstream buffer field in 'data_hex'.
        - 'dstream_T_bitoffset' : pl.UInt64
            - Position of dstream counter field in 'data_hex'.
        - 'dstream_T_bitwidth' : pl.UInt64
            - Size of dstream counter field in 'data_hex'.
        - 'dstream_S' : pl.UInt32
            - Capacity of dstream buffer, in number of data items.

        Optional schema:

        - 'downstream_version' : pl.Categorical
            - Version of downstream library used to curate data items.
        - 'downstream_exclude_exploded' : pl.Boolean
            - Should row be dropped after exploding unpacked data?
        - 'downstream_exclude_unpacked' : pl.Boolean
            - Should row be dropped after unpacking packed data?
        - 'downstream_validate_exploded' : pl.String, polars expression
            - Polars expression to validate exploded data.
        - 'downstream_validate_unpacked' : pl.String, polars expression
            - Polars expression to validate unpacked data.

    result_schema : Literal['coerce', 'relax', 'shrink'], default 'coerce'
        How should dtypes in the output DataFrame be handled?

        - 'coerce' : cast all columns to output schema.
        - 'relax' : keep all columns as-is.
        - 'shrink' : cast columns to smallest possible types.

    Returns
    -------
    pl.DataFrame
        Processed DataFrame with unpacked and decoded data fields, one row per
        dstream buffer

        Output schema:
            - 'dstream_algo' : pl.Categorical
                - Name of downstream curation algorithm used.
                - e.g., 'dstream.steady_algo'
            - 'dstream_data_id' : pl.UInt64
                - Row index identifier for dstream buffer.
            - 'dstream_S' : pl.UInt32
                - Capacity of dstream buffer, in number of data items.
            - 'dstream_T' : pl.UInt64
                - Logical time elapsed (number of elapsed data items in stream).
            - 'dstream_storage_hex' : pl.String
                - Raw dstream buffer binary data, containing packed data items.
                - Represented as a hexadecimal string.

        User-defined columns and 'downstream_version' will be forwarded from
        the input DataFrame.

    Raises
    ------
    NotImplementedError
        If any of the bit offset or bit width columns are not hex-aligned
        (i.e., not multiples of 4 bits).
    ValueError
        If any of the required columns are missing from the input DataFrame.


    See Also
    --------
    downstream.dataframe.explode_lookup_unpacked :
        Explodes unpacked buffers into individual constituent data items.
    """
    logging.info("begin explode_lookup_unpacked")
    logging.info(" - prepping data...")

    _check_df(df)
    if df.lazy().limit(1).collect().is_empty():
        return _make_empty()

    df = df.cast({"data_hex": pl.String, "dstream_algo": pl.Categorical})

    logging.info(" - calculating offsets...")
    df = _calculate_offsets(df)

    if "dstream_data_id" not in df.lazy().collect_schema().names():
        df = df.with_row_index("dstream_data_id")

    logging.info(" - extracting T and storage_hex from data_hex...")
    df = _extract_from_data_hex(df)

    if "downstream_validate_unpacked" in df:
        logging.info(" - evaluating `downstream_validate_unpacked` exprs...")
        df = _perform_validations(df)

    if "downstream_exclude_unpacked" in df:
        logging.info(" - dropping excluded rows...")
        df = _drop_excluded_rows(df)

    logging.info(" - finalizing result schema...")
    df = _finalize_result_schema(df, result_schema)

    logging.info("unpack_data_packed complete")
    return df
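
The two-step equivalent of explode_lookup_packed, sketched for cases where the intermediate one-row-per-buffer frame is useful. Here packed_df is a placeholder for a polars DataFrame following the packed input schema described above (e.g., the one constructed in the explode_lookup_packed example).

from downstream import dataframe

unpacked_df = dataframe.unpack_data_packed(packed_df)  # one row per dstream buffer
long_df = dataframe.explode_lookup_unpacked(unpacked_df, value_type="uint8")
print(long_df.select("dstream_data_id", "dstream_Tbar", "dstream_value"))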

Command Line Interface

For information on available command line interface (CLI) commands, run

python3 -m downstream --help