programing

셀러리에서 작업 상태를 확인하는 방법은 무엇입니까?

telebox 2023. 7. 21. 21:31
반응형

셀러리에서 작업 상태를 확인하는 방법은 무엇입니까?

작업이 셀러리에서 실행 중인지 확인하려면 어떻게 해야 합니까(특히 셀러리 장고 사용)?

설명서를 읽고 구글을 검색했지만 다음과 같은 전화가 표시되지 않습니다.

my_example_task.state() == RUNNING

저의 사용 사례는 트랜스코딩을 위한 외부(자바) 서비스가 있다는 것입니다.트랜스코드할 문서를 보낼 때 해당 서비스를 실행하는 태스크가 실행 중인지 확인하고, 실행 중이지 않으면 (다시) 시작하려고 합니다.

저는 현재 안정적인 버전인 2.4를 사용하고 있다고 생각합니다.

task_id(.delay()에서 제공됨)를 반환하고 나중에 셀러리 인스턴스에 상태에 대해 문의합니다.

x = method.delay(1,2)
print x.task_id

요청 시 다음 task_id를 사용하여 새 AsyncResult를 가져옵니다.

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

하나AsyncResult태스크 ID의 개체는 태스크 ID만 가지고 있는 경우 태스크 상태를 얻기 위해 FAQ에서 권장하는 방법입니다.

하지만, 셀러리 3.x 기준으로 사람들이 주의를 기울이지 않으면 물릴 수 있는 중요한 경고가 있습니다.이는 실제로 특정 사용 사례 시나리오에 따라 다릅니다.

기본적으로 셀러리는 "실행 중" 상태를 기록하지 않습니다.

Celery가 태스크가 실행 중임을 기록하려면 다음과 같이 설정해야 합니다.True이를 테스트하는 간단한 작업은 다음과 같습니다.

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

task_track_started이라False는 "상태 표시"입니다.PENDING작업이 시작되었음에도 불구하고.설정하는 경우task_track_startedTrue그러면 주 정부는STARTED.

»PENDING"모르겠어요"라는 뜻입니다.

안 안AsyncResult국가와 함께PENDING셀러리가 작업 상태를 모른다는 것 이상의 의미는 없습니다.이것은 여러 가지 이유 때문일 수 있습니다.

첫째로, 하는나,AsyncResult잘못된 작업 ID로 구성할 수 있습니다.이러한 "작업"은 Celery에 의해 보류 중인 것으로 간주됩니다.

>>> task.AsyncResult("invalid").status
'PENDING'

좋아요, 그래서 아무도 분명히 잘못된 아이들을 먹이지 않을 거예요.AsyncResult공정하지만, 그것은 또한 성공적으로 실행되었지만 셀러리가 잊어버린 작업을 고려할 효과도 있습니다.일부 사용 사례 시나리오에서는 이 문제가 발생할 수 있습니다.문제의 일부는 작업 결과를 유지하도록 Celery가 구성된 방식에 달려 있습니다. 이는 결과 백엔드의 "톰스톤"을 사용할 수 있는지 여부에 달려 있기 때문입니다. ("톰스톤"은 작업이 종료된 방식을 기록하는 데이터 청크에 대한 Celery 설명서에서 사용되는 용어입니다.)사용.AsyncResult그렇다면 전혀 작동하지 않을 것입니다.True더 성가신 문제는 셀러리가 묘비를 기본적으로 만료한다는 것입니다.기본 설정은 24시간으로 설정됩니다.따라서 작업을 시작하고 ID를 장기 저장소에 기록한 후 24시간이 지나면AsyncResult는 과함께는, 태상것이 .PENDING.

모든 "실제 작업"은 다음에서 시작합니다.PENDING 태상을 얻는 것. 그래서 얻는 것.PENDING작업이 요청되었지만 어떤 이유로든 작업이 이 이상 진행되지 않았음을 의미할 수 있습니다.또는 작업이 실행되었지만 셀러리가 상태를 잊어버렸다는 의미일 수도 있습니다.

