"""An API for feeding operations asynchronously to Chopsticks."""
from __future__ import print_function
import sys
import traceback
from functools import partial
from collections import deque
from .tunnel import loop, PY2, BaseTunnel
from .group import Group, GroupOp
# Python 2 only: make classes declared in this module without explicit bases
# new-style classes. Python 3 ignores a module-level ``__metaclass__``.
__metaclass__ = type
class NotCompleted(Exception):
    """Signal that an AsyncResult has not received its value yet.

    Raised when a result is read too early.
    """
[docs]class AsyncResult:
"""The deferred result of a queued operation."""
def __init__(self):
self._callback = None
self._value = NotCompleted
[docs] def with_callback(self, callback):
"""Attach a callback to be called when a value is set."""
# Chopsticks is not currently multithreaded, so in the intended usage
# there is no race condition where a value could be set before the
# callback is registered.
#
# We just validate that the usage is as intended.
assert self._callback is None, "Callback already set."
assert self._value is NotCompleted, "Value already set."
self._callback = callback
return self
@property
def value(self):
"""Get the value of the result.
Raise NotCompleted if the task has not yet run.
"""
if self._value is NotCompleted:
raise NotCompleted('The operation has not completed.')
return self._value
def _set(self, obj):
"""Set the value of the callback."""
self._value = obj
if self._callback:
try:
self._callback(self._value)
except Exception:
print('Error dispatching async callback', file=sys.stderr)
traceback.print_exc()
def iteritems(d):
    """Return the (key, value) pairs of `d` on both Python 2 and 3.

    Uses ``d.iteritems()`` when PY2 is set and ``d.items()`` otherwise.

    """
    return d.iteritems() if PY2 else d.items()
class Queue:
    """A queue of tasks to be performed.

    Queues build on Groups and Tunnels in order to feed tasks as quickly as
    possible to all connected hosts.

    All methods accept a parameter `target`, which specifies which tunnels the
    operation should be performed with. This can be specified as a
    :class:`Tunnel` or a :class:`Group`.

    Each one returns an :class:`AsyncResult` which can be used to receive the
    result of the operation.

    """

    def __init__(self):
        # Maps each tunnel that has outstanding work to a deque of pending
        # operations. Entries are removed once their deque drains.
        self.queued = {}
        # True only while run() is executing.
        self.running = False

    def _enqueue_group(self, methname, group, args, kwargs):
        """Enqueue an operation on a Group of tunnels.

        The operation is queued on every tunnel in `group.tunnels`. Each
        per-tunnel result is routed to a callback made by a GroupOp for that
        tunnel's host; the GroupOp is given the setter of the AsyncResult
        returned here.

        """
        async_result = AsyncResult()
        op = GroupOp(async_result._set)
        for tunnel in group.tunnels:
            r = self._enqueue_tunnel(methname, tunnel, args, kwargs)
            r.with_callback(op.make_callback(tunnel.host))
        return async_result

    def _enqueue_tunnel(self, methname, tunnel, args, kwargs):
        """Enqueue an operation on a Tunnel.

        The operation is the tunnel's ``_<methname>_async`` method, called
        with a completion callback followed by `args` and `kwargs`. Returns
        the AsyncResult that receives the operation's result.

        """
        async_funcname = '_%s_async' % methname
        async_func = getattr(tunnel, async_funcname)
        async_result = AsyncResult()

        try:
            queue = self.queued[tunnel]
        except KeyError:
            # First operation for this tunnel: register its queue, then put a
            # connect operation ahead of the requested one.
            queue = self.queued[tunnel] = deque()
            self.connect(tunnel)
            if self.running:
                queue[0]()  # start the connect

        def callback(result):
            async_result._set(result)
            # The operation that just finished must be the head of the queue.
            assert queue[0] is bound
            queue.popleft()
            if queue:
                # Start the next operation for this tunnel.
                queue[0]()
            else:
                del self.queued[tunnel]
                # Nothing left on any tunnel: let run() return.
                if not self.queued:
                    loop.stop()

        bound = partial(async_func, callback, *args, **kwargs)
        queue.append(bound)
        return async_result

    def mkhandler(methname):
        """Create a wrapper for queueing the 'methname' operation."""
        def enqueue(self, target, *args, **kwargs):
            if isinstance(target, Group):
                m = self._enqueue_group
            elif isinstance(target, BaseTunnel):
                m = self._enqueue_tunnel
            else:
                raise TypeError(
                    'Invalid target; expected Tunnel or Group'
                )
            return m(methname, target, args, kwargs)

        # ``__name__`` is writable on both Python 2 and Python 3
        # (``func_name`` is only its Python 2 alias), so a plain assignment
        # names the handler correctly everywhere.
        enqueue.__name__ = methname
        enqueue.__doc__ = (
            "Queue a :meth:`~chopsticks.tunnel.BaseTunnel.{meth}()` operation "
            "to be run on the target.".format(meth=methname)
        )
        return enqueue

    connect = mkhandler('connect')
    call = mkhandler('call')
    put = mkhandler('put')
    del mkhandler

    # fetch is slightly different because it constructs different local paths
    # for each host:
    def fetch(self, target, remote_path, local_path=None):
        """Queue a :meth:`~chopsticks.tunnel.BaseTunnel.fetch()` operation.

        For a single tunnel the operation is queued with the given
        `remote_path` and `local_path`. For a Group, each tunnel is paired
        with its own local path as produced by ``Group._local_paths`` and one
        fetch is queued per tunnel.

        Returns an :class:`AsyncResult` for the operation.

        """
        if isinstance(target, BaseTunnel):
            return self._enqueue_tunnel(
                'fetch', target,
                (),
                {'remote_path': remote_path, 'local_path': local_path}
            )

        async_result = AsyncResult()
        op = GroupOp(async_result._set)
        per_tunnel = Group._local_paths(target.tunnels, local_path)
        for tun, tun_local_path in per_tunnel:
            r = self._enqueue_tunnel(
                'fetch', tun, (),
                {'remote_path': remote_path, 'local_path': tun_local_path}
            )
            r.with_callback(op.make_callback(tun.host))
        return async_result

    def run(self):
        """Run all items in the queue.

        This method does not return until the queue is empty.

        """
        self.running = True
        try:
            # Iterate over a snapshot: completion callbacks delete entries
            # from self.queued, which would break iteration over the live
            # dict if an operation finished while the others are still being
            # started.
            for queue in list(self.queued.values()):
                if queue:
                    queue[0]()
            loop.run()
        finally:
            self.running = False