| .gitignore | Loading last commit info... | |
| README.md | ||
| coworkers.py | ||
| setup.py |
Coworkers
Coworkers est un module Python de gestion de workers asynchrones, conçu pour exécuter des tâches dans des boucles asyncio executer dans des thread distincts. Ce module propose plusieurs types de workers :
- PassiveWorker : soumet des tâches asynchrones directement à la boucle asyncio.
- ActiveWorker : execute les tâches disponibles dans la file d'attente une à la fois.
- MTActiveWorker : execute n tâches disponibles dans la file d'attente de manière asynchrone.
Installation
Clonez ce dépôt pour l'intégrer dans votre projet Python :
git clone https://git.jimw.fr/coworkers
Ou installer le via pip :
pip install git+https://git.jimw.fr/coworkers
Utilisation
Importation du module
from coworkers import PassiveWorker, ActiveWorker
import asyncio
PassiveWorker
Le PassiveWorker soumet directement les coroutines à la boucle asyncio sans les gérer via une file d’attente. Cela permet de déléguer des tâches asynchrones sans passer par un système de file. Ce worker est utile dans les cas où les tâches utilisent peu de ressources et tirent parti de l'asynchrone.
async def example_task():
print("Tâche exécutée par PassiveWorker")
# Création et démarrage du PassiveWorker
worker = PassiveWorker(name="PassiveWorkerExample")
worker.start()
# Soumission de la coroutine à exécuter
future = worker.submit_coroutine(example_task())
result = future.result()
# Arrêter et fermer le worker
worker.stop()
worker.close()
ActiveWorker
Le ActiveWorker récupère les tâches depuis une file d'attente et les exécute une par une. Les workers peuvent partager une file d'attente. Ce worker exécute une tâche à la fois et est le plus utile lorsque les tâches ne tirent pas parti de l'asynchrone (tâche bloquante).
import queue
from threading import current_thread
async def example_task():
print(f"Tâche exécutée par {current_thread().name}")
# Création et démarrage de deux ActiveWorker partageant la même queue
tasks = queue.Queue()
worker_1 = ActiveWorker(queue=tasks)
worker_2 = ActiveWorker(queue=tasks)
worker_1.start()
worker_2.start()
# Soumission de la coroutine à exécuter via la file d'attente
# la tâche sera executer soit par worker_1 soit par worker_2
future = worker_1.submit_coroutine(example_task())
# Arrêter et fermer le worker
worker.stop()
worker.close()
MTActiveWorker
Le MTActiveWorker récupère les tâches depuis une file d'attente et les exécute une par une. Les workers peuvent partager une file d'attente. Ce worker exécute plusieurs tâche à la fois et offre un compromis entre PassiveWorker et ActiveWorker.
import queue
from threading import current_thread
async def example_task():
await asyncio.sleep(1)
print(f"Tâche exécutée par {current_thread().name}")
# Création et démarrage de deux ActiveWorker partageant la même queue
tasks = queue.Queue()
worker_1 = MTActiveWorker(queue=tasks, max_tasks=4)
worker_2 = MTActiveWorker(queue=tasks, max_tasks=4)
worker_1.start()
worker_2.start()
# Soumission de la coroutine à exécuter via la file d'attente
# la tâche sera executer soit par worker_1 soit par worker_2
future = worker_1.submit_coroutine(example_task())
# Arrêter et fermer le worker
worker.stop()
worker.close()
Journalisation
Le module utilise le module logging pour gérer les logs d'activité et les erreurs de chaque worker.
Pour activer les logs détaillés dans votre application :
import logging
logging.basicConfig(level=logging.DEBUG)