CORE: add global init status check #1303

Open
wfaderhold21 wants to merge 3 commits into openucx:master from wfaderhold21:topic/strict_coll_check

@wfaderhold21

Collaborator

What

Extends the opt-in asymmetric-datatype check for rooted collectives to also exchange each rank's ucc_coll_init status, so that all ranks either pass or fail together.

Why?

Previously, a rank could fail ucc_coll_init and return an error while other ranks succeeded and then hung waiting on the failed rank.

@greptile-apps

greptile-apps Bot commented May 1, 2026

Contributor

Greptile Summary

This PR extends the optional check_asymmetric_dt mechanism so that all ranks participating in gather/scatter collectives always call ucc_service_dt_check, regardless of whether their local ucc_coll_init succeeded. The local init status is encoded in two new int16_t slots (values[4,5]) in the allreduce buffer using the same min/max trick, allowing all ranks to agree on the global outcome rather than having failed ranks silently exit and leave others hanging.

  • ucc_coll.c: Removes the early-exit after ucc_coll_init for gather/scatter types when check_asymmetric_dt is enabled, passing local_status and a possibly-NULL task to the revised ucc_service_dt_check signature.
  • ucc_service_coll.c: Expands the allreduce payload from 4 to 6 int16_t values, adds a manual schedule-initialization path for the NULL-base_team case, and improves error-path task-completion handling.
  • ucc_coll_utils.c: Fixes ucc_copy_asymmetric_buffer to use task->bargs.team->size (ucc_team_t *, always valid) instead of task->team->params.size (ucc_base_team_t *, now potentially NULL).
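
As a rough illustration of the min/max trick described above, the following self-contained C sketch simulates the MIN allreduce locally over per-rank slot arrays. The helper names (pack_slots, min_reduce, pair_agrees), the six-slot layout, and the convention that 0 means success and negative values mean failure are assumptions made for this example; the actual logic lives in ucc_service_coll.c and may differ.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumed layout for the sketch: [dt, -dt, mem, -mem, status, -status]. */
#define N_SLOTS 6

/* Fill one rank's contribution; every value is paired with its negation.
 * Values are assumed small enough that negation fits in int16_t. */
void pack_slots(int16_t *slots, int16_t dt, int16_t mem, int16_t status)
{
    slots[0] = dt;
    slots[1] = (int16_t)-dt;
    slots[2] = mem;
    slots[3] = (int16_t)-mem;
    slots[4] = status;
    slots[5] = (int16_t)-status;
}

/* Element-wise MIN over all ranks; stands in for the service allreduce.
 * `in` holds n_ranks consecutive groups of N_SLOTS values. */
void min_reduce(const int16_t *in, size_t n_ranks, int16_t *out)
{
    assert(n_ranks > 0);
    for (size_t i = 0; i < N_SLOTS; i++) {
        out[i] = in[i];
        for (size_t r = 1; r < n_ranks; r++) {
            if (in[r * N_SLOTS + i] < out[i]) {
                out[i] = in[r * N_SLOTS + i];
            }
        }
    }
}

/* After the reduction, slot 2p holds min(v) and slot 2p+1 holds min(-v),
 * which equals -max(v). All ranks agree on v iff min(v) == max(v). */
bool pair_agrees(const int16_t *reduced, size_t pair)
{
    return reduced[2 * pair] == -reduced[2 * pair + 1];
}
```

With three ranks that share a datatype and memory type but where one rank packs a negative status, pair_agrees reports agreement for pairs 0 and 1 and disagreement for pair 2, and the reduced status slot holds the most negative status seen. Every rank observes the same reduced values, which is what lets them agree on the global outcome with a single fixed-size message.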

Confidence Score: 4/5

Safe to merge; the core hang-prevention logic is correct and force-completion paths are carefully guarded against double-completion.

The scheduling changes correctly encode init status in two extra int16_t slots and always route gather/scatter through ucc_service_dt_check, eliminating the targeted hang. The force-completion pattern correctly prevents ucc_schedule_completed_handler from triggering a second completion. The ucc_coll_utils.c fix correctly switches to task->bargs.team. The change touches several interacting paths in the collective scheduling mechanism, warranting careful review.

src/core/ucc_service_coll.c — the force-completion logic in ucc_dt_check_allreduce_post and ucc_dt_check_actual_wrapper_post interacts non-obviously with the UCC_EVENT_COMPLETED_SCHEDULE firing path.

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/core/ucc_coll.c | Removes early-exit guards around ucc_coll_init for gather/scatter types when check_asymmetric_dt is enabled; all ranks now reach ucc_service_dt_check regardless of local status. Scratch-buffer cleanup before the call is correct. |
| src/core/ucc_service_coll.c | Allreduce payload grows to 6 int16_t slots; manual schedule init for NULL-base_team case; force-completion paths set n_completed_tasks = n_tasks before calling ucc_task_complete on the schedule, correctly preventing double-complete from the subsequent UCC_EVENT_COMPLETED_SCHEDULE. |
| src/core/ucc_service_coll.h | Updated docstring and signature for ucc_service_dt_check; clearly documents the must-be-called-on-every-rank contract. |
| src/schedule/ucc_schedule.h | ucc_dt_check_state gains a third slot pair (six slots in total) for init-status encoding and a new ar_status field for service-allreduce failures. |
| src/utils/ucc_coll_utils.c | Fixes ucc_copy_asymmetric_buffer to use task->bargs.team->size (always valid) instead of task->team->params.size (now potentially NULL). |

Reviews (4): Last reviewed commit: "REVIEW: extend dt check to include init ..."

Comment thread src/core/ucc_coll.c
Comment on lines 250 to 284
 status = ucc_coll_init(team->score_map, &op_args, &task);
-if (UCC_ERR_NOT_SUPPORTED == status) {
-    ucc_debug("failed to init collective: not supported");
-    goto free_scratch;
-} else if (ucc_unlikely(status < 0)) {
-    char coll_args_str[256] = {0};
-    ucc_coll_args_str(&op_args.args, team->rank, team->size, coll_args_str,
-                      sizeof(coll_args_str));
-    ucc_error("failed to init collective: %s, err: (%d) %s", coll_args_str,
-              status, ucc_status_string(status));
-    goto free_scratch;
-}

-/* Setup non-blocking datatype check for rooted collectives
- *
- * This implements transparent validation using a schedule with two tasks:
- * 1. Allreduce validation task: uses MIN reduction with min/max trick to detect mismatches
- * 2. Actual collective task: the real gather/scatter operation
- *
- * Validation uses allreduce (MIN) on [dt, -dt, mem, -mem]:
- * - Message size: 8 bytes (4 × int16_t, doesn't scale with number of ranks)
- * - After reduction: min(dt) == -min(-dt) means all ranks have same datatype
- *
- * Dependencies: allreduce validation → actual task
- * If validation fails, the dependency mechanism prevents the actual task from posting.
- */
-if (coll_args->coll_type == UCC_COLL_TYPE_GATHER ||
-    coll_args->coll_type == UCC_COLL_TYPE_GATHERV ||
-    coll_args->coll_type == UCC_COLL_TYPE_SCATTER ||
-    coll_args->coll_type == UCC_COLL_TYPE_SCATTERV) {
-    /* Check if datatype validation is needed and create schedule if so */
-    ucc_coll_task_t *validated_task;
-
-    validated_task = ucc_service_dt_check(team, task, &status);
-    if (!validated_task) {
-        ucc_error("failed to create dt_check schedule: %s",
-                  ucc_status_string(status));
-        goto coll_finalize;
+if (ucc_global_config.check_asymmetric_dt &&
+    (coll_args->coll_type == UCC_COLL_TYPE_GATHER ||
+     coll_args->coll_type == UCC_COLL_TYPE_GATHERV ||
+     coll_args->coll_type == UCC_COLL_TYPE_SCATTER ||
+     coll_args->coll_type == UCC_COLL_TYPE_SCATTERV)) {
+
+    if (task == NULL && op_args.asymmetric_save_info.scratch != NULL) {
+        ucc_mc_free(op_args.asymmetric_save_info.scratch);
+        op_args.asymmetric_save_info.scratch = NULL;
+    }
+    task_wrap = ucc_service_dt_check(team, coll_args, status, task,
+                                     &wrap_err);
+    if (ucc_unlikely(!task_wrap)) {
+        status = wrap_err;
+        if (task) {
+            goto coll_finalize;
+        }
+        goto free_scratch;
+    }
+    task = task_wrap;
+} else {
+    if (UCC_ERR_NOT_SUPPORTED == status) {
+        ucc_debug("failed to init collective: not supported");
+        goto free_scratch;
+    } else if (ucc_unlikely(status < 0)) {
+        char coll_args_str[256] = {0};
+        ucc_coll_args_str(&op_args.args, team->rank, team->size,
+                          coll_args_str, sizeof(coll_args_str));
+        ucc_error("failed to init collective: %s, err: (%d) %s",
+                  coll_args_str, status, ucc_status_string(status));
+        goto free_scratch;
     }
-    /* Return schedule if validation was needed, or original task if not */
-    task = validated_task;
 }

Contributor


P2 Missing error logging on ucc_coll_init failure in the DT-check path

When check_asymmetric_dt is enabled for gather/scatter operations, if ucc_coll_init returns an error (including UCC_ERR_NOT_SUPPORTED), the failure is silently forwarded to ucc_service_dt_check with no log message at all. The original else branch (now only reached for other collective types) still emits ucc_debug/ucc_error on failure. The DT-check path should emit at least a debug-level message so that the asymmetric allreduce step can be linked back to the init failure during troubleshooting.
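
One way to picture the suggestion is a small helper that picks a log level from the init status before the status is forwarded to the check. The sketch below is hypothetical: the enum values and the function name log_level_for_init_status are stand-ins invented for illustration, and it only mirrors the debug-versus-error split used by the existing else branch.

```c
/* Stand-in status codes for the sketch; real values come from UCC headers. */
typedef enum {
    SK_OK                = 0,
    SK_ERR_NOT_SUPPORTED = -1,
    SK_ERR_NO_MEMORY     = -2
} sk_status_t;

/* Hypothetical log levels, lowest to highest severity. */
typedef enum {
    SK_LOG_NONE,
    SK_LOG_DEBUG,
    SK_LOG_ERROR
} sk_log_level_t;

/* Choose the level at which an init result would be reported:
 * nothing on success, debug for "not supported", error otherwise. */
sk_log_level_t log_level_for_init_status(sk_status_t status)
{
    if (status == SK_ERR_NOT_SUPPORTED) {
        return SK_LOG_DEBUG;
    }
    if (status < 0) {
        return SK_LOG_ERROR;
    }
    return SK_LOG_NONE;
}
```

Calling such a helper in the DT-check path before ucc_service_dt_check would leave a trace that ties a later allreduce failure back to the local init error, without changing control flow.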

Comment thread src/core/ucc_service_coll.c Outdated
Comment on lines +563 to +575
if (local_status != UCC_OK || !UCC_DT_IS_PREDEFINED(local_dt)) {
dt_check->values[0] = (int16_t) UCC_ERR_NOT_SUPPORTED;
dt_check->values[1] = -(int16_t) UCC_ERR_NOT_SUPPORTED;
/* Record the actual init failure only for non-DT errors so the
* actual_wrapper_post can surface the right status code. */
dt_check->init_status = (local_status != UCC_OK &&
UCC_DT_IS_PREDEFINED(local_dt))
? local_status : UCC_OK;
} else {
/* Predefined datatypes are always contiguous - safe to cast to int16 */
dt_check->values[0] = (int16_t) local_dt;
dt_check->values[1] = -(int16_t) local_dt;
}
dt_check->values[2] = (int16_t) local_mem_type;
dt_check->values[3] = -(int16_t) local_mem_type;
/* Setup subset for full team */
dt_check->subset.myrank = team->rank;
dt_check->subset.map.type = UCC_EP_MAP_FULL;
dt_check->values[0] = (int16_t) local_dt;
dt_check->values[1] = -(int16_t) local_dt;
dt_check->init_status = UCC_OK;
}

Contributor


P1 init_status not recorded when both local_status != UCC_OK and !UCC_DT_IS_PREDEFINED(local_dt)

The ternary expression evaluates to UCC_OK whenever !UCC_DT_IS_PREDEFINED(local_dt), even if local_status is a genuine non-DT error (e.g., UCC_ERR_NO_MEMORY). In that case actual_wrapper_post falls through to err = UCC_ERR_NOT_SUPPORTED, hiding the real failure from the caller. Ranks that fail for non-DT reasons would surface the wrong error code.
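
The case described above can be reproduced in isolation. The sketch below uses stand-in status codes and a boolean in place of UCC_DT_IS_PREDEFINED(local_dt); init_status_reviewed copies the quoted ternary, while init_status_always is only one possible alternative (record any local failure), not necessarily the fix the author would choose.

```c
#include <stdbool.h>

/* Stand-in status codes for the sketch; real values come from UCC headers. */
typedef enum {
    ST_OK                = 0,
    ST_ERR_NOT_SUPPORTED = -1,
    ST_ERR_NO_MEMORY     = -2
} st_status_t;

/* The expression as quoted in the review: the failure is recorded only
 * when the datatype is predefined. */
st_status_t init_status_reviewed(st_status_t local_status, bool dt_predefined)
{
    return (local_status != ST_OK && dt_predefined) ? local_status : ST_OK;
}

/* Possible alternative (an assumption, not the PR's code): always keep the
 * local status, so a non-DT error survives a non-predefined datatype. */
st_status_t init_status_always(st_status_t local_status, bool dt_predefined)
{
    (void)dt_predefined; /* no longer affects which status is recorded */
    return local_status;
}
```

For local_status = ST_ERR_NO_MEMORY with a non-predefined datatype, the reviewed form yields ST_OK and the alternative yields ST_ERR_NO_MEMORY. When the datatype is predefined, both forms agree.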

Comment on lines 244 to 275
static ucc_status_t ucc_dt_check_allreduce_post(ucc_coll_task_t *allreduce_wrapper)
{
ucc_dt_check_state_t *dt_check = UCC_DT_CHECK_FROM_TASK(allreduce_wrapper);
ucc_team_t *team = allreduce_wrapper->bargs.team;
ucc_status_t status;

/* Safety check */
if (!dt_check) {
allreduce_wrapper->status = UCC_ERR_INVALID_PARAM;
return UCC_ERR_INVALID_PARAM;
}

/* Start in-place service allreduce with MIN operation on 4 int16_t values */
status = ucc_service_allreduce(team, dt_check->values, dt_check->values,
UCC_DT_INT16, 4, UCC_OP_MIN,
dt_check->subset, &dt_check->check_req);
if (status != UCC_OK) {
ucc_schedule_t *schedule = allreduce_wrapper->schedule;

allreduce_wrapper->status = status;
return status;
/* ucc_schedule_start already set the schedule to UCC_INPROGRESS before
* firing SCHEDULE_STARTED. Fail the whole schedule now so that
* ucc_collective_finalize_internal will not refuse to run because the
* top-level request is still UCC_INPROGRESS. */
if (schedule) {
schedule->n_completed_tasks = schedule->n_tasks;
schedule->super.status = status;
ucc_task_complete(&schedule->super);
}
return UCC_OK;
}
allreduce_wrapper->status = UCC_INPROGRESS;
/* Enqueue wrapper task for progress */
return ucc_progress_queue_enqueue(team->contexts[0]->pq, allreduce_wrapper);
}

Contributor


P2 allreduce_wrapper never has ucc_task_complete called on it when its post fails

When ucc_service_allreduce fails inside ucc_dt_check_allreduce_post, the code sets allreduce_wrapper->status = status and then directly calls ucc_task_complete(&schedule->super) to fail the whole schedule. However, ucc_task_complete is never invoked on allreduce_wrapper itself, so UCC_EVENT_COMPLETED and UCC_EVENT_ERROR notifications to actual_wrapper are never sent. Please verify that ucc_schedule_finalize iterates all tasks and calls their finalize callbacks regardless of whether they were ever posted, to avoid leaking the actual_task returned by ucc_coll_init.
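
The two properties in question (a single schedule completion, and finalization of tasks that were never posted) can be modelled with a toy schedule. Everything in the sketch below is hypothetical: the toy_* types and functions are invented for illustration, and the completion rule (complete when the counter reaches n_tasks) is an assumption about how the real handler behaves, not a copy of UCC code.

```c
#include <stdbool.h>
#include <stddef.h>

#define TOY_MAX_TASKS 4

typedef struct toy_task {
    bool posted;    /* was the task ever started? */
    bool finalized; /* has its finalize step run? */
} toy_task_t;

typedef struct toy_schedule {
    toy_task_t *tasks[TOY_MAX_TASKS];
    size_t      n_tasks;
    size_t      n_completed_tasks;
    int         n_completions; /* how many times the schedule completed */
} toy_schedule_t;

/* Normal path: a child finished; complete the schedule exactly when the
 * counter reaches n_tasks. */
void toy_task_completed(toy_schedule_t *s)
{
    s->n_completed_tasks++;
    if (s->n_completed_tasks == s->n_tasks) {
        s->n_completions++;
    }
}

/* Error path: saturate the counter first, then complete the schedule, so a
 * later toy_task_completed() overshoots n_tasks and cannot complete again. */
void toy_force_complete(toy_schedule_t *s)
{
    s->n_completed_tasks = s->n_tasks;
    s->n_completions++;
}

/* Finalize every task, posted or not; returns how many were finalized. */
size_t toy_schedule_finalize(toy_schedule_t *s)
{
    size_t count = 0;
    for (size_t i = 0; i < s->n_tasks; i++) {
        s->tasks[i]->finalized = true;
        count++;
    }
    return count;
}
```

In this model, forcing completion and then receiving a late child completion leaves n_completions at 1, and toy_schedule_finalize reaches the unposted task as well. Whether ucc_schedule_finalize behaves the same way is exactly what the comment asks to be verified.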
