Skip to content

Commit 3e37f62

Browse files
committed
Add engagement detection link implementation
- Introduced a new link for analyzing customer-agent engagement in conversations using OpenAI's GPT-4.1 model. - Implemented functionality to analyze transcripts, store results as analyses and tags, and included retry logic and error handling. - Added comprehensive tests to ensure functionality and reliability. - Updated README.md with configuration options, features, and metrics for the new link.
1 parent 8702a6f commit 3e37f62

3 files changed

Lines changed: 433 additions & 0 deletions

File tree

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Engagement Detection Link
2+
3+
This link analyzes conversations to determine if both the customer and agent are engaged in the dialogue. It uses OpenAI's GPT-4.1 model to analyze transcripts and determine engagement status.
4+
5+
## Features
6+
7+
- Analyzes each dialog in a vCon to detect engagement
8+
- Uses GPT-4.1 for accurate conversation analysis
9+
- Stores results both as analysis and tags
10+
- Includes retry logic and error handling
11+
- Provides metrics for monitoring
12+
13+
## Configuration Options
14+
15+
The link can be configured with the following options:
16+
17+
```python
18+
default_options = {
19+
"prompt": "Did both the customer and the agent speak? Respond with 'true' if yes, 'false' if not. Respond with only 'true' or 'false'.",
20+
"analysis_type": "engagement_analysis",
21+
"model": "gpt-4.1",
22+
"sampling_rate": 1,
23+
"temperature": 0.2,
24+
"source": {
25+
"analysis_type": "transcript",
26+
"text_location": "body.paragraphs.transcript",
27+
}
28+
}
29+
```
30+
31+
### Options Description
32+
33+
- `prompt`: The prompt used to analyze engagement
34+
- `analysis_type`: The type of analysis to store in the vCon
35+
- `model`: The OpenAI model to use (default: gpt-4.1)
36+
- `sampling_rate`: Rate at which to sample vCons for analysis
37+
- `temperature`: Model temperature for response generation
38+
- `source`: Configuration for where to find the transcript data
39+
40+
## Output
41+
42+
The link adds two types of data to the vCon:
43+
44+
1. Analysis: Stores the engagement status as an analysis object
45+
2. Tags: Adds an "engagement" tag with the boolean result
46+
47+
## Metrics
48+
49+
The link provides the following metrics:
50+
51+
- `conserver.link.openai.engagement_detected`: Gauge for engagement status
52+
- `conserver.link.openai.engagement_analysis_time`: Time taken for analysis
53+
- `conserver.link.openai.engagement_analysis_failures`: Count of analysis failures
54+
55+
## Requirements
56+
57+
- OpenAI API key must be set in the environment (`OPENAI_API_KEY`) or supplied via the link options
58+
- vCon must contain transcript data
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
from lib.vcon_redis import VconRedis
2+
from lib.logging_utils import init_logger
3+
import logging
4+
from openai import OpenAI
5+
from tenacity import (
6+
retry,
7+
stop_after_attempt,
8+
wait_exponential,
9+
before_sleep_log,
10+
)
11+
from lib.metrics import init_metrics, stats_gauge, stats_count
12+
import time
13+
from lib.links.filters import is_included, randomly_execute_with_sampling
14+
import os
15+
16+
init_metrics()
17+
logger = init_logger(__name__)
18+
19+
default_options = {
20+
"prompt": "Did both the customer and the agent speak? Respond with 'true' if yes, 'false' if not. Respond with only 'true' or 'false'.",
21+
"analysis_type": "engagement_analysis",
22+
"model": "gpt-4.1",
23+
"sampling_rate": 1,
24+
"temperature": 0.2,
25+
"source": {
26+
"analysis_type": "transcript",
27+
"text_location": "body.paragraphs.transcript",
28+
},
29+
"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "") # Make it optional with empty default
30+
}
31+
32+
def get_analysis_for_type(vcon, index, analysis_type):
    """Return the analysis attached to dialog *index* with the given type, or None."""
    matches = (
        entry
        for entry in vcon.analysis
        if entry["dialog"] == index and entry["type"] == analysis_type
    )
    return next(matches, None)
37+
38+
@retry(
    wait=wait_exponential(multiplier=2, min=1, max=65),
    stop=stop_after_attempt(6),
    before_sleep=before_sleep_log(logger, logging.INFO),
)
def check_engagement(transcript, prompt, model, temperature, client) -> bool:
    """Ask the model whether both parties engaged; True iff it replies 'true'.

    Retried with exponential backoff (up to 6 attempts) on any exception.
    """
    # The responses API accepts a single string (or message-dict list) as 'input';
    # we fold the prompt and transcript into one string.
    query = f"{prompt}\n\nTranscript: {transcript}"

    result = client.responses.create(
        model=model,
        input=query,
        temperature=temperature,
    )

    # The reply text lives on response.output_text; normalize before comparing.
    reply = result.output_text.strip().lower()
    return reply == "true"
