
Build a custom attribution pipeline

Goal

Get every Sumeru-attributed order into your data warehouse (Snowflake / BigQuery / Redshift) so you can:

  • Build custom ROAS reports blending Sumeru attribution with your ad-spend data
  • Run cohort analyses across years of history
  • Apply company-specific touchpoint weights
  • Power executive dashboards (Looker, Tableau)

Two approaches

Approach | When to use
Real-time webhook | You need data within minutes; low volume
Daily batch export | A daily refresh is acceptable; high volume; warehouse-friendly

Most setups use both: the webhook for live operational dashboards and the batch export for analytical workloads.

Real-time approach

Subscribe to event

curl -X POST \
  -H "Authorization: Bearer copt_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://yourapp.com/attribution-webhook",
    "events": ["order.attributed"],
    "secret": "auto"
  }' \
  "https://api.sumeru.systems/api/v1/webhook-subscriptions"
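If you would rather register the subscription from Node than with curl, here is a minimal sketch that sends the same request body through the global fetch (Node 18+). The helper names are hypothetical. The only assumption about the response is that it is JSON.

```javascript
// Sketch: register the order.attributed webhook from Node 18+ (global fetch).
// Sends the same body as the curl call above; helper names are illustrative.
const SUBSCRIPTIONS_URL = 'https://api.sumeru.systems/api/v1/webhook-subscriptions';

// Build the request separately so it can be inspected or unit-tested.
function buildSubscriptionRequest(apiKey, callbackUrl) {
  return {
    method: 'POST',
    headers: {
      Authorization: `Bearer ${apiKey}`,
      'Content-Type': 'application/json',
    },
    body: JSON.stringify({
      url: callbackUrl,
      events: ['order.attributed'],
      secret: 'auto',
    }),
  };
}

async function subscribeToAttribution(apiKey, callbackUrl) {
  const res = await fetch(SUBSCRIPTIONS_URL, buildSubscriptionRequest(apiKey, callbackUrl));
  if (!res.ok) {
    throw new Error(`Subscription failed: HTTP ${res.status} ${await res.text()}`);
  }
  // Inspect the response for the subscription details (and the signing secret, if returned)
  return res.json();
}
```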

Stream to warehouse

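import express from 'express';
import { BigQuery } from '@google-cloud/bigquery';

const app = express();
const bq = new BigQuery();

// verifySignature is your signature-check middleware (not shown here). It needs
// the raw request body, so this route uses express.raw() instead of express.json().
app.post('/attribution-webhook', express.raw({ type: 'application/json' }), verifySignature, async (req, res) => {
  try {
    const event = JSON.parse(req.body.toString());
    const { order_id, attribution } = event.data;

    // One row per (order × touchpoint × model) for analytical flexibility
    const rows = [];
    for (const model of attribution.models) {
      for (const touchpoint of model.touchpoints) {
        rows.push({
          order_id,
          attributed_at: new Date(), // time the webhook was received
          model: model.name, // last_touch, linear, time_decay, etc.
          touchpoint_channel: touchpoint.channel,
          touchpoint_timestamp: touchpoint.timestamp,
          credit_pct: touchpoint.credit_pct,
          credited_revenue: touchpoint.credited_revenue,
        });
      }
    }

    // BigQuery rejects an empty insert, so skip payloads with no touchpoints
    if (rows.length > 0) {
      await bq.dataset('sumeru').table('attribution').insert(rows);
    }
    res.status(200).end();
  } catch (err) {
    // Reply non-2xx instead of leaving the request hanging on an unhandled rejection
    console.error('Attribution webhook failed:', err);
    res.status(500).end();
  }
});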
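The handler relies on a verifySignature middleware that this page doesn't define. A common webhook pattern is an HMAC-SHA256 of the raw request body keyed with the subscription secret. The sketch below assumes exactly that, plus a hex digest sent in a header named X-Sumeru-Signature and a SUMERU_WEBHOOK_SECRET environment variable. The header name, the env var, and the signing scheme are all assumptions, not Sumeru's documented contract, so confirm them against Sumeru's webhook documentation before relying on this.

```javascript
import { createHmac, timingSafeEqual } from 'node:crypto';

// ASSUMPTIONS (not confirmed by Sumeru's docs): the signature is a hex-encoded
// HMAC-SHA256 of the raw body, sent in the X-Sumeru-Signature header.
const SIGNATURE_HEADER = 'x-sumeru-signature'; // hypothetical header name

// Constant-time comparison of the received signature against our own HMAC.
function isValidSignature(rawBody, receivedHex, secret) {
  if (typeof receivedHex !== 'string') return false;
  const expected = createHmac('sha256', secret).update(rawBody).digest();
  const received = Buffer.from(receivedHex, 'hex');
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}

// Express middleware; req.body must be the raw Buffer produced by express.raw().
function verifySignature(req, res, next) {
  const secret = process.env.SUMERU_WEBHOOK_SECRET; // hypothetical env var name
  if (!secret || !Buffer.isBuffer(req.body) ||
      !isValidSignature(req.body, req.get(SIGNATURE_HEADER), secret)) {
    return res.status(401).end();
  }
  next();
}
```

Comparing with timingSafeEqual rather than === avoids leaking, through response timing, how many leading bytes of a forged signature were correct.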

Stream to other warehouses

Warehouse | Library or load method
Snowflake | snowflake-sdk
Redshift | COPY from files staged in S3
Postgres | pg
ClickHouse | @clickhouse/client

The pattern is the same for each: receive the event, transform it into rows, and insert them.
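Because the transform step doesn't depend on the destination, it can live in one pure function shared by every warehouse writer. This sketch assumes the same payload shape as the BigQuery handler above (event.data.attribution.models[].touchpoints[]) and pairs it with a ClickHouse writer that takes an already-created @clickhouse/client instance. The function names and the sumeru.attribution table name are illustrative.

```javascript
// Destination-agnostic transform: one row per (order × model × touchpoint).
// Payload shape assumed to match the BigQuery webhook handler.
function eventToRows(event, receivedAt = new Date()) {
  const { order_id, attribution } = event.data;
  return attribution.models.flatMap(model =>
    model.touchpoints.map(tp => ({
      order_id,
      attributed_at: receivedAt.toISOString(), // time the webhook was received
      model: model.name,
      touchpoint_channel: tp.channel,
      touchpoint_timestamp: tp.timestamp,
      credit_pct: tp.credit_pct,
      credited_revenue: tp.credited_revenue,
    }))
  );
}

// ClickHouse writer: `client` comes from createClient() in @clickhouse/client.
// JSONEachRow maps each object's keys to column names.
async function writeToClickHouse(client, rows) {
  if (rows.length === 0) return;
  await client.insert({
    table: 'sumeru.attribution',
    values: rows,
    format: 'JSONEachRow',
    // Let ClickHouse parse ISO 8601 timestamps such as 2026-05-09T10:00:00.000Z
    clickhouse_settings: { date_time_input_format: 'best_effort' },
  });
}
```

Another destination then needs only its own small writer; eventToRows stays the same, which also keeps row shapes identical across warehouses.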

Batch approach

Daily export

import { SumeruES } from '@sumeru/sdk';
import { BigQuery } from '@google-cloud/bigquery';

const sumeru = new SumeruES({ apiKey: process.env.ATLANTIS_API_KEY });
const bq = new BigQuery();

// Export every attribution for one calendar day (YYYY-MM-DD), page by page
async function dailyAttributionExport(date) {
  let cursor = null;
  const tableInsert = bq.dataset('sumeru').table('attribution_daily');

  do {
    const { data, pagination } = await sumeru.attribution.list({
      filter: { date_from: date, date_to: date },
      include: ['touchpoints'],
      limit: 100,
      cursor,
    });

    // Same row shape as the webhook: one row per (order × model × touchpoint)
    const rows = data.flatMap(att =>
      (att.touchpoints ?? []).map(tp => ({
        order_id: att.order_id,
        attributed_at: att.attributed_at,
        model: att.model,
        touchpoint_channel: tp.channel,
        touchpoint_timestamp: tp.timestamp,
        credit_pct: tp.credit_pct,
        credited_revenue: tp.credited_revenue,
      }))
    );

    // BigQuery rejects an empty insert, so skip pages that yield no rows
    if (rows.length > 0) {
      await tableInsert.insert(rows);
    }

    cursor = pagination.has_next ? pagination.next_cursor : null;
  } while (cursor);
}

// Run from cron or Airflow; exit non-zero on failure so the scheduler notices
dailyAttributionExport('2026-05-09').catch(err => {
  console.error(err);
  process.exit(1);
});
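A scheduled job usually exports the previous full day instead of a hard-coded date. Here is a minimal helper, assuming the day boundaries for date_from/date_to are UTC. That is an assumption; check which timezone Sumeru's date filter uses. The helper name is hypothetical.

```javascript
// Yesterday's date as YYYY-MM-DD in UTC (assumes the API's date filter is UTC-based).
function yesterdayUtc(now = new Date()) {
  const d = new Date(now.getTime());
  d.setUTCDate(d.getUTCDate() - 1); // handles month and year rollover
  return d.toISOString().slice(0, 10);
}

// In the cron entry point:
// dailyAttributionExport(yesterdayUtc()).catch(...)
```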

Schema for the warehouse table (BigQuery)

CREATE TABLE sumeru.attribution (
  order_id STRING,
  attributed_at TIMESTAMP,
  model STRING,
  touchpoint_channel STRING,
  touchpoint_timestamp TIMESTAMP,
  credit_pct FLOAT64,
  credited_revenue FLOAT64,
  loaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
)
PARTITION BY DATE(attributed_at)
CLUSTER BY model, touchpoint_channel;

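Partitioning by day and clustering by model and channel keeps queries fast and cheap on large volumes, because BigQuery can skip partitions and blocks that a query's filters exclude. The batch table (attribution_daily) receives the same row shape, so create it with the same columns.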

Blending with ad spend

If you also load ad spend into the warehouse (from the Meta, Google, or TikTok APIs), join it to attributed revenue per channel and day:

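-- ROAS per channel per day
WITH attribution AS (
  SELECT
    DATE(attributed_at) as day,
    touchpoint_channel as channel,
    SUM(credited_revenue) as revenue
  FROM sumeru.attribution
  -- Use exactly one model: each model distributes the full order revenue,
  -- so summing across models would count revenue several times.
  WHERE model = 'time_decay'
  GROUP BY 1, 2
),
ad_spend AS (
  SELECT
    DATE(spend_at) as day,
    channel,  -- must use the same channel names as touchpoint_channel
    SUM(spend) as spend
  FROM your_warehouse.ad_spend_daily
  GROUP BY 1, 2
)
SELECT
  a.day,
  a.channel,
  a.revenue,
  s.spend,
  a.revenue / NULLIF(s.spend, 0) as roas
FROM attribution a
-- LEFT JOIN keeps only channel-days with attributed revenue;
-- use FULL OUTER JOIN to also see spend that produced no revenue.
LEFT JOIN ad_spend s USING (day, channel)
ORDER BY a.day DESC, a.revenue DESC;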

Custom touchpoint weights

If you want company-specific weights instead of Sumeru's built-in multi-touch models, apply them to one model's credits:

-- Example: weight Meta 1.5×, TikTok 1.2×, others 1.0×
WITH custom_weights AS (
  SELECT 'meta' as channel, 1.5 as weight
  UNION ALL SELECT 'tiktok', 1.2
)
SELECT
  a.order_id,
  a.touchpoint_channel,
  a.credit_pct,
  COALESCE(w.weight, 1.0) as custom_weight,
  a.credit_pct * COALESCE(w.weight, 1.0) as custom_credit_pct
FROM sumeru.attribution a
LEFT JOIN custom_weights w
  ON a.touchpoint_channel = w.channel
WHERE a.model = 'linear';

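The weighted credits no longer sum to 100% per order, so renormalize them: divide each custom_credit_pct by its order's total (for example, SUM(custom_credit_pct) OVER (PARTITION BY order_id)) and scale the result back to 100%.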
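The renormalization arithmetic, sketched in JavaScript on rows shaped like the query output (order_id, credit_pct, custom_credit_pct). The function name is hypothetical. Instead of hard-coding 100, it rescales each order's weighted credits to the same total as the original credit_pct values. That makes it work whether credit_pct is stored on a 0 to 100 or a 0 to 1 scale.

```javascript
// Rescale custom credits per order so they sum to the order's original credit total.
function renormalizeCustomCredits(rows) {
  const totals = new Map(); // order_id -> { original, custom }
  for (const r of rows) {
    const t = totals.get(r.order_id) ?? { original: 0, custom: 0 };
    t.original += r.credit_pct;
    t.custom += r.custom_credit_pct;
    totals.set(r.order_id, t);
  }
  return rows.map(r => {
    const t = totals.get(r.order_id);
    const normalized = t.custom === 0 ? 0 : (r.custom_credit_pct / t.custom) * t.original;
    return { ...r, normalized_credit_pct: normalized };
  });
}

// Worked example (linear model, two touchpoints at 50% each):
//   meta 50 × 1.5 = 75, google 50 × 1.0 = 50, total 125
//   → meta 75 / 125 × 100 = 60, google 50 / 125 × 100 = 40
```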

Backfill

To load historical data, run the daily export once for each past day:

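import { format, subDays } from 'date-fns';

// Replays dailyAttributionExport one day at a time, newest first
async function backfillAttribution(daysBack = 365) {
  // Start at i = 1 (yesterday): today's data is still incomplete
  for (let i = 1; i <= daysBack; i++) {
    // format() uses the machine's local timezone
    const date = format(subDays(new Date(), i), 'yyyy-MM-dd');
    console.log(`Backfilling ${date}...`);
    await dailyAttributionExport(date);
  }
}

backfillAttribution(365).catch(err => {
  console.error(err);
  process.exit(1);
});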

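Run it once; the SDK's automatic retries handle rate limiting. Re-running it loads duplicate rows unless your inserts are idempotent (see Idempotency below).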
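If you want a safety net beyond the SDK's own retries, for example so that one transient failure doesn't abort a 365-day run, you can wrap each day in a generic exponential-backoff helper. The helper name, the retry count, and the delays are arbitrary illustrative choices, not Sumeru recommendations.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Retry an async function with exponential backoff plus random jitter.
async function withRetry(fn, { retries = 3, baseMs = 1000 } = {}) {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= retries) throw err; // give up after `retries` extra attempts
      const delay = baseMs * 2 ** attempt + Math.random() * baseMs;
      console.warn(`Attempt ${attempt + 1} failed (${err.message}); retrying in ${Math.round(delay)} ms`);
      await sleep(delay);
    }
  }
}

// Usage inside the backfill loop:
// await withRetry(() => dailyAttributionExport(date));
```

The jitter spreads out retries from concurrent jobs so they don't all hit the API at the same moment.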

Operational considerations

Idempotency

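Both the real-time and batch paths should upsert rather than append (MERGE in BigQuery, INSERT ... ON CONFLICT in Postgres), because the same order_id can arrive again if its attribution is updated after the fact. Key rows on (order_id, model, touchpoint_channel, touchpoint_timestamp), or delete and reinsert all of an order's rows in one transaction, which also removes touchpoints that a re-attribution dropped.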
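For Postgres, here is a minimal sketch of the upsert that takes a pg Client or Pool as a parameter. It assumes a Postgres table named sumeru.attribution with the columns from the schema above and a unique constraint on (order_id, model, touchpoint_channel, touchpoint_timestamp). That table and key are assumptions for illustration, not something Sumeru defines. The function names are hypothetical.

```javascript
// Columns written per row; the order here fixes the $n placeholder order.
const COLUMNS = [
  'order_id', 'attributed_at', 'model', 'touchpoint_channel',
  'touchpoint_timestamp', 'credit_pct', 'credited_revenue',
];

// Build one multi-row INSERT ... ON CONFLICT DO UPDATE with $n placeholders.
// Postgres allows at most 65,535 bind parameters per statement, so split very
// large batches (about 9,000 rows at 7 columns). Dedupe keys within a batch
// first: one statement cannot update the same row twice.
function buildUpsert(rows) {
  const values = [];
  const tuples = rows.map(row => {
    const placeholders = COLUMNS.map(col => {
      values.push(row[col]);
      return `$${values.length}`;
    });
    return `(${placeholders.join(', ')})`;
  });
  const text =
    `INSERT INTO sumeru.attribution (${COLUMNS.join(', ')}) VALUES ${tuples.join(', ')} ` +
    'ON CONFLICT (order_id, model, touchpoint_channel, touchpoint_timestamp) DO UPDATE SET ' +
    'attributed_at = EXCLUDED.attributed_at, credit_pct = EXCLUDED.credit_pct, ' +
    'credited_revenue = EXCLUDED.credited_revenue';
  return { text, values };
}

// `client` is a pg Client or Pool; both expose query(text, values).
async function upsertAttributionRows(client, rows) {
  if (rows.length === 0) return;
  const { text, values } = buildUpsert(rows);
  await client.query(text, values);
}
```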

Schema evolution

Sumeru may add fields to the attribution payload. Always parse defensively:

const customMetric = touchpoint.custom_metric ?? null;

Don't validate payloads strictly against a frozen schema (for example, one that rejects unknown properties); ignore fields you don't use instead.
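Here is a minimal sketch of defensive parsing: pick out the fields you store, default anything missing to null, and ignore unknown keys instead of rejecting the payload. The field names follow the webhook example, custom_metric is the illustrative field from the snippet above, and the helper names are hypothetical. The number coercion is there because this page doesn't say whether amounts arrive as JSON numbers or strings.

```javascript
// Coerce numeric-looking values; anything absent or non-numeric becomes null.
function toNumberOrNull(value) {
  if (value === null || value === undefined || value === '') return null;
  const n = Number(value);
  return Number.isFinite(n) ? n : null;
}

// Read only the fields we store; default missing ones and ignore unknown keys.
function parseTouchpoint(tp = {}) {
  return {
    channel: typeof tp.channel === 'string' ? tp.channel : null,
    timestamp: tp.timestamp ?? null,
    credit_pct: toNumberOrNull(tp.credit_pct),
    credited_revenue: toNumberOrNull(tp.credited_revenue),
    custom_metric: tp.custom_metric ?? null, // optional or newer field
  };
}
```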

Latency expectations

Source | Latency
Real-time webhook | Under 5 minutes from order creation
Daily batch | Available the following calendar day

For dashboards that need both live and historical data, use the webhook for today and the batch export for earlier days.

See also