Python SDK for Numaflow.
pynumaflow: a pure-Python implementation of the Numaflow SDK.
Coming soon: a Rust-based Python SDK with better performance.
Note: The following example uses the asyncio library to demonstrate how the ExecutorPool class can be used for parallel processing.
import asyncio

from pynumaflow import ExecutorPool


async def worker(num):
    # Simulate some work
    await asyncio.sleep(1)
    return num * num


async def main():
    # Create an ExecutorPool instance
    executor_pool = ExecutorPool()
    # Submit tasks to the executor pool
    tasks = [executor_pool.submit(worker, i) for i in range(10)]
    # Wait for all tasks to complete
    results = await asyncio.gather(*tasks)
    # Print the results
    print(results)

# Run main() and report the outcome
def run_ci_test():
    try:
        asyncio.run(main())
        print("CI test passed")
    except Exception as e:
        print(f"CI test failed: {e}")


if __name__ == "__main__":
    run_ci_test()

Note: The run_ci_test function encapsulates the CI test logic: it runs main() and prints the result, and a try-except block catches any exception raised during the test. It is a regular (non-async) function because asyncio.run() starts its own event loop and cannot be called from inside a running one. The if __name__ == "__main__": guard ensures that run_ci_test is executed only when the script is run directly, not when it is imported as a module.
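
The submit-then-gather pattern and the try-except wrapper described above can also be tried without any SDK installed. The following is a minimal sketch using only the standard library's asyncio; it does not use ExecutorPool, and the names square, square_all, and run_check are hypothetical helpers introduced here for illustration. It returns a boolean instead of printing, so that a caller can decide how to report the outcome.

```python
import asyncio
from typing import List


async def square(num: int) -> int:
    """Hypothetical worker: yield to the event loop once, then return num squared."""
    await asyncio.sleep(0)
    return num * num


async def square_all(count: int) -> List[int]:
    """Schedule one task per input and collect the results in input order."""
    # create_task() schedules each coroutine on the running event loop.
    tasks = [asyncio.create_task(square(i)) for i in range(count)]
    # gather() preserves the order of the awaitables it was given.
    return await asyncio.gather(*tasks)


def run_check(count: int = 10) -> bool:
    """Run the concurrent job and report whether it produced the expected squares."""
    try:
        # asyncio.run() must be called from synchronous code, not from a coroutine.
        results = asyncio.run(square_all(count))
    except Exception as exc:
        print(f"check failed: {exc}")
        return False
    return results == [i * i for i in range(count)]
```

In a CI job, one option is to map the boolean to the process exit status, for example with sys.exit(0 if run_check() else 1), so that a failure is visible to the runner and not only in the log output.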