Skip to content

[Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context#38472

Merged
shunping merged 8 commits into
apache:masterfrom
shunping:fail-dofn-without-deadline-exceed
May 12, 2026
Merged

[Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context#38472
shunping merged 8 commits into
apache:masterfrom
shunping:fail-dofn-without-deadline-exceed

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

@shunping shunping commented May 12, 2026

In executePipeline, bundle execution goroutines previously used the parent job context instead of the errgroup context (egctx). When a bundle failed, egctx was cancelled but other concurrent bundle execution goroutines did not detect cancellation. This caused eg.Wait() to block indefinitely, preventing Prism from reporting JobState_FAILED promptly and leading to 60-second gRPC DEADLINE_EXCEEDED errors on the client side.

Example failed tests:

With the proposed fix, the following steps will be triggered to ensure both the server and the client exit gracefully.

  1. As soon as any bundle fails, eg cancels egctx (We need to pass egctx instead of ctx in s.Execute() below).

    eg.Go(func() error {
    s := stages[rb.StageID]
    wk := wks[s.envID]
    if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
    // Ensure we clean up on bundle failure
    j.Logger.Error("Bundle Failed.", slog.Any("error", err))
    em.FailBundle(rb)
    return err
    }
    return nil
    })

  2. All other concurrent bundle execution goroutines immediately detect <-egctx.Done() and exit.

    func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {

    progress:
    for {
    select {
    case <-ctx.Done():

  3. eg.Wait() unblocks immediately, allowing RunPipeline to call j.Failed(err).

    case rb, ok := <-bundles:
    if !ok {
    err := eg.Wait()
    j.Logger.Info("pipeline done!", slog.String("job", j.String()))
    j.Logger.Debug("finished state", slog.String("job", j.String()), slog.Any("error", err), slog.String("stages", em.DumpStages()))
    return err
    }

  4. Prism immediately broadcasts JobState_FAILED over GetStateStream, and the Python SDK client gracefully receives and raises the underlying worker exception without encountering a deadline exceeded error.

    if err := executePipeline(j.RootCtx, wks, j); err != nil && !errors.Is(err, jobservices.ErrCancel) {
    j.Failed(err)
    return
    }

@shunping
Copy link
Copy Markdown
Collaborator Author

r: @lostluck

@shunping shunping marked this pull request as ready for review May 12, 2026 16:25
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a concurrency issue in the Prism runner where bundle execution goroutines were not properly respecting cancellation signals. By switching to the errgroup context, the runner now ensures that when one bundle fails, all other concurrent operations are cancelled immediately. This change prevents indefinite blocking, allowing the runner to report job failures promptly and avoiding unnecessary gRPC deadline exceeded errors on the client side.

Highlights

  • Context Propagation: Updated bundle execution to use the errgroup context instead of the parent job context, ensuring that bundle failures trigger immediate cancellation of concurrent tasks.
  • Error Handling: Resolved an issue where eg.Wait() would block indefinitely during bundle failures, preventing prompt reporting of job failures and causing gRPC deadline exceeded errors.
  • Test Coverage: Added new Go and Python integration tests to verify that pipelines exit gracefully and report errors promptly when a DoFn fails.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request improves the Prism runner's error handling by utilizing the errgroup context during bundle execution. This ensures that if any bundle fails, all other concurrent bundle goroutines are immediately canceled, preventing the runner from hanging and allowing for prompt error reporting. The changes include new regression tests in both Go and Python. Review feedback identified a bug in the Python test where incorrect pipeline options were imported, which would result in a NameError during execution.

Comment thread sdks/python/apache_beam/runners/portability/prism_runner_test.py Outdated
@github-actions
Copy link
Copy Markdown
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@codecov
Copy link
Copy Markdown

codecov Bot commented May 12, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 55.76%. Comparing base (fba639a) to head (143785d).
⚠️ Report is 9 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #38472      +/-   ##
============================================
- Coverage     57.54%   55.76%   -1.79%     
+ Complexity     5329     2095    -3234     
============================================
  Files          1399     1099     -300     
  Lines        198776   172274   -26502     
  Branches       4980     1350    -3630     
============================================
- Hits         114385    96061   -18324     
+ Misses        80476    73817    -6659     
+ Partials       3915     2396    -1519     
Flag Coverage Δ
go 28.70% <100.00%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@shunping
Copy link
Copy Markdown
Collaborator Author

cc'ed @claudevdm

@shunping shunping requested a review from lostluck May 12, 2026 18:19
Copy link
Copy Markdown
Contributor

@lostluck lostluck left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@github-actions github-actions Bot removed the python label May 12, 2026
@shunping shunping merged commit ec65d64 into apache:master May 12, 2026
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants