Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the settings.
Use the following commands to manage reviews, or the checkboxes below for quick actions:
📝 Walkthrough

Replaced per-task status updates with DB-side batch updates and introduced DB-side grouped status summaries.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Scheduler
    participant UpdateGroup
    participant Task
    participant DB as PostgresDB
    Scheduler->>UpdateGroup: trigger group completion handling
    UpdateGroup->>Task: request status summary (_fetch_status_summary)
    Task->>DB: SELECT grouped latest retry statuses
    DB-->>Task: status summary rows
    Task-->>UpdateGroup: return status summary
    alt batch update required
        UpdateGroup->>Task: batch_update_status_to_db(workflow_id, group_name, update_time, status, message, exit_code, exclude_task_name?)
        Task->>DB: UPDATE tasks SET ... WHERE end_time IS NULL AND latest retry per name AND (name != exclude?)
        DB-->>Task: update result
        Task-->>UpdateGroup: confirmation
    end
    UpdateGroup-->>Scheduler: final acknowledgement
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/job/task.py`:
- Around line 1412-1471: The batch_update_status_to_db routine issues a
correlated subquery on tasks using workflow_id, group_name, name and retry_id
but the existing index tasks_status_id_name lacks group_name, causing full
scans; add a composite index to support the correlated MAX(retry_id) query (e.g.
create index on (workflow_id, group_name, name, retry_id)) via a DB migration or
schema change so the UPDATE in batch_update_status_to_db can use an index;
ensure the migration is applied before deploying this code and pick a clear
index name (e.g. tasks_workflow_group_name_name_retry_id_idx).
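The index and latest-retry UPDATE pattern described above can be sketched as follows. This is a hedged illustration only: the real table lives in Postgres and would get the index via a migration; the column set is assumed from the review, and in-memory SQLite stands in so the sketch is self-contained.

```python
import sqlite3

# In-memory SQLite stand-in for the Postgres `tasks` table; the column set is
# assumed from the review comment, not taken from the real schema.
conn = sqlite3.connect(':memory:')
conn.execute('''CREATE TABLE tasks (
    workflow_id TEXT, group_name TEXT, name TEXT,
    retry_id INTEGER, status TEXT, end_time TEXT)''')

# Composite index suggested by the review, so the correlated MAX(retry_id)
# subquery in batch_update_status_to_db can be index-backed instead of scanning.
conn.execute('''CREATE INDEX tasks_workflow_group_name_name_retry_id_idx
    ON tasks (workflow_id, group_name, name, retry_id)''')

conn.executemany('INSERT INTO tasks VALUES (?, ?, ?, ?, ?, ?)', [
    ('wf', 'grp', 'task-a', 0, 'FAILED', '2024-01-01'),  # old retry, closed
    ('wf', 'grp', 'task-a', 1, 'RUNNING', None),         # latest retry, open
    ('wf', 'grp', 'task-b', 0, 'RUNNING', None),
])

# Batch-update pattern: touch only the open, latest retry of each task name.
conn.execute('''UPDATE tasks SET status = 'FAILED', end_time = '2024-01-02'
    WHERE workflow_id = ? AND group_name = ? AND end_time IS NULL
      AND retry_id = (SELECT MAX(retry_id) FROM tasks t2
                      WHERE t2.workflow_id = tasks.workflow_id
                        AND t2.group_name = tasks.group_name
                        AND t2.name = tasks.name)''', ('wf', 'grp'))

latest = conn.execute(
    'SELECT name, retry_id, status FROM tasks ORDER BY name, retry_id').fetchall()
print(latest)
```

Only the open latest retries are rewritten; the already-closed `task-a` retry 0 keeps its original status and end time.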
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 03d6e4c1-4de1-466d-8f04-4ff8f8673c87
📒 Files selected for processing (2)
src/utils/job/jobs.py
src/utils/job/task.py
…kGroup.update_status_to_db()
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/utils/job/task.py (1)
2010-2018: Give the summary rows a real type.

Passing `List[Dict]` between `_fetch_status_summary()` and `_aggregate_status()` makes the `status`/`lead`/`count` contract runtime-only. A small dataclass here would make the shape explicit and keep the DB-to-domain conversion in one place. As per coding guidelines, "Prefer dataclasses over dictionaries when passing structured data with multiple fields in Python" and "Add type annotations where they improve code clarity and catch errors in Python code".

Also applies to: 2031-2036
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/task.py` around lines 2010 - 2018, Create a small dataclass (e.g., TaskStatusSummary with fields status: str, lead: bool, count: int) and use it as the concrete type returned by _fetch_status_summary and accepted by _aggregate_status instead of List[Dict]; update the function signatures to return List[TaskStatusSummary], import dataclasses (or use `@dataclass`) and typing.List, and inside _fetch_status_summary convert each DB row/dict to TaskStatusSummary objects so the DB-to-domain conversion is centralized and the contract is statically typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/job/task.py`:
- Around line 2019-2029: Task.list_by_group_name lost the previous "fail fast on
empty-group" behavior because _fetch_status_summary now returns [] which
_aggregate_status treats as RUNNING; restore the previous failure by detecting
an empty summary and raising an error instead of letting the aggregate fallback
succeed. In practice, in Task.list_by_group_name (the caller of
_fetch_status_summary/_aggregate_status) check the return value of
_fetch_status_summary (or the list passed into _aggregate_status) and if it is
empty/None for the given group_name and workflow_id, raise a clear exception
(e.g., ValueError/RuntimeError) indicating "no latest task rows for group" so
the missing/corrupt DB state surfaces rather than being treated as RUNNING.
Ensure this check runs for verbose=False code path where the fallback currently
occurs so behavior matches prior fail-fast semantics.
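The fail-fast behavior described above can be sketched as a small guard. This is a stand-in inferred from the review, not the project's actual code: `fetch_status_summary_or_fail` is a hypothetical name, and `OSMODatabaseError` here is a local stub for the project's error type.

```python
from typing import Any, Dict, List


class OSMODatabaseError(RuntimeError):
    """Local stub standing in for the project's osmo_errors.OSMODatabaseError."""


def fetch_status_summary_or_fail(rows: List[Dict[str, Any]],
                                 workflow_id: str,
                                 group_name: str) -> List[Dict[str, Any]]:
    # An empty summary means no latest task rows exist for the group, i.e. the
    # DB state is missing or corrupt; raise instead of letting the aggregate
    # fall back to RUNNING.
    if not rows:
        raise OSMODatabaseError(
            f'No tasks were found for {group_name} of workflow {workflow_id}.')
    return rows


try:
    fetch_status_summary_or_fail([], 'wf-1', 'group-a')
except OSMODatabaseError as err:
    print(err)
```

The guard sits at the fetch boundary so every caller of the summary, not just the verbose path, surfaces the inconsistency.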
---
Nitpick comments:
In `@src/utils/job/task.py`:
- Around line 2010-2018: Create a small dataclass (e.g., TaskStatusSummary with
fields status: str, lead: bool, count: int) and use it as the concrete type
returned by _fetch_status_summary and accepted by _aggregate_status instead of
List[Dict]; update the function signatures to return List[TaskStatusSummary],
import dataclasses (or use `@dataclass`) and typing.List, and inside
_fetch_status_summary convert each DB row/dict to TaskStatusSummary objects so
the DB-to-domain conversion is centralized and the contract is statically typed.
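A minimal sketch of the suggested dataclass, with the DB-to-domain conversion kept in one helper. `rows_to_summary` is a hypothetical name for illustration; in the real code the conversion would live inside `_fetch_status_summary`.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class TaskStatusSummary:
    """One grouped status row; field names follow the review's description."""
    status: str
    lead: bool
    count: int


def rows_to_summary(rows: List[Dict[str, Any]]) -> List[TaskStatusSummary]:
    # Centralize the DB-to-domain conversion so the rest of the code never
    # touches raw dict keys.
    return [TaskStatusSummary(status=row['status'], lead=bool(row['lead']),
                              count=int(row['count'])) for row in rows]


summary = rows_to_summary([
    {'status': 'RUNNING', 'lead': 1, 'count': 2},
    {'status': 'FAILED', 'lead': 0, 'count': 1},
])
print(summary)
```

Attribute access (`row.status`) then replaces the magic-key lookups, and a type checker can verify the contract between the two helpers.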
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 4cd4a162-6922-46c4-bc9d-ac543a8a3324
📒 Files selected for processing (1)
src/utils/job/task.py
Actionable comments posted: 1
♻️ Duplicate comments (1)
src/utils/job/task.py (1)
2044-2054: ⚠️ Potential issue | 🟠 Major: Preserve the empty-group failure here.

The old `Task.list_by_group_name(..., verbose=False)` path raised when a group had no latest task rows. Returning `[]` here makes `_aggregate_status()` fall through to `RUNNING`, so missing or corrupt latest-task data can be persisted as a normal group transition instead of surfacing the DB inconsistency.

💡 Suggested fix
```diff
-        return database.execute_fetch_command(
-            fetch_cmd, (workflow_id, group_name, workflow_id, group_name), True)
+        status_summary = database.execute_fetch_command(
+            fetch_cmd, (workflow_id, group_name, workflow_id, group_name), True)
+        if not status_summary:
+            raise osmo_errors.OSMODatabaseError(
+                f'No tasks were found for {group_name} of workflow {workflow_id}.')
+        return status_summary
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/task.py` around lines 2044 - 2054, The current query in Task.list_by_group_name (the block building fetch_cmd and calling database.execute_fetch_command) silently returns an empty list when no latest-task rows exist for a group, which lets _aggregate_status treat it as RUNNING; change this so the former behavior is preserved by detecting an empty result from database.execute_fetch_command for (workflow_id, group_name, workflow_id, group_name) and raising the same error the old path used (e.g., a ValueError or the existing Task/DB-specific exception) instead of returning [] so missing/corrupt latest-task data surfaces as a DB inconsistency during group aggregation.
🧹 Nitpick comments (2)
src/utils/job/workflow.py (1)
1267-1268: Rename or document this as a metadata-only helper.

Line 1267 now loads each `TaskGroup` via `fetch_metadata_from_db()`, which intentionally leaves `tasks` empty. `get_group_objs()` still reads like a full group fetch, so future callers can easily assume `group.tasks` is populated when it is not.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/workflow.py` around lines 1267 - 1268, get_group_objs currently calls TaskGroup.fetch_metadata_from_db which intentionally leaves group.tasks empty; rename or clearly document this behavior and update callers to avoid assuming tasks are populated. Either (A) rename fetch_metadata_from_db to a metadata-only name or add a clear docstring/comment on TaskGroup.fetch_metadata_from_db and on get_group_objs indicating it returns metadata-only group objects with empty group.tasks, or (B) change get_group_objs to call the full loader (e.g., TaskGroup.fetch_from_db or the method that populates tasks) or explicitly populate group.tasks before returning. Ensure references to TaskGroup.fetch_metadata_from_db, get_group_objs, and group.tasks are updated so future readers/callers cannot mistakenly rely on tasks being present.

src/utils/job/task.py (1)
2035-2039: Make the status-summary contract explicit.

Passing `List[Dict]` with magic `'status'`, `'lead'`, and `'count'` keys between helpers keeps the shape opaque to readers and to static checking. A small dataclass for these rows would make the aggregation API self-documenting and harder to drift. As per coding guidelines, "Prefer dataclasses over dictionaries when passing structured data with multiple fields in Python" and "Add type annotations where they improve code clarity and catch errors in Python code".

Also applies to: 2056-2061
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/task.py` around lines 2035 - 2039, Replace the opaque List[Dict] contract used by _fetch_status_summary with a small dataclass (e.g., StatusSummaryRow) that declares the fields status: str, lead: Optional[str] (or str if always present), and count: int; update the return type of _fetch_status_summary to List[StatusSummaryRow] and change any helper functions or variables referenced around the same area (the aggregation helper calls at ~2056-2061) to accept and emit StatusSummaryRow instances instead of dicts so the aggregation API is explicit and type-checkable. Ensure imports for dataclasses and typing are added and adjust any code that previously accessed row['status']/['lead']/['count'] to use attribute access (row.status, row.lead, row.count).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/job/tests/test_task.py`:
- Around line 42-44: The create_container helper currently uses a mutable
default for volume_mounts (List[Any] = []), which can leak state between tests;
change the signature to use volume_mounts: Optional[List[Any]] = None and inside
the function initialize it to an empty list when None (e.g., if volume_mounts is
None: volume_mounts = []), updating any references in the body accordingly to
avoid shared mutable defaults.
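The fix for the mutable default can be sketched as below. The `create_container` body here is a stand-in (the real helper builds a container spec); only the `None`-sentinel pattern is the point.

```python
from typing import Any, Dict, List, Optional


def create_container(name: str,
                     volume_mounts: Optional[List[Any]] = None) -> Dict[str, Any]:
    # None sentinel instead of a shared [] default: each call gets a fresh
    # list, so mutations in one test cannot leak into the next.
    if volume_mounts is None:
        volume_mounts = []
    return {'name': name, 'volume_mounts': volume_mounts}


first = create_container('first')
first['volume_mounts'].append('/data')
second = create_container('second')
print(second['volume_mounts'])
```

With the old `volume_mounts: List[Any] = []` signature, the append to `first` would have shown up in `second` as well, since both calls would share the single default list object.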
---
Duplicate comments:
In `@src/utils/job/task.py`:
- Around line 2044-2054: The current query in Task.list_by_group_name (the block
building fetch_cmd and calling database.execute_fetch_command) silently returns
an empty list when no latest-task rows exist for a group, which lets
_aggregate_status treat it as RUNNING; change this so the former behavior is
preserved by detecting an empty result from database.execute_fetch_command for
(workflow_id, group_name, workflow_id, group_name) and raising the same error
the old path used (e.g., a ValueError or the existing Task/DB-specific
exception) instead of returning [] so missing/corrupt latest-task data surfaces
as a DB inconsistency during group aggregation.
---
Nitpick comments:
In `@src/utils/job/task.py`:
- Around line 2035-2039: Replace the opaque List[Dict] contract used by
_fetch_status_summary with a small dataclass (e.g., StatusSummaryRow) that
declares the fields status: str, lead: Optional[str] (or str if always present),
and count: int; update the return type of _fetch_status_summary to
List[StatusSummaryRow] and change any helper functions or variables referenced
around the same area (the aggregation helper calls at ~2056-2061) to accept and
emit StatusSummaryRow instances instead of dicts so the aggregation API is
explicit and type-checkable. Ensure imports for dataclasses and typing are added
and adjust any code that previously accessed row['status']/['lead']/['count'] to
use attribute access (row.status, row.lead, row.count).
In `@src/utils/job/workflow.py`:
- Around line 1267-1268: get_group_objs currently calls
TaskGroup.fetch_metadata_from_db which intentionally leaves group.tasks empty;
rename or clearly document this behavior and update callers to avoid assuming
tasks are populated. Either (A) rename fetch_metadata_from_db to a metadata-only
name or add a clear docstring/comment on TaskGroup.fetch_metadata_from_db and on
get_group_objs indicating it returns metadata-only group objects with empty
group.tasks, or (B) change get_group_objs to call the full loader (e.g.,
TaskGroup.fetch_from_db or the method that populates tasks) or explicitly
populate group.tasks before returning. Ensure references to
TaskGroup.fetch_metadata_from_db, get_group_objs, and group.tasks are updated so
future readers/callers cannot mistakenly rely on tasks being present.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 228200ac-3778-4d00-83a2-03d536e3f6ff
📒 Files selected for processing (4)
src/utils/job/jobs.py
src/utils/job/task.py
src/utils/job/tests/test_task.py
src/utils/job/workflow.py
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/utils/job/tests/test_task_db.py (1)
117-225: Add one test for the `FAILED_START_TIMEOUT` batch-update branch.

Coverage is good, but there's no test for the special `FAILED_START_TIMEOUT` behavior (which intentionally excludes `RUNNING` tasks in `src/utils/job/task.py`). Adding this would lock in an important edge-case path.

🧪 Suggested additional test
```diff
 class BatchUpdateStatusDbTest(TaskDbFixture):
@@
     def test_batch_update_only_updates_latest_retry(self):
         ...
         self.assertEqual(row1['status'], 'FAILED')
+
+    def test_batch_update_failed_start_timeout_excludes_running(self):
+        self._insert_workflow()
+        self._insert_group()
+        self._insert_task('task-running', lead=True, status='RUNNING')
+        self._insert_task('task-waiting', status='WAITING')
+
+        now = datetime.datetime.now()
+        task.Task.batch_update_status_to_db(
+            database=self._get_db(),
+            workflow_id=WORKFLOW_ID,
+            group_name=GROUP_NAME,
+            update_time=now,
+            status=task.TaskGroupStatus.FAILED_START_TIMEOUT,
+            message='start timeout',
+        )
+
+        running_row = self._fetch_task_status('task-running')
+        self.assertEqual(running_row['status'], 'RUNNING')
+        self.assertIsNone(running_row['end_time'])
+
+        waiting_row = self._fetch_task_status('task-waiting')
+        self.assertEqual(waiting_row['status'], 'FAILED_START_TIMEOUT')
+        self.assertIsNotNone(waiting_row['end_time'])
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/job/tests/test_task_db.py` around lines 117 - 225, Add a new test method in BatchUpdateStatusDbTest that exercises the FAILED_START_TIMEOUT branch of Task.batch_update_status_to_db: insert workflow and group, create at least one task with status 'RUNNING' and one with a non-running status (e.g., 'PENDING'), call task.Task.batch_update_status_to_db with status=task.TaskGroupStatus.FAILED_START_TIMEOUT (and an update_time/message), then assert the RUNNING task's status and end_time remain unchanged while the non-running task's status is updated to 'FAILED_START_TIMEOUT' (use the existing helpers _insert_workflow, _insert_group, _insert_task, and _fetch_task_status to locate rows).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/job/tests/test_task_db.py`:
- Around line 109-114: The helper _fetch_task_status currently queries tasks by
workflow_id, name and retry_id only; update it to also accept and filter by
group_name (or use the test's GROUP_NAME constant) so the SQL WHERE clause
includes "AND group_name = %s" and the parameter tuple includes the group_name
value; adjust the function signature (_fetch_task_status(self, task_name: str,
retry_id: int = 0, group_name: str = GROUP_NAME) or similar) and all call sites
in the tests to pass the correct group_name so the query returns the intended
row.
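The helper change described above can be sketched with an in-memory SQLite table standing in for the test database (the real fixture runs against Postgres, and the helper/constant names mirror the review's description):

```python
import sqlite3

GROUP_NAME = 'group-a'  # mirrors the test module's constant (assumed)

conn = sqlite3.connect(':memory:')
conn.row_factory = sqlite3.Row
conn.execute('''CREATE TABLE tasks (
    workflow_id TEXT, group_name TEXT, name TEXT, retry_id INTEGER, status TEXT)''')
conn.executemany('INSERT INTO tasks VALUES (?, ?, ?, ?, ?)', [
    ('wf', 'group-a', 'task-1', 0, 'RUNNING'),
    ('wf', 'group-b', 'task-1', 0, 'FAILED'),  # same name, different group
])


def fetch_task_status(task_name, retry_id=0, group_name=GROUP_NAME):
    # The added "AND group_name = ?" keeps a same-named task in another group
    # from shadowing the row the test actually means to inspect.
    row = conn.execute(
        'SELECT * FROM tasks WHERE workflow_id = ? AND group_name = ? '
        'AND name = ? AND retry_id = ?',
        ('wf', group_name, task_name, retry_id)).fetchone()
    return dict(row)


print(fetch_task_status('task-1')['status'])
```

Without the `group_name` filter, either of the two `task-1` rows could come back, making the test's assertions order-dependent.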
---
Nitpick comments:
In `@src/utils/job/tests/test_task_db.py`:
- Around line 117-225: Add a new test method in BatchUpdateStatusDbTest that
exercises the FAILED_START_TIMEOUT branch of Task.batch_update_status_to_db:
insert workflow and group, create at least one task with status 'RUNNING' and
one with a non-running status (e.g., 'PENDING'), call
task.Task.batch_update_status_to_db with
status=task.TaskGroupStatus.FAILED_START_TIMEOUT (and an update_time/message),
then assert the RUNNING task's status and end_time remain unchanged while the
non-running task's status is updated to 'FAILED_START_TIMEOUT' (use the existing
helpers _insert_workflow, _insert_group, _insert_task, and _fetch_task_status to
locate rows).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: a58b3786-2bb4-48d7-843c-46ad2bb623ae
📒 Files selected for processing (2)
src/utils/job/tests/BUILD
src/utils/job/tests/test_task_db.py
Codecov Report

❌ Patch coverage is

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main     #742      +/-   ##
==========================================
+ Coverage   42.69%   42.76%   +0.07%
==========================================
  Files         202      203       +1
  Lines       26790    26844      +54
  Branches     7588     7603      +15
==========================================
+ Hits        11439    11481      +42
- Misses      15240    15255      +15
+ Partials      111      108       -3
```
Flags with carried forward coverage won't be shown. Click here to find out more.
…otected-access Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…roup_name Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Description
The `UpdateGroup` job still makes inefficient DB calls and Pydantic object constructions. This PR improves the following:
Index to Create:
Issue #411 #646
Checklist
Summary by CodeRabbit
Refactor
Tests