Skip to content

Latest commit

 

History

History
51 lines (36 loc) · 1.49 KB

File metadata and controls

51 lines (36 loc) · 1.49 KB

numaflow-python

Python SDK for Numaflow.

pynumaflow

Pure Python SDK implementation for Numaflow - pynumaflow

pynumaflow-lite

Coming shortly (Rust based Python SDK) with better performance

Example Use Cases

AsyncIO Reduce Example

Note: This example uses the asyncio library to demonstrate how to use the ExecutorPool class for parallel processing.

import asyncio
from pynumaflow import ExecutorPool

async def worker(num):
    # Simulate some work
    await asyncio.sleep(1)
    return num * num

async def main():
    # Create an ExecutorPool instance
    executor_pool = ExecutorPool()

    # Submit tasks to the executor pool
    tasks = [executor_pool.submit(worker, i) for i in range(10)]

    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)

    # Print the results
    print(results)

# Run the main function
async def run_ci_test():
    try:
        asyncio.run(main())
        print("CI test passed")
    except Exception as e:
        print(f"CI test failed: {str(e)}")

if __name__ == "__main__":
    run_ci_test()

Note: I added a run_ci_test function to encapsulate the CI test logic. This function runs the main function and prints the result. I also added a try-except block to catch any exceptions that may occur during the test. The if __name__ == "__main__": block is used to ensure that the run_ci_test function is only executed when the script is run directly, not when it's imported as a module.