Claude
Skills
Sign in
Back

hubspot-bulk-migration

Included with Lifetime
$97 forever

Bulk-migrate CRM data into HubSpot from Salesforce, Pipedrive, or Copper — or export off HubSpot — with field mapping, ID continuity, association re-linking, dedup safety, rate-limit budgeting, and rollback mitigation across 100K+ record datasets. Use when migrating any source CRM to HubSpot, recovering from a failed import with duplicate or unlinked records, or exporting out of HubSpot before switching platforms. Trigger with "hubspot bulk migration", "salesforce to hubspot", "pipedrive to hubspot", "copper to hubspot", "migrate off hubspot", "hubspot import dedup", "hubspot association relink", "hubspot field mapping", "hubspot id continuity".

Sales & CRMhubspotmigrationdata-engineeringsalesforceimport

What this skill does


# HubSpot Bulk Migration

## Overview

Move CRM data into HubSpot from Salesforce, Pipedrive, or Copper — or extract it back out — without losing cross-system IDs, breaking associations, or flooding the portal with duplicates. This is not a data-mapping worksheet. It is the code, sequencing, and guardrails your migration runs at 2am against 150K records when the daily API quota is finite, HubSpot has no bulk-delete API, and a bad import leaves permanent junk that requires a support ticket to remove.

The six production failures this skill addresses:

1. **ID continuity loss** — source CRM IDs are not preserved in HubSpot. Fix: create a custom `source_crm_id` property on every object type before the first record lands, and write the source ID into it during import.
2. **Association re-creation failure** — contacts, companies, and deals in the source are associated; batch-importing them independently creates records without associations unless a second pass re-links them after all IDs are known. Fix: import in order (companies → contacts → deals), then re-link in three association passes.
3. **Import dedup missing existing records** — HubSpot's batch upsert deduplicates contacts by email, but only when email matches exactly. Missing or differently-formatted emails create duplicates. Fix: normalize email to lowercase + trimmed before every import; use `batch/upsert` not `batch/create` for contacts.
4. **Field type mismatch** — source date fields in `M/D/Y` format fail HubSpot's ISO 8601 validation; multi-picklist values not in HubSpot's allowed enumeration are silently dropped. Fix: run a pre-migration dry-run that validates every field against HubSpot's property schema before writing a single record.
5. **Rate limit exhaustion** — 100K contacts at 100/batch equals 1,000 API calls; association re-linking doubles that. Combined with retries, a naive migration burns the 500K daily quota before finishing. Fix: budget calls per run, sleep between burst windows, and use the CSV import API for volumes above 10K.
6. **Rollback impossibility** — HubSpot has no bulk-delete API. Fix: maintain a local `source_id → hubspot_id` mapping file throughout the migration so `batch/archive` calls are programmable.

**Auth:** set `HUBSPOT_ACCESS_TOKEN` environment variable to a private app token with CRM write scopes. For token caching, rotation, and multi-portal routing see the `hubspot-auth` skill in this pack.

## Prerequisites

- HubSpot private app token with scopes: `crm.objects.contacts.write`, `crm.objects.companies.write`, `crm.objects.deals.write`, `crm.associations.write`, `crm.schemas.contacts.write`
- Python 3.10+ (`pip install requests`)
- Source CRM data exported to CSV or available via API
- `HUBSPOT_ACCESS_TOKEN` set in environment

## Instructions

### Step 1: Create custom properties for ID continuity

Before importing a single record, create a custom property on every object type to store the source CRM's record ID. Treat `409 CONFLICT` as success — property creation is idempotent.

```python
import os, requests

TOKEN = os.environ["HUBSPOT_ACCESS_TOKEN"]
BASE  = "https://api.hubapi.com"
HDRS  = {"Authorization": f"Bearer {TOKEN}", "Content-Type": "application/json"}

def create_source_id_property(object_type: str, source_crm: str) -> None:
    prop_name = f"{source_crm}_id"
    payload = {
        "name": prop_name,
        "label": f"{source_crm.title()} ID",
        "type": "string",
        "fieldType": "text",
        "groupName": f"{object_type}information",
        "description": f"Original {source_crm.title()} record ID. Do not modify.",
    }
    resp = requests.post(f"{BASE}/crm/v3/properties/{object_type}", headers=HDRS, json=payload)
    if resp.status_code not in (200, 201, 409):
        resp.raise_for_status()

for obj in ["contacts", "companies", "deals"]:
    create_source_id_property(obj, "salesforce")  # or "pipedrive", "copper"
```

