Skip to content

Commit ce77f5d

Browse files
author
Grok Compression
committed
shared memory: add interface
1 parent e450a90 commit ce77f5d

14 files changed

Lines changed: 3248 additions & 7 deletions

INSTALL.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,33 @@ The `CUDA_MODULE_LOADING=EAGER` environment variable is set automatically by the
221221
Tests cover: single/batch compress and decompress, lossy/lossless, bit depths 8/10/12/16,
222222
mono and RGB, cinema 2K profile, and CPU-only fallback (`-G -2`).
223223

224+
### Shared Memory Batch Tests
225+
226+
The SHM batch tests verify the shared-memory inter-process protocol used for batch
227+
compression and decompression. Each test spawns the codec binary as a child process and
228+
communicates via POSIX shared memory and semaphores.
229+
230+
To run SHM batch tests via `ctest`:
231+
232+
$ cd /PATH/TO/BUILD
233+
$ ctest -R grk_shm_batch -V
234+
235+
Individual tests:
236+
237+
$ ctest -R grk_shm_batch_compress -V
238+
$ ctest -R grk_shm_batch_decompress -V
239+
240+
Or run the binaries directly:
241+
242+
$ ./bin/grk_shm_batch_compress ./bin/grk_compress
243+
$ ./bin/grk_shm_batch_decompress ./bin/grk_decompress
244+
245+
Each test compresses/decompresses 4 frames (64x64, 3-channel, 12-bit) through the SHM
246+
protocol and validates pixel-perfect round-trip fidelity.
247+
248+
Note: these two tests share POSIX named semaphores and SHM segments, so they must not
249+
run in parallel. The CTest configuration uses `RESOURCE_LOCK` to enforce this automatically.
250+
224251
## macOS
225252

226253
macOS builds are configured similarly to *NIX builds.

src/lib/codec/CMakeLists.txt

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ include_directories(
2020
${CMAKE_CURRENT_SOURCE_DIR}/formats/fileio
2121
${CMAKE_CURRENT_SOURCE_DIR}/common
2222
${CMAKE_CURRENT_SOURCE_DIR}/apps
23+
${CMAKE_CURRENT_SOURCE_DIR}/shared
2324
)
2425

