Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/data.table.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,9 @@ SEXP bmerge(SEXP idt, SEXP xdt, SEXP icolsArg, SEXP xcolsArg,
double dquickselect(double *x, int n);
double iquickselect(int *x, int n);
double i64quickselect(int64_t *x, int n);
double dfloyd_rivest(double *x, int n);
double ifloyd_rivest(int *x, int n);
double i64floyd_rivest(int64_t *x, int n);

// fread.c
double wallclock(void);
Expand Down
69 changes: 47 additions & 22 deletions src/gsumm.c
Original file line number Diff line number Diff line change
Expand Up @@ -880,43 +880,68 @@ SEXP gmedian(SEXP x, SEXP narmArg) {
const bool nosubset = irowslen==-1;
switch(TYPEOF(x)) {
case REALSXP: {
double *subd = REAL(PROTECT(allocVector(REALSXP, maxgrpn))); // allocate once upfront and reuse
int64_t *xi64 = (int64_t *)REAL(x);
double *xd = REAL(x);
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else subd[j-nacount] = xd[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
double *thread_subd = malloc(maxgrpn * sizeof(double));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to allocate outside of parallel and use R alloc, so the memory usage can be tracked by apps that track memory usage using R API.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, will do.

if (!thread_subd) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; ++i) {
int thisgrpsize = grpsize[i], nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
k = nosubset ? k : (irows[k]==NA_INTEGER ? NA_INTEGER : irows[k]-1);
if (k==NA_INTEGER || (isInt64 ? xi64[k]==NA_INTEGER64 : ISNAN(xd[k]))) nacount++;
else thread_subd[j-nacount] = xd[k];
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it make sense to put 100 as a parameter, so we can explore impact of this code path branches by options

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could do. Like all of these parameters, it should be tuned.

ansd[i] = isInt64 ?
(thisgrpsize > 100 ? i64floyd_rivest((int64_t *)thread_subd, thisgrpsize) : i64quickselect((int64_t *)thread_subd, thisgrpsize)) :
(thisgrpsize > 100 ? dfloyd_rivest(thread_subd, thisgrpsize) : dquickselect(thread_subd, thisgrpsize));
}
}
thisgrpsize -= nacount; // all-NA is returned as NA_REAL via n==0 case inside *quickselect
ansd[i] = (nacount && !narm) ? NA_REAL : (isInt64 ? i64quickselect((void *)subd, thisgrpsize) : dquickselect(subd, thisgrpsize));
free(thread_subd);
}}
break;
case LGLSXP: case INTSXP: {
int *subi = INTEGER(PROTECT(allocVector(INTSXP, maxgrpn)));
int *xi = INTEGER(x);
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else subi[j-nacount] = xi[k];
#pragma omp parallel num_threads(getDTthreads(ngrp, true))
{
int *thread_subi = malloc(maxgrpn * sizeof(int));
if (!thread_subi) error(_("Unable to allocate thread-local buffer in gmedian")); // # nocov
#pragma omp for
for (int i=0; i<ngrp; i++) {
const int thisgrpsize = grpsize[i];
int nacount=0;
for (int j=0; j<thisgrpsize; ++j) {
int k = ff[i]+j-1;
if (isunsorted) k = oo[k]-1;
if (nosubset ? xi[k]==NA_INTEGER : (irows[k]==NA_INTEGER || (k=irows[k]-1,xi[k]==NA_INTEGER))) nacount++;
else thread_subi[j-nacount] = xi[k];
}
if (nacount && !narm) {
ansd[i] = NA_REAL;
} else {
const int validsize = thisgrpsize-nacount;
// Use Floyd-Rivest for groups larger than 100, quickselect for smaller
ansd[i] = validsize > 100 ? ifloyd_rivest(thread_subi, validsize) : iquickselect(thread_subi, validsize);
}
}
ansd[i] = (nacount && !narm) ? NA_REAL : iquickselect(subi, thisgrpsize-nacount);
free(thread_subi);
}}
break;
default:
error(_("Type '%s' is not supported by GForce %s. Either add the prefix %s or turn off GForce optimization using options(datatable.optimize=1)"), type2char(TYPEOF(x)), "median (gmedian)", "stats::median(.)");
}
if (!isInt64) copyMostAttrib(x, ans);
// else the integer64 class needs to be dropped since double is always returned by gmedian
UNPROTECT(2); // ans, subd|subi
UNPROTECT(1); // ans
return ans;
}

Expand Down
71 changes: 71 additions & 0 deletions src/quickselect.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,74 @@ double i64quickselect(int64_t *x, int n)
int64_t a, b;
BODY(i64swap);
}

// Floyd-Rivest selection algorithm
Comment thread
ben-schwen marked this conversation as resolved.

#undef FLOYD_RIVEST_BODY
#define FLOYD_RIVEST_BODY(SWAP, TYPE) \
if (n == 0) return NA_REAL; \
unsigned long med = n / 2 - (n % 2 == 0); \
unsigned long l = 0, ir = n - 1; \
while (ir > l) { \
if (ir - l > 600) { \
unsigned long nn = ir - l + 1; \
unsigned long i = med - l + 1; \
double z = log((double)nn); \
double s = 0.5 * exp(2.0*z/3.0); \
double sd = 0.5 * sqrt(z*s*(nn-s)/nn); \
if (i < nn/2) sd = -sd; \
unsigned long newL = (unsigned long)(MAX((long)l, (long)(med - i*s/nn + sd))); \
unsigned long newR = (unsigned long)(MIN((long)ir, (long)(med + (nn-i)*s/nn + sd))); \
l = newL; \
ir = newR; \
} \
TYPE pivot = x[med]; \
unsigned long i = l, j = ir; \
SWAP(x + l, x + med); \
if (x[ir] > pivot) { \
SWAP(x + ir, x + l); \
pivot = x[l]; \
} \
while (i < j) { \
SWAP(x + i, x + j); \
i++; j--; \
while (x[i] < pivot) i++; \
while (x[j] > pivot) j--; \
} \
if (x[l] == pivot) { \
SWAP(x + l, x + j); \
} else { \
j++; \
SWAP(x + j, x + ir); \
} \
if (j <= med) l = j + 1; \
if (med <= j) ir = j - 1; \
} \
a = x[med]; \
if (n % 2 == 1) { \
return (double)a; \
} else { \
b = x[med + 1]; \
for (unsigned long i = med + 2; i < n; i++) { \
if (x[i] < b) b = x[i]; \
} \
return ((double)a + (double)b) / 2.0; \
}

double dfloyd_rivest(double *x, int n)
{
double a, b;
FLOYD_RIVEST_BODY(dswap, double);
}

double ifloyd_rivest(int *x, int n)
{
int a, b;
FLOYD_RIVEST_BODY(iswap, int);
}

double i64floyd_rivest(int64_t *x, int n)
{
int64_t a, b;
FLOYD_RIVEST_BODY(i64swap, int64_t);
}
Loading