Python API Reference
Contents
- Algorithm API Example: dstream.steady_algo
- Additional algorithms in dstream
- Container API: dsurf.Surface
- Dataframe API
Algorithm API Example: dstream.steady_algo
Implements the steady downsampling algorithm, which retains items in a uniform distribution across elapsed stream history.
Import as (the import paths shown below are assumed from the `downstream` package layout):
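```python
from downstream.dstream import steady_algo  # assumed import path
```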
Or, alternately:
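```python
# assumed alternative: import the parent module, then access the submodule
from downstream import dstream

steady_algo = dstream.steady_algo
```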
steady_algo.assign_storage_site
steady_algo._steady_assign_storage_site.steady_assign_storage_site(S, T)
Site selection algorithm for steady curation.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int` | Buffer size. Must be a power of two. | required
`T` | `int` | Current logical time. | required
Returns:
Type | Description
---|---
`Optional[int]` | Selected site, if any.
Raises:
Type | Description
---|---
`ValueError` | If insufficient ingest capacity is available. See `has_ingest_capacity`.
Source code in steady_algo/_steady_assign_storage_site.py
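For instance, a minimal usage sketch (the import path, buffer size, and item values here are illustrative assumptions, not part of the documented API above):

```python
from downstream.dstream import steady_algo  # assumed import path

S = 8  # buffer size; must be a power of two
buffer = [None] * S
for T in range(32):  # T is the logical time of each ingest
    site = steady_algo.assign_storage_site(S, T)
    if site is not None:  # None means the item is not retained
        buffer[site] = T  # here the "data item" is simply its ingest time
```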
steady_algo.assign_storage_site_batched
steady_algo._steady_assign_storage_site_batched.steady_assign_storage_site_batched(S, T)
Site selection algorithm for steady curation.
Vectorized implementation for bulk calculations.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `Union[ndarray, int]` | Buffer size. Must be a power of two, <= 2**52. | required
`T` | `Union[ndarray, int]` | Current logical time. Must be <= 2**52. | required
Returns:
Type | Description
---|---
`array` | Selected site, if any. Otherwise, `S`.
Source code in steady_algo/_steady_assign_storage_site_batched.py
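A bulk-calculation sketch, assuming numpy is installed and the same assumed import path as above:

```python
import numpy as np

from downstream.dstream import steady_algo  # assumed import path

S = 8
T = np.arange(32)
sites = steady_algo.assign_storage_site_batched(S, T)
retained = sites[sites < S]  # entries equal to S mark items that are not retained
```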
steady_algo.get_ingest_capacity
steady_algo._steady_get_ingest_capacity.steady_get_ingest_capacity(S)
How many data item ingestions does this algorithm support?
Returns None if the number of supported ingestions is unlimited.
See Also
has_ingest_capacity : Does this algorithm have the capacity to ingest n
data items?
Source code in steady_algo/_steady_get_ingest_capacity.py
steady_algo.has_ingest_capacity
steady_algo._steady_has_ingest_capacity.steady_has_ingest_capacity(S, T)
Does this algorithm have the capacity to ingest a data item at logical time T?
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int` | The number of buffer sites available. | required
`T` | `int` | Queried logical time. | required
Returns:
Type | Description
---|---
`bool` | True if ingest capacity is sufficient, False otherwise.
See Also
get_ingest_capacity : How many data item ingestions does this algorithm support?
has_ingest_capacity_batched : Numpy-friendly implementation.
Source code in steady_algo/_steady_has_ingest_capacity.py
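A sketch of guarding ingest with a capacity check (illustrative values; assumed import path):

```python
from downstream.dstream import steady_algo  # assumed import path

S, T = 8, 100
if steady_algo.has_ingest_capacity(S, T):
    site = steady_algo.assign_storage_site(S, T)
else:
    raise RuntimeError("ingest capacity exhausted at logical time T")
```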
steady_algo.has_ingest_capacity_batched
steady_algo._steady_has_ingest_capacity_batched.steady_has_ingest_capacity_batched(S, T)
Does this algorithm have the capacity to ingest a data item at logical time T?
Vectorized implementation for bulk calculations.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int or ndarray` | The number of buffer sites available. | required
`T` | `int or ndarray` | Queried logical time. | required
Returns:
Type | Description
---|---
`np.ndarray of bool` | True if ingest capacity is sufficient, False otherwise.
See Also
get_ingest_capacity : How many data item ingestions does this algorithm support?
Source code in steady_algo/_steady_has_ingest_capacity_batched.py
steady_algo.lookup_ingest_times
steady_algo._steady_lookup_ingest_times.steady_lookup_ingest_times(S, T)
Ingest time lookup algorithm for steady curation.
Lazy implementation.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int` | Buffer size. Must be a power of two. | required
`T` | `int` | Current logical time. | required
Yields:
Type | Description
---|---
`Optional[int]` | Ingest time of stored item at buffer sites in index order.
Source code in steady_algo/_steady_lookup_ingest_times.py
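A sketch of reading back ingest times (illustrative values; assumed import path):

```python
from downstream.dstream import steady_algo  # assumed import path

S, T = 8, 20
# lazily yields one entry per buffer site, in site index order;
# None marks sites whose ingest time is not defined
ingest_times = list(steady_algo.lookup_ingest_times(S, T))
```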
steady_algo.lookup_ingest_times_batched
steady_algo._steady_lookup_ingest_times_batched.steady_lookup_ingest_times_batched(S, T, parallel=True)
Ingest time lookup algorithm for steady curation.
Vectorized implementation for bulk calculations.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int` | Buffer size. Must be a power of two. | required
`T` | `ndarray` | One-dimensional array of current logical times. | required
`parallel` | `bool` | Should numba be applied to parallelize operations? | True
Returns:
Type | Description
---|---
`ndarray` | Ingest time of stored items at buffer sites in index order. Two-dimensional array: each row corresponds to an entry in T and contains S columns, one per buffer site.
Source code in steady_algo/_steady_lookup_ingest_times_batched.py
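A vectorized lookup sketch, assuming numpy is installed (here `parallel=False` is passed so numba is not required):

```python
import numpy as np

from downstream.dstream import steady_algo  # assumed import path

S = 8
T = np.array([10, 20, 30])
# one row per entry in T, one column per buffer site
times = steady_algo.lookup_ingest_times_batched(S, T, parallel=False)
# times.shape == (3, S)
```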
steady_algo.lookup_ingest_times_eager
steady_algo._steady_lookup_ingest_times_eager.steady_lookup_ingest_times_eager(S, T)
Ingest time lookup algorithm for steady curation.
Eager implementation.
Parameters:
Name | Type | Description | Default
---|---|---|---
`S` | `int` | Buffer size. Must be a power of two. | required
`T` | `int` | Current logical time. | required
Returns:
Type | Description
---|---
`List[int]` | Ingest time of stored item at buffer sites in index order.
Source code in steady_algo/_steady_lookup_ingest_times_eager.py
Additional Algorithms in dstream
The following additional algorithms are available in the dstream module:
- `circular_algo` implements a simple ring buffer for last n sampling.
- `compressing_algo` implements steady curation via Gunther's compressing circular buffer algorithm.
- `hybrid_0_steady_1_stretched_2_algo` for hybrid downsampling combining steady and stretched algorithms.
- `hybrid_0_steady_1_stretchedxtc_2_algo` for hybrid downsampling combining steady and stretchedxtc algorithms.
- `hybrid_0_steady_1_tilted_2_algo` for hybrid downsampling combining steady and tilted algorithms.
- `hybrid_0_steady_1_tiltedxtc_2_algo` for hybrid downsampling combining steady and tiltedxtc algorithms.
- `primed_algo_*` which fills the surface left-to-right before delegating to another algorithm.
- `stretched_algo` for stretched downsampling.
- `stretchedxtc_algo` for stretched downsampling, with infinite `T` domain extension.
- `tilted_algo` for tilted downsampling.
- `tiltedxtc_algo` for tilted downsampling, with infinite `T` domain extension.
These algorithms follow the same API conventions as `steady_algo`, shown above.
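For example, a sketch swapping in a different algorithm module (assumed import path; illustrative arguments):

```python
from downstream import dstream  # assumed import path

# tilted_algo exposes the same site-selection entry point as steady_algo
site = dstream.tilted_algo.assign_storage_site(8, 42)
```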
Container API: dsurf.Surface
Import as (the import paths shown below are assumed from the `downstream` package layout):
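```python
from downstream import dsurf  # assumed import path
```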
Or, alternately:
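```python
# assumed alternative: import the class directly
from downstream.dsurf import Surface
```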
dsurf.Surface
dsurf.Surface
Bases: Generic[_DSurfDataItem]
Container orchestrating downstream curation over a fixed-size buffer.
Source code in dsurf/_Surface.py
__deepcopy__(memo)
Ensure pickle compatibility when algo is a module.
__init__(algo, storage, T=0)
Initialize a downstream Surface object, which stores hereditary stratigraphic annotations using a provided algorithm.
Parameters:
Name | Type | Description | Default
---|---|---|---
`algo` | `ModuleType` | The algorithm used by the surface to determine the placements of data items. Should be one of the modules in `downstream.dstream`. | required
`storage` | `Union[MutableSequence[_DSurfDataItem], int]` | The object used to hold any ingested data. If an integer is passed in, a list of that length is allocated. | required
`T` | `int` | The initial logical time (i.e., how many items have been ingested). | 0
Source code in dsurf/_Surface.py
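For instance, a minimal construction sketch (assumed import paths; passing an integer storage argument allocates a buffer of that length):

```python
from downstream import dstream, dsurf  # assumed import paths

# an 8-site buffer curated by the steady algorithm
surface = dsurf.Surface(dstream.steady_algo, 8)
```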
from_hex(hex_string, algo, *, S, storage_bitoffset=None, storage_bitwidth, T_bitoffset=0, T_bitwidth=32)
staticmethod
Deserialize a Surface object from a hex string representation.
Hex string representation needs exactly two contiguous parts: 1. dstream_T (which is the number of ingested data items), and 2. dstream_storage (which holds all the stored data items).
Data items are deserialized as unsigned integers.
Data in hex string representation should use big-endian byte order.
Parameters:
Name | Type | Description | Default
---|---|---|---
`hex_string` | `str` | Hex string to be parsed, which can be uppercase or lowercase. | required
`algo` | `ModuleType` | Dstream algorithm to use to create the new Surface object. | required
`storage_bitoffset` | `Optional[int]` | Number of bits before the storage. | None
`storage_bitwidth` | `int` | Number of bits used for storage. | required
`T_bitoffset` | `int` | Number of bits before dstream_T. | 0
`T_bitwidth` | `int` | Number of bits used to store dstream_T. | 32
`S` | `int` | Number of buffer sites used to store data items. Determines how many items are unpacked from storage. | required
See Also
Surface.to_hex : Serialize a Surface object into a hex string.
Source code in dsurf/_Surface.py
ingest_many(n_ingests, item_getter, use_relative_time=False)
Ingest multiple data items.
Optimizes for the case where a large amount of data is ready to be ingested. In such a scenario, we can avoid assigning multiple objects to the same site and simply iterate through the sites that would be updated after all items were ingested.
Parameters:
Name | Type | Description | Default
---|---|---|---
`n_ingests` | `int` | The number of data items to ingest. | required
`item_getter` | `int -> object` | For a given ingest time within the n_ingests window, should return the associated data item. | required
`use_relative_time` | `bool` | Use the relative time (i.e., timesteps since current self.T) instead of the absolute time as input to `item_getter`. | False
Source code in dsurf/_Surface.py
ingest_one(item)
Ingest data item.
Returns the storage site of the data item, or None if the data item is not retained.
Source code in dsurf/_Surface.py
lookup(include_empty=True)
Iterate over data item ingest times, including null values for uninitialized sites.
Source code in dsurf/_Surface.py
lookup_zip_items(include_empty=True)
Iterate over ingest times and values of data items in the order they appear on the downstream storage, including sites not yet written to.
Source code in dsurf/_Surface.py
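Continuing the `surface` object sketched above, a hedged usage example combining ingest and lookup:

```python
for item in range(100):
    surface.ingest_one(item)  # returns the storage site, or None if not retained

# iterate (ingest time, value) pairs in buffer site order,
# including sites not yet written to (ingest time None)
for T_ingest, value in surface.lookup_zip_items():
    if T_ingest is not None:
        print(T_ingest, value)
```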
to_hex(*, item_bitwidth, T_bitwidth=32)
Serialize a Surface object into a hex string representation.
Serialized data comprises two components: 1. dstream_T (the number of data items ingested) and 2. dstream_storage (binary data of data item values).
The hex layout used is: the first `T_bitwidth` / 4 hex characters hold dstream_T, followed by `item_bitwidth` / 4 * S hex characters holding dstream_storage.
This hex string can be reconstituted by calling Surface.from_hex() with the following parameters:
- `T_bitoffset` = 0
- `T_bitwidth` = `T_bitwidth`
- `storage_bitoffset` = `T_bitwidth`
- `storage_bitwidth` = `self.S * item_bitwidth`
Parameters:
Name | Type | Description | Default
---|---|---|---
`item_bitwidth` | `int` | Number of storage bits used per data item. | required
`T_bitwidth` | `int` | Number of bits used to store the ingested items count (dstream_T). | 32
See Also
Surface.from_hex : Deserialize a Surface object from a hex string.
Source code in dsurf/_Surface.py
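A round-trip sketch under stated assumptions: each data item fits in 8 bits, and the surface exposes its site count as `surface.S` (an assumed attribute):

```python
hex_string = surface.to_hex(item_bitwidth=8)
restored = dsurf.Surface.from_hex(
    hex_string,
    dstream.steady_algo,
    S=surface.S,  # assumed attribute holding the buffer site count
    storage_bitwidth=surface.S * 8,
)
```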
Dataframe API
Import as (the import path shown below is assumed from the `downstream` package layout):
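```python
from downstream import dataframe  # assumed import path
```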
dataframe.explode_lookup_packed
dataframe._explode_lookup_packed.explode_lookup_packed(df, *, calc_Tbar_argv=False, value_type, result_schema='coerce')
Explode downstream-curated data from hexadecimal serialization of downstream buffers and counters to one-data-item-per-row, applying downstream lookup to identify origin time `Tbar` of each item.
Parameters:
Name | Type | Description | Default
---|---|---|---
`df` | `DataFrame` | DataFrame containing packed data to be exploded. | required
`calc_Tbar_argv` | `bool` | Include column indicating the sorted order of ingest times. | False
`value_type` | `('hex', 'uint64', 'uint32', 'uint16', 'uint8')` | Type of the packed data values. Determines how the packed data is interpreted. | 'hex'
`result_schema` | `('coerce', 'relax', 'shrink')` | Schema for the resulting DataFrame. Determines how the output DataFrame is structured and what types are used. | 'coerce'
|
Returns:
Type | Description
---|---
`DataFrame` | DataFrame with one row per data item, containing the original data and the corresponding `Tbar` origin times.
Source code in dataframe/_explode_lookup_packed.py
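A hedged sketch, assuming `packed_df` is a DataFrame already in the packed input format described above:

```python
exploded_df = dataframe.explode_lookup_packed(packed_df, value_type="uint64")
```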
dataframe.explode_lookup_unpacked
dataframe._explode_lookup_unpacked.explode_lookup_unpacked(df, *, calc_Tbar_argv=False, value_type, result_schema='coerce')
Explode downstream-curated data from one-buffer-per-row (with each buffer containing multiple data items) to one-data-item-per-row, applying downstream lookup to identify origin time `Tbar` of each item.
Parameters:
Name | Type | Description | Default
---|---|---|---
`df` | `DataFrame` | The input DataFrame containing unpacked data with required columns, one row per dstream buffer. Additional user-defined columns will be forwarded to the output DataFrame. | required
`calc_Tbar_argv` | `bool` | Should the algorithm calculate the argsort of ingest times for each buffer? | False
`value_type` | `('hex', 'uint64', 'uint32', 'uint16', 'uint8')` | The desired data type for the 'dstream_value' column in the output DataFrame. Note that 'hex' is not yet supported. | 'hex'
`result_schema` | `Literal['coerce', 'relax', 'shrink']` | How should dtypes in the output DataFrame be handled? 'coerce' casts all columns to the output schema; 'relax' keeps all columns as-is; 'shrink' casts columns to the smallest possible types. | 'coerce'
|
Returns:
Type | Description
---|---
`DataFrame` | A DataFrame with exploded data and extracted values, one row per data item from the input dstream buffers. User-defined columns are NOT forwarded from the unpacked dataframe; to include additional columns, join the output DataFrame with the original input DataFrame.
Raises:
Type | Description
---|---
`NotImplementedError` | If `value_type` is 'hex', which is not yet supported.
`ValueError` | If any of the required columns are missing from the input DataFrame.
See Also
unpack_data_packed : Preprocessing step; converts data with downstream buffer and counter serialized into a single hexadecimal string into the input format for this function.
Source code in dataframe/_explode_lookup_unpacked.py
dataframe.unpack_data_packed
dataframe._unpack_data_packed.unpack_data_packed(df, *, result_schema='coerce')
Unpack data with dstream buffer and counter serialized into a single hexadecimal data field.
Parameters:
Name | Type | Description | Default
---|---|---|---
`df` | `DataFrame` | The input DataFrame containing packed data with required columns, one row per dstream buffer. | required
`result_schema` | `Literal['coerce', 'relax', 'shrink']` | How should dtypes in the output DataFrame be handled? | 'coerce'
Returns:
Type | Description
---|---
`DataFrame` | Processed DataFrame with unpacked and decoded data fields, one row per dstream buffer.
Output schema:
- 'dstream_algo' (pl.Categorical) : Name of downstream curation algorithm used, e.g., 'dstream.steady_algo'.
- 'dstream_data_id' (pl.UInt64) : Row index identifier for dstream buffer.
- 'dstream_S' (pl.UInt32) : Capacity of dstream buffer, in number of data items.
- 'dstream_T' (pl.UInt64) : Logical time elapsed (number of elapsed data items in stream).
- 'dstream_storage_hex' (pl.String) : Raw dstream buffer binary data, containing packed data items, represented as a hexadecimal string.
User-defined columns and 'downstream_version' will be forwarded from the input DataFrame.
Raises:
Type | Description
---|---
`NotImplementedError` | If any of the bit offset or bit width columns are not hex-aligned (i.e., not multiples of 4 bits).
`ValueError` | If any of the required columns are missing from the input DataFrame.
See Also
downstream.dataframe.explode_lookup_unpacked : Explodes unpacked buffers into individual constituent data items.
Source code in dataframe/_unpack_data_packed.py
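Putting the two dataframe steps together, a hedged pipeline sketch (assumes `packed_df` holds hex-serialized buffers with the required packed-schema columns):

```python
from downstream import dataframe  # assumed import path

unpacked_df = dataframe.unpack_data_packed(packed_df)
exploded_df = dataframe.explode_lookup_unpacked(unpacked_df, value_type="uint64")
```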
Command Line Interface
For information on available command line interface (CLI) commands