2526
set(GROK_CODEC_SRCS
@@ -68,8 +69,11 @@ if(GROK_HAVE_LIBJPEG)
6869
target_link_libraries(${GROK_CODEC_NAME} PRIVATE ${JPEG_LIBNAME})
6970
endif()
7071

71-
if(UNIX)
72-
target_link_libraries(${GROK_CODEC_NAME} PUBLIC ${CMAKE_THREAD_LIBS_INIT})
72+
find_package(Threads REQUIRED)
73+
target_link_libraries(${GROK_CODEC_NAME} PUBLIC Threads::Threads)
74+
# shared memory (shm_open, sem_open) used by Messenger.h
75+
if(CMAKE_SYSTEM_NAME STREQUAL "Linux")
76+
target_link_libraries(${GROK_CODEC_NAME} PRIVATE rt)
7377
endif()
7478
set_target_properties(${GROK_CODEC_NAME} PROPERTIES ${GROK_LIBRARY_PROPERTIES})
7579
target_compile_options(${GROK_CODEC_NAME} PRIVATE ${GROK_COMPILE_OPTIONS})
@@ -96,3 +100,6 @@ install(TARGETS ${INSTALL_LIBS}
96100
install(FILES grok_codec.h
97101
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${GROK_INSTALL_SUBDIR} COMPONENT Headers
98102
)
103+
install(FILES shared/Messenger.h
104+
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${GROK_INSTALL_SUBDIR} COMPONENT Headers
105+
)

src/lib/codec/apps/GrkCompress.cpp

Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ using namespace grk;
6262
#include "grk_string.h"
6363
#include "spdlog/sinks/basic_file_sink.h"
6464
#include "GrkCompress.h"
65+
#include "Messenger.h"
6566

6667
void exit_func()
6768
{
@@ -361,6 +362,13 @@ int GrkCompress::main(int argc, const char** argv, grk_image* in_image, grk_stre
361362
goto cleanup;
362363
}
363364

365+
// CPU shared memory batch compress
366+
if(initParams.parameters.shared_memory_interface)
367+
{
368+
success = shmBatchCompress(&initParams);
369+
goto cleanup;
370+
}
371+
364372
// MJ2 directory encode: all input images → single .mj2 file
365373
if(initParams.parameters.cod_format == GRK_FMT_MJ2 && initParams.inputFolder.set_imgdir)
366374
{
@@ -506,6 +514,195 @@ int GrkCompress::main(int argc, const char** argv, grk_image* in_image, grk_stre
506514
return success;
507515
}
508516

517+
/**
 * @brief Serve batch compression requests over the shared-memory messenger.
 *
 * The batch description is read from `initParams->inputFolder.imgdirpath` and
 * must have the form "GRK_MSGR_BATCH_IMAGE,width,stride,height,spp,depth".
 * A Messenger is then created, the buffer layout is announced to the client
 * with GRK_MSGR_BATCH_COMPRESS_INIT, and incoming messages are handled by the
 * processor callback until a GRK_MSGR_BATCH_SHUTDOWN message clears
 * `messenger_->running`.
 *
 * Handled messages:
 *  - GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED: copy the interleaved 16-bit frame into
 *    a planar image, compress it into a free compressed slot and reply with
 *    GRK_MSGR_BATCH_SUBMIT_COMPRESSED.
 *  - GRK_MSGR_BATCH_PROCESSSED_COMPRESSED: reclaim the compressed slot.
 *  - GRK_MSGR_BATCH_FLUSH: block until the requested number of frames has
 *    been handled.
 *  - GRK_MSGR_BATCH_SHUTDOWN: stop the serve loop.
 *
 * @param initParams parsed command-line state; supplies the batch description
 *                   and the compression parameters.
 * @return 0 after a normal shutdown, 1 if the batch description is malformed.
 */
int GrkCompress::shmBatchCompress(CompressInitParams* initParams)
{
  using namespace grk_plugin;

  setUpSignalHandler();

  // parse "GRK_MSGR_BATCH_IMAGE,width,stride,height,spp,depth" from the image directory path
  std::string batchSrc(initParams->inputFolder.imgdirpath);
  Msg batchMsg(batchSrc);
  auto tag = batchMsg.next();
  if(tag != GRK_MSGR_BATCH_IMAGE)
  {
    spdlog::error("shmBatchCompress: expected {} tag, got '{}'", GRK_MSGR_BATCH_IMAGE, tag);
    return 1;
  }
  const uint32_t imgWidth = batchMsg.nextUint();
  const uint32_t imgStride = batchMsg.nextUint(); // only forwarded to the client; rows are read tightly packed
  const uint32_t imgHeight = batchMsg.nextUint();
  const uint32_t imgSpp = batchMsg.nextUint();
  const uint32_t imgDepth = batchMsg.nextUint();

  // Reject geometry that would otherwise be silently truncated below: the
  // component count is narrowed to uint16_t, the precision to uint8_t, and
  // source samples are read as 16-bit values.
  constexpr uint32_t maxComponents = 0xFFFFu;
  constexpr uint32_t maxDepth = 16;
  if(imgWidth == 0 || imgHeight == 0 || imgSpp == 0 || imgSpp > maxComponents || imgDepth == 0 ||
     imgDepth > maxDepth)
  {
    spdlog::error("shmBatchCompress: invalid batch image description (width {}, height {}, "
                  "components {}, depth {})",
                  imgWidth, imgHeight, imgSpp, imgDepth);
    return 1;
  }

  size_t uncompressedFrameSize = Messenger::uncompressedFrameSize(imgWidth, imgHeight, imgSpp);
  // conservative upper bound for compressed frame size: 1.5x the raw sample bytes, at least 4 KiB
  size_t compressedFrameSize =
      static_cast<size_t>(imgWidth) * imgHeight * imgSpp * ((imgDepth + 7) / 8) * 3 / 2;
  if(compressedFrameSize < 4096)
    compressedFrameSize = 4096;
  const size_t numFrames = 8;

  // NOTE(review): ownership of the logger is presumably taken by setMessengerLogger — confirm.
  setMessengerLogger(new MessengerLogger("[SHM-CPU] "));

  auto* parameters = &initParams->parameters;

  // shared state for flush synchronization
  std::atomic<uint32_t> framesProcessed{0};
  std::atomic<uint32_t> flushTarget{0};
  std::atomic<bool> flushRequested{false};
  std::mutex flushMutex;
  std::condition_variable flushCv;

  // Record that one submitted frame has been fully handled, successfully or
  // not, and wake a pending flush once its target is reached. Failed frames
  // must be counted too: the flush handler waits on this counter, so skipping
  // a failed frame would block it forever.
  auto markFrameHandled = [&]() {
    const uint32_t count = ++framesProcessed;
    if(flushRequested.load() && count >= flushTarget.load())
    {
      std::lock_guard<std::mutex> lk(flushMutex);
      flushCv.notify_all();
    }
  };

  auto processor = [&](std::string message) {
    Msg msg(message);
    auto msgTag = msg.next();

    if(msgTag == GRK_MSGR_BATCH_SUBMIT_UNCOMPRESSED)
    {
      auto clientFrameId = msg.nextUint();
      auto uncompressedFrameId = msg.nextUint();
      uint8_t* srcPixels = messenger_->getUncompressedFrame(uncompressedFrameId);
      if(!srcPixels)
      {
        spdlog::error("shmBatchCompress: null uncompressed frame {}", uncompressedFrameId);
        markFrameHandled();
        return;
      }

      // create grk_image from raw pixel data
      auto cmptparms = std::make_unique<grk_image_comp[]>(imgSpp);
      for(uint32_t c = 0; c < imgSpp; ++c)
      {
        memset(&cmptparms[c], 0, sizeof(grk_image_comp));
        cmptparms[c].dx = 1;
        cmptparms[c].dy = 1;
        cmptparms[c].w = imgWidth;
        cmptparms[c].h = imgHeight;
        cmptparms[c].prec = static_cast<uint8_t>(imgDepth);
        cmptparms[c].sgnd = false;
      }
      GRK_COLOR_SPACE clrspc = (imgSpp >= 3) ? GRK_CLRSPC_SRGB : GRK_CLRSPC_GRAY;
      grk_image* image =
          grk_image_new(static_cast<uint16_t>(imgSpp), cmptparms.get(), clrspc, true);
      if(!image)
      {
        spdlog::error("shmBatchCompress: failed to create image for frame {}", clientFrameId);
        messenger_->reclaimUncompressed(uncompressedFrameId);
        messenger_->send(GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED, uncompressedFrameId);
        markFrameHandled();
        return;
      }

      image->x0 = 0;
      image->y0 = 0;
      image->x1 = imgWidth;
      image->y1 = imgHeight;

      // Copy pixel data from SHM (interleaved 16-bit) into planar int32_t
      // components. Indices are computed in size_t so that large frames do
      // not wrap 32-bit arithmetic.
      // NOTE(review): destination rows are addressed with a pitch of imgWidth;
      // if grk_image_new pads component rows this should use the component's
      // stride instead — confirm.
      const auto* src16 = reinterpret_cast<const uint16_t*>(srcPixels);
      for(uint32_t y = 0; y < imgHeight; ++y)
      {
        const size_t rowStart = static_cast<size_t>(y) * imgWidth;
        for(uint32_t x = 0; x < imgWidth; ++x)
        {
          const size_t pixel = rowStart + x;
          const size_t firstSample = pixel * imgSpp;
          for(uint32_t c = 0; c < imgSpp; ++c)
          {
            auto* data = static_cast<int32_t*>(image->comps[c].data);
            data[pixel] = static_cast<int32_t>(src16[firstSample + c]);
          }
        }
      }

      // done reading uncompressed buffer — release it back to client
      messenger_->send(GRK_MSGR_BATCH_PROCESSED_UNCOMPRESSED, uncompressedFrameId);

      // per-frame copy of the parameters; 255 means "choose MCT automatically"
      grk_cparameters localParams = *parameters;
      if(localParams.mct == 255)
        localParams.mct = (imgSpp >= 3) ? 1 : 0;

      // get a compressed buffer slot from the server queue
      BufferSrc compressedSlot;
      if(!messenger_->availableBuffers_.waitAndPop(compressedSlot))
      {
        spdlog::error("shmBatchCompress: no compressed buffer available");
        grk_object_unref(&image->obj);
        markFrameHandled();
        return;
      }

      // compress directly into the shared-memory slot
      grk_stream_params streamParams{};
      streamParams.buf = compressedSlot.framePtr_;
      streamParams.buf_len = compressedFrameSize;

      grk_object* codec = grk_compress_init(&streamParams, &localParams, image);
      uint64_t compressedLen = 0;
      if(codec)
      {
        compressedLen = grk_compress(codec, nullptr);
        grk_object_unref(codec);
      }
      grk_object_unref(&image->obj);

      if(!compressedLen)
      {
        // NOTE(review): the client is not told about the failed frame; only
        // the slot is returned to the pool — confirm the protocol expects this.
        spdlog::error("shmBatchCompress: compression failed for frame {}", clientFrameId);
        messenger_->availableBuffers_.push(compressedSlot);
        markFrameHandled();
        return;
      }

      messenger_->send(GRK_MSGR_BATCH_SUBMIT_COMPRESSED, clientFrameId, compressedSlot.frameId_,
                       compressedLen);

      markFrameHandled();
    }
    else if(msgTag == GRK_MSGR_BATCH_PROCESSSED_COMPRESSED)
    {
      auto compressedFrameId = msg.nextUint();
      messenger_->reclaimCompressed(compressedFrameId);
    }
    else if(msgTag == GRK_MSGR_BATCH_FLUSH)
    {
      // Publish the target before raising the flag so that a frame handler
      // which observes the flag also observes the matching target.
      auto enqueuedCount = msg.nextUint();
      flushTarget.store(enqueuedCount);
      flushRequested.store(true);
      {
        std::unique_lock<std::mutex> lk(flushMutex);
        flushCv.wait(lk, [&] { return framesProcessed.load() >= enqueuedCount; });
      }
    }
    else if(msgTag == GRK_MSGR_BATCH_SHUTDOWN)
    {
      messenger_->running = false;
    }
  };

  uint32_t num_threads = std::thread::hardware_concurrency();
  if(num_threads == 0)
    num_threads = 4; // hardware_concurrency() may return 0 when the count is unknown

  MessengerInit init(false, // server
                     grokToClientMessageBuf, grokSentSynch, clientReceiveReadySynch,
                     clientToGrokMessageBuf, clientSentSynch, grokReceiveReadySynch, processor,
                     num_threads, uncompressedFrameSize, compressedFrameSize, numFrames);

  // NOTE(review): the processor dereferences messenger_; this assumes no
  // message is dispatched before the assignment below completes — confirm.
  messenger_ = new Messenger(init);

  // tell client the buffer layout
  messenger_->send(GRK_MSGR_BATCH_COMPRESS_INIT, imgWidth, imgStride, imgHeight, imgSpp, imgDepth,
                   compressedFrameSize, numFrames);

  // wait until shutdown
  while(messenger_->running)
    std::this_thread::sleep_for(std::chrono::milliseconds(50));

  delete messenger_;
  messenger_ = nullptr;

  return 0;
}
705+
509706
int GrkCompress::pluginBatchCompress(CompressInitParams* initParams)
510707
{
511708
setUpSignalHandler();

src/lib/codec/apps/GrkCompress.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@
2020
#include "common.h"
2121
#include "IImageFormat.h"
2222

23+
namespace grk_plugin
24+
{
25+
struct Messenger;
26+
}
27+
2328
namespace grk
2429
{
2530
struct CompressInitParams
@@ -41,15 +46,18 @@ struct CompressInitParams
4146
class GrkCompress
4247
{
4348
public:
44-
GrkCompress(void) = default;
49+
GrkCompress(void) : messenger_(nullptr) {}
4550
~GrkCompress(void) = default;
4651
int main(int argc, const char* argv[], grk_image* in_image, grk_stream_params* out_buffer);
4752

4853
private:
54+
int shmBatchCompress(CompressInitParams* initParams);
4955
int pluginBatchCompress(CompressInitParams* initParams);
5056
GrkRC pluginMain(int argc, const char* argv[], CompressInitParams* initParams);
5157
GrkRC parseCommandLine(int argc, const char* argv[], CompressInitParams* initParams);
5258
int compress(const std::string& inputFile, CompressInitParams* initParams);
59+
60+
grk_plugin::Messenger* messenger_;
5361
};
5462

5563
} // namespace grk

0 commit comments

Comments
 (0)