Skip to content

Commit f741cb7

Browse files
committed
high level scan api draft
1 parent e8cd130 commit f741cb7

10 files changed

Lines changed: 910 additions & 4 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vortex-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ tracing = { workspace = true, features = ["std", "log"] }
3131
tracing-subscriber = { workspace = true, features = ["env-filter"] }
3232
url = { workspace = true, features = [] }
3333
vortex = { workspace = true, features = ["object_store"] }
34+
arrow-array = { workspace = true, features = ["ffi"] }
3435

3536
[dev-dependencies]
3637
tempfile = { workspace = true }

vortex-ffi/cinclude/vortex.h

Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@
1818
*/
1919
#define BinaryView_MAX_INLINED_SIZE 12
2020

21+
typedef enum {
22+
VX_CARD_UNKNOWN = 0,
23+
VX_CARD_ESTIMATE = 1,
24+
VX_CARD_MAXIMUM = 2,
25+
} vx_cardinality;
26+
2127
/**
2228
* Variant enum for Vortex primitive types.
2329
*/
@@ -204,6 +210,18 @@ typedef enum {
204210
LOG_LEVEL_TRACE = 5,
205211
} vx_log_level;
206212

213+
typedef enum {
214+
VX_S_INCLUDE_ALL = 0,
215+
VX_S_INCLUDE_RANGE = 1,
216+
VX_S_EXCLUDE_RANGE = 2,
217+
} vx_scan_selection_include;
218+
219+
typedef enum {
220+
VX_ESTIMATE_UNKNOWN = 0,
221+
VX_ESTIMATE_EXACT = 1,
222+
VX_ESTIMATE_INEXACT = 2,
223+
} vx_estimate_boundary;
224+
207225
/**
208226
* Physical type enum, represents the in-memory physical layout but might represent a different logical type.
209227
*/
@@ -295,6 +313,8 @@ typedef struct Nullability Nullability;
295313

296314
typedef struct Primitive Primitive;
297315

316+
typedef struct VxFileHandle VxFileHandle;
317+
298318
/**
299319
* Base type for all Vortex arrays.
300320
*
@@ -344,6 +364,14 @@ typedef struct vx_array_sink vx_array_sink;
344364
*/
345365
typedef struct vx_binary vx_binary;
346366

367+
/**
368+
* A data source is a reference to multiple possibly remote files. When
369+
* created, it opens first file to determine the schema from DType, all
370+
* other operations are deferred till a scan is requested. You can request
371+
* multiple file scans from a data source
372+
*/
373+
typedef struct vx_data_source vx_data_source;
374+
347375
/**
348376
* A Vortex data type.
349377
*
@@ -378,6 +406,14 @@ typedef struct vx_expression vx_expression;
378406
*/
379407
typedef struct vx_file vx_file;
380408

409+
/**
410+
* A Partition is a unit of work for a worker thread from which you can
411+
* get vx_arrays.
412+
*/
413+
typedef struct vx_partition vx_partition;
414+
415+
typedef struct vx_scan vx_scan;
416+
381417
/**
382418
* A handle to a Vortex session.
383419
*/
@@ -398,6 +434,67 @@ typedef struct vx_struct_fields vx_struct_fields;
398434
*/
399435
typedef struct vx_struct_fields_builder vx_struct_fields_builder;
400436

437+
typedef int (*vx_fs_use_vortex)(const char *schema, const char *path);
438+
439+
typedef void (*vx_fs_set_userdata)(void *userdata);
440+
441+
typedef void (*vx_fs_open)(void *userdata, const char *path, vx_error **err);
442+
443+
typedef void (*vx_fs_create)(void *userdata, const char *path, vx_error **err);
444+
445+
typedef void (*vx_list_callback)(void *userdata, const char *name, int is_dir);
446+
447+
typedef void (*vx_fs_list)(const void *userdata,
448+
const char *path,
449+
vx_list_callback callback,
450+
vx_error **error);
451+
452+
typedef const VxFileHandle *vx_file_handle;
453+
454+
typedef void (*vx_fs_close)(vx_file_handle handle);
455+
456+
typedef uint64_t (*vx_fs_size)(vx_file_handle handle, vx_error **err);
457+
458+
typedef uint64_t (
459+
*vx_fs_read)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error **err);
460+
461+
typedef uint64_t (
462+
*vx_fs_write)(vx_file_handle handle, uint64_t offset, size_t len, uint8_t *buffer, vx_error **err);
463+
464+
typedef void (*vx_fs_sync)(vx_file_handle handle, vx_error **err);
465+
466+
typedef void (*vx_glob_callback)(void *userdata, const char *file);
467+
468+
typedef void (*vx_glob)(const char *glob, vx_glob_callback callback, vx_error **err);
469+
470+
/**
471+
* Host must either implement all or none of fs_* callbacks.
472+
*/
473+
typedef struct {
474+
const char *files;
475+
/**
476+
* Whether to use Vortex filesystem or host's filesystem.
477+
* Return 1 to use Vortex for a given schema ("file", "s3") and path.
478+
* Return 0 to use host's filesystem.
479+
*/
480+
vx_fs_use_vortex fs_use_vortex;
481+
vx_fs_set_userdata fs_set_userdata;
482+
vx_fs_open fs_open;
483+
vx_fs_create fs_create;
484+
vx_fs_list fs_list;
485+
vx_fs_close fs_close;
486+
vx_fs_size fs_size;
487+
vx_fs_read fs_read;
488+
vx_fs_write fs_write;
489+
vx_fs_sync fs_sync;
490+
vx_glob glob;
491+
} vx_data_source_options;
492+
493+
typedef struct {
494+
vx_cardinality cardinality;
495+
uint64_t rows;
496+
} vx_data_source_row_count;
497+
401498
/**
402499
* Options supplied for opening a file.
403500
*/
@@ -460,6 +557,28 @@ typedef struct {
460557
unsigned long row_offset;
461558
} vx_file_scan_options;
462559

