With TypeVar, mypy usually requires a bound before you can call any methods on your T instances, so that it can prove the methods will be there. But TypeVarTuple doesn’t have a bound argument, so how are you supposed to do anything with the objects?
A TypeVarTuple has an implicit upper bound of tuple[object, ...]. There’s currently no way to specify a custom upper bound. There have been some early discussions about adding this capability, but the rules have not yet been worked out. Similarly, a ParamSpec currently always has an implicit upper bound of Callable[..., Any].
Do you have a particular use case in mind? Perhaps you could post a minimal code sample that demonstrates what you’re trying to do. There may be ways to accommodate your use case without the need for a custom upper bound.
My use case is that I am writing a gather function like asyncio.gather, but one that spreads the calls across processes instead of just tasks. A call would look like:
await dist_gather((f, x), (g, y))
where f and g are functions and x and y are arguments. This would cause f and g to be executed in different processes. For this to work, x and y need to be serializable (i.e. obey a Protocol I have defined). I was trying to express it with TypeVarTuple because ParamSpec does not support bound either; ideally both would. I want users to get a clear static error if they try to pass something that would fail to serialize.
The workarounds I’ve found so far involve a significant sacrifice in ergonomics or type safety. If you look at how typeshed currently handles asyncio.gather, you can see that they already struggle with it: the definition is huge and has hacks to disable some forms of checking.
The challenges are:
- You must be able to enforce that each tuple contains a function as its first element, followed by the argument types that specific function expects (P.args handles this).
- There is a variable number of tuple arguments.
- Those tuples themselves have a variable number of elements.
- Each tuple represents a desired call to a different function that might take different arguments and return a different type, so you can’t use the same ParamSpec for all of them. In effect, you have a variable number of ParamSpecs.
- I need to enforce that the arguments and return type inside all of those ParamSpecs follow my Protocol.
So what typeshed does for asyncio.gather is manually define a bunch of ParamSpec and TypeVar objects and then define a separate overload for each possible number of inputs. That is what I’ve done too, but I don’t have a way to add the checking for my protocol without giving up on checking the other things above.
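To make the overload approach concrete, here is a minimal sketch for a deliberately restricted case: every function takes exactly one argument. With that restriction, bound TypeVars can carry the protocol check. The names Serializable, Blob, double and dist_gather_sketch are hypothetical, the protocol is assumed to have a single serialize method, and the body runs the calls in-process as a stand-in for real distribution. It does not solve the variable-arity problem described above.

```python
from typing import Any, Callable, Protocol, Tuple, TypeVar, overload


class Serializable(Protocol):
    """Assumed shape of the user's protocol (hypothetical)."""

    def serialize(self) -> bytes: ...


# One bound TypeVar per argument and per result, as in the fixed-arity style.
A1 = TypeVar("A1", bound=Serializable)
A2 = TypeVar("A2", bound=Serializable)
R1 = TypeVar("R1", bound=Serializable)
R2 = TypeVar("R2", bound=Serializable)


@overload
def dist_gather_sketch(c1: Tuple[Callable[[A1], R1], A1], /) -> Tuple[R1]: ...
@overload
def dist_gather_sketch(
    c1: Tuple[Callable[[A1], R1], A1],
    c2: Tuple[Callable[[A2], R2], A2],
    /,
) -> Tuple[R1, R2]: ...
def dist_gather_sketch(*calls: Any) -> Any:
    # Stand-in implementation: runs each call in this process, in order.
    return tuple(fn(arg) for fn, arg in calls)


class Blob:
    """Example type that satisfies the assumed protocol."""

    def __init__(self, data: bytes) -> None:
        self.data = data

    def serialize(self) -> bytes:
        return self.data


def double(blob: Blob) -> Blob:
    return Blob(blob.data * 2)


results = dist_gather_sketch((double, Blob(b"ab")), (double, Blob(b"x")))
```

Under these assumptions, passing something like (len, "abc") should be rejected by a type checker because str does not satisfy the bound. Every additional arity needs another overload, and functions with more than one argument do not fit this shape.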
Letting each tuple[Callable[[T], None], T] have its own T is currently not possible, since TypeVarTuple transformations are not yet supported.
The short answer is “this isn’t possible right now”; the longer answer depends on whether this is an existing API that can’t be changed, or one that is new/unstable/open to breaking changes.
Some observations here:
- I would avoid attempting to emulate asyncio.gather if this is a new/unstable API and you care about type safety. Modeling it after asyncio task groups, or any of the executors in concurrent.futures, will provide more options that can be expressed in a type-safe manner.
- You’re somewhat reinventing loop.run_in_executor paired with a concurrent.futures.ProcessPoolExecutor, and dask, here. While you may have an entirely valid reason to do so, it’s worth pointing out existing tools.
- It’s practically impossible to guarantee serializability in the type system for functions, so that is the constraint I would drop, and emphasize in documentation instead (see below).
- functools.partial objects are serializable by pickle, dill, and cloudpickle so long as the function and the bound arguments are.
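A quick way to see the last point with the standard library alone is a round trip through pickle. This is only a sketch: the built-in pow is used as the wrapped function because it can be pickled by reference, and bound_call and restored are names chosen for the example.

```python
import functools
import pickle

# Bind the arguments up front; the partial carries both the function and its args.
bound_call = functools.partial(pow, 2, 10)

# Round-trip through pickle, as would happen when shipping work to another process.
restored = pickle.loads(pickle.dumps(bound_call))

# The restored partial still refers to the same function and arguments.
result = restored()
```

The same round trip fails if the wrapped function or any bound argument cannot be pickled, which matches the "so long as" condition above.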
If you decide to continue working on something like this, given the above observations, I’d focus on either wrapping an existing pattern that doesn’t have these typing issues or, if designing your own interface, following one of the patterns that can be type safe. I’d probably also stick to submitting one function plus args and kwargs (or a partial) at a time, returning a task or future, and model the API, including when the results are guaranteed to exist, after asyncio task groups.
dangerous/non-serializable example
While it is possible to serialize the generator and its internal state, it would be unsafe to do so without intelligently stubbing out the generator here and ensuring all use in a process is handled via IPC with the original process and interaction with that generator.
import itertools

def _encapsulation():
    monotonic = itertools.count()

    def gen():
        tok = next(monotonic)
        ...  # do something relying on that being monotonically increasing

    return gen

public_interface = _encapsulation()
Several other kinds of functions have this problem, some more benign than others (such as those that cache results only to reduce computation, not for idempotency).
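To see why a signature-level check cannot catch this, the sketch below uses a variant of the example above in which gen returns the token (a change made only so the behaviour is observable). The variable names captured, has_hidden_counter and stdlib_pickle_ok are illustrative. The counter lives in the function's closure, which its type does not describe, and the standard-library pickle refuses the nested function outright.

```python
import itertools
import pickle


def _encapsulation():
    monotonic = itertools.count()

    def gen():
        # Variant of the example above: return the token so it can be inspected.
        return next(monotonic)

    return gen


public_interface = _encapsulation()

# The counter is hidden in the closure; the callable's signature says nothing about it.
captured = public_interface.__closure__[0].cell_contents
has_hidden_counter = isinstance(captured, itertools.count)

# Standard pickle stores functions by qualified name, so a nested function is rejected.
try:
    pickle.dumps(public_interface)
    stdlib_pickle_ok = True
except (pickle.PicklingError, AttributeError, TypeError):
    stdlib_pickle_ok = False
```

Serializers that copy functions by value can get past this step, but then each process would hold its own copy of the counter, which is the unsafe situation described above.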