Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 252 additions & 4 deletions src/common.cu
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,10 @@ void Barrier(struct threadArgs *args) {
while(counter[epoch] != args->nThreads)
pthread_cond_wait(&cond[epoch], &lock[epoch]);
#ifdef MPI_SUPPORT
MPI_Barrier(MPI_COMM_WORLD);
{
MPI_Comm bc = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD;
MPI_Barrier(bc);
}
#endif
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
Expand Down Expand Up @@ -332,11 +335,12 @@ void Allreduce(struct threadArgs* args, T* value, int average) {
average == 2 ? MPI_MIN :
average == 3 ? MPI_MAX :
average == 4 ? MPI_SUM : MPI_Op();
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, MPI_COMM_WORLD);
MPI_Comm ac = (args->mpi_comm != MPI_COMM_NULL) ? args->mpi_comm : MPI_COMM_WORLD;
MPI_Allreduce(MPI_IN_PLACE, (void*)&accumulator[epoch], 1, ty, op, ac);
}
#endif

if(average == 1) accumulator[epoch] /= args->totalProcs*args->nThreads;
if(average == 1) accumulator[epoch] /= args->nProcs*args->nThreads;
counter[epoch] = 0;
pthread_cond_broadcast(&cond[epoch]);
}
Expand Down Expand Up @@ -618,6 +622,23 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
if (cudaGraphLaunches >= 1) deltaSec = deltaSec/cudaGraphLaunches;
Allreduce(args, &deltaSec, average);

// Save per-pair deltaSec for sv_data before global averaging
double pairDeltaSec = deltaSec;

#ifdef MPI_SUPPORT
// When using split sub-communicator, the Allreduce above only averages within
// the pair. Do a second global Allreduce so the original output reflects all pairs.
if (args->mpi_comm != MPI_COMM_NULL && args->thread+1 == args->nThreads) {
// deltaSec was already averaged within the pair; each pair's nProcs procs
// hold the same pair_avg. MPI_SUM across totalProcs procs gives
// nProcs * sum_of_pair_avgs; dividing by totalProcs yields the global average.
double globalDelta = deltaSec;
MPI_Allreduce(MPI_IN_PLACE, &globalDelta, 1, MPI_DOUBLE, MPI_SUM, MPI_COMM_WORLD);
globalDelta /= args->totalProcs;
deltaSec = globalDelta;
}
#endif

