Build a custom attribution pipeline
Goal
Get every Sumeru-attributed order into your data warehouse (Snowflake / BigQuery / Redshift) so you can:
- Build custom ROAS reports blending Sumeru attribution with your ad-spend data
- Run cohort analyses across years of history
- Apply company-specific touchpoint weights
- Power executive dashboards (Looker, Tableau)
Two approaches
| Approach | When |
|---|---|
| Real-time webhook | Need data within minutes; low-volume |
| Daily batch export | Daily refresh acceptable; high-volume; warehouse-friendly |
Most setups use both — webhook for live operational dashboards; batch for analytical workload.
Real-time approach
Subscribe to event
curl -X POST \
-H "Authorization: Bearer copt_live_..." \
-H "Content-Type: application/json" \
-d '{
"url": "https://yourapp.com/attribution-webhook",
"events": ["order.attributed"],
"secret": "auto"
}' \
"https://api.sumeru.systems/api/v1/webhook-subscriptions"
Stream to warehouse
import { BigQuery } from '@google-cloud/bigquery';
const bq = new BigQuery();
app.post('/attribution-webhook', verifySignature, async (req, res) => {
const event = JSON.parse(req.body.toString());
const { order_id, attribution } = event.data;
// One row per (order × touchpoint × model) for analytical flexibility
const rows = [];
for (const model of attribution.models) {
for (const touchpoint of model.touchpoints) {
rows.push({
order_id,
attributed_at: new Date(),
model: model.name, // last_touch, linear, time_decay, etc.
touchpoint_channel: touchpoint.channel,
touchpoint_timestamp: touchpoint.timestamp,
credit_pct: touchpoint.credit_pct,
credited_revenue: touchpoint.credited_revenue,
});
}
}
await bq.dataset('sumeru').table('attribution').insert(rows);
res.status(200).end();
});
Stream to other warehouses
| Warehouse | Library |
|---|---|
| Snowflake | snowflake-sdk |
| Redshift | Aurora COPY via S3 staging |
| Postgres | pg |
| ClickHouse | @clickhouse/client |
Pattern is the same: receive event, transform to rows, insert.
Batch approach
Daily export
import { SumeruES } from '@sumeru/sdk';
import { BigQuery } from '@google-cloud/bigquery';
const sumeru = new SumeruES({ apiKey: process.env.ATLANTIS_API_KEY });
const bq = new BigQuery();
async function dailyAttributionExport(date) {
let cursor = null;
const tableInsert = bq.dataset('sumeru').table('attribution_daily');
do {
const { data, pagination } = await sumeru.attribution.list({
filter: { date_from: date, date_to: date },
include: ['touchpoints'],
limit: 100,
cursor,
});
const rows = data.flatMap(att =>
att.touchpoints.map(tp => ({
order_id: att.order_id,
attributed_at: att.attributed_at,
model: att.model,
touchpoint_channel: tp.channel,
touchpoint_timestamp: tp.timestamp,
credit_pct: tp.credit_pct,
credited_revenue: tp.credited_revenue,
}))
);
await tableInsert.insert(rows);
cursor = pagination.has_next ? pagination.next_cursor : null;
} while (cursor);
}
// Run from cron or Airflow
dailyAttributionExport('2026-05-09');
Schema for the warehouse table
CREATE TABLE sumeru.attribution (
order_id STRING,
attributed_at TIMESTAMP,
model STRING,
touchpoint_channel STRING,
touchpoint_timestamp TIMESTAMP,
credit_pct FLOAT64,
credited_revenue FLOAT64,
loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
)
PARTITION BY DATE(attributed_at)
CLUSTER BY model, touchpoint_channel;
Partition + cluster for query performance on large volumes.
Blending with ad-spend
If you also load ad-spend (from Meta / Google / TikTok APIs):
-- Real ROAS per channel per day
WITH attribution AS (
SELECT
DATE(attributed_at) as day,
touchpoint_channel as channel,
SUM(credited_revenue) as revenue
FROM sumeru.attribution
WHERE model = 'time_decay'
GROUP BY 1, 2
),
ad_spend AS (
SELECT
DATE(spend_at) as day,
channel,
SUM(spend) as spend
FROM your_warehouse.ad_spend_daily
GROUP BY 1, 2
)
SELECT
a.day,
a.channel,
a.revenue,
s.spend,
a.revenue / NULLIF(s.spend, 0) as roas
FROM attribution a
LEFT JOIN ad_spend s USING (day, channel)
ORDER BY a.day DESC, a.revenue DESC;
Custom touchpoint weights
If you want company-specific weights (vs. Sumeru's multi-touch models):
-- Example: weight Meta 1.5×, TikTok 1.2×, others 1.0×
WITH custom_weights AS (
SELECT 'meta' as channel, 1.5 as weight
UNION ALL SELECT 'tiktok', 1.2
)
SELECT
a.order_id,
a.touchpoint_channel,
a.credit_pct,
COALESCE(w.weight, 1.0) as custom_weight,
a.credit_pct * COALESCE(w.weight, 1.0) as custom_credit_pct
FROM sumeru.attribution a
LEFT JOIN custom_weights w
ON a.touchpoint_channel = w.channel
WHERE a.model = 'linear';
Renormalize per order to sum to 100%.
Backfill
For historical data:
import { format, subDays } from 'date-fns';
async function backfillAttribution(daysBack = 365) {
for (let i = 0; i < daysBack; i++) {
const date = format(subDays(new Date(), i), 'yyyy-MM-dd');
console.log(`Backfilling ${date}...`);
await dailyAttributionExport(date);
}
}
backfillAttribution(365);
Run once; respects rate limits via SDK auto-retry.
Operational considerations
Idempotency
Both real-time + batch should upsert (MERGE in BigQuery,
INSERT ... ON CONFLICT in Postgres) — same order_id may
appear if attribution is updated post-hoc.
Schema evolution
Sumeru may add fields to attribution payload. Always parse defensively:
const customCustomMetric = touchpoint.custom_metric ?? null;
Don't JSON.stringify strict-mode against a frozen schema.
Latency expectations
| Source | Latency |
|---|---|
| Real-time webhook | < 5 minutes from order creation |
| Daily batch | Following calendar day |
For dashboards needing real-time + history: use webhook for "today" + batch for historical.
See also
- Cookbook overview
- Sync orders to ERP — similar pattern
- v1 admin API —
/api/v1/attribution - Sales engine attribution