Phiphi: flow_runner_flow: hook up to database

Implements the actual working version of the flow_runner_flow.

Currently failing mypy with:

Running mypy command: mypy projects/phiphi/ --no-incremental
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:29: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"?  [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:29: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"?  [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:102: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"?  [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:102: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"?  [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:124: error: "Generator[Session, None, None]" has no attribute "__enter__"; maybe "__iter__"?  [attr-defined]
projects/phiphi/phiphi/api/projects/job_runs/flow_runner_flow.py:124: error: "Generator[Session, None, None]" has no attribute "__exit__"; maybe "__next__"?  [attr-defined]
Found 6 errors in 1 file (checked 97 source files)

Due to lines such as:

        with platform_db.get_session() as session:

@benjamincerigo any ideas on how platform_db.get_session() should actually be used?
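
For reference, one way to make that pattern type-check is to turn get_session into a real context manager via contextlib.contextmanager. A minimal sketch, assuming platform_db.get_session is currently a plain generator that yields a SQLAlchemy Session (the real module may differ):

    # Hypothetical sketch of platform_db.get_session; the engine URL and names
    # are placeholders, not the real phiphi configuration.
    import contextlib
    from typing import Iterator

    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session, sessionmaker

    engine = create_engine("sqlite:///example.db")  # placeholder connection URL
    SessionLocal = sessionmaker(bind=engine)


    @contextlib.contextmanager
    def get_session() -> Iterator[Session]:
        """Yield a session and close it when the `with` block exits."""
        session = SessionLocal()
        try:
            yield session
        finally:
            session.close()

With the decorator, get_session() returns an object that has __enter__/__exit__, so mypy accepts `with platform_db.get_session() as session:` instead of flagging the generator type.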

Edited by Daniel Burkhardt Cerigo

Merge request reports


Activity

  • Diff excerpt from flow_runner_flow.py:

        - return job_params_dict
        + with platform_db.get_session() as session:
        +     gather = gathers.crud.get_gather(
        +         session=session, project_id=project_id, gather_id=job_source_id
        +     )
        +     if gather is None:
        +         raise ValueError(f"Gather with ID {job_source_id=} not found.")
        +     elif type(gather) == gathers.apify_facebook_posts.schemas.ApifyFacebookPostGatherResponse:
        +         job_params = apify_input_schemas.ApifyFacebookPostsInput(
        +             only_posts_older_than=gather.only_posts_older_than,
        +             only_posts_newer_than=gather.only_posts_newer_than,
        +             account_urls=gather.account_url_list,
        +             results_per_url_limit=gather.limit_posts_per_account,
        +         )
        +     else:
        +         raise NotImplementedError(f"Run for gather type {type(gather)=} not implemented yet.")
  • Diff excerpt from flow_runner_flow.py:

        + if type(job_params) == apify_input_schemas.ApifyFacebookPostsInput:
        +     deployment_name = "gather_apify_facebook_posts_flow/main_deployment"
        + else:
        +     raise NotImplementedError(
        +         f"Run for job_params type {type(job_params)=} not implemented yet."
        +     )
        + job_run_flow: objects.FlowRun = asyncio.run(
        +     deployments.run_deployment(
        +         name=deployment_name,
        +         parameters=job_params.dict(by_alias=True),
        +         as_subflow=True,
        +         timeout=0,  # this means it returns immediately with the metadata
        +         tags=[
        +             f"project_id:{project_id}",
        +             f"job_type:{job_type}",
        +             f"job_source_id:{job_source_id}",
  • Diff excerpt from flow_runner_flow.py:

                  job_run_id: ID of the row in the job_runs table.
        -         job_run_flow: Flow run object for the job.
              """
        -     _ = {
        -         "prefect_inner_flow_run_id": job_run_flow.id,
        -         "prefect_inner_flow_run_name": job_run_flow.name,
        -         "prefect_inner_flow_run_status": "started",
        -         "prefect_inner_flow_run_started_at": datetime.now(),
        -     }
        -     # Update row in job_runs table for `job_run_id`
        +     job_run_update_started = job_runs.schemas.JobRunUpdateStarted(
        +         id=job_run_id,
        +         flow_run_id=str(runtime.flow_run.id),
        +         flow_run_name=runtime.flow_run.name,
        +     )
        +     with platform_db.get_session() as session:
    • So I asked ChatGPT about using one session versus multiple sessions for a Prefect flow; here is the upshot.

      I think that using a session for each single update/get is the best option, mainly because we want the frontend to have the correct job_run.status even if the flow is not finished. If we used one session for all updates/gets, the db transaction would not be complete until the flow was finished and the db would not be updated.

      As such I think that having a session per task is a good option. Let's keep it as is, but be aware of this for future work.

    • Mmm mmm ye, makes sense. I guess you can do session.commit() or something explicitly if you wanted, but ya, I think let's try with this and be aware (see the sketch below).

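      To make the session-per-update pattern above concrete, here is a minimal sketch. The update_job_run CRUD helper and the exact import paths are assumptions for illustration, not the actual phiphi API:

          # Sketch of a session-per-update status write. Import paths are assumed
          # from the surrounding diff and may differ in the real codebase.
          from prefect import runtime

          from phiphi import platform_db
          from phiphi.api.projects import job_runs


          def record_flow_started(job_run_id: int) -> None:
              """Persist the 'started' status with its own short-lived session."""
              job_run_update_started = job_runs.schemas.JobRunUpdateStarted(
                  id=job_run_id,
                  flow_run_id=str(runtime.flow_run.id),
                  flow_run_name=runtime.flow_run.name,
              )
              with platform_db.get_session() as session:
                  # Hypothetical CRUD call; committing inside the `with` block means
                  # the row is visible to the frontend while the flow keeps running.
                  job_runs.crud.update_job_run(
                      session=session, job_run_update=job_run_update_started
                  )
                  session.commit()

      Because each update opens and commits its own session, the job_runs row reflects the latest status even though the outer flow is still in progress.
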
  • Great stuff. Only positive comments.

  • Benjamin Cerigo approved this merge request

  • mentioned in commit 688e7b62