#if CUDART_VERSION >= 11030
if (cudaGraphLaunches >= 1) {
//destroy cuda graph
Expand Down Expand Up @@ -696,6 +717,49 @@ testResult_t BenchTime(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t

args->bw[0] += busBw;
args->bw_count[0]++;
// Record per-size full perf data for split verbose mode (using per-pair values)
if (args->sv_sizes) {
// Compute per-pair bandwidth from pairDeltaSec (not global deltaSec)
double pairAlgBw, pairBusBw;
args->collTest->getBw(count, wordSize(type), pairDeltaSec, &pairAlgBw, &pairBusBw, args->nProcs*args->nThreads*args->nGpus);
double pairTimeUsec = pairDeltaSec * 1.0E6;

size_t msgSize = max(args->sendBytes, args->expectedBytes);
size_t svCount = args->nbytes / wordSize(type);
// Find or create entry for this size
int idx = -1;
for (int i = 0; i < args->sv_nSizes; i++) {
if (args->sv_sizes[i] == msgSize) { idx = i; break; }
}
if (idx < 0) {
// New size entry — grow arrays if needed
if (args->sv_nSizes >= args->sv_maxSizes) {
int newMax = args->sv_maxSizes ? args->sv_maxSizes * 2 : 16;
args->sv_sizes = (size_t*)realloc(args->sv_sizes, newMax * sizeof(size_t));
args->sv_counts = (size_t*)realloc(args->sv_counts, newMax * sizeof(size_t));
args->sv_data = (double*)realloc(args->sv_data, newMax * 6 * sizeof(double));
for (int i = args->sv_maxSizes; i < newMax; i++) {
args->sv_sizes[i] = 0; args->sv_counts[i] = 0;
for (int k = 0; k < 6; k++) args->sv_data[i * 6 + k] = 0.0;
}
args->sv_maxSizes = newMax;
}
idx = args->sv_nSizes++;
args->sv_sizes[idx] = msgSize;
args->sv_counts[idx] = svCount;
for (int k = 0; k < 6; k++) args->sv_data[idx * 6 + k] = 0.0;
}
int base = idx * 6;
if (in_place == 0) {
args->sv_data[base + 0] = pairTimeUsec;
args->sv_data[base + 1] = pairAlgBw;
args->sv_data[base + 2] = pairBusBw;
} else {
args->sv_data[base + 3] = pairTimeUsec;
args->sv_data[base + 4] = pairAlgBw;
args->sv_data[base + 5] = pairBusBw;
}
}
return testSuccess;
}

Expand Down Expand Up @@ -1256,8 +1320,11 @@ testResult_t run() {
}

char *splitMaskEnv = NULL;
int splitNumColors = 0; // total number of split groups (0 means no split)
if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT_MASK")) {
color = proc & strtoul(splitMaskEnv, NULL, 16);
unsigned long mask = strtoul(splitMaskEnv, NULL, 16);
color = proc & mask;
splitNumColors = (int)(mask + 1);
} else if (splitMaskEnv = getenv("NCCL_TESTS_SPLIT")) {
if (
(strncasecmp(splitMaskEnv, "AND", strlen("AND")) == 0 && parseInt(splitMaskEnv + strlen("AND"), &color)) ||
Expand Down Expand Up @@ -1503,6 +1570,9 @@ testResult_t run() {
threads[t].args.totalProcs=totalProcs;
threads[t].args.nProcs=ncclProcs;
threads[t].args.proc=ncclProc;
#ifdef MPI_SUPPORT
threads[t].args.mpi_comm=mpi_comm;
#endif
threads[t].args.nThreads=nThreads;
threads[t].args.thread=t;
threads[t].args.nGpus=nGpus;
Expand All @@ -1524,6 +1594,21 @@ testResult_t run() {
threads[t].args.errors=errors+t;
threads[t].args.bw=bw+t;
threads[t].args.bw_count=bw_count+t;
// Per-size tracking: only allocate when split verbose is enabled
threads[t].args.sv_sizes=NULL;
threads[t].args.sv_counts=NULL;
threads[t].args.sv_data=NULL;
threads[t].args.sv_nSizes=0;
threads[t].args.sv_maxSizes=0;
if (splitNumColors > 0) {
char* vEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE");
if (vEnv && atoi(vEnv) != 0) {
threads[t].args.sv_maxSizes = 16;
threads[t].args.sv_sizes = (size_t*)calloc(16, sizeof(size_t));
threads[t].args.sv_counts = (size_t*)calloc(16, sizeof(size_t));
threads[t].args.sv_data = (double*)calloc(16 * 6, sizeof(double));
}
}
threads[t].args.initGpuMem = initGpuMem + t;
threads[t].args.bufferMemory = bufferMemory + t;
threads[t].args.devMemUsed = devMemUsed + t;
Expand All @@ -1545,6 +1630,20 @@ testResult_t run() {
errors[0] += errors[t];
bw[0] += bw[t];
bw_count[0] += bw_count[t];
// Merge per-size data from thread t into thread 0
if (threads[t].args.sv_sizes && threads[0].args.sv_sizes) {
for (int s = 0; s < threads[t].args.sv_nSizes; s++) {
// Find matching size in thread 0
int idx = -1;
for (int s0 = 0; s0 < threads[0].args.sv_nSizes; s0++) {
if (threads[0].args.sv_sizes[s0] == threads[t].args.sv_sizes[s]) { idx = s0; break; }
}
if (idx >= 0) {
for (int k = 0; k < 6; k++)
threads[0].args.sv_data[idx * 6 + k] += threads[t].args.sv_data[s * 6 + k];
}
}
}
devMemUsed[0] = std::max(devMemUsed[0], devMemUsed[t]);
initGpuMem[0] = std::max(initGpuMem[0], initGpuMem[t]);
bufferMemory[0] = std::max(bufferMemory[0], bufferMemory[t]);
Expand Down Expand Up @@ -1594,6 +1693,155 @@ testResult_t run() {
const double check_avg_bw = envstr ? atof(envstr) : -1;
bw[0] /= bw_count[0];

#ifdef MPI_SUPPORT
// Per-split bandwidth report: gather each split group's full perf data to proc 0
// and print individual GPU-pair results in the same format as the original nccl-tests output.
// Activated by setting NCCL_TESTS_SPLIT_VERBOSE=1 alongside NCCL_TESTS_SPLIT_MASK.
{
char* verboseEnv = getenv("NCCL_TESTS_SPLIT_VERBOSE");
if (verboseEnv && atoi(verboseEnv) != 0 && splitNumColors > 0) {
int numColors = splitNumColors;
int nSizes = threads[0].args.sv_nSizes;

// Synchronize nSizes across all procs
MPI_Allreduce(MPI_IN_PLACE, &nSizes, 1, MPI_INT, MPI_MAX, MPI_COMM_WORLD);

if (nSizes > 0) {
// Prepare local arrays aligned to proc 0's size order
size_t* localSizes = (size_t*)calloc(nSizes, sizeof(size_t));
size_t* localCounts = (size_t*)calloc(nSizes, sizeof(size_t));
double* localData = (double*)calloc(nSizes * 6, sizeof(double));
if (threads[0].args.sv_sizes) {
int myN = threads[0].args.sv_nSizes;
for (int s = 0; s < myN && s < nSizes; s++) {
localSizes[s] = threads[0].args.sv_sizes[s];
localCounts[s] = threads[0].args.sv_counts[s];
for (int k = 0; k < 6; k++)
localData[s * 6 + k] = threads[0].args.sv_data[s * 6 + k];
}
}

// Broadcast sizes and counts from proc 0 so all procs know the order
MPI_Bcast(localSizes, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD);
MPI_Bcast(localCounts, nSizes * sizeof(size_t), MPI_BYTE, 0, MPI_COMM_WORLD);

// Reorder local data to match proc 0's size order
double* orderedData = (double*)calloc(nSizes * 6, sizeof(double));
if (threads[0].args.sv_sizes) {
for (int s = 0; s < nSizes; s++) {
for (int ls = 0; ls < threads[0].args.sv_nSizes; ls++) {
if (threads[0].args.sv_sizes[ls] == localSizes[s]) {
for (int k = 0; k < 6; k++)
orderedData[s * 6 + k] = threads[0].args.sv_data[ls * 6 + k];
break;
}
}
}
}

// Gather all procs' data to proc 0
double* allData = NULL;
if (proc == 0) {
allData = (double*)calloc(totalProcs * nSizes * 6, sizeof(double));
}
MPI_Gather(orderedData, nSizes * 6, MPI_DOUBLE, allData, nSizes * 6, MPI_DOUBLE, 0, MPI_COMM_WORLD);

if (proc == 0) {
// Get type/op/root names for output (same for all sizes in a run)
const char* typeName = test_typenames[nccltype];
const char* opName = test_opnames[ncclop];

// Compute per-size max busBw across all pairs (for SLOW detection)
double* maxBusBwOop = (double*)calloc(nSizes, sizeof(double));
double* maxBusBwIp = (double*)calloc(nSizes, sizeof(double));
for (int s = 0; s < nSizes; s++) {
for (int c = 0; c < numColors; c++) {
int off = c * nSizes * 6 + s * 6;
if (allData[off + 2] > maxBusBwOop[s]) maxBusBwOop[s] = allData[off + 2];
if (allData[off + 5] > maxBusBwIp[s]) maxBusBwIp[s] = allData[off + 5];
}
}

int largestIdx = nSizes - 1;

printf("#\n");
printf("# ============= Per-GPU-pair Bandwidth Breakdown =============\n");

for (int c = 0; c < numColors; c++) {
// Determine SLOW based on largest size busBw ratio
int off_lg = c * nSizes * 6 + largestIdx * 6;
double rLgOop = (maxBusBwOop[largestIdx] > 0) ? allData[off_lg + 2] / maxBusBwOop[largestIdx] * 100.0 : 0.0;
double rLgIp = (maxBusBwIp[largestIdx] > 0) ? allData[off_lg + 5] / maxBusBwIp[largestIdx] * 100.0 : 0.0;
bool pairSlow = (rLgOop < 90.0 || rLgIp < 90.0);

printf("#\n");
printf("# --- GPU Pair %d ---%s\n", c, pairSlow ? " <<< SLOW" : "");
printf("# out-of-place in-place \n");
printf("# size count type redop root time algbw busbw #wrong time algbw busbw #wrong \n");
printf("# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s) \n");

for (int s = 0; s < nSizes; s++) {
int off = c * nSizes * 6 + s * 6;
double timeOop = allData[off + 0];
double algBwOop = allData[off + 1];
double busBwOop = allData[off + 2];
double timeIp = allData[off + 3];
double algBwIp = allData[off + 4];
double busBwIp = allData[off + 5];

// Format numbers with auto-precision like the original nccl-tests output
// getFloatStr: fits value into fixed width by adjusting decimal places
auto fmtFloat = [](double val, int w, char* buf) {
int power = 0;
for (uint64_t v = 1; val >= v; v *= 10) power++;
if (power < w-2) sprintf(buf, "%*.2f", w, val);
else if (power < w-1) sprintf(buf, "%*.1f", w, val);
else sprintf(buf, "%*.0f", w, val);
};
char tO[8], aO[7], bO[7], tI[8], aI[7], bI[7];
fmtFloat(timeOop, 7, tO); fmtFloat(algBwOop, 6, aO); fmtFloat(busBwOop, 6, bO);
fmtFloat(timeIp, 7, tI); fmtFloat(algBwIp, 6, aI); fmtFloat(busBwIp, 6, bI);

printf("%12zu %12zu %8s %6s %6i %7s %6s %6s N/A %7s %6s %6s N/A\n",
localSizes[s], localCounts[s], typeName, opName, -1,
tO, aO, bO, tI, aI, bI);
}
}

// Summary based on largest message size
printf("#\n# --- Summary (based on largest msg size: %zu bytes) ---\n", localSizes[largestIdx]);
printf("# %8s %11s %7s %11s %7s %s\n",
"GPU_Pair", "OOP(GB/s)", "vs_max", "IP(GB/s)", "vs_max", "Status");
for (int c = 0; c < numColors; c++) {
int off = c * nSizes * 6 + largestIdx * 6;
double busBwOop = allData[off + 2];
double busBwIp = allData[off + 5];
double rOop = (maxBusBwOop[largestIdx] > 0) ? busBwOop / maxBusBwOop[largestIdx] * 100.0 : 0.0;
double rIp = (maxBusBwIp[largestIdx] > 0) ? busBwIp / maxBusBwIp[largestIdx] * 100.0 : 0.0;
bool pairSlow = (rOop < 90.0 || rIp < 90.0);
printf("# %8d %11.2f %6.1f%% %11.2f %6.1f%% %s\n",
c, busBwOop, rOop, busBwIp, rIp,
pairSlow ? "<<< SLOW" : "OK");
}
printf("#\n");
printf("# ==============================================================\n");
printf("#\n");

free(maxBusBwOop); free(maxBusBwIp);
}
free(localSizes); free(localCounts); free(localData);
free(orderedData); free(allData);
}
}
}
// Free per-size tracking arrays
for (int t = 0; t < nThreads; t++) {
free(threads[t].args.sv_sizes);
free(threads[t].args.sv_counts);
free(threads[t].args.sv_data);
}
#endif

writeResultFooter(errors, bw, check_avg_bw, program_invocation_short_name);
if (memory_report) {
memInfo_t memInfos[3];
Expand Down
12 changes: 12 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ struct threadArgs {
int nGpus;
int* gpus;
int localRank;
#ifdef MPI_SUPPORT
MPI_Comm mpi_comm; // split sub-communicator; used by Barrier/Allreduce
#endif
void** sendbuffs;
size_t sendBytes;
size_t sendInplaceOffset;
Expand All @@ -154,6 +157,15 @@ struct threadArgs {
int* errors;
double* bw;
int* bw_count;
// Per-size full performance tracking for split verbose mode
// sv_data layout: [nSizes * 6] — for each size index:
// [0] timeOop [1] algBwOop [2] busBwOop
// [3] timeIp [4] algBwIp [5] busBwIp
size_t* sv_sizes; // message sizes (bytes)
size_t* sv_counts; // element counts
double* sv_data; // packed perf data per size
int sv_nSizes; // number of sizes recorded
int sv_maxSizes; // allocated capacity

int reportErrors;

Expand Down