Internals

_mpmetrics

C helpers for multiprocess-safe metrics

This module provides various concurrency primitives, typically backed by shared memory. Prefer mpmetrics.atomic over the atomic types in this module, because the types here are not available on every architecture.

class _mpmetrics.AtomicDouble(mem)

Construct a new atomic double backed by ‘mem’. All atomic operations use the sequentially-consistent memory order. On architectures not supporting atomic doubles, this class will be None.

add(amount, raise_on_overflow=True) float

Add ‘amount’ to the backing float and return the value from before the addition. The value of ‘raise_on_overflow’ is ignored.

get() float

Return the current value of the backing float.

set(value)

Set the backing float to ‘value’.

class _mpmetrics.AtomicInt32(mem)

Construct a new atomic 32-bit signed integer backed by ‘mem’. All atomic operations use the sequentially-consistent memory order. On architectures not supporting atomic 32-bit signed integers, this class will be None.

add(amount, raise_on_overflow=True) int

Add ‘amount’ to the backing int and return the value from before the addition. If the addition overflows, the result will wrap around (using two’s complement addition) and, if ‘raise_on_overflow’ is True, an exception will be raised.
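The wrap-around rule above can be modelled in plain Python. This is only a sketch of the arithmetic, not the C implementation: the helper name wrapping_add is hypothetical, the operation is not atomic, and it reports overflow as a flag because the text above does not say which exception type is raised.

```python
def wrapping_add(current, amount, bits=32, signed=True):
    """Return (new_value, overflowed) for a fixed-width two's complement add."""
    lo = -(1 << (bits - 1)) if signed else 0
    hi = (1 << (bits - 1)) - 1 if signed else (1 << bits) - 1
    exact = current + amount
    wrapped = exact & ((1 << bits) - 1)      # keep only the low `bits` bits
    if signed and wrapped > hi:
        wrapped -= 1 << bits                 # reinterpret as a negative value
    return wrapped, not (lo <= exact <= hi)

new, overflowed = wrapping_add(2**31 - 1, 1)
assert new == -2**31 and overflowed          # signed 32-bit maximum wraps to the minimum

new, overflowed = wrapping_add(0, -1, signed=False)
assert new == 2**32 - 1 and overflowed       # unsigned zero wraps to the maximum
```

The same helper covers the 64-bit and unsigned variants by changing bits and signed.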

get() int

Return the current value of the backing int.

set(value)

Set the backing int to ‘value’.

class _mpmetrics.AtomicInt64(mem)

Construct a new atomic 64-bit signed integer backed by ‘mem’. All atomic operations use the sequentially-consistent memory order. On architectures not supporting atomic 64-bit signed integers, this class will be None.

add(amount, raise_on_overflow=True) int

Add ‘amount’ to the backing int and return the value from before the addition. If the addition overflows, the result will wrap around (using two’s complement addition) and, if ‘raise_on_overflow’ is True, an exception will be raised.

get() int

Return the current value of the backing int.

set(value)

Set the backing int to ‘value’.

class _mpmetrics.AtomicUInt32(mem)

Construct a new atomic 32-bit unsigned integer backed by ‘mem’. All atomic operations use the sequentially-consistent memory order. On architectures not supporting atomic 32-bit unsigned integers, this class will be None.

add(amount, raise_on_overflow=True) int

Add ‘amount’ to the backing int and return the value from before the addition. If the addition overflows, the result will wrap around (using two’s complement addition) and, if ‘raise_on_overflow’ is True, an exception will be raised.

get() int

Return the current value of the backing int.

set(value)

Set the backing int to ‘value’.

class _mpmetrics.AtomicUInt64(mem)

Construct a new atomic 64-bit unsigned integer backed by ‘mem’. All atomic operations use the sequentially-consistent memory order. On architectures not supporting atomic 64-bit unsigned integers, this class will be None.

add(amount, raise_on_overflow=True) int

Add ‘amount’ to the backing int and return the value from before the addition. If the addition overflows, the result will wrap around (using two’s complement addition) and, if ‘raise_on_overflow’ is True, an exception will be raised.

get() int

Return the current value of the backing int.

set(value)

Set the backing int to ‘value’.

class _mpmetrics.Buffer(mem)

Create a buffer backed by ‘mem’. This is a base class for other C classes in _mpmetrics.

class _mpmetrics.Lock(mem)

Create a shared memory mutex lock backed by ‘mem’. On POSIX systems, this uses the POSIX threads mutex lock implementation. This lock is not reentrant: trying to acquire it multiple times or release it without first having acquired it will cause a deadlock.

This class may be used as a context manager. Entering the context blocks until the lock has been acquired.

acquire(block=True, timeout=None) bool

Acquire the lock. If ‘block’ is False, then try to acquire the lock without blocking. If ‘timeout’ is not None, then wait for at most ‘timeout’ seconds before aborting. Returns True if the lock was acquired, or False otherwise.

release()

Release the lock.
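The acquire and release semantics described above closely resemble those of the standard library's threading.Lock, so they can be tried out without shared memory. This sketch uses threading.Lock purely as a stand-in: it is a different class, it only synchronizes threads within one process, and its first parameter is named blocking rather than block.

```python
import threading

lock = threading.Lock()  # stand-in for _mpmetrics.Lock; also non-reentrant

assert lock.acquire()                   # blocks until acquired, then returns True
assert not lock.acquire(False)          # non-blocking attempt fails: already held
assert not lock.acquire(timeout=0.05)   # gives up after roughly 50 ms
lock.release()

with lock:                              # context-manager form; blocks on entry
    assert lock.locked()
assert not lock.locked()                # released again on leaving the block
```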

mpmetrics.atomic

multiprocess-safe atomics

This module contains atomic types which automatically fall back to locking implementations on architectures which only support 32-bit atomics.

mpmetrics.atomic.AtomicDouble

Either _mpmetrics.AtomicDouble, or LockingDouble if the former is not supported.

mpmetrics.atomic.AtomicInt64

Either _mpmetrics.AtomicInt64, or LockingInt64 if the former is not supported.

mpmetrics.atomic.AtomicUInt64

Either _mpmetrics.AtomicUInt64, or LockingUInt64 if the former is not supported.

class mpmetrics.atomic.LockingDouble(mem, heap=None)

An atomic double implemented using a lock

add(amount, raise_on_overflow=True)

Add ‘amount’ to the backing atomic.

Parameters:
  • amount (Union[int, float]) – The amount to add

  • raise_on_overflow (bool) – Whether to raise an exception on overflow

Returns:

The value from before the addition.

Return type:

Union[int, float]

get()

Return the current value of the backing atomic

set(value)

Set the backing atomic to value.
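The idea behind a lock-based fallback can be sketched with the standard library. Everything here is an illustrative assumption rather than the library's code: the class name LockingDoubleSketch is hypothetical, the value lives in an ordinary bytearray instead of shared memory, and threading.Lock stands in for a lock that would have to work across processes.

```python
import struct
import threading

class LockingDoubleSketch:
    """Hypothetical lock-protected double stored in a writable buffer."""

    def __init__(self, mem):
        self._mem = mem                 # writable buffer of at least 8 bytes
        self._lock = threading.Lock()   # a real fallback needs a cross-process lock

    def get(self):
        with self._lock:
            return struct.unpack_from("d", self._mem)[0]

    def set(self, value):
        with self._lock:
            struct.pack_into("d", self._mem, 0, value)

    def add(self, amount):
        with self._lock:                # the whole read-modify-write holds the lock
            old = struct.unpack_from("d", self._mem)[0]
            struct.pack_into("d", self._mem, 0, old + amount)
            return old

d = LockingDoubleSketch(memoryview(bytearray(8)))
assert d.add(1.5) == 0.0    # returns the value from before the addition
assert d.get() == 1.5
```

Holding the lock across both the read and the write is what makes add behave atomically with respect to other users of the same lock.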

class mpmetrics.atomic.LockingInt64(mem, heap=None)

An atomic 64-bit signed integer implemented using a lock

add(amount, raise_on_overflow=True)

Add ‘amount’ to the backing atomic.

Parameters:
  • amount (Union[int, float]) – The amount to add

  • raise_on_overflow (bool) – Whether to raise an exception on overflow

Returns:

The value from before the addition.

Return type:

Union[int, float]

get()

Return the current value of the backing atomic

set(value)

Set the backing atomic to value.

class mpmetrics.atomic.LockingUInt64(mem, heap=None)

An atomic 64-bit unsigned integer implemented using a lock

add(amount, raise_on_overflow=True)

Add ‘amount’ to the backing atomic.

Parameters:
  • amount (Union[int, float]) – The amount to add

  • raise_on_overflow (bool) – Whether to raise an exception on overflow

Returns:

The value from before the addition.

Return type:

Union[int, float]

get()

Return the current value of the backing atomic

set(value)

Set the backing atomic to value.

mpmetrics.generics

Helpers for pickling polymorphic classes.

class mpmetrics.generics.FloatType(name, cls)

Helper for classes polymorphic over floats.

This is a helper class to allow classes which are polymorphic over floats to be pickled. For example:

import pickle
from mpmetrics.generics import FloatType

def MyClass(__name__, x):
    return type(__name__, (), locals())

MyClass = FloatType('MyClass', MyClass)
assert MyClass[2.7].x == 2.7
assert pickle.loads(pickle.dumps(MyClass[2.7]())).x == 2.7

class mpmetrics.generics.IntType(name, cls)

Helper for classes polymorphic over integers.

This is a helper class to allow classes which are polymorphic over ints to be pickled. For example:

import pickle
from mpmetrics.generics import IntType

def MyClass(__name__, x):
    return type(__name__, (), locals())

MyClass = IntType('MyClass', MyClass)
assert MyClass[5].x == 5
assert pickle.loads(pickle.dumps(MyClass[5]())).x == 5

class mpmetrics.generics.ListType(name, cls, elemtype)

Helper to combine other types.

This is a helper class to allow classes which are polymorphic over multiple types to be pickled. For example:

import pickle
from mpmetrics.generics import IntType, ListType

def MyClass(__name__, xs):
    return type(__name__, (), locals())

MyClass = ListType('MyClass', MyClass, IntType)
assert MyClass[1, 2, 3].xs == (1, 2, 3)
assert pickle.loads(pickle.dumps(MyClass[1, 2, 3]())).xs == (1, 2, 3)

class mpmetrics.generics.ObjectType(name, cls)

Helper for classes polymorphic over classes.

This is a helper class to allow classes which are polymorphic over python classes to be pickled. For example:

import pickle
from mpmetrics.generics import ObjectType

def MyClass(__name__, cls):
    return type(__name__, (), locals())

MyClass = ObjectType('MyClass', MyClass)
assert MyClass[int].cls is int
assert pickle.loads(pickle.dumps(MyClass[int]())).cls is int

class mpmetrics.generics.ProductType(name, cls, argtypes, args=())

Helper to combine other types.

This is a helper class to allow classes which are polymorphic over multiple types to be pickled. For example:

import pickle
from mpmetrics.generics import IntType, ObjectType, ProductType

def MyClass(__name__, cls, x):
    return type(__name__, (), locals())

MyClass = ProductType('MyClass', MyClass, (ObjectType, IntType))
assert MyClass[int, 5].cls is int
assert MyClass[int, 5].x == 5
assert pickle.loads(pickle.dumps(MyClass[int, 5]())).x == 5

mpmetrics.generics.saveattr(get)

Save the result of __getattr__.

Parameters:

get – A __getattr__ implementation

Wrap get and save the result with setattr.
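A decorator with the behaviour described above might look like the following. This is a minimal sketch written from the one-line description, not the library's source, and the names saveattr_sketch and Squares are hypothetical.

```python
import functools

def saveattr_sketch(get):
    """Wrap a __getattr__ so each computed attribute is stored on the instance."""
    @functools.wraps(get)
    def wrapper(self, name):
        value = get(self, name)
        setattr(self, name, value)   # later lookups find it without __getattr__
        return value
    return wrapper

class Squares:
    calls = 0                        # counts how often the real getter runs

    @saveattr_sketch
    def __getattr__(self, name):
        Squares.calls += 1
        return int(name[1:]) ** 2    # e.g. "n12" -> 144

s = Squares()
assert s.n12 == 144
assert s.n12 == 144
assert Squares.calls == 1            # the second access used the saved attribute
```

Because Python only calls __getattr__ when normal lookup fails, saving the result with setattr turns every later access into an ordinary attribute read.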

mpmetrics.heap

A shared memory allocator.

class mpmetrics.heap.Heap(map_size=4096)

A shared memory allocator.

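This is a basic arena-style allocator. The core algorithm is (effectively):

base = 0  # offset of the next free byte

def malloc(size):
    global base
    old_base = base  # the new block starts at the current base
    base += size     # bump the base past the new block
    return old_base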

We do not keep track of free blocks, so Heap.Block.free() is a no-op.

Memory is requested from the OS in page-sized blocks. As we don’t map all of our memory up front, it’s possible that different processes will map new pages at different addresses. Therefore, we keep track of the address where each page is mapped, and ensure blocks do not cross page boundaries. Larger-than-page-size blocks are supported by aligning the block to the page size and mapping all pages in that block in one go.
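The placement rules in the paragraph above can be illustrated with a small model that tracks offsets only. This is a sketch under simplifying assumptions, not the Heap implementation: ArenaSketch and align_up are hypothetical names, no memory is actually mapped, and the alignment is assumed to be a power of two no larger than the page size.

```python
def align_up(x, a):
    """Round x up to a multiple of a (a must be a power of two)."""
    return (x + a - 1) & ~(a - 1)

class ArenaSketch:
    """Hypothetical bump allocator that only hands out offsets."""

    def __init__(self, map_size=4096):
        self.map_size = map_size
        self.base = 0                    # offset of the next free byte

    def malloc(self, size, alignment=64):
        start = align_up(self.base, alignment)
        page_end = align_up(start + 1, self.map_size)
        if size > self.map_size:
            # Large block: start on a page boundary so its pages map together.
            start = align_up(start, self.map_size)
        elif start + size > page_end:
            # Small block would straddle two pages: skip to the next page.
            start = page_end
        self.base = start + size
        return start

arena = ArenaSketch()
assert arena.malloc(100) == 0
assert arena.malloc(100) == 128          # 100 rounded up to the 64-byte alignment
assert arena.malloc(4000) == 4096        # would cross a page, so it moves to the next
assert arena.malloc(10000) == 8192       # multi-page block, page-aligned
```

The bytes skipped when a block is moved to the next page are simply wasted, which is consistent with an allocator that never tracks free space.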

class Block(heap, start, size)

A block of memory allocated from a Heap.

__init__(heap, start, size)

Create a new Block.

Parameters:
  • heap (Heap) – The heap this block is from

  • start (int) – The offset of this block within the heap

  • size (int) – The size of this block

deref()

Dereference this block

Returns:

The memory referenced by this block

Return type:

memoryview

Dereference the block, faulting in unmapped pages as necessary.

free()

Free this block

__init__(map_size=4096)

Create a new Heap.

Parameters:

map_size (int) – The granularity to use when requesting memory from the OS

malloc(size, alignment=64)

Allocate shared memory.

Parameters:
  • size (int) – The amount of shared memory to allocate, in bytes

  • alignment (int) – The minimum alignment of the memory

Returns:

A block of shared memory

Return type:

Block

Allocate at least size bytes of shared memory. It will be aligned to at least alignment.

mpmetrics.types

Various types backed by (shared) memory

mpmetrics.types.Array = <mpmetrics.generics.ProductType object>

An array of values backed by (shared) memory.

You can access values in an Array just as you would in a list:

from mpmetrics.heap import Heap
from mpmetrics.types import Array, Box, Double

assert Array[Double, 5].size == Double.size * 5
a = Box[Array[Double, 5]](Heap())
assert type(a[0]) == Double
a[4].value = 6.28

Array.__init__(mem, heap=None)

Create a new Array.

mpmetrics.types.Box = <mpmetrics.generics.ObjectType object>

A heap-allocated box to put values in

This class “boxes” another class using heap-allocated memory. For example, you could create a Double like:

from mpmetrics.heap import Heap
from mpmetrics.types import Double

block = Heap().malloc(Double.size)
d = Double(block.deref())

However, d._mem is a memoryview, which cannot be pickled. Box takes care of keeping track of the memory block:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, Double

d = Box[Double](Heap())

Box.__init__(heap, *args, **kwargs)

Create a new object on the heap

Parameters:
  • heap (mpmetrics.heap.Heap) – The heap to use when allocating the object

  • *args – Any additional arguments are passed to the boxed class

  • **kwargs – Any additional keyword arguments are passed to the boxed class.

The superclass’s __init__ is called with a newly-allocated buffer as the first argument, any positional arguments to this function, the keyword argument heap set to heap, and any additional keyword arguments to this function.
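The reason for tracking the memory block rather than the memoryview can be demonstrated with the standard library alone. This sketch uses a bytearray as a stand-in for a shared mapping and pickles a plain (start, size) tuple; how Box actually records the block is not shown here and is not implied by this example.

```python
import pickle

arena = bytearray(4096)                  # stand-in for a shared memory mapping
view = memoryview(arena)[64:72]          # what a dereferenced block looks like

try:
    pickle.dumps(view)
    picklable = True
except TypeError:
    picklable = False
assert not picklable                     # memoryview objects cannot be pickled

# Pickle the block's location instead, and re-derive the view afterwards.
start, size = pickle.loads(pickle.dumps((64, 8)))
restored = memoryview(arena)[start:start + size]
restored[0] = 0xFF
assert view[0] == 0xFF                   # both views alias the same bytes
```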

abstract Box._getstate()

Return keyword arguments to pass to _setstate.

Returns:

A dictionary of keyword arguments for _setstate

Return type:

dict

This method is optional; if it is not implemented then no additional keyword arguments will be passed to _setstate.

abstract Box._setstate(mem, heap=None, **kwargs)

Initialize internal state after unpickling.

Parameters:
  • mem (memoryview) – The backing memory

  • heap (mpmetrics.heap.Heap) – The heap mem was allocated from

  • **kwargs – Any additional arguments from _getstate

This method must be implemented by boxed types.

class mpmetrics.types.Dict(mem, heap)

A dict backed by (shared) memory.

All methods of dict are supported. External synchronization (such as from a _mpmetrics.Lock) must be provided when accessing any method.

class mpmetrics.types.Double(mem, heap=None)

A double backed by (shared) memory.

value: float

The value itself. You can read and modify this value as necessary. For example:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, Double

var = Box[Double](Heap())
assert var.value == 0
var.value += 1
assert var.value == 1

size: int = 8

The size of a double, in bytes

align: int = 8

The alignment of a double, in bytes

__init__(mem, heap=None)

Create a new Double.

Parameters:
  • mem (memoryview) – The backing memory

  • heap – Unused

class mpmetrics.types.Int64(mem, heap=None)

An int64_t backed by (shared) memory.

value: int

The value itself. You can read and modify this value as necessary. For example:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, Int64

var = Box[Int64](Heap())
assert var.value == 0
var.value += 1
assert var.value == 1

size: int = 8

The size of an int64_t, in bytes

align: int = 8

The alignment of an int64_t, in bytes

__init__(mem, heap=None)

Create a new Int64.

Parameters:
  • mem (memoryview) – The backing memory

  • heap – Unused

class mpmetrics.types.List(mem, heap)

A list backed by (shared) memory.

All methods of list are supported. External synchronization (such as from a _mpmetrics.Lock) must be provided when accessing any method.

class mpmetrics.types.Object(mem, heap)

A python object pickled in (shared) memory

This is a base class for python objects backed by shared memory. Whenever the object is accessed, it is unpickled from the backing memory. When it is modified, it is pickled to the backing memory.

This class itself does not contain the actual object. Instead, it contains the start/size/length of the block containing the object. When the object grows too large for the block, the old block is freed and a new one is allocated.

This class provides no synchronization. All methods should be accessed under some other form of synchronization, such as a _mpmetrics.Lock.
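The pickle-on-write, unpickle-on-read cycle described above can be sketched as follows. This is an illustrative model only: PickledSlot is a hypothetical name, a bytearray stands in for the heap block, and the growth policy (at least doubling) is an assumption, not something the text specifies.

```python
import pickle

class PickledSlot:
    """Hypothetical object store: pickles into a buffer, regrowing as needed."""

    def __init__(self, capacity=16):
        self.buf = bytearray(capacity)   # stand-in for a heap block
        self.length = 0                  # number of valid bytes in buf

    def load(self):
        return pickle.loads(bytes(self.buf[:self.length]))

    def store(self, obj):
        data = pickle.dumps(obj)
        if len(data) > len(self.buf):
            # Too big for the current block: replace it with a larger one.
            self.buf = bytearray(max(len(data), 2 * len(self.buf)))
        self.buf[:len(data)] = data
        self.length = len(data)

slot = PickledSlot()
slot.store([])
items = slot.load()          # every access unpickles a fresh copy
items.extend(range(100))
slot.store(items)            # every modification pickles the whole object back
assert slot.load() == list(range(100))
assert len(slot.buf) >= slot.length
```

A load, modify, store sequence like this is not atomic, which is why callers need to hold a lock around it.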

__init__(mem, heap)

Create a new Object.

Parameters:
  • mem (memoryview) – The memory used to store information about the buffer

  • heap (mpmetrics.heap.Heap) – The heap to use when (re)allocating the buffer

class mpmetrics.types.Size_t(mem, heap=None)

A size_t backed by (shared) memory.

value: int

The value itself. You can read and modify this value as necessary. For example:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, Size_t

var = Box[Size_t](Heap())
assert var.value == 0
var.value += 1
assert var.value == 1

size: int = 8

The size of a size_t, in bytes

align: int = 8

The alignment of a size_t, in bytes

__init__(mem, heap=None)

Create a new Size_t.

Parameters:
  • mem (memoryview) – The backing memory

  • heap – Unused

class mpmetrics.types.Struct(mem, heap=None)

A structured group of fields backed by (shared) memory.

This is a base class that can be subclassed to create C-style structs:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, Double, Size_t, Struct

class MyStruct(Struct):
    _fields_ = {
        'a': Double,
        'b': Size_t,
    }

assert MyStruct.size == Double.size + Size_t.size
s = Box[MyStruct](Heap())
assert type(s.a) == Double
assert type(s.b) == Size_t

class property _fields_: dict[str, Any]

The fields of the struct, in order. Upon initialization, each value is initialized with a block of memory equal to its .size. Padding is added as necessary to ensure alignment.

Subclasses must implement this property.
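How padding keeps each field aligned can be shown with a small offset calculation. This is a sketch of the general C-style layout rule, not the Struct implementation: the layout function and the 1-byte "flag" field are hypothetical, and padding the tail of the struct to its overall alignment is an assumption borrowed from C.

```python
def layout(fields):
    """Return ({name: offset}, size, align) for (name, size, align) triples."""
    offsets, offset, max_align = {}, 0, 1
    for name, size, align in fields:
        offset = (offset + align - 1) & ~(align - 1)   # pad up to the field's alignment
        offsets[name] = offset
        offset += size
        max_align = max(max_align, align)
    # Assumption: like C, pad the tail so arrays of the struct stay aligned.
    total = (offset + max_align - 1) & ~(max_align - 1)
    return offsets, total, max_align

offsets, size, align = layout([("flag", 1, 1), ("a", 8, 8), ("b", 8, 8)])
assert offsets == {"flag": 0, "a": 8, "b": 16}   # 7 padding bytes follow "flag"
assert (size, align) == (24, 8)
```

When every field has size and alignment 8, as in the MyStruct example, no padding is needed and the struct size is simply the sum of the field sizes.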

__init__(mem, heap=None)

Create a new Struct.

property align

The alignment of the struct, in bytes

property size

The size of the struct, in bytes

class mpmetrics.types.UInt64(mem, heap=None)

A uint64_t backed by (shared) memory.

value: int

The value itself. You can read and modify this value as necessary. For example:

from mpmetrics.heap import Heap
from mpmetrics.types import Box, UInt64

var = Box[UInt64](Heap())
assert var.value == 0
var.value += 1
assert var.value == 1

size: int = 8

The size of a uint64_t, in bytes

align: int = 8

The alignment of a uint64_t, in bytes

__init__(mem, heap=None)

Create a new UInt64.

Parameters:
  • mem (memoryview) – The backing memory

  • heap – Unused

mpmetrics.util

Various small utilities.

mpmetrics.util.align(x, a)

Align x to a

Parameters:
  • x (int) – The value to align

  • a (int) – The alignment; must be a power of two

Returns:

The smallest multiple of a greater than or equal to x

Return type:

int

mpmetrics.util.align_down(x, a)

Align x down to a

Parameters:
  • x (int) – The value to align

  • a (int) – The alignment; must be a power of two

Returns:

The largest multiple of a less than or equal to x

Return type:

int
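Both helpers can be expressed with the usual power-of-two bit tricks. The definitions below are sketch re-implementations for illustration, assuming the conventional behaviour in which an already-aligned value is returned unchanged. They are not copied from mpmetrics.util.

```python
def align_down(x, a):
    return x & ~(a - 1)               # clear the low log2(a) bits

def align(x, a):
    return align_down(x + a - 1, a)   # step past the next boundary, then round down

assert align(100, 64) == 128
assert align(128, 64) == 128          # already aligned: unchanged under this definition
assert align_down(100, 64) == 64
assert align_down(128, 64) == 128
```

The masking only works because a is a power of two, so a - 1 has exactly the low bits set.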

class mpmetrics.util.classproperty(fget=None, fset=None, fdel=None, doc=None)

Like property but for classes.
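A read-only version of such a descriptor takes only a few lines. This is a minimal sketch of the general technique, not the library's class: classproperty_sketch and Vec3 are hypothetical names, and the setter and deleter hooks in the signature above are left out.

```python
class classproperty_sketch:
    """Read-only descriptor whose getter receives the class, not the instance."""

    def __init__(self, fget):
        self.fget = fget

    def __get__(self, instance, owner=None):
        if owner is None:
            owner = type(instance)
        return self.fget(owner)

class Vec3:
    components = 3
    item_size = 8

    @classproperty_sketch
    def size(cls):
        return cls.components * cls.item_size

assert Vec3.size == 24          # works on the class itself
assert Vec3().size == 24        # and on instances
```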

mpmetrics.util.genmask(hi, lo)

Generate a mask with the bits between hi and lo set.

Parameters:
  • hi (int) – The highest bit to set, inclusive

  • lo (int) – The lowest bit to set, inclusive

Returns:

The bitmask

Return type:

int

hi must be greater than lo. Bits are numbered in “little-endian” order, starting from zero. Because both bounds are inclusive, the following invariant holds:

mask = 0
for n in range(lo, hi + 1):
    mask |= 1 << n
assert mask == genmask(hi, lo)
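A closed-form version of the mask can be written with two shifts. This is a sketch under the assumption that both hi and lo are inclusive, as the parameter list states. The name genmask_sketch is hypothetical and this is not the library's implementation.

```python
def genmask_sketch(hi, lo):
    """Mask with bits lo..hi set, both inclusive (assumed semantics)."""
    width = hi - lo + 1                 # number of bits in the mask
    return ((1 << width) - 1) << lo     # `width` ones, shifted up to bit lo

assert genmask_sketch(7, 4) == 0xF0
assert genmask_sketch(3, 0) == 0x0F
assert genmask_sketch(63, 32) == 0xFFFFFFFF00000000
```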