This guide shows three ways to connect your own application to PageCrawl and provides working code for each in Python, Node.js, and PHP. These are the same patterns the official Home Assistant integration uses, distilled into minimal examples you can adapt.
Pick the pattern that fits your needs:
- Polling is the simplest. You read the API on a timer. Best for dashboards and reports that do not need instant updates.
- Webhooks (push) deliver changes to your server the moment they happen. Best for real-time automation and alerting.
- Hybrid combines a webhook for instant updates with a slow reconcile poll that catches anything missed. This is the most robust option and what the Home Assistant integration runs.
Authentication
All API requests use a bearer token in the Authorization header:
Authorization: Bearer YOUR_TOKEN
You can use an API token (Settings > API) or an OAuth access token. Free accounts can use the API. Treat the token like a password and keep it server-side.
Rate Limits
- Free accounts: 60 requests per minute.
- Paid accounts: 300 requests per minute.
When you exceed the limit the API responds with HTTP 429. Honor the Retry-After response header (seconds to wait) before retrying. Choose a poll interval that stays well under your limit, especially if you paginate across many monitors.
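The Retry-After handling described above can be isolated in a small helper. This is a minimal sketch, not part of any PageCrawl client: the function name retry_delay, the 5-second fallback, and the 120-second cap are illustrative assumptions. It expects a mapping of response headers; a plain dict is case-sensitive, whereas the headers object from the requests library is not.

```python
def retry_delay(headers, default=5.0, cap=120.0):
    """Return how many seconds to wait after an HTTP 429 response.

    `default` and `cap` are illustrative values, not PageCrawl settings.
    """
    raw = headers.get("Retry-After")
    try:
        delay = int(raw)
    except (TypeError, ValueError):
        # Header missing, or not a whole number of seconds.
        return default
    if delay < 0:
        return default
    # Cap the wait so a single bad header cannot stall the poller for hours.
    return min(delay, cap)
```

Capping the delay is a design choice for unattended pollers; drop the cap if you would rather wait exactly as long as the server asks.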
Polling
Poll GET /api/pages?simple=1 on an interval. Each page object includes a latest snapshot and a checks array. Read latest.contents for the primary tracked element, and read per-element values from checks[0].elements, keyed by element_id so each value maps to a stable tracked element in your own system. Use pagination if your workspace returns multiple pages of results.
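Because element_id is stable, comparing two polls reduces to comparing two dictionaries. The sketch below assumes page objects shaped as described above (a checks array whose first entry has an elements list with element_id and contents); the helper names element_values and changed_elements are hypothetical, and the sample data in the usage lines is invented for illustration.

```python
def element_values(page):
    """Map element_id -> contents for the first check of one page object."""
    checks = page.get("checks") or []
    elements = checks[0].get("elements", []) if checks else []
    return {
        el["element_id"]: el.get("contents")
        for el in elements
        if "element_id" in el
    }


def changed_elements(previous, current):
    """Return {element_id: (old, new)} for values that are new or differ."""
    return {
        key: (previous.get(key), value)
        for key, value in current.items()
        if key not in previous or previous[key] != value
    }


# Illustrative data only; real page objects carry more fields.
before = element_values({"checks": [{"elements": [
    {"element_id": 11, "contents": "19.99"},
    {"element_id": 12, "contents": "In stock"},
]}]})
after = element_values({"checks": [{"elements": [
    {"element_id": 11, "contents": "17.49"},
    {"element_id": 12, "contents": "In stock"},
]}]})
changes = changed_elements(before, after)
```

Store the dictionary from each poll and pass it as previous on the next one; only the elements whose value moved come back.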
Python
import time

import requests

BASE = "https://pagecrawl.io"
TOKEN = "YOUR_TOKEN"
SESSION = requests.Session()
SESSION.headers["Authorization"] = f"Bearer {TOKEN}"


def fetch_pages():
    """Fetch all monitors, following pagination and honoring 429."""
    pages, url = [], f"{BASE}/api/pages?simple=1"
    while url:
        resp = SESSION.get(url, timeout=30)
        if resp.status_code == 429:
            wait = int(resp.headers.get("Retry-After", "5"))
            time.sleep(wait)
            continue
        resp.raise_for_status()
        body = resp.json()
        pages.extend(body.get("data", []))
        url = body.get("links", {}).get("next")
    return pages


def poll_once():
    for page in fetch_pages():
        latest = page.get("latest") or {}
        print(page["id"], page.get("title"), "->", latest.get("contents"))
        checks = page.get("checks") or []
        elements = checks[0].get("elements", []) if checks else []
        for el in elements:
            # element_id is stable across every check; use it as your key.
            print("  ", el.get("element_id"), el.get("label"), el.get("contents"))


if __name__ == "__main__":
    while True:
        poll_once()
        time.sleep(300)  # stay well under the rate limit
Node.js
const BASE = "https://pagecrawl.io";
const TOKEN = "YOUR_TOKEN";
const HEADERS = { Authorization: `Bearer ${TOKEN}` };
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));

async function fetchPages() {
  const pages = [];
  let url = `${BASE}/api/pages?simple=1`;
  while (url) {
    const resp = await fetch(url, { headers: HEADERS });
    if (resp.status === 429) {
      const wait = parseInt(resp.headers.get("Retry-After") || "5", 10);
      await sleep(wait * 1000);
      continue;
    }
    if (!resp.ok) throw new Error(`HTTP ${resp.status}`);
    const body = await resp.json();
    pages.push(...(body.data || []));
    url = body.links?.next || null;
  }
  return pages;
}

async function pollOnce() {
  for (const page of await fetchPages()) {
    const latest = page.latest || {};
    console.log(page.id, page.title, "->", latest.contents);
    const elements = page.checks?.[0]?.elements || [];
    for (const el of elements) {
      // element_id is stable across every check; use it as your key.
      console.log("  ", el.element_id, el.label, el.contents);
    }
  }
}

async function main() {
  while (true) {
    await pollOnce();
    await sleep(300_000); // stay well under the rate limit
  }
}

main();
Webhooks (Push)
Create a hook so PageCrawl POSTs to your server the instant a change is detected, then verify every delivery.
1. Create the hook
POST /api/hooks
Authorization: Bearer YOUR_TOKEN
Content-Type: application/json
{
  "target_url": "https://your-server.example.com/pagecrawl",
  "match_type": "all",
  "event_type": "change_detected"
}
The response includes a signing_secret. Store it securely. You will use it to verify deliveries. (You can also create hooks in the UI under Settings > API > Webhooks.)
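For completeness, here is one way the request above could be assembled in Python using only the standard library. It is a sketch under assumptions: the helper name build_hook_request is hypothetical, and the base URL is taken from the polling examples in this guide. The function only builds the request so it can be inspected before anything is sent.

```python
import json
import urllib.request


def build_hook_request(token, target_url, base="https://pagecrawl.io"):
    """Build (but do not send) the POST /api/hooks request shown above."""
    body = json.dumps({
        "target_url": target_url,
        "match_type": "all",
        "event_type": "change_detected",
    }).encode("utf-8")
    return urllib.request.Request(
        f"{base}/api/hooks",
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


req = build_hook_request("YOUR_TOKEN", "https://your-server.example.com/pagecrawl")
```

Passing req to urllib.request.urlopen sends it. Where signing_secret sits inside the JSON response is not spelled out in this guide, so inspect the response once before hard-coding a lookup.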
2. Verify each delivery
Every webhook includes two headers:
X-PageCrawl-Signature: sha256=<hmac>
X-PageCrawl-Timestamp: <unix>
The HMAC is HMAC_SHA256(signing_secret, "{timestamp}.{body}") where {body} is the exact raw request body. Compute the same value, compare it in constant time, and reject deliveries whose timestamp is too old (to prevent replay). Always verify against the raw bytes, not a re-serialized object.
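When testing a receiver locally it helps to produce correctly signed requests yourself. The sketch below follows the formula stated above to generate both headers for a given body. The helper name sign_delivery is hypothetical and is meant for local tests only; it is not something PageCrawl ships.

```python
import hashlib
import hmac
import time


def sign_delivery(secret, raw_body, timestamp=None):
    """Return the two webhook headers for raw_body (bytes), for local testing."""
    ts = str(int(time.time()) if timestamp is None else timestamp)
    # Signed message is "{timestamp}.{body}" over the exact raw bytes.
    message = ts.encode("utf-8") + b"." + raw_body
    digest = hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest()
    return {
        "X-PageCrawl-Signature": f"sha256={digest}",
        "X-PageCrawl-Timestamp": ts,
    }


headers = sign_delivery("test-secret", b'{"id": 1}', timestamp=1700000000)
```

Send the same bytes you signed, with these headers, to your endpoint. Changing a single byte of the body, or using a timestamp older than your allowed window, should make the receiver answer 401.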
Python
import hashlib
import hmac
import time

MAX_AGE = 300  # seconds


def verify_signature(secret: str, timestamp: str, raw_body: bytes, header: str) -> bool:
    if not secret or not timestamp or not header:
        return False
    try:
        ts = int(timestamp)
    except (TypeError, ValueError):
        return False
    if abs(time.time() - ts) > MAX_AGE:
        return False  # stale, possible replay
    expected = hmac.new(
        secret.encode("utf-8"),
        f"{timestamp}.".encode("utf-8") + raw_body,
        hashlib.sha256,
    ).hexdigest()
    provided = header[len("sha256="):] if header.startswith("sha256=") else header
    return hmac.compare_digest(expected, provided)
A minimal Flask receiver:
from flask import Flask, request, abort

app = Flask(__name__)
SIGNING_SECRET = "YOUR_SIGNING_SECRET"


@app.post("/pagecrawl")
def receive():
    sig = request.headers.get("X-PageCrawl-Signature")
    ts = request.headers.get("X-PageCrawl-Timestamp")
    if not verify_signature(SIGNING_SECRET, ts, request.get_data(), sig):
        abort(401)
    payload = request.get_json()
    print("change on", payload.get("id"), payload.get("short_summary"))
    return "", 204
Node.js
const crypto = require("crypto");
const express = require("express");

const SIGNING_SECRET = "YOUR_SIGNING_SECRET";
const MAX_AGE = 300; // seconds

// rawBody is a Buffer holding the request body exactly as received.
function verifySignature(secret, timestamp, rawBody, header) {
  if (!secret || !timestamp || !header) return false;
  const ts = parseInt(timestamp, 10);
  if (Number.isNaN(ts)) return false;
  if (Math.abs(Date.now() / 1000 - ts) > MAX_AGE) return false; // stale
  // Hash the raw bytes directly instead of a decoded string.
  const expected = crypto
    .createHmac("sha256", secret)
    .update(`${timestamp}.`)
    .update(rawBody)
    .digest("hex");
  const provided = header.startsWith("sha256=") ? header.slice(7) : header;
  const a = Buffer.from(expected);
  const b = Buffer.from(provided);
  return a.length === b.length && crypto.timingSafeEqual(a, b);
}

const app = express();
// Capture the raw body exactly as received so the HMAC matches.
app.use(express.raw({ type: "*/*" }));

app.post("/pagecrawl", (req, res) => {
  const sig = req.get("X-PageCrawl-Signature");
  const ts = req.get("X-PageCrawl-Timestamp");
  // express.raw yields a Buffer; fall back to an empty one if no body was sent.
  const raw = Buffer.isBuffer(req.body) ? req.body : Buffer.alloc(0);
  if (!verifySignature(SIGNING_SECRET, ts, raw, sig)) {
    return res.sendStatus(401);
  }
  const payload = JSON.parse(raw.toString("utf8"));
  console.log("change on", payload.id, payload.short_summary);
  res.sendStatus(204);
});

app.listen(8080);
PHP
<?php

function verify_signature(string $secret, ?string $timestamp, string $rawBody, ?string $header): bool
{
    $maxAge = 300; // seconds
    if ($secret === '' || $timestamp === null || $header === null) {
        return false;
    }
    if (! ctype_digit($timestamp)) {
        return false;
    }
    if (abs(time() - (int) $timestamp) > $maxAge) {
        return false; // stale, possible replay
    }
    $expected = hash_hmac('sha256', "{$timestamp}.{$rawBody}", $secret);
    $provided = str_starts_with($header, 'sha256=') ? substr($header, 7) : $header;
    return hash_equals($expected, $provided);
}

$signingSecret = 'YOUR_SIGNING_SECRET';
$rawBody = file_get_contents('php://input');
$sig = $_SERVER['HTTP_X_PAGECRAWL_SIGNATURE'] ?? null;
$ts = $_SERVER['HTTP_X_PAGECRAWL_TIMESTAMP'] ?? null;

if (! verify_signature($signingSecret, $ts, $rawBody, $sig)) {
    http_response_code(401);
    exit;
}

$payload = json_decode($rawBody, true);
error_log('change on '.$payload['id'].' '.($payload['short_summary'] ?? ''));
http_response_code(204);
Hybrid (Push Plus Reconcile)
The most robust integration uses a webhook for instant updates and a slow background poll that reconciles state. The webhook keeps you current in real time. The reconcile poll catches anything a webhook might miss (for example if your server was briefly offline) and refreshes monitors that did not change. This is the model the Home Assistant integration runs: push updates the in-memory snapshot, and a slow loop re-fetches the full list on a long interval.
Python (sketch)
import threading
import time

state = {}  # element_id -> latest value, shared between push and poll
lock = threading.Lock()


def on_webhook(payload):
    """Called from your verified webhook receiver. Instant update."""
    with lock:
        for el in payload.get("page_elements", []):
            state[el["element_id"]] = el.get("contents")


def reconcile_loop():
    """Slow safety net. Re-reads everything on a long interval."""
    while True:
        for page in fetch_pages():  # from the polling example above
            checks = page.get("checks") or []
            for el in (checks[0].get("elements", []) if checks else []):
                with lock:
                    state[el["element_id"]] = el.get("contents")
        time.sleep(3600)  # reconcile hourly; the webhook handles real time


threading.Thread(target=reconcile_loop, daemon=True).start()
Node.js (sketch)
const state = new Map(); // element_id -> latest value

function onWebhook(payload) {
  // Called from your verified webhook receiver. Instant update.
  for (const el of payload.page_elements || []) {
    state.set(el.element_id, el.contents);
  }
}

async function reconcileLoop() {
  // Slow safety net. Re-reads everything on a long interval.
  while (true) {
    for (const page of await fetchPages()) {
      // fetchPages from the polling example
      for (const el of page.checks?.[0]?.elements || []) {
        state.set(el.element_id, el.contents);
      }
    }
    await new Promise((r) => setTimeout(r, 3_600_000)); // reconcile hourly
  }
}

reconcileLoop();
Keep the reconcile interval long (hourly or slower) so the webhook does the real-time work and the poll stays comfortably within your rate limit.
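A quick way to sanity-check a reconcile pass against the rate limit is to count how many paginated requests one pass makes back to back. The sketch below is illustrative only: the helper names are hypothetical, and the page size of 100 is an assumption (this guide does not state how many monitors one API response holds), so substitute the value you observe.

```python
import math


def calls_per_pass(monitors, per_page):
    """Requests needed to read every monitor once; per_page is an assumed page size."""
    return max(1, math.ceil(monitors / per_page))


def fits_rate_limit(monitors, per_page, limit_per_minute, headroom=0.5):
    """True if one back-to-back pass uses at most `headroom` of the per-minute limit."""
    return calls_per_pass(monitors, per_page) <= limit_per_minute * headroom


# 250 monitors at an assumed 100 per page, against the free-tier limit of 60/min.
free_tier_ok = fits_rate_limit(250, 100, 60)
```

The headroom factor leaves room for other API calls your application makes in the same minute. If a pass does not fit, add a short sleep between page requests rather than shortening the reconcile interval.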
Related Articles
- Webhook Integration - Full webhook payload reference, including the page_elements array and element_id
- Full API Reference - Interactive OpenAPI reference for every endpoint
