LSDB Operations Implementation#1359
Conversation
|
Check out this pull request on See visual diffs & provide feedback on Jupyter Notebooks. Powered by ReviewNB |
* wip * fix search and crossmatch unit tests * fix join
* fix est size * fix other unit tests
* add verify meta to ops * improve how verify_meta works * fix failing tests
* add boolean catalog filter logic * add partitioning test
delucchi-cmu
left a comment
There was a problem hiding this comment.
I'm only like 15% through looking at it, but this is enough to make me very happy:
There are just so many cases where the change cleans up existing interfaces. I'm sure there's sadness coming, so this is probably a nice place to leave it for today.
One major thing this PR needs is a longer description. We'll need to be clear about breaking changes, the motivation, what is NOT being implemented yet, etc. It's going to show up quite a bit in blame =D
|
|
||
|
|
||
| def test_coerce_to_meta_unsupported_scalar_type_raises_type_error(): | ||
| # KNOWN ISSUE: an object of a type pandas doesn't recognize as a dtype |
There was a problem hiding this comment.
So if the dataframe contains a row with HealpixPixel, we'll get a type error? I like to do that kinda thing A LOT in my homework pipelines, when there's some summary statistics I'm creating for each pixel. Or am I misreading this note?
There was a problem hiding this comment.
This test is for _coerce_to_meta, which is used at the end of a map_partitions call to coerce the result structure into a dataframe. So if you return something with a HealpixPixel intended as a column, then that should be okay. But if you have a map_partitions call that returns a HealpixPixel, then that would fail and you would need to wrap it in a supported return type (like {"pixel_result": my_HealpixPixel}. Does that seem reasonable?
There was a problem hiding this comment.
Could we coerce any unsupported dtype to an object column? I think that's what dask map_partitions would do currently and make it a series with a single element per partition, and I agree it's something I use currently
There was a problem hiding this comment.
Gotcha, that seems like what we want. I've updated the method to now have fallbacks to the np.object type whenever it tries to cast the type as a pandas type and fails.
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
Co-authored-by: Melissa DeLucchi <113376043+delucchi-cmu@users.noreply.github.com>
| @overload | ||
| def map_partitions( | ||
| self, | ||
| func: Callable[..., pd.DataFrame], | ||
| *args: Any, | ||
| meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None, | ||
| include_pixel: bool = False, | ||
| compute_single_partition: bool = False, | ||
| partition_index: int | HealpixPixel | None = None, | ||
| **kwargs: Any, | ||
| ) -> Self: ... | ||
| @overload | ||
| def map_partitions( | ||
| self, | ||
| func: Callable[..., pd.Series], | ||
| *args: Any, | ||
| meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None, | ||
| include_pixel: bool = False, | ||
| compute_single_partition: bool = False, | ||
| partition_index: int | HealpixPixel | None = None, | ||
| **kwargs: Any, | ||
| ) -> Self | dd.Series: ... | ||
| def map_partitions( | ||
| self, | ||
| func: Callable[..., npd.NestedFrame], | ||
| *args, | ||
| meta: pd.DataFrame | pd.Series | dict | Iterable | tuple | None = None, | ||
| include_pixel: bool = False, | ||
| compute_single_partition: bool = False, | ||
| partition_index: int | HealpixPixel | None = None, | ||
| **kwargs, | ||
| ) -> Self | dd.Series: |
There was a problem hiding this comment.
We don't actually ever return a dd.Series anymore, right? If it's always a catalog we can remove the overloads and change the type hints to remove the types within the Callable from func and remove the dd.Series return type.
| Self or dd.Series | ||
| A new catalog with each partition replaced with the output of the function applied to the original | ||
| partition. If the function returns a non dataframe output, a dask Series will be returned. |
There was a problem hiding this comment.
Same return type issue as above
| if not isinstance(new_op.meta, pd.DataFrame): | ||
| warnings.warn( | ||
| "output of the function must be a DataFrame to generate an LSDB `Catalog`. " | ||
| "`map_partitions` will return a dask object instead of a Catalog.", | ||
| RuntimeWarning, | ||
| ) | ||
| return new_cat.to_dask_dataframe() |
There was a problem hiding this comment.
How does this work with our meta inferencing logic? Is this reachable?
| self, op_class: type[MapPartitions], func, *args, meta=None, **kwargs | ||
| ) -> Self: | ||
| new_op = op_class(self._operation, func, *args, meta=meta, **kwargs) | ||
| return self._create_updated_dataset(op=new_op) |
There was a problem hiding this comment.
This is used in get_item but not updated in the Catalog subclass, so the margin will never be updated. We should override this with margin logic in the subclass, and potentially change map_partitions to use it so we don't have to overwrite map_partitions in Catalog.
| if new_dec_col != dec_col: | ||
| updated_params["dec_column"] = new_dec_col | ||
| return self._create_updated_dataset(ddf=ndf, updated_catalog_info_params=updated_params) | ||
| new_op = MapPartitions(self._operation, lambda df: df.rename(columns=columns)) |
There was a problem hiding this comment.
This should use the self.map_partitions so the margin also gets updated in the Catalog
| if len(self.dataframe.nested_columns) > 0: | ||
| ddf = ddf.astype({col: self.dataframe[col].dtype for col in self.dataframe.nested_columns}) |
There was a problem hiding this comment.
Is this no longer needed?
Co-authored-by: Sean McGuire <123987820+smcguire-cmu@users.noreply.github.com>
Click here to view all benchmarks. |
Closes #1342
Motivation
This PR implements a sizable rewrite of LSDB's backend relationship with Dask. Where previously, LSDB was built directly on top of Dask Dataframe and much of LSDBs functionality relied on dask primitives to dictate graph construction (for the most part, in some cases we moved to custom delayed-style graphs for better control). With LSDB Operations, LSDB takes complete control of graph construction through it's own custom set of
Operations.The driving motivation being that we have observed graph sizes/construction time to be a clear limiter on scalability, and when relying on Dask for graph construction we have several bottlenecks/fall over modes related to tasks getting too much information stored in them and pre-culled graph sizes being large. However, in principle we know exactly what the optimized graph should look like for nearly all LSDB workflows. Workflows either involved spatial partition matching, which we know exactly which partitions should be interwoven, or they are simple map_partitions style work. LSDB Operations are designed to arrive at this optimal graph directly.
The exact results depend on the workflow, but as a rough estimate we expect a ~3-5x improvement in number of graph tasks and graph size in memory. This can have downstream speedups in actual computation, simply by avoiding the Dask per task overhead (~1ms) through reducing the total number of tasks.
API Changes
The major change is that the
._ddfproperty, which accessed the underlying Dask Dataframe directly is no longer available, because there is no longer an underlying Dask DataFrame!The public API is for the most part untouched, but there are exceptions:
Additions:
Catalog.to_dask_dataframemethod as a replacement for the loss of ._ddfCatalog.exploded_columns(previously only available through ._ddf)Removals:
Catalog.mergeis removed as it was just a wrapper for Dask's merge, use it directly in DaskCatalog.to_dask_dataframe().mergeBehavior Changes:
Technical State
Unit test suite has been fully adapted to operations and is now all passing. Mostly only mypy failures now, some of which actually currently exist in the codebase as this PR introduces a pandas typing dev dependency that gives mypy additional typing context that it previously lacked.
Docs have been reviewed (one pass) both for code changes (._ddf replacement) and language consistency with operations. Static notebooks were checked for obvious language/code changes, but have not been rerun.
The operations branch was tested on a focus week sprint, which is useful to mention just to illustrate it's been tested outside of the scope of our CI.
Future Considerations