Skip to content

LSDB Operations Implementation#1359

Open
dougbrn wants to merge 66 commits into
mainfrom
operations
Open

LSDB Operations Implementation#1359
dougbrn wants to merge 66 commits into
mainfrom
operations

Conversation

@dougbrn

@dougbrn dougbrn commented May 4, 2026

Copy link
Copy Markdown
Contributor

Closes #1342

Motivation

This PR implements a sizable rewrite of LSDB's backend relationship with Dask. Where previously, LSDB was built directly on top of Dask Dataframe and much of LSDBs functionality relied on dask primitives to dictate graph construction (for the most part, in some cases we moved to custom delayed-style graphs for better control). With LSDB Operations, LSDB takes complete control of graph construction through it's own custom set of Operations.

The driving motivation being that we have observed graph sizes/construction time to be a clear limiter on scalability, and when relying on Dask for graph construction we have several bottlenecks/fall over modes related to tasks getting too much information stored in them and pre-culled graph sizes being large. However, in principle we know exactly what the optimized graph should look like for nearly all LSDB workflows. Workflows either involved spatial partition matching, which we know exactly which partitions should be interwoven, or they are simple map_partitions style work. LSDB Operations are designed to arrive at this optimal graph directly.

The exact results depend on the workflow, but as a rough estimate we expect a ~3-5x improvement in number of graph tasks and graph size in memory. This can have downstream speedups in actual computation, simply by avoiding the Dask per task overhead (~1ms) through reducing the total number of tasks.

API Changes

The major change is that the ._ddf property, which accessed the underlying Dask Dataframe directly is no longer available, because there is no longer an underlying Dask DataFrame!

The public API is for the most part untouched, but there are exceptions:

Additions:

  • Adds an explicit Catalog.to_dask_dataframe method as a replacement for the loss of ._ddf
  • Adds Catalog.exploded_columns (previously only available through ._ddf)

Removals:

  • Catalog.merge is removed as it was just a wrapper for Dask's merge, use it directly in Dask Catalog.to_dask_dataframe().merge
  • remove HealpixDataset.get_partition_index

