Source code for chopsticks.queue

"""An API for feeding operations asynchronously to Chopsticks."""
from __future__ import print_function
import sys
import traceback
from types import MethodType
from functools import partial
from collections import defaultdict, deque
from .tunnel import loop, PY2, ErrorResult, RemoteException, BaseTunnel
from .group import Group, GroupOp

__metaclass__ = type


class NotCompleted(Exception):
    """No value has been received by an AsyncResult."""


[docs]class AsyncResult: """The deferred result of a queued operation.""" def __init__(self): self._callback = None self._value = NotCompleted
[docs] def with_callback(self, callback): """Attach a callback to be called when a value is set.""" # Chopsticks is not currently multithreaded, so in the intended usage # there is no race condition where a value could be set before the # callback is registered. # # We just validate that the usage is as intended. assert self._callback is None, "Callback already set." assert self._value is NotCompleted, "Value already set." self._callback = callback return self
@property def value(self): """Get the value of the result. Raise NotCompleted if the task has not yet run. """ if self._value is NotCompleted: raise NotCompleted('The operation has not completed.') return self._value def _set(self, obj): """Set the value of the callback.""" self._value = obj if self._callback: try: self._callback(self._value) except Exception: print('Error dispatching async callback', file=sys.stderr) traceback.print_exc()
def iteritems(d): """Compatibility shim for dict iteration.""" if PY2: return d.iteritems() else: return d.items()
[docs]class Queue: """A queue of tasks to be performed. Queues build on Groups and Tunnels in order to feed tasks as quickly as possible to all connected hosts. All methods accept a parameter `target`, which specifies which tunnels the operation should be performed with. This can be specified as a :class:`Tunnel` or a :class:`Group`. Each one returns an :class:`AsyncResult` which can be used to receive the result of the operation. """ def __init__(self): self.queued = {} self.running = False def _enqueue_group(self, methname, group, args, kwargs): """Enqueue an operation on a Group of tunnels.""" async_result = AsyncResult() op = GroupOp(async_result._set) for tunnel in group.tunnels: r = self._enqueue_tunnel(methname, tunnel, args, kwargs) r.with_callback(op.make_callback(tunnel.host)) return async_result def _enqueue_tunnel(self, methname, tunnel, args, kwargs): """Enqueue an operation on a Tunnel.""" async_funcname = '_%s_async' % methname async_func = getattr(tunnel, async_funcname) async_result = AsyncResult() try: queue = self.queued[tunnel] except KeyError: queue = self.queued[tunnel] = deque() self.connect(tunnel) if self.running: queue[0]() # start the connect def callback(result): async_result._set(result) assert queue[0] is bound queue.popleft() if queue: queue[0]() else: del self.queued[tunnel] if not self.queued: loop.stop() bound = partial(async_func, callback, *args, **kwargs) queue.append(bound) return async_result def mkhandler(methname): """Create a wrapper for queueing the 'methname' operation.""" def enqueue(self, target, *args, **kwargs): if not isinstance(target, (BaseTunnel, Group)): raise TypeError( 'Invalid target; expected Tunnel or Group' ) if isinstance(target, Group): m = self._enqueue_group else: m = self._enqueue_tunnel return m(methname, target, args, kwargs) if PY2: enqueue.func_name == methname else: enqueue.__name__ = methname enqueue.__doc__ = ( "Queue a :meth:`~chopsticks.tunnel.BaseTunnel.{meth}()` operation " "to be run on the target.".format(meth=methname).lstrip() ) return enqueue connect = mkhandler('connect') call = mkhandler('call') fetch = mkhandler('fetch') put = mkhandler('put') del mkhandler # fetch is slightly different because it constructs different local paths # for each host:
[docs] def fetch(self, target, remote_path, local_path=None): """Queue a :meth:`~chopsticks.tunnel.BaseTunnel.fetch()` operation to be run on the target. """ # noqa if isinstance(target, BaseTunnel): return self._enqueue_tunnel( 'fetch', target, (), {'remote_path': remote_path, 'local_path': local_path} ) async_result = AsyncResult() op = GroupOp(async_result._set) for tun, local_path in Group._local_paths(target.tunnels, local_path): r = self._enqueue_tunnel( 'fetch', tun, (), {'remote_path': remote_path, 'local_path': local_path} ) r.with_callback(op.make_callback(tun.host)) return async_result
[docs] def run(self): """Run all items in the queue. This method does not return until the queue is empty. """ self.running = True try: for host, queue in iteritems(self.queued): if not queue: continue queue[0]() loop.run() finally: self.running = False