560+
typedef struct {
561+
uint64_t *idx;
562+
size_t idx_len;
563+
vx_scan_selection_include include;
564+
} vx_scan_selection;
565+
566+
typedef struct {
567+
const vx_expression *projection;
568+
const vx_expression *filter;
569+
uint64_t row_range_begin;
570+
uint64_t row_range_end;
571+
vx_scan_selection selection;
572+
uint64_t limit;
573+
uint64_t max_threads;
574+
int ordered;
575+
} vx_scan_options;
576+
577+
typedef struct {
578+
uint64_t estimate;
579+
vx_estimate_boundary type;
580+
} vx_estimate;
581+
463582
#ifdef __cplusplus
464583
extern "C" {
465584
#endif // __cplusplus
@@ -604,6 +723,34 @@ size_t vx_binary_len(const vx_binary *ptr);
604723
*/
605724
const char *vx_binary_ptr(const vx_binary *ptr);
606725

726+
/**
727+
* Clone a borrowed [`vx_data_source`], returning an owned [`vx_data_source`].
728+
*
729+
*
730+
* Must be released with [`vx_data_source_free`].
731+
*/
732+
const vx_data_source *vx_data_source_clone(const vx_data_source *ptr);
733+
734+
/**
735+
* Free an owned [`vx_data_source`] object.
736+
*/
737+
void vx_data_source_free(const vx_data_source *ptr);
738+
739+
/**
740+
* Create a new owned datasource which must be freed by the caller
741+
*/
742+
const vx_data_source *
743+
vx_data_source_new(const vx_session *session, const vx_data_source_options *opts, vx_error **err);
744+
745+
/**
746+
* Create a non-owned dtype referencing dataframe.
747+
* This dtype's lifetime is bound to underlying data source.
748+
* Caller should not free this dtype.
749+
*/
750+
const vx_dtype *vx_data_source_dtype(const vx_data_source *ds);
751+
752+
void vx_data_source_get_row_count(const vx_data_source *ds, vx_data_source_row_count *rc);
753+
607754
/**
608755
* Clone a borrowed [`vx_dtype`], returning an owned [`vx_dtype`].
609756
*
@@ -747,6 +894,8 @@ uint8_t vx_dtype_time_unit(const DType *dtype);
747894
*/
748895
const vx_string *vx_dtype_time_zone(const DType *dtype);
749896

897+
void vx_type_to_arrow_schema(const vx_dtype *_dtype, FFI_ArrowSchema *_schema, vx_error **_err);
898+
750899
/**
751900
* Free an owned [`vx_error`] object.
752901
*/
@@ -927,6 +1076,47 @@ vx_array_iterator *vx_file_scan(const vx_session *session,
9271076
*/
9281077
void vx_set_log_level(vx_log_level level);
9291078

1079+
/**
1080+
* Free an owned [`vx_scan`] object.
1081+
*/
1082+
void vx_scan_free(vx_scan *ptr);
1083+
1084+
/**
1085+
* Free an owned [`vx_partition`] object.
1086+
*/
1087+
void vx_partition_free(vx_partition *ptr);
1088+
1089+
vx_scan *vx_data_source_scan(const vx_data_source *data_source,
1090+
const vx_scan_options *options,
1091+
vx_estimate *estimate,
1092+
vx_error **err);
1093+
1094+
/**
1095+
* Get next owned partition out of a scan request.
1096+
* Caller must free this partition using vx_partition_free.
1097+
* This method is thread-safe.
1098+
* If using in a sync multi-thread runtime, users are encouraged to create a
1099+
* worker thread per partition.
1100+
* Returns NULL and doesn't set err on exhaustion.
1101+
* Returns NULL and sets err on error.
1102+
*/
1103+
vx_partition *vx_scan_next(vx_scan *scan, vx_error **err);
1104+
1105+
void vx_partition_row_count(const vx_partition *partition, vx_estimate *count, vx_error **err);
1106+
1107+
void vx_partition_scan_arrow(const vx_partition *_partition, FFI_ArrowArrayStream *_stream, vx_error **err);
1108+
1109+
/**
1110+
* Get next vx_array out of this partition.
1111+
* Thread-unsafe.
1112+
*/
1113+
const vx_array *vx_partition_next(vx_partition *partition, vx_error **err);
1114+
1115+
/**
1116+
* Scan progress between 0.0 and 1.0
1117+
*/
1118+
double vx_scan_progress(const vx_scan *_scan);
1119+
9301120
/**
9311121
* Free an owned [`vx_session`] object.
9321122
*/

0 commit comments

Comments
 (0)