Behavior Changes:

  • map_partitions meta has been expanded to accept more input data types (series or dict), and coerces results more heavily into dataframes (which return as catalogs)
  • HealpixDataset.get_partition returns HealpixDataset not ddf
  • .partitions supports healpix pixels as well as indices
  • to_delayed defaults to optimize=false instead of true
  • HealpixDataset.partitions is now iterable
  • HealpixDataset.sample no longer computes multiple times
  • HealpixDataset.random_sample doesn’t use delayed
  • prune_empty_partitions no longer has persist option
  • Column selection: Selecting a single column (e.g. catalog["my_column"]) now returns a catalog rather than a dd.series, however single column syntax is still usable for mask-style filtering (e.g. catalog[catalog["my_column"] == True]
  • No longer have a nest accessor (for nested columns) available from a catalog column

Technical State

Unit test suite has been fully adapted to operations and is now all passing. Mostly only mypy failures now, some of which actually currently exist in the codebase as this PR introduces a pandas typing dev dependency that gives mypy additional typing context that it previously lacked.

Docs have been reviewed (one pass) both for code changes (._ddf replacement) and language consistency with operations. Static notebooks were checked for obvious language/code changes, but have not been rerun.

The operations branch was tested on a focus week sprint, which is useful to mention just to illustrate it's been tested outside of the scope of our CI.

Future Considerations

  • The recently added LSDB Streaming implementation had some nice performance gains over the current LSDB (particularly by avoiding the large pre-culled graph sizes present in pre-operations LSDB), but potentially less so when compared against operations. While this is nice, we should look further into optimizing streaming for operations.
  • Query optimization: We have enough control over the graphs to be able to retroactively optimize our graph execution, for example loading all columns but only providing a workflow that works with 1 of them. We didn't want to include this in our initial implementation but it is worth investigating in the near future.
  • Series Integration: We currently sort of replicate some series behavior such as column selection, mask filtering, or map_partitions returning a scalar, with single column data frames. We may want to consider switching to either our own Operations backed Series or integrating with Dask Series. Some exploration on this has been done on the sean/series branch

@review-notebook-app

Copy link
Copy Markdown

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@dougbrn dougbrn changed the title [DO NOT MERGE] LSDB Operations Implementation LSDB Operations Implementation Jun 24, 2026
@dougbrn dougbrn marked this pull request as ready for review June 24, 2026 16:51
@dougbrn dougbrn requested a review from delucchi-cmu June 24, 2026 16:52

@delucchi-cmu delucchi-cmu left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm only like 15% through looking at it, but this is enough to make me very happy:

Image

There are just so many cases where the change cleans up existing interfaces. I'm sure there's sadness coming, so this is probably a nice place to leave it for today.

One major thing this PR needs is a longer description. We'll need to be clear about breaking changes, the motivation, what is NOT being implemented yet, etc. It's going to show up quite a bit in blame =D

Comment thread src/lsdb/catalog/dataset/healpix_dataset.py Outdated
Comment thread src/lsdb/catalog/dataset/healpix_dataset.py
Comment thread src/lsdb/catalog/dataset/healpix_dataset.py
Comment thread src/lsdb/operations/lsdb_ops.py
Comment thread src/lsdb/operations/lsdb_ops.py
Comment thread src/lsdb/streams/catalog_streams.py
Comment thread src/lsdb/loaders/dataframe/margin_catalog_generator.py Outdated
Comment thread src/lsdb/io/schema.py
Comment thread src/lsdb/io/to_hats.py Outdated
Comment thread src/lsdb/io/to_hats.py Outdated
Comment thread src/lsdb/loaders/dataframe/from_dataframe_utils.py
Comment thread src/lsdb/operations/functions/concat_catalog_data.py Outdated
Comment thread src/lsdb/operations/lsdb_ops.py
Comment thread src/lsdb/operations/lsdb_ops.py
Comment thread tests/lsdb/operations/test_ops.py Outdated


def test_coerce_to_meta_unsupported_scalar_type_raises_type_error():
# KNOWN ISSUE: an object of a type pandas doesn't recognize as a dtype

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So if the dataframe contains a row with HealpixPixel, we'll get a type error? I like to do that kinda thing A LOT in my homework pipelines, when there's some summary statistics I'm creating for each pixel. Or am I misreading this note?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is for _coerce_to_meta, which is used at the end of a map_partitions call to coerce the result structure into a dataframe. So if you return something with a HealpixPixel intended as a column, then that should be okay. But if you have a map_partitions call that returns a HealpixPixel, then that would fail and you would need to wrap it in a supported return type (like {"pixel_result": my_HealpixPixel}. Does that seem reasonable?

@smcguire-cmu smcguire-cmu Jun 29, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we coerce any unsupported dtype to an object column? I think that's what dask map_partitions would do currently and make it a series with a single element per partition, and I agree it's something I use currently

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, that seems like what we want. I've updated the method to now have fallbacks to the np.object type whenever it tries to cast the type as a pandas type and fails.

dougbrn and others added 7 commits June 25, 2026 14:35
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Comment thread src/lsdb/loaders/hats/read_hats.py Outdated
Comment thread src/lsdb/catalog/dataset/healpix_dataset.py
Comment on lines +320 to +351
@overload
def map_partitions(
self,
func: Callable[..., pd.DataFrame],
*args: Any,
meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None,
include_pixel: bool = False,
compute_single_partition: bool = False,
partition_index: int | HealpixPixel | None = None,
**kwargs: Any,
) -> Self: ...
@overload
def map_partitions(
self,
func: Callable[..., pd.Series],
*args: Any,
meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None,
include_pixel: bool = False,
compute_single_partition: bool = False,
partition_index: int | HealpixPixel | None = None,
**kwargs: Any,
) -> Self | dd.Series: ...
def map_partitions(
self,
func: Callable[..., npd.NestedFrame],
*args,
meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None,
include_pixel: bool = False,
compute_single_partition: bool = False,
partition_index: int | HealpixPixel | None = None,
**kwargs,
) -> Self | dd.Series:

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually ever return a dd.Series anymore, right? If it's always a catalog we can remove the overloads and change the type hints to remove the types within the Callable from func and remove the dd.Series return type.

Comment on lines +396 to +398
Self or dd.Series
A new catalog with each partition replaced with the output of the function applied to the original
partition. If the function returns a non dataframe output, a dask Series will be returned.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same return type issue as above

Comment on lines +419 to +425
if not isinstance(new_op.meta, pd.DataFrame):
warnings.warn(
"output of the function must be a DataFrame to generate an LSDB `Catalog`. "
"`map_partitions` will return a dask object instead of a Catalog.",
RuntimeWarning,
)
return new_cat.to_dask_dataframe()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this work with our meta inferencing logic? Is this reachable?

self, op_class: type[MapPartitions], func, *args, meta=None, **kwargs
) -> Self:
new_op = op_class(self._operation, func, *args, meta=meta, **kwargs)
return self._create_updated_dataset(op=new_op)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in get_item but not updated in the Catalog subclass, so the margin will never be updated. We should override this with margin logic in the subclass, and potentially change map_partitions to use it so we don't have to overwrite map_partitions in Catalog.

Comment thread src/lsdb/catalog/dataset/healpix_dataset.py Outdated
if new_dec_col != dec_col:
updated_params["dec_column"] = new_dec_col
return self._create_updated_dataset(ddf=ndf, updated_catalog_info_params=updated_params)
new_op = MapPartitions(self._operation, lambda df: df.rename(columns=columns))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the self.map_partitions so the margin also gets updated in the Catalog

Comment on lines -264 to -265
if len(self.dataframe.nested_columns) > 0:
ddf = ddf.astype({col: self.dataframe[col].dtype for col in self.dataframe.nested_columns})

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this no longer needed?

dougbrn and others added 2 commits July 1, 2026 15:24
Co-authored-by: Sean McGuire <123987820+smcguire-cmu@users.noreply.github.com>
@github-actions

github-actions Bot commented Jul 1, 2026

Copy link
Copy Markdown
Before [4e1fdfc] After [605740d] Ratio Benchmark (Parameter)
8.71±0.06s 3.85±0.01s ~0.44 benchmarks.time_lazy_crossmatch_many_columns_overlapping_suffixes
9.56±0.1s 9.02±0.04s 0.94 benchmarks.time_save_big_catalog
159±4ms 125±1ms 0.79 benchmarks.time_open_many_columns_list
44.1±1ms 31.5±0.7ms 0.71 benchmarks.time_polygon_search
340±6ms 232±2ms 0.68 benchmarks.time_open_many_columns_default
26.1±0.9ms 14.1±0.2ms 0.54 benchmarks.time_box_filter_on_partition
3.09±0.02s 1.51±0.01s 0.49 benchmarks.time_open_many_columns_all
8.48±0.02s 3.83±0.01s 0.45 benchmarks.time_lazy_crossmatch_many_columns_all_suffixes
162±2ms 68.0±0.5ms 0.42 benchmarks.time_kdtree_crossmatch
1.01±0.01s 170±1ms 0.17 benchmarks.time_create_midsize_catalog

Click here to view all benchmarks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement LSDB Custom Graphs

4 participants