diff --git a/docs/source/llm_examples/__init__.py b/docs/source/llm_examples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/docs/source/llm_examples/async_concurrency.py b/docs/source/llm_examples/async_concurrency.py new file mode 100644 index 00000000..b4717061 --- /dev/null +++ b/docs/source/llm_examples/async_concurrency.py @@ -0,0 +1,61 @@ +"""Fork/join async concurrency with templates. + +Demonstrates: +- Running multiple LLM template calls concurrently with ``asyncio.gather`` +- Using ``asyncio.to_thread`` to run synchronous template calls in parallel +""" + +import argparse +import asyncio +import functools +import os + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Async template +# --------------------------------------------------------------------------- + + +@Template.define +def analyze_average_age(ages: list[int]) -> int: + """Analyze the dataset of ages {ages} and return the average age of + participants. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main(provider: LiteLLMProvider): + analysis = functools.partial( + asyncio.to_thread, handler(provider)(analyze_average_age) + ) + results = await asyncio.gather( + analysis([25, 30, 35, 40]), + analysis([20, 28, 17, 30]), + analysis([22, 27, 31, 29]), + analysis([24, 26, 32, 38]), + analysis([21, 29, 33, 37]), + ) + for i, result in enumerate(results): + print(f"Group {i}: average age = {result}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Analyze average ages concurrently") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + asyncio.run(main(provider)) diff --git a/docs/source/llm_examples/batch_translate.py b/docs/source/llm_examples/batch_translate.py new file mode 100644 index 00000000..66b4999f --- /dev/null +++ b/docs/source/llm_examples/batch_translate.py @@ -0,0 +1,70 @@ +"""Batch translation with instruction injection. + +Demonstrates: +- ``@Template.define`` for a translation template with injected instructions +""" + +import argparse +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.handlers.llm.evaluation import RestrictedEvalProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Translation template +# --------------------------------------------------------------------------- + + +@Template.define +def translate(target_language: str, instructions: str = "") -> Template[[str], str]: + """ + Write a `Template` that translates a string of English text into {target_language} + If any instructions are provided, include them in the prompt: {instructions} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Batch translation with instruction injection" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--max-steps", + type=int, + default=5, + help="Maximum number of steps before giving up", + ) + parser.add_argument( + "--num-retries", + type=int, + default=5, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + handler(RestrictedEvalProvider()), + ): + translator = translate( + target_language="french", instructions="Use formal language." + ) + print(translator("hello, how are you? how is your day going?")) diff --git a/docs/source/llm_examples/chat_memory.py b/docs/source/llm_examples/chat_memory.py new file mode 100644 index 00000000..b926cdff --- /dev/null +++ b/docs/source/llm_examples/chat_memory.py @@ -0,0 +1,124 @@ +"""Chat agent with embedding-based memory. + +Demonstrates: +- A stateful chat agent that maintains conversation history +- Embedding-based retrieval of relevant past context +- Simple in-memory vector store with L2 distance +""" + +import argparse +import dataclasses +import os + +import litellm +import numpy as np + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Embedding helpers +# --------------------------------------------------------------------------- + + +def get_embedding(text: str) -> np.ndarray: + """Get an embedding vector for the given text using litellm.""" + response = litellm.embedding(model="text-embedding-ada-002", input=text) + return np.array(response.data[0]["embedding"], dtype=np.float32) + + +def find_closest( + index: list[tuple[str, np.ndarray]], phrase: str +) -> tuple[str, float] | None: + """Find the closest entry in the index to the given phrase.""" + if not index: + return None + phrase_embedding = get_embedding(phrase) + + def dist(a: np.ndarray, b: np.ndarray) -> float: + return float(((a - b) ** 2).sum()) + + return min( + ((msg, dist(embedding, phrase_embedding)) for msg, embedding in index), + key=lambda elt: elt[1], + ) + + +# --------------------------------------------------------------------------- +# Chat template +# --------------------------------------------------------------------------- + + +@Template.define +def respond_to_user( + user_message: str, relevant_context: str, prev_messages: str +) -> str: + """Given the user wrote: {user_message} + Continue the conversation. + The last few messages were: {prev_messages} + Older relevant context: {relevant_context}""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Chat agent +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class ChatAgent: + """A chat agent that compresses old messages into an embedding index.""" + + history: list[dict[str, str]] = dataclasses.field(default_factory=list) + index: list[tuple[str, np.ndarray]] = dataclasses.field(default_factory=list) + + def _compress(self): + """Move the oldest pair of messages into the embedding index.""" + oldest_pair, self.history = self.history[:2], self.history[2:] + text = "\n".join(m["content"] for m in oldest_pair) + self.index.append((text, get_embedding(text))) + + def _find_relevant(self, query: str) -> str: + result = find_closest(self.index, query) + return result[0] if result else "No relevant context." + + def chat(self, user_input: str): + relevant = self._find_relevant(user_input) + prev_messages = "\n".join( + f"{m['author']}: {m['content']}" for m in self.history + ) + response = respond_to_user(user_input, relevant, prev_messages) + self.history.append({"author": "user", "content": user_input}) + self.history.append({"author": "agent", "content": response}) + if len(self.history) > 6: + self._compress() + print(f"user: {user_input}") + print(f"agent: {response}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Chat agent with embedding-based memory" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + args = parser.parse_args() + + agent = ChatAgent() + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + agent.chat("Hello! How are you doing?") + agent.chat("Lovely! I'm having a great day.") + agent.chat("What is the capital of France?") + agent.chat("I didn't know that! That's amazing!") diff --git a/docs/source/llm_examples/chat_search.py b/docs/source/llm_examples/chat_search.py new file mode 100644 index 00000000..a2d7cecd --- /dev/null +++ b/docs/source/llm_examples/chat_search.py @@ -0,0 +1,116 @@ +import argparse +import dataclasses +import os +import urllib.parse + +import requests +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + + +@Tool.define +def search_web(query: str) -> str: + """Search Wikipedia for a topic and return a summary. The query can be a topic name or a natural language question.""" + search_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "list": "search", + "srsearch": query, + "srlimit": 1, + "format": "json", + } + ) + search_data = requests.get( + search_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + results = search_data.get("query", {}).get("search", []) + if not results: + raise ValueError(f"No results found for: {query}") + title = results[0]["title"] + + summary_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "titles": title, + "prop": "extracts", + "exintro": True, + "explaintext": True, + "format": "json", + } + ) + summary_data = requests.get( + summary_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + page = next(iter(summary_data["query"]["pages"].values())) + extract = page.get("extract", "No summary available.") + url = f"https://en.wikipedia.org/wiki/{urllib.parse.quote(title.replace(' ', '_'))}" + + return f"# {title}\n\n{extract}\n\nSource: {url}" + + +@dataclasses.dataclass +class ChatBot(Agent): + """Simple chat agent for testing history accumulation.""" + + bot_name: str = dataclasses.field(default="ChatBot") + + @Template.define + def send(self, user_input: str) -> str: + """ + You are a friendly and helpful AI assistant named {self.bot_name}. + If user input contains a question that you're not sure how to answer, + consider using the web search tool to find the answer and include it in your response. + + The user writes: + {user_input} + """ + raise NotHandled + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="LLM-guided research agent with web search" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--name", + type=str, + default="Chatty McChatface", + help="The name of the chatbot", + ) + parser.add_argument( + "--interactive", + action="store_true", + help="Run in interactive mode, allowing multiple back-and-forth messages", + ) + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + chatbot = ChatBot(bot_name=args.name) + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + if args.interactive: + while True: + print(chatbot.send(input("You: "))) + else: + print(chatbot.send("Hi! Can you tell me about the Statue of Liberty?")) + print(chatbot.send("Who designed it?")) + print(chatbot.send("What about the speed of light? How fast is it?")) diff --git a/docs/source/llm_examples/decode_callable.py b/docs/source/llm_examples/decode_callable.py new file mode 100644 index 00000000..195167bc --- /dev/null +++ b/docs/source/llm_examples/decode_callable.py @@ -0,0 +1,90 @@ +"""Decoding LLM responses into Python objects, including callables. + +Demonstrates: +- Primitive type decoding (``int``) from a template that returns a number +- Synthesizing a Python ``Callable`` from a template, executed via + ``UnsafeEvalProvider`` from ``effectful.handlers.llm.evaluation`` +- ``inspect.getsource`` on the synthesized function +""" + +import argparse +import inspect +import os +from collections.abc import Callable + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.handlers.llm.evaluation import UnsafeEvalProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def primes(first_digit: int) -> int: + """Give a prime number with {first_digit} as the first digit. Do not use any tools.""" + raise NotHandled + + +@Template.define +def count_char(char: str) -> Callable[[str], int]: + """Write a function which takes a string and counts the occurrances of '{char}'. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Decode LLM responses to Python objects (incl. callables)" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=5, + help="Number of retries for malformed LLM output", + ) + parser.add_argument( + "--first-digit", + type=int, + default=6, + help="First digit of the prime to request", + ) + parser.add_argument( + "--char", + type=str, + default="a", + help="Character whose occurrences the synthesized function will count", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + handler(UnsafeEvalProvider()), + ): + prime = primes(args.first_digit) + assert type(prime) is int + print(f"Prime starting with {args.first_digit}: {prime}") + + counter = count_char(args.char) + assert callable(counter) + print("\nGenerated function:") + print(inspect.getsource(counter)) + print(f'counter("banana") == {counter("banana")}') + print(f'counter("cherry") == {counter("cherry")}') diff --git a/docs/source/llm_examples/flight_booking.py b/docs/source/llm_examples/flight_booking.py new file mode 100644 index 00000000..d4de2e97 --- /dev/null +++ b/docs/source/llm_examples/flight_booking.py @@ -0,0 +1,259 @@ +"""Flight booking with multi-agent delegation. + +Demonstrates: +- Multi-agent delegation: a tool that internally calls a separate + ``@Template.define`` (agent-to-agent delegation) +- Programmatic validation of LLM output with retry +- Interactive human-in-the-loop flow +- ``Agent`` history for conversational seat selection +""" + +import argparse +import dataclasses +import datetime +import enum +import os +from typing import Literal + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output types +# --------------------------------------------------------------------------- + + +class Airport(enum.StrEnum): + SFO = "SFO" + ANC = "ANC" + FAI = "FAI" + JNU = "JNU" + NYC = "NYC" + LAX = "LAX" + CHI = "CHI" + MIA = "MIA" + BOS = "BOS" + SEA = "SEA" + DFW = "DFW" + DEN = "DEN" + ATL = "ATL" + HOU = "HOU" + + +@dataclasses.dataclass(frozen=True) +class FlightDetails: + flight_number: str + price: int + origin: Airport # three-letter airport code + destination: Airport # three-letter airport code + date: datetime.date # YYYY-MM-DD + + +@dataclasses.dataclass(frozen=True) +class SeatPreference: + row: int # 1-30 + seat: Literal["A", "B", "C", "D", "E", "F"] + + +# --------------------------------------------------------------------------- +# Sample data (in reality, downloaded from a booking site) +# --------------------------------------------------------------------------- + +FLIGHTS_PAGE = """\ +1. Flight SFO-AK123 - $350 - San Francisco (SFO) to Anchorage (ANC) - 2025-01-10 +2. Flight SFO-AK456 - $370 - San Francisco (SFO) to Fairbanks (FAI) - 2025-01-10 +3. Flight SFO-AK789 - $400 - San Francisco (SFO) to Juneau (JNU) - 2025-01-20 +4. Flight NYC-LA101 - $250 - San Francisco (SFO) to Anchorage (ANC) - 2025-01-10 +5. Flight CHI-MIA202 - $200 - Chicago (ORD) to Miami (MIA) - 2025-01-12 +6. Flight BOS-SEA303 - $120 - Boston (BOS) to Anchorage (ANC) - 2025-01-12 +7. Flight DFW-DEN404 - $150 - Dallas (DFW) to Denver (DEN) - 2025-01-10 +8. Flight ATL-HOU505 - $180 - Atlanta (ATL) to Houston (IAH) - 2025-01-10 +""" + +# --------------------------------------------------------------------------- +# Extraction template (inner "agent") +# --------------------------------------------------------------------------- + + +@Template.define +def extract_flights(web_page_text: str) -> list[FlightDetails]: + """Extract all flight details from the following text. + + {web_page_text} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Tool that delegates to the extraction template +# --------------------------------------------------------------------------- + +# The tool is defined at module scope so that FlightFinder's template +# captures it via lexical scope (same pattern as search_web in other examples). + + +@Tool.define +def get_available_flights() -> list[FlightDetails]: + """Retrieve all available flights from the booking page.""" + return extract_flights(FLIGHTS_PAGE) + + +# --------------------------------------------------------------------------- +# Flight search agent +# --------------------------------------------------------------------------- + + +class FlightFinder(Agent): + """Agent that finds flights matching user criteria.""" + + @Template.define + def find_flight( + self, origin: Airport, destination: Airport, date: datetime.date + ) -> FlightDetails: + """Find the cheapest flight from {origin} to {destination} on {date}. + + Use the get_available_flights tool to retrieve all flights, then + select the cheapest one that matches the origin, destination, + and date exactly. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Seat selection agent +# --------------------------------------------------------------------------- + + +class SeatSelector(Agent): + """Agent that extracts seat preferences from natural language.""" + + @Template.define + def select_seat(self, user_input: str) -> SeatPreference: + """Extract the user's seat preference from their message. + + {user_input} + + Seats A and F are window seats. Seats C and D are aisle seats. + Row 1 is the front row with extra legroom. + Rows 14 and 20 also have extra legroom. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Validation (plain Python, no LLM needed) +# --------------------------------------------------------------------------- + + +def validate_flight( + flight: FlightDetails, origin: Airport, destination: Airport, date: datetime.date +) -> list[str]: + """Check that the selected flight matches the requested criteria.""" + errors = [] + if flight.origin != origin: + errors.append(f"origin should be {origin}, got {flight.origin}") + if flight.destination != destination: + errors.append(f"destination should be {destination}, got {flight.destination}") + if flight.date != date: + errors.append(f"date should be {date}, got {flight.date}") + return errors + + +# --------------------------------------------------------------------------- +# Booking flow +# --------------------------------------------------------------------------- + + +def book_flight( + origin: Airport, + destination: Airport, + date: datetime.date, + interactive: bool = False, + max_retries: int = 3, +) -> None: + """End-to-end flight booking with search, validation, and seat selection.""" + searcher = FlightFinder() + + # --- Search with validation retry --- + flight = None + for attempt in range(max_retries): + candidate = searcher.find_flight(origin, destination, date) + errors = validate_flight(candidate, origin, destination, date) + if errors: + print(f" [attempt {attempt}] Rejected: {'; '.join(errors)}") + continue + flight = candidate + break + + if flight is None: + print("Could not find a valid flight.") + return + + print( + f" Found: {flight.flight_number} ${flight.price} " + f"({flight.origin}->{flight.destination} on {flight.date})" + ) + + # --- User approval (interactive only) --- + if interactive: + if input(" Book this flight? (yes/no): ").strip().lower() != "yes": + print(" Cancelled.") + return + + # --- Seat selection --- + selector = SeatSelector() + seat_requests = ( + [input(" Seat preference: ")] + if interactive + else ["I'd like a window seat with extra legroom please"] + ) + for request in seat_requests: + seat = selector.select_seat(request) + print(f" Seat: row {seat.row}, seat {seat.seat}") + + print(f" Booked {flight.flight_number}, seat {seat.row}{seat.seat}!") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Flight booking with multi-agent delegation" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--interactive", + action="store_true", + help="Run in interactive mode with user prompts", + ) + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + book_flight( + origin=Airport.SFO, + destination=Airport.ANC, + date=datetime.date(2025, 1, 10), + interactive=args.interactive, + ) diff --git a/docs/source/llm_examples/guardrails.py b/docs/source/llm_examples/guardrails.py new file mode 100644 index 00000000..6ac80057 --- /dev/null +++ b/docs/source/llm_examples/guardrails.py @@ -0,0 +1,78 @@ +"""Travel advisor with input guardrails. + +Demonstrates: +- Using one template to validate/guard input before passing it to another +- Simple control-flow gating based on LLM classification +""" + +import argparse +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def travel_query(user_query: str) -> str: + """ + Produce a concise (<100 word) answer to: {user_query} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Guarded agent +# --------------------------------------------------------------------------- + + +def answer_travel_query(user_query: str) -> str: + """Only answer travel-related queries; reject everything else.""" + + @Template.define + def is_safe_query(user_query: str) -> bool: + """ + Determine whether the user's query is purely related to travel advice: {user_query} + """ + raise NotHandled + + if is_safe_query(user_query): + return travel_query(user_query) + else: + return f"Rejected: '{user_query}' is not related to travel advice." + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Analyze average ages concurrently") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + print(answer_travel_query("What are great places to check out in NYC?")) + print(answer_travel_query("Should I buy apple stocks?")) diff --git a/docs/source/llm_examples/hanoi_solver_iterative.py b/docs/source/llm_examples/hanoi_solver_iterative.py new file mode 100644 index 00000000..0a5ecdba --- /dev/null +++ b/docs/source/llm_examples/hanoi_solver_iterative.py @@ -0,0 +1,207 @@ +"""LLM-guided Towers of Hanoi solver with tool-based validation. + +Adapted from https://github.com/BasisResearch/effectful/pull/404 + +Demonstrates: +- A static Pydantic ``Step`` model for structured output +- ``@Tool.define`` inside a closure to expose game-state validation as a tool +- ``RetryLLMHandler`` to retry on malformed LLM output +- Templates defined inside a function that auto-capture closure-scoped tools +""" + +import argparse +import itertools +import os +from dataclasses import dataclass, field + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Step model +# --------------------------------------------------------------------------- + + +@dataclass +class Step: + """A single move: take the top disk from tower ``start`` and place it on + tower ``end``. Tower indices are zero-based.""" + + start: int + end: int + explanation: str = field(default="") # optional reasoning from the LLM + + +# --------------------------------------------------------------------------- +# Game state +# --------------------------------------------------------------------------- + + +@dataclass +class GameState: + """State of a Towers of Hanoi game. + + Higher numbers represent larger disks, so ``(2, 1, 0)`` is a valid + tower (largest on bottom). The goal is to move all disks from the + leftmost tower (index 0) to the rightmost tower (index -1). + + This is a plain ``dataclass`` (not a Pydantic model) so the type checker + can see its methods. + """ + + size: int + towers: tuple[tuple[int, ...], ...] = field(default=()) + + def __post_init__(self): + if self.size > 0 and not self.towers: + self.towers = tuple( + tuple(reversed(range(self.size))) if i == 0 else () + for i in range(self.size) + ) + + def apply(self, step: Step) -> "GameState": + """Apply a move, returning the new state. Raises ``ValueError`` if + the move is invalid.""" + start, end = step.start, step.end + if not (0 <= start < len(self.towers) and 0 <= end < len(self.towers)): + raise ValueError(f"tower index out of range: ({start}, {end})") + if len(self.towers[start]) == 0: + raise ValueError(f"tower {start} is empty") + if len(self.towers[end]) > 0 and self.towers[start][-1] > self.towers[end][-1]: + raise ValueError( + f"cannot place disk {self.towers[start][-1]} on top of " + f"disk {self.towers[end][-1]}" + ) + new_towers = [list(t) for t in self.towers] + disk = new_towers[start].pop() + new_towers[end].append(disk) + return GameState(self.size, tuple(tuple(t) for t in new_towers)) + + def is_done(self) -> bool: + return all(len(t) == 0 for t in self.towers[:-1]) and all( + self.towers[-1][i] > self.towers[-1][i + 1] + for i in range(len(self.towers[-1]) - 1) + ) + + def valid_steps(self) -> list[Step]: + steps = [] + for i, ti in enumerate(self.towers): + for j, tj in enumerate(self.towers): + if i == j or len(ti) == 0: + continue + if len(tj) == 0 or ti[-1] < tj[-1]: + steps.append(Step(i, j)) + return steps + + def __str__(self) -> str: + return " | ".join(str(list(t)) for t in self.towers) + + +# --------------------------------------------------------------------------- +# LLM move predictor +# --------------------------------------------------------------------------- + + +def predict_next_step(state: GameState) -> Step: + """Ask the LLM to predict the next move. + + A ``get_valid_moves`` tool is defined in the closure so the template + can query which moves are legal for the current game state. A + ``validate_move`` tool checks whether a proposed move is legal and + raises ``ValueError`` if not — when wrapped by ``RetryLLMHandler``, + this error is fed back to the LLM so it can correct itself. + """ + valid = state.valid_steps() + + @Tool.define + def get_valid_moves() -> list[Step]: + """Return the list of valid moves for the current game state.""" + return valid + + @Tool.define + def validate_move(proposed: Step) -> bool: + """Check whether moving from tower ``start`` to tower ``end`` is legal.""" + return proposed in state.valid_steps() + + @Template.define + def predict(game_state: GameState) -> Step: + """Given the state of the game of Towers of Hanoi: + + {game_state} + + Predict the next step to complete the game (move all disks to the + rightmost tower). You MUST call get_valid_moves first to see which + moves are legal, then pick the best one. Give a brief reasoning. + """ + raise NotHandled + + return predict(state) + + +# --------------------------------------------------------------------------- +# Solver loop +# --------------------------------------------------------------------------- + + +def solve_hanoi(state: GameState, max_steps: int = 30): + """Solve Towers of Hanoi by repeatedly asking the LLM for the next move.""" + for i in itertools.count(): + print(f"step {i}: {state}") + if state.is_done(): + print("Solved!") + return + if i >= max_steps: + print("Gave up after max steps.") + return + + step: Step = predict_next_step(state) + try: + state = state.apply(step) + print(f" move: {step.start} -> {step.end}") + except ValueError as e: + print(f" attempt {i}: invalid move {step}: {e}") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="LLM-guided Towers of Hanoi solver") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--game-size", + type=int, + default=3, + help="Number of disks in the Towers of Hanoi game", + ) + parser.add_argument( + "--max-steps", + type=int, + default=30, + help="Maximum number of steps before giving up", + ) + parser.add_argument( + "--num-retries", + type=int, + default=5, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + solve_hanoi(GameState(size=args.game_size), max_steps=args.max_steps) diff --git a/docs/source/llm_examples/hanoi_solver_recursive.py b/docs/source/llm_examples/hanoi_solver_recursive.py new file mode 100644 index 00000000..b8438772 --- /dev/null +++ b/docs/source/llm_examples/hanoi_solver_recursive.py @@ -0,0 +1,193 @@ +"""Recursive LLM-based Towers of Hanoi solver. + +Adapted from https://github.com/BasisResearch/effectful/pull/404 + +Demonstrates: +- ``IsRecursive`` annotation to let a template call itself as a tool +- Recursive problem decomposition via LLM tool calls +- Post-hoc validation of the LLM-generated move sequence + +The classic recursive algorithm for Tower of Hanoi is: + + hanoi(n, source, target, auxiliary): + if n == 1: move disk from source to target + else: + hanoi(n-1, source, auxiliary, target) # move n-1 disks out of the way + move largest disk from source to target # move the bottom disk + hanoi(n-1, auxiliary, target, source) # move n-1 disks to target + +This solver defines a recursive ``Template`` that can call itself as a tool. +The LLM decomposes the n-disk problem into three sub-steps, making recursive +tool calls for the (n-1)-disk sub-problems, and returns the concatenated +list of moves. + +See: https://en.wikipedia.org/wiki/Tower_of_Hanoi +""" + +import argparse +import os +import typing +from dataclasses import dataclass, field + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.handlers.llm.template import IsRecursive +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Step model +# --------------------------------------------------------------------------- + + +@dataclass +class Step: + """A single move: take the top disk from tower ``start`` and place it on + tower ``end``. Tower indices are zero-based.""" + + start: int + end: int + + +# --------------------------------------------------------------------------- +# Game state (for validation only) +# --------------------------------------------------------------------------- + + +@dataclass +class GameState: + """State of a Towers of Hanoi game. + + Higher numbers represent larger disks, so ``(2, 1, 0)`` is a valid + tower (largest on bottom). The goal is to move all disks from the + leftmost tower (index 0) to the rightmost tower (index -1). + """ + + size: int + towers: tuple[tuple[int, ...], ...] = field(default=()) + + def __post_init__(self): + if self.size > 0 and not self.towers: + self.towers = tuple( + tuple(reversed(range(self.size))) if i == 0 else () + for i in range(self.size) + ) + + def apply(self, step: Step) -> "GameState": + """Apply a move, returning the new state. Raises ``ValueError`` if + the move is invalid.""" + start, end = step.start, step.end + if not (0 <= start < len(self.towers) and 0 <= end < len(self.towers)): + raise ValueError(f"tower index out of range: ({start}, {end})") + if len(self.towers[start]) == 0: + raise ValueError(f"tower {start} is empty") + if len(self.towers[end]) > 0 and self.towers[start][-1] > self.towers[end][-1]: + raise ValueError( + f"cannot place disk {self.towers[start][-1]} on top of " + f"disk {self.towers[end][-1]}" + ) + new_towers = [list(t) for t in self.towers] + disk = new_towers[start].pop() + new_towers[end].append(disk) + return GameState(self.size, tuple(tuple(t) for t in new_towers)) + + def is_done(self) -> bool: + return all(len(t) == 0 for t in self.towers[:-1]) and all( + self.towers[-1][i] > self.towers[-1][i + 1] + for i in range(len(self.towers[-1]) - 1) + ) + + def __str__(self) -> str: + return " | ".join(str(list(t)) for t in self.towers) + + +# --------------------------------------------------------------------------- +# Recursive LLM solver +# --------------------------------------------------------------------------- + + +@Template.define +def solve( + n_disks: int, source: int, target: int, auxiliary: int +) -> typing.Annotated[list[Step], IsRecursive]: + """Solve Tower of Hanoi: move {n_disks} disks from tower {source} to + tower {target}, using tower {auxiliary} as temporary storage. + + Recursive strategy: + - Base case (n_disks == 1): return [Step(start=source, end=target)] + - Recursive case (n_disks > 1): + 1. Call solve(n_disks - 1, source, auxiliary, target) to move the + top n_disks-1 disks out of the way onto the auxiliary tower. + 2. Move the largest disk: Step(start=source, end=target). + 3. Call solve(n_disks - 1, auxiliary, target, source) to move the + n_disks-1 disks from auxiliary to the target tower. + 4. Return the concatenated list of all steps from (1), (2), and (3). + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + + +def validate_solution(size: int, steps: list[Step]) -> bool: + """Apply all steps to the initial state and check that the puzzle is solved.""" + state = GameState(size=size) + print(f" initial: {state}") + for i, step in enumerate(steps): + try: + state = state.apply(step) + print(f" step {i}: move {step.start} -> {step.end} => {state}") + except ValueError as e: + print(f" step {i}: INVALID move {step.start} -> {step.end}: {e}") + return False + if state.is_done(): + print(f" Solved in {len(steps)} moves!") + return True + else: + print(f" Not solved after {len(steps)} moves. Final state: {state}") + return False + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Recursive LLM-based Towers of Hanoi solver" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--game-size", + type=int, + default=3, + help="Number of disks in the Towers of Hanoi game", + ) + parser.add_argument( + "--num-retries", + type=int, + default=5, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + n = args.game_size + print(f"Solving Tower of Hanoi with {n} disks...") + steps = solve(n_disks=n, source=0, target=n - 1, auxiliary=1) + print(f"\nLLM returned {len(steps)} steps. Validating...\n") + validate_solution(n, steps) diff --git a/docs/source/llm_examples/higher_order_function.py b/docs/source/llm_examples/higher_order_function.py new file mode 100644 index 00000000..da241095 --- /dev/null +++ b/docs/source/llm_examples/higher_order_function.py @@ -0,0 +1,105 @@ +"""Generating higher-order functions that call other templates. + +Demonstrates: +- A template returning a ``Callable``, evaluated via ``UnsafeEvalProvider`` +- The synthesized function calling sub-templates (``write_chapter``, + ``judge_chapter``) at runtime +- ``RetryLLMHandler`` to recover from transient validation/runtime errors +- ``inspect.getsource`` on the generated function +""" + +import argparse +import inspect +import os +from collections.abc import Callable +from typing import Literal + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.handlers.llm.evaluation import UnsafeEvalProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Sub-templates the generated function may call +# --------------------------------------------------------------------------- + + +@Template.define +def write_chapter(chapter_number: int, chapter_name: str) -> str: + """Write a short story about {chapter_number}. Do not use any tools.""" + raise NotHandled + + +@Template.define +def judge_chapter(story_so_far: str, chapter_number: int) -> bool: + """Decide if the new chapter is coherent with the story so far. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Orchestrator template returning a callable +# --------------------------------------------------------------------------- + + +@Template.define +def write_multi_chapter_story(style: Literal["moral", "funny"]) -> Callable[[str], str]: + """Generate a function that writes a story in style: {style} about the given topic. + + If you raise an exception, handle it yourself. + The program can use helper functions defined elsewhere (DO NOT REDEFINE THEM): + - write_chapter(chapter_number: int, chapter_name: str) -> str + - judge_chapter(story_so_far: str, chapter_number: int) -> bool + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Generate a higher-order function that calls sub-templates" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--topic", type=str, default="a curious cat", help="Story topic" + ) + parser.add_argument( + "--style", + type=str, + choices=["moral", "funny"], + default="moral", + help="Story style", + ) + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + print("Sub-templates available to write_multi_chapter_story:") + print(list(write_multi_chapter_story.tools.keys())) + + with ( + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + handler(provider), + handler(UnsafeEvalProvider()), + ): + print(f"\n=== Generating story function (style={args.style}) ===") + story_fn = write_multi_chapter_story(args.style) + print(inspect.getsource(story_fn)) + print(f"\n=== Running generated function on {args.topic!r} ===") + print(story_fn(args.topic)) diff --git a/docs/source/llm_examples/hitl.py b/docs/source/llm_examples/hitl.py new file mode 100644 index 00000000..540fc27c --- /dev/null +++ b/docs/source/llm_examples/hitl.py @@ -0,0 +1,177 @@ +"""Human-in-the-loop task planner. + +Demonstrates: +- An ``Agent`` that proposes a plan of action steps +- Human approval/rejection of each step before execution +- Feedback from rejection is fed back to the agent via history +- ``@Tool.define`` for executing approved actions +- Non-interactive mode for testing (auto-approves all steps) +""" + +import argparse +import dataclasses +import enum +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output +# --------------------------------------------------------------------------- + + +class ActionType(enum.StrEnum): + send_email = "send_email" + create_file = "create_file" + schedule_meeting = "schedule_meeting" + done = "done" + + +@dataclasses.dataclass(frozen=True) +class ProposedAction: + action: ActionType + description: str + details: str + + +# --------------------------------------------------------------------------- +# Simulated action execution +# --------------------------------------------------------------------------- + + +execution_log: list[str] = [] + + +@Tool.define +def execute_action(action: ActionType, details: str) -> str: + """Execute an approved action. Returns a confirmation message.""" + msg = f"[executed] {action}: {details}" + execution_log.append(msg) + return msg + + +# --------------------------------------------------------------------------- +# Planner agent +# --------------------------------------------------------------------------- + + +class Planner(Agent): + """Agent that proposes actions one at a time for human approval.""" + + @Template.define + def propose_next(self, task: str, feedback: str) -> ProposedAction: + """You are a task planner helping the user accomplish a goal. + + Task: {task} + + Feedback from the last step: {feedback} + + Review the conversation history for previously completed actions. + Propose the next action to take. If the task is complete, + set action to "done". + + If a previous proposal was rejected, propose something different + that addresses the feedback. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Human-in-the-loop execution +# --------------------------------------------------------------------------- + + +def run_with_approval( + task: str, interactive: bool = False, max_steps: int = 5 +) -> list[str]: + """Run a task planner with human approval for each step.""" + planner = Planner() + feedback = "No actions taken yet. Start planning." + + for step in range(max_steps): + proposal = planner.propose_next(task, feedback) + + if proposal.action == ActionType.done: + print(f" [step {step + 1}] Done: {proposal.description}") + break + + print( + f" [step {step + 1}] Proposed: {proposal.action} - {proposal.description}" + ) + print(f" Details: {proposal.details}") + + if interactive: + answer = input(" Approve? (yes/no + reason): ").strip() + approved = answer.lower().startswith("y") + else: + answer = "yes" + approved = True + + if approved: + result = execute_action(proposal.action, proposal.details) + print(f" {result}") + feedback = f"Approved and executed: {result}" + else: + print(f" [rejected] {answer}") + feedback = f"Rejected: {answer}" + + return list(execution_log) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Human-in-the-loop task planner") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--interactive", + action="store_true", + help="Run in interactive mode with human approval prompts", + ) + parser.add_argument( + "--max-steps", + type=int, + default=5, + help="Maximum number of action steps", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + task = ( + "Organize a team lunch for next Friday. " + "Send an email to the team, create a shared document for " + "restaurant suggestions, and schedule a meeting to finalize plans." + ) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + print(f"Task: {task}\n") + log = run_with_approval( + task, + interactive=args.interactive, + max_steps=args.max_steps, + ) + print(f"\nExecution log ({len(log)} actions):") + for entry in log: + print(f" {entry}") diff --git a/docs/source/llm_examples/image_input.py b/docs/source/llm_examples/image_input.py new file mode 100644 index 00000000..14375b29 --- /dev/null +++ b/docs/source/llm_examples/image_input.py @@ -0,0 +1,63 @@ +"""Passing PIL images directly to a template. + +Demonstrates: +- Templates accepting ``PIL.Image.Image`` arguments +- Inline base64 image data so the script is self-contained +""" + +import argparse +import base64 +import io +import os + +from PIL import Image + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Inline image (32x32 yellow smiley face) +# --------------------------------------------------------------------------- + +IMAGE_BASE64 = ( + "iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAAhElEQVR4nO2W4QqA" + "MAiEVXr/VzYWDGoMdk7Cgrt/sUs/DqZTd3EplFU2JwATYAJMoOlAB4bq89s95+Mg" + "+gyAchsKAYplBBBA43hFhfxnUixDjdEUUL8hpr7R0KLdt9qElzcyiu8As+Kr8zQA" + "mgLavAl+kIzFZyCRxtsAmWb/voZvqRzgBE1sIDuVFX4eAAAAAElFTkSuQmCC" +) + + +# --------------------------------------------------------------------------- +# Template +# --------------------------------------------------------------------------- + + +@Template.define +def describe_image(image: Image.Image) -> str: + """Return a short description of the following image. + {image} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Pass a PIL image to a template") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use (must support image inputs)", + ) + args = parser.parse_args() + + image = Image.open(io.BytesIO(base64.b64decode(IMAGE_BASE64))) + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + print(describe_image(image)) diff --git a/docs/source/llm_examples/image_tool.py b/docs/source/llm_examples/image_tool.py new file mode 100644 index 00000000..81305849 --- /dev/null +++ b/docs/source/llm_examples/image_tool.py @@ -0,0 +1,90 @@ +import argparse +import os + +from PIL import Image + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import ( + LiteLLMProvider, + RetryLLMHandler, +) +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + + +class ImageTools(Agent): + """You are an image processing agent.""" + + _image_to_handle: dict[int, int] + _handle_to_image: dict[int, Image.Image] + + def __init__(self): + self._image_to_handle = {} + self._handle_to_image = {} + + def _encode(self, image: Image.Image) -> int: + image_id = id(image) + handle = self._image_to_handle.get(image_id, None) + if handle is not None: + return handle + + handle = len(self._image_to_handle) + self._image_to_handle[image_id] = handle + + assert handle not in self._handle_to_image + self._handle_to_image[handle] = image + return handle + + def _decode(self, image_handle: int) -> Image.Image: + return self._handle_to_image[image_handle] + + @Tool.define + def rotate(self, image: int, angle: float) -> int: + """Returns a rotated copy of this image. The copy is rotated by `angle` + degrees counterclockwise around the image center. + + """ + return self._encode(self._decode(image).rotate(angle)) + + @Tool.define + def concat_horiz(self, i1_h: int, i2_h: int) -> int: + """Concatenates two images horizontally. The larger image will be + cropped to the height of the smaller image. + + """ + i1 = self._decode(i1_h) + i2 = self._decode(i2_h) + i3 = Image.new("RGB", (i1.width + i2.width, min(i1.height, i2.height))) + i3.paste(i1, (0, 0)) + i3.paste(i2, (i1.width, 0)) + return self._encode(i3) + + @Template.define + def _rotate_and_concat(self, i: int) -> int: + """Create an image consisting of four copies of the image {i} + concatenated horizontally. Each copy should be rotated 90 degrees from + the previous. + + """ + raise NotHandled + + def rotate_and_concat(self, i: Image.Image) -> Image.Image: + return self._decode(self._rotate_and_concat(self._encode(i))) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use (must support image inputs)", + ) + args = parser.parse_args() + + image_agent = ImageTools() + img = Image.open("../_static/img/chirho_logo_wide.png") + + provider = LiteLLMProvider(model=args.model) + with handler(provider), handler(RetryLLMHandler()): + image_agent.rotate_and_concat(img).show() diff --git a/docs/source/llm_examples/majority_vote.py b/docs/source/llm_examples/majority_vote.py new file mode 100644 index 00000000..25696ddc --- /dev/null +++ b/docs/source/llm_examples/majority_vote.py @@ -0,0 +1,81 @@ +"""Majority voting ensemble. + +Demonstrates: +- Running the same template multiple times and taking a majority vote +- ``collections.Counter`` for tallying responses +""" + +import argparse +import collections +import collections.abc +import enum +import os + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Template +# --------------------------------------------------------------------------- + + +class Answer(enum.StrEnum): + yes = "yes" + no = "no" + maybe = "maybe" + + +@Template.define +def yes_or_no(question: str) -> Answer: + """ + Answer the following yes/no/maybe question: {question} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Majority vote +# --------------------------------------------------------------------------- + + +def majority_vote[Q]( + oracle: collections.abc.Callable[[Q], Answer], query: Q, voters: int = 3 +) -> tuple[Answer, int]: + """Call ``oracle(query)`` multiple times and return the most common answer.""" + counter = collections.Counter(oracle(query) for _ in range(voters)) + return counter.most_common(1)[0] + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Majority voting ensemble for yes/no questions" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-voters", type=int, default=3, help="Number of voters for majority vote" + ) + parser.add_argument( + "--question", + type=str, + default="Is Paris the capital of France?", + help="Yes/no question to ask", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + answer, count = majority_vote(yes_or_no, args.question, voters=args.num_voters) + print( + f"Question: {args.question}\nAnswer: {answer} (voted {count}/{args.num_voters})" + ) diff --git a/docs/source/llm_examples/map_reduce.py b/docs/source/llm_examples/map_reduce.py new file mode 100644 index 00000000..70a79c59 --- /dev/null +++ b/docs/source/llm_examples/map_reduce.py @@ -0,0 +1,160 @@ +"""Map-reduce resume evaluation. + +Demonstrates: +- Fan-out: evaluating multiple items independently with the same template +- Reduce: aggregating individual results into a summary +- ``asyncio.gather`` with ``asyncio.to_thread`` for parallel LLM calls +- Structured output with dataclasses +""" + +import argparse +import asyncio +import collections.abc +import dataclasses +import functools +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass(frozen=True) +class Evaluation: + name: str + qualified: bool + strengths: str + weaknesses: str + score: int # 1-10 + + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def evaluate_resume(resume: str, job_description: str) -> Evaluation: + """You are a hiring manager. Evaluate this resume against the job + description and produce a structured evaluation. + + Job description: {job_description} + + Resume: + {resume} + + Score from 1 (poor fit) to 10 (perfect fit). + """ + raise NotHandled + + +@Template.define +def summarize_evaluations( + job_description: str, + evaluations: collections.abc.Sequence[Evaluation], +) -> str: + """You are a hiring manager summarizing candidate evaluations. + + Job description: {job_description} + + Individual evaluations: + {evaluations} + + Provide a brief summary: rank the candidates from best to worst, + highlight the top candidate, and note any concerns. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Sample data +# --------------------------------------------------------------------------- + +JOB_DESCRIPTION = ( + "Senior Python Developer: 5+ years Python experience, " + "familiarity with web frameworks (Django/Flask), " + "database design, and cloud deployment (AWS/GCP)." +) + +RESUMES = [ + "Alice Chen - 7 years Python, Django expert, AWS certified, " + "led team of 5, built microservices architecture at FinTech startup.", + "Bob Smith - 3 years Python, 2 years JavaScript, some Flask experience, " + "junior developer at small agency, strong communication skills.", + "Carol Davis - 10 years software engineering, 6 years Python, " + "GCP specialist, PostgreSQL expert, open-source contributor, " + "previously senior engineer at Google.", + "Dave Wilson - 4 years Python, self-taught, built several side projects, " + "no professional experience with web frameworks or cloud platforms.", +] + +# --------------------------------------------------------------------------- +# Map-reduce pipeline +# --------------------------------------------------------------------------- + + +async def map_reduce_evaluate( + provider: LiteLLMProvider, + resumes: list[str], + job_description: str, +) -> str: + """Evaluate resumes in parallel (map), then summarize (reduce).""" + # Map: evaluate each resume concurrently + evaluate = functools.partial( + asyncio.to_thread, + handler(provider)( + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries)))( + evaluate_resume + ) + ), + ) + evaluations: list[Evaluation] = list( + await asyncio.gather(*(evaluate(resume, job_description) for resume in resumes)) + ) + + # Print individual evaluations + for ev in evaluations: + print(f" {ev.name}: score={ev.score}/10, qualified={ev.qualified}") + print(f" + {ev.strengths}") + print(f" - {ev.weaknesses}") + + # Reduce: summarize all evaluations + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + return summarize_evaluations(job_description, evaluations) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Map-reduce resume evaluation") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + print(f"Evaluating {len(RESUMES)} resumes for: {JOB_DESCRIPTION}\n") + summary = asyncio.run(map_reduce_evaluate(provider, RESUMES, JOB_DESCRIPTION)) + print(f"\n{summary}") diff --git a/docs/source/llm_examples/multi_agent.py b/docs/source/llm_examples/multi_agent.py new file mode 100644 index 00000000..0389c6c8 --- /dev/null +++ b/docs/source/llm_examples/multi_agent.py @@ -0,0 +1,168 @@ +"""Multi-agent Taboo word guessing game. + +Demonstrates: +- Two ``Agent`` instances with independent conversation histories +- Inter-agent communication via plain function calls +- Each agent has a different persona and goal +- ``Agent.__history__`` keeps each agent's context isolated +""" + +import argparse +import dataclasses +import enum +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output +# --------------------------------------------------------------------------- + + +class Confidence(enum.Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + + +@dataclasses.dataclass(frozen=True) +class Guess: + guess: str + confidence: Confidence + + +# --------------------------------------------------------------------------- +# Agents +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class Hinter(Agent): + """Agent that gives hints about a secret word without saying it.""" + + secret_word: str = dataclasses.field(default="") + taboo_words: list[str] = dataclasses.field(default_factory=list) + + @Tool.define + def is_taboo(self, hint: str) -> bool: + """Check if the given hint contains any taboo words or the secret word.""" + lowered_hint = hint.lower() + if self.secret_word.lower() in lowered_hint: + return True + for taboo in self.taboo_words: + if taboo.lower() in lowered_hint: + return True + return False + + @Template.define + def give_hint(self, guesser_response: str) -> str: + """You are playing a word guessing game. You must help the guesser + figure out the secret word by giving creative hints. + + RULES: + - You MUST NOT say the secret word: {self.secret_word} + - You MUST NOT use any of these taboo words: {self.taboo_words} + - Give a single, concise hint (one sentence) + - Review conversation history to avoid repeating hints + - Use the is_taboo tool to check if your hint is valid + + The guesser's last response was: {guesser_response} + """ + raise NotHandled + + +class Guesser(Agent): + """Agent that tries to guess the secret word from hints.""" + + @Template.define + def make_guess(self, hint: str) -> Guess: + """You are playing a word guessing game. Based on the hints you've + received, guess the secret word. + + Latest hint: {hint} + + Review the conversation history for all previous hints. + Make your best guess. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Game loop +# --------------------------------------------------------------------------- + + +def play_taboo( + secret_word: str, + taboo_words: list[str], + max_rounds: int = 5, +) -> bool: + """Play a round of Taboo between a hinter and a guesser.""" + hinter = Hinter(secret_word=secret_word, taboo_words=taboo_words) + guesser = Guesser() + + guesser_response = "I'm ready to guess!" + + for round_num in range(max_rounds): + # Hinter gives a hint + hint = hinter.give_hint(guesser_response) + print(f" [round {round_num}] Hinter: {hint}") + + # Guesser tries to guess + guess = guesser.make_guess(hint) + guesser_response = f"I guessed '{guess.guess}' ({guess.confidence})" + print(f" [round {round_num}] Guesser: {guess.guess} ({guess.confidence})") + + if guess.guess.lower().strip() == secret_word.lower(): + print(f" Correct! Guessed in {round_num} round(s).") + return True + + print(f" Failed to guess '{secret_word}' in {max_rounds} rounds.") + return False + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Multi-agent Taboo word guessing game") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--max-rounds", + type=int, + default=5, + help="Maximum rounds per game", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + games = [ + ("piano", ["music", "keys", "instrument", "play"]), + ("volcano", ["lava", "eruption", "mountain", "hot"]), + ] + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + for secret, taboo in games: + print(f"\nGame: '{secret}' (taboo: {taboo})") + play_taboo(secret, taboo, max_rounds=args.max_rounds) diff --git a/docs/source/llm_examples/prompt_templates.py b/docs/source/llm_examples/prompt_templates.py new file mode 100644 index 00000000..74c1cf0d --- /dev/null +++ b/docs/source/llm_examples/prompt_templates.py @@ -0,0 +1,86 @@ +"""Basic prompt templates and deterministic caching. + +Demonstrates: +- ``@Template.define`` for declaring an LLM-backed function +- Non-determinism: calling the same template twice yields different results +- ``functools.cache`` to make a template call deterministic in-process +- ``LiteLLMProvider(caching=True)`` with ``litellm.cache`` for cross-process caching +""" + +import argparse +import functools +import os + +import litellm +from litellm.caching.caching import Cache + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def limerick(theme: str) -> str: + """Write a limerick on the theme of {theme}. Do not use any tools.""" + raise NotHandled + + +@functools.cache +@Template.define +def haiku(theme: str) -> str: + """Write a haiku on the theme of {theme}. Do not use any tools.""" + raise NotHandled + + +@Template.define +def haiku_no_cache(theme: str) -> str: + """Write a haiku on the theme of {theme}. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Basic prompt templates and deterministic caching" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument("--theme", type=str, default="fish", help="Theme for the poem") + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + print("=== Non-deterministic limerick (two independent calls) ===") + with handler(provider): + print(limerick(args.theme)) + print("-" * 40) + print(limerick(args.theme)) + + print("\n=== functools.cache: same result on second call ===") + with handler(provider): + print(haiku(args.theme)) + print("-" * 40) + print(haiku(args.theme)) + + print("\n=== LiteLLMProvider(caching=True): backed by litellm.cache ===") + litellm.cache = Cache() + provider_cached = LiteLLMProvider(model=args.model, caching=True) + try: + with handler(provider_cached): + print(haiku_no_cache(args.theme)) + print("-" * 40) + print(haiku_no_cache(args.theme)) + finally: + litellm.cache = None diff --git a/docs/source/llm_examples/rag.py b/docs/source/llm_examples/rag.py new file mode 100644 index 00000000..eca2b450 --- /dev/null +++ b/docs/source/llm_examples/rag.py @@ -0,0 +1,196 @@ +"""Retrieval-augmented generation (RAG). + +Demonstrates: +- Offline: chunking documents, embedding, and indexing +- Online: embedding a query, retrieving relevant chunks, and generating + a grounded answer +- ``@Tool.define`` to expose retrieval as a tool the LLM can call +- Separation of indexing (plain Python) from generation (``@Template.define``) +""" + +import argparse +import dataclasses +import os + +import litellm +import numpy as np +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Embedding helpers +# --------------------------------------------------------------------------- + + +def get_embedding(text: str, model: str) -> np.ndarray: + """Get an embedding vector for the given text using litellm.""" + response = litellm.embedding(model=model, input=text) + return np.array(response.data[0]["embedding"], dtype=np.float32) + + +# --------------------------------------------------------------------------- +# Vector index +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class VectorIndex: + """Simple in-memory vector index using L2 distance.""" + + model: str + chunks: list[str] = dataclasses.field(default_factory=list) + embeddings: list[np.ndarray] = dataclasses.field(default_factory=list) + + def add(self, text: str) -> None: + """Add a text chunk to the index.""" + self.chunks.append(text) + self.embeddings.append(get_embedding(text, model=self.model)) + + @Tool.define + def retrieve(self, query: str, top_k: int = 3) -> list[str]: + """Return the top-k most similar chunks to the query.""" + if not self.embeddings: + return [] + query_emb = get_embedding(query, model=self.model) + distances = [float(((emb - query_emb) ** 2).sum()) for emb in self.embeddings] + indices = sorted(range(len(distances)), key=lambda i: distances[i]) + return [self.chunks[i] for i in indices[:top_k]] + + +# --------------------------------------------------------------------------- +# Chunking +# --------------------------------------------------------------------------- + + +def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]: + """Split text into overlapping word-level chunks.""" + words = text.split() + chunks = [] + start = 0 + while start < len(words): + end = start + chunk_size + chunks.append(" ".join(words[start:end])) + start += chunk_size - overlap + return chunks + + +# --------------------------------------------------------------------------- +# Sample documents +# --------------------------------------------------------------------------- + +DOCUMENTS = [ + """The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars + in Paris, France. It is named after the engineer Gustave Eiffel, whose + company designed and built the tower from 1887 to 1889 as the centerpiece + of the 1889 World's Fair. Although initially criticized by some of France's + leading artists and intellectuals, the tower has become a global icon of + France and one of the most recognizable structures in the world. The tower + is 330 metres tall, about the same height as an 81-storey building, and + is the tallest structure in Paris. It was the first structure in the world + to reach a height of 300 metres.""", + """The Great Wall of China is a series of fortifications that were built + across the historical northern borders of ancient Chinese states and + Imperial China as protection against various nomadic groups. The total + length of all sections ever built is more than 20,000 km. Several walls + were built from as early as the 7th century BC, with selective stretches + later joined together by Qin Shi Huang, the first emperor of China. The + best-preserved sections of the wall date from the Ming dynasty + (1368-1644). The wall's purpose was defensive, and it featured + watchtowers, troop barracks, and signaling capabilities.""", + """The Colosseum, also known as the Flavian Amphitheatre, is an oval + amphitheatre in the centre of the city of Rome, Italy. It is the largest + ancient amphitheatre ever built, and is still the largest standing + amphitheatre in the world, despite its age. Construction began under + the emperor Vespasian in AD 72 and was completed in AD 80 under his + successor and heir, Titus. The Colosseum could hold an estimated 50,000 + to 80,000 spectators at various points in its history, and was used for + gladiatorial contests and public spectacles including animal hunts, + executions, re-enactments of famous battles, and dramas.""", +] + +# --------------------------------------------------------------------------- +# Build the index (offline phase) +# --------------------------------------------------------------------------- + + +def build_index(documents: list[str], embedding_model: str) -> VectorIndex: + """Chunk and index a collection of documents.""" + index = VectorIndex(model=embedding_model) + for doc in documents: + for chunk in chunk_text(doc, chunk_size=60, overlap=15): + index.add(chunk) + print(f"Indexed {len(index.chunks)} chunks from {len(documents)} documents") + return index + + +# --------------------------------------------------------------------------- +# RAG query (online phase) +# --------------------------------------------------------------------------- + + +@Template.define +def answer_question(question: str) -> str: + """You are a helpful assistant. Answer the user's question using ONLY + information retrieved from the knowledge base via the retrieve tool. + + If the retrieved information doesn't contain the answer, say so. + Always cite which document your information comes from. + + Question: {question} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Retrieval-augmented generation (RAG)") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--embedding-model", + type=str, + default="lm_studio/text-embedding-embeddinggemma-300m-qat", + help="Embedding model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + # Offline: build the index + index = build_index(DOCUMENTS, embedding_model=args.embedding_model) + + # Create the retrieval tool bound to our index + retrieve: Tool = index.retrieve + + # Online: answer questions + questions = [ + "How tall is the Eiffel Tower?", + "When was the Great Wall of China built?", + "How many spectators could the Colosseum hold?", + ] + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + for question in questions: + print(f"\nQ: {question}") + answer = answer_question(question) + print(f"A: {answer}") diff --git a/docs/source/llm_examples/research_agent.py b/docs/source/llm_examples/research_agent.py new file mode 100644 index 00000000..308d2df2 --- /dev/null +++ b/docs/source/llm_examples/research_agent.py @@ -0,0 +1,137 @@ +"""Research agent with web search. + +Demonstrates: +- ``@defop`` + ``ObjectInterpretation`` to define a pluggable web search effect +- ``@Template.define`` for LLM-implemented answer/refine/judge templates +- Handler composition: stacking a search provider alongside an LLM provider +- Iterative refinement loop: answer → judge → refine → judge → ... +""" + +import argparse +import os +import urllib.parse + +import requests + +from effectful.handlers.llm import Template, Tool +from effectful.handlers.llm.completions import ( + LiteLLMProvider, +) +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Search effect + handler +# --------------------------------------------------------------------------- + + +@Tool.define +def search_web(query: str) -> str: + """Search Wikipedia for a topic and return a summary. The query can be a topic name or a natural language question.""" + search_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "list": "search", + "srsearch": query, + "srlimit": 1, + "format": "json", + } + ) + search_data = requests.get( + search_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + results = search_data.get("query", {}).get("search", []) + if not results: + return f"No results found for: {query}" + title = results[0]["title"] + + summary_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "titles": title, + "prop": "extracts", + "exintro": True, + "explaintext": True, + "format": "json", + } + ) + summary_data = requests.get( + summary_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + page = next(iter(summary_data["query"]["pages"].values())) + extract = page.get("extract", "No summary available.") + url = f"https://en.wikipedia.org/wiki/{urllib.parse.quote(title.replace(' ', '_'))}" + + return f"# {title}\n\n{extract}\n\nSource: {url}" + + +# --------------------------------------------------------------------------- +# Templates (auto-capture `search_web` from lexical scope) +# --------------------------------------------------------------------------- + + +@Template.define +def answer_question(question: str) -> str: + """Acting as a research assistant that can search the web, + construct an answer to the user's question: {question}.""" + raise NotHandled + + +@Template.define +def refine_answer(question: str, answer: str) -> str: + """Acting as a research assistant that can search the web, + given the user's original question ({question}), + refine this previous answer: {answer}.""" + raise NotHandled + + +@Template.define +def is_question_answered(question: str, answer: str) -> bool: + """Acting as a research assistant, decide if the user's question + ({question}) is appropriately answered by: {answer}. + Respond only true or false.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Agent loop +# --------------------------------------------------------------------------- + + +def research_agent(question: str, max_attempts: int = 3) -> str: + """Answer a question, iteratively refining until satisfactory.""" + answer = answer_question(question) + for _ in range(max_attempts): + if is_question_answered(question, answer): + break + answer = refine_answer(question, answer) + return answer + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="LLM-guided research agent with web search" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--question", + type=str, + default="What is the meaning of life?", + help="The question to research", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with handler(provider): + result = research_agent(args.question) + print(result) diff --git a/docs/source/llm_examples/retry_tool_errors.py b/docs/source/llm_examples/retry_tool_errors.py new file mode 100644 index 00000000..7ce0b7b1 --- /dev/null +++ b/docs/source/llm_examples/retry_tool_errors.py @@ -0,0 +1,91 @@ +"""Retrying tool execution failures. + +Demonstrates: +- ``RetryLLMHandler`` surfacing tool exceptions back to the LLM as tool messages +- A flaky tool (``unstable_service``) that succeeds only after multiple attempts +- The contrast between an unhandled failure and a retry-handled success +""" + +import argparse +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Flaky tool +# --------------------------------------------------------------------------- + +call_count = 0 +REQUIRED_RETRIES = 3 + + +@Tool.define +def unstable_service() -> str: + """Fetch data from an unstable external service. May require retries.""" + global call_count + call_count += 1 + if call_count < REQUIRED_RETRIES: + raise ConnectionError( + f"Service unavailable! Attempt {call_count}/{REQUIRED_RETRIES}. Please retry." + ) + return "{ 'status': 'ok', 'data': [1, 2, 3] }" + + +# --------------------------------------------------------------------------- +# Template (unstable_service auto-captured from lexical scope) +# --------------------------------------------------------------------------- + + +@Template.define +def fetch_data() -> str: + """Use the unstable_service tool to fetch data.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Retry LLM template calls when tools raise exceptions" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for tool/decode failures", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + print("=== Without RetryLLMHandler ===") + with handler(provider): + try: + result = fetch_data() + print(f"Result: {result}") + except Exception as e: + print(f"Error: {e}") + + # Reset for the retry-enabled run. + call_count = 0 + + print("\n=== With RetryLLMHandler ===") + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + result = fetch_data() + print(f"Result: {result} (after {call_count} tool attempts)") diff --git a/docs/source/llm_examples/retry_validation.py b/docs/source/llm_examples/retry_validation.py new file mode 100644 index 00000000..aab80df9 --- /dev/null +++ b/docs/source/llm_examples/retry_validation.py @@ -0,0 +1,107 @@ +"""Retrying when structured-output validation fails. + +Demonstrates: +- A pydantic dataclass with ``field_validator`` constraints +- ``RetryLLMHandler`` feeding ``PydanticCustomError`` messages back to the LLM + so it can correct its output on a subsequent attempt +""" + +import argparse +import os + +import pydantic +from pydantic import field_validator +from pydantic_core import PydanticCustomError +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Validated structured output +# --------------------------------------------------------------------------- + + +@pydantic.dataclasses.dataclass +class Rating: + score: int + explanation: str + + @field_validator("score") + @classmethod + def check_score(cls, v): + if v < 1 or v > 5: + raise PydanticCustomError( + "invalid_score", + "score must be 1–5, got {v}", + {"v": v}, + ) + return v + + @field_validator("explanation") + @classmethod + def check_explanation_contains_score(cls, v, info): + score = info.data.get("score", None) + if score is not None and str(score) not in v: + raise PydanticCustomError( + "invalid_explanation", + "explanation must mention the score {score}, got '{explanation}'", + {"score": score, "explanation": v}, + ) + return v + + +# --------------------------------------------------------------------------- +# Template +# --------------------------------------------------------------------------- + + +@Template.define +def give_rating_for_movie(movie_name: str) -> Rating: + """Give a rating for {movie_name}. The explanation MUST include the numeric score. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Retry on pydantic validation errors in LLM responses" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument("--movie", type=str, default="Die Hard", help="Movie to rate") + parser.add_argument( + "--num-retries", + type=int, + default=4, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + print("=== Without RetryLLMHandler ===") + with handler(provider): + try: + rating = give_rating_for_movie(args.movie) + print(f"Score: {rating.score}/5\nExplanation: {rating.explanation}") + except Exception as e: + print(f"Error: {e}") + + print("\n=== With RetryLLMHandler ===") + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + rating = give_rating_for_movie(args.movie) + print(f"Score: {rating.score}/5") + print(f"Explanation: {rating.explanation}") diff --git a/docs/source/llm_examples/structured_output.py b/docs/source/llm_examples/structured_output.py new file mode 100644 index 00000000..0f6c85f8 --- /dev/null +++ b/docs/source/llm_examples/structured_output.py @@ -0,0 +1,82 @@ +"""Structured output via dataclasses. + +Demonstrates: +- Dataclass return types decoded from constrained LLM generation +- Round-tripping a dataclass: one template produces it, another consumes it +""" + +import argparse +import dataclasses +import os + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass +class KnockKnockJoke: + whos_there: str + punchline: str + + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def write_joke(theme: str) -> KnockKnockJoke: + """Write a knock-knock joke on the theme of {theme}. Do not use any tools.""" + raise NotHandled + + +@Template.define +def rate_joke(joke: KnockKnockJoke) -> bool: + """Decide if {joke} is funny or not. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Helper +# --------------------------------------------------------------------------- + + +def do_comedy(theme: str) -> None: + joke = write_joke(theme) + print("> You are onstage at a comedy club. You tell the following joke:") + print( + f"Knock knock.\nWho's there?\n{joke.whos_there}.\n" + f"{joke.whos_there} who?\n{joke.punchline}" + ) + if rate_joke(joke): + print("> The crowd laughs politely.") + else: + print("> The crowd stares in stony silence.") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Structured output via dataclasses") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--theme", type=str, default="lizards", help="Theme for the joke" + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + do_comedy(args.theme) diff --git a/docs/source/llm_examples/supervisor.py b/docs/source/llm_examples/supervisor.py new file mode 100644 index 00000000..29f258fe --- /dev/null +++ b/docs/source/llm_examples/supervisor.py @@ -0,0 +1,187 @@ +"""Supervisor quality-control wrapper. + +Demonstrates: +- Wrapping an agent's output with a quality-control check +- Using one ``Template`` to judge another's output +- Retry loop driven by LLM-based evaluation +""" + +import argparse +import dataclasses +import os +import urllib.parse + +import requests +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Search tool +# --------------------------------------------------------------------------- + + +@Tool.define +def search_web(query: str) -> str: + """Search Wikipedia for a topic and return a summary. The query can be a topic name or a natural language question.""" + search_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "list": "search", + "srsearch": query, + "srlimit": 1, + "format": "json", + } + ) + search_data = requests.get( + search_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + results = search_data.get("query", {}).get("search", []) + if not results: + return f"No results found for: {query}" + title = results[0]["title"] + + summary_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "titles": title, + "prop": "extracts", + "exintro": True, + "explaintext": True, + "format": "json", + } + ) + summary_data = requests.get( + summary_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + page = next(iter(summary_data["query"]["pages"].values())) + extract = page.get("extract", "No summary available.") + url = f"https://en.wikipedia.org/wiki/{urllib.parse.quote(title.replace(' ', '_'))}" + + return f"# {title}\n\n{extract}\n\nSource: {url}" + + +# --------------------------------------------------------------------------- +# Structured output for quality judgment +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass(frozen=True) +class QualityJudgment: + is_acceptable: bool + feedback: str + + +# --------------------------------------------------------------------------- +# Research agent +# --------------------------------------------------------------------------- + + +class Researcher(Agent): + """Agent that answers research questions using web search.""" + + @Template.define + def answer(self, question: str) -> str: + """You are a research assistant. Answer the following question using + the search tool to find accurate information. + + Question: {question} + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Supervisor (quality judge) +# --------------------------------------------------------------------------- + + +@Template.define +def judge_quality(question: str, answer: str) -> QualityJudgment: + """You are a strict quality reviewer. Evaluate whether this answer + adequately addresses the question with accurate, specific information. + + Question: {question} + Answer: {answer} + + An answer is acceptable if it contains specific facts (names, dates, + numbers) relevant to the question. Vague or generic answers should + be rejected. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Supervised agent loop +# --------------------------------------------------------------------------- + + +def supervised_research(question: str, max_retries: int = 3) -> str: + """Answer a question with quality-control supervision. + + The researcher agent answers, the supervisor judges quality, + and if rejected the researcher tries again with feedback. + """ + researcher = Researcher() + + for attempt in range(max_retries + 1): + answer = researcher.answer(question) + judgment = judge_quality(question, answer) + + if judgment.is_acceptable: + print(f"[supervisor] Accepted on attempt {attempt + 1}") + return answer + + print(f"[supervisor] Rejected attempt {attempt + 1}: {judgment.feedback}") + + print("[supervisor] Returning best effort after max retries") + return answer + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Supervised research agent with quality control" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--question", + type=str, + default="What year was the Eiffel Tower completed and how tall is it?", + help="Research question to answer", + ) + parser.add_argument( + "--max-retries", + type=int, + default=3, + help="Maximum number of supervisor rejections before accepting", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + result = supervised_research( + args.question, + max_retries=args.max_retries, + ) + print(f"\nFinal answer: {result}") diff --git a/docs/source/llm_examples/tao_agent.py b/docs/source/llm_examples/tao_agent.py new file mode 100644 index 00000000..2a8a1471 --- /dev/null +++ b/docs/source/llm_examples/tao_agent.py @@ -0,0 +1,182 @@ +"""Think-Act-Observe chain-of-thought agent. + +Demonstrates: +- ``Agent`` mixin for persistent conversation history +- Structured output with Pydantic models (``AgentThought``) +- A think → act → observe reasoning loop +- Pattern matching for action dispatch +""" + +import argparse +import dataclasses +import enum +import os +import urllib.parse + +import requests +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template, Tool +from effectful.handlers.llm.completions import ( + LiteLLMProvider, + RetryLLMHandler, +) +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Search tool +# --------------------------------------------------------------------------- + + +@Tool.define +def search_web(query: str) -> str: + """Search Wikipedia for a topic and return a summary. The query can be a topic name or a natural language question.""" + search_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "list": "search", + "srsearch": query, + "srlimit": 1, + "format": "json", + } + ) + search_data = requests.get( + search_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + results = search_data.get("query", {}).get("search", []) + if not results: + return f"No results found for: {query}" + title = results[0]["title"] + + summary_url = "https://en.wikipedia.org/w/api.php?" + urllib.parse.urlencode( + { + "action": "query", + "titles": title, + "prop": "extracts", + "exintro": True, + "explaintext": True, + "format": "json", + } + ) + summary_data = requests.get( + summary_url, headers={"User-Agent": "effectful-example/1.0"} + ).json() + page = next(iter(summary_data["query"]["pages"].values())) + extract = page.get("extract", "No summary available.") + url = f"https://en.wikipedia.org/wiki/{urllib.parse.quote(title.replace(' ', '_'))}" + + return f"# {title}\n\n{extract}\n\nSource: {url}" + + +# --------------------------------------------------------------------------- +# Structured output types +# --------------------------------------------------------------------------- + + +class AgentAction(enum.StrEnum): + search_the_web = "search_the_web" + calculate = "calculate" + answer = "answer" + + +@dataclasses.dataclass(frozen=True) +class AgentThought: + thinking: str + action: AgentAction + action_input: str + is_final: bool + + +# --------------------------------------------------------------------------- +# TAO Agent +# --------------------------------------------------------------------------- + + +class TAOAgent(Agent): + """Think-Act-Observe agent that reasons step by step.""" + + @Template.define + def think(self, query: str) -> AgentThought: + """You are an AI assistant solving a problem. Based on the user's query + ({query}) and prior conversation context, think about what action to + take next. + """ + raise NotHandled + + @Template.define + def observe(self, action: str, action_input: str, action_result: str) -> str: + """You are an observer. Provide a concise, objective observation of this result. + + Action: {action} + Action input: {action_input} + Action result: {action_result} + + + Do not make decisions, just describe what you see. + + """ + raise NotHandled + + def run(self, query: str, max_steps: int = 5) -> str: + result = "" + for _ in range(max_steps): + thought = self.think(query) + result = self._act(thought.action, thought.action_input) + self.observe(str(thought.action), thought.action_input, result) + if thought.is_final: + break + return result + + def _act(self, action: AgentAction, action_input: str) -> str: + match action: + case AgentAction.search_the_web: + return search_web(action_input) + case AgentAction.calculate: + try: + return action_input # eval(action_input)) # noqa: S307 + except Exception as e: + return str(e) + case AgentAction.answer: + return action_input + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="TAO chain-of-thought agent") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--max-steps", + type=int, + default=5, + help="Maximum number of steps before giving up", + ) + parser.add_argument( + "--num-retries", + type=int, + default=5, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + agent = TAOAgent() + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + answer = agent.run( + "How many tennis balls would fill an Olympic swimming pool?", + max_steps=args.max_steps, + ) + print("Answer:", answer) diff --git a/docs/source/llm_examples/template_composition.py b/docs/source/llm_examples/template_composition.py new file mode 100644 index 00000000..5f880307 --- /dev/null +++ b/docs/source/llm_examples/template_composition.py @@ -0,0 +1,76 @@ +"""Template composition: templates can call other templates. + +Demonstrates: +- Sub-templates auto-captured into an orchestrator template's lexical scope +- Inspecting ``write_story.tools`` to confirm sub-templates are exposed to the LLM +- The orchestrator dispatches to the right sub-template based on a style argument +""" + +import argparse +import os + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Sub-templates +# --------------------------------------------------------------------------- + + +@Template.define +def story_with_moral(topic: str) -> str: + """Write a short story about {topic} and end with a moral lesson. Do not use any tools.""" + raise NotHandled + + +@Template.define +def story_funny(topic: str) -> str: + """Write a funny, humorous story about {topic}. Do not use any tools.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Orchestrator template +# --------------------------------------------------------------------------- + + +@Template.define +def write_story(topic: str, style: str) -> str: + """Write a story about {topic} in the style: {style}. + Available styles: 'moral' for a story with a lesson, 'funny' for humor. + Use story_funny for humor, story_with_moral for a story with a lesson. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Template composition with auto-captured sub-templates" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--topic", type=str, default="a curious cat", help="Story topic" + ) + args = parser.parse_args() + + assert story_with_moral in write_story.tools.values() + assert story_funny in write_story.tools.values() + print("Sub-templates available to write_story:", list(write_story.tools.keys())) + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + print("\n=== Story with moral ===") + print(write_story(args.topic, "moral")) + print("\n=== Funny story ===") + print(write_story(args.topic, "funny")) diff --git a/docs/source/llm_examples/text2sql.py b/docs/source/llm_examples/text2sql.py new file mode 100644 index 00000000..a3e36f93 --- /dev/null +++ b/docs/source/llm_examples/text2sql.py @@ -0,0 +1,174 @@ +"""Natural language to SQL with LLM-powered debug loop. + +Demonstrates: +- Generating SQL from natural language using ``@Template.define`` +- Executing SQL against a real SQLite database +- Feeding execution errors back to the LLM for iterative fixing +- ``@Tool.define`` to expose the database schema as a tool +""" + +import argparse +import os +import sqlite3 +import textwrap + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# In-memory database setup +# --------------------------------------------------------------------------- + + +def create_sample_db() -> sqlite3.Connection: + """Create a sample SQLite database with employee data.""" + conn = sqlite3.connect(":memory:") + conn.executescript( + textwrap.dedent("""\ + CREATE TABLE departments ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + budget REAL NOT NULL + ); + CREATE TABLE employees ( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + department_id INTEGER REFERENCES departments(id), + salary REAL NOT NULL, + hire_date TEXT NOT NULL + ); + INSERT INTO departments VALUES (1, 'Engineering', 500000); + INSERT INTO departments VALUES (2, 'Marketing', 200000); + INSERT INTO departments VALUES (3, 'Sales', 300000); + INSERT INTO employees VALUES (1, 'Alice', 1, 120000, '2020-01-15'); + INSERT INTO employees VALUES (2, 'Bob', 1, 110000, '2021-03-22'); + INSERT INTO employees VALUES (3, 'Carol', 2, 95000, '2019-07-01'); + INSERT INTO employees VALUES (4, 'Dave', 3, 105000, '2022-11-10'); + INSERT INTO employees VALUES (5, 'Eve', 1, 130000, '2018-05-20'); + INSERT INTO employees VALUES (6, 'Frank', 3, 98000, '2023-01-05'); + """) + ) + return conn + + +def get_schema(conn: sqlite3.Connection) -> str: + """Extract the schema from a SQLite database.""" + cursor = conn.execute( + "SELECT sql FROM sqlite_master WHERE type='table' ORDER BY name" + ) + return "\n\n".join(row[0] for row in cursor if row[0]) + + +# --------------------------------------------------------------------------- +# Templates +# --------------------------------------------------------------------------- + + +@Template.define +def generate_sql(question: str, db_schema: str) -> str: + """You are a SQL expert. Given this database schema: + + {db_schema} + + Write a SQLite query that answers: {question} + + Return ONLY the SQL query, no explanation. + """ + raise NotHandled + + +@Template.define +def fix_sql(question: str, db_schema: str, bad_sql: str, error: str) -> str: + """You are a SQL expert. Your previous query had an error. + + Database schema: + {db_schema} + + Original question: {question} + Failed SQL: {bad_sql} + Error: {error} + + Write a corrected SQLite query. Return ONLY the SQL query. + """ + raise NotHandled + + +# --------------------------------------------------------------------------- +# Text-to-SQL agent with debug loop +# --------------------------------------------------------------------------- + + +def text_to_sql( + conn: sqlite3.Connection, question: str, max_retries: int = 3 +) -> list[tuple]: + """Convert a natural language question to SQL and execute it. + + If the query fails, feed the error back to the LLM to fix it, + up to ``max_retries`` times. + """ + schema = get_schema(conn) + sql = generate_sql(question, schema) + + for attempt in range(max_retries + 1): + # Strip markdown fences if the LLM wraps the SQL + clean_sql = sql.strip().removeprefix("```sql").removesuffix("```").strip() + print(f" [attempt {attempt + 1}] {clean_sql}") + + try: + cursor = conn.execute(clean_sql) + return cursor.fetchall() + except Exception as e: + if attempt < max_retries: + print(f" [error] {e}") + sql = fix_sql(question, schema, clean_sql, str(e)) + else: + raise + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Natural language to SQL with LLM-powered debug loop" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + conn = create_sample_db() + provider = LiteLLMProvider(model=args.model) + + questions = [ + "What is the average salary by department?", + "Who is the highest paid employee?", + "How many employees were hired after 2021?", + ] + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + for question in questions: + print(f"\nQ: {question}") + try: + rows = text_to_sql(conn, question) + for row in rows: + print(f" => {row}") + except Exception as e: + print(f" FAILED: {e}") diff --git a/docs/source/llm_examples/thinking.py b/docs/source/llm_examples/thinking.py new file mode 100644 index 00000000..058de61e --- /dev/null +++ b/docs/source/llm_examples/thinking.py @@ -0,0 +1,117 @@ +"""Chain-of-thought reasoning with structured self-loop. + +Demonstrates: +- Structured output with a ``ThoughtStep`` dataclass +- An ``Agent`` that loops until it decides it has a final answer +- The LLM sees its own prior reasoning via ``Agent.__history__`` +""" + +import argparse +import dataclasses +import os + +from tenacity import stop_after_attempt + +from effectful.handlers.llm import Agent, Template +from effectful.handlers.llm.completions import LiteLLMProvider, RetryLLMHandler +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Structured output +# --------------------------------------------------------------------------- + + +@dataclasses.dataclass(frozen=True) +class ThoughtStep: + reasoning: str + conclusion: str + is_final: bool + + +# --------------------------------------------------------------------------- +# Chain-of-thought agent +# --------------------------------------------------------------------------- + + +class Thinker(Agent): + """Agent that reasons step-by-step until it reaches a final answer.""" + + @Template.define + def think(self, problem: str) -> ThoughtStep: + """You are solving a problem step by step. + + Problem: {problem} + + Review the conversation history for any prior reasoning steps. + Continue from where you left off. Break the problem into small, + logical steps. Set is_final=true only when you have a complete, + well-supported answer. + """ + raise NotHandled + + def solve(self, problem: str, max_steps: int = 10) -> str: + """Solve a problem by iterative chain-of-thought reasoning.""" + for i in range(max_steps): + step = self.think(problem) + print(f" [step {i + 1}] {step.reasoning}") + if step.is_final: + return step.conclusion + + return step.conclusion + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Chain-of-thought reasoning agent") + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + parser.add_argument( + "--max-steps", + type=int, + default=10, + help="Maximum reasoning steps before stopping", + ) + parser.add_argument( + "--problem", + type=str, + default=( + "A farmer has 17 sheep. All but 9 run away. " + "Then he buys 5 more. How many sheep does he have now?" + ), + help="The problem to solve", + ) + parser.add_argument( + "--num-retries", + type=int, + default=3, + help="Number of retries for malformed LLM output", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + + problems = [ + args.problem, + ( + "If you have a 3-gallon jug and a 5-gallon jug, " + "how do you measure exactly 4 gallons of water?" + ), + ] + + with ( + handler(provider), + handler(RetryLLMHandler(stop=stop_after_attempt(args.num_retries))), + ): + for problem in problems: + thinker = Thinker() + print(f"\nProblem: {problem}") + answer = thinker.solve(problem, max_steps=args.max_steps) + print(f"Answer: {answer}") diff --git a/docs/source/llm_examples/tool_calling.py b/docs/source/llm_examples/tool_calling.py new file mode 100644 index 00000000..f7d9b9f2 --- /dev/null +++ b/docs/source/llm_examples/tool_calling.py @@ -0,0 +1,65 @@ +"""Tool calling: templates invoke Python callables exposed via ``@Tool.define``. + +Demonstrates: +- ``@Tool.define`` for exposing a Python function to the model +- Lexical-scope auto-capture: tools defined alongside a template are made + available to the LLM without explicit registration +- The model chains multiple tool calls to answer a multi-step query +""" + +import argparse +import os + +from effectful.handlers.llm import Template, Tool +from effectful.handlers.llm.completions import LiteLLMProvider +from effectful.ops.semantics import handler +from effectful.ops.types import NotHandled + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + + +@Tool.define +def cities() -> list[str]: + """Return a list of cities that can be passed to `weather`.""" + return ["Chicago", "New York", "Barcelona"] + + +@Tool.define +def weather(city: str) -> str: + """Given a city name, return a description of the weather in that city.""" + status = {"Chicago": "cold", "New York": "wet", "Barcelona": "sunny"} + return status.get(city, "unknown") + + +# --------------------------------------------------------------------------- +# Template (cities and weather are auto-captured from lexical scope) +# --------------------------------------------------------------------------- + + +@Template.define +def vacation() -> str: + """Use the provided tools to suggest a city that has good weather. Use only the `cities` and `weather` tools provided.""" + raise NotHandled + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Tool calling with auto-captured lexical scope" + ) + parser.add_argument( + "--model", + type=str, + default=os.environ.get("EFFECTFUL_LLM_MODEL", ""), + help="LLM model to use", + ) + args = parser.parse_args() + + provider = LiteLLMProvider(model=args.model) + with handler(provider): + print(vacation())