How to manage dirty workers #3488
Can you share a snippet of the code handling it? If you are doing it asynchronously, then the dirty worker is able to start a request in your app; when the result comes, your app sends it back to the webapp. Let me know.
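The asynchronous hand-off described above can be sketched roughly like this. Note this is a stand-in, not the actual dirty-worker API: `slow_task` and the `ThreadPoolExecutor` simulate the dirty worker, where the real setup would use something like the `client.execute(...)` call shown later in the thread.

```python
from concurrent.futures import ThreadPoolExecutor
import time

# Stand-in for the work a dirty worker would do; in the real setup this
# would be a client.execute(...) call into the dirty worker.
def slow_task(n):
    time.sleep(0.1)  # simulate a long-running job
    return n * 2

# The web app submits the job and returns immediately; when the future
# resolves, the result is pushed back to the web app (here: a callback).
executor = ThreadPoolExecutor(max_workers=2)

def handle_request(n, send_to_webapp):
    future = executor.submit(slow_task, n)
    future.add_done_callback(lambda f: send_to_webapp(f.result()))
    return "accepted"  # the HTTP worker is free to serve other requests

results = []
status = handle_request(21, results.append)
executor.shutdown(wait=True)  # block only so this demo can print the result
print(status, results)
```

The key point of the pattern is that the HTTP worker never blocks on the long task; it hands the work off and delivers the result to the client when the callback fires.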
I'm not using async; I was hoping to keep it sync for now. Here's the code:

import json
import os
from urllib.parse import parse_qs

# get_dirty_client() and DirtyTimeoutError come from the dirty-worker
# integration; their imports were omitted in the original snippet.

def app(environ, start_response):
    """WSGI application that demonstrates dirty worker integration."""
    path = environ.get('PATH_INFO', '/')
    method = environ.get('REQUEST_METHOD', 'GET')

    # Parse query string
    query = parse_qs(environ.get('QUERY_STRING', ''))

    # Get dirty client
    client = get_dirty_client()

    try:
        if path == '/':
            result = {
                "message": "Dirty Workers Demo",
                "dirty_enabled": client is not None,
                "pid": os.getpid(),
                "endpoints": {
                    "/command": "Run command",
                    "/stats": "Get stats",
                }
            }
            start_response('200 OK', [('Content-Type', 'application/json')])
            return [json.dumps(result, indent=2).encode()]

        elif path == '/command':
            cmd = query.get('cmd')
            cmd = cmd[0] if cmd else ""
            if not cmd:
                start_response('400 Bad Request', [('Content-Type', 'application/json')])
                return [json.dumps({"error": "Missing cmd parameter"}).encode()]
            if client is None:
                start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
                return [json.dumps({"error": "Dirty workers not enabled"}).encode()]

            print(f"Executing cmd: {cmd}")
            resp = client.execute("gunicorn_test.dirty_app:ComputeApp", "stats")
            print(resp)

            def generate():
                try:
                    for chunk in client.stream("gunicorn_test.dirty_app:ComputeApp", "cmd", cmd):
                        yield chunk.encode('utf-8') if isinstance(chunk, str) else chunk
                except DirtyTimeoutError:
                    print("closing client")
                    client.close()
                    yield b""

            # Stream response headers
            headers = [
                ('Content-Type', 'text/plain; charset=utf-8'),
                ('X-Accel-Buffering', 'no'),
                ('Cache-Control', 'no-cache'),
            ]
            start_response('200 OK', headers)
            return generate()

        elif path == '/stats':
            if client is None:
                start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
                return [json.dumps({"error": "Dirty workers not enabled"}).encode()]
            compute_stats = client.execute(
                "my_app.dirty_app:ComputeApp",
                "stats"
            )
            result = {
                "compute_app": compute_stats,
                "http_worker_pid": os.getpid(),
            }
            start_response('200 OK', [('Content-Type', 'application/json')])
            return [json.dumps(result, indent=2).encode()]

        else:
            # 404 for unknown paths
            start_response('404 Not Found', [('Content-Type', 'application/json')])
            return [json.dumps({"error": "Not found"}).encode()]

    except Exception as e:
        start_response('500 Internal Server Error', [('Content-Type', 'application/json')])
        return [json.dumps({
            "error": str(e),
            "type": type(e).__name__
        }).encode()]
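For reference, the `/command` branch above relies on WSGI's streaming contract: the application returns an iterable and yields encoded byte chunks. A minimal, self-contained version of that pattern, with a plain generator standing in for `client.stream(...)`, behaves like this:

```python
def fake_stream():
    # Stand-in for client.stream(...); yields chunks as the dirty worker would.
    for chunk in ["hello ", "dirty ", "workers"]:
        yield chunk

def app(environ, start_response):
    def generate():
        for chunk in fake_stream():
            # WSGI requires bytes, so str chunks must be encoded
            yield chunk.encode('utf-8') if isinstance(chunk, str) else chunk
    start_response('200 OK', [('Content-Type', 'text/plain; charset=utf-8')])
    return generate()

# Drive the app by hand (no server needed for a smoke test)
chunks = list(app({}, lambda status, headers: None))
body = b"".join(chunks)
print(body)  # → b'hello dirty workers'
```

Because the server iterates the generator lazily, each chunk is written to the client as soon as it is yielded, which is what makes the long token-generation stream possible here.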
@marcinkuzminski You can now do this with the 25.1.0 release, in two ways:
Hey,
Thanks for creating this feature! It looks awesome. I'm experimenting with it and ran into an issue when I run 2 workers with 2 dirty workers and they perform very long tasks, e.g. a 10-minute token-generation stream.
This works nicely when handling 2 requests, but when a 3rd comes there are no more dirty workers to handle extra requests.
I can't figure out how to monitor that and potentially kill or scale up dirty workers. Could you provide some examples of how to do this?