M .hgtags +14 -0
@@ 1379,3 1379,17 @@ bdfa27661d703986807b8c96686df39fb63dcc61
fcfb6b79272df50dae5eaa40ebef43b4cda2e91d cs.lex-20250414
92dd5d8468460cf1abe77cd4635006e109190958 cs.hashutils-20250414
b2df4adfca73c7c2e07a1554147f6711b29c30c9 cs.hashutils-20250414.1
+8733c13b6145d7fc0f2cbbef7958b7c9eb2807dd cs.cmdutils-20250418
+3e5914c3a8b5b2a7aef765d6f05a786b303da535 cs.upd-20250426
+a5f40d37f9586ceced7ccf3b505f9c8df826ef3c cs.py.doc-20250426
+7304d2d51a925cabc4526eade33c1e2ad75afbc3 cs.cmdutils-20250426
+a6a6b84277b251398f221ceae3b03c491c4c4071 cs.testutils-20250426
+99d101c653b68ac7329ea8f9d024263305c0ff79 cs.queues-20250426
+21a1eee3dec4d6c80ad6b1dd3562ca3eb8cfe793 cs.typingutils-20250428
+a083955c73b7d558e06a0dd487786cbff0bdf6eb cs.gimmicks-20250428
+6ad19321bbd3a159d4710c176b5eb91ae66bf51f cs.deco-20250428
+9d391915b1ffac02ce9ed78c27b3b1699b13ceab cs.lex-20250428
+214ffde1edeee37ec98248bd3925ac50ef28a94d cs.buffer-20250428
+0cb4a0fd58d3617506bc11fa40188a6a7472d7da cs.fileutils-20250429
+c1023d7a35158ce797c324a7895f419276e96db8 cs.binary-20250501
+68bea49d74cd2155dd5c414480c86c63e26f8be7 cs.typingutils-20250503
M .ruff.toml +3 -0
@@ 88,12 88,15 @@ ignore = [
"D401", # First line of docstring should be in imperative mood
"D406", # Section name should end with a newline
"D407", # Missing dashed underline after section
+ "D411", # Missing blank line before section
"D412", # No blank lines allowed between a section header and its content
"D413", # Missing blank line after last section
"D415", # First line should end with a period, question mark, or exclamation point
"E731",
"I001", # Import block is un-sorted or un-formatted
"SIM108", # Use ternary operator
+ "SIM117", # Use a single `with` statement with multiple contexts instead of nested `with` statements
+ "T201", # `print` found
"TRY003", # Avoid specifying long messages outside the exception class
"Q000",
"Q002",
M Mykefile +1 -1
@@ 106,7 106,7 @@ rsync = rsync -iJOt --exclude='.*.swp'
_venv:
set -uex \
[ -d $(venv_dir) ] || { \
- ##$(base_python) -m ensurepip \
+ : SKIP $(base_python) -m ensurepip \
$(base_pip) install -U pip wheel build uv \
$(uv) venv --python $(base_python) --seed $(venv_dir) \
}
M TODO-media.txt +2 -0
@@ 1,3 1,5 @@
+mp4 grep for subtitles, mp4 crop to extract a snippet of video
+static JPEG to MJPEG converter/streamer
cs.cdrip: discid tag is a row id, needs to be the libdiscid value
cs.iso14496: a HasSubBoxes mixin for things with .boxes, provides __len__, .length, __iter__ and whatever else
ydl: issue alerts for completed downloads
M TODO.txt +5 -0
@@ 1,3 1,8 @@
+docker-caddy script
+dlog fwd local dlog sock through ssh, provide $DLOG_PIPEPATH through?
+CornuCopyBuffer - is .offset the post buffer size? it should be the logical offset - pre-buffer - but the mp4 parsing suggests that it is not?
+cs.binary as_hexpat() class method for ImHex Hex Editor Hex Patterns, maybe a cs.hexpat module later
+units - conciser transcribe function names
cs.progress: @auto_progress - rename to @uses_progress?
cs.cmdutils: support --no-thing options with no arg
hashindex: make default rearrange mode mv ?
M bin/incd +3 -2
@@ 26,11 26,12 @@ badopts=
while [ $# -gt 0 ]
do
case $1 in
- -P|-L) pwd_opt=$1; shift ;;
+ -P|-L) pwd_opt=$1 ;;
--) shift; break ;;
- -?*) echo "$cmd: unrecognisedoption: $1" >&2; badopts=1 ;;
+ -?*) echo "$cmd: unrecognised option: $1" >&2; badopts=1 ;;
*) break ;;
esac
+ shift
done
if [ $# = 0 ]
M lib/python/cs/app/beyonwiz/enigma2.py +3 -3
@@ 15,7 15,7 @@ from os.path import basename, isfile as
from icontract import require
-from cs.binary import BinaryMultiStruct
+from cs.binary import BinaryStruct
from cs.buffer import CornuCopyBuffer
from cs.logutils import warning
from cs.pfx import Pfx, pfx_call, pfx_method
@@ 26,10 26,10 @@ from cs.threads import locked_property
from . import _Recording
# an "access point" record from the .ap file
-Enigma2APInfo = BinaryMultiStruct('Enigma2APInfo', '>QQ', 'pts offset')
+Enigma2APInfo = BinaryStruct('Enigma2APInfo', '>QQ', 'pts offset')
# a "cut" record from the .cuts file
-Enigma2Cut = BinaryMultiStruct('Enigma2Cut', '>QL', 'pts type')
+Enigma2Cut = BinaryStruct('Enigma2Cut', '>QL', 'pts type')
class Enigma2(_Recording):
''' Access Enigma2 recordings, such as those used on the Beyonwiz T3, T4 etc devices.
M lib/python/cs/app/beyonwiz/tvwiz.py +3 -3
@@ 15,7 15,7 @@ import struct
from tempfile import NamedTemporaryFile
from typing import Tuple
-from cs.binary import BinaryMultiStruct, BinarySingleStruct
+from cs.binary import BinaryStruct, BinarySingleStruct
from cs.buffer import CornuCopyBuffer
from cs.deco import promote
from cs.fileutils import datafrom
@@ 68,7 68,7 @@ HEADER_DATA_SZ = HDR_EXTINFO_OFF + HDR_E
# uchar unused;
# };
#
-TVWizFileHeader = BinaryMultiStruct(
+TVWizFileHeader = BinaryStruct(
'TVWizFileHeader',
'<5HBBBB',
'hidden1 hidden2 hidden3 hidden4 hidden5 lock media_type in_rec unused',
@@ 86,7 86,7 @@ TVWizFileHeader = BinaryMultiStruct(
# sec
# last_offset
# followed by 8640 fileOff
-class TVWizTSPoint(BinaryMultiStruct(
+class TVWizTSPoint(BinaryStruct(
'TVWizTSPoint',
'<256s256sHHLHHQ',
'service_name_bs0 event_name_bs0 mod_julian_date pad start last sec last_offset',
M lib/python/cs/app/pilfer/cache.py +4 -1
@@ 471,7 471,10 @@ class ContentCache(HasFSPath, MultiOpenM
# remove the old content file if different
old_content_rpath = old_md.get('content_rpath', '')
if old_content_rpath and old_content_rpath != content_rpath:
- pfx_call(os.remove, self.cached_pathto(old_content_rpath))
+ try:
+ pfx_call(os.remove, self.cached_pathto(old_content_rpath))
+ except FileNotFoundError as e:
+ warning("file not present in cache: %s", e)
return md_map
if __name__ == '__main__':
M lib/python/cs/binary.py +1046 -265
@@ 20,33 20,20 @@
* buffer:
an instance of `cs.buffer.CornuCopyBuffer`,
which manages an iterable of bytes-like values
- and has various useful methods;
- it also has a few factory methods to make one from a variety of sources
- such as bytes, iterables, binary files, `mmap`ped files,
- TCP data streams, etc.
+ and has various useful methods for parsing.
* chunk:
a piece of binary data obeying the buffer protocol,
+ i.e. a `collections.abc.Buffer`;
almost always a `bytes` instance or a `memoryview`,
but in principle also things like `bytearray`.
- There are 5 main classes on which an implementor should base their data structures:
- * `BinarySingleStruct`: a factory for classes based
- on a `struct.struct` format string with a single value;
- this builds a `namedtuple` subclass
- * `BinaryMultiStruct`: a factory for classes based
- on a `struct.struct` format string with multiple values;
- this also builds a `namedtuple` subclass
- * `BinarySingleValue`: a base class for subclasses
- parsing and transcribing a single value
- * `BinaryMultiValue`: a base class for subclasses
- parsing and transcribing multiple values
- with no variation
- * `SimpleBinary`: a base class for subclasses
- with custom `.parse` and `.transcribe` methods,
- for structures with variable fields
+ The `CornuCopyBuffer` is the basis for all parsing, as it manages
+ a variety of input sources such as files, memory, sockets etc.
+  It also has factory methods to make one from sources
+ such as bytes, iterables, binary files, `mmap`ped files,
+ TCP data streams, etc.
- Any the classes derived from the above inherit all the methods
- of `AbstractBinary`.
+ All the binary classes subclass `AbstractBinary`.
Amongst other things, this means that the binary transcription
can be had simply from `bytes(instance)`,
although there are more transcription methods provided
@@ 54,34 41,155 @@
It also means that all classes have `parse`* and `scan`* methods
for parsing binary data streams.
+ The `.parse(cls,bfr)` class method reads binary data from a
+ buffer and returns an instance.
+
+ The `.transcribe(self)` method may be a regular function or a
+ generator which returns or yields things which can be transcribed
+ as bytes via the `flatten` function.
+ See the `AbstractBinary.transcribe` docstring for specifics; this might:
+ - return a `bytes`
+ - return an ASCII string
+ - be a generator which yields various values such as bytes,
+ ASCII strings, other `AbstractBinary` instances such as each
+ field (which get transcribed in turn) or an iterable of these
+ things
+
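+  For example (a sketch, not drawn from the module), a hypothetical
+  structure's `transcribe` generator might look like this:
+
+      def transcribe(self):
+          yield self.header     # an AbstractBinary, transcribed in turn
+          yield b'magic'        # raw bytes
+          yield str(self.count) # an ASCII numeric string
+          yield self.subfields  # an iterable, flattened recursively
+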
+  There are 5 main ways an implementor might base their data structures:
+  * `BinaryStruct`: a factory for classes based
+    on a `struct.struct` format string with one or more values;
+    this builds a `namedtuple` subclass
+ * `@binclass`: a dataclass-like specification of a binary structure
+ * `BinarySingleValue`: a base class for subclasses
+ parsing and transcribing a single value, such as `UInt8` or
+ `BinaryUTF8NUL`
+ * `BinaryMultiValue`: a factory for subclasses
+ parsing and transcribing multiple values
+ with no variation
+ * `SimpleBinary`: a base class for subclasses
+ with custom `.parse` and `.transcribe` methods,
+ for structures with variable fields;
+ this makes a `SimpleNamespace` subclass
+
+ These can all be mixed as appropriate to your needs.
+
You can also instantiate objects directly;
there's no requirement for the source information to be binary.
There are several presupplied subclasses for common basic types
such as `UInt32BE` (an unsigned 32 bit big endian integer).
- # Two Examples
+ ## Some Examples
+
+ ### A `BinaryStruct`, from `cs.iso14496`
+
+  A simple `struct` style definition for 9 longs:
- Here are two examples drawn from `cs.iso14496`.
- Like all the `Binary*` subclasses, parsing an instance from a
+ Matrix9Long = BinaryStruct(
+ 'Matrix9Long', '>lllllllll', 'v0 v1 v2 v3 v4 v5 v6 v7 v8'
+ )
+
+ Per the `struct.struct` format string, this parses 9 big endian longs
+ and returns a `namedtuple` with 9 fields.
+ Like all the `AbstractBinary` subclasses, parsing an instance from a
stream can be done like this:
m9 = Matrix9Long.parse(bfr)
print("m9.v3", m9.v3)
- edit_list = ELSTBoxBody.parse(bfr)
- print("edit list: entry_count =", edit_list.entry_count)
-
and writing its binary form to a file like this:
f.write(bytes(m9))
- f.write(bytes(edit_list))
+
+ ### A `@binclass`, also from `cs.iso14496`
+
+ For reasons to do with the larger MP4 parser this uses an extra
+ decorator `@boxbodyclass` which is just a shim for the `@binclass`
+  decorator with an additional step.
+
+ @boxbodyclass
+ class FullBoxBody2(BoxBody):
+ """ A common extension of a basic `BoxBody`, with a version and flags field.
+ ISO14496 section 4.2.
+ """
+ version: UInt8
+ flags0: UInt8
+ flags1: UInt8
+ flags2: UInt8
- A simple `struct` style definitiion for 9 longs:
+ @property
+ def flags(self):
+ """ The flags value, computed from the 3 flag bytes.
+ """
+ return (self.flags0 << 16) | (self.flags1 << 8) | self.flags2
+
+  This has 4 fields, each an unsigned 8 bit value (one byte),
+ and a property `.flags` which is the overall flags value for
+ the box header.
+
+ You should look at the source code for the `TKHDBoxBody` from
+ that module for an example of a `@binclass` with a variable
+ collection of fields based on an earlier `version` field value.
+
+ ### A `BinarySingleValue`, the `BSUInt` from this module
+
+  The `BSUInt` transcribes an unsigned integer of arbitrary size
+  as a big endian variable sized sequence of bytes.
+ I understand this is the same scheme MIDI uses.
+
+  You can define a `BinarySingleValue` with conventional `.parse()`
+  and `.transcribe()` methods but it is usually expedient to instead
+  provide `.parse_value()` and `.transcribe_value()` methods, which
+  return or transcribe the core value (the unsigned integer in
+  this case).
- Matrix9Long = BinaryMultiStruct(
- 'Matrix9Long', '>lllllllll', 'v0 v1 v2 v3 v4 v5 v6 v7 v8'
- )
+ class BSUInt(BinarySingleValue, value_type=int):
+ """ A binary serialised unsigned `int`.
+
+ This uses a big endian byte encoding where continuation octets
+ have their high bit set. The bits contributing to the value
+ are in the low order 7 bits.
+ """
+
+ @staticmethod
+ def parse_value(bfr: CornuCopyBuffer) -> int:
+ """ Parse an extensible byte serialised unsigned `int` from a buffer.
+
+ Continuation octets have their high bit set.
+ The value is big-endian.
+
+ This is the go for reading from a stream. If you already have
+ a bare bytes instance then the `.decode_bytes` static method
+ is probably most efficient;
+ there is of course the usual `AbstractBinary.parse_bytes`
+ but that constructs a buffer to obtain the individual bytes.
+ """
+ n = 0
+ b = 0x80
+ while b & 0x80:
+ b = bfr.byte0()
+ n = (n << 7) | (b & 0x7f)
+ return n
+
+ # pylint: disable=arguments-renamed
+ @staticmethod
+ def transcribe_value(n):
+        """ Encode an unsigned int as an extensible byte serialised octet
+ sequence for decode. Return the bytes object.
+ """
+ bs = [n & 0x7f]
+ n >>= 7
+ while n > 0:
+ bs.append(0x80 | (n & 0x7f))
+ n >>= 7
+ return bytes(reversed(bs))
+
+ ### A `BinaryMultiValue`
+
+  A `BinaryMultiValue` is a class factory for making a multi field
+ `AbstractBinary` from variable field descriptions.
+ You're probably better off using `@binclass` these days.
+ See the `BinaryMultiValue` docstring for details and an example.
An MP4 ELST box:
@@ 89,11 197,11 @@
""" An 'elst' Edit List FullBoxBody - section 8.6.6.
"""
- V0EditEntry = BinaryMultiStruct(
+ V0EditEntry = BinaryStruct(
'ELSTBoxBody_V0EditEntry', '>Llhh',
'segment_duration media_time media_rate_integer media_rate_fraction'
)
- V1EditEntry = BinaryMultiStruct(
+ V1EditEntry = BinaryStruct(
'ELSTBoxBody_V1EditEntry', '>Qqhh',
'segment_duration media_time media_rate_integer media_rate_fraction'
)
@@ 130,28 238,41 @@
flavours of edit entry structure and a property to return the
suitable class based on the version field. The `parse_fields()`
method is called from the base `BoxBody` class' `parse()` method
- to collect addition fields for any box. For this box it collectsa
- 32 bit `entry_count` and then a list of that many edit entries.
+  to collect additional fields for any box. For this box it collects
+ a 32 bit `entry_count` and then a list of that many edit entries.
The transcription yields corresponding values.
-
- # Module Contents
'''
from abc import ABC, abstractmethod, abstractclassmethod
from collections import namedtuple
+from dataclasses import dataclass, fields
+from inspect import signature, Signature
from struct import Struct # pylint: disable=no-name-in-module
import sys
from types import SimpleNamespace
-from typing import Any, Callable, List, Mapping, Optional, Tuple, Union
+from typing import (
+ get_args,
+ get_origin,
+ Any,
+ Callable,
+ Iterable,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+)
+
+from typeguard import check_type, typechecked
from cs.buffer import CornuCopyBuffer
-from cs.deco import OBSOLETE, promote, strable
-from cs.gimmicks import warning, debug
-from cs.lex import cropped, cropped_repr, typed_str
+from cs.deco import OBSOLETE, decorator, promote, Promotable, strable
+from cs.gimmicks import Buffer, debug, warning
+from cs.lex import cropped, cropped_repr, r, stripped_dedent, typed_str
from cs.pfx import Pfx, pfx, pfx_method, pfx_call
from cs.seq import Seq
-__version__ = '20240630-post'
+__version__ = '20250501-post'
DISTINFO = {
'keywords': ["python3"],
@@ 167,28 288,36 @@ DISTINFO = {
'cs.lex',
'cs.pfx',
'cs.seq',
+ 'typeguard',
],
'python_requires':
'>=3.6',
}
-if (sys.version_info.major < 3
- or (sys.version_info.major == 3 and sys.version_info.minor < 6)):
+if sys.version_info < (3, 6):
warning(
"module %r requires Python 3.6 for reliable field ordering but version_info=%s",
__name__, sys.version_info
)
-def flatten(chunks):
- ''' Flatten `chunks` into an iterable of `bytes`-like instances.
- None of the `bytes` instances will be empty.
+def flatten(transcription) -> Iterable[bytes]:
+ ''' Flatten `transcription` into an iterable of `Buffer`s.
+ None of the `Buffer`s will be empty.
This exists to allow subclass methods to easily return
- transcribeable things (having a `.transcribe` method), ASCII
+ transcribable things (having a `.transcribe` method), ASCII
strings or bytes or iterables or even `None`, in turn allowing
them simply to return their superclass' chunks iterators
directly instead of having to unpack them.
+ The supplied `transcription` may be any of the following:
+ - `None`: yield nothing
+ - an object with a `.transcribe` method: yield from
+ `flatten(transcription.transcribe())`
+ - a `Buffer`: yield the `Buffer` if it is not empty
+ - a `str`: yield `transcription.encode('ascii')`
+ - an iterable: yield from `flatten(item)` for each item in `transcription`
+
An example from the `cs.iso14496.METABoxBody` class:
def transcribe(self):
@@ 197,40 326,60 @@ def flatten(chunks):
yield self.boxes
The binary classes `flatten` the result of the `.transcribe`
- method to obtain `bytes` insteances for the object's bnary
+ method to obtain `bytes` instances for the object's binary
transcription.
'''
- if chunks is None:
+ if transcription is None:
pass
- elif hasattr(chunks, 'transcribe'):
- yield from flatten(chunks.transcribe())
- elif isinstance(chunks, (bytes, memoryview)):
- if chunks:
- yield chunks
- elif isinstance(chunks, str):
- if chunks:
- yield chunks.encode('ascii')
+ elif hasattr(transcription, 'transcribe'):
+ yield from flatten(transcription.transcribe())
+ elif isinstance(transcription, Buffer):
+ if transcription:
+ yield transcription
+ elif isinstance(transcription, str):
+ if transcription:
+ yield transcription.encode('ascii')
else:
- for subchunk in chunks:
- yield from flatten(subchunk)
+ for item in transcription:
+ yield from flatten(item)
+
+@decorator
+def parse_offsets(parse, report=False):
+ ''' Decorate `parse` (usually an `AbstractBinary` class method)
+ to record the buffer starting offset as `self.offset`
+ and the buffer post parse offset as `self.end_offset`.
+ If the decorator parameter `report` is true,
+ call `bfr.report_offset()` with the starting offset at the end of the parse.
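+
+      Example (a sketch of a hypothetical class' `parse` method):
+
+          @classmethod
+          @parse_offsets
+          def parse(cls, bfr):
+              ...  # parse the fields, return the new instance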
+ '''
+
+ def parse_wrapper(cls, bfr: CornuCopyBuffer, **parse_kw):
+ offset = bfr.offset
+ self = parse(cls, bfr, **parse_kw)
+ self.offset = offset
+ self.end_offset = bfr.offset
+ if report:
+ bfr.report_offset(offset)
+ return self
+
+ return parse_wrapper
_pt_spec_seq = Seq()
-def pt_spec(pt, name=None):
+def pt_spec(pt, name=None, value_type=None, as_repr=None, as_str=None):
''' Convert a parse/transcribe specification `pt`
into an `AbstractBinary` subclass.
This is largely used to provide flexibility
in the specifications for the `BinaryMultiValue` factory
- but can be used as a factory for other simple classes.
+ but can also be used as a factory for other simple classes.
If the specification `pt` is a subclass of `AbstractBinary`
this is returned directly.
- If `pt` is a 2-tuple of `str`
+ If `pt` is a (str,str) 2-tuple
the values are presumed to be a format string for `struct.struct`
- and filed names separated by spaces;
- a new `BinaryMultiStruct` class is created from these and returned.
+ and field names separated by spaces;
+ a new `BinaryStruct` class is created from these and returned.
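+
+    For example (a sketch), a `(format,fields)` 2-tuple becomes a `BinaryStruct`:
+
+        PointXY = pt_spec(('>HH', 'x y'), name='PointXY')
+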
Otherwise two functions
`f_parse_value(bfr)` and `f_transcribe_value(value)`
@@ 251,39 400,53 @@ def pt_spec(pt, name=None):
if issubclass(pt, AbstractBinary):
return pt
except TypeError:
+ # not a class at all, fall through
pass
# other specifications construct a class
try:
+ # an object with .parse_value and .transcribe_value attributes
f_parse_value = pt.parse_value
f_transcribe_value = pt.transcribe_value
except AttributeError:
+ # an int number of bytes
if isinstance(pt, int):
# pylint: disable=unnecessary-lambda-assignment
f_parse_value = lambda bfr: bfr.take(pt)
f_transcribe_value = lambda value: value
+ if value_type is None:
+ value_type = Buffer
+ elif not issubclass(value_type, Buffer):
+ raise TypeError(f'supplied {value_type=} is not a subclass of Buffer')
else:
- pt0, pt1 = pt
- if isinstance(pt0, str) and isinstance(pt1, str):
+ struct_format, struct_fields = pt
+ if isinstance(struct_format, str) and isinstance(struct_fields, str):
+ # a struct (format,fields) 2-tuple
# struct format and field names
- return BinaryMultiStruct(
- '_'.join(
- (
- name or "PTStruct", str(next(_pt_spec_seq)),
- pt1.replace(' ', '__')
- )
- ), pt0, pt1
- )
- f_parse_value = pt0
- f_transcribe_value = pt1
+ if name is None:
+ name = f'PTStruct_{next(_pt_spec_seq)}__{struct_fields.replace(" ", "__")}'
+ return BinaryStruct(name, struct_format, struct_fields)
+ # otherwise a parse/transcribe pair
+ f_parse_value, f_transcribe_value = pt
- class PTValue(BinarySingleValue): # pylint: disable=used-before-assignment
+ if value_type is None:
+ sig = signature(f_parse_value)
+ value_type = sig.return_annotation
+ if value_type is Signature.empty:
+ raise ValueError(f'no return type annotation on {f_parse_value=}')
+
+ class PTValue(BinarySingleValue, value_type=value_type): # pylint: disable=used-before-assignment
''' A `BinarySingleValue` subclass
made from `f_parse_value` and `f_transcribe_value`.
'''
+ if as_str:
+ __str__ = as_str
+ if as_repr:
+ __repr__ = as_repr
+
@staticmethod
- def parse_value(bfr: CornuCopyBuffer):
- ''' Parse value form buffer.
+ def parse_value(bfr: CornuCopyBuffer) -> value_type:
+ ''' Parse value from buffer.
'''
return f_parse_value(bfr)
@@ 293,13 456,16 @@ def pt_spec(pt, name=None):
'''
return f_transcribe_value(value)
- if name is not None:
- PTValue.__name__ = name
- PTValue.__name__ += '_' + str(next(_pt_spec_seq))
+ PTValue.__name__ = name or f'PTValue_{next(_pt_spec_seq)}'
+ PTValue.__doc__ = stripped_dedent(
+ f'''{name}, a `BinarySingleValue` subclass
+ made from {f_parse_value=} and {f_transcribe_value=}.
+ '''
+ )
return PTValue
class bs(bytes):
- ''' A `bytes subclass with a compact `repr()`.
+ ''' A `bytes` subclass with a compact `repr()`.
'''
def __repr__(self):
@@ 320,7 486,7 @@ class bs(bytes):
return cls(obj)
raise TypeError(f'{cls.__name__}.promote({obj.__class__}): cannot promote')
-class AbstractBinary(ABC):
+class AbstractBinary(Promotable, ABC):
''' Abstract class for all `Binary`* implementations,
specifying the abstract `parse` and `transcribe` methods
and providing various helper methods.
@@ 330,6 496,54 @@ class AbstractBinary(ABC):
- `scan`* methods are generators yielding successive instances from a buffer
'''
+ def __str__(self, attr_names=None, attr_choose=None, str_func=None):
+ ''' The string summary of this object.
+ If called explicitly rather than via `str()` the following
+        optional parameters may be supplied:
+        * `attr_names`: an iterable of `str` naming the attributes to include;
+          the default is the keys of `self.__dict__`
+        * `attr_choose`: a callable to select amongst the attribute names;
+          the default is to choose names which do not start with an underscore
+        * `str_func`: a callable returning the string form of an attribute value;
+          the default returns `cropped_repr(v)` where `v` is the value's `.value`
+          attribute for single value objects, otherwise the object itself
+ '''
+ if attr_names is None:
+ attr_names = self._field_names
+ if attr_choose is None:
+ # pylint: disable=unnecessary-lambda-assignment
+ attr_choose = lambda attr: not attr.startswith('_')
+ elif attr_choose is True:
+      attr_choose = lambda attr: True
+ if str_func is None:
+ str_func = lambda obj: (
+ cropped_repr(obj.value)
+ if is_single_value(obj) else cropped_repr(obj)
+ )
+ attr_values = [
+ (attr, getattr(self, attr, None))
+ for attr in attr_names
+ if attr_choose(attr)
+ ]
+ return "%s(%s)" % (
+ self.__class__.__name__,
+ ','.join(
+ ("%s=%s" % (attr, str_func(obj)) for attr, obj in attr_values)
+ ),
+ )
+
+ def __repr__(self):
+ return "%s(%s)" % (
+ self.__class__.__name__, ",".join(
+ "%s=%s:%s" % (attr, type(value).__name__, cropped_repr(value))
+ for attr, value in self.__dict__.items()
+ )
+ )
+
+ @property
+ def _field_names(self):
+ return self.__dict__.keys()
+
# pylint: disable=deprecated-decorator
@abstractclassmethod
def parse(cls, bfr: CornuCopyBuffer):
@@ 363,12 577,13 @@ class AbstractBinary(ABC):
raise NotImplementedError("transcribe")
@pfx_method
- def self_check(self):
+ def self_check(self, *, field_types=None):
''' Internal self check. Returns `True` if passed.
If the structure has a `FIELD_TYPES` attribute, normally a
- class attribute, then check the fields against it. The
- `FIELD_TYPES` attribute is a mapping of `field_name` to
+ class attribute, then check the fields against it.
+
+ The `FIELD_TYPES` attribute is a mapping of `field_name` to
a specification of `required` and `types`. The specification
may take one of 2 forms:
* a tuple of `(required,types)`
@@ 406,37 621,39 @@ class AbstractBinary(ABC):
it has more than one acceptable type.
'''
ok = True
- try:
- fields_spec = self.FIELD_TYPES
- except AttributeError:
- warning("no FIELD_TYPES")
- ##ok = False
- else:
- # check fields against self.FIELD_TYPES
- for field_name, field_spec in fields_spec.items():
- with Pfx(".%s=%s", field_name, field_spec):
- if isinstance(field_spec, tuple):
- required, basetype = field_spec
- else:
- required, basetype = True, field_spec
- try:
- field = getattr(self, field_name)
- except AttributeError:
- if required:
- warning(
- "missing required field %s.%s: __dict__=%s",
- type(self).__name__, field_name, cropped_repr(self.__dict__)
- )
- ok = False
- else:
- if not isinstance(field, basetype):
- warning(
- "should be an instance of %s:%s but is %s", (
- 'tuple'
- if isinstance(basetype, tuple) else basetype.__name__
- ), basetype, typed_str(field, max_length=64)
- )
- ok = False
+ if field_types is None:
+ try:
+ field_types = self.FIELD_TYPES
+ except AttributeError:
+ warning("no FIELD_TYPES")
+ field_types = {}
+ # check fields against self.FIELD_TYPES
+ # TODO: call self_check on members with a .self_check() method
+ for field_name, field_spec in field_types.items():
+ with Pfx(".%s=%s", field_name, field_spec):
+ if isinstance(field_spec, tuple):
+ required, basetype = field_spec
+ else:
+ required, basetype = True, field_spec
+ try:
+ field = getattr(self, field_name)
+ except AttributeError:
+ if required:
+ warning(
+ "missing required field %s.%s: __dict__=%s",
+ type(self).__name__, field_name, cropped_repr(self.__dict__)
+ )
+ ok = False
+ else:
+ if not isinstance(field, basetype):
+ warning(
+ "should be an instance of %s:%s but is %s", (
+ 'tuple'
+ if isinstance(basetype, tuple) else basetype.__name__
+ ), basetype, typed_str(field, max_length=64)
+ )
+ ok = False
return ok
def __bytes__(self):
@@ 464,8 681,12 @@ class AbstractBinary(ABC):
with_offsets=False,
**parse_kw,
):
- ''' Function to scan the buffer `bfr` for repeated instances of `cls`
- until end of input and yield them.
+ ''' A generator to scan the buffer `bfr` for repeated instances of `cls`
+ until end of input, and yield them.
+
+ Note that if `bfr` is not already a `CornuCopyBuffer`
+ it is promoted to `CornuCopyBuffer` from several types
+ such as filenames etc; see `CornuCopyBuffer.promote`.
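+
+    For example (a sketch; `'data.bin'` is a hypothetical file
+    of `UInt32BE` records):
+
+        for u32 in UInt32BE.scan('data.bin'):
+            print(int(u32))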
Parameters:
* `bfr`: the buffer to scan, or any object suitable for `CornuCopyBuffer.promote`
@@ 482,8 703,7 @@ class AbstractBinary(ABC):
Scanning stops after `max_count` instances (if specified).
If fewer than `min_count` instances (if specified) are scanned
a warning is issued.
- This is to accomodate nonconformant streams
- without raising exceptions.
+    This is to accommodate nonconformant streams without raising exceptions.
Callers wanting to validate `max_count` may want to probe `bfr.at_eof()`
after return.
Callers not wanting a warning over `min_count` should not specify it,
@@ 492,28 712,20 @@ class AbstractBinary(ABC):
if count is None:
if min_count is None:
min_count = 0
- else:
- if min_count < 0:
- raise ValueError(
- "min_count must be >=0 if specified, got: %r" % (min_count,)
- )
+ elif min_count < 0:
+ raise ValueError(f'{min_count=} must be >=0 if specified')
if max_count is not None:
if max_count < 0:
- raise ValueError(
- "max_count must be >=0 if specified, got: %r" % (max_count,)
- )
+ raise ValueError(f'{max_count=} must be >=0 if specified')
if max_count < min_count:
- raise ValueError(
- "max_count must be >= min_count, got: min_count=%r, max_count=%rr"
- % (min_count, max_count)
- )
+ raise ValueError(f'{max_count=} must be >= {min_count=}')
else:
if min_count is not None or max_count is not None:
raise ValueError(
"scan_with_offsets: may not combine count with either min_count or max_count"
)
if count < 0:
- raise ValueError("count must be >=0 if specified, got: %r" % (count,))
+ raise ValueError(f'{count=} must be >=0 if specified')
min_count = max_count = count
scanned = 0
while (max_count is None or scanned < max_count) and not bfr.at_eof():
@@ 557,7 769,7 @@ class AbstractBinary(ABC):
`self.scan_with_offsets(..,**kw)` according to the
`with_offsets` parameter.
- *Deprecated; please just call `scan` with a filesystem pathname.
+ *Deprecated; please just call `scan` with a filesystem pathname.*
Parameters:
* `fspath`: the filesystem path of the file to scan
@@ 613,9 825,7 @@ class AbstractBinary(ABC):
'''
instance, offset = cls.parse_bytes(bs, **parse_bytes_kw)
if offset < len(bs):
- raise ValueError(
- "unparsed data at offset %d: %r" % (offset, bs[offset:])
- )
+ raise ValueError(f'unparsed data at {offset=}: {bs[offset:]!r}')
return instance
@classmethod
@@ 665,6 875,18 @@ class AbstractBinary(ABC):
file.flush()
return length
+def is_single_value(obj):
+ ''' Test whether `obj` is a single value binary object.
+
+ This currently recognises `BinarySingleValue` instances
+ and tuple based `AbstractBinary` instances of length 1.
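+
+      Examples (a sketch, using classes from this module):
+
+          >>> is_single_value(UInt8(3))
+          True
+          >>> is_single_value(3)
+          False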
+ '''
+ if isinstance(obj, BinarySingleValue):
+ return True
+ if isinstance(obj, AbstractBinary) and isinstance(obj, tuple):
+ return tuple.__len__(obj) == 1
+ return False
+
class SimpleBinary(SimpleNamespace, AbstractBinary):
''' Abstract binary class based on a `SimpleNamespace`,
thus providing a nice `__str__` and a keyword based `__init__`.
@@ 681,23 903,7 @@ class SimpleBinary(SimpleNamespace, Abst
super().__init__(field1=field1, field2=field2)
'''
- def __str__(self, attr_names=None, attr_choose=None):
- if attr_names is None:
- attr_names = sorted(self.__dict__.keys())
- if attr_choose is None:
- # pylint: disable=unnecessary-lambda-assignment
- attr_choose = lambda attr: not attr.startswith('_')
- return "%s(%s)" % (
- type(self).__name__, ','.join(
- (
- "%s=%s" % (attr, cropped_repr(getattr(self, attr, None)))
- for attr in attr_names
- if attr_choose(attr)
- )
- )
- )
-
-class BinarySingleValue(AbstractBinary):
+class BinarySingleValue(AbstractBinary, Promotable):
''' A representation of a single value as the attribute `.value`.
Subclasses must implement:
@@ 705,12 911,25 @@ class BinarySingleValue(AbstractBinary):
* `transcribe` or `transcribe_value`
'''
+ @classmethod
+ def __init_subclass__(cls, *, value_type, **isc_kw):
+ if not isinstance(value_type, type) and not get_origin(value_type):
+ raise TypeError(
+ f'{cls.__name__}.__init_subclass__: value_type={r(value_type)} is not a type'
+ )
+ super().__init_subclass__(**isc_kw)
+ cls.VALUE_TYPE = value_type
+
def __init__(self, value):
+ ''' Initialise `self` with `value`.
+ '''
+ check_type(value, self.VALUE_TYPE)
self.value = value
def __repr__(self):
return "%s(%r)" % (
- type(self).__name__, getattr(self, 'value', '<no-value>')
+ type(self).__name__,
+ getattr(self, 'value', f'<NO-{self.__class__.__name__}.value>')
)
def __str__(self):
@@ 780,25 999,99 @@ class BinarySingleValue(AbstractBinary):
'''
return cls(value).transcribe()
-class BinaryByteses(AbstractBinary):
+class BinaryBytes(
+ BinarySingleValue,
+ value_type=Union[Buffer, Iterable[Buffer]],
+):
''' A list of `bytes` parsed directly from the native iteration of the buffer.
+ Subclasses are initialised with a `consume=` class parameter
+      indicating how many bytes to consume on parse; the default
+ is `...` meaning to consume the entire remaining buffer, but
+ a positive integer can also be supplied to consume exactly
+ that many bytes.
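+
+      For example (a sketch), a subclass consuming exactly 4 bytes:
+
+          class Bytes4(BinaryBytes, consume=4):
+              pass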
'''
- def __init__(self):
- self.values = []
+ def __init_subclass__(
+ cls,
+ *,
+ consume=...,
+ value_type=Union[Buffer, Iterable[Buffer]],
+ **bsv_kw,
+ ):
+ if consume is not Ellipsis:
+ if not isinstance(consume, int) or consume < 1:
+ raise ValueError(
+ f'class {cls.__name__}: consume should be Ellipsis or a positive int, got {r(consume)}'
+ )
+ super().__init_subclass__(value_type=value_type, **bsv_kw)
+ cls.PARSE_SIZE = consume
def __repr__(self):
- return "%s:%r" % (type(self).__name__, self.values)
+ cls = self.__class__
+ bufs = getattr(self, "_bufs", "NO_BUFS")
+    return f'{cls.__name__}[{cls.PARSE_SIZE}]:{bufs}'
+
+ def __iter__(self):
+ return iter(self._bufs)
+
+ @property
+ def value(self):
+ ''' The internal list of `bytes` instances joined together.
+ This is a property and may be expensive to compute for a large list.
+ '''
+ return b''.join(self._bufs)
+
+ @value.setter
+ def value(self, bss: Union[Buffer, Iterable[Buffer]]):
+ ''' Set the value from a `bytes` or iterable of `bytes`.
+ '''
+ if isinstance(bss, Buffer):
+ bss = [bss]
+ self._bufs = list(bss)
@classmethod
def parse(cls, bfr: CornuCopyBuffer):
- self = cls()
- self.values.extend(bfr)
+ ''' Consume `cls.PARSE_SIZE` bytes from the buffer and instantiate a new instance.
+ '''
+ return cls(bfr.takev(cls.PARSE_SIZE))
+
+ def transcribe(self):
+ ''' Transcribe each value.
+ '''
+ return self._bufs
+
+ @classmethod
+ def promote(cls, obj):
+ ''' Promote `obj` to a `BinaryBytes` instance.
+ Other instances of `AbstractBinary` will be transcribed into the buffers.
+ Otherwise use `BinarySingleValue.promote(obj)`.
+ '''
+ if isinstance(obj, cls):
+ return obj
+ if isinstance(obj, AbstractBinary):
+ return cls(obj.transcribe())
+ return super().promote(obj)
+
+class ListOfBinary(list, AbstractBinary):
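+  ''' A `list` of `AbstractBinary` instances of a single type,
+      itself an `AbstractBinary`.
+
+      Subclasses specify the class parameter `item_type`,
+      the `AbstractBinary` subclass of the list items,
+      for example (a sketch):
+
+          class UInt32BEList(ListOfBinary, item_type=UInt32BE):
+              pass
+  '''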
+
+ # the AbstractBinary subclass of the items in the list
+ LIST_ITEM_TYPE = None
+
+ def __init_subclass__(cls, *, item_type):
+ super().__init_subclass__()
+ cls.LIST_ITEM_TYPE = item_type
+
+ @classmethod
+ def parse(cls, bfr: CornuCopyBuffer, **scan_kw):
+ ''' Scan instances of `cls.LIST_ITEM_TYPE` and return a new instance.
+ '''
+ self = cls(cls.LIST_ITEM_TYPE.scan(bfr, **scan_kw))
return self
def transcribe(self):
- yield from iter(self.values)
+ return self
+# TODO: can this just be ListOfBinary above?
class BinaryListValues(AbstractBinary):
''' A list of values with a common parse specification,
such as sample or Boxes in an ISO14496 Box structure.
@@ 808,7 1101,7 @@ class BinaryListValues(AbstractBinary):
self.values = []
def __str__(self):
- return "%s%r" % (type(self).__name__, self.values)
+    return f'{self.__class__.__name__}{self.values!r}'
__repr__ = __str__
@@ 853,16 1146,14 @@ class BinaryListValues(AbstractBinary):
if min_count is None:
min_count = count
elif min_count < count:
- raise ValueError("min_count(%s) < count(%s)" % (min_count, count))
+ raise ValueError(f'{min_count=} < {count=}')
if max_count is None:
max_count = count
elif max_count > count:
- raise ValueError("max_count(%s) > count(%s)" % (max_count, count))
+ raise ValueError(f'{max_count=} > {count=}')
if (min_count is not None and max_count is not None
and min_count > max_count):
- raise ValueError(
- "min_count(%s) > max_count(%s)" % (min_count, max_count)
- )
+ raise ValueError(f'{min_count=} > {max_count=}')
self = cls()
values = self.values
func_parse = pt_spec(pt).parse
@@ 887,56 1178,139 @@ class BinaryListValues(AbstractBinary):
if isinstance(value, bytes) else value.transcribe(), self.values
)
-_binary_multi_struct_classes = {}
+@typechecked
+def struct_field_types(
+ struct_format: str,
+ field_names: Union[str, Iterable[str]],
+) -> Mapping[str, type]:
+ ''' Construct a `dict` mapping field names to struct return types.
+
+ Example:
+
+ >>> struct_field_types('>Hs', 'count text_bs')
+ {'count': <class 'int'>, 'text_bs': <class 'bytes'>}
+ '''
+ if isinstance(field_names, str):
+ field_names = field_names.split()
+ else:
+ field_names = list(field_names)
+ fieldmap = {}
+ for c in struct_format:
+ if not c.isalpha():
+ continue
+ try:
+ fieldtype = {
+ 'x': None,
+          'c': bytes,
+ 'b': int,
+ 'B': int,
+ 'h': int,
+ 'H': int,
+ 'i': int,
+ 'I': int,
+ 'l': int,
+ 'L': int,
+ 'q': int,
+ 'Q': int,
+ 'n': int,
+ 'N': int,
+ 'e': float,
+ 'f': float,
+ 'd': float,
+ 's': bytes,
+          'p': bytes,
+ 'P': int,
+ }[c]
+ except KeyError:
+ raise ValueError(
+ f'no type known for struct spec {c=} in {struct_format=}'
+ )
+ if fieldtype is None:
+ # padding
+ continue
+ try:
+ field_name = field_names.pop(0)
+ except IndexError:
+ raise ValueError(
+ f'no field names left at struct spec {c=} in {struct_format=}'
+ )
+ fieldmap[field_name] = fieldtype
+ if field_names:
+ raise ValueError(f'unused field names {field_names=} vs {struct_format=}')
+ return fieldmap
@pfx
-def BinaryMultiStruct(
- class_name: str, struct_format: str, field_names: Union[str, List[str]]
+def BinaryStruct(
+ class_name: str,
+ struct_format: str,
+ field_names: Union[str, List[str]] = 'value',
):
''' A class factory for `AbstractBinary` `namedtuple` subclasses
- built around complex `struct` formats.
+ built around potentially complex `struct` formats.
Parameters:
* `class_name`: name for the generated class
* `struct_format`: the `struct` format string
- * `field_names`: field name list,
- a space separated string or an interable of strings
+ * `field_names`: optional field name list,
+      a space separated string or an iterable of strings;
+ the default is `'value'`, intended for single field structs
Example:
# an "access point" record from the .ap file
- Enigma2APInfo = BinaryMultiStruct('Enigma2APInfo', '>QQ', 'pts offset')
+ Enigma2APInfo = BinaryStruct('Enigma2APInfo', '>QQ', 'pts offset')
# a "cut" record from the .cuts file
- Enigma2Cut = BinaryMultiStruct('Enigma2Cut', '>QL', 'pts type')
+ Enigma2Cut = BinaryStruct('Enigma2Cut', '>QL', 'pts type')
+
+ >>> UInt16BE = BinaryStruct('UInt16BE', '>H')
+ >>> UInt16BE.__name__
+ 'UInt16BE'
+ >>> UInt16BE.format
+ '>H'
+ >>> UInt16BE.struct #doctest: +ELLIPSIS
+ <_struct.Struct object at ...>
+ >>> field = UInt16BE.from_bytes(bytes((2,3)))
+ >>> field
+ UInt16BE('>H',value=515)
+ >>> field.value
+ 515
'''
if isinstance(field_names, str):
field_names = field_names.split()
- if not isinstance(field_names, tuple):
+ elif not isinstance(field_names, tuple):
field_names = tuple(field_names)
if len(set(field_names)) != len(field_names):
- raise ValueError("field names not unique")
- # we memoise the class definitions
- key = (struct_format, field_names, class_name)
- struct_class = _binary_multi_struct_classes.get(key)
- if struct_class:
- return struct_class
- # construct new class
+ raise ValueError(f'repeated name in {field_names=}')
struct = Struct(struct_format)
+ fieldmap = struct_field_types(struct_format, field_names)
for field_name in field_names:
with Pfx(field_name):
if (field_name in ('length', 'struct', 'format')
or hasattr(AbstractBinary, field_name)):
raise ValueError(
- "field name conflicts with AbstractBinary.%s" % (field_name,)
+ f'field name conflicts with AbstractBinary.{field_name}'
)
- tuple_type = namedtuple(class_name or "StructSubValues", field_names)
# pylint: disable=function-redefined
- class struct_class(tuple_type, AbstractBinary):
+ class struct_class(
+ namedtuple(class_name or "StructSubValues", field_names),
+ AbstractBinary,
+ Promotable,
+ ):
''' A struct field for a complex struct format.
'''
+ _struct = struct
+ _field_names = tuple(field_names)
+
+ def __repr__(self):
+ cls = self.__class__
+ values_s = ",".join(
+ f'{attr}={getattr(self,attr)}' for attr in self._field_names
+ )
+ return f'{cls.__name__}({struct.format!r},{values_s})'
+
@classmethod
@promote
def parse(cls, bfr: CornuCopyBuffer):
@@ 952,6 1326,10 @@ def BinaryMultiStruct(
return struct.pack(*self)
if len(field_names) == 1:
+ # structs with a single field
+
+ def __str__(self):
+ return str(self[0])
def __int__(self):
return int(self[0])
@@ 968,7 1346,7 @@ def BinaryMultiStruct(
return self[0]
@classmethod
- def parse_value(cls, bfr: CornuCopyBuffer):
+ def parse_value(cls, bfr: CornuCopyBuffer) -> fieldmap[field_names[0]]:
''' Parse a value from `bfr`, return the value.
'''
bs = bfr.take(struct.size)
@@ 981,58 1359,52 @@ def BinaryMultiStruct(
'''
return struct.pack(value)
+ @classmethod
+ def promote(cls, obj):
+ ''' Promote a single value to an instance of `cls`.
+ '''
+ if isinstance(obj, cls):
+ return obj
+ return cls(**{field_names[0]: obj})
+ else:
+
+ @classmethod
+ def promote(cls, obj):
+ ''' Promote an iterable of field values to an instance of `cls`.
+ '''
+ if isinstance(obj, cls):
+ return obj
+ return cls(
+ **{
+ field_name: item
+ for field_name, item in zip(field_names, obj)
+ }
+ )
+
+ assert isinstance(struct_class, type)
struct_class.__name__ = class_name
+ field_list_s = ",".join(f".{fieldname}" for fieldname in field_names)
struct_class.__doc__ = (
- ''' An `AbstractBinary` `namedtuple` which parses and transcribes
- the struct format `%r` and presents the attributes %r.
- ''' % (struct_format, field_names)
+ f'''An `AbstractBinary` `namedtuple` which parses and transcribes
+ the struct format `{struct_format!r}` and presents the attributes {field_list_s}.
+ '''
)
struct_class.struct = struct
struct_class.format = struct_format
struct_class.length = struct.size
struct_class.field_names = field_names
- _binary_multi_struct_classes[key] = struct_class
return struct_class
-def BinarySingleStruct(
- class_name: str, struct_format: str, field_name: Optional[str] = None
-):
- ''' A convenience wrapper for `BinaryMultiStruct`
- for `struct_format`s with a single field.
-
- Parameters:
- * `class_name`: the class name for the generated class
- * `struct_format`: the struct format string, specifying a
- single struct field
- * `field_name`: optional field name for the value,
- default `'value'`
-
- Example:
-
- >>> UInt16BE = BinarySingleStruct('UInt16BE', '>H')
- >>> UInt16BE.__name__
- 'UInt16BE'
- >>> UInt16BE.format
- '>H'
- >>> UInt16BE.struct #doctest: +ELLIPSIS
- <_struct.Struct object at ...>
- >>> field = UInt16BE.from_bytes(bytes((2,3)))
- >>> field
- UInt16BE(value=515)
- >>> field.value
- 515
- '''
- if field_name is None:
- field_name = 'value'
- return BinaryMultiStruct(class_name, struct_format, field_name)
+BinaryMultiStruct = OBSOLETE(BinaryStruct)
+BinarySingleStruct = OBSOLETE(BinaryStruct)
# various common values
-UInt8 = BinarySingleStruct('UInt8', 'B')
+UInt8 = BinaryStruct('UInt8', 'B')
UInt8.TEST_CASES = (
(0, b'\0'),
(65, b'A'),
)
-Int16BE = BinarySingleStruct('Int16BE', '>h')
+Int16BE = BinaryStruct('Int16BE', '>h')
Int16BE.TEST_CASES = (
(0, b'\0\0'),
(1, b'\0\1'),
@@ 1040,7 1412,7 @@ Int16BE.TEST_CASES = (
(-1, b'\xff\xff'),
(-32768, b'\x80\x00'),
)
-Int16LE = BinarySingleStruct('Int16LE', '<h')
+Int16LE = BinaryStruct('Int16LE', '<h')
Int16LE.TEST_CASES = (
(0, b'\0\0'),
(1, b'\1\0'),
@@ 1048,7 1420,7 @@ Int16LE.TEST_CASES = (
(-1, b'\xff\xff'),
(-32768, b'\x00\x80'),
)
-Int32BE = BinarySingleStruct('Int32BE', '>l')
+Int32BE = BinaryStruct('Int32BE', '>l')
Int32BE.TEST_CASES = (
(0, b'\0\0\0\0'),
(1, b'\0\0\0\1'),
@@ 1056,7 1428,7 @@ Int32BE.TEST_CASES = (
(-1, b'\xff\xff\xff\xff'),
(-2147483648, b'\x80\x00\x00\x00'),
)
-Int32LE = BinarySingleStruct('Int32LE', '<l')
+Int32LE = BinaryStruct('Int32LE', '<l')
Int32LE.TEST_CASES = (
(0, b'\0\0\0\0'),
(1, b'\1\0\0\0'),
@@ 1064,7 1436,7 @@ Int32LE.TEST_CASES = (
(-1, b'\xff\xff\xff\xff'),
(-2147483648, b'\x00\x00\x00\x80'),
)
-UInt16BE = BinarySingleStruct('UInt16BE', '>H')
+UInt16BE = BinaryStruct('UInt16BE', '>H')
UInt16BE.TEST_CASES = (
(0, b'\0\0'),
(1, b'\0\1'),
@@ 1072,7 1444,7 @@ UInt16BE.TEST_CASES = (
(32768, b'\x80\x00'),
(65535, b'\xff\xff'),
)
-UInt16LE = BinarySingleStruct('UInt16LE', '<H')
+UInt16LE = BinaryStruct('UInt16LE', '<H')
UInt16LE.TEST_CASES = (
(0, b'\0\0'),
(1, b'\1\0'),
@@ 1080,7 1452,7 @@ UInt16LE.TEST_CASES = (
(32768, b'\x00\x80'),
(65535, b'\xff\xff'),
)
-UInt32BE = BinarySingleStruct('UInt32BE', '>L')
+UInt32BE = BinaryStruct('UInt32BE', '>L')
UInt32BE.TEST_CASES = (
(0, b'\0\0\0\0'),
(1, b'\0\0\0\1'),
@@ 1089,7 1461,7 @@ UInt32BE.TEST_CASES = (
(4294967294, b'\xff\xff\xff\xfe'),
(4294967295, b'\xff\xff\xff\xff'),
)
-UInt32LE = BinarySingleStruct('UInt32LE', '<L')
+UInt32LE = BinaryStruct('UInt32LE', '<L')
UInt32LE.TEST_CASES = (
(0, b'\0\0\0\0'),
(1, b'\1\0\0\0'),
@@ 1098,7 1470,7 @@ UInt32LE.TEST_CASES = (
(4294967294, b'\xfe\xff\xff\xff'),
(4294967295, b'\xff\xff\xff\xff'),
)
-UInt64BE = BinarySingleStruct('UInt64BE', '>Q')
+UInt64BE = BinaryStruct('UInt64BE', '>Q')
UInt64BE.TEST_CASES = (
(0, b'\0\0\0\0\0\0\0\0'),
(1, b'\0\0\0\0\0\0\0\1'),
@@ 1111,7 1483,7 @@ UInt64BE.TEST_CASES = (
(18446744073709551614, b'\xff\xff\xff\xff\xff\xff\xff\xfe'),
(18446744073709551615, b'\xff\xff\xff\xff\xff\xff\xff\xff'),
)
-UInt64LE = BinarySingleStruct('UInt64LE', '<Q')
+UInt64LE = BinaryStruct('UInt64LE', '<Q')
UInt64LE.TEST_CASES = (
(0, b'\0\0\0\0\0\0\0\0'),
(1, b'\1\0\0\0\0\0\0\0'),
@@ 1124,18 1496,18 @@ UInt64LE.TEST_CASES = (
(18446744073709551614, b'\xfe\xff\xff\xff\xff\xff\xff\xff'),
(18446744073709551615, b'\xff\xff\xff\xff\xff\xff\xff\xff'),
)
-Float64BE = BinarySingleStruct('Float64BE', '>d')
+Float64BE = BinaryStruct('Float64BE', '>d')
Float64BE.TEST_CASES = (
(0.0, b'\0\0\0\0\0\0\0\0'),
(1.0, b'?\xf0\x00\x00\x00\x00\x00\x00'),
)
-Float64LE = BinarySingleStruct('Float64LE', '<d')
+Float64LE = BinaryStruct('Float64LE', '<d')
Float64LE.TEST_CASES = (
(0.0, b'\0\0\0\0\0\0\0\0'),
(1.0, b'\x00\x00\x00\x00\x00\x00\xf0?'),
)
-class BSUInt(BinarySingleValue):
+class BSUInt(BinarySingleValue, value_type=int):
''' A binary serialised unsigned `int`.
This uses a big endian byte encoding where continuation octets
@@ 1216,7 1588,7 @@ class BSUInt(BinarySingleValue):
n >>= 7
return bytes(reversed(bs))
-class BSData(BinarySingleValue):
+class BSData(BinarySingleValue, value_type=Buffer):
''' A run length encoded data chunk, with the length encoded as a `BSUInt`.
'''
@@ 1260,7 1632,15 @@ class BSData(BinarySingleValue):
'''
return BSData(bs).data_offset
-class BSString(BinarySingleValue):
+ @classmethod
+ def promote(cls, obj):
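+    ''' Promote `obj` to a `BSData`:
+        accept an existing instance, or promote a `Buffer` via `bytes(obj)`.
+    '''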
+ if isinstance(obj, cls):
+ return obj
+ if isinstance(obj, Buffer):
+ return cls(bytes(obj))
+ raise TypeError(f'{cls.__name__}.promote: cannot promote {r(obj)}')
+
+class BSString(BinarySingleValue, value_type=str):
''' A run length encoded string, with the length encoded as a BSUInt.
'''
@@ 1294,7 1674,7 @@ class BSString(BinarySingleValue):
payload = value.encode(encoding)
return b''.join((BSUInt.transcribe_value(len(payload)), payload))
-class BSSFloat(BinarySingleValue):
+class BSSFloat(BinarySingleValue, value_type=float):
''' A float transcribed as a `BSString` of `str(float)`.
'''
@@ 1470,9 1850,14 @@ def BinaryMultiValue(class_name, field_m
''' Construct a `SimpleBinary` subclass named `class_name`
whose fields are specified by the mapping `field_map`.
- The `field_map` is a mapping of field name to buffer parsers and transcribers.
+ The `field_map` is a mapping of field name
+    to parse/transcribe specifications suitable for `pt_spec()`;
+ these are all promoted by `pt_spec` into `AbstractBinary` subclasses.
- *Note*:
+ The `field_order` is an optional ordering of the field names;
+ the default comes from the iteration order of `field_map`.
+
+ *Note* for Python <3.6:
if `field_order` is not specified
it is constructed by iterating over `field_map`.
Prior to Python 3.6, `dict`s do not provide a reliable order
@@ 1487,9 1872,6 @@ def BinaryMultiValue(class_name, field_m
the `.parse` and `.transcribe` methods
accordingly.
- The `field_map` is a mapping of field name
- to a class returned by the `pt_spec()` function.
-
If the class has both `parse_value` and `transcribe_value` methods
then the value itself will be directly stored.
Otherwise the class it presumed to be more complex subclass
@@ 1532,9 1914,9 @@ def BinaryMultiValue(class_name, field_m
>>> bmv.n2
34
>>> bmv #doctest: +ELLIPSIS
- BMV(n1=17, n2=34, n3=119, nd=nd_1_short__bs(short=33154, bs=b'zyxw'), data1=b'AB', data2=b'DEFG')
+ BMV(n1=17, n2=34, n3=119, nd=nd('>H4s',short=33154,bs=b'zyxw'), data1=b'AB', data2=b'DEFG')
>>> bmv.nd #doctest: +ELLIPSIS
- nd_1_short__bs(short=33154, bs=b'zyxw')
+ nd('>H4s',short=33154,bs=b'zyxw')
>>> bmv.nd.bs
b'zyxw'
>>> bytes(bmv.nd)
@@ 1552,11 1934,10 @@ def BinaryMultiValue(class_name, field_m
with Pfx("BinaryMultiValue(%r,...)", class_name):
if field_order is None:
field_order = tuple(field_map)
- if (sys.version_info.major, sys.version_info.minor) < (3, 6):
- warning(
- "Python version %s < 3.6: dicts are not ordered,"
- " and the inferred field order may not be correct: %r",
- sys.version, field_order
+ if len(field_order) > 1 and sys.version_info < (3, 6):
+ raise ValueError(
+ f'class {class_name}: Python version {sys.version} < 3.6:'
+ ' dicts are not ordered and so we cannot infer the field_order'
)
else:
field_order = tuple(
@@ 1593,19 1974,393 @@ def BinaryMultiValue(class_name, field_m
bmv_class.__name__ = class_name
bmv_class.__doc__ = (
- ''' An `SimpleBinary` which parses and transcribes
- the fields `%r`.
- ''' % (field_order,)
+ f'''A `SimpleBinary` which parses and transcribes
+ the fields {field_order!r}.
+ '''
)
return bmv_class
+@decorator
+def binclass(cls, kw_only=True):
+ r'''A decorator for `dataclass`-like binary classes.
+
+ Example use:
+
+ >>> @binclass
+ ... class SomeStruct:
+ ... """A struct containing a count and some flags."""
+ ... count : UInt32BE
+ ... flags : UInt8
+ >>> ss = SomeStruct(count=3, flags=0x04)
+ >>> ss
+ SomeStruct:SomeStruct__dataclass(count=UInt32BE('>L',value=3),flags=UInt8('B',value=4))
+ >>> print(ss)
+ SomeStruct(count=3,flags=4)
+ >>> bytes(ss)
+ b'\x00\x00\x00\x03\x04'
+ >>> SomeStruct.promote(b'\x00\x00\x00\x03\x04')
+ SomeStruct:SomeStruct__dataclass(count=UInt32BE('>L',value=3),flags=UInt8('B',value=4))
+
+ Extending an existing `@binclass` class, for example to add
+ the body of a structure to some header part:
+
+ >>> @binclass
+ ... class HeaderStruct:
+ ... """A header containing a count and some flags."""
+ ... count : UInt32BE
+ ... flags : UInt8
+ >>> @binclass
+ ... class Packet(HeaderStruct):
+ ... body_text : BSString
+ ... body_data : BSData
+ ... body_longs : BinaryStruct(
+ ... 'longs', '>LL', 'long1 long2'
+ ... )
+ >>> packet = Packet(
+ ... count=5, flags=0x03,
+ ... body_text="hello",
+ ... body_data=b'xyzabc',
+ ... body_longs=(10,20),
+ ... )
+ >>> packet
+ Packet:Packet__dataclass(count=UInt32BE('>L',value=5),flags=UInt8('B',value=3),body_text=BSString('hello'),body_data=BSData(b'xyzabc'),body_longs=longs('>LL',long1=10,long2=20))
+ >>> print(packet)
+ Packet(count=5,flags=3,body_text=hello,body_data=b'xyzabc',body_longs=longs(long1=10,long2=20))
+ >>> packet.body_data
+ b'xyzabc'
+ '''
+ name0 = cls.__name__
+
+ # collate the annotated class attributes
+ # these are our own, and those from the superclasses
+ # TODO: should it be an error to find a field twice?
+ attr_annotations = {}
+ for supercls in reversed(cls.__mro__):
+ for attr, anno_type in getattr(supercls, '__annotations__', {}).items():
+ attr_annotations[attr] = anno_type
+ if not attr_annotations:
+ raise TypeError(f'{cls} has no annotated attributes')
+
+ # create the dataclass
+ class dcls:
+ pass
+
+ for attr in attr_annotations:
+ try:
+ attr_value = getattr(cls, attr)
+ except AttributeError:
+ continue
+ setattr(dcls, attr, attr_value)
+ dcls.__annotations__ = attr_annotations
+ dcls.__doc__ = f'The inner dataclass supporting {cls.__module__}.{cls.__name__}.'
+ dcls.__name__ = f'{cls.__name__}__dataclass'
+ dcls = dataclass(dcls, kw_only=kw_only)
+
+ def promote_fieldtypemap(fieldtypemap):
+ ''' Promote the types in a field type map into `AbstractBinary` subclasses.
+ '''
+ for field_name, field_type in tuple(fieldtypemap.items()):
+ if isinstance(field_type, type):
+ # a class
+ if not issubclass(field_type, AbstractBinary):
+ raise TypeError(
+ f'field {field_name!r}, type {field_type} should be a subclass of AbstractBinary'
+ )
+ else:
+ # a Union of types?
+ typing_class = get_origin(field_type)
+ if typing_class is Union:
+ for element_type in get_args(field_type):
+          if element_type is not type(None) and not issubclass(
+              element_type, AbstractBinary):
+ raise TypeError(
+ f'field {field_name!r}, Union element type {element_type} should be a subclass of AbstractBinary'
+ )
+ elif field_type is Ellipsis or isinstance(field_type, int):
+        # a ... or an int indicates an object consuming that many bytes
+ class FieldClass(BinaryBytes, consume=field_type):
+ pass
+
+ FieldClass.__name__ = field_name
+        FieldClass.__doc__ = f'BinaryBytes(consume={field_type})'
+ fieldtypemap[field_name] = FieldClass
+ else:
+ raise TypeError(
+ f'field {field_name!r}, type {field_type} is not supported'
+ )
+
+ # cache a mapping of its fields by name
+ # this dict's keys will be in the fields() order
+ fieldtypemap = {field.name: field.type for field in fields(dcls)}
+ promote_fieldtypemap(fieldtypemap)
+
+ @decorator
+ def bcmethod(func):
+ ''' A decorator for a `BinClass` method
+ to look first for a _direct_ override in `cls.__dict__`
+ then to fall back to the `BinClass` method.
+
+ Note that this resolution is done at `BinClass` definition
+ time, not method call time.
+ '''
+ methodname = func.__name__
+ clsd = cls.__dict__
+ try:
+ method = clsd[methodname]
+ method._desc = f'BinClass:{name0}.{methodname} from base class {cls.__name__}'
+ except KeyError:
+ method = func
+ method._desc = f'BinClass:{name0}.{methodname} from BinClass for {cls.__name__}'
+ return method
+
+ class BinClass(cls, AbstractBinary):
+ ''' The wrapper class for the `@binclass` class.
+        This subclasses `cls` so as to inherit its methods.
+ '''
+
+ # the class being wrapped
+ _baseclass = cls
+ # the generated dataclass
+ _dataclass = dcls
+ # the mapping of field names to types as annotated
+ _datafieldtypes = fieldtypemap
+ # a list of the field names in order
+ _field_names = tuple(fieldtypemap)
+
+ # a list of the fields used by AbstractBinary.self_check
+ FIELD_TYPES = {
+ fieldname: (True, fieldtype)
+ for fieldname, fieldtype in fieldtypemap.items()
+ }
+
+ def __init__(self, **dcls_kwargs):
+ self.__dict__['_data'] = None # get dummy entry in early, aids debugging
+ cls = self.__class__
+ dcls = cls._dataclass
+ # promote nonbinary values to AbstractBinary values
+ dcls_kwargs = {
+ attr: self.promote_field_value(attr, obj)
+ for attr, obj in dcls_kwargs.items()
+ }
+ dataobj = dcls(**dcls_kwargs)
+ self.__dict__['_data'] = dataobj
+
+ def self_check(self):
+ ''' An `@binclass` class instance's raw fields are in `self._data`.
+ '''
+ return AbstractBinary.self_check(
+ self._data, field_types=self.FIELD_TYPES
+ )
+
+ def __str__(self):
+ cls = self.__class__
+ fieldnames = self._field_names
+ if len(fieldnames) == 1:
+ return str(getattr(self._data, fieldnames[0]))
+ return "%s(%s)" % (
+ self.__class__.__name__,
+ ",".join(
+ f'{fieldname}={getattr(self._data,fieldname)}'
+ for fieldname in fieldnames
+ if not fieldname.endswith('_')
+ ),
+ )
+
+ def __repr__(self):
+ cls = self.__class__
+ data = self.__dict__.get('_data')
+ fieldnames = cls._field_names
+ return "%s:%s(%s)" % (
+ self.__class__.__name__,
+ ("NONE" if data is None else data.__class__.__name__),
+ (
+ "NO-DATA" if data is None else ",".join(
+ f'{fieldname}={getattr(data,fieldname)!r}'
+ for fieldname in fieldnames
+ )
+ ),
+ )
+
+ def __getattr__(self, attr: str):
+ ''' Return a data field value, the `.value` attribute if it is a single value field.
+ '''
+ data = self._data
+ try:
+ obj = getattr(data, attr)
+ except AttributeError:
+ gsa = super().__getattr__
+ try:
+ return gsa(attr)
+ except AttributeError as e:
+ raise AttributeError(
+ f'{self.__class__.__name__}.{attr}: no entry in the dataclass instance (self._data) or via super().__getattr__'
+ ) from e
+ # we have a dataclass instance attribute
+ assert isinstance(
+ obj, AbstractBinary
+ ), f'{self._data}.{attr}={r(obj)} is not an AbstractBinary'
+ if is_single_value(obj):
+ return obj.value
+ return obj
+
+ def __setattr__(self, attr, value):
+ ''' Set a data field from `value`.
+ '''
+ cls = self.__class__
+ if attr in cls._datafieldtypes:
+ # dataclass attribute
+ dataobj = self._data
+ datavalue = self.promote_field_value(attr, value)
+ setattr(dataobj, attr, datavalue)
+ else:
+ # ordinary attribute
+ self.__dict__[attr] = value
+
+ @classmethod
+ @bcmethod
+ def parse_field(
+ cls,
+ fieldname: str,
+ bfr: CornuCopyBuffer,
+ fieldtypes: Optional[Mapping[str, type]] = None
+ ):
+ ''' Parse an instance of the field named `fieldname` from `bfr`.
+ Return the field instance.
+ '''
+ if fieldtypes is None:
+ fieldtypes = cls._datafieldtypes
+ else:
+ # a mapping of field name -> type: copy and promote it
+ fieldtypes = dict(**fieldtypes)
+ promote_fieldtypemap(fieldtypes)
+ fieldtype = fieldtypes[fieldname]
+ parse = fieldtype.parse
+ return parse(bfr)
+
+ @classmethod
+ @bcmethod
+ def parse_fields(
+ cls,
+ bfr: CornuCopyBuffer,
+ fieldtypes: Optional[Union[
+ Mapping[str, type],
+ Iterable[str],
+ str,
+ ]] = None,
+ ) -> Mapping[str, AbstractBinary]:
+ ''' Parse all the fields from `cls._datafieldtypes` from `bfr`
+ by calling `cls.parse_field(fieldname,bfr)` for each.
+ Return a mapping of field names to values
+ suitable for passing to `cls`.
+
+ The `fieldtypes` defaults to `cls._datafieldtypes` but may be provided as:
+ - a mapping of field name to type
+ - an iterable of field names, whose types will come from
+ `cls._datafieldtypes`
+ - a string being a space separated list of field names,
+ whose types will come from `cls._datafieldtypes`
+
+ Each of these will be converted to a mapping and then
+ promoted with `promote_fieldtypemap`.
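+
+ For example (a sketch using the space separated string form):
+
+ fields = cls.parse_fields(bfr, 'count flags')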
+ '''
+ if fieldtypes is None:
+ # use the default mapping, already promoted
+ fieldtypes = cls._datafieldtypes
+ elif isinstance(fieldtypes, Mapping):
+ # a mapping of field name -> type: copy and promote it
+ fieldtypes = dict(**fieldtypes)
+ promote_fieldtypemap(fieldtypes)
+ elif isinstance(fieldtypes, str):
+ # space separated field names
+ fieldtypes = {
+ fieldname: cls._datafieldtypes[fieldname]
+ for fieldname in fieldtypes.split()
+ }
+ promote_fieldtypemap(fieldtypes)
+ else:
+ # should be an iterable of str (field names)
+ fieldtypes = {
+ fieldname: cls._datafieldtypes[fieldname]
+ for fieldname in fieldtypes
+ }
+ promote_fieldtypemap(fieldtypes)
+ return {
+ fieldname: cls.parse_field(fieldname, bfr, fieldtypes)
+ for fieldname in fieldtypes
+ }
+
+ @classmethod
+ @bcmethod
+ def promote_field_value(cls, fieldname: str, obj):
+ ''' Promote a received `obj` to the appropriate `AbstractBinary` instance.
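+
+ For example (a sketch, for a field annotated as `UInt32BE`
+ as in the demo at the bottom of this module):
+
+ bin_count = cls.promote_field_value('count', 5)
+ # bin_count is now a UInt32BE instance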
+ '''
+ try:
+ fieldtype = cls._datafieldtypes[fieldname]
+ except KeyError:
+ if not isinstance(obj, AbstractBinary):
+ raise TypeError(
+ f'{cls.__name__}.promote_field_value({fieldname=},{r(obj)}): not an AbstractBinary and not in {cls.__name__}._datafieldtypes:{sorted(cls._datafieldtypes)}'
+ )
+ else:
+ try:
+ promote = fieldtype.promote
+ except AttributeError:
+ # no .promote, but accept if we're already an instance
+ if not isinstance(obj, fieldtype):
+ # see if we're a Union, if so try promoting to the subtypes
+ uniontype = get_origin(fieldtype)
+ if uniontype is Union:
+ for element_type in get_args(fieldtype):
+ try:
+ promote = element_type.promote
+ except AttributeError:
+ promote = element_type
+ try:
+ obj = promote(obj)
+ except (ValueError, TypeError):
+ pass
+ else:
+ break
+ else:
+ raise TypeError(
+ f'{cls.__name__}.promote_field_value({fieldname=},obj={r(obj)}): cannot promote obj'
+ )
+ else:
+ obj = pfx_call(promote, obj)
+ return obj
+
+ @classmethod
+ @bcmethod
+ def parse(cls, bfr: CornuCopyBuffer):
+ ''' Parse an instance from `bfr`.
+ This default implementation calls `cls(**cls.parse_fields(bfr))`.
+ '''
+ parse_fields = cls.parse_fields
+ fields = parse_fields(bfr)
+ return cls(**fields)
+
+ @bcmethod
+ def transcribe(self):
+ ''' Transcribe this instance.
+ '''
+ cls = self.__class__
+ for fieldname in cls._datafieldtypes:
+ yield getattr(self._data, fieldname).transcribe()
+
+ cls.name0 = name0
+ cls.__name__ = f'{name0}__original'
+ assert BinClass._baseclass is cls
+ assert BinClass._dataclass is dcls
+ BinClass.__name__ = name0
+ return BinClass
+
def BinaryFixedBytes(class_name: str, length: int):
''' Factory for an `AbstractBinary` subclass matching `length` bytes of data.
The bytes are saved as the attribute `.data`.
'''
- return BinarySingleStruct(class_name, f'>{length}s', 'data')
+ return BinaryStruct(class_name, f'>{length}s', 'data')
-class BinaryUTF8NUL(BinarySingleValue):
+class BinaryUTF8NUL(BinarySingleValue, value_type=str):
''' A NUL terminated UTF-8 string.
'''
@@ 1649,7 2404,7 @@ class BinaryUTF8NUL(BinarySingleValue):
nul = bfr.take(1)
if nul != b'\0':
raise RuntimeError(
- "after %d bytes, expected NUL, found %r" % (nul_pos, nul)
+ f'after {nul_pos} bytes, expected NUL, found {nul!r}'
)
return utf8
@@ 1661,7 2416,7 @@ class BinaryUTF8NUL(BinarySingleValue):
yield s.encode('utf-8')
yield b'\0'
-class BinaryUTF16NUL(BinarySingleValue):
+class BinaryUTF16NUL(BinarySingleValue, value_type=str):
''' A NUL terminated UTF-16 string.
'''
@@ 1682,8 2437,7 @@ class BinaryUTF16NUL(BinarySingleValue):
def __init__(self, value: str, *, encoding: str):
if encoding not in self.VALID_ENCODINGS:
raise ValueError(
- 'unexpected encoding %r, expected one of %r' %
- (encoding, self.VALID_ENCODINGS)
+ f'unexpected {encoding=}, expected one of {self.VALID_ENCODINGS!r}'
)
self.encoding = encoding
self.value = value
@@ 1731,3 2485,30 @@ class BinaryUTF16NUL(BinarySingleValue):
'''
yield value.encode(encoding)
yield b'\0\0'
+
+if __name__ == '__main__':
+
+ @binclass
+ class HeaderStruct:
+ """A header containing a count and some flags."""
+
+ count: UInt32BE
+ flags: UInt8
+
+ @binclass
+ class Packet(HeaderStruct):
+ ''' The Packet, subclassing HeaderStruct. '''
+
+ body_text: BSString
+ body_data: BSData
+ body_longs: BinaryStruct('ll_longs', '>LL', 'long1 long2')
+
+ packet = Packet(
+ count=5,
+ flags=0x03,
+ body_text="hello",
+ body_data=b'xyzabc',
+ body_longs=(10, 20),
+ )
+ print(repr(packet))
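+ # a quick round trip sketch: transcribe to bytes and reparse
+ # (assumes the usual AbstractBinary bytes() transcription support)
+ packet_bs = bytes(packet)
+ packet2 = Packet.parse(CornuCopyBuffer.from_bytes(packet_bs))
+ print(repr(packet2))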
+ ##breakpoint()
M lib/python/cs/buffer.py +27 -14
@@ 18,11 18,12 @@ import mmap
from stat import S_ISREG
import sys
from threading import Lock, Thread
+from typing import List
from cs.deco import Promotable
-from cs.gimmicks import r
+from cs.gimmicks import Buffer, r
-__version__ = '20250111-post'
+__version__ = '20250428-post'
DISTINFO = {
'keywords': ["python3"],
@@ 31,9 32,8 @@ DISTINFO = {
"Programming Language :: Python :: 3",
"Development Status :: 5 - Production/Stable",
],
- 'markers': [
- 'python_version>=3.3', # for os.pread
- ],
+ 'requires_python':
+ '>=3.3', # for os.pread
'install_requires': [
'cs.deco',
'cs.gimmicks',
@@ 395,6 395,22 @@ class CornuCopyBuffer(Promotable):
return bfr
@classmethod
+ def from_cli_filespec(cls, filespec: str, **kw):
+ ''' Return a `CornuCopyBuffer` fed from the supplied command
+ line file specification `filespec`.
+
+ If `filespec` is `"-"` return a buffer using `sys.stdin`,
+ otherwise treat it as a filename.
+
+ Note: the use of `sys.stdin` relies on `sys.stdin.fileno()`
+ because we need to do binary reads and `sys.stdin` is
+ normally in text mode.
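+
+ A small usage sketch:
+
+ bfr = CornuCopyBuffer.from_cli_filespec('-') # binary read of stdin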
+ '''
+ if filespec == '-':
+ return cls.from_fd(sys.stdin.fileno(), **kw)
+ return cls.from_filename(filespec, **kw)
+
+ @classmethod
def from_bytes(cls, bs, offset=0, length=None, **kw):
''' Return a `CornuCopyBuffer` fed from the supplied bytes `bs`
starting at `offset` and ending after `length`.
@@ 620,7 636,7 @@ class CornuCopyBuffer(Promotable):
while size < len(self):
self.extend(size, short_ok=True)
- def takev(self, size, short_ok=False):
+ def takev(self, size, short_ok=False) -> List[Buffer]:
''' Return the next `size` bytes as a list of chunks
(because the internal buffering is also a list of chunks).
Other arguments are as for `.extend()`.
@@ 633,12 649,9 @@ class CornuCopyBuffer(Promotable):
# extend the buffered data
self.extend(size, short_ok=short_ok)
# post: the buffer is as big as it is going to get for this call
- if size is Ellipsis:
+ if size is Ellipsis or size >= self.buflen:
# take all the fetched data
- taken = self.bufs
- self.bufs = []
- elif size >= self.buflen:
- # take the whole buffer
+ # which should be all the data because of the .extend() above
taken = self.bufs
self.bufs = []
else:
@@ 993,12 1006,12 @@ class CornuCopyBuffer(Promotable):
@classmethod
def promote(cls, obj):
''' Promote `obj` to a `CornuCopyBuffer`,
- used by the @cs.deco.promote` decorator.
+ used by the `@cs.deco.promote` decorator.
Promotes:
* `int`: assumed to be a file descriptor of a file open for binary read
* `str`: assumed to be a filesystem pathname
- * `bytes` and `bytes`like objects: data
+ * `bytes` and `bytes`like objects (`Buffer`s): binary data
* has a `.read1` or `.read` method: assume a file open for binary read
* iterable: assumed to be an iterable of `bytes`like objects
'''
@@ 1008,7 1021,7 @@ class CornuCopyBuffer(Promotable):
return cls.from_fd(obj)
if isinstance(obj, str):
return cls.from_filename(obj)
- if isinstance(obj, (bytes, bytearray, mmap.mmap, memoryview)):
+ if isinstance(obj, Buffer):
return cls.from_bytes(obj)
if hasattr(obj, 'read1') or hasattr(obj, 'read'):
return cls.from_file(obj)
M lib/python/cs/cmdutils.py +38 -28
@@ 9,12 9,12 @@
and other command line related stuff.
This module provides the following main items:
+ - `BaseCommand`: a base class for creating command line programmes
+ with easier setup and usage than libraries like `optparse` or `argparse`
+ - `@popopts`: a decorator which works with `BaseCommand` subcommand
+ methods to parse their command line options
- `@docmd`: a decorator for command methods of a `cmd.Cmd` class
providing better quality of service
- - `BaseCommand`: a base class for creating command line programmes
- with easier setup and usage than libraries like `optparse` or `argparse`
- - `@popopts`: a decorator which works with `BaseCommand` command
- methods to parse their command line options
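+
+ A minimal sketch of a `BaseCommand` with a `@popopts` subcommand
+ (the command and option names here are illustrative):
+
+ class MyCommand(BaseCommand):
+ @popopts(n=('dry_run', 'No action, recite planned actions.'))
+ def cmd_frob(self, argv):
+ """ Usage: {cmd} [-n] args... """
+ if self.options.dry_run:
+ ...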
Editorial: why not argparse?
I find the whole argparse `add_argument` thing very cumbersome
@@ 79,7 79,7 @@ from cs.threads import HasThreadState, T
from cs.typingutils import subtype
from cs.upd import Upd, uses_upd, print # pylint: disable=redefined-builtin
-__version__ = '20250306-post'
+__version__ = '20250426-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 132,7 132,7 @@ def docmd(dofunc):
''' Run a `Cmd` "do" method with some context and handling.
'''
if not funcname.startswith('do_'):
- raise ValueError("function does not start with 'do_': %s" % (funcname,))
+ raise ValueError(f"function does not start with 'do_': {funcname}")
argv0 = funcname[3:]
with Pfx(argv0):
try:
@@ 145,7 145,7 @@ def docmd(dofunc):
exception("%s", e)
return None
- docmd_wrapper.__name__ = '@docmd(%s)' % (funcname,)
+ docmd_wrapper.__name__ = f'@docmd({funcname})'
docmd_wrapper.__doc__ = dofunc.__doc__
return docmd_wrapper
@@ 1340,24 1340,27 @@ class BaseCommand:
subcmd = None # default: no subcmd specific usage available
try:
getopt_spec = getattr(self, 'GETOPT_SPEC', '')
- # catch bare -h or --help if no 'h' in the getopt_spec
+ # catch bare -help or --help or -h (if no 'h' in the getopt_spec)
if (len(argv) == 1
and (argv[0] in ('-help', '--help') or
('h' not in getopt_spec and argv[0] in ('-h',)))):
argv = self._argv = ['help', '-l']
+ has_subcmds = True # fake this mode in order to run cmd_help
else:
- if getopt_spec:
- # legacy GETOPT_SPEC mode
- # we do this regardless in order to honour '--'
- opts, argv = getopt(argv, getopt_spec, '')
- self.apply_opts(opts)
- else:
- # modern mode
- # use the options.COMMON_OPT_SPECS
- options.popopts(argv)
+ # ordinary mode: leading options then preargv
+ if has_subcmds:
+ # do the leading argument parse
+ if not getopt_spec:
+ # the modern way with popopts
+ # use the options.COMMON_OPT_SPECS
+ options.popopts(argv)
+ else:
+ # legacy GETOPT_SPEC mode
+ opts, argv = getopt(argv, getopt_spec, '')
+ self.apply_opts(opts)
# We do this regardless so that subclasses can do some presubcommand parsing
# _after_ any command line options.
- argv = self._argv = self.apply_preargv(argv)
+ argv = self.apply_preargv(argv)
# now prepare self._run, a callable
if not has_subcmds:
# no subcommands, just use the main() method
@@ 1366,6 1369,7 @@ class BaseCommand:
except AttributeError:
# pylint: disable=raise-missing-from
raise GetoptError("no main method and no subcommand methods")
+ self._argv = argv
self._run = self.SubCommandClass(self, main)
else:
# expect a subcommand on the command line
@@ 1374,9 1378,10 @@ class BaseCommand:
# looks like a subcommand name, take it
argv.pop(0)
else:
- # not a command name
+ # not a command name, is there a default command?
default_argv = self.SUBCOMMAND_ARGV_DEFAULT
if not default_argv:
+ # no, emit the short help
warning(
"missing subcommand, expected one of: %s",
', '.join(sorted(subcmds.keys()))
@@ 1403,6 1408,7 @@ class BaseCommand:
with Pfx(subcmd):
return subcommand(argv)
+ self._argv = argv
self._run = _run
except GetoptError as e:
if self.getopt_error_handler(
@@ 1511,9 1517,8 @@ class BaseCommand:
subusage_format, *_ = doc.split('\n\n', 1)
else:
# default usage text - include the docstring below a header
- subusage_format = "\n ".join(
- ['{cmd} ...'] + [doc.split('\n\n', 1)[0]]
- )
+ paragraph1 = doc.split("\n\n", 1)[0]
+ subusage_format = f'{cmd} ...\n {paragraph1}'
if subusage_format:
if short:
subusage_format, *_ = subusage_format.split('\n', 1)
@@ 1553,7 1558,7 @@ class BaseCommand:
and would imply that a `GETOPT_SPEC` was supplied
without an `apply_opt` or `apply_opts` method to implement the options.
'''
- raise NotImplementedError("unhandled option %r" % (opt,))
+ raise NotImplementedError(f'unhandled option {opt!r}')
def apply_opts(self, opts):
''' Apply command line options.
@@ 1563,7 1568,7 @@ class BaseCommand:
'''
badopts = False
for opt, val in opts:
- with Pfx(opt if val is None else "%s %r" % (opt, val)):
+ with Pfx(opt if val is None else f'{opt} {val!r}'):
try:
self.apply_opt(opt, val)
except GetoptError as e:
@@ 1577,9 1582,7 @@ class BaseCommand:
''' Do any preparsing of `argv` before the subcommand/main-args.
Return the remaining arguments.
- This default implementation applies the default options
- supported by `self.options` (an instance of `self.Options`
- class).
+ This default implementation does nothing.
'''
return argv
@@ 2019,6 2022,8 @@ class BaseCommandCmd(Cmd):
self.__command = command
def get_names(self):
+ ''' Return a list of the subcommand names.
+ '''
cmdcls = type(self.__command)
names = []
for method_name in dir(cmdcls):
@@ 2049,7 2054,7 @@ class BaseCommandCmd(Cmd):
return do_subcmd
if subcmd in ('EOF', 'exit', 'quit'):
return lambda _: True
- raise AttributeError("%s.%s" % (self.__class__.__name__, attr))
+ raise AttributeError(f'{self.__class__.__name__}.{attr}')
@uses_cmd_options(quiet=False, verbose=False)
def qvprint(*print_a, quiet, verbose, **print_kw):
@@ 2067,9 2072,14 @@ def vprint(*print_a, **qvprint_kw):
if __name__ == '__main__':
class DemoCommand(BaseCommand):
+ ''' A demonstration CLI.
+ '''
@popopts
def cmd_demo(self, argv):
+ ''' Usage: {cmd} [args...]
+ Demonstration subcommand.
+ '''
print("This is a demo.")
print("argv =", argv)
M lib/python/cs/debug.py +71 -73
@@ 49,7 49,13 @@ from types import SimpleNamespace as NS
from cs.deco import ALL, decorator
from cs.fs import shortpath
-from cs.lex import s, r, is_identifier, is_dotted_identifier # pylint: disable=unused-import
+from cs.lex import (
+ cropped_repr,
+ s,
+ r,
+ is_identifier,
+ is_dotted_identifier,
+) # pylint: disable=unused-import
import cs.logutils
from cs.logutils import debug, error, warning, D, ifdebug, loginfo
from cs.obj import Proxy
@@ 670,6 676,7 @@ def trace(
'''
citation = funcname(func) ## funccite(func)
+ fmtv = pformat if use_pformat else cropped_repr
def traced_function_wrapper(*a, **kw):
''' Wrapper for `func` to trace call and return.
@@ 689,7 696,7 @@ def trace(
log_cite = log_cite + "from[%s]" % (caller(),)
if call:
fmt, av = func_a_kw_fmt(log_cite, *a, **kw)
- xlog("%sCALL " + fmt, _trace_state.indent, *av)
+ xlog("%sCALL " + fmt, _trace_state.indent, *av)
old_indent = _trace_state.indent
_trace_state.indent += ' '
start_time = time.time()
@@ 702,89 709,80 @@ def trace(
if xlog is X:
xlog_kw['colour'] = 'red'
xlog(
- "%sCALL %s %gs RAISE %r",
- _trace_state.indent,
+ "%sRAISE %s => %s at %gs",
+ old_indent,
log_cite,
+ e,
end_time - start_time,
- e,
**xlog_kw,
)
_trace_state.indent = old_indent
raise
+ end_time = time.time()
+ if inspect.isgeneratorfunction(func):
+ iterator = result
+
+ def traced_generator():
+ while True:
+ next_time = time.time()
+ if call:
+ xlog(
+ "%sNEXT %s at %gs",
+ old_indent,
+ log_cite,
+ next_time - start_time,
+ )
+ try:
+ item = next(iterator)
+ except StopIteration:
+ yield_time = time.time()
+ xlog(
+ "%sDONE %s in %gs",
+ old_indent,
+ log_cite,
+ yield_time - next_time,
+ )
+ break
+ except Exception as e:
+ end_time = time.time()
+ if exception:
+ xlog_kw = {}
+ if xlog is X:
+ xlog_kw['colour'] = 'red'
+ xlog(
+ "%sRAISE %s => %s at %gs",
+ old_indent,
+ log_cite,
+ e,
+ end_time - start_time,
+ **xlog_kw,
+ )
+ _trace_state.indent = old_indent
+ raise
+ else:
+ yield_time = time.time()
+ xlog(
+ "%sYIELD %s => %s at %gs",
+ old_indent,
+ log_cite,
+ fmtv(item),
+ yield_time - next_time,
+ )
+ yield item
+
+ result = traced_generator()
else:
- end_time = time.time()
+ ##xlog("%sRETURN %s <= %s", _trace_state.indent, type(result), log_cite)
if retval:
xlog(
- "%sCALL %s %gs RETURN %s",
- _trace_state.indent,
+ "%sRETURN %s => %s in %gs",
+ old_indent, ##_trace_state.indent,
log_cite,
+ fmtv(result),
end_time - start_time,
- (pformat if use_pformat else repr)(result),
)
- if inspect.isgeneratorfunction(func):
- iterator = result
-
- def traced_generator():
- while True:
- next_time = time.time()
- if call:
- xlog(
- "%sNEXT %s %gs ...",
- _trace_state.indent,
- log_cite,
- next_time - start_time,
- )
- try:
- item = next(iterator)
- except StopIteration:
- yield_time = time.time()
- xlog(
- "%sDONE %s %gs ...",
- _trace_state.indent,
- log_cite,
- yield_time - next_time,
- )
- break
- except Exception as e:
- end_time = time.time()
- if exception:
- xlog_kw = {}
- if xlog is X:
- xlog_kw['colour'] = 'red'
- xlog(
- "%sCALL %s %gs RAISE %r",
- _trace_state.indent,
- log_cite,
- end_time - start_time,
- e,
- **xlog_kw,
- )
- _trace_state.indent = old_indent
- raise
- else:
- yield_time = time.time()
- xlog(
- "%sYIELD %gs %s <= %s",
- _trace_state.indent,
- yield_time - next_time,
- s(item),
- log_cite,
- )
- yield item
-
- result = traced_generator()
- else:
- ##xlog("%sRETURN %s <= %s", _trace_state.indent, type(result), log_cite)
- if retval:
- xlog(
- "%sRETURN %gs %s <= %s",
- _trace_state.indent,
- end_time - start_time,
- s(result),
- log_cite,
- )
- _trace_state.indent = old_indent
- return result
+ _trace_state.indent = old_indent
+ return result
traced_function_wrapper.__name__ = "@trace(%s)" % (citation,)
traced_function_wrapper.__doc__ = "@trace(%s)\n\n" % (citation,) + (func.__doc__ or '')
M lib/python/cs/deco.py +31 -29
@@ 16,8 16,9 @@ import traceback
import typing
from cs.gimmicks import warning
+from cs.typingutils import is_optional
-__version__ = '20250306-post'
+__version__ = '20250428-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 25,7 26,7 @@ DISTINFO = {
"Programming Language :: Python",
"Programming Language :: Python :: 3",
],
- 'install_requires': ['cs.gimmicks'],
+ 'install_requires': ['cs.gimmicks', 'cs.typingutils'],
}
def ALL(func):
@@ 101,33 102,39 @@ def decorator(deco):
def decorate(func, *dargs, **dkwargs):
''' Final decoration when we have the function and the decorator arguments.
'''
- # decorate func
+ # First, collect the attributes of the function/class before deco() has at it.
+ func_doc = getattr(func, '__doc__', None) or ''
+ func_module = getattr(func, '__module__', None)
+ func_name = getattr(func, '__name__', str(func))
+ # Now decorate func.
decorated = deco(func, *dargs, **dkwargs)
- # catch mucked decorators which forget to return the new function
+ # Catch mucked decorators which forget to return the new function.
assert decorated is not None, (
"deco:%r(func:%r,...) -> None" % (deco, func)
)
if decorated is not func:
# We got a wrapper function back, pretty up the returned wrapper.
- # Try functools.update_wrapper, otherwise do stuff by hand.
try:
- from functools import update_wrapper # pylint: disable=import-outside-toplevel
- update_wrapper(decorated, func)
- except (AttributeError, ImportError):
+ from functools import update_wrapper
+ except ImportError:
+ pass
+ else:
try:
- decorated.__name__ = getattr(func, '__name__', str(func))
+ update_wrapper(decorated, func)
except AttributeError:
pass
- doc = getattr(func, '__doc__', None) or ''
- try:
- decorated.__doc__ = doc
- except AttributeError:
- warning("cannot set __doc__ on %r", decorated)
- func_module = getattr(func, '__module__', None)
- try:
- decorated.__module__ = func_module
- except AttributeError:
- pass
+ try:
+ decorated.__name__ = func_name
+ except AttributeError:
+ pass
+ try:
+ decorated.__doc__ = func_doc
+ except AttributeError:
+ warning("cannot set __doc__ on %r", decorated)
+ try:
+ decorated.__module__ = func_module
+ except AttributeError:
+ pass
return decorated
def metadeco(*da, **dkw):
@@ 946,14 953,10 @@ def promote(func, params=None, types=Non
# recognise optional parameters and use their primary type
optional = False
if param.default is not Parameter.empty:
- anno_origin = typing.get_origin(annotation)
- anno_args = typing.get_args(annotation)
- # recognise Optional[T], which becomes Union[T,None]
- if (anno_origin is typing.Union and len(anno_args) == 2
- and anno_args[-1] is type(None)):
+ opt_type = is_optional(annotation)
+ if opt_type:
optional = True
- annotation, _ = anno_args
- optional = True
+ annotation = opt_type
if types is not None and annotation not in types:
continue
try:
@@ 1102,6 1105,5 @@ class Promotable:
pass
else:
return from_type(obj, **from_t_kw)
- raise TypeError(
- f'{cls.__name__}.promote: cannot promote {obj.__class__.__name__}:{obj!r}'
- )
+ # try instantiating the class with obj as its sole argument
+ return cls(obj)
M lib/python/cs/distinfo.py +4 -1
@@ 430,7 430,7 @@ class Module:
return False
return True
else:
- return self.name in stdlib_module_names
+ return self.name.split('.')[0] in stdlib_module_names
@cached_property
@pfx_method(use_str=True)
@@ 868,6 868,9 @@ class Module:
urls=dinfo.pop('urls'),
classifiers=dinfo.pop('classifiers'),
)
+ python_version = dinfo.pop('requires_python', None)
+ if python_version is not None:
+ projspec['requires_python'] = python_version
version = dinfo.pop('version', None)
if version:
projspec['version'] = version
M lib/python/cs/fileutils.py +1 -1
@@ 53,7 53,7 @@ from cs.result import CancellationError
from cs.threads import locked, NRLock
from cs.units import BINARY_BYTES_SCALE
-__version__ = '20250103-post'
+__version__ = '20250429-post'
DISTINFO = {
'keywords': ["python2", "python3"],
M lib/python/cs/fstags.py +4 -4
@@ 624,7 624,7 @@ class FSTagsCommand(BaseCommand, TagsCom
r='recurse',
)
xit = 0
- paths = argv or ['.']
+ paths = argv or ('.',)
for path in paths:
fullpath = realpath(path)
for fspath in ((fullpath,)
@@ 765,7 765,7 @@ class FSTagsCommand(BaseCommand, TagsCom
else:
raise NotImplementedError("unsupported option")
xit = 0
- paths = argv or ['.']
+ paths = argv or ('.',)
for path in paths:
fullpath = realpath(path)
for fspath in ((fullpath,) if directories_like_files else scandirpaths(
@@ 1872,7 1872,7 @@ class TaggedPath(TagSet, HasFSTagsMixin,
*,
state_func: Optional[Callable[[str], Mapping[str, Any]]] = None,
) -> "CachedValue":
- ''' Return `CachedValue` managing the `prefix.name` tag.
+ ''' Return a `CachedValue` managing the `prefix.name` tag.
'''
return CachedValue(self, prefix, name, state_func=state_func)
@@ 2007,7 2007,7 @@ class TaggedPathSet:
'''
# the TaggedPaths
- members: set[TaggedPath]
+ members: Set[TaggedPath]
# mapping of (tag_name,tag_value) to TaggedPath
_by_tag_name: Mapping[str, Set[TaggedPath]] = field(
default_factory=lambda: defaultdict(set)
M lib/python/cs/gimmicks.py +6 -1
@@ 6,7 6,7 @@ Gimmicks and hacks to make some of my ot
less demanding of others.
'''
-__version__ = '20250323-post'
+__version__ = '20250428-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 22,6 22,11 @@ DISTINFO = {
# pylint: disable=unnecessary-lambda-assignment
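+# Buffer arrived with PEP 688 (Python 3.12); on older Pythons fall back
+# to typing.ByteString as the nearest bytes-like stand-in.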
try:
+ from collections.abc import Buffer
+except ImportError:
+ from typing import ByteString as Buffer
+
+try:
from contextlib import nullcontext # pylint: disable=unused-import
except ImportError:
from contextlib import contextmanager
M lib/python/cs/hashindex.py +104 -61
@@ 94,7 94,7 @@ from icontract import require
from typeguard import typechecked
from cs.cmdutils import BaseCommand, popopts, vprint
-from cs.context import contextif, reconfigure_file
+from cs.context import contextif
from cs.deco import fmtdoc, uses_verbose, uses_cmd_options
from cs.fs import needdir, RemotePath, shortpath
from cs.fstags import FSTags, uses_fstags
@@ 104,7 104,13 @@ from cs.logutils import warning
from cs.pfx import Pfx, pfx, pfx_call
from cs.psutils import pipefrom, run
from cs.resources import RunState, uses_runstate
-from cs.upd import above as above_upd, print, run_task # pylint: disable=redefined-builtin
+from cs.upd import (
+ above as above_upd,
+ print,
+ run_task, # pylint: disable=redefined-builtin
+ Upd,
+ uses_upd,
+)
__version__ = '20241207-post'
@@ 192,8 198,8 @@ class HashIndexCommand(BaseCommand):
warning(f'{hashname=} not known: {e}')
yield 1
else:
- with fstags:
- with super().run_context(**kw):
+ with super().run_context(**kw):
+ with fstags:
yield
@staticmethod
@@ 291,17 297,21 @@ class HashIndexCommand(BaseCommand):
r=(
'relative',
''' Emit relative paths in the listing.
- This requires each path to be a directory.''',
+ This requires each command line path to be a directory.''',
)
)
@uses_runstate
- def cmd_ls(self, argv, *, runstate: RunState):
+ @uses_upd
+ def cmd_ls(self, argv, *, runstate: RunState, upd: Upd):
''' Usage: {cmd} [options...] [[host:]path...]
Walk filesystem paths and emit a listing.
The default path is the current directory.
+ In quiet mode (-q) the hash indices are just updated
+ and nothing is printed.
'''
options = self.options
output_format = options.output_format
+ quiet = options.quiet
relative = options.relative
if not argv:
argv = ['.']
@@ 315,21 325,27 @@ class HashIndexCommand(BaseCommand):
warning("not a directory and -r (relative) specified")
xit = 1
continue
- for h, fspath in hashindex(path, relative=relative):
- runstate.raiseif()
- if h is not None:
- print(output_format.format(hashcode=h, fspath=fspath))
+ current_dirpath = None
+ with run_task("scan") as proxy:
+ for h, fspath in hashindex(path, relative=relative):
+ runstate.raiseif()
+ dirpath = dirname(fspath)
+ if dirpath != current_dirpath:
+ proxy.text = shortpath(dirpath)
+ current_dirpath = dirpath
+ if h is not None:
+ quiet or print(output_format.format(hashcode=h, fspath=fspath))
return xit
@popopts(
- mv='move_mode',
+ ln=('link_mode', 'Hard link files instead of moving them.'),
s='symlink_mode',
)
@typechecked
def cmd_rearrange(self, argv):
''' Usage: {cmd} {{[[user@]host:]refdir|-}} [[user@]rhost:]srcdir [dstdir]
Rearrange files from srcdir into dstdir based on their positions in refdir.
- Other arguments:
+ Arguments:
refdir The reference directory, which may be local or remote
or "-" indicating that a hash index will be read from
standard input.
@@ 341,11 357,19 @@ class HashIndexCommand(BaseCommand):
options = self.options
badopts = False
doit = options.doit
- move_mode = options.move_mode
+ hashname = options.hashname
+ move_mode = not options.link_mode
quiet = options.quiet
- verbose = options.verbose
+ verbose = options.verbose or not quiet
symlink_mode = options.symlink_mode
- refdir = self.poppathspec(argv, 'refdir', check_isdir=True)
+ if not argv:
+ warning("missing refdir")
+ badopts = True
+ elif argv[0] == '-':
+ argv.pop(0)
+ refdir = None # read hashindex from standard input
+ else:
+ refdir = self.poppathspec(argv, 'refdir', check_isdir=True)
srcdir = self.poppathspec(argv, 'srcdir', check_isdir=True)
if argv:
dstdir = self.poppathspec(argv, 'dstdir', check_isdir=True)
@@ 360,49 384,58 @@ class HashIndexCommand(BaseCommand):
if badopts:
raise GetoptError('bad arguments')
xit = 0
- # scan the reference directory
- with run_task(f'scan refdir {refdir}'):
- fspaths_by_hashcode = hashindex_map(refdir, relative=True)
+ if refdir is None:
+ # read hash index from standard input
+ fspaths_by_hashcode = defaultdict(list)
+ for hashcode, fspath in read_hashindex(sys.stdin, hashname=hashname):
+ fspaths_by_hashcode[hashcode].append(fspath)
+ else:
+ # scan the reference directory
+ with run_task(f'scan refdir {refdir}'):
+ fspaths_by_hashcode = hashindex_map(refdir, relative=True)
if not fspaths_by_hashcode:
quiet or print("no files in refdir, nothing to rearrange")
return xit
# rearrange the source directory.
+ assert srcdir.host == dstdir.host
if srcdir.host is None:
# local srcdir and dstdir
- # make stdout line buffered if srcdir is local
- with contextif(
- not quiet,
- reconfigure_file,
- sys.stdout,
- line_buffering=True,
- ):
- rearrange(
- srcdir.fspath,
- fspaths_by_hashcode,
- dstdir.fspath,
- move_mode=move_mode,
- symlink_mode=symlink_mode,
- )
+ rearrange(
+ srcdir.fspath,
+ fspaths_by_hashcode,
+ dstdir.fspath,
+ move_mode=move_mode,
+ symlink_mode=symlink_mode,
+ verbose=verbose,
+ )
else:
# remote srcdir and dstdir
xit = remote_rearrange(
srcdir.host,
+ srcdir.fspath,
dstdir.fspath,
fspaths_by_hashcode,
move_mode=move_mode,
symlink_mode=symlink_mode,
+ verbose=verbose,
)
return xit
@uses_fstags
- @popopts(delete='Delete from dstdir, passes --delete to rsync.')
+ @popopts(
+ bwlimit_='Rsync bandwidth limit, passed to rsync.',
+ delete='Delete from dstdir, passed to rsync.',
+ partial='Keep partially transferred files, passed to rsync.',
+ )
def cmd_rsync(self, argv, *, fstags: FSTags):
''' Usage: {cmd} [options] srcdir dstdir
Rearrange dstdir according to srcdir then rsync srcdir into dstdir.
'''
options = self.options
+ bwlimit = options.bwlimit
delete = options.delete
doit = options.doit
+ partial = options.partial
quiet = options.quiet
runstate = options.runstate
ssh_exe = options.ssh_exe
@@ 413,15 446,9 @@ class HashIndexCommand(BaseCommand):
fspaths_by_hashcode = hashindex_map(srcdir, relative=True)
xit = 0
# rearrange the source directory.
- if dstdir.host is None:
- # local srcdir and dstdir
- # make stdout line buffered if srcdir is local
- with contextif(
- not quiet,
- reconfigure_file,
- sys.stdout,
- line_buffering=True,
- ):
+ with run_task(f'rearrange dstdir {dstdir}'):
+ if dstdir.host is None:
+ # local srcdir and dstdir
rearrange(
srcdir.fspath,
fspaths_by_hashcode,
@@ 429,28 456,31 @@ class HashIndexCommand(BaseCommand):
move_mode=True,
symlink_mode=False,
)
- else:
- # remote srcdir and dstdir
- xit = remote_rearrange(
- dstdir.host,
- dstdir.fspath,
- fspaths_by_hashcode,
- move_mode=True,
- symlink_mode=False,
- )
+ else:
+ # remote srcdir and dstdir
+ xit = remote_rearrange(
+ dstdir.host,
+ srcdir.fspath,
+ dstdir.fspath,
+ fspaths_by_hashcode,
+ move_mode=True,
+ symlink_mode=False,
+ )
if xit == 0:
# rsync source to destination
with above_upd():
run(
[
'rsync',
+ not doit and '-n',
('-e', ssh_exe),
+ not quiet and '-i',
+ verbose and '-v',
+ partial and '--partial',
+ bwlimit and ('--bwlimit', bwlimit),
+ doit and not quiet and sys.stderr.isatty() and '--progress',
'-ar',
delete and '--delete',
- not doit and '-n',
- not quiet and '-i',
- verbose and '-v',
- doit and not quiet and sys.stderr.isatty() and '--progress',
f'--exclude={fstags.tagsfile_basename}',
'--',
f'{srcdir}/',
@@ 706,6 736,9 @@ def hashindex_map(dirpath: str,
def dir_filepaths(dirpath: str, *, fstags: FSTags):
''' Generator yielding the filesystem paths of the files in `dirpath`.
'''
+ if not isdirpath(dirpath):
+ raise ValueError(f'dir_filepaths: not a directory: {dirpath=}')
+ # TODO: use cs.fs.scandirtree (os.walk ignores errors)
for subdirpath, dirnames, filenames in os.walk(dirpath):
dirnames[:] = sorted(dirnames)
for filename in sorted(filenames):
@@ 747,7 780,7 @@ def dir_remap(
dir_filepaths(srcdirpath), fspaths_by_hashcode, hashname=hashname
)
-@uses_cmd_options(doit=True, hashname=None)
+@uses_cmd_options(doit=True, hashname=None, verbose=True)
@uses_fstags
@uses_runstate
@require(
@@ 770,6 803,7 @@ def rearrange(
doit: bool,
fstags: FSTags,
runstate: RunState,
+ verbose: bool,
):
''' Rearrange the files in `dirpath` according to the
hashcode->[relpaths] `fspaths_by_hashcode`.
@@ 801,6 835,7 @@ def rearrange(
continue
filename = basename(srcpath)
if filename.startswith('.') or filename == fstags.tagsfile_basename:
+ # skip hidden or fstags files
continue
opname = "ln -s" if symlink_mode else "mv" if move_mode else "ln"
with Pfx(srcpath):
@@ 814,25 849,30 @@ def rearrange(
## "rdstpath:%r is not a clean subpath" % (rdstpath,)
##)
if rsrcpath == rdstpath:
+ # already there, skip
continue
dstpath = joinpath(dstdirpath, rdstpath)
if doit:
needdir(dirname(dstpath), use_makedirs=True, log=warning)
+ # merge the src to the dst
+ # do a real move if there is only one rfspaths
+ # otherwise a link and then a later remove
try:
merge(
srcpath,
dstpath,
opname=opname,
hashname=hashname,
- move_mode=False, # we do our own remove below
+ move_mode=move_mode and len(rfspaths) == 1,
symlink_mode=symlink_mode,
fstags=fstags,
doit=doit,
+ verbose=True,
)
except FileExistsError as e:
warning("%s %s -> %s: %s", opname, srcpath, dstpath, e)
else:
- if move_mode and rsrcpath not in rfspaths:
+ if move_mode and len(rfspaths) > 1 and rsrcpath not in rfspaths:
if doit:
to_remove.add(srcpath)
# purge the srcpaths last because we might want them multiple
@@ 854,6 894,7 @@ def rearrange(
@typechecked
def remote_rearrange(
rhost: str,
+ srcdir: str,
dstdir: str,
fspaths_by_hashcode: Mapping[BaseHashCode, List[str]],
*,
@@ 865,8 906,8 @@ def remote_rearrange(
symlink_mode: bool,
verbose: bool,
):
- ''' Rearrange a remote directory `dstdir` on `rhost`
- according to the hashcode mapping `fspaths_by_hashcode`.
+ ''' Rearrange a remote directory `srcdir` on `rhost` into `dstdir`
+ on `rhost` according to the hashcode mapping `fspaths_by_hashcode`.
'''
# remote srcdir and dstdir
# prepare the remote input
@@ 883,9 924,10 @@ def remote_rearrange(
('-h', hashname),
quiet and '-q',
verbose and '-v',
- move_mode and '--mv',
+ not (move_mode or symlink_mode) and '--ln',
symlink_mode and '-s',
'-',
+ RemotePath.str(None, srcdir),
RemotePath.str(None, dstdir),
],
hashindex_exe=hashindex_exe,
@@ 957,6 999,7 @@ def merge(
"# identical content at",
shortpath(dstpath),
verbose=verbose,
+ flush=True,
)
if doit:
pfx_call(os.remove, srcpath)
M lib/python/cs/iso14496.py +686 -620
@@ 14,41 14,59 @@ ISO make the standard available here:
from base64 import b64encode, b64decode
from collections import namedtuple
-from contextlib import contextmanager
-from datetime import datetime
+try:
+ from collections.abc import Buffer
+except ImportError:
+ from typing import ByteString as Buffer
+from contextlib import closing, contextmanager
+from datetime import datetime, UTC
+from functools import cached_property
from getopt import getopt, GetoptError
import os
import sys
-from typing import Iterable, List, Tuple
+from typing import Iterable, List, Mapping, Tuple, Union
+from uuid import UUID
from icontract import require
from typeguard import typechecked
from cs.binary import (
+ AbstractBinary,
UInt8,
Int16BE,
Int32BE,
UInt16BE,
UInt32BE,
UInt64BE,
+ BinaryBytes,
BinaryUTF8NUL,
BinaryUTF16NUL,
SimpleBinary,
BinaryListValues,
- BinaryMultiStruct,
+ BinaryStruct,
BinaryMultiValue,
BinarySingleValue,
+ ListOfBinary,
+ binclass,
+ parse_offsets,
pt_spec,
)
from cs.buffer import CornuCopyBuffer
-from cs.cmdutils import BaseCommand
+from cs.cmdutils import BaseCommand, popopts
+from cs.deco import decorator
from cs.fs import scandirpaths
from cs.fstags import FSTags, uses_fstags
from cs.imageutils import sixel_from_image_bytes
-from cs.lex import get_identifier, get_decimal_value
-from cs.logutils import warning
-from cs.pfx import Pfx, pfx_method, XP
-from cs.py.func import prop
+from cs.lex import (
+ cropped_repr,
+ cutsuffix,
+ get_identifier,
+ get_decimal_value,
+ printt,
+ tabulate,
+)
+from cs.logutils import warning, debug
+from cs.pfx import Pfx, pfx_call, pfx_method, XP
from cs.tagset import TagSet, Tag
from cs.threads import locked_property, ThreadState
from cs.units import transcribe_bytes_geek as geek, transcribe_time
@@ 160,15 178,13 @@ class MP4Command(BaseCommand):
B = deref_box(over_box, path)
print(path, "offset=%d" % B.offset, B)
+ @popopts(H=('skip_header', 'Skip the Box header.'))
def cmd_extract(self, argv):
''' Usage: {cmd} [-H] filename boxref output
Extract the referenced Box from the specified filename into output.
- -H Skip the Box header.
'''
- skip_header = False
- if argv and argv[0] == '-H':
- argv.pop(0)
- skip_header = True
+ options = self.options
+ skip_header = options.skip_header
if not argv:
warning("missing filename")
badopts = True
@@ 185,18 201,22 @@ class MP4Command(BaseCommand):
else:
output = argv.pop(0)
if argv:
- warning("extra argments after boxref: %s", ' '.join(argv))
+ warning("extra argments after boxref: %r", argv)
badopts = True
if badopts:
raise GetoptError("invalid arguments")
- over_box = parse(filename)
- over_box.dump()
+ top_box_type, *sub_box_types = boxref.split('.')
- B = over_box
- for box_type_s in boxref.split('.'):
- B = getattr(B, box_type_s.upper())
- with Pfx(filename):
- fd = os.open(filename, os.O_RDONLY)
- bfr = CornuCopyBuffer.from_fd(fd)
+ bfr = CornuCopyBuffer.from_filename(filename)
+ with closing(bfr):
+ for topbox in Box.scan(bfr):
+ if topbox.box_type_s == top_box_type:
+ break
+ else:
+ warning("no top box of type %r found", top_box_type)
+ return 1
+ # dereference the remaining dotted box types from the top box
+ B = topbox
+ for box_type_s in sub_box_types:
+ B = getattr(B, box_type_s.upper())
offset = B.offset
need = B.length
if skip_header:
@@ 211,7 231,6 @@ class MP4Command(BaseCommand):
chunk = chunk[:need]
ofp.write(chunk)
need -= len(chunk)
- os.close(fd)
def cmd_info(self, argv):
''' Usage: {cmd} [{{-|filename}}]...]
@@ 253,41 272,97 @@ class MP4Command(BaseCommand):
print(' ', tag.name, '=', repr(tag.value))
return xit
- def cmd_parse(self, argv):
- ''' Usage: {cmd} [{{-|filename}}...]
+ @popopts(
+ with_data='Include the data components of boxes.',
+ with_fields='Include a line for each box field.',
+ with_offsets='Include Box and Box body offsets in the dump.',
+ )
+ def cmd_scan(self, argv):
+ ''' Usage: {cmd} [--with-data] [--with-fields] [{{-|filename}} [type_paths...]]
Parse the named file (or stdin for "-"), optionally reporting only the given dotted box type_paths.
'''
+ options = self.options
if not argv:
argv = ['-']
- for spec in argv:
- with Pfx(spec):
- if spec == '-':
- parsee = sys.stdin.fileno()
- else:
- parsee = spec
- with PARSE_MODE(discard_data=True):
- over_box = parse(parsee)
- over_box.dump(crop_length=None)
+ filespec = argv.pop(0)
+ type_paths = list(argv)
+ xit = 0
+ with Pfx("%r", filespec):
+ print(filespec)
+ try:
+ bfr = CornuCopyBuffer.from_cli_filespec(filespec)
+ except FileNotFoundError as e:
+ warning("scannot scan: %s", e)
+ return 1
+ with PARSE_MODE(discard_data=not options.with_data):
+ rows = []
+ seen_paths = {path: False for path in type_paths}
+ scan_table = []
+ for topbox in Box.scan(bfr):
+ if not type_paths:
+ scan_table.extend(
+ topbox.dump_table(
+ recurse=True,
+ dump_fields=options.with_fields,
+ dump_offsets=options.with_offsets,
+ )
+ )
+ else:
+ for type_path in type_paths:
+ first_match = True
+ toptype, *tail_types = type_path.split('.')
+ if topbox.box_type_s == toptype:
+ if not tail_types:
+ if first_match:
+ print(type_path)
+ first_match = False
+ seen_paths[type_path] = True
+ scan_table.extend(
+ topbox.dump_table(
+ recurse=True,
+ dump_fields=options.with_fields,
+ dump_offsets=options.with_offsets,
+ indent=' '
+ )
+ )
+ else:
+ for subbox in topbox.descendants(tail_types):
+ if first_match:
+ print(type_path)
+ first_match = False
+ seen_paths[type_path] = True
+ scan_table.extend(
+ subbox.dump_table(
+ recurse=True,
+ dump_fields=options.with_fields,
+ dump_offsets=options.with_offsets,
+ indent=' '
+ )
+ )
+ printt(*scan_table)
+ if type_paths:
+ for type_path in type_paths:
+ if not seen_paths[type_path]:
+ warning("no match for %r", type_path)
+ xit = 1
+ return xit
+ @popopts(tag_prefix_='Specify the tag prefix, default {TAG_PREFIX!r}.')
def cmd_tags(self, argv):
- ''' Usage: {cmd} [{{-p,--prefix}} prefix] path
- Report the tags of `path` based on embedded MP4 metadata.
+ ''' Usage: {cmd} [--tag-prefix prefix] path
+ Report the tags of path based on embedded MP4 metadata.
'''
+ options = self.options
+ tag_prefix = options.tag_prefix
+ if tag_prefix is None:
+ tag_prefix = self.TAG_PREFIX
xit = 0
fstags = FSTags()
- tag_prefix = self.TAG_PREFIX
- opts, argv = getopt(argv, 'p:', longopts=['prefix'])
- for option, value in opts:
- with Pfx(option):
- if option in ('-p', '--prefix'):
- tag_prefix = value
- else:
- raise NotImplementedError("unsupported option")
if not argv:
raise GetoptError("missing path")
path = argv.pop(0)
if argv:
- raise GetoptError("extra arguments after path: %r" % (argv,))
+ raise GetoptError(f'extra arguments after path: {argv!r}')
with fstags:
out(path)
with Pfx(path):
@@ 378,7 453,7 @@ def deref_box(B, path):
B = nextB
return B
-Matrix9Long = BinaryMultiStruct(
+Matrix9Long = BinaryStruct(
'Matrix9Long', '>lllllllll', 'v0 v1 v2 v3 v4 v5 v6 v7 v8'
)
@@ 451,12 526,9 @@ class TimeStampMixin:
0xfffffffffffffffe, 0xffffffffffffffff):
return None
try:
- dt = datetime.utcfromtimestamp(self.value)
+ dt = pfx_call(datetime.fromtimestamp, self.value, UTC)
except (OverflowError, OSError) as e:
- warning(
- "%s.datetime: datetime.utcfromtimestamp(%s): %s, returning None",
- type(self).__name__, self.value, e
- )
+ warning("%s.datetime: returning None", type(self).__name__, e)
return None
return dt.replace(year=dt.year - 66)
@@ 494,12 566,11 @@ class BoxHeader(BinaryMultiValue('BoxHea
MAX_BOX_SIZE_32 = 2**32 - 8
@classmethod
+ @parse_offsets
def parse(cls, bfr: CornuCopyBuffer):
''' Decode a box header from `bfr`.
'''
self = cls()
- # note start of header
- self.offset = bfr.offset
box_size = UInt32BE.parse_value(bfr)
box_type = self.box_type = bfr.take(4)
if box_size == 0:
@@ 516,8 587,6 @@ class BoxHeader(BinaryMultiValue('BoxHea
self.user_type = bfr.take(16)
else:
self.user_type = None
- # note end of self
- self.end_offset = bfr.offset
self.type = box_type
return self
@@ 534,19 603,55 @@ class BoxHeader(BinaryMultiValue('BoxHea
if self.box_type == b'uuid':
yield self.user_type
+ @cached_property
+ def type_uuid(self) -> UUID:
+ ''' The `UUID` for the box header type, if `self.type` is `b'uuid'`,
+ made from `self.user_type`.
+ '''
+ if self.type != b'uuid':
+ raise AttributeError(
+ f'{self.__class__.__name__}.type_uuid: header type is not b"uuid"'
+ )
+ return UUID(bytes=self.user_type)
+
class BoxBody(SimpleBinary):
''' Abstract basis for all `Box` bodies.
'''
- FIELD_TYPES = dict(offset=int, post_offset=int)
+ FIELD_TYPES = dict(offset=int, end_offset=int)
SUBCLASSES_BY_BOXTYPE = {}
@classmethod
def __init_subclass__(cls, bodyclass_name=None, doc=None):
+ super().__init_subclass__()
if bodyclass_name is not None:
cls.__name__ = bodyclass_name
if doc is not None:
cls.__doc__ = doc
+ # apply some default docstrings to known methods
+ for method_name, method_doc_str in (
+ ('parse_fields', 'Gather the fields of `{cls.__name__}`.'),
+ ('transcribe', 'Transcribe a `{cls.__name__}`.'),
+ ):
+ method = getattr(cls, method_name)
+ if not (getattr(method, '__doc__', None) or '').strip():
+ try:
+ method.__doc__ = method_doc_str.format(cls=cls)
+ except AttributeError as e:
+ debug(
+ "%s: cannot set %s.__doc__: %s", cls.__name__, method.__name__, e
+ )
+ if cls.__name__ == 'BinClass':
+ # This came from the BinClass inside the @binclass decorator.
+ # Because this subclasses BoxBody (because it subclasses cls, a BoxBody)
+ # we get it when made, before it gets its __name__.
+ # Skip the registration here.
+ pass
+ else:
+ BoxBody._register_subclass_boxtypes(cls)
+
+ @staticmethod
+ def _register_subclass_boxtypes(cls, prior_cls=None):
# update the mapping of box_type to BoxBody subclass
try:
# explicit list of box_type byte strings
@@ 556,6 661,7 @@ class BoxBody(SimpleBinary):
try:
box_type = cls.boxbody_type_from_class()
except ValueError as e:
+ debug("cannot infer box type from cls %s %r: %s", cls, cls.__name__, e)
box_types = ()
else:
box_types = (box_type,)
@@ 567,9 673,13 @@ class BoxBody(SimpleBinary):
# new box_type as expected
SUBCLASSES_BY_BOXTYPE[box_type] = cls
else:
- raise TypeError(
- f'box_type {box_type!r} already in BoxBody.SUBCLASSES_BY_BOXTYPE as {existing_box_class.__name__}'
- )
+ if prior_cls is not None and existing_box_class is prior_cls:
+ # replace prior_cls with cls
+ SUBCLASSES_BY_BOXTYPE[box_type] = cls
+ else:
+ raise TypeError(
+ f'box_type {box_type!r} already in BoxBody.SUBCLASSES_BY_BOXTYPE as {existing_box_class.__name__}'
+ )
@staticmethod
@require(lambda box_type: len(box_type) == 4)
@@ 603,22 713,20 @@ class BoxBody(SimpleBinary):
If there are zero matches, return `None`.
Otherwise return the matching box.
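+
+ For example, given a parsed `moov` box body:
+ `moov.TRAKs` returns all the `b'trak'` boxes,
+ `moov.MVHD` the sole `b'mvhd'` box,
+ and `moov.META0` the sole `b'meta'` box or `None` if there is none.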
'''
+ # .boxes - pretend we have an empty .boxes if missing
+ if attr == 'boxes':
+ return ()
# .TYPE - the sole item in self.boxes matching b'type'
if len(attr) == 4 and attr.isupper():
- box, = getattr(self, attr + 's')
+ box, = getattr(self, f'{attr}s')
return box
# .TYPEs - all items of self.boxes matching b'type'
- if len(attr) == 5 and (attr.endswith('s') or attr.endswith('0')):
+ # .TYPE0 - the sole box in self.boxes or None if empty
+ if len(attr) == 5 and attr.endswith(('s', '0')):
attr4 = attr[:4]
if attr4.isupper():
box_type = attr4.lower().encode('ascii')
- try:
- boxes = self.boxes
- except AttributeError:
- warning("%s.%s: no .boxes", self.__class__.__name__, attr)
- boxes = []
- else:
- boxes = [box for box in boxes if box.box_type == box_type]
+ boxes = [box for box in self.boxes if box.box_type == box_type]
if attr.endswith('s'):
return boxes
if attr.endswith('0'):
@@ 626,15 734,25 @@ class BoxBody(SimpleBinary):
return None
box, = boxes
return box
- raise AttributeError("%s.%s" % (type(self).__name__, attr))
+ gsa = super().__getattr__
+ try:
+ return gsa(attr)
+ except AttributeError as e:
+ raise AttributeError(f'{self.__class__.__name__}.{attr}') from e
- def __str__(self):
- return super().__str__(getattr(self, '_parsed_field_names', ()))
+ def __str__(self, attr_names=None):
+ if attr_names is None:
+ attr_names = sorted(
+ attr_name for attr_name in getattr(self, '_parsed_field_names', ())
+ if not attr_name.startswith('_') and attr_name != 'boxes'
+ )
+ return super().__str__(attr_names)
def __iter__(self):
- yield from ()
+ yield from self.boxes
@classmethod
+ @parse_offsets
def parse(cls, bfr: CornuCopyBuffer):
''' Create a new instance and gather the `Box` body fields from `bfr`.
@@ 655,7 773,7 @@ class BoxBody(SimpleBinary):
store the instance as the field `field_name+'__Binary'`
for transcription.
- Note that this disassociaes the plain value attribute
+ Note that this disassociates the plain value attribute
from what gets transcribed.
'''
instance = binary_cls.parse(bfr)
@@ 669,7 787,8 @@ class BoxBody(SimpleBinary):
`binary_cls` may also be an `int`, in which case that many
bytes are read from `bfr`.
'''
- if isinstance(binary_cls, int):
+ if binary_cls is ... or isinstance(binary_cls, int):
+ # collect raw data
value = bfr.take(binary_cls)
else:
value = pt_spec(binary_cls).parse(bfr)
@@ 700,11 819,11 @@ class BoxBody(SimpleBinary):
self._parsed_field_names,
)
- def parse_boxes(self, bfr: CornuCopyBuffer, **kw):
+ def parse_boxes(self, bfr: CornuCopyBuffer, **box_scan_kw):
''' Utility method to parse the remainder of the buffer as a
sequence of `Box`es.
'''
- self.boxes = list(Box.scan(bfr, **kw))
+ self.boxes = list(Box.scan(bfr, **box_scan_kw))
self._parsed_field_names.append('boxes')
@classmethod
@@ 712,12 831,26 @@ class BoxBody(SimpleBinary):
''' Compute the Box's 4 byte type field from the class name.
'''
class_name = cls.__name__
- if len(class_name) == 11 and class_name.endswith('BoxBody'):
- class_prefix = class_name[:4]
+ if ((class_prefix := cutsuffix(class_name,
+ ('BoxBody', 'BoxBody2'))) is not class_name
+ and len(class_prefix) == 4):
if class_prefix.rstrip('_').isupper():
return class_prefix.replace('_', ' ').lower().encode('ascii')
raise ValueError(f'no automatic box type for class named {class_name!r}')
+@decorator
+def boxbodyclass(cls):
+ ''' A decorator for `@binclass` style `BoxBody` subclasses
+ which reregisters the new binclass in the
+ `BoxBody.SUBCLASSES_BY_BOXTYPE` mapping.
+ '''
+ if not issubclass(cls, BoxBody):
+ raise TypeError(f'@boxbodyclass: {cls=} is not a subclass of BoxBody')
+ cls0 = cls
+ cls = binclass(cls0)
+ BoxBody._register_subclass_boxtypes(cls, cls0)
+ return cls
+
class Box(SimpleBinary):
''' Base class for all boxes - ISO14496 section 4.2.
@@ 745,12 878,12 @@ class Box(SimpleBinary):
try:
body = self.body
except AttributeError:
- s = "%s:NO_BODY" % (type_name,)
+ s = f'{type_name}:NO_BODY'
else:
- s = "%s[%d]:%s" % (type_name, self.parse_length, body)
+ s = f'{type_name}[{self.parse_length}]:{body}'
unparsed_bs = getattr(self, 'unparsed_bs', None)
if unparsed_bs and unparsed_bs != b'\0':
- s += ":unparsed=%r" % (unparsed_bs[:16],)
+ s += f':{unparsed_bs[:16]=}'
return s
__repr__ = __str__
@@ 785,13 918,13 @@ class Box(SimpleBinary):
yield from iter(self.body)
@classmethod
+ @parse_offsets(report=True)
def parse(cls, bfr: CornuCopyBuffer):
''' Decode a `Box` from `bfr` and return it.
'''
self = cls()
- self.offset = bfr.offset
header = self.header = BoxHeader.parse(bfr)
- with Pfx("%s.parse", header.box_type):
+ with Pfx("%s[%s].parse", cls.__name__, header.box_type):
length = header.box_size
if length is Ellipsis:
end_offset = Ellipsis
@@ 804,22 937,17 @@ class Box(SimpleBinary):
body_offset = bfr_tail.offset
self.body = body_class.parse(bfr_tail)
# attach subBoxen to self
- boxes = getattr(self.body, 'boxes', None)
- if boxes:
- for box in boxes:
- box.parent = self
+ for subbox in self.body.boxes:
+ subbox.parent = self
self.body.parent = self
self.body.offset = body_offset
- self.body.post_offset = bfr_tail.offset
self.body.self_check()
self.unparsed_offset = bfr_tail.offset
self.unparsed = list(bfr_tail)
if bfr_tail is not bfr:
assert not bfr_tail.bufs, "bfr_tail.bufs=%r" % (bfr_tail.bufs,)
bfr_tail.flush()
- self.end_offset = bfr.offset
self.self_check()
- bfr.report_offset(self.offset)
copy_boxes = PARSE_MODE.copy_boxes
if copy_boxes:
copy_boxes(self)
@@ 934,20 1062,32 @@ class Box(SimpleBinary):
return self.header.type
@property
- def box_type_s(self):
+ def box_type_s(self) -> str:
''' The `Box` header type as a string.
- If the header type bytes decode as ASCII, return that,
- otherwise the header bytes' repr().
+ If the header type is a UUID, return its `str` form.
+ Otherwise, if the header type bytes decode as ASCII, return that.
+ Otherwise the header bytes' repr().
'''
box_type_b = bytes(self.box_type)
+ if box_type_b == b'uuid':
+ return str(self.box_type_uuid)
try:
box_type_name = box_type_b.decode('ascii')
except UnicodeDecodeError:
box_type_name = repr(box_type_b)
+ else:
+ if not all(c.isprintable() for c in box_type_name):
+ box_type_name = repr(box_type_b)
return box_type_name
@property
+ def box_type_uuid(self) -> UUID:
+ ''' The `Box` header type `UUID` for boxes whose `box_type` is `b'uuid'`.
+ '''
+ return self.header.type_uuid
+
+ @property
def box_type_path(self):
''' The type path to this Box.
'''
@@ 972,15 1112,20 @@ class Box(SimpleBinary):
'''
return self.header.user_type
- # NB: a @property instead of @prop to preserve AttributeError
@property
def BOX_TYPE(self):
- ''' The default .BOX_TYPE is inferred from the class name.
+ ''' The default `.BOX_TYPE` is inferred from the class name.
'''
- return type(self).boxbody_type_from_class()
+ try:
+ return self.boxbody_type_from_class()
+ except ValueError as e:
+ raise AttributeError(
+ f'no {self.__class__.__name__}.BOX_TYPE: {e}'
+ ) from e
def ancestor(self, box_type):
''' Return the closest ancestor box of type `box_type`.
+ Raise `ValueError` if there is no such ancestor.
'''
if isinstance(box_type, str):
box_type = box_type.encode('ascii')
@@ 989,31 1134,110 @@ class Box(SimpleBinary):
if parent.box_type == box_type:
return parent
parent = parent.parent
- return parent
+ raise ValueError(f'no ancestor with {box_type=}')
+
+ def descendants(self, sub_box_types: str | List):
+ ''' A generator to scan descendants of this box for boxes
+ matching `sub_box_types`.
+
+ The `sub_box_types` may be a dot separated string or a list.
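+
+ For example (a sketch, `moov_box` being an already parsed `b'moov'` box):
+
+ for mdia in moov_box.descendants('trak.mdia'):
+ ...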
+ '''
+ if isinstance(sub_box_types, str):
+ sub_box_types = sub_box_types.split('.')
+ box_type_s, *tail_box_types = sub_box_types
+ for subbox in self.boxes:
+ if subbox.box_type_s == box_type_s:
+ if tail_box_types:
+ yield from subbox.descendants(tail_box_types)
+ else:
+ yield subbox
- def dump(self, **kw):
- ''' Dump this Box.
+ def dump_table(
+ self,
+ table=None,
+ indent='',
+ subindent=' ',
+ dump_fields=False,
+ dump_offsets=False,
+ recurse=False,
+ file=None
+ ) -> List[Tuple[str, str]]:
+ ''' Dump this `Box` as a table of `(label,detail)` rows for `cs.lex.printt`.
'''
- return dump_box(self, **kw)
+ if table is None:
+ table = []
+ for level, box, subboxes in self.walk(limit=(None if recurse else 0)):
+ row_indent = indent + subindent * level
+ body = box.body
+ if body.__class__ is BoxBody:
+ box_desc = ''
+ else:
+ box_desc = body.__class__.__doc__.strip().split("\n")[0]
+ if dump_fields:
+ table.append(
+ (
+ f'{row_indent}{box.box_type_s}:{body.__class__.__name__}',
+ box_desc,
+ )
+ )
+ else:
+ box_content = str(body)
+ box_content__ = cutsuffix(box_content, '()')
+ if box_content__ is not box_content:
+ if box_desc:
+ box_content = f'{box_content__}: {box_desc}'
+ else:
+ box_content = box_content__
+ table.append((
+ f'{row_indent}{box.box_type_s}',
+ box_content,
+ ))
+ # indent the subrows
+ if dump_fields:
+ field_indent = row_indent + subindent
+ for field_name in sorted(filter(
+ lambda name: all((
+ not name.startswith('_'),
+ (dump_offsets or name not in ('offset', 'end_offset')),
+ (not recurse or name not in ('boxes',)),
+ )),
+ body.__dict__.keys(),
+ )):
+ field = getattr(body, field_name)
+ table.append((f'{field_indent}.{field_name}', cropped_repr(field)))
+ return table
- def walk(self) -> Iterable[Tuple["Box", List["Box"]]]:
+ def dump(self, file=None, **dump_table_kw):
+ ''' Dump this `Box` to `file` (default `sys.stdout`, per `cs.lex.printt`).
+ Other keyword parameters are passed to `Box.dump_table`.
+ '''
+ printt(*self.dump_table(**dump_table_kw), file=file)
+
+ def walk(self,
+ *,
+ level=0,
+ limit=None) -> Iterable[Tuple["Box", List["Box"]]]:
''' Walk this `Box` hierarchy.
- Yields the starting box and its children as `(self,subboxes)`
- and then yields `(subbox,subsubboxes)` for each child in turn.
+ Yields the starting box and its children as `(level,self,subboxes)`
+ and then yields `(level+1,subbox,subsubboxes)` for each child in turn,
+ recursing into the subboxes.
As with `os.walk`, the returned `subboxes` list
may be modified in place to prune or reorder the subsequent walk.
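+
+ For example, to prune the walk below any `b'mdat'` box
+ (`top_box` being an already parsed `Box`):
+
+ for level, box, subboxes in top_box.walk():
+ subboxes[:] = [b for b in subboxes if b.box_type != b'mdat']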
'''
# We don't go list(self) or [].extend(self) because both of those fire
- # the transcription of the box because of list's preallocation heuristics.
- # Instead we make a bare iterator and list() that, specific
+ # the transcription of the box because of list's preallocation heuristics
+ # (it measures the length of each box).
+ # Instead we make a bare iterator and list() that; specific
# incantation from Peter Otten.
- subboxes = list(iter(self))
- yield self, subboxes
- for subbox in subboxes:
- if isinstance(subbox, Box):
- yield from subbox.walk()
+ subboxes = list(iter(self.boxes))
+ yield level, self, subboxes
+ if limit is None or limit > 0:
+ for subbox in subboxes:
+ yield from subbox.walk(
+ level=level + 1, limit=(None if limit is None else limit - 1)
+ )
def metatags(self):
''' Return a `TagSet` containing metadata for this box.
@@ 1024,8 1248,6 @@ class Box(SimpleBinary):
meta_box = self.META0
if meta_box:
tags.update(meta_box.tagset(), prefix=box_prefix + '.meta')
- else:
- pass # X("NO .META0")
udta_box = self.UDTA0
if udta_box:
pass # X("UDTA?")
@@ 1043,7 1265,7 @@ class Box(SimpleBinary):
''' Walk the `Box` hierarchy looking for metadata.
Yield `(Box,TagSet)` for each `b'moov'` or `b'trak'` `Box`.
'''
- for box, _ in self.walk():
+ for _, box, _ in self.walk():
if box.box_type in (b'moov', b'trak'):
yield box, box.metatags()
@@ 1051,6 1273,30 @@ class Box(SimpleBinary):
Box.FIELD_TYPES['parent'] = (False, (type(None), Box))
BoxBody.FIELD_TYPES['parent'] = Box
+class ListOfBoxes(ListOfBinary, item_type=Box):
+ ''' A `ListOfBinary` containing `Box`es.
+ '''
+
+ def __str__(self):
+ last_box_type = None
+ last_box_type_s = None
+ last_count = None
+ boxgroups = []
+ for box in self:
+ if last_box_type is None or last_box_type != box.box_type:
+ if last_box_type is not None:
+ # close off the previous group under its own type name
+ boxgroups.append((last_box_type_s, last_count))
+ last_box_type = box.box_type
+ last_box_type_s = box.box_type_s
+ last_count = 1
+ else:
+ last_count += 1
+ if last_box_type is not None:
+ boxgroups.append((last_box_type_s, last_count))
+ type_listing = ",".join(
+ box_type_s if count == 1 else f'{box_type_s}[{count}]'
+ for box_type_s, count in boxgroups
+ )
+ return f'{self.__class__.__name__}:{len(self)}:{type_listing}'
+
def add_body_subclass(superclass, box_type, section, desc):
''' Create and register a new `BoxBody` class that is simply a subclass of
another.
@@ 1064,7 1310,7 @@ def add_body_subclass(superclass, box_ty
class _SubClass(
superclass,
bodyclass_name=classname,
- doc=f'An {box_type!r} {desc} box - ISO14496 section {section}.',
+ doc=f'A {box_type!r} {desc} box - ISO14496 section {section}.',
):
def transcribe(self):
@@ 1074,67 1320,8 @@ def add_body_subclass(superclass, box_ty
return _SubClass
-class HasBoxesMixin:
-
- def __iter__(self):
- return iter(self.boxes)
-
- def __getattr__(self, attr):
- # .TYPE - the sole item in self.boxes matching b'type'
- if len(attr) == 4 and attr.isupper():
- box, = getattr(self, attr + 's')
- return box
- # .TYPEs - all items of self.boxes matching b'type'
- if len(attr) == 5 and attr.endswith('s'):
- attr4 = attr[:4]
- if attr4.isupper():
- box_type = attr4.lower().encode('ascii')
- boxes = [box for box in self.boxes if box.box_type == box_type]
- return boxes
- raise AttributeError(type(self).__name__ + '.' + attr)
-
-class OverBox(BinaryListValues, HasBoxesMixin):
- ''' A fictitious `Box` encompassing all the Boxes in an input buffer.
- '''
-
- @property
- def boxes(self):
- ''' Alias `.value` as `.boxes`: the `Box`es encompassed by this `OverBox`.
- '''
- return self.values
-
- # TODO: this seems to parse a single `Box`: can we drop `OverBox`?
- @classmethod
- def parse(cls, bfr: CornuCopyBuffer):
- ''' Parse the `OverBox`.
- '''
- offset = bfr.offset
- self = super().parse(bfr, pt=Box)
- self.offset = offset
- self.end_offset = bfr.offset
- return self
-
- @property
- def length(self):
- ''' The `OverBox` is as long as the subsidary Boxes.
- '''
- return sum(map(len, self.boxes))
-
- def dump(self, **kw):
- ''' Dump this OverBox.
- '''
- return dump_box(self, **kw)
-
- def walk(self):
- ''' Walk the `Box`es in the `OverBox`.
-
- This does not yield the `OverBox` itself, it isn't really a `Box`.
- '''
- for box in self:
- yield from box.walk()
-
class FullBoxBody(BoxBody):
- ''' A common extension of a basic BoxBody, with a version and flags field.
+ ''' A common extension of a basic `BoxBody`, with a version and flags field.
ISO14496 section 4.2.
'''
@@ 1159,6 1346,22 @@ class FullBoxBody(BoxBody):
'''
return (self.flags0 << 16) | (self.flags1 << 8) | self.flags2
+@boxbodyclass
+class FullBoxBody2(BoxBody):
+ ''' A common extension of a basic `BoxBody`, with a version and flags field.
+ ISO14496 section 4.2.
+ '''
+ version: UInt8
+ flags0: UInt8
+ flags1: UInt8
+ flags2: UInt8
+
+ @property
+ def flags(self):
+ ''' The flags value, computed from the 3 flag bytes.
+ '''
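+      # e.g. flags0=0x01, flags1=0x02, flags2=0x03 -> 0x010203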
+ return (self.flags0 << 16) | (self.flags1 << 8) | self.flags2
+
class MDATBoxBody(BoxBody):
''' A Media Data Box - ISO14496 section 8.1.1.
'''
@@ 1217,31 1420,14 @@ class FREEBoxBody(BoxBody):
if free_size > 0:
yield bytes(free_size)
+@boxbodyclass
class FTYPBoxBody(BoxBody):
''' An 'ftyp' File Type box - ISO14496 section 4.3.
Decode the major_brand, minor_version and compatible_brands.
'''
-
- FIELD_TYPES = dict(
- BoxBody.FIELD_TYPES,
- major_brand=bytes,
- minor_version=int,
- brands_bs=bytes,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer, **kw):
- ''' Gather the `major_brand`, `minor_version` and `brand_bs` fields.
- '''
- super().parse_fields(bfr, **kw)
- self.major_brand = bfr.take(4)
- self.minor_version = UInt32BE.parse_value(bfr)
- self.brands_bs = b''.join(bfr)
-
- @pfx_method
- def transcribe(self):
- yield self.major_brand
- yield UInt32BE.transcribe_value(self.minor_version)
- yield self.brands_bs
+ major_brand: 4
+ minor_version: UInt32BE
+ brands_bs: ...
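+  # Annotation conventions, as per the parse_fields this replaces:
+  # an int annotation takes that many bytes, `...` takes the rest of
+  # the buffer.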
@property
def compatible_brands(self):
@@ 1252,107 1438,99 @@ class FTYPBoxBody(BoxBody):
for offset in range(0, len(self.brands_bs), 4)
]
-class PDINBoxBody(FullBoxBody):
+class PDINBoxBody(FullBoxBody2):
''' A 'pdin' Progressive Download Information box - ISO14496 section 8.1.3.
'''
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- pdinfo=list,
- )
+ # field names for the tuples in a PDINBoxBody
+ PDInfo = BinaryStruct('PDInfo', '>LL', 'rate initial_delay')
- # field names for the tuples in a PDINBoxBody
- PDInfo = BinaryMultiStruct('PDInfo', '>LL', 'rate initial_delay')
+ class PDInfoList(ListOfBinary, item_type=PDInfo):
+ pass
+
+ pdinfo: PDInfoList
- def parse_fields(self, bfr: CornuCopyBuffer, **kw):
- ''' Gather the normal version information
- and then the `(rate,initial_delay)` pairs of the data section
- as the `pdinfo` field.
- '''
- super().parse_fields(bfr, **kw)
- self.add_field('pdinfo', list(PDINBoxBody.PDInfo.scan(bfr)))
-
+@boxbodyclass
class ContainerBoxBody(BoxBody):
- ''' Common subclass of several things with `.boxes`.
+ ''' Common superclass of several things with `.boxes`.
'''
-
- FIELD_TYPES = dict(BoxBody.FIELD_TYPES, boxes=list)
+ boxes: ListOfBoxes
- @pfx_method
- def parse_fields(self, bfr: CornuCopyBuffer):
- super().parse_fields(bfr)
- self.parse_boxes(bfr)
-
- def __iter__(self):
- return iter(self.boxes)
-
+@boxbodyclass
class MOOVBoxBody(ContainerBoxBody):
''' An 'moov' Movie box - ISO14496 section 8.2.1.
Decode the contained boxes.
'''
-class MVHDBoxBody(FullBoxBody):
+@boxbodyclass
+class MVHDBoxBody(FullBoxBody2):
''' An 'mvhd' Movie Header box - ISO14496 section 8.2.2.
'''
+ creation_time: Union[TimeStamp32, TimeStamp64]
+ modification_time: Union[TimeStamp32, TimeStamp64]
+ timescale: UInt32BE
+ duration: Union[UInt32BE, UInt64BE]
+ rate_long: Int32BE
+ volume_short: Int16BE
+ reserved1_: 10 # 2-reserved, 2x4 reserved
+ matrix: Matrix9Long
+ predefined1_: 24 # 6x4 predefined
+ next_track_id: UInt32BE
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- creation_time=(True, (TimeStamp32, TimeStamp64)),
- modification_time=(True, (TimeStamp32, TimeStamp64)),
- timescale=UInt32BE,
- duration=(True, (UInt32BE, UInt64BE)),
- rate_long=Int32BE,
- volume_short=Int16BE,
- reserved1=bytes,
- matrix=Matrix9Long,
- predefined1=bytes,
- next_track_id=UInt32BE,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- super().parse_fields(bfr)
+ @classmethod
+ def parse_fields(cls, bfr: CornuCopyBuffer) -> Mapping[str, AbstractBinary]:
+ # parse the fixed fields from the superclass, FullBoxBody2
+ parse_fields = super().parse_fields
+ superfields = super()._datafieldtypes
+ field_values = parse_fields(bfr, superfields)
+ ##field_values = super().parse_fields(bfr)
+ version = field_values['version'].value
# obtain box data after version and flags decode
- if self.version == 0:
- self.parse_field('creation_time', bfr, TimeStamp32)
- self.parse_field('modification_time', bfr, TimeStamp32)
- self.parse_field('timescale', bfr, UInt32BE)
- self.parse_field('duration', bfr, UInt32BE)
- elif self.version == 1:
- self.parse_field('creation_time', bfr, TimeStamp64)
- self.parse_field('modification_time', bfr, TimeStamp64)
- self.parse_field('timescale', bfr, UInt32BE)
- self.parse_field('duration', bfr, UInt64BE)
+ if version == 0:
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ dict(
+ creation_time=TimeStamp32,
+ modification_time=TimeStamp32,
+ timescale=UInt32BE,
+ duration=UInt32BE,
+ )
+ )
+ )
+ elif version == 1:
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ dict(
+ creation_time=TimeStamp64,
+ modification_time=TimeStamp64,
+ timescale=UInt32BE,
+ duration=UInt64BE,
+ )
+ )
+ )
else:
- raise ValueError("MVHD: unsupported version %d" % (self.version,))
- self.parse_field('rate_long', bfr, Int32BE)
- self.parse_field('volume_short', bfr, Int16BE)
- self.parse_field('reserved1', bfr, 10) # 2-reserved, 2x4 reserved
- self.parse_field('matrix', bfr, Matrix9Long)
- self.parse_field('predefined1', bfr, 24) # 6x4 predefined
- self.parse_field('next_track_id', bfr, UInt32BE)
- return self
+ raise ValueError(f'{cls.__name__}: unsupported {version=}')
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ [
+ 'rate_long', 'volume_short', 'reserved1_', 'matrix',
+ 'predefined1_', 'next_track_id'
+ ],
+ )
+ )
+ return field_values
- def transcribe(self):
- yield super().transcribe()
- yield self.creation_time
- yield self.modification_time
- yield self.timescale
- yield self.duration
- yield self.rate_long
- yield self.volume_short
- yield self.reserved1
- yield self.matrix
- yield self.predefined1
- yield self.next_track_id
-
- @prop
+ @property
def rate(self):
''' Rate field converted to float: 1.0 represents normal rate.
'''
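+    # 16.16 fixed point, e.g. 0x00010000 -> 1.0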
rate_long = self.rate_long
return (rate_long >> 16) + (rate_long & 0xffff) / 65536.0
- @prop
+ @property
def volume(self):
''' Volume field converted to float: 1.0 represents full volume.
'''
@@ 1361,101 1539,104 @@ class MVHDBoxBody(FullBoxBody):
add_body_subclass(ContainerBoxBody, 'trak', '8.3.1', 'Track')
-class TKHDBoxBody(FullBoxBody):
+@boxbodyclass
+class TKHDBoxBody(FullBoxBody2):
''' A 'tkhd' Track Header box - ISO14496 section 8.2.2.
'''
- TKHDMatrix = BinaryMultiStruct(
+ TKHDMatrix = BinaryStruct(
'TKHDMatrix', '>lllllllll', 'v0 v1 v2 v3 v4 v5 v6 v7 v8'
)
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- creation_time=(True, (TimeStamp32, TimeStamp64)),
- modification_time=(True, (TimeStamp32, TimeStamp64)),
- track_id=UInt32BE,
- reserved1=UInt32BE,
- duration=(True, (UInt32BE, UInt64BE)),
- reserved2=UInt32BE,
- reserved3=UInt32BE,
- layer=Int16BE,
- alternate_group=Int16BE,
- volume=Int16BE,
- reserved4=UInt16BE,
- matrix=TKHDMatrix,
- width=UInt32BE,
- height=UInt32BE,
- )
+ creation_time: Union[TimeStamp32, TimeStamp64]
+ modification_time: Union[TimeStamp32, TimeStamp64]
+ track_id: UInt32BE
+ reserved1_: UInt32BE
+ duration: Union[UInt32BE, UInt64BE]
+ reserved2_: UInt32BE
+ reserved3_: UInt32BE
+ layer: Int16BE
+ alternate_group: Int16BE
+ volume: Int16BE
+ reserved4_: UInt16BE
+ matrix: TKHDMatrix
+ width: UInt32BE
+ height: UInt32BE
- def parse_fields(self, bfr: CornuCopyBuffer, **kw):
- super().parse_fields(bfr, **kw)
+ @classmethod
+ def parse_fields(cls, bfr: CornuCopyBuffer) -> Mapping[str, AbstractBinary]:
+ X("{cls.__name__=} parse_fields")
+ # parse the fixed fields from the superclass, FullBoxBody2
+ parse_fields = super().parse_fields
+ superfields = super()._datafieldtypes
+ field_values = parse_fields(bfr, superfields)
+ ##field_values = super().parse_fields(bfr)
+ version = field_values['version'].value
# obtain box data after version and flags decode
- if self.version == 0:
- self.parse_field('creation_time', bfr, TimeStamp32)
- self.parse_field('modification_time', bfr, TimeStamp32)
- self.parse_field('track_id', bfr, UInt32BE)
- self.parse_field('reserved1', bfr, UInt32BE)
- self.parse_field('duration', bfr, UInt32BE)
- elif self.version == 1:
- self.parse_field('creation_time', bfr, TimeStamp64)
- self.parse_field('modification_time', bfr, TimeStamp64)
- self.parse_field('track_id', bfr, UInt32BE)
- self.parse_field('reserved1', bfr, UInt32BE)
- self.parse_field('duration', bfr, UInt64BE)
+ if version == 0:
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ dict(
+ creation_time=TimeStamp32,
+ modification_time=TimeStamp32,
+ track_id=UInt32BE,
+ reserved1_=UInt32BE,
+ duration=UInt32BE,
+ )
+ )
+ )
+ elif version == 1:
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ dict(
+ creation_time=TimeStamp64,
+ modification_time=TimeStamp64,
+ track_id=UInt32BE,
+ reserved1_=UInt32BE,
+ duration=UInt64BE,
+ )
+ )
+ )
else:
- raise ValueError("TRHD: unsupported version %d" % (self.version,))
- self.parse_field('reserved2', bfr, UInt32BE)
- self.parse_field('reserved3', bfr, UInt32BE)
- self.parse_field('layer', bfr, Int16BE)
- self.parse_field('alternate_group', bfr, Int16BE)
- self.parse_field('volume', bfr, Int16BE)
- self.parse_field('reserved4', bfr, UInt16BE)
- self.parse_field('matrix', bfr, TKHDBoxBody.TKHDMatrix)
- self.parse_field('width', bfr, UInt32BE)
- self.parse_field('height', bfr, UInt32BE)
+ raise ValueError(f'{cls.__name__}: unsupported {version=}')
+ field_values.update(
+ super().parse_fields(
+ bfr,
+ [
+ 'reserved2_', 'reserved3_', 'layer', 'alternate_group',
+ 'volume', 'reserved4_', 'matrix', 'width', 'height'
+ ],
+ )
+ )
+ return field_values
- def transcribe(self):
- yield super().transcribe()
- yield self.creation_time
- yield self.modification_time
- yield self.track_id
- yield self.reserved1
- yield self.duration
- yield self.reserved2
- yield self.reserved3
- yield self.layer
- yield self.alternate_group
- yield self.volume
- yield self.reserved4
- yield self.matrix
- yield self.width
- yield self.height
-
- @prop
+ @property
def track_enabled(self):
''' Test flags bit 0, 0x1, track_enabled.
'''
return (self.flags & 0x1) != 0
- @prop
+ @property
def track_in_movie(self):
''' Test flags bit 1, 0x2, track_in_movie.
'''
return (self.flags & 0x2) != 0
- @prop
+ @property
def track_in_preview(self):
''' Test flags bit 2, 0x4, track_in_preview.
'''
return (self.flags & 0x4) != 0
- @prop
+ @property
def track_size_is_aspect_ratio(self):
''' Test flags bit 3, 0x8, track_size_is_aspect_ratio.
'''
return (self.flags & 0x8) != 0
- @prop
+ @property
def timescale(self):
''' The `timescale` comes from the movie header box (8.3.2.3).
'''
@@ 1511,6 1692,7 @@ add_body_subclass(
)
add_body_subclass(ContainerBoxBody, 'mdia', '8.4.1', 'Media')
+# TODO: as for MVHD
class MDHDBoxBody(FullBoxBody):
''' A MDHDBoxBody is a Media Header box - ISO14496 section 8.4.2.
'''
@@ 1522,7 1704,7 @@ class MDHDBoxBody(FullBoxBody):
timescale=UInt32BE,
duration=(True, (UInt32BE, UInt64BE)),
language_short=UInt16BE,
- pre_defined=UInt16BE,
+ pre_defined_=UInt16BE,
)
def parse_fields(self, bfr: CornuCopyBuffer):
@@ 1542,9 1724,9 @@ class MDHDBoxBody(FullBoxBody):
self.parse_field('timescale', bfr, UInt32BE)
self.parse_field('duration', bfr, UInt64BE)
else:
- raise NotImplementedError("unsupported version %d" % (self.version,))
+ raise NotImplementedError(f'unsupported {self.version=}')
self.parse_field('language_short', bfr, UInt16BE)
- self.parse_field('pre_defined', bfr, UInt16BE)
+ self.parse_field('pre_defined_', bfr, UInt16BE)
def transcribe(self):
yield super().transcribe()
@@ 1553,9 1735,9 @@ class MDHDBoxBody(FullBoxBody):
yield self.timescale
yield self.duration
yield self.language_short
- yield self.pre_defined
+ yield self.pre_defined_
- @prop
+ @property
def language(self):
''' The ISO 639‐2/T language code as decoded from the packed form.
'''
@@ 1569,130 1751,111 @@ class MDHDBoxBody(FullBoxBody):
]
).decode('ascii')
-class HDLRBoxBody(FullBoxBody):
+@boxbodyclass
+class HDLRBoxBody(FullBoxBody2):
''' A HDLRBoxBody is a Handler Reference box - ISO14496 section 8.4.3.
'''
-
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- pre_defined=UInt32BE,
- handler_type_long=UInt32BE,
- reserved1=UInt32BE,
- reserved2=UInt32BE,
- reserved3=UInt32BE,
- name=BinaryUTF8NUL,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `handler_type_long` and `name` fields.
- '''
- super().parse_fields(bfr)
- # NB: handler_type is supposed to be an unsigned long, but in
- # practice seems to be 4 ASCII bytes, so we present it as a string
- # for readability
- self.parse_field('pre_defined', bfr, UInt32BE)
- self.parse_field('handler_type_long', bfr, UInt32BE)
- self.parse_field('reserved1', bfr, UInt32BE)
- self.parse_field('reserved2', bfr, UInt32BE)
- self.parse_field('reserved3', bfr, UInt32BE)
- self.parse_field('name', bfr, BinaryUTF8NUL)
-
- def transcribe(self):
- yield super().transcribe()
- yield self.pre_defined
- yield self.handler_type_long
- yield self.reserved1
- yield self.reserved2
- yield self.reserved3
- yield self.name
+ pre_defined_: UInt32BE
+ handler_type_long: UInt32BE
+ reserved1_: UInt32BE
+ reserved2_: UInt32BE
+ reserved3_: UInt32BE
+ name: BinaryUTF8NUL
@property
def handler_type(self):
''' The handler_type as an ASCII string, its usual form.
'''
- return bytes(self.handler_type_long).decode('ascii')
+ return bytes(self._data.handler_type_long).decode('ascii')
add_body_subclass(ContainerBoxBody, b'minf', '8.4.4', 'Media Information')
add_body_subclass(FullBoxBody, 'nmhd', '8.4.5.2', 'Null Media Header')
-class ELNGBoxBody(FullBoxBody):
- ''' A ELNGBoxBody is a Extended Language Tag box - ISO14496 section 8.4.6.
+@boxbodyclass
+class ELNGBoxBody(FullBoxBody2):
+  ''' An `ELNGBoxBody` is an Extended Language Tag box - ISO14496 section 8.4.6.
'''
-
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- extended_language=BinaryUTF8NUL,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `extended_language` field.
- '''
- super().parse_fields(bfr)
- # extended language based on RFC4646
- self.parse_field('extended_language', bfr, BinaryUTF8NUL)
+ # extended language based on RFC4646
+ extended_language: BinaryUTF8NUL
- def transcribe(self):
- yield super().transcribe()
- yield self.extended_language
-
-add_body_subclass(ContainerBoxBody, b'stbl', '8.5.1', 'Sample Table')
+class EntryCountListOfBoxes(FullBoxBody2):
+  ''' An intermediate `FullBoxBody2` subclass which contains more boxes
+      whose number is specified by a leading `entry_count` field,
+      by default a `UInt32BE`.
-class _SampleTableContainerBoxBody(FullBoxBody):
- ''' An intermediate FullBoxBody subclass which contains more boxes.
+      This is a common superclass of `_SampleTableContainerBoxBody`
+      and `DREFBoxBody`.
'''
+ boxes: ListOfBoxes
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- entry_count=UInt32BE,
- boxes=list,
- )
+ ENTRY_COUNT_TYPE = None
+
+ def __init_subclass__(cls, count_type=UInt32BE, **ickw):
+ super().__init_subclass__(**ickw)
+ cls.ENTRY_COUNT_TYPE = count_type
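+
+  # A hypothetical subclass whose count field is a UInt16BE:
+  #   class MyBoxBody(EntryCountListOfBoxes, count_type=UInt16BE):
+  #     pass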
def __iter__(self):
return iter(self.boxes)
- def parse_fields(self, bfr: CornuCopyBuffer):
+ @property
+ def entry_count(self):
+ ''' The `entry_count` is the number of `Box`es.
+ '''
+ return len(self.boxes)
+
+ @classmethod
+ def parse_fields(cls, bfr: CornuCopyBuffer):
''' Gather the `entry_count` and `boxes`.
'''
- super().parse_fields(bfr)
- # obtain box data after version and flags decode
- self.entry_count = UInt32BE.parse(bfr)
- self.parse_boxes(bfr, count=int(self.entry_count.value))
+ # parse the fixed fields from the superclass, FullBoxBody2
+ parse_fields = super().parse_fields
+ superfields = super()._datafieldtypes
+ field_values = parse_fields(bfr, superfields)
+ entry_count = cls.ENTRY_COUNT_TYPE.parse_value(bfr)
+ field_values.update(boxes=ListOfBoxes.parse(bfr, count=entry_count))
+ self = cls(**field_values)
+ return self
def transcribe(self):
yield super().transcribe()
- yield self.entry_count
+ yield self.ENTRY_COUNT_TYPE(self.entry_count)
yield self.boxes
-add_body_subclass(
- _SampleTableContainerBoxBody, b'stsd', '8.5.2', 'Sample Description'
-)
+class _SampleTableContainerBoxBody(EntryCountListOfBoxes):
+ pass
+@boxbodyclass
class _SampleEntry(BoxBody):
''' Superclass of Sample Entry boxes.
'''
+ reserved_: 6
+ data_reference_index: UInt16BE
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `data_reference_inde` field.
- '''
- super().parse_fields(bfr)
- self.add_field('reserved', bfr.take(6))
- self.parse_field('data_reference_index', bfr, UInt16BE)
-
+@boxbodyclass
class BTRTBoxBody(BoxBody):
''' BitRateBoxBody - section 8.5.2.2.
'''
+ bufferSizeDB: UInt32BE
+ maxBitrate: UInt32BE
+ avgBitRate: UInt32BE
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `bufferSizeDB`, `maxBitrate` and `avgBitrate` fields.
- '''
- super().parse_fields(bfr)
- self.parse_field('bufferSizeDB', bfr, UInt32BE)
- self.parse_field('maxBitrate', bfr, UInt32BE)
- self.parse_field('avgBitRate', bfr, UInt32BE)
+@boxbodyclass
+class STDPBoxBody(FullBoxBody2):
+ ''' A `STDPBoxBody` is a DegradationPriorityBox - ISO14496 section 8.5.3.2.
+ '''
+
+ class PriorityList(ListOfBinary, item_type=UInt16BE):
+ pass
-add_body_subclass(
- _SampleTableContainerBoxBody, b'stdp', '8.5.3', 'Degradation Priority'
-)
+ priority: PriorityList
+
+@boxbodyclass
+class STSDBoxBody(BoxBody):
+ ''' A `STSDBoxBody` is a SampleDescriptionBoxBody - ISO14496 section 8.5.2.2.
+ '''
+ reserved_: 6 # 6 8-bit integers
+ data_reference_index: UInt16BE
+
+add_body_subclass(ContainerBoxBody, b'stbl', '8.5.1', 'Sample Table')
TTSB_Sample = namedtuple('TTSB_Sample', 'count delta')
@@ 1712,10 1875,10 @@ def add_generic_sample_boxbody(
struct_format_v1 = struct_format_v0
class_name = box_type.decode('ascii').upper() + 'BoxBody'
sample_class_name = class_name + 'Sample'
- sample_type_v0 = BinaryMultiStruct(
+ sample_type_v0 = BinaryStruct(
sample_class_name + 'V0', struct_format_v0, sample_fields
)
- sample_type_v1 = BinaryMultiStruct(
+ sample_type_v1 = BinaryStruct(
sample_class_name + 'V1', struct_format_v1, sample_fields
)
@@ 1820,12 1983,8 @@ class CSLGBoxBody(FullBoxBody):
'greatestDecodeToDisplayDelta', 'compositionStartTime',
'compositionEndTime'
)
- CSLGParamsLong = BinaryMultiStruct(
- 'CSLGParamsLong', '>lllll', CSLG_PARAM_NAMES
- )
- CSLGParamsQuad = BinaryMultiStruct(
- 'CSLGParamsLong', '>qqqqq', CSLG_PARAM_NAMES
- )
+ CSLGParamsLong = BinaryStruct('CSLGParamsLong', '>lllll', CSLG_PARAM_NAMES)
+ CSLGParamsQuad = BinaryStruct('CSLGParamsLong', '>qqqqq', CSLG_PARAM_NAMES)
def parse_fields(self, bfr: CornuCopyBuffer):
''' Gather the compositionToDTSShift`, `leastDecodeToDisplayDelta`,
@@ 1872,11 2031,11 @@ class ELSTBoxBody(FullBoxBody):
''' An 'elst' Edit List FullBoxBody - section 8.6.6.
'''
- V0EditEntry = BinaryMultiStruct(
+ V0EditEntry = BinaryStruct(
'ELSTBoxBody_V0EditEntry', '>Llhh',
'segment_duration media_time media_rate_integer media_rate_fraction'
)
- V1EditEntry = BinaryMultiStruct(
+ V1EditEntry = BinaryStruct(
'ELSTBoxBody_V1EditEntry', '>Qqhh',
'segment_duration media_time media_rate_integer media_rate_fraction'
)
@@ 2065,7 2224,7 @@ class STSCBoxBody(FullBoxBody):
entries_bs=bytes,
)
- STSCEntry = BinaryMultiStruct(
+ STSCEntry = BinaryStruct(
'STSCEntry', '>LLL',
'first_chunk samples_per_chunk sample_description_index'
)
@@ 2176,35 2335,18 @@ class CO64BoxBody(FullBoxBody):
## offsets.append(UInt64BE.from_buffer(bfr))
## return offsets
-class DREFBoxBody(FullBoxBody):
+@boxbodyclass
+class DREFBoxBody(EntryCountListOfBoxes):
''' A 'dref' Data Reference box containing Data Entry boxes - section 8.7.2.1.
'''
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- entry_count=UInt32BE,
- boxes=list,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `entry_count` and `boxes` fields.
- '''
- super().parse_fields(bfr)
- self.parse_field('entry_count', bfr, UInt32BE)
- self.parse_boxes(bfr, count=int(self.entry_count.value))
-
add_body_subclass(ContainerBoxBody, b'udta', '8.10.1', 'User Data')
-class CPRTBoxBody(FullBoxBody):
+class CPRTBoxBody(FullBoxBody2):
''' A 'cprt' Copyright box - section 8.10.2.
'''
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `language` and `notice` fields.
- '''
- super().parse_fields(bfr)
- self.parse_field('language_packed', bfr, UInt16BE)
- self.parse_field('notice', bfr, UTF8or16Field)
+ language_packed: UInt16BE
+ notice: UTF8or16Field
@property
def language(self):
@@ 2225,81 2367,83 @@ class CPRTBoxBody(FullBoxBody):
(ord(ch1) - 0x60) & 0x1f, ((ord(ch2) - 0x60) & 0x1f) << 5,
((ord(ch3) - 0x60) & 0x1f) << 10
)
- self.language_packed.value = packed
+ self.language_packed = packed
-class METABoxBody(FullBoxBody):
+@boxbodyclass
+class METABoxBody(FullBoxBody2):
''' A 'meta' Meta BoxBody - section 8.11.1.
'''
+ theHandler: Box
+ boxes: ListOfBoxes
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- theHandler=Box,
- boxes=list,
- )
def __iter__(self):
return iter(self.boxes)
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `theHandler` `Box` and gather the following Boxes as `boxes`.
- '''
- super().parse_fields(bfr)
- self.parse_field('theHandler', bfr, Box)
- ## we don't have a .parent yet - does this break the handler path?
- ## self.theHandler.parent = self.parent
- self.parse_boxes(bfr)
@pfx_method
def __getattr__(self, attr):
- ''' Present the `ilst` attributes if present.
+ ''' Attributes not found on `self` in the normal way
+ are also looked up on the `.ILST` subbox
+ if there is one.
'''
- if attr != 'boxes':
- try:
- # direct attribute access
- return super().__getattr__(attr)
- except AttributeError as e:
- # otherwise dereference through the .ilst subbox if present
- ilst = super().__getattr__('ILST0')
- if ilst is not None:
- value = getattr(ilst, attr, None)
- if value is not None:
- return value
+ try:
+ # direct attribute access
+ return super().__getattr__(attr)
+ except AttributeError as e:
+ # otherwise dereference through the .ilst subbox if present
+ ilst = super().__getattr__('ILST0')
+ if ilst is not None:
+ value = getattr(ilst, attr, None)
+ if value is not None:
+ return value
raise AttributeError(f'{self.__class__.__name__}.{attr}')
-# class to glom all the bytes
-_ILSTRawSchema = pt_spec(
- (lambda bfr: bfr.take(...), lambda bs: bs),
- name='ILSTRawSchema',
-)
+class _attr_schema(namedtuple('attributed_schema',
+ 'attribute_name schema_class')):
+
+ def __repr__(self):
+ return f'schema({self.attribute_name}={self.schema_class.__name__})'
+
+class _ILSTRawSchema(BinaryBytes):
+ ''' All the bytes in an ILST.
+ '''
+
+_ILSTRawSchema.__name__ = 'ILSTRawSchema'
def ILSTRawSchema(attribute_name):
''' Attribute name and type for ILST raw schema.
'''
- return attribute_name, _ILSTRawSchema
+ return _attr_schema(attribute_name, _ILSTRawSchema)
# class to decode bytes as UTF-8
-_ILSTTextSchema = pt_spec(
- (
- lambda bfr: bfr.take(...).decode('utf-8'),
- lambda txt: txt.encode('utf-8'),
+class _ILSTTextSchema(
+ pt_spec(
+ (
+ lambda bfr: bfr.take(...).decode('utf-8'),
+ lambda txt: txt.encode('utf-8'),
+ ),
+ name='ILSTTextSchema',
+ value_type=str,
),
- name='ILSTTextSchema',
-)
+ value_type=str,
+):
+ pass
def ILSTTextSchema(attribute_name):
''' Attribute name and type for ILST text schema.
'''
- return attribute_name, _ILSTTextSchema
+ return _attr_schema(attribute_name, _ILSTTextSchema)
def ILSTUInt32BESchema(attribute_name):
''' Attribute name and type for ILST `UInt32BE` schema.
'''
- return attribute_name, UInt32BE
+ return _attr_schema(attribute_name, UInt32BE)
def ILSTUInt8Schema(attribute_name):
''' Attribute name and type for ILST `UInt8BE` schema.
'''
- return attribute_name, UInt8
+ return _attr_schema(attribute_name, UInt8)
# class to decode n/total as a pair of UInt32BE values
_ILSTAofBSchema = BinaryMultiValue(
@@ 2309,7 2453,7 @@ def ILSTUInt8Schema(attribute_name):
def ILSTAofBSchema(attribute_name):
''' Attribute name and type for ILST "A of B" schema.
'''
- return attribute_name, _ILSTAofBSchema
+ return _attr_schema(attribute_name, _ILSTAofBSchema)
# class to decode bytes as UTF-8 of ISO8601 datetime string
_ILSTISOFormatSchema = pt_spec(
@@ 2317,13 2461,14 @@ def ILSTAofBSchema(attribute_name):
lambda bfr: datetime.fromisoformat(bfr.take(...).decode('utf-8')),
lambda dt: dt.isoformat(sep=' ', timespec='seconds').encode('utf-8'),
),
- name='ILSTTextSchema'
+ name='ILSTISOFormatSchema',
+ value_type=datetime,
)
def ILSTISOFormatSchema(attribute_name):
''' Attribute name and type for ILST ISO format schema.
'''
- return attribute_name, _ILSTISOFormatSchema
+ return _attr_schema(attribute_name, _ILSTISOFormatSchema)
itunes_media_type = namedtuple('itunes_media_type', 'type stik')
@@ 2341,7 2486,7 @@ itunes_store_country_code = namedtuple(
'country_name iso_3166_1_code itunes_store_code'
)
-class _ILSTUTF8Text(BinarySingleValue):
+class _ILSTUTF8Text(BinarySingleValue, value_type=str):
''' A full-buffer piece of UTF-8 encoded text.
'''
@@ 2366,6 2511,7 @@ class _ILSTUTF8Text(BinarySingleValue):
'''
return value.encode('utf-8')
+@boxbodyclass
class ILSTBoxBody(ContainerBoxBody):
''' Apple iTunes Information List, container for iTunes metadata fields.
@@ 2379,12 2525,6 @@ class ILSTBoxBody(ContainerBoxBody):
https://github.com/sergiomb2/libmp4v2/wiki/iTunesMetadata
'''
- FIELD_TYPES = dict(
- ContainerBoxBody.FIELD_TYPES,
- tags=TagSet,
- )
- FIELD_TRANSCRIBERS = dict(tags=lambda _: None,)
-
# the schema names are available as attributes
SUBBOX_SCHEMA = {
b'\xa9alb': ILSTTextSchema('album_title'),
@@ 2498,10 2638,14 @@ class ILSTBoxBody(ContainerBoxBody):
}
}
- # pylint: disable=attribute-defined-outside-init,too-many-locals
- # pylint: disable=too-many-statements,too-many-branches
- def parse_fields(self, bfr: CornuCopyBuffer):
- super().parse_fields(bfr)
+ # make a mapping of schema long attribute names to (schema_code,schema)
+ SUBBOX_SCHEMA_BY_LONG_ATTRIBUTE = {
+ schema.attribute_name: (schema_code, schema)
+ for schema_code, schema in reversed(SUBBOX_SCHEMA.items())
+ }
+
+ def __init__(self, **dcls_fields):
+ super().__init__(**dcls_fields)
self.tags = TagSet()
for subbox in self.boxes:
subbox_type = bytes(subbox.box_type)
@@ 2547,7 2691,7 @@ class ILSTBoxBody(ContainerBoxBody):
subbox_schema = self.SUBBOX_SCHEMA.get(subbox_type)
if subbox_schema is None:
bs = databfr.take(...)
- warning("%r: no schema, stashing bytes %r", subbox_type, bs)
+ ##warning("%r: no schema, stashing bytes %r", subbox_type, bs)
data_box.add_field(
'subbox__' + subbox_type.decode('ascii'), bs
)
@@ 2578,64 2722,38 @@ class ILSTBoxBody(ContainerBoxBody):
def __getattr__(self, attr):
# see if this is a schema long name
- for schema_code, schema in self.SUBBOX_SCHEMA.items():
- if schema.attribute_name == attr:
- subbox_attr = schema_code.decode('iso8859-1').upper()
- with Pfx(
- "%s.%s: schema:%r: self.%s",
- self.__class__.__name__,
- attr,
- schema_code,
- subbox_attr,
- ):
- return getattr(self, subbox_attr)
- return super().__getattr__(attr)
+ try:
+ schema_code, schema = self.SUBBOX_SCHEMA_BY_LONG_ATTRIBUTE[attr]
+ except KeyError:
+ # not a long attribute name
+ return super().__getattr__(attr)
+ subbox_attr = schema_code.decode('iso8859-1').upper()
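+    # e.g. attr 'album_title' -> schema code b'\xa9alb'
+    # -> subbox attribute '\xa9ALB'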
+ with Pfx(
+ "%s.%s: schema:%r: self.%s",
+ self.__class__.__name__,
+ attr,
+ schema_code,
+ subbox_attr,
+ ):
+ return getattr(self, subbox_attr)
-class VMHDBoxBody(FullBoxBody):
+OpColor = BinaryStruct('OpColor', '>HHH', 'red green blue')
+
+@boxbodyclass
+class VMHDBoxBody(FullBoxBody2):
''' A 'vmhd' Video Media Headerbox - section 12.1.2.
'''
-
- OpColor = BinaryMultiStruct('OpColor', '>HHH', 'red green blue')
-
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- graphicsmode=UInt16BE,
- opcolor=OpColor,
- )
+ OpColor = BinaryStruct('OpColor', '>HHH', 'red green blue')
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `graphicsmode` and `opcolor` fields.
- '''
- super().parse_fields(bfr)
- self.parse_field('graphicsmode', bfr, UInt16BE)
- self.parse_field('opcolor', bfr, VMHDBoxBody.OpColor)
+ graphicsmode: UInt16BE
+ opcolor: OpColor
- def transcribe(self):
- yield super().transcribe()
- yield self.graphicsmode
- yield self.opcolor
-
-class SMHDBoxBody(FullBoxBody):
+@boxbodyclass
+class SMHDBoxBody(FullBoxBody2):
''' A 'smhd' Sound Media Headerbox - section 12.2.2.
'''
-
- FIELD_TYPES = dict(
- FullBoxBody.FIELD_TYPES,
- balance=Int16BE,
- reserved=UInt16BE,
- )
-
- def parse_fields(self, bfr: CornuCopyBuffer):
- ''' Gather the `balance` field.
- '''
- super().parse_fields(bfr)
- self.parse_field('balance', bfr, Int16BE)
- self.parse_field('reserved', bfr, UInt16BE)
-
- def transcribe(self):
- yield super().transcribe()
- yield self.balance
- yield self.reserved
+ balance: Int16BE
+ reserved: UInt16BE
def parse_tags(path, tag_prefix=None):
''' Parse the tags from `path`.
@@ 2657,6 2775,7 @@ def parse_tags(path, tag_prefix=None):
tags = new_tags
yield box, tags
+@parse_offsets
def parse(o):
''' Return the `OverBox` from a source (str, int, bytes, file).
@@ 2686,59 2805,6 @@ def parse(o):
os.close(fd)
return over_box
-def parse_fields(bfr, copy_offsets=None, **kw):
- ''' Parse an ISO14496 stream from the CornuCopyBuffer `bfr`,
- yield top level OverBoxes.
-
- Parameters:
- * `bfr`: a `CornuCopyBuffer` provided the stream data,
- preferably seekable
- * `discard_data`: whether to discard unparsed data, default `False`
- * `copy_offsets`: callable to receive `Box` offsets
- '''
- if copy_offsets is not None:
- bfr.copy_offsets = copy_offsets
- yield from OverBox.scan(bfr, **kw)
-
-# pylint: disable=too-many-branches
-def dump_box(B, indent='', fp=None, crop_length=170, indent_incr=None):
- ''' Recursively dump a Box.
- '''
- if fp is None:
- fp = sys.stdout
- if indent_incr is None:
- indent_incr = ' '
- fp.write(indent)
- summary = str(B)
- if crop_length is not None:
- if len(summary) > crop_length - len(indent):
- summary = summary[:crop_length - len(indent) - 4] + '...)'
- fp.write(summary)
- fp.write('\n')
- boxes = getattr(B, 'boxes', None)
- body = getattr(B, 'body', None)
- if body is not None:
- for field_name in sorted(filter(lambda name: not name.startswith('_'),
- body.__dict__.keys())):
- if field_name == 'boxes':
- boxes = None
- field = getattr(body, field_name)
- if isinstance(field, BinaryListValues):
- if field_name != 'boxes':
- fp.write(indent + indent_incr)
- fp.write(field_name)
- if field.value:
- fp.write(':\n')
- else:
- fp.write(': []\n')
- for subbox in field.values:
- subbox.dump(
- indent=indent + indent_incr, fp=fp, crop_length=crop_length
- )
- if boxes:
- for subbox in boxes:
- subbox.dump(indent=indent + indent_incr, fp=fp, crop_length=crop_length)
-
# pylint: disable=too-many-locals,too-many-branches
def report(box, indent='', fp=None, indent_incr=None):
''' Report some human friendly information about a box.
M lib/python/cs/lex.py +34 -4
@@ 47,7 47,7 @@ from cs.pfx import Pfx, pfx_call, pfx_me
from cs.py.func import funcname
from cs.seq import common_prefix_length, common_suffix_length
-__version__ = '20250414-post'
+__version__ = '20250428-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 177,7 177,7 @@ def typed_str(o, use_cls=False, use_repr
X("foo = %s", s(foo))
'''
# pylint: disable=redefined-outer-name
- o_s = repr(o) if use_repr else str(o)
+ o_s = cropped_repr(o) if use_repr else str(o)
if max_length is not None:
o_s = cropped(o_s, max_length)
s = "%s:%s" % (type(o) if use_cls else type(o).__name__, o_s)
@@ 1149,8 1149,10 @@ def as_lines(chunks, partials=None):
# pylint: disable=redefined-outer-name
def cutprefix(s, prefix):
- ''' Strip a `prefix` from the front of `s`.
+ ''' Remove `prefix` from the front of `s` if present.
Return the suffix if `s.startswith(prefix)`, else `s`.
+ As with `str.startswith`, `prefix` may be a `str` or a `tuple` of `str`.
+ If a tuple, the first matching prefix from the tuple will be removed.
Example:
@@ 1161,15 1163,30 @@ def cutprefix(s, prefix):
'abc.def'
>>> cutprefix(abc_def, '.zzz') is abc_def
True
+ >>> cutprefix('this_that', ('this', 'thusly'))
+ '_that'
+ >>> cutprefix('thusly_that', ('this', 'thusly'))
+ '_that'
'''
if prefix and s.startswith(prefix):
+ if isinstance(prefix, tuple):
+ # a tuple of str
+ for pfx in prefix:
+ if s.startswith(pfx):
+ return s[len(pfx):]
+ # no match, return the original object
+ return s
+ # a str
return s[len(prefix):]
+ # no match, return the original object
return s
# pylint: disable=redefined-outer-name
def cutsuffix(s, suffix):
- ''' Strip a `suffix` from the end of `s`.
+ ''' Remove `suffix` from the end of `s` if present.
Return the prefix if `s.endswith(suffix)`, else `s`.
+ As with `str.endswith`, `suffix` may be a `str` or a `tuple` of `str`.
+ If a tuple, the first matching suffix from the tuple will be removed.
Example:
@@ 1180,9 1197,22 @@ def cutsuffix(s, suffix):
'abc.def'
>>> cutsuffix(abc_def, '.zzz') is abc_def
True
+ >>> cutsuffix('this_that', ('that', 'tother'))
+ 'this_'
+ >>> cutsuffix('this_tother', ('that', 'tother'))
+ 'this_'
'''
if suffix and s.endswith(suffix):
+ if isinstance(suffix, tuple):
+ # a tuple of str
+ for sfx in suffix:
+ if s.endswith(sfx):
+ return s[:-len(sfx)]
+ # no match, return the original object
+ return s
+ # a str
return s[:-len(suffix)]
+ # no match, return the original object
return s
def common_prefix(*strs):
M lib/python/cs/progress.py +56 -38
@@ 18,7 18,7 @@
Example:
- for item in progressbar(items):
+ for item in progressbar(items, "task name"):
....
'''
@@ 33,13 33,12 @@ from typing import Callable, Optional
from icontract import ensure
from typeguard import typechecked
-from cs.deco import decorator, uses_quiet
+from cs.deco import decorator, fmtdoc, uses_quiet
from cs.logutils import debug, exception
from cs.py.func import funcname
from cs.queues import IterableQueue, QueueIterator
from cs.resources import RunState, uses_runstate
from cs.seq import seq
-from cs.threads import bg
from cs.units import (
transcribe_time,
transcribe as transcribe_units,
@@ 64,7 63,6 @@ DISTINFO = {
'cs.py.func',
'cs.resources',
'cs.seq',
- 'cs.threads',
'cs.units',
'cs.upd',
'icontract',
@@ 390,6 388,7 @@ class BaseProgress(object):
@contextmanager
@uses_quiet
@uses_upd
+ @fmtdoc
def bar(
self,
label=None,
@@ 409,13 408,14 @@ class BaseProgress(object):
It returns the `UpdProxy` which displays the progress bar.
Parameters:
- * `label`: a label for the progress bar,
+ * `label`: an optional label for the progress bar,
default from `self.name`.
- * `statusfunc`: an optional function to compute the progress bar text
- accepting `(self,label,width)`.
- * `width`: an optional width expressing how wide the progress bar
- text may be.
- The default comes from the `proxy.width` property.
+        * `insert_pos`: where to insert the progress bar within the `cs.upd.Upd`,
+ default `1`
+ * `poll`: an optional callable which will receive `self`,
+ which can be used to update the progress state before
+ updating the progress bar display; useful if the progress
+          should be updated from some other programme state
* `recent_window`: optional timeframe to define "recent" in seconds;
if the default `statusfunc` (`Progress.status`) is used
this is passed to it
@@ 424,11 424,15 @@ class BaseProgress(object):
this may also be a `bool`, which if true will use `Upd.print`
in order to interoperate with `Upd`.
* `stalled`: optional string to replace the word `'stalled'`
- in the status line; for a worked this might be betteer as `'idle'`
- * `insert_pos`: where to insert the progress bar, default `1`
- * `poll`: an optional callable accepting a `BaseProgress`
- which can be used to update the progress state before
- updating the progress bar display
+ in the status line; for a worker this might be better as `'idle'`
+ * `statusfunc`: an optional function to compute the progress bar text
+ accepting `(self,label,width)`; default `Progress.status`
+ * `update_period`: an optional frequency with which to update the display,
+ default from `DEFAULT_UPDATE_PERIOD` ({DEFAULT_UPDATE_PERIOD}s);
+ if set to `0` then the display is updated whenever `self` is updated
+ * `width`: an optional width expressing how wide the progress bar
+ text may be.
+ The default comes from the `proxy.width` property.
Example use:
@@ 471,7 475,7 @@ class BaseProgress(object):
cancel_ticker = False
- def ticker():
+ def _ticker():
''' Worker to update the progress bar every `update_period` seconds.
'''
time.sleep(update_period)
@@ 479,8 483,6 @@ class BaseProgress(object):
update(self, None)
time.sleep(update_period)
- if update_period == 0:
- self.notify_update.add(update)
try:
start_pos = self.position
with upd.insert(
@@ 489,8 491,12 @@ class BaseProgress(object):
text_auto=text_auto,
) as proxy:
update(self, None)
- if update_period > 0:
- bg(ticker, daemon=True)
+ if update_period == 0:
+ # update every time the Progress is updated
+ self.notify_update.add(update)
+ elif update_period > 0:
+ # update every update_period seconds
+ Thread(target=_ticker, name=f'{label}-ticker', daemon=True).start()
yield proxy
finally:
cancel_ticker = True
@@ 543,7 549,7 @@ class BaseProgress(object):
* `cancelled`: an optional callable to test for iteration cancellation
Other parameters are passed to `Progress.bar`.
- Example use:
+ Example uses:
from cs.units import DECIMAL_SCALE
rows = [some list of data]
@@ 551,17 557,17 @@ class BaseProgress(object):
for row in P.iterbar(rows, incfirst=True):
... do something with each row ...
- f = open(data_filename, 'rb')
- datalen = os.stat(f).st_size
- def readfrom(f):
- while True:
- bs = f.read(65536)
- if not bs:
- break
- yield bs
- P = Progress(total=datalen)
- for bs in P.iterbar(readfrom(f), itemlenfunc=len):
- ... process the file data in bs ...
+ with open(data_filename, 'rb') as f:
+              datalen = os.stat(data_filename).st_size
+ def readfrom(f):
+ while True:
+ bs = f.read(65536)
+ if not bs:
+ break
+ yield bs
+ P = Progress(total=datalen)
+ for bs in P.iterbar(readfrom(f), itemlenfunc=len):
+ ... process the file data in bs ...
'''
if cancelled is None:
cancelled = lambda: runstate.cancelled
@@ 582,14 588,23 @@ class BaseProgress(object):
proxy.text = None
def qbar(self, label=None, **iterbar_kw) -> QueueIterator:
- ''' Set up a progress bar, return a `QueueIterator` for receiving items.
- This is a shim for `Progress.iterbar` which dispatches a
- worker to iterate a queue which received items placed on
- the queue.
+ ''' Set up a progress bar, return a closeable `Queue`-like object
+ for receiving items. This is a shim for `Progress.iterbar`
+ which dispatches a worker to iterate items put onto a queue.
+
+ Example:
+
+ Q = Progress.qbar("label")
+ try:
+ ... do work, calling Q.put(item) ...
+ finally:
+ Q.close()
'''
Q = IterableQueue(name=label)
def qbar_worker():
+ ''' Consume the items from `Q`, updating the progress bar.
+ '''
for _ in self.iterbar(Q, label=label, **iterbar_kw):
pass
@@ 1057,6 1072,7 @@ def progressbar(
total=None,
units_scale=UNSCALED_SCALE,
upd: Upd,
+ report_print=None,
**iterbar_kw
):
''' Convenience function to construct and run a `Progress.iterbar`
@@ 1084,7 1100,7 @@ def progressbar(
for row in progressbar(rows):
... do something with row ...
'''
- if upd is None or upd.disabled:
+ if not report_print and (upd is None or upd.disabled):
return it
if total is None:
try:
@@ 1093,7 1109,9 @@ def progressbar(
total = None
return Progress(
name=label, position=position, total=total, units_scale=units_scale
- ).iterbar(it, **iterbar_kw)
+ ).iterbar(
+ it, report_print=report_print, **iterbar_kw
+ )
@decorator
def auto_progressbar(func, label=None, report_print=False):
M lib/python/cs/progress_tests.py +2 -2
@@ 4,10 4,10 @@
# - Cameron Simpson <cs@cskk.id.au> 23dec2015
#
-from __future__ import absolute_import
import time
import unittest
-from .progress import Progress, DEFAULT_THROUGHPUT_WINDOW
+
+from cs.progress import Progress, DEFAULT_THROUGHPUT_WINDOW
class TestProgress(unittest.TestCase):
''' Test `cs.progress.Progress`.
M lib/python/cs/py/doc.py +27 -3
@@ 23,7 23,7 @@ from cs.logutils import warning
from cs.pfx import Pfx, pfx_call
from cs.py.modules import module_attributes
-__version__ = '20241007-post'
+__version__ = '20250426-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 74,7 74,7 @@ def module_doc(
list_item = f'{nl}- <a name="{anchor}"></a>`{str(header)}`: {stripped_dedent(obj_doc,sub_indent=" ")}'
return list_item
- full_docs.append(f'\n\nModule contents:')
+ full_docs.append('\n\nShort summary:')
for Mname, obj in sorted(module_attributes(module), key=sort_key):
with Pfx(Mname):
if ALL and Mname not in ALL:
@@ 85,6 85,29 @@ def module_doc(
if obj_module is not module:
# name imported from another module
continue
+ docstring = (getattr(obj, '__doc__', None) or '').strip()
+ if not docstring:
+ continue
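+          # take the first sentence of the first paragraph of the
+          # docstring, joined onto a single line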
+ line1 = " ".join(
+ line.strip()
+ for line in docstring.split("\n\n")[0].split(". ")[0].split("\n")
+ )
+ if line1[0].isupper() and not line1.endswith('.'):
+ line1 += '.'
+ full_docs.append(f'\n* `{Mname}`: {line1}')
+
+ full_docs.append('\n\nModule contents:')
+ for Mname, obj in sorted(module_attributes(module), key=sort_key):
+ with Pfx(Mname):
+ if ALL and Mname not in ALL:
+ continue
+ if not filter_key(Mname):
+ continue
+ obj_module = getmodule(obj)
+ if obj_module is not module:
+ # name imported from another module
+ continue
+ assert obj_module
obj_doc = obj_docstring(obj) if obj_module else ''
if not callable(obj):
if obj_doc:
@@ 135,7 158,7 @@ def module_doc(
indent("Usage: " + usage_text, " "),
)
)
- full_docs.append(doc_item(Mname, f'Class `{classname_etc}', obj_doc))
+ full_docs.append(doc_item(Mname, f'Class `{classname_etc}`', obj_doc))
seen_names = set()
direct_attrs = dict(obj.__dict__)
# iterate over specified names or default names in order
@@ 191,6 214,7 @@ def module_doc(
warning("UNHANDLED %r, neither function nor class", Mname)
return ''.join(full_docs)
+# TODO: use inspect.getdoc() initially
def obj_docstring(obj):
''' Return a docstring for `obj` which has been passed through `stripped_dedent`.
M lib/python/cs/queues.py +2 -2
@@ 33,7 33,7 @@ from cs.resources import (
from cs.result import CancellationError
from cs.seq import seq, unrepeated
-__version__ = '20250306-post'
+__version__ = '20250426-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 307,7 307,7 @@ class PushQueue(MultiOpenMixin, RunState
def __init__(
self,
name: str,
- functor: Callable[Any, Iterable],
+ functor: Callable[[Any], Iterable],
outQ,
runstate: RunState,
):
M lib/python/cs/testutils.py +2 -1
@@ 14,7 14,7 @@ from cs.context import push_cmgr, pop_cm
from cs.debug import thread_dump
from cs.deco import decorator
-__version__ = '20241122-post'
+__version__ = '20250426-post'
DISTINFO = {
'keywords': ["python3"],
@@ 26,6 26,7 @@ DISTINFO = {
],
'install_requires': [
'cs.context',
+ 'cs.debug',
'cs.deco',
],
}
M lib/python/cs/tmuxutils.py +147 -15
@@ 3,29 3,79 @@
''' Utlity functions for working with tmux(1).
'''
-from contextlib import closing, contextmanager
+from contextlib import contextmanager
from dataclasses import dataclass, field
+from datetime import datetime
+from functools import cached_property
+from getopt import GetoptError
import os
-from os import O_RDWR
from os.path import join as joinpath
+import re
from shlex import join as shq
-from socket import socket, AF_UNIX, SHUT_RD, SHUT_WR, SOCK_STREAM
from subprocess import CompletedProcess, PIPE, Popen
+import sys
from threading import Lock
from typing import List
from icontract import require
from typeguard import typechecked
+from cs.cmdutils import BaseCommand, popopts
from cs.context import stackattrs
from cs.fs import HasFSPath
+from cs.lex import cutprefix
from cs.logutils import info, warning
-from cs.pfx import Pfx, pfx, pfx_call, pfx_method
+from cs.pfx import Pfx, pfx_method
from cs.psutils import run
from cs.queues import IterableQueue
from cs.resources import MultiOpenMixin
+from cs.result import Result
+from cs.threads import bg
-from cs.debug import trace, X, s, r
+def main(argv=None):
+ ''' The CLI mode for `cs.tmuxutils`.
+ '''
+ return TMuxUtilsCommand(argv).run()
+
+class TMuxUtilsCommand(BaseCommand):
+ ''' CLI to access various cs.tmuxutils facilities.
+ '''
+
+ SUBCOMMAND_ARGV_DEFAULT = ['ls', '-n']
+
+ @contextmanager
+ def run_context(self):
+ ''' Establish a tmux(1) control connection during the command run.
+ '''
+ with super().run_context():
+ with TmuxControl() as tmux:
+ with stackattrs(self.options, tmux=tmux):
+ yield
+
+ @popopts(a='all_sessions', n=('number_list', 'Number the listing.'))
+ def cmd_ls(self, argv):
+    ''' Usage: {cmd} [-a] [-n]
+ List sessions.
+ '''
+ if argv:
+ raise GetoptError(f'extra arguments: {argv!r}')
+ options = self.options
+ all_sessions = options.all_sessions
+ number_list = options.number_list
+ tmux = options.tmux
+ n = 0
+ for session_id, annotations, parsed in tmux.sessions():
+ if not all_sessions and not isinstance(session_id, str):
+ continue
+ n += 1
+ if number_list:
+ print(
+ f'{n:3d}', session_id, "created",
+ parsed['created_dt'].isoformat(sep=' ')
+ )
+ else:
+ print(session_id, "created", parsed['created_dt'].isoformat(sep=' '))
@dataclass
class TmuxCommandResponse:
@@ 77,7 127,7 @@ class TmuxCommandResponse:
while True:
bs = rf.readline()
if not bs:
- raise EOFError()
+ raise EOFError
if bs.startswith((b'%end ', b'%error ')):
break
output.append(bs)
@@ 96,6 146,66 @@ class TmuxCommandResponse:
notifications=notifications,
)
+ @cached_property
+ def lines(self):
+ ''' A tuple of `str` from the response output, including the trailing newline.
+ '''
+ return tuple(
+ line_bs.decode('utf-8', errors='replace') for line_bs in self.output
+ )
+
+ @staticmethod
+ def parse_session_line(line):
+ ''' Parse a tmux(1) `list-sessions` response line
+ into a `(id,annotations,parsed)` 3-tuple where:
+        - `id` is the session id, an `int` if unnamed, a `str` if named
+ - `annotations` a list of text parts of the "(text)" suffixes
+ - `parsed` is a dict containing parsed values
+
+ The `parsed` dict always contains:
+ - `nwindows`, the number of windows in the session
+ - `created`, the UNIX timestamp of when the session was created
+ - `attached`, whether the session is currently attached
+ '''
+ id_s, etc = line.rstrip().split(': ', 1)
+ try:
+ session_id = int(id_s)
+ except ValueError:
+ session_id = id_s
+ with Pfx("session %s", session_id):
+ annotations = []
+ parsed = dict(
+ attached=False,
+ created=None,
+ nwindows=None,
+ session_id=session_id,
+ )
+ if m := re.match(r'^(\d+) windows', etc):
+ parsed['nwindows'] = int(m.group(1))
+ etc = etc[m.end():]
+ while etc:
+ if m := re.match(r'^\s*\(([^()]+)\)', etc):
+ annotation = m.group(1)
+ annotations.append(annotation)
+ etc = etc[m.end():]
+ created_date = cutprefix(annotation, 'created ')
+ if annotation == 'attached':
+ parsed['attached'] = True
+ elif created_date != annotation:
+ try:
+ dt = datetime.strptime(created_date, '%a %b %d %H:%M:%S %Y')
+ except ValueError as e:
+ warning("cannot parse created date %r: %s", created_date, e)
+ else:
+ parsed['created'] = dt.timestamp()
+ parsed['created_dt'] = dt
+ else:
+ warning("unhandled annotation %r", annotation)
+ else:
+ warning("unparsed session information: %r", etc)
+ break
+ return session_id, annotations, parsed
+
class TmuxControl(HasFSPath, MultiOpenMixin):
''' A class to control tmux(1) via its control socket.
@@ 112,13 222,16 @@ class TmuxControl(HasFSPath, MultiOpenMi
TMUX = 'tmux'
- def __init__(self, socketpath=None, notify=None):
+ def __init__(self, socketpath=None, notify=None, tmux_exe=None):
if socketpath is None:
socketpath = self.get_socketpath()
if notify is None:
notify = self.default_notify
+ if tmux_exe is None:
+ tmux_exe = self.TMUX
self.fspath = socketpath
self.notify = notify
+ self.tmux_exe = tmux_exe
self._lock = Lock()
@staticmethod
@@ 143,16 256,20 @@ class TmuxControl(HasFSPath, MultiOpenMi
def startup_shutdown(self):
''' Open/close the control socket.
'''
- with Popen([self.TMUX, '-S', self.fspath, '-C'], stdin=PIPE,
- stdout=PIPE) as P:
+ with Popen(
+ [self.tmux_exe, '-S', self.fspath, '-C'],
+ stdin=PIPE,
+ stdout=PIPE,
+ ) as P:
try:
pending = [] # queue of pending Results
with stackattrs(self, rf=P.stdout, wf=P.stdin, pending=pending):
- workerT = bg(self._worker)
+ workerT = bg(self._worker, name='tmux response parser')
with stackattrs(self, workerT=workerT):
yield
finally:
P.stdin.close()
+ workerT.join()
P.wait()
def default_notify(self, bs: bytes):
@@ 170,6 287,8 @@ class TmuxControl(HasFSPath, MultiOpenMi
pending = self.pending
notify = self.notify
lock = self._lock
+ # consume the initial greeting response
+ rsp0 = TmuxCommandResponse.read_response(rf, notify=notify)
while True:
rsp = TmuxCommandResponse.read_response(rf, notify=notify)
if rsp is None:
@@ 197,15 316,30 @@ class TmuxControl(HasFSPath, MultiOpenMi
wf.flush()
return R
- # TODO: worker thread to consume the control data and complete Results
-
@pfx_method
@typechecked
def __call__(self, tmux_command: str) -> TmuxCommandResponse:
+ ''' Call us with a `tmux_command`, return the `TmuxCommandResponse`.
+ '''
with self:
R = self.submit(tmux_command)
return R()
+ def sessions(self):
+ ''' Return a list of `(session_id,annotations,parsed)` 3-tuples
+ as from `parse_session_line` for the current sessions.
+ '''
+    rsp = self('list-sessions')
+ return [rsp.parse_session_line(line) for line in rsp.lines]
+
+ def session_names(self):
+ ''' Return a list of the session names.
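+
+        Example (a sketch):
+
+            with TmuxControl() as tmux:
+                names = tmux.session_names()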
+ '''
+ return [
+ session_id for session_id, annotations, parsed in self.sessions()
+ if isinstance(session_id, str)
+ ]
+
def tmux(tmux_command, *tmux_args) -> CompletedProcess:
''' Execute the tmux(1) command `tmux_command`.
'''
@@ 236,6 370,4 @@ def run_pane(*argv) -> IterableQueue:
shcmd = shq(argv)
if __name__ == '__main__':
- with TmuxControl() as tm:
- print(tm('list-sessions'))
- print(tm('list-panes'))
+ sys.exit(main(sys.argv))
M lib/python/cs/typingutils.py +19 -2
@@ 3,9 3,9 @@
''' Trite hacks for use with typing.
'''
-from typing import TypeVar
+from typing import get_args, get_origin, TypeVar, Union
-__version__ = '20230331-post'
+__version__ = '20250503-post'
DISTINFO = {
'keywords': ["python3"],
@@ 16,6 16,23 @@ DISTINFO = {
'install_requires': [],
}
+def is_optional(annotation):
+ ''' Check if `annotation` is an `Optional[type]`.
+ Return `type` if so, `None` otherwise.
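+
+      Example:
+
+          >>> from typing import Optional
+          >>> is_optional(Optional[int])
+          <class 'int'>
+          >>> is_optional(int) is None
+          True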
+ '''
+ origin = get_origin(annotation)
+ if origin is not Union:
+ return None
+ try:
+ t, none = get_args(annotation)
+ except ValueError:
+ # an Optional is [type,None]
+ return None
+ if none is not None and none is not type(None):
+ # [type1,type2] is not an Optional
+ return None
+ return t
+
def subtype(t, name=None):
''' Construct a `TypeVar` for subtypes of the type `t`.
M lib/python/cs/upd.py +4 -2
@@ 92,7 92,7 @@ except ImportError as curses_e:
warning("cannot import curses: %s", curses_e)
curses = None
-__version__ = '20240630-post'
+__version__ = '20250426-post'
DISTINFO = {
'keywords': ["python2", "python3"],
@@ 957,7 957,9 @@ class Upd(SingletonMixin, MultiOpenMixin
i += 1
time.sleep(tick_delay)
- Thread(target=_ticker, daemon=True).start()
+ Thread(
+ target=_ticker, name="%s-task-ticker" % label, daemon=True
+ ).start()
proxy.text = '...'
start_time = time.time()
try:
M lib/python/cs/vcs/hg.py +1 -1
@@ 134,7 134,7 @@ class VCS_Hg(VCS):
def log_entries(self, *revs):
''' Return the log entry for the specified revision `rev`.
'''
- with trace(self._pipefrom)(
+ with self._pipefrom(
'log',
*(('-r', rev) for rev in revs),
'--template',
M pkg_tags +14 -13
@@ 15,7 15,7 @@ cs.app.osx.defaults ok_revision=c4bf6397
cs.app.osx.misc ok_revision=a8db5e27f6d608f7bad1854b0c06aa31338fa544 features={"20240622":["macos_version"]} pypi.release="20240622.1"
cs.app.osx.objc ok_revision=bafa7354f7242e42dd089064362915d523d762da pypi.release="20240622"
cs.app.osx.plist ok_revision="599e3e37c0af4e7ba92c2134c0a90d17effcfc39" pypi.release="20221228"
-cs.app.osx.spaces ok_revision="79e8ec468b70ea38aa0835694cd80d52f5c262e6" features={"20240622":["SpacesCommand"]} pypi.release="20250306"
+cs.app.osx.spaces ok_revision="18d2a8e3069260982b9cca6982097413b7f70893" features={"20240622":["SpacesCommand"]} pypi.release="20250306"
cs.app.pilfer
cs.app.pilfer.urls
cs.app.playon ok_revision="6d46bd98f407fc8ff2326ce12af36517bd7a8483" pypi.release="20241007" features={"20221228":["PlayonCommand_cmd_api","PlayonCommand_cmd_cds","PlayonCommand_cmd_poll"]}
@@ 23,18 23,18 @@ cs.app.portfwd pypi.release="20221228" o
cs.app.ssh pypi.release=20161003 ok_revision="9ced6f54c76e3cde47b62721320820cab9082bc3"
cs.app.svcd pypi.release="20221228" ok_revision="7c5680713e8e717fefca746ec7cac6f3173d3c72"
cs.app.ydl pypi.release="20220318" ok_revision="6c4e9e800dde94ed935a1b31a532f68cbe63adb8"
-cs.binary pypi.release="20240630" ok_revision="66f18265a8db3bc69e7eb97716af302983f117ca" features={"20230212":["BinaryMixin_load","BinaryMixin_save"],"20240630":["AbstractBinary__write","BinarySingleValue__value_from_bytes"]}
-cs.buffer pypi.release="20250111" ok_revision="0b661fd6394e8b5a130c19f70dd5242ae19e8758" features={"20230212":["CornuCopyBuffer_promote"],"20240201":["CornuCopyBuffer_read1"],"20240630":["CornuCopyBuffer__final_offset"]}
+cs.binary pypi.release="20250501" ok_revision=bf27ebd353dcdc58c68bf7e0d6309bb38e22faa7 features={"20230212":["BinaryMixin_load","BinaryMixin_save"],"20240630":["AbstractBinary__write","BinarySingleValue__value_from_bytes"],"20250501":["BinaryStruct","ListOfBinary","bs","is_single_value","struct_field_types"]}
+cs.buffer pypi.release="20250428" ok_revision=a554bd4b6973914b04d75e84055783dfffc1e16a features={"20230212":["CornuCopyBuffer_promote"],"20240201":["CornuCopyBuffer_read1"],"20240630":["CornuCopyBuffer__final_offset"],"20250428":["CornuCopyBuffer__from_cli_filespec"]}
cs.cache pypi.release="20250111" ok_revision="707f72e21fdea1728fa5e45dae2eb7148fceb7b9" features={"20240412":["CachingMapping"],"20240422":["ConvCache","convof"],"20250103":["cachedmethod","file_based","file_property"]}
cs.clockutils pypi.release=20190101
-cs.cmdutils pypi.release="20250306" ok_revision="440a7d6ca0ce3711561622282818c1323b1dff7f" features={"20210913":["apply_preargv"],"20220429":["dashed_subcommands","dotted_subcommand","popargv","runstate_signals"],"20220605":["poparg","popopts"],"20220626":["poparg_unpop"],"20220918":["BaseCommand_DEFAULT_SIGNALS"],"20230407":["BaseCommandOptions_dataclass","BaseCommand_cmdloop"],"20240316":["uses_cmd_options"],"20240630":["BaseCommandOptions__as_dict"],"20241005":["vprint"],"20241110":["vqprint"],"20241119":["BaseCommand__SubCommandClass"],"20241206":["popopts__decorator"],"20241218":["cmd_repl"]}
+cs.cmdutils pypi.release="20250426" ok_revision="642a773c405cd5f977af59ce7e1943d6ac732b9e" features={"20210913":["apply_preargv"],"20220429":["dashed_subcommands","dotted_subcommand","popargv","runstate_signals"],"20220605":["poparg","popopts"],"20220626":["poparg_unpop"],"20220918":["BaseCommand_DEFAULT_SIGNALS"],"20230407":["BaseCommandOptions_dataclass","BaseCommand_cmdloop"],"20240316":["uses_cmd_options"],"20240630":["BaseCommandOptions__as_dict"],"20241005":["vprint"],"20241110":["vqprint"],"20241119":["BaseCommand__SubCommandClass"],"20241206":["popopts__decorator"],"20241218":["cmd_repl"]}
cs.configutils pypi.release="20250103" ok_revision=dee19836696005b95121c3f5b1bc3b0173c26c26 features={"20220430":["HasConfigIni"],"20220606":["info_dict"]}
cs.context pypi.release="20250412" ok_revision=cd7c30169e0e796a7b6b36870470e9d03b607dc3 features={"20210306":["stackable_state"],"20210727":["push_cmgr"],"20211114":["ContextManagerMixin"],"20220227":["stack_signals"],"20230109":["contextif"],"20230125":["ContextManagerMixin_as_contextmanager"],"20230212":["stackset"],"20240212":["reconfigure_file"],"20240412":["withall","withif"],"20240630":["closeall","with_self"],"20250306":["setup_cmgr__enter_value__next2"]}
cs.csvutils pypi.release="20220606" ok_revision=e10878892741e99dfdc3a0137a657ecad337a613
cs.curlytplt
cs.dateutils ok_revision="8d0cd6ae028e8feece9fe66b3deb636e8fee1408" pypi.release="20230210"
cs.debug pypi.release="20250325" ok_revision="4b1017effb33726b53ec441d68a67e1f2fd9ea8a" features={"20230613":["patch_builtins"],"20241005":["CS_DEBUG_TRACE","abrk","log_via_print"]}
-cs.deco pypi.release="20250306" ok_revision=f304a522ea512313ad437c86fb1a1652ea961254 features={"20220905":["ALL","default_params"],"20221106":["promote"],"20221106.1":["promote_optional"],"20230210":["promote_dot_as_typename"],"20230212":["Promotable"],"20241003":["uses_cmd_option"],"20241109":["uses_doit","uses_force","uses_quiet","uses_verbose"],"20250306":["attr"]}
+cs.deco pypi.release="20250428" ok_revision="4622ee33810e5a11b258b90e1a175ba280c1d655" features={"20220905":["ALL","default_params"],"20221106":["promote"],"20221106.1":["promote_optional"],"20230210":["promote_dot_as_typename"],"20230212":["Promotable"],"20241003":["uses_cmd_option"],"20241109":["uses_doit","uses_force","uses_quiet","uses_verbose"],"20250306":["attr"]}
cs.delta ok_revision=b5ee8df7f2f919ad4881022e26959dfc74653967 pypi.release="20240622"
cs.djutils ok_revision=a3397b32d6df1d7c07c9530c905da6dfc303b8fa features={"20241110":["DjangoBaseCommand"],"20241119":["DjangoSpecificSubCommand"],"20241222":["options_settings"],"20250111":["model_batches_qs"],"20250213":["model_instances"]} pypi.release="20250219"
cs.dockerutils ok_revision=f846ece6e9f9353b443df14f81512ae6438681e6 pypi.release="20240519" features={"20240305":["DockerRun__network"]}
@@ 44,11 44,11 @@ cs.env pypi.release="20230407" ok_revisi
cs.excutils pypi.release="20250306" ok_revision="98b5effd951fd57952508ffff3a1fbcfef09ee15"
cs.ffmpegutils ok_revision="67dd2032154d8a0aeb4d80e51edecf6198ecc95d" pypi.release="20241122" features={"20241122":["FFMPEG_DOCKER_IMAGE"]}
cs.filestate pypi.release=20181107
-cs.fileutils pypi.release="20250103" ok_revision=b44b316352a44d8c59feaf82f7da285feab29688 features={"20210731":["atomic_filename"],"20210906":["atomic_filename"],"20211208":["gzifopen"],"20241122":["atomic_copy2"],"20250103":["rename_excl"]}
+cs.fileutils pypi.release="20250429" ok_revision=a0da542b9ad23ee555388f7948fcda409d31e9e3 features={"20210731":["atomic_filename"],"20210906":["atomic_filename"],"20211208":["gzifopen"],"20241122":["atomic_copy2"],"20250103":["rename_excl"]}
cs.fs ok_revision="65c4177564100424d092d78478af8e7f640e5b2c" pypi.release="20250414" features={"20220429":["FSPathBasedSingleton","HasFSPath","fnmatchdir","is_clean_subpath","longpath","needdir","shortpath"],"20240422":["scandirpaths","scandirtree"],"20250325":["update_linkdir"],"20250414":["HasFSPath__lt","RemotePath"]}
cs.fsm ok_revision=b05a07abe155bbb10776c7a8ee7aaeba13177a13 pypi.release="20250120" features={"20231018":["fsm_transitions_as_dot__history_style"],"20240630":["CancellationError"],"20240721":["FSM__fsm_print_state_diagram"]}
cs.fstags pypi.release="20241122" ok_revision=e6c567623d54c8e893c796587b2976cc67505148 features={"20220918":["uses_fstags"],"20230212":["TaggedPath_findup","TaggedPath_keypath","TaggedPath_parent"],"20240422":["fix_fstags__repr"],"20240709":["CachedValue"]}
-cs.gimmicks ok_revision="5304d45b48abf477a077743b7b23cb0aa066d83d" pypi.release="20250323" features={"20211208":["TimeoutError"],"20220429":["devnull"],"20230212":["gimmick_r","gimmick_s"],"20230331":["open_append"]}
+cs.gimmicks ok_revision=e3a39d35d32d5baa1566f6e6250b27a4e55432b9 pypi.release="20250428" features={"20211208":["TimeoutError"],"20220429":["devnull"],"20230212":["gimmick_r","gimmick_s"],"20230331":["open_append"],"20250428":["Buffer"]}
cs.gvutils ok_revision="4c81e06630110ce18f0422ece389d623cbc1bfaf" pypi.release="20230816" features={"20220805.1":["DOTNodeMixin"],"20220827":["gvprint_capture","gvprint_dataurl"],"20220827.1":["dataurl_encoding"],"20221207":["gvsvg"]}
cs.hashindex ok_revision="1697ec5598b226ceeece76de9f4e057807bb7c2f" pypi.release="20241207" features={"20240216":["read_remote_hashindex","run_remote_hashindex"],"20240709":["blake3"]}
cs.hashutils ok_revision="2030d99fdf99da2d111ecdf71841b9fbc55bbfac" pypi.release="20250414.1" features={"20240412":["BaseHashCode__get_hashfunc","blake3"],"20250414":["BLAKE3"]}
@@ 58,7 58,7 @@ cs.imageutils ok_revision="1f307905c3dd3
cs.inttypes pypi.release=20150120 ok_revision=b4c205f9ed8706db33e7285fcd511092966eda7d
cs.iso14496 pypi.release="20241122" ok_revision=d119731d38eed9058fcf6dd4e344b78244965f1f
cs.later pypi.release="20240630" ok_revision="819a6782e492719c8ec508be77bebd84117e8cfc" features={"20240305":["Later__thread_states"]}
-cs.lex pypi.release="20250414" ok_revision="427ba7175cf4241441737ea501350d2e32faca85" features={"20210906":["strip_prefix_n"],"20220227":["camelcase","snakecase"],"20230210":["FFloat","FInt","FNumericMixin","has_format_attributes_inherit"],"20230217":["get_prefix_n"],"20231018":["is_uc_identifier"],"20240211":["split_remote_path"],"20240519":["get_suffix_part"],"20240630":["indent"],"20241109":["tabulate"],"20241119":["stripped_dedent__sub_indent"],"20241207":["tabulate__newlines"],"20250103":["BaseToken__token_classes","Identifier","NumericValue","QuotedString"],"20250323":["printt"]}
+cs.lex pypi.release="20250428" ok_revision="0656e553c8190b3e106e718f62ed9d0376f9ea3e" features={"20210906":["strip_prefix_n"],"20220227":["camelcase","snakecase"],"20230210":["FFloat","FInt","FNumericMixin","has_format_attributes_inherit"],"20230217":["get_prefix_n"],"20231018":["is_uc_identifier"],"20240211":["split_remote_path"],"20240519":["get_suffix_part"],"20240630":["indent"],"20241109":["tabulate"],"20241119":["stripped_dedent__sub_indent"],"20241207":["tabulate__newlines"],"20250103":["BaseToken__token_classes","Identifier","NumericValue","QuotedString"],"20250323":["printt"]}
cs.logutils pypi.release="20250323" ok_revision=a3b20bbeb0e556fdbe18407a812abddf5c818380 features={"20210718":["supplant_root_logger"],"20220227":["patch_formatter"],"20220530":["QUIET"],"20240630":["LoggingState"]}
cs.mailutils pypi.release="20210306" ok_revision="48733f5381f7056442a6d8fc80adcfdff20bd455"
cs.mappings pypi.release="20250306" ok_revision="74ccb1b0fc0645e8dcb8d132e2aba83748d5f1c4" features={"20210906":["RemappedMappingProxy"],"20220912":["TypedKeyMixin"],"20220912.3":["StrKeyedDefaultDict","TypedKeyClass","UUIDKeyedDefaultDict"],"20250306":["mapped_property","missingdict"]}
@@ 79,12 79,12 @@ cs.predicate pypi.release="20210306" ok_
cs.prgress
cs.progress pypi.release="20250412" ok_revision=d943ffe8feb7bd893a3ef7cca8c144041cee6bbe features={"20210803":["progress_runstate"],"20230212":["update_period"],"20240412":["BaseProgress__update_period"],"20250306":["Progress__qbar"],"20250412":["fix_progressbar_no_tty"]}
cs.psutils pypi.release="20250108.1" ok_revision=e30b06bec323b666f3cde7ff67750104d46839e8 features={"20220429":["signal_handler"],"20220531":["print_argv"],"20240211":["prep_argv","run__input"],"20241206":["run__remote"],"20250108":["run__fold"]}
-cs.py.doc pypi.release="20241007" ok_revision=a639a998cb404958be0e1e6860d39dc1a3dc616c
+cs.py.doc pypi.release="20250426" ok_revision=f3ab8e29f48276fe14775575f12e06c5263880e1
cs.py.func pypi.release="20240630" ok_revision="0dd7f10b229df7807892a255ae6983d2136b819e" features={"20210717":["trace_decorator"],"20210913":["func_a_kw_fmt"],"20220311.1":["callif"],"20221207":["func_a_kw"]}
cs.py.modules pypi.release="20241122" ok_revision="494204872267e1baa6abbc034af40932ae28a095" features={"20220606":["import_extra"]}
cs.py.stack pypi.release="20250306" ok_revision="2bddd66695de848f04309ca54c28c40af262ea57" features={"20240412":["stack_dump__select"]}
cs.py3 pypi.release="20220523" ok_revision="325b1a7d99ea4853e44b94ea96115537f76dfe47"
-cs.queues pypi.release="20250306" ok_revision=b11715aafbeae75715a445dee1e8a0895f011832 features={"20210913":["ListQueue"],"20210924":["iterable_channel"],"20220805":["ListQueue_append"],"20220918":["QueueIterator"],"20240211":["ListQueue__unique"],"20240305":["get_batch"],"20240412":["QueueIterator__iter_batch","QueueIterator__next_batch"]}
+cs.queues pypi.release="20250426" ok_revision=e9f02ca056291fecded4a9e9a0d9747d2681c42d features={"20210913":["ListQueue"],"20210924":["iterable_channel"],"20220805":["ListQueue_append"],"20220918":["QueueIterator"],"20240211":["ListQueue__unique"],"20240305":["get_batch"],"20240412":["QueueIterator__iter_batch","QueueIterator__next_batch"]}
cs.range pypi.release="20230701" ok_revision=e733b04f48fcd613a257210411883fe0522ea424 features={"20230518":["as_list"]}
cs.resources pypi.release="20250325" ok_revision=ffa945d8eb38d0ed3e05bbf069c434c37cf36ca3 features={"20220429":["catch_signal"],"20220918":["openif"],"20230503":["RunState_poll_cancel"],"20231221":["Runstate_raiseif"],"20240201":["MultiOpenMixin_is_open"],"20240412":["RunState__iter"],"20250103":["RunState__raiseif"],"20250325":["RunState__PAUSED"]}
cs.result pypi.release="20250306" ok_revision="080f60d7225148f5360f384677de6f110bb9d847" features={"20220311":["tasks"],"20220805":["FSM"],"20221118":["in_thread"],"20221207":["CancellationError_keywords"],"20230212":["Result_post_notify"],"20250103":["not_cancelled"]}
@@ 102,14 102,15 @@ cs.sqltags ok_revision="977113cd51737423
cs.tagset ok_revision=d5bf9847157202e21fef9f9b95dde26418687eeb features={"20221228":["TagFile_prune"],"20230126":["TagSet_is_stale"],"20240422":["fix_Tag__str__bad_TypeError","jsonable"],"20240422.2":["jsonable__for_json"]} pypi.release="20250306"
cs.tarutils ok_revision=b5ed41d868eba32d0098c9d651a36df8eb52c7ff pypi.release="20240318"
cs.taskqueue ok_revision="879d12f2e8d786307b2bee942c38f08ff73d8633" features={"20221207":["BaseTask"]} pypi.release="20250120"
-cs.testutils ok_revision="95ada344b6507ca853570f1788d69bdf64c3e080" features={"20230109":["SetupTeardownMixin","product_test"],"20240623":["assertSingleThread"]} pypi.release="20241122"
+cs.testutils ok_revision="5720c3a86f1611bdb5323168d3c4e3ed6e46cdd2" features={"20230109":["SetupTeardownMixin","product_test"],"20240623":["assertSingleThread"]} pypi.release="20250426"
cs.threads ok_revision="89e5900d57041ed26e687c931258fd73ad2fa71b" pypi.release="20250325" features={"20230125":["HasThreadState"],"20230212":["HasThreadState_Thread"],"20230331":["joinif"],"20240412":["HasThreadState__pre_enter_objects","NRLock"]}
cs.timeseries ok_revision="795e4c4be59054f58d2c7e32e8a24f652c51cfb8" pypi.release="20240316"
cs.timeutils ok_revision=cd7c71e316826f30a41dc4c0257e826beb869d88
cs.tty
-cs.typingutils ok_revision="3654853d2facea604cfef62988be6ba53a262166" features={"20230331":["subtype"]} pypi.release="20230331"
+cs.typeutils
+cs.typingutils ok_revision="52320a5ef1df5436b63defe2d1eda1dd5bbcd998" features={"20230331":["subtype"],"20250428":["is_optional"]} pypi.release="20250503"
cs.units
-cs.upd ok_revision="08f7d62b98d3c1b2532c4b8c9199a0c756914c6c" pypi.release="20240630" features={"20230212":["with_upd_proxy"],"20230217":["Upd_MultiOpenMixin"],"20240216":["without"],"20240412":["without"]}
+cs.upd ok_revision="37f6ffd00d1c80db5ad1d7d983dbaed669a65613" pypi.release="20250426" features={"20230212":["with_upd_proxy"],"20230217":["Upd_MultiOpenMixin"],"20240216":["without"],"20240412":["without"]}
cs.urlutils ok_revision="7eeaa1d7ef5d4e82b88e9d614e8646d80a5344e7" pypi.release="20231129"
cs.x ok_revision="04993b7bcc25b66e2477a5efa3b3ae9f12c7dcd6" pypi.release="20240630" features={"20231129":["via_logger"]}
cs.xml ok_revision=ab95976b267e5d8e4561d4603499c8530de51249
A => release/cs.binary-20250501/CHANGES.txt +84 -0
@@ 0,0 1,84 @@
+lib/python/cs/binary.py: cs.binary: new bs(bytes) class with a compact repr
+lib/python/cs/binary.py: cs.binary: add some examples to the module docstring
+lib/python/cs/binary.py: cs.binary: SimpleBinary.__str__: recognise single value MultiStructBinary-generated classes and crop repr(value.value) to avoid looking like a tuple, provide __str__ and __repr__ for single field MultiStructBinary-generated classes
+lib/python/cs/binary.py: cs.binary: pt_spec: better names for generated classes, provide __doc__ for the single value class
+lib/python/cs/binary.py: cs.binary: SimpleBinary.__str__: also use the short form for BinarySingleValue fields
+lib/python/cs/binary.py: cs.binary: new is_single_value(obj) to test for single value binary objects, use it in SimpleBinary.__str__
+lib/python/cs/binary.py: cs.binary: make AbstractBinary Promotable to support .from_typename methods
+lib/python/cs/binary.py: cs.binary: new @binclass class decorator, EXPERIMENTAL, for wrapping binary classes defined like data classes (UNTESTED)
+lib/python/cs/binary.py: cs.binary: tweak docstrings
+lib/python/cs/binary.py: cs.binary: @binclass: bugfixes, expand the example and make it a doctest
+lib/python/cs/binary.py: cs.binary: docstring updates, minor refactors
+lib/python/cs/binary.py: cs.binary: AbstractBinary: provide a __str__ like that from SimpleBinary, and a __repr__; SimpleBinary: just inherit __str__ from AbstractBinary
+lib/python/cs/binary.py: cs.binary: is_single_value: recognise BaseMultiValueBinary
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: provide a promote() method for single value structs
+lib/python/cs/binary.py: cs.binary: import collections.abc.Buffer (or define it as typing.ByteString)
+lib/python/cs/binary.py: cs.binary: AbstractBinary: new property _field_names being an iterable of the field names
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: define _field_names from the struct definition
+lib/python/cs/binary.py: cs.binary: AbstractBinary.__str__: accept attr_choose=True to include all attr_names
+lib/python/cs/binary.py: cs.binary: BinarySingleValue: require a type for the value in the class definition and check the value in __init__
+lib/python/cs/binary.py: cs.binary: some docstring tweaks
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: for single value structs, move __str__,__repr__ up, comment out the __repr__
+lib/python/cs/binary.py: cs.binary: new BinaryMultiStruct.promote() accepting an iterable of field values
+lib/python/cs/binary.py: cs.binary: BSString: set its type=str, missed earlier
+lib/python/cs/binary.py: cs.binary: new DSData.promote() accepting bytes
+lib/python/cs/binary.py: cs.binary: @binclass: completely rework the dataclass generation, support inheritance from super binclasses, various other fixes and improvements
+lib/python/cs/binary.py: cs.binary: pt_spec: accept optional type= parameter, infer type from parse function return annotation if None
+lib/python/cs/binary.py: cs.iso14496,cs.binary: move @parse_offsets from cs.iso14496 to cs.binary
+lib/python/cs/binary.py: cs.binary: @binclass: new .parse_field(fieldname,bfr) method to return an AbstractBinary instance from bfr of the type of fieldname
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: logic fix for promotion of field_names to a tuple of str
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: do not memoise class names, a pointless restriction which can silently cause bugs
+lib/python/cs/binary.py: cs.binary: new struct_field_types(struct_format,field_names) to return a mapping of field names to struct.unpack-returned types
+lib/python/cs/binary.py: cs.binary: BinaryMultiStruct: for single value structs, annotate the parse_value() function with the return type (also aids pt_spec())
+lib/python/cs/binary.py: cs.binary: format strings for exceptions, docstring tweaks, comments, TODOs
+lib/python/cs/binary.py: cs.binary: @binclass: provide a FIELD_MAPS mapping in the generated class for use by AbstractBinary.self_check()
+lib/python/cs/binary.py: cs.binary: fold BinaryMultiStruct and BinarySingleStruct into BinaryStruct, leave compatibility names behind
+lib/python/cs/binary.py: cs.binary: BinaryByteses: subclasses should receive a consume= class parameter indicating how many bytes to consume, default ... (all available bytes)
+lib/python/cs/binary.py: cs.binary: pt_spec: optional as_repr and as_str parameters to provide __repr__ and __str__ methods
+lib/python/cs/binary.py: cs.binary: pt_spec: rename type parameter to value_type
+lib/python/cs/binary.py: cs.binary: BinarySingleValue.__init_subclass__: rename type parameter to value_type
+lib/python/cs/binary.py: cs.binary: BinaryByteses: subclass BinarySingleValue, provide value_type class parameter
+lib/python/cs/binary.py: cs.binary: update class parameters from type to value_type
+lib/python/cs/binary.py: cs.binary: @binclass: rename a variable to avoid a builtin name, update the field type sanity check to accommodate variable fields and to upgrade ints or Ellipsis to BinaryByteses
+lib/python/cs/binary.py: cs.binary: @binclass: new parse_field and parse_fields class methods, modify parse() to init the class with the result of parse_fields
+lib/python/cs/binary.py: cs.binary: get Buffer from cs.gimmicks, remove some debug, adjust some formatting, add a type check
+lib/python/cs/binary.py: cs.binary: BinarySingleValue.__init_subclass__: construct a suitably annotated and typechecked __init__, avoid isinstance of subscripted generic
+lib/python/cs/binary.py: cs.binary: __init_subclass__: provide a default value_type (why duped with the class definition?) and plumb to super().__init_subclass__()
+lib/python/cs/binary.py: cs.binary: replace mentions of BinaryMultiStruct with BinaryStruct
+lib/python/cs/binary.py: cs.binary: BinaryStruct: include the struct format string in the repr() output
+lib/python/cs/binary.py: cs.binary: tweak doctest, comments, another docstring
+lib/python/cs/binary.py: cs.binary: @binclass: rename fieldmap to fieldtypemap; extract the code which promotes the annotations to classes as promote_fieldtypemap() for reuse
+lib/python/cs/binary.py: cs.binary: @binclass: parse_field,parse_fields: promote any supplied field type mapping
+lib/python/cs/binary.py: cs.binary: flatten: update docstring with more detail, rename chunks var to transcription for clarity
+lib/python/cs/binary.py: cs.binary: docstring and comment updates, add simple test case when run as main
+lib/python/cs/binary.py: cs.binary: is_single_value: drop reference to abandoned BaseMultiValueBinary, seems to be covered by @binclass
+lib/python/cs/binary.py: cs.binary: BinarySingleValue: subclass Promotable, drop explicit .promote() class method since Promotable.promote() now covers this use case
+lib/python/cs/binary.py: cs.binary: rename BinaryByteses to BinaryBytes, add .promote() which will suck in an arbitrary AbstractBinary instance
+lib/python/cs/binary.py: cs.binary: renamed BinaryByteses to BinaryBytes
+lib/python/cs/binary.py: cs.binary: @binclass: new internal @bcmethod decorator for methods in BinClass which may be overridden by _direct_ methods in the wrapped class
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass: record the template class as ._baseclass, make __repr__ robust when used early
+lib/python/cs/binary.py: cs.binary: @binclass: new BinClass.__setattr__: promote and set the dataclass instance attributes, otherwise set in self.__dict__
+lib/python/cs/binary.py: cs.binary: @binclass: new BinClass.__getattr__: unknown attributes must come from the dataclass instance, return obj.value for single value objects
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.parse_fields: the optional fieldtypes may be a mapping, an iterable of fieldnames, or a space separated list of field names whose types come from cls._datafieldtypes
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.promote_field_value: when the fieldtypemap contains a Union, try to promote to each union member type in turn; drop the vestiges of the old __getattr__ and __setattr__
+lib/python/cs/binary.py: cs.binary: @binclass: rename the template class, name the BinClass, add some assertions, minor tweak
+lib/python/cs/binary.py: cs.binary: update imports, remove debug, clean a little lint
+lib/python/cs/binary.py: cs.binary: replace a lot of percent formatting with f-strings
+lib/python/cs/binary.py: cs.binary: @binclass: add missing assignment to name0
+lib/python/cs/binary.py: cs.binary: AbstractBinary.self_check: accept optional field_types; BinClass.self_check: pass the dataclass instance and our FIELD_TYPES to AbstractBinary.self_check
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.__str__: omit fields whose names end with an underscore
+lib/python/cs/binary.py: cs.binary: remove some debug
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.parse_field: accept optional fieldtypes
+lib/python/cs/binary.py: cs.binary: new ListOfBinary, a class holding a list of instances of some binary class
+lib/python/cs/binary.py: cs.binary: BinarySingleValue.__init__ now a conventional init calling typeguard.check_type() on the supplied value
+lib/python/cs/binary.py: cs.binary: ListOfBinary.transcribe: just return self, it's iterable
+lib/python/cs/binary.py: cs.binary: ListOfBinary.parse is a class method
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.parse: break out the access to cls.parse_fields for easier debugging
+lib/python/cs/binary.py: cs.binary: is_single_value: break out logic to aid debugging
+lib/python/cs/binary.py: cs.binary: ListOfBinary: not a BinarySingleValue, drop .value, break out logic in .parse() to aid debugging
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.__str__: obtain the field value from self._data
+lib/python/cs/binary.py: cs.binary: @binclass: BinClass.__getattr__: fall back to the superclass __getattr__
+lib/python/cs/binary.py: cs.binary: remove debugging
+lib/python/cs/binary.py: cs.binary: small docstring clarification
+lib/python/cs/binary.py: cs.binary: try to make the module docstring more approachable
+lib/python/cs/binary.py: cs.binary: DISTINFO+typeguard
A => release/cs.binary-20250501/SUMMARY.txt +10 -0
@@ 0,0 1,10 @@
+@binclass: a dataclass-like way to specify binary structures.
+Rename BinaryByteses to BinaryBytes.
+Rename BinaryMultiStruct and BinarySingleStruct to BinaryStruct.
+New bs(bytes) subclass with a compact repr().
+BinarySingleValue classes now expect a type specification for the value.
+BinarySingleValue now subclasses cs.deco.Promotable.
+New struct_field_types(struct_format,field_names) to return a mapping of field names to struct.unpack-returned types.
+New is_single_value(obj) to test for single value binary objects.
+New ListOfBinary, a class holding a list of instances of some binary class.
+Many internal updates and improvements.
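
A stdlib-only sketch of the struct_field_types() idea named above, assuming only that it maps field names to the types struct.unpack returns; this is an illustration, not the cs.binary implementation:

    import struct

    def struct_field_types(struct_format, field_names):
        # unpack one all-zero buffer: the unpacked values reveal each field's type
        S = struct.Struct(struct_format)
        values = S.unpack(bytes(S.size))
        names = field_names.split() if isinstance(field_names, str) else list(field_names)
        if len(names) != len(values):
            raise ValueError('field_names do not match the struct format')
        return dict(zip(names, (type(v) for v in values)))

    print(struct_field_types('>HLd', 'tag length value'))
    # -> {'tag': <class 'int'>, 'length': <class 'int'>, 'value': <class 'float'>}
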
A => release/cs.buffer-20250428/CHANGES.txt +9 -0
@@ 0,0 1,9 @@
+lib/python/cs/buffer.py: cs.buffer: some trite ruff autofixes
+lib/python/cs/buffer.py: cs.buffer: ruff else-if -> elif autofixes
+lib/python/cs/buffer.py: cs.buffer: replace percent formatting with format strings
+lib/python/cs/buffer.py: cs.buffer: add missed string closing quote
+lib/python/cs/buffer.py: cs.buffer: DISTINFO + requires_python>=3.3
+lib/python/cs/buffer.py: merge default => pypi
+lib/python/cs/buffer.py: cs.buffer: CornuCopyBuffer: new from_cli_filespec() factory method accepting "-" or a filename for use by CLIs
+lib/python/cs/buffer.py: cs.buffer: some type annotations, check against Buffer instead of (bytes,bytearray,mmap.mmap,memoryview), streamline a little logic, some docstring updates
+lib/python/cs/buffer.py: merge main -> media
A => release/cs.buffer-20250428/SUMMARY.txt +2 -0
@@ 0,0 1,2 @@
+New CornuCopyBuffer.from_cli_filespec() factory method accepting "-" or a filename for use by CLIs.
+Some minor internal things.
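
The "-" convention described for from_cli_filespec() is the usual CLI idiom; a hedged sketch using a hypothetical helper name, not the cs.buffer API:

    import sys

    def binary_file_from_cli_filespec(filespec):
        # "-" names standard input; anything else is an ordinary filename
        if filespec == '-':
            return sys.stdin.buffer
        return open(filespec, 'rb')
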
A => release/cs.cmdutils-20250418/CHANGES.txt +9 -0
@@ 0,0 1,9 @@
+lib/python/cs/cmdutils.py: cs.cmdutils: BaseCommand.cmd_info: use cs.lex.printt() to print the fields table
+lib/python/cs/cmdutils.py: cs.cmdutils: docstring fix, update reference to BaseCommand.Options in docstring example
+lib/python/cs/cmdutils.py: cs.cmdutils: BaseCommand._prerun_setup: if there is no identifierish subcommand name but there is self.SUBCOMMAND_ARGV_DEFAULT, prefix it to whatever argv there is
+lib/python/cs/cmdutils.py: cs.cmdutils: consistent testing for the presence of subcommands; make SubCommand.has_subcommands a method instead of a property
+lib/python/cs/cmdutils.py: cs.cmdutils: docstring tweak
+lib/python/cs/cmdutils.py: cs.cmdutils: formatting tweaks
+lib/python/cs/cmdutils.py: cs.cmdutils: split_usage: take the first paragraph if Usage: is missing, prepend the generic Usage: line
+lib/python/cs/cmdutils.py: cs.cmdutils: BaseCommand._prerun_setup: bugfix application of default argv, was setting self._argv too early; tweak some comments and a docstring
+lib/python/cs/cmdutils.py: cs.cmdutils: clean some lint
A => release/cs.cmdutils-20250418/SUMMARY.txt +3 -0
@@ 0,0 1,3 @@
+BaseCommand: honour SUBCOMMAND_ARGV_DEFAULT even when there are no subcommands.
+BaseCommand._prerun_setup: bugfix application of default argv, was setting self._argv too early.
+Some documentation updates.
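
A sketch of the default-argv behaviour summarised above, assuming "identifierish" means the leading word could name a subcommand; DEFAULT_ARGV and apply_default_argv are illustrative, not BaseCommand internals:

    DEFAULT_ARGV = ['status']  # hypothetical SUBCOMMAND_ARGV_DEFAULT value

    def apply_default_argv(argv, default_argv=DEFAULT_ARGV):
        # no identifierish leading word: prefix the default argv
        if not argv or not argv[0].isidentifier():
            return list(default_argv) + list(argv)
        return list(argv)

    assert apply_default_argv([]) == ['status']
    assert apply_default_argv(['-v']) == ['status', '-v']
    assert apply_default_argv(['report']) == ['report']
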
A => release/cs.cmdutils-20250426/CHANGES.txt +2 -0
@@ 0,0 1,2 @@
+lib/python/cs/cmdutils.py: cs.cmdutils: format string fix
+lib/python/cs/cmdutils.py: cs.cmdutils: module docstring tweak
A => release/cs.cmdutils-20250426/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+Minor changes.
A => release/cs.deco-20250428/CHANGES.txt +5 -0
@@ 0,0 1,5 @@
+lib/python/cs/deco.py: cs.deco: @decorator: bugfix the post-decoration attribute update of __name__, __doc__ etc - collect these values _before_ the decoration
+lib/python/cs/deco.py: cs.deco: @promote: update the optional type detection to use cs.typingutils.is_optional
+lib/python/cs/deco.py: cs.deco: Promotable.promote: fall back to calling cls(obj) instead of raising a TypeError - the class init can do that
+lib/python/cs/deco.py: cs.deco: import cs.typingutils for is_optional()
+lib/python/cs/deco.py: cs.deco: @decorator: restore use of functools.update_wrapper as it propagates the function signature
A => release/cs.deco-20250428/SUMMARY.txt +2 -0
@@ 0,0 1,2 @@
+@decorator: bugfix the post-decoration attribute update of __name__, __doc__ etc - collect these values _before_ the decoration.
+Promotable.promote: fall back to calling cls(obj) instead of raising a TypeError - the class init can do that.
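
A minimal sketch of the @decorator shape, using functools.update_wrapper (which the changelog notes was restored because it propagates the function signature); the cs.deco internals and the ordering bugfix itself are more involved:

    import functools

    def decorator(deco):
        # apply deco to func, then copy func's metadata onto the wrapper;
        # functools.update_wrapper carries __name__, __doc__ and the signature
        def wrap(func):
            wrapper = deco(func)
            if wrapper is not func:
                functools.update_wrapper(wrapper, func)
            return wrapper
        return wrap

    @decorator
    def traced(func):
        def wrapper(*a, **kw):
            print('call', func.__name__)
            return func(*a, **kw)
        return wrapper

    @traced
    def add(x, y):
        'Add two numbers.'
        return x + y

    assert add.__name__ == 'add' and add.__doc__ == 'Add two numbers.'
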
A => release/cs.fileutils-20250429/CHANGES.txt +2 -0
@@ 0,0 1,2 @@
+lib/python/cs/fileutils.py: cs.fileutils: lockfile: protect in-process calls with an NRLock to reduce filesystem contention between threads and to detect recursive attempts at the same lockfile
+lib/python/cs/fileutils.py: cs.fileutils: update imports
A => release/cs.fileutils-20250429/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+lockfile: protect in-process calls with an NRLock to reduce filesystem contention between threads and to detect recursive attempts at the same lockfile.
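
An illustrative non-reentrant lock in the spirit of the NRLock mentioned above (the real one is in cs.threads); it shows why non-reentrancy turns a recursive lockfile attempt into a detected error rather than a filesystem spin:

    import threading

    class NRLock:
        # like threading.Lock, but a recursive acquire by the owning
        # thread raises instead of silently deadlocking
        def __init__(self):
            self._lock = threading.Lock()
            self._owner = None

        def __enter__(self):
            me = threading.get_ident()
            if self._owner == me:
                raise RuntimeError('recursive NRLock acquisition')
            self._lock.acquire()
            self._owner = me
            return self

        def __exit__(self, *exc_info):
            self._owner = None
            self._lock.release()
            return False
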
A => release/cs.gimmicks-20250428/CHANGES.txt +1 -0
@@ 0,0 1,1 @@
+lib/python/cs/gimmicks.py: cs.gimmicks: define Buffer
A => release/cs.gimmicks-20250428/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+Define Buffer from collections.abc.Buffer, or from typing.ByteString for older Pythons.
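
The shim is small enough to sketch directly from the summary's description:

    try:
        from collections.abc import Buffer  # Python 3.12 onward
    except ImportError:
        from typing import ByteString as Buffer  # older Pythons

    assert isinstance(b'data', Buffer)
    assert isinstance(bytearray(b'data'), Buffer)
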
A => release/cs.lex-20250428/CHANGES.txt +3 -0
@@ 0,0 1,3 @@
+lib/python/cs/lex.py: cs.lex: cutprefix,cutsuffix: also accept a tuple of str like str.startswith and str.endswith
+lib/python/cs/lex.py: cs.lex: typed_str: use cropped_repr() instead of repr()
+lib/python/cs/lex.py: merge main -> media
A => release/cs.lex-20250428/SUMMARY.txt +2 -0
@@ 0,0 1,2 @@
+cutprefix,cutsuffix: also accept a tuple of str like str.startswith and str.endswith.
+typed_str: use cropped_repr() instead of repr().
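
A hedged sketch of the cutprefix() semantics described, mirroring the str.startswith tuple convention; not the cs.lex implementation:

    def cutprefix(s, prefixes):
        # accept one prefix or, like str.startswith, a tuple of candidates;
        # return s unchanged when nothing matches
        if isinstance(prefixes, str):
            prefixes = (prefixes,)
        for prefix in prefixes:
            if prefix and s.startswith(prefix):
                return s[len(prefix):]
        return s

    assert cutprefix('cs.lex', ('cs.', 'lib.')) == 'lex'
    assert cutprefix('lex', ('cs.', 'lib.')) == 'lex'
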
A => release/cs.py.doc-20250426/CHANGES.txt +4 -0
@@ 0,0 1,4 @@
+lib/python/cs/py/doc.py: cs.py.doc: module_doc: new doc_item inner function to format an item, now using a list instead of a heading - more compact and readable
+lib/python/cs/py/doc.py: cs.py.doc: module_doc: restore mangled command usage
+lib/python/cs/py/doc.py: cs.py.doc: module_doc: provide a short summary of every module top level name before the full docs
+lib/python/cs/py/doc.py: cs.py.doc: add a TODO comment
A => release/cs.py.doc-20250426/SUMMARY.txt +3 -0
@@ 0,0 1,3 @@
+module_doc: new doc_item inner function to format an item, now using a list instead of a heading - more compact and readable.
+module_doc: restore mangled command usage.
+module_doc: provide a short summary of every module top level name before the full docs.
A => release/cs.queues-20250426/CHANGES.txt +1 -0
@@ 0,0 1,1 @@
+lib/python/cs/queues.py: cs.queues: fix typing annotation, failing on Python 3.8
A => release/cs.queues-20250426/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+Fix typing annotation, failing on Python 3.8.
A => release/cs.testutils-20250426/CHANGES.txt +2 -0
@@ 0,0 1,2 @@
+lib/python/cs/testutils.py: cs.testutils: simple self test in "main" mode
+lib/python/cs/testutils.py: cs.testutils: import cs.debug for thread_dump()
A => release/cs.testutils-20250426/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+Update DISTINFO.
A => release/cs.typingutils-20250428/CHANGES.txt +1 -0
@@ 0,0 1,1 @@
+lib/python/cs/typingutils.py: cs.typingutils: new is_optional(annotation) function to recognise Optional[type] annotations, returning type if true, None otherwise
A => release/cs.typingutils-20250428/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+New is_optional(annotation) function to recognise Optional[type] annotations, returning type if true, None otherwise.
A => release/cs.typingutils-20250503/CHANGES.txt +1 -0
@@ 0,0 1,1 @@
+lib/python/cs/typingutils.py: cs.typingutils: is_optional: Optional[T] has a get_args of (T,<NoneType>), not (T,None)
A => release/cs.typingutils-20250503/SUMMARY.txt +1 -0
@@ 0,0 1,1 @@
+is_optional: bugfix: Optional[T] has a get_args of (T,<NoneType>), not (T,None).
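
A stdlib sketch of is_optional() as described, including the NoneType detail from this bugfix; not the cs.typingutils implementation:

    from typing import Optional, Union, get_args, get_origin

    def is_optional(annotation):
        # Optional[T] is Union[T, None], and get_args() yields (T, NoneType)
        if get_origin(annotation) is Union:
            args = get_args(annotation)
            if len(args) == 2 and type(None) in args:
                return args[0] if args[1] is type(None) else args[1]
        return None

    assert is_optional(Optional[int]) is int
    assert is_optional(int) is None
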
A => release/cs.upd-20250426/CHANGES.txt +3 -0
@@ 0,0 1,3 @@
+lib/python/cs/upd.py: cs.upd: above: fix contextif() call
+lib/python/cs/upd.py: cs.upd: run_task: establish a Pfx context around the task
+lib/python/cs/upd.py: cs.upd: Upd.run_task: give a name to the ticker Thread
A => release/cs.upd-20250426/SUMMARY.txt +2 -0
@@ 0,0 1,2 @@
+above: bugfix contextif() call.
+run_task: establish a Pfx context around the task, give a name to the ticker Thread.
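
The ticker name matches the upd.py hunk earlier in this changeset ("%s-task-ticker" % label); as a standalone sketch:

    import time
    from threading import Thread

    def start_ticker(label, tick, interval=0.5):
        # name the daemon Thread after its task so it shows up in thread dumps
        def _ticker():
            while True:
                tick()
                time.sleep(interval)
        T = Thread(target=_ticker, name='%s-task-ticker' % label, daemon=True)
        T.start()
        return T
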