Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions ddl/functions/handle_tastemaker.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
-- handle_tastemaker
--
-- Emits a `tastemaker` notification when the tastemaker challenge
-- processor (challenge_id 't') mints a user_challenges row. Each row
-- corresponds to one user who reposted or saved a track that later went
-- trending. The notification tells the tastemaker user that they were
-- early to a now-trending track.
--
-- Sibling of handle_user_challenges.sql which already emits the generic
-- `challenge_reward` notification for all challenge completions. This
-- trigger is the type-specific layer that matches apps' tastemaker
-- notification (src/tasks/index_tastemaker.py).
--
-- Specifier shape from jobs/challenges/tastemaker.go is
-- "<hex_user_id>:t:<hex_track_id>" — we parse the trailing hex track_id,
-- look up its owner from `tracks`, and infer the action (repost takes
-- precedence over save, matching apps' dedupe_notifications_by_group_id).
create or replace function handle_tastemaker() returns trigger as $$
declare
track_hex text;
track_id_int bigint;
owner_id_int int;
action_str text;
begin
-- WHEN clause on the trigger gates challenge_id='t', but defend in
-- depth here too in case the trigger is invoked another way.
if new.challenge_id <> 't' then
return null;
end if;

-- Parse trailing hex segment "<user_hex>:t:<track_hex>" → track_id.
track_hex := split_part(new.specifier, ':', 3);
if track_hex !~ '^[0-9a-f]+$' then
return null;
end if;
track_id_int := ('x' || lpad(track_hex, 16, '0'))::bit(64)::bigint;
if track_id_int <= 0 then
return null;
end if;

select t.owner_id
into owner_id_int
from tracks t
where t.track_id = track_id_int
and t.is_current = true
limit 1;
if owner_id_int is null then
return null;
end if;

-- Repost takes precedence over save when a user is in both lists for
-- the same track — matches apps' dedupe_notifications_by_group_id
-- where repost_notifications win over save_notifications.
if exists (
select 1
from reposts
where user_id = new.user_id
and repost_item_id = track_id_int
and repost_type = 'track'
and is_current = true
and is_delete = false
) then
action_str := 'repost';
else
action_str := 'save';
end if;

insert into notification
(blocknumber, user_ids, timestamp, type, specifier, group_id, data)
values
(
new.completed_blocknumber,
ARRAY[new.user_id],
new.completed_at,
'tastemaker',
track_id_int::text,
'tastemaker_user_id:' || new.user_id || ':tastemaker_item_id:' || track_id_int,
jsonb_build_object(
'tastemaker_item_id', track_id_int,
'tastemaker_item_type', 'track',
'tastemaker_item_owner_id', owner_id_int,
'action', action_str,
'tastemaker_user_id', new.user_id
)
)
on conflict do nothing;

return null;

exception
when others then
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
return null;
end;
$$ language plpgsql;


do $$ begin
-- Fire only on INSERT (not UPDATE) so the notification is minted
-- exactly once per (user_id, track_id) pair — UpsertUserChallenge
-- hits its ON CONFLICT DO UPDATE branch on re-runs, which does not
-- fire AFTER INSERT triggers.
create trigger on_tastemaker_user_challenge
after insert on user_challenges
for each row when (new.challenge_id = 't')
execute procedure handle_tastemaker();
exception
when others then null;
end $$;
119 changes: 119 additions & 0 deletions ddl/functions/handle_trending.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
-- handle_trending
--
-- Emits a `trending` or `trending_underground` notification when the
-- trending challenge processor mints a user_challenges row for
-- challenge_id 'tt' / 'tut'. These are the "your track is trending"
-- notifications shown to the track owner.
--
-- (Trending playlists — challenge_id 'tp' — were a product feature that
-- has since been removed, so they're intentionally not handled here.
-- handle_user_challenges.sql still excludes 'tp' from the
-- claimable_reward path on line 14 for historical rows.)
--
-- Sibling of handle_user_challenges.sql which already emits the generic
-- `challenge_reward` notification for all challenge completions. This
-- trigger is the type-specific layer that matches apps'
-- index_trending.py notifications.
--
-- Specifier shape from jobs/challenges/trending.go is "<week>:<rank>"
-- (e.g. "2026-05-22:3"). Entity id is recovered from `trending_results`,
-- which the same processor wrote earlier in this transaction.
create or replace function handle_trending() returns trigger as $$
declare
rank_int int;
week_date date;
entity_id_str text;
entity_id_int bigint;
notif_type text;
trend_type text;
ts_epoch bigint;
data_jsonb jsonb;
begin
if new.challenge_id not in ('tt', 'tut') then
return null;
end if;

case new.challenge_id
when 'tt' then notif_type := 'trending'; trend_type := 'TRACKS';
when 'tut' then notif_type := 'trending_underground'; trend_type := 'UNDERGROUND_TRACKS';
end case;

-- Specifier: "<YYYY-MM-DD>:<rank>"
begin
week_date := split_part(new.specifier, ':', 1)::date;
rank_int := split_part(new.specifier, ':', 2)::int;
exception when others then
return null;
end;

-- Recover entity id from the trending_results row the processor wrote
-- earlier in this transaction. PK is (rank, type, version, week); we
-- pin to NEW.user_id so we ignore any unrelated version rows.
select id
into entity_id_str
from trending_results
where rank = rank_int
and type = trend_type
and week = week_date
and user_id = new.user_id
limit 1;
if entity_id_str is null then
return null;
end if;
begin
entity_id_int := entity_id_str::bigint;
exception when others then
return null;
end;

-- timestamp suffix matches apps: epoch seconds of the recompute. We
-- use completed_at which is set by UpsertUserChallenge to now() on
-- the first insert — close enough to the recompute moment.
ts_epoch := extract(epoch from new.completed_at)::bigint;

data_jsonb := jsonb_build_object(
'time_range', 'week',
'genre', 'all',
'rank', rank_int,
'track_id', entity_id_int
);

insert into notification
(blocknumber, user_ids, timestamp, type, specifier, group_id, data)
values
(
new.completed_blocknumber,
ARRAY[new.user_id],
new.completed_at,
notif_type,
entity_id_int::text,
notif_type
|| ':time_range:week:genre:all:rank:' || rank_int
|| ':track_id:' || entity_id_int
|| ':timestamp:' || ts_epoch,
data_jsonb
)
on conflict do nothing;

return null;

exception
when others then
raise warning 'An error occurred in %: %', tg_name, sqlerrm;
return null;
end;
$$ language plpgsql;


do $$ begin
-- Fire only on INSERT (not UPDATE) so the notification is minted
-- exactly once per (challenge_id, week, rank) — re-runs hit
-- UpsertUserChallenge's ON CONFLICT DO UPDATE branch and do not
-- fire AFTER INSERT triggers.
create trigger on_trending_user_challenge
after insert on user_challenges
for each row when (new.challenge_id in ('tt', 'tut'))
execute procedure handle_trending();
exception
when others then null;
end $$;
35 changes: 35 additions & 0 deletions ddl/migrations/0204_seed_phase_2_challenges.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
-- Seed catalog rows for Phase 2 challenge processors.
--
-- Values mirror apps/packages/discovery-provider/src/challenges/challenges.json
-- with ON CONFLICT DO UPDATE so the catalog stays aligned (matches apps'
-- create_new_challenges.py behavior).
--
-- Phase 2 set (all aggregate type — they accumulate per-occurrence):
-- c first_weekly_comment (each ISO week)
-- t tastemaker (10 catalog amount × per-track win)
-- cp comment_pin (pinned by verified artist on their track)
-- cs cosign (parent owner cosigns a remix; CURRENTLY INACTIVE in apps)
-- w remix_contest_winner (winner of remix contest hosted by verified user)
-- b audio_matching_buyer (USDC content purchase, file under buyer)
-- s audio_matching_seller (same purchase, file under verified seller)

BEGIN;

INSERT INTO challenges (id, type, amount, active, step_count, starting_block, weekly_pool, cooldown_days) VALUES
('c', 'aggregate', '1', true, 2147483647, 0, 2147483647, 7),
('t', 'aggregate', '100', true, 2147483647, 0, 2147483647, 7),
('cp', 'aggregate', '10', true, 2147483647, 1979515, 2147483647, 7),
('cs', 'aggregate', '1000', false, 2147483647, 95017582, 50000, 7),
('w', 'aggregate', '1000', true, 2147483647, 98950182, 50000, 7),
('b', 'aggregate', '1', true, 2147483647, 220157041, 25000, 7),
('s', 'aggregate', '5', true, 2147483647, 220157041, 25000, 7)
ON CONFLICT (id) DO UPDATE SET
type = EXCLUDED.type,
amount = EXCLUDED.amount,
active = EXCLUDED.active,
step_count = EXCLUDED.step_count,
starting_block = EXCLUDED.starting_block,
weekly_pool = EXCLUDED.weekly_pool,
cooldown_days = EXCLUDED.cooldown_days;

COMMIT;
131 changes: 131 additions & 0 deletions jobs/challenges/audio_matching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package challenges

import (
"context"
"fmt"

"github.com/jackc/pgx/v5"
)

// AudioMatchingProcessor implements challenges "b" and "s" — both fire on
// USDC content purchases (tracks/playlists/albums), with different
// recipients and gates.
//
// Source: v_usdc_purchases — a stable view over sol_purchases that exposes
// (buyer_user_id, seller_user_id, content_id, amount, slot, created_at).
//
// Specifier (matches apps for both b and s):
//
// <hex_buyer_user_id>:<hex_content_id>
//
// Amount = challenge.amount × dollars (catalog: b=1, s=5; dollars = amount/1e6).
// "b" recipient = buyer; "s" recipient = seller, and only if seller is verified.
type AudioMatchingProcessor struct {
ID string
ForBuyer bool // true => buyer earns; false => seller earns
VerifyOnly bool // only s gates on seller is_verified
}

func NewAudioMatchingBuyerProcessor() Processor {
return &AudioMatchingProcessor{ID: "b", ForBuyer: true, VerifyOnly: false}
}
func NewAudioMatchingSellerProcessor() Processor {
return &AudioMatchingProcessor{ID: "s", ForBuyer: false, VerifyOnly: true}
}

func (p *AudioMatchingProcessor) ChallengeID() string { return p.ID }

const usdcDecimals = 6

func (p *AudioMatchingProcessor) checkpointName() string {
return "challenges:" + p.ID + ":last_slot"
}

func (p *AudioMatchingProcessor) Reconcile(ctx context.Context, tx pgx.Tx) error {
c, ok, err := LoadChallenge(ctx, tx, p.ChallengeID())
if err != nil {
return fmt.Errorf("load challenge: %w", err)
}
if !ok || !c.Active {
return nil
}
catalogAmount := c.AmountInt()

prev, err := readCheckpointInt(ctx, tx, p.checkpointName())
if err != nil {
return fmt.Errorf("read checkpoint: %w", err)
}

// VerifyOnly adds a JOIN to filter seller verification at query time so
// we don't allocate rows we'll throw away.
verifyJoin := ""
if p.VerifyOnly {
verifyJoin = `
JOIN users seller ON seller.user_id = v.seller_user_id
AND seller.is_current = true
AND seller.is_verified = true
`
}

query := `
SELECT v.slot, v.buyer_user_id, v.seller_user_id, v.content_id, v.amount
FROM v_usdc_purchases v
` + verifyJoin + `
WHERE v.slot > $1
AND v.seller_user_id IS NOT NULL
ORDER BY v.slot ASC
`
rows, err := tx.Query(ctx, query, prev)
if err != nil {
return fmt.Errorf("scan purchases: %w", err)
}
type purchaseRow struct {
slot int64
buyerID int64
sellerID int64
contentID int64
amountMicro int64
}
var results []purchaseRow
maxSlot := prev
for rows.Next() {
var r purchaseRow
if err := rows.Scan(&r.slot, &r.buyerID, &r.sellerID, &r.contentID, &r.amountMicro); err != nil {
rows.Close()
return err
}
results = append(results, r)
if r.slot > maxSlot {
maxSlot = r.slot
}
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}

for _, r := range results {
dollars := int32(r.amountMicro / 1_000_000)
if dollars <= 0 {
continue // shouldn't happen for valid purchases; defensive
}
recipient := r.buyerID
if !p.ForBuyer {
recipient = r.sellerID
}
rewardAmount := catalogAmount * dollars
specifier := fmt.Sprintf("%x:%x", r.buyerID, r.contentID)
if err := UpsertUserChallenge(ctx, tx,
p.ChallengeID(), specifier, recipient, 1, 1, rewardAmount,
); err != nil {
return fmt.Errorf("upsert: %w", err)
}
}

if maxSlot > prev {
if err := writeCheckpointInt(ctx, tx, p.checkpointName(), maxSlot); err != nil {
return fmt.Errorf("save checkpoint: %w", err)
}
}
return nil
}
Loading
Loading