아야!AsyncResult저한테는 안 통할 겁니다.제가 뭘 더 할 수 있을까요?

저는 업무 자체를 파악하는 것보다 목표를 파악하는 것을 선호합니다.저는 몇 가지 작업 정보를 보관하고 있지만 목표를 추적하는 것은 정말 부차적입니다.목표는 셀러리와 독립적으로 스토리지에 저장됩니다.계산을 수행해야 하는 요청이 달성된 목표에 따라 달라지면, 이미 달성된 목표인지 확인하고, 달성된 목표가 있으면 이 캐시된 목표를 사용합니다. 그렇지 않으면 목표에 영향을 줄 태스크를 시작하고, HTTP 요청을 만든 클라이언트에 결과를 기다려야 한다는 응답을 보냅니다.


위의 변수 이름과 하이퍼링크는 Celery 4.x용입니다.3.x에서 해당 변수와 하이퍼링크는 다음과 같습니다. , .

Task객체가 있습니다..request성속, 속을포니다함합을 합니다.AsyncRequest따라 상태.task:

task.AsyncResult(task.request.id).state

또한 사용자 지정 상태를 생성하고 작업 실행 시 값을 업데이트할 수 있습니다.다음은 문서의 예입니다.

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

오래된 질문이지만 최근에 이 문제에 부딪혔습니다.

task_id를 가져오려는 경우 다음과 같이 할 수 있습니다.

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

이제 task_id가 무엇인지 정확히 알고 이를 사용하여 AsyncResult를 가져올 수 있습니다.

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

셀러리 FAQ의 이 API를 사용하십시오.

result = app.AsyncResult(task_id)

이것은 잘 작동합니다.

2020년 답변:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job {process.task_id} \
             is being processed. Status {state}"

시도:

task.AsyncResult(task.request.id).state

그러면 셀러리 작업 상태가 제공됩니다.셀러리 작업이 이미 실패 상태에 있는 경우 다음 예외가 발생합니다.

raised unexpected: KeyError('exc_type',)

나는 유용한 정보를 에서 찾았습니다.

셀러리 프로젝트 작업자 가이드 점검 작업자

저의 경우, 셀러리가 실행되고 있는지 확인하고 있습니다.

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

검사를 통해 필요한 정보를 얻을 수 있습니다.

  • 셀러리 APP의 첫 번째 기능:

vimy_celery_dll/app1.py

app = Celery(worker_name)
  • 다음으로, 셀러리 앱 모듈에서 앱 가져오기 태스크 파일로 변경합니다.

vi tasks/task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

res = method.delay()
    
print(f"id={res.id}, state={res.state}, status={res.status} ")

print(res.get())

간단한 작업의 경우 http://flower.readthedocs.io/en/latest/screenshots.html 및 http://policystat.github.io/jobtastic/ 을 사용하여 모니터링을 수행할 수 있습니다.

그리고 복잡한 작업의 경우, 다른 많은 모듈을 다루는 작업을 말합니다.특정 작업 장치에 진행 상황과 메시지를 수동으로 기록하는 것이 좋습니다.

위의 프로그램 방식과는 별도로 플라워 태스크 상태를 사용하는 것을 쉽게 볼 수 있습니다.

셀러리 이벤트를 사용한 실시간 모니터링.Flower는 셀러리 클러스터를 모니터링하고 관리하기 위한 웹 기반 도구입니다.

  1. 작업 진행 상황 및 기록
  2. 작업 세부 정보(인수, 시작 시간, 런타임 등)를 표시하는 기능
  3. 그래프 및 통계

공식 문서:플라워 - 셀러리 모니터링 도구

설치:

$ pip install flower

용도:

http://localhost:5555

업데이트: 버전 관리에 문제가 있습니다. 플라워(버전=0.9.7)는 플라워를 설치할 때 셀러리(버전=4.4.7) 이상에서만 작동하며, 상위 버전의 셀러리는 4.4.7로 제거되며 등록된 작업에서는 작동하지 않습니다.

언급URL : https://stackoverflow.com/questions/9034091/how-to-check-task-status-in-celery

반응형