56+
57+
def run(
    vcon_uuid,
    link_name,
    opts=None,
):
    """Analyze each dialog in a vCon for customer/agent engagement.

    For every dialog that has a source transcript analysis and no existing
    engagement analysis, asks the configured OpenAI model whether both the
    customer and the agent spoke, then stores the boolean result both as an
    analysis object and as an "engagement" tag on the vCon.

    Args:
        vcon_uuid: UUID of the vCon to process (loaded from Redis).
        link_name: Name of this link instance, used for logging.
        opts: Optional dict of overrides merged on top of default_options.
            (Default is None rather than the dict itself to avoid the
            mutable-default-argument pitfall; behavior is unchanged.)

    Returns:
        vcon_uuid, so the conserver chain continues with the same vCon.

    Raises:
        Exception: re-raises whatever check_engagement raises after its
            retries are exhausted.
    """
    module_name = __name__.split(".")[-1]
    logger.info(f"Starting {module_name}: {link_name} plugin for: {vcon_uuid}")
    # Merge caller overrides on top of the defaults without mutating either dict.
    merged_opts = default_options.copy()
    merged_opts.update(opts or {})
    opts = merged_opts

    # Check for OPENAI_API_KEY in opts or environment; without a key we skip
    # the analysis (best-effort) rather than failing the whole chain.
    openai_key = opts.get("OPENAI_API_KEY") or os.getenv("OPENAI_API_KEY")
    if not openai_key:
        logger.warning("OPENAI_API_KEY not defined, skipping analysis for vCon: %s", vcon_uuid)
        return vcon_uuid
    opts["OPENAI_API_KEY"] = openai_key

    vcon_redis = VconRedis()
    vCon = vcon_redis.get_vcon(vcon_uuid)

    if not is_included(opts, vCon):
        logger.info(f"Skipping {link_name} vCon {vcon_uuid} due to filters")
        return vcon_uuid

    if not randomly_execute_with_sampling(opts):
        logger.info(f"Skipping {link_name} vCon {vcon_uuid} due to sampling")
        return vcon_uuid

    # max_retries=0: retrying is handled by tenacity inside check_engagement.
    client = OpenAI(api_key=opts["OPENAI_API_KEY"], timeout=120.0, max_retries=0)
    source_type = opts["source"]["analysis_type"]
    text_location = opts["source"]["text_location"]

    for index, dialog in enumerate(vCon.dialog):
        source = get_analysis_for_type(vCon, index, source_type)
        if not source:
            logger.warning("No %s found for vCon: %s", source_type, vCon.uuid)
            continue

        source_text = navigate_dict(source, text_location)
        if not source_text:
            logger.warning("No source_text found at %s for vCon: %s", text_location, vCon.uuid)
            continue

        # Idempotency: skip dialogs that already carry this analysis type.
        analysis = get_analysis_for_type(vCon, index, opts["analysis_type"])
        if analysis:
            logger.info(
                "Dialog %s already has a %s in vCon: %s",
                index,
                opts["analysis_type"],
                vCon.uuid,
            )
            continue

        # Never log the API key itself.
        logger.info(
            "Analyzing engagement for dialog %s with options: %s",
            index,
            {k: v for k, v in opts.items() if k != "OPENAI_API_KEY"},
        )
        start = time.time()
        try:
            is_engaged = check_engagement(
                transcript=source_text,
                prompt=opts["prompt"],
                model=opts["model"],
                temperature=opts["temperature"],
                client=client,
            )

            # Always use string 'true'/'false' for tag and body
            is_engaged_str = "true" if is_engaged else "false"

            vendor_schema = {
                "model": opts["model"],
                "prompt": opts["prompt"],
                "is_engaged": is_engaged_str,
            }

            vCon.add_analysis(
                type=opts["analysis_type"],
                dialog=index,
                vendor="openai",
                body=is_engaged_str,
                encoding="none",
                extra={
                    "vendor_schema": vendor_schema,
                },
            )

            vCon.add_tag(tag_name="engagement", tag_value=is_engaged_str)
            logger.info(f"Applied engagement tag: {is_engaged_str}")

            stats_gauge(
                "conserver.link.openai.engagement_detected",
                1 if is_engaged else 0,
                tags=[f"analysis_type:{opts['analysis_type']}"],
            )

        except Exception as e:
            import traceback
            logger.error(
                "Failed to generate engagement analysis for vCon %s after multiple retries: %s\nException type: %s\nTraceback:\n%s",
                vcon_uuid,
                e,
                type(e).__name__,
                traceback.format_exc(),
            )
            stats_count(
                "conserver.link.openai.engagement_analysis_failures",
                tags=[f"analysis_type:{opts['analysis_type']}"],
            )
            # Bare raise preserves the original traceback (raise e would rebuild it).
            raise

        stats_gauge(
            "conserver.link.openai.engagement_analysis_time",
            time.time() - start,
            tags=[f"analysis_type:{opts['analysis_type']}"],
        )

    vcon_redis.store_vcon(vCon)
    logger.info(f"Finished detect_engagement - {module_name}:{link_name} plugin for: {vcon_uuid}")

    return vcon_uuid
180+
181+
def navigate_dict(dictionary, path):
    """Return the value at dot-separated *path* inside *dictionary*, or None.

    Args:
        dictionary: Nested dict to walk.
        path: Dot-separated key path, e.g. "body.paragraphs.transcript".

    Returns:
        The value at the path, or None if any key is missing.

    The original implementation raised TypeError when an intermediate value
    was not a dict (e.g. a string or list partway down the path); this guards
    with isinstance so such paths return None instead.
    """
    current = dictionary
    for key in path.split("."):
        # Only descend through dicts; anything else means the path is invalid.
        if isinstance(current, dict) and key in current:
            current = current[key]
        else:
            return None
    return current

0 commit comments

Comments
 (0)