Phiphi: flow_runner_flow: hook up to database
Implements the actual working version of the flow_runner_flow.
Currently failing mypy with:

```
Running mypy command: mypy projects/phiphi/ --no-incremental
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:29: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"? [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:29: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"? [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:102: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"? [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:102: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"? [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:124: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"? [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:124: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"? [attr-defined]
Found 6 errors in 1 file (checked 97 source files)
```
Due to lines such as:

```python
with platform_db.get_session() as session:
```

@benjamincerigo any ideas on how `platform_db.get_session()` should actually be used?
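For context, this mypy error typically means `platform_db.get_session` is a plain generator function, so its return type is `Generator[Session, None, None]`, which has no `__enter__`/`__exit__`. A minimal sketch of one possible fix, assuming that shape (the `Session` class below is a stand-in so the sketch runs standalone, not the real SQLAlchemy session): decorate the generator with `contextlib.contextmanager`.

```python
import contextlib
from typing import Iterator

class Session:
    """Stand-in for sqlalchemy.orm.Session, only so this sketch is self-contained."""
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True

@contextlib.contextmanager
def get_session() -> Iterator[Session]:
    # The decorator wraps the generator so calling get_session() returns an
    # object with __enter__/__exit__; `with get_session() as session:` then
    # both type-checks and guarantees close() on exit.
    session = Session()
    try:
        yield session
    finally:
        session.close()

with get_session() as session:
    assert not session.closed
assert session.closed  # closed automatically when the `with` block exits
```

The alternative, if the generator must stay undecorated, is `session = next(platform_db.get_session())` plus manual cleanup, but the context-manager form is the more idiomatic fix.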
Merge request reports
Activity
requested review from @benjamincerigo
assigned to @dbcerigo
Edited by Benjamin Cerigo

added 31 commits
-
1c5e8453...afd16d2d - 23 commits from branch
main
- 820709ff - Phiphi: fix imports formatting
- fdf19aeb - Phiphi: implement flow runner read_job_params
- ffa06e27 - Phiphi: implement flow runner start_flow_run
- 4637f01c - Phiphi: add init for job_runs module
- 91ab6494 - Phiphi: implement flow runner update started
- 4e720c1a - Phiphi: implement flow runner update completed
- 427cc0b4 - Phiphi: fix: correct args within flow runner flow
- 0b7604e3 - Phiphi: improve docs for flow runner flow
added 13 commits
-
67ff5b2e...a4ddb76a - 2 commits from branch
main
- e11e1289 - 1 earlier commit
- 08190d25 - Phiphi: fix function spelling mistake
- 933319c0 - Phiphi: fix imports formatting
- 03899935 - Phiphi: implement flow runner read_job_params
- f3eddd33 - Phiphi: implement flow runner start_flow_run
- a0600d99 - Phiphi: add init for job_runs module
- d36ad9b8 - Phiphi: implement flow runner update started
- 738c4f7c - Phiphi: implement flow runner update completed
- 1fad6e0b - Phiphi: fix: correct args within flow runner flow
- 8eb71d88 - Phiphi: improve docs for flow runner flow
- eecf023f - Phiphi: implement flow_runner_flow deployment
@benjamincerigo build should be passing now. Note: should review and merge !138 (merged) first!
- Resolved by Daniel Burkhardt Cerigo
```diff
-    return job_params_dict
+    with platform_db.get_session() as session:
+        gather = gathers.crud.get_gather(
+            session=session, project_id=project_id, gather_id=job_source_id
+        )
+        if gather is None:
+            raise ValueError(f"Gather with ID {job_source_id=} not found.")
+        elif type(gather) == gathers.apify_facebook_posts.schemas.ApifyFacebookPostGatherResponse:
+            job_params = apify_input_schemas.ApifyFacebookPostsInput(
+                only_posts_older_than=gather.only_posts_older_than,
+                only_posts_newer_than=gather.only_posts_newer_than,
+                account_urls=gather.account_url_list,
+                results_per_url_limit=gather.limit_posts_per_account,
+            )
+        else:
+            raise NotImplementedError(f"Run for gather type {type(gather)=} not implemented yet.")
```

```python
    if type(job_params) == apify_input_schemas.ApifyFacebookPostsInput:
        deployment_name = "gather_apify_facebook_posts_flow/main_deployment"
    else:
        raise NotImplementedError(
            f"Run for job_params type {type(job_params)=} not implemented yet."
        )
    job_run_flow: objects.FlowRun = asyncio.run(
        deployments.run_deployment(
            name=deployment_name,
            parameters=job_params.dict(by_alias=True),
            as_subflow=True,
            timeout=0,  # this means it returns immediately with the metadata
            tags=[
                f"project_id:{project_id}",
                f"job_type:{job_type}",
                f"job_source_id:{job_source_id}",
```

Comment on lines +93 to +95
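Side note on the dispatch in the hunk above: the `type(x) == Cls` comparisons could be replaced with `isinstance`, which mypy also narrows on. A hypothetical sketch with stand-in classes (not the real phiphi schemas):

```python
class GatherResponse:
    """Stand-in base class for gather response schemas."""

class ApifyFacebookPostGatherResponse(GatherResponse):
    """Stand-in for the Apify Facebook posts gather response."""

def deployment_for(gather: GatherResponse) -> str:
    # isinstance() narrows the type for mypy and tolerates subclasses,
    # unlike an exact `type(gather) == Cls` comparison.
    if isinstance(gather, ApifyFacebookPostGatherResponse):
        return "gather_apify_facebook_posts_flow/main_deployment"
    raise NotImplementedError(f"Run for gather type {type(gather)=} not implemented yet.")

assert deployment_for(ApifyFacebookPostGatherResponse()) == (
    "gather_apify_facebook_posts_flow/main_deployment"
)
```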
```diff
         job_run_id: ID of the row in the job_runs table.
-        job_run_flow: Flow run object for the job.
     """
-    _ = {
-        "prefect_inner_flow_run_id": job_run_flow.id,
-        "prefect_inner_flow_run_name": job_run_flow.name,
-        "prefect_inner_flow_run_status": "started",
-        "prefect_inner_flow_run_started_at": datetime.now(),
-    }
-    # Update row in job_runs table for `job_run_id`
+    job_run_update_started = job_runs.schemas.JobRunUpdateStarted(
+        id=job_run_id,
+        flow_run_id=str(runtime.flow_run.id),
+        flow_run_name=runtime.flow_run.name,
+    )
+    with platform_db.get_session() as session:
```

So I asked ChatGPT about using one session or multiple sessions for a Prefect flow: answer.
I think that using a session for each single update/get is the best option, mainly because we want the frontend to have the correct `job_run.status` even if the flow is not finished. If we used one session for all updates/gets, the DB transaction would not be committed until the flow was finished, and the DB would not be updated in the meantime. As such, I think that having a session per task is a good option. Let's keep it as is, but be aware of this for future work.
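A toy sketch of the session-per-update pattern described above (all names here are stand-ins, not the real `platform_db` API): each status update opens and commits its own short transaction, so the new status becomes visible to other readers before the flow finishes.

```python
import contextlib
from typing import Dict, Iterator

DB: Dict[str, str] = {}  # stand-in for the job_runs table

class Session:
    """Stand-in session that buffers writes until commit()."""
    def __init__(self, store: Dict[str, str]) -> None:
        self._store = store
        self._pending: Dict[str, str] = {}

    def update(self, **kwargs: str) -> None:
        self._pending.update(kwargs)

    def commit(self) -> None:
        self._store.update(self._pending)

@contextlib.contextmanager
def get_session() -> Iterator[Session]:
    session = Session(DB)
    yield session
    session.commit()  # each `with` block commits its own transaction

# First update: its session commits immediately on exiting the block,
# so a frontend query between the two blocks already sees "started".
with get_session() as session:
    session.update(status="started")
assert DB["status"] == "started"

# ... flow work happens here ...

with get_session() as session:
    session.update(status="completed")
assert DB["status"] == "completed"
```

With one long-lived session instead, nothing would be committed until the end of the flow, which is exactly the staleness problem raised above.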
mentioned in commit 688e7b62