-
Notifications
You must be signed in to change notification settings - Fork 731
Expand file tree
/
Copy pathbuildSourceQuery.ts
More file actions
101 lines (91 loc) · 2.67 KB
/
buildSourceQuery.ts
File metadata and controls
101 lines (91 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import { IS_PROD_ENV } from '@crowd/common'
/**
 * CTE fragment named `cdp_matched_segments`.
 *
 * Selects the distinct (SOURCE_ID, slug) pairs of segments whose PARENT_SLUG,
 * GRANDPARENTS_SLUG and SOURCE_ID are all non-null, exposing them as the
 * columns `sourceId` and `slug`. The fragment starts with a newline and has no
 * leading `WITH`, so it is meant to be spliced into a WITH clause.
 */
const CDP_MATCHED_SEGMENTS = [
  '',
  'cdp_matched_segments AS (',
  'SELECT DISTINCT',
  's.SOURCE_ID AS sourceId,',
  's.slug',
  'FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s',
  'WHERE s.PARENT_SLUG IS NOT NULL',
  'AND s.GRANDPARENTS_SLUG IS NOT NULL',
  'AND s.SOURCE_ID IS NOT NULL',
  ')',
].join('\n')
/**
 * CTE fragment named `org_accounts`.
 *
 * Concatenates (UNION ALL) the accounts with a non-null website from two
 * tables. The second table's branch supplies NULL for LOGO_URL, INDUSTRY and
 * N_EMPLOYEES so both branches expose the same column list. The fragment
 * starts with a newline and has no leading `WITH`, so it is meant to be
 * spliced into a WITH clause.
 */
const ORG_ACCOUNTS = [
  '',
  'org_accounts AS (',
  'SELECT account_id, account_name, website, domain_aliases, LOGO_URL, INDUSTRY, N_EMPLOYEES',
  'FROM analytics.bronze_fivetran_salesforce.accounts',
  'WHERE website IS NOT NULL',
  'UNION ALL',
  'SELECT account_id, account_name, website, domain_aliases, NULL AS LOGO_URL, NULL AS INDUSTRY, NULL AS N_EMPLOYEES',
  'FROM analytics.bronze_fivetran_salesforce_b2b.accounts',
  'WHERE website IS NOT NULL',
  ')',
].join('\n')
/**
 * Escapes a value so it can be embedded inside a single-quoted SQL string
 * constant: backslashes and single quotes are doubled, so the value cannot
 * close the literal early. Values containing neither character are returned
 * unchanged.
 *
 * NOTE(review): assumes the target warehouse treats `''` and `\\` as the
 * escapes inside single-quoted literals — confirm against the SQL dialect in use.
 */
const escapeSqlStringLiteral = (value: string): string =>
  value.replace(/\\/g, '\\\\').replace(/'/g, "''")

/**
 * Builds the SQL text that reads meeting-attendance rows joined to matched
 * segments and (optionally) organization account details.
 *
 * Only rows where the invitee was invited or attended are selected. When
 * `IS_PROD_ENV` is falsy the query is additionally restricted to the `cncf`
 * project slug. Each result set is reduced to one row per `PRIMARY_KEY`.
 *
 * @param sinceTimestamp - Optional lower bound. When omitted (or empty) the
 *   query covers all rows. When provided, the query is the UNION of
 *   (a) rows whose `MEETING_DATE` is on or after the timestamp cast to DATE and
 *   (b) all rows belonging to segments whose `CREATED_TS` is on or after the
 *   timestamp. The value is escaped before being placed in the SQL literals.
 * @returns The complete SQL statement, with surrounding whitespace trimmed.
 */
export const buildSourceQuery = (sinceTimestamp?: string): string => {
  // Outside production the data set is narrowed to a single project.
  const envFilter = IS_PROD_ENV ? '' : ` AND t.PROJECT_SLUG = 'cncf'`

  const select =
    `
SELECT
t.PRIMARY_KEY,
t.MEETING_ID,
t.MEETING_NAME,
t.PROJECT_ID,
t.PROJECT_NAME,
t.PROJECT_SLUG,
t.ACCOUNT_ID,
t.ACCOUNT_NAME,
t.MEETING_DATE,
t.MEETING_TIME,
t.INVITEE_FULL_NAME,
t.INVITEE_LF_SSO,
t.INVITEE_LF_USER_ID,
t.INVITEE_EMAIL,
t.INVITEE_ATTENDED,
t.WAS_INVITED,
t.RAW_COMMITTEE_TYPE,
org.website AS ORG_WEBSITE,
org.domain_aliases AS ORG_DOMAIN_ALIASES,
org.logo_url AS LOGO_URL,
org.industry AS ORGANIZATION_INDUSTRY,
CAST(org.n_employees AS VARCHAR) AS ORGANIZATION_SIZE
FROM ANALYTICS.SILVER_FACT.MEETING_ATTENDANCE t
INNER JOIN cdp_matched_segments cms
ON cms.slug = t.PROJECT_SLUG
AND cms.sourceId = t.PROJECT_ID
LEFT JOIN org_accounts org
ON t.ACCOUNT_ID = org.account_id
WHERE (t.WAS_INVITED = TRUE OR t.INVITEE_ATTENDED = TRUE)` + envFilter

  // The LEFT JOIN on org_accounts can yield several rows for one attendance
  // record; keep a single row per PRIMARY_KEY, ordered by website descending.
  const dedup = `
QUALIFY ROW_NUMBER() OVER (PARTITION BY t.PRIMARY_KEY ORDER BY org.website DESC) = 1`

  // No lower bound: a full read.
  if (!sinceTimestamp) {
    return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS}
${select}
${dedup}`.trim()
  }

  // Escape once and reuse, so a stray quote or backslash in the timestamp
  // cannot break out of the string literals below.
  const since = escapeSqlStringLiteral(sinceTimestamp)

  return `
WITH ${ORG_ACCOUNTS},
${CDP_MATCHED_SEGMENTS},
new_cdp_segments AS (
SELECT DISTINCT
s.SOURCE_ID AS sourceId,
s.slug
FROM ANALYTICS.BRONZE_KAFKA_CROWD_DEV.SEGMENTS s
WHERE s.CREATED_TS >= '${since}'
AND s.PARENT_SLUG IS NOT NULL
AND s.GRANDPARENTS_SLUG IS NOT NULL
AND s.SOURCE_ID IS NOT NULL
)
-- Updated records in existing segments
${select}
AND t.MEETING_DATE >= '${since}'::DATE
${dedup}
UNION
-- All records in newly created segments
${select}
AND EXISTS (
SELECT 1 FROM new_cdp_segments ncs
WHERE ncs.slug = cms.slug AND ncs.sourceId = cms.sourceId
)
${dedup}`.trim()
}