### Step 2: Validate field types (dry-run before any writes)

Pull HubSpot's property schema and validate every field in your source CSV before touching the API. Catches date format mismatches and invalid enum values upfront.

```python
import csv, json, re
from datetime import datetime

def get_property_schema(object_type: str) -> dict:
    resp = requests.get(f"{BASE}/crm/v3/properties/{object_type}", headers=HDRS)
    resp.raise_for_status()
    return {p["name"]: p for p in resp.json()["results"]}

def validate_record(record: dict, schema: dict) -> list[str]:
    errors = []
    for field, value in record.items():
        if not value or field not in schema:
            continue
        prop = schema[field]
        if prop.get("type") == "date":
            try:
                datetime.strptime(str(value), "%Y-%m-%d")
            except ValueError:
                errors.append(f"{field}='{value}' must be YYYY-MM-DD")
        if prop.get("type") == "enumeration":
            allowed = {o["value"] for o in prop.get("options", [])}
            if allowed and str(value) not in allowed:
                errors.append(f"{field}='{value}' not in allowed values: {sorted(allowed)}")
    return errors
```

Full dry-run driver and field-by-field transform helpers in [implementation-guide.md](references/implementation-guide.md).

### Step 3: Rate-budgeted batch import — contacts first, companies second, deals third

Import in dependency order. Track the `source_id → hubspot_id` mapping per batch. Save the map to disk after every batch — a mid-run crash leaves a complete map for the records that did land.

```python
import time, json

def batch_import(records, object_type, source_id_field, batch_size=100):
    """Import records and return {source_id: hubspot_id} mapping."""
    endpoint = "upsert" if object_type == "contacts" else "create"
    url = f"{BASE}/crm/v3/objects/{object_type}/batch/{endpoint}"
    id_map, errors = {}, []
    batches = [records[i:i+batch_size] for i in range(0, len(records), batch_size)]

    for n, batch in enumerate(batches, 1):
        if endpoint == "upsert":
            payload = {"inputs": [{"idProperty": "email", "id": r.get("email",""), "properties": r} for r in batch]}
        else:
            payload = {"inputs": [{"properties": r} for r in batch]}

        resp = requests.post(url, headers=HDRS, json=payload)
        daily_left = int(resp.headers.get("X-HubSpot-RateLimit-Daily-Remaining", 500_000))
        if resp.status_code == 429:
            time.sleep(int(resp.headers.get("Retry-After", 10)))
            resp = requests.post(url, headers=HDRS, json=payload)
        if resp.status_code not in (200, 201, 207):
            errors.append({"batch": n, "status": resp.status_code})
            continue
        # Daily quota guard — 500K is the Professional/Enterprise daily limit
        if daily_left < 10_000:
            raise RuntimeError(f"Daily quota critical: {daily_left} remaining. Resume tomorrow.")
        for result_item, orig in zip(resp.json().get("results", []), batch):
            src_id = orig.get(source_id_field, "")
            hs_id  = result_item.get("id", "")
            if src_id and hs_id:
                id_map[src_id] = hs_id
        print(f"  Batch {n}/{len(batches)}: {len(batch)} records, daily_left={daily_left}")
        time.sleep(0.12)   # stay below 90 req/10s burst ceiling

    return id_map, errors

def save_id_map(id_map: dict, path: str) -> None:
    existing = {}
    try:
        with open(path) as f: existing = json.load(f)
    except FileNotFoundError: pass
    existing.update(id_map)
    with open(path, "w") as f: json.dump(existing, f, indent=2)
```

### Step 4: Re-link associations (two-pass — after all objects are imported)

```python
def relink_associations(source_assocs, from_id_map, to_id_map,
                         from_type, to_type, type_id, batch_size=100):
    """Re-create associations using v4 API. Returns {linked, skipped, errors}."""
    url = f"{BASE}/crm/v4/associations/{from_type}/{to_type}/batch/create"
    linked = ski

Related in Sales & CRM