CRB L4 Spec — Module T: Transaction Pipeline (Category A)

Scope: This file covers only Category A L3s in Module T — the evidentiary layer, canonical event publication, Merkle anchoring, and substrate contracts. Category B L3s (T.1 ingestion, T.3 parsing, T.6 backfill replay) are sourced from NCR/Counterpoint API documentation; see CRB-L4-FILL-PLAN.md Track B.


T.2 — Evidence Pipeline (Category A: T.2.3, T.2.4, T.2.5, T.2.6)

T.2.3 — Hash-Before-Parse

Source: tsp/consumers/sub1_seal.py, process_message()
Patent significance: The ordering constraint (hash verification BEFORE JSON parse) is the basis for patent application #63/991,596. This ordering must not be changed in the Go implementation.

PRD reference: TSP-03 v1.2 §228-282, Processing sequence Steps 2-3

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.2.3.1 | Receive stream message | XREADGROUP sub1-seal → message with 9 fields: event_id, merchant_id, source, source_event_id, event_type, event_hash, raw_payload, received_at, parse_failed |
| T.2.3.2 | Validate required fields | Check that all of event_id, merchant_id, source, event_hash, raw_payload, received_at are non-empty; publish to dead_letter on missing field |
| T.2.3.3 | Recompute event hash | SHA-256(raw_payload.encode('utf-8')) → compare hex digest to event_hash field |
| T.2.3.4 | Hash mismatch handling | On mismatch: log CRITICAL; publish to canary:dead_letter stream; do NOT XACK; return False |
| T.2.3.5 | Parse only after hash match | JSON-decode raw_payload only after the hash is verified, NOT before. This is the invariant. |
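The ordering in T.2.3.3 through T.2.3.5 can be sketched as follows. This is a minimal illustration, not the contents of sub1_seal.py: the function name verify_then_parse is hypothetical, and dead-lettering, logging, and XACK handling are left to the caller.

```python
import hashlib
import json
from typing import Optional


def verify_then_parse(raw_payload: str, event_hash: str) -> Optional[dict]:
    """Return the decoded payload only if its SHA-256 hex digest matches event_hash.

    Hypothetical helper: the name and return convention are assumptions.
    """
    digest = hashlib.sha256(raw_payload.encode("utf-8")).hexdigest()
    if digest != event_hash:
        # Mismatch: the caller would log CRITICAL, dead-letter, and skip XACK.
        # The payload is never handed to the JSON decoder on this path.
        return None
    # JSON decoding happens strictly after the hash check has passed.
    return json.loads(raw_payload)
```

Because the decoder is never reached on a mismatch, a tampered or malformed payload cannot raise a parse error before the integrity check has rejected it.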

T.2.4 — Seal Record Write

Source: tsp/consumers/sub1_seal.py Steps 4-8; evidence_chain.py
Invariant: EvidenceRecord is INSERT-only. No UPDATE, no DELETE path anywhere in the codebase.
Invariant: chain_hash computed by PostgreSQL trigger compute_entry_hash() BEFORE INSERT (P0-3, B-001). Application code sets event_hash only; trigger computes chain_hash from previous record.

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.2.4.1 | Acquire advisory lock | pg_try_advisory_lock(merchant_id_hash) per merchant_id before chain hash computation |
| T.2.4.2 | Read previous chain hash | SELECT chain_hash FROM evidence_records WHERE merchant_id=? ORDER BY sealed_at DESC LIMIT 1 |
| T.2.4.3 | Compute chain hash (application layer, pre-trigger era) | compute_chain_hash(event_hash: bytes, previous_chain_hash: Optional[bytes]) → bytes. Genesis: SHA-256(event_hash). Subsequent: SHA-256(bytes(prev) + bytes(event_hash)), raw BYTEA concatenation, NO delimiter |
| T.2.4.4 | INSERT evidence record | Model: EvidenceRecord. Fields: event_id, merchant_id, source, source_event_id, event_type, event_hash (BYTEA), raw_payload, received_at, sealed_at=now(). chain_hash field populated by trigger. |
| T.2.4.5 | XACK message | Only after successful INSERT. On exception: do NOT XACK (the message stays in the pending entries list and is reclaimed via Valkey XAUTOCLAIM; see T.2.6.4). |
| T.2.4.6 | Release advisory lock | In finally block; always release, even on error |
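The control flow of T.2.4.1, T.2.4.4, T.2.4.5, and T.2.4.6 can be sketched with injected callables standing in for the database and stream operations. Everything here is hypothetical scaffolding: seal_one and its four parameters do not appear in the source, and the behavior when the lock is not acquired (return False without XACK) is an assumption.

```python
from typing import Callable


def seal_one(
    acquire_lock: Callable[[], bool],    # stand-in for pg_try_advisory_lock(...)
    insert_record: Callable[[], None],   # stand-in for the EvidenceRecord INSERT
    xack: Callable[[], None],            # stand-in for XACK on the stream message
    release_lock: Callable[[], None],    # stand-in for the advisory unlock
) -> bool:
    """Seal one message: lock, INSERT, XACK, and always release the lock."""
    if not acquire_lock():
        # Assumption: an unavailable lock leaves the message pending for retry.
        return False
    try:
        insert_record()
        xack()  # reached only if the INSERT did not raise
        return True
    except Exception:
        # No XACK on failure, so the message remains in the pending entries.
        return False
    finally:
        release_lock()  # runs on both the success and the failure path
```

Keeping the XACK inside the try block, directly after the INSERT, makes it structurally impossible to acknowledge a message whose evidence record was not written.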

compute_chain_hash() contract (evidence_chain.py):

Genesis record (previous_chain_hash is None):
  chain_hash = SHA-256(event_hash)

Subsequent record:
  chain_hash = SHA-256(bytes(previous_chain_hash) + bytes(event_hash))
  # Raw BYTEA concatenation. No pipe, no delimiter, no encoding.
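A minimal Python rendering of the contract above, using the signature given in T.2.4.3. The body is a sketch written from the stated rules and is not copied from evidence_chain.py.

```python
import hashlib
from typing import Optional


def compute_chain_hash(event_hash: bytes, previous_chain_hash: Optional[bytes]) -> bytes:
    """Chain hash per the contract: raw byte concatenation, no delimiter."""
    if previous_chain_hash is None:
        # Genesis record: hash the event hash on its own.
        return hashlib.sha256(event_hash).digest()
    # Subsequent record: previous chain hash first, then the event hash.
    return hashlib.sha256(bytes(previous_chain_hash) + bytes(event_hash)).digest()
```

The bytes() calls mirror the contract and also accept memoryview values, which some PostgreSQL drivers return for BYTEA columns.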

T.2.5 — Fanout Routing

Source: tsp/stream_publisher.py, sub1/sub2/sub3/sub4 consumer groups
Architecture: One Valkey stream (canary:events, DB 4). Three independent consumer groups read in parallel. Each group maintains its own offset.

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.2.5.1 | Initialize consumer groups | init_consumer_groups(): XGROUP CREATE canary:events sub1-seal $ MKSTREAM; XGROUP CREATE canary:events sub2-parse $ MKSTREAM; XGROUP CREATE canary:events sub3-merkle $ MKSTREAM. MUST run before TSP-01 starts publishing. |
| T.2.5.2 | sub1-seal: hash → seal | Consumes all events; verifies hash; writes EvidenceRecord; XACK |
| T.2.5.3 | sub2-parse: parse → CRDM | Consumes all events; routes by event_type via webhook_dispatch registry; writes CRDM models to canary_sales; publishes to detection stream for LP-critical events |
| T.2.5.4 | sub3-merkle: accumulate → tree | Consumes all events; accumulates event_hashes in Valkey sorted set; builds Merkle tree on threshold; does NOT XACK individual events until batch commits |
| T.2.5.5 | sub4-detect: detect | Consumes LP-critical events from detection stream; routes to Chirp rule engine |

Stream publisher contract (9-field message schema, TSP-01):
- event_id: UUID, platform-generated
- merchant_id: tenant identifier
- source: "square", "counterpoint", etc.
- source_event_id: upstream event ID for dedup
- event_type: canonical event type (SALE, RETURN, VOID, etc.)
- event_hash: SHA-256 hex of raw_payload
- raw_payload: original webhook body, unmodified
- received_at: ISO-8601 timestamp
- parse_failed: "true" / "false"
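As an illustration of the 9-field schema, the sketch below assembles one message as a flat string-to-string mapping, the shape a stream XADD expects. The function name build_stream_message and the STREAM_FIELDS constant are hypothetical; the UUID version and the UTC timestamp format are assumptions, since the contract only says "UUID" and "ISO-8601".

```python
import hashlib
import uuid
from datetime import datetime, timezone

# Field order follows the stream publisher contract.
STREAM_FIELDS = (
    "event_id", "merchant_id", "source", "source_event_id", "event_type",
    "event_hash", "raw_payload", "received_at", "parse_failed",
)


def build_stream_message(merchant_id: str, source: str, source_event_id: str,
                         event_type: str, raw_payload: str,
                         parse_failed: bool = False) -> dict:
    """Build the 9-field message; every value is a string."""
    return {
        "event_id": str(uuid.uuid4()),  # assumption: UUIDv4
        "merchant_id": merchant_id,
        "source": source,
        "source_event_id": source_event_id,
        "event_type": event_type,
        # Hash is taken over the unmodified payload bytes, before any parsing.
        "event_hash": hashlib.sha256(raw_payload.encode("utf-8")).hexdigest(),
        "raw_payload": raw_payload,
        "received_at": datetime.now(timezone.utc).isoformat(),
        "parse_failed": "true" if parse_failed else "false",
    }
```

The event_hash computed here is the same value that sub1-seal recomputes in T.2.3.3, so the publisher and the consumer must agree on UTF-8 encoding of raw_payload.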

Stream client config:
- DB 4 (isolated from cache DBs 0-3; prevents volatile-lru eviction of stream data)
- socket_timeout=30s (must exceed XREADGROUP block_ms of 5s)
- socket_connect_timeout=5s

T.2.6 — Mutation-Lock

Source: tsp/consumers/sub1_seal.py; PostgreSQL model constraints

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.2.6.1 | INSERT-only evidence | EvidenceRecord has no UPDATE path. No db.query(EvidenceRecord).filter().update() anywhere in codebase. |
| T.2.6.2 | PostgreSQL trigger owns chain_hash | compute_entry_hash() trigger runs BEFORE INSERT on evidence_records. Application MUST NOT set chain_hash or previous_chain_hash; the trigger reads the previous record and computes it. (P0-3, B-001) |
| T.2.6.3 | Advisory lock per merchant | pg_try_advisory_lock() before any chain operation; prevents a concurrent hash chain race condition |
| T.2.6.4 | Dead-letter on integrity failure | Withholding XACK leaves the message in the pending entries list; Valkey XAUTOCLAIM reclaims it for retry |

T.4 — Canonical Event Publication (T.4.1–T.4.9)

Source: tsp/stream_publisher.py, tsp/enrichers/square.py

The table below lists the canonical event types the platform publishes. Each maps to a specific CRDM write in sub2-parse. The Counterpoint enricher will parallel the Square enricher: same event type taxonomy, different field mapping.

| L4 ID | Event Type | CRDM Target | Notes |
|---|---|---|---|
| T.4.1.1 | SALE | SaleTransaction | Primary revenue event |
| T.4.2.1 | RETURN | ReturnTransaction | Negative revenue; feeds C-007, C-001 |
| T.4.3.1 | VOID / POST_VOID | VoidTransaction | Feeds C-501, C-502 |
| T.4.4.1 | PAYMENT | PaymentRecord | Tender detail; feeds F.4 payment flow |
| T.4.5.1 | TAX | TaxRecord | Multi-authority tax line |
| T.4.6.1 | AUDIT | AuditLogEntry | Operator action log |
| T.4.7.1 | XFER routing | TransferDocument | Routed to D.3 transfer lifecycle via DOC_TYP=XFER |
| T.4.8.1 | Customer upsert | CustomerRecord | Merge on source_customer_id; feeds R module |
| T.4.9.1 | Publication via stream_publisher | XADD canary:events | All canonical events published via get_stream_client().xadd() with 9-field schema |

Sub2 routing pattern:
- LP-critical event types: full parse → CRDM write → publish to detection sub-stream
- Log-only event types: XACK without CRDM write (accepted but not analyzed)
- Unknown event types: publish to dead_letter; XACK
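The three-way routing decision can be sketched as a pure classification function. The membership of the LP_CRITICAL and LOG_ONLY sets below is purely illustrative, since this spec does not enumerate which event types fall into each class; the Route enum and classify_event name are likewise hypothetical.

```python
from enum import Enum


class Route(Enum):
    LP_CRITICAL = "parse, write CRDM, publish to detection sub-stream"
    LOG_ONLY = "XACK without CRDM write"
    UNKNOWN = "publish to dead_letter, then XACK"


# Assumption: example memberships only; the real sets live in the dispatch registry.
LP_CRITICAL = frozenset({"SALE", "RETURN", "VOID", "POST_VOID"})
LOG_ONLY = frozenset({"AUDIT"})


def classify_event(event_type: str) -> Route:
    """Map a canonical event_type to one of the three Sub2 routes."""
    if event_type in LP_CRITICAL:
        return Route.LP_CRITICAL
    if event_type in LOG_ONLY:
        return Route.LOG_ONLY
    # Anything not registered is dead-lettered rather than silently dropped.
    return Route.UNKNOWN
```

Separating classification from side effects keeps the routing table testable without a stream or database connection.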


T.5 — Merkle Anchoring (T.5.1–T.5.5)

Source: tsp/merkle.py, tsp/consumers/sub3_merkle.py
Patent: Application #63/991,596, FIG. 1 Nodes 5/6

T.5.1–T.5.2 — Batch Accumulation and Threshold

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.5.1.1 | Accumulate event hashes | Sub3 reads event_hash from stream message; ZADD to Valkey sorted set canary:batch:current with ULID score |
| T.5.1.2 | Track message IDs without XACK | Accumulate pending Valkey message IDs; DO NOT XACK individual events during accumulation |
| T.5.2.1 | Count threshold check | count >= BATCH_COUNT_THRESHOLD (default 100) |
| T.5.2.2 | Time threshold check | time since first event in batch >= BATCH_TIME_THRESHOLD_SECONDS (default 600s / 10 min) |
| T.5.2.3 | Trigger on either threshold | Whichever fires first: count or time |
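The either-threshold trigger in T.5.2.1 through T.5.2.3 reduces to a single predicate. The constant names and defaults come from the table; the function name should_flush, its parameters, and the empty-batch guard are assumptions.

```python
BATCH_COUNT_THRESHOLD = 100          # default from T.5.2.1
BATCH_TIME_THRESHOLD_SECONDS = 600   # default from T.5.2.2 (10 minutes)


def should_flush(count: int, first_event_at: float, now: float) -> bool:
    """Return True when either the count or the time threshold has fired.

    first_event_at and now are timestamps in seconds on the same clock.
    """
    if count == 0:
        # Assumption: an empty accumulator never triggers a batch.
        return False
    count_fired = count >= BATCH_COUNT_THRESHOLD
    time_fired = (now - first_event_at) >= BATCH_TIME_THRESHOLD_SECONDS
    return count_fired or time_fired
```

A monotonic clock is a reasonable choice for both timestamps so that wall-clock adjustments cannot delay or prematurely fire the time threshold.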

T.5.3 — Merkle Tree Construction

Source: tsp/merkle.py, build_tree(event_hashes: list[bytes]) → MerkleResult

| L4 ID | Step | Implementation Detail |
|---|---|---|
| T.5.3.1 | Sort hashes | Sort event_hashes by hex representation: sorted(hashes, key=lambda h: h.hex()). Deterministic ordering is required for proof verification. |
| T.5.3.2 | Double-hash leaves | Each leaf = SHA-256(event_hash), for second-preimage resistance. This is different from internal nodes. |
| T.5.3.3 | Pad to power of 2 | If len(leaves) is not a power of 2: duplicate the last leaf until len = next power of 2 |
| T.5.3.4 | Build tree bottom-up | Level 0 = leaves. Each parent = SHA-256(left_child + right_child). Continue until 1 node remains = root. |
| T.5.3.5 | Generate proof paths | For each leaf, generate MerkleProof: list of (sibling_hash, direction) tuples from leaf to root |
| T.5.3.6 | Return MerkleResult | NamedTuple: root_hash, tree_levels, proof_paths, leaf_count |
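The six construction steps can be sketched end to end as below. This is written from the table, not from merkle.py. Assumptions: direction is encoded as the strings "left" and "right" (naming the side the sibling sits on, matching T.5.5.2), leaf_count is the count before padding, proof_paths are indexed in sorted-leaf order, and an empty input raises ValueError.

```python
import hashlib
from typing import List, NamedTuple, Tuple


def _sha256(data: bytes) -> bytes:
    return hashlib.sha256(data).digest()


class MerkleResult(NamedTuple):
    root_hash: bytes
    tree_levels: List[List[bytes]]
    proof_paths: List[List[Tuple[bytes, str]]]
    leaf_count: int


def build_tree(event_hashes: List[bytes]) -> MerkleResult:
    if not event_hashes:
        raise ValueError("cannot build a Merkle tree from an empty batch")
    # T.5.3.1: deterministic ordering by hex representation.
    ordered = sorted(event_hashes, key=lambda h: h.hex())
    # T.5.3.2: leaves are SHA-256 of the event hash (double-hash).
    leaves = [_sha256(h) for h in ordered]
    leaf_count = len(leaves)
    # T.5.3.3: duplicate the last leaf until the length is a power of 2.
    while len(leaves) & (len(leaves) - 1):
        leaves.append(leaves[-1])
    # T.5.3.4: build parents bottom-up until a single root remains.
    levels = [leaves]
    while len(levels[-1]) > 1:
        prev = levels[-1]
        levels.append([_sha256(prev[i] + prev[i + 1]) for i in range(0, len(prev), 2)])
    # T.5.3.5: one proof path per original (unpadded) leaf.
    proofs = []
    for index in range(leaf_count):
        path, pos = [], index
        for level in levels[:-1]:
            sibling = pos ^ 1  # neighbor within the same pair
            direction = "left" if sibling < pos else "right"
            path.append((level[sibling], direction))
            pos //= 2
        proofs.append(path)
    # T.5.3.6
    return MerkleResult(levels[-1][0], levels, proofs, leaf_count)
```

Because the input is sorted before hashing, the same set of event hashes yields the same root regardless of arrival order.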

T.5.4 — Inscription Pool Write

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.5.4.1 | Transaction boundary | BEGIN TRANSACTION before any inscription writes |
| T.5.4.2 | INSERT inscription_pool | Tree root hash, batch metadata, merchant_id, batch_size |
| T.5.4.3 | INSERT event_inscriptions | One row per event in batch: event_id → inscription_pool_id (foreign key) |
| T.5.4.4 | COMMIT and XACK | Only after successful COMMIT: XACK all batch message IDs as a batch |
| T.5.4.5 | Clear accumulator | Delete (DEL) the Valkey sorted set canary:batch:current after XACK |
| T.5.4.6 | Inscription dispatch | Sprint 6: MOCK_INSCRIPTION=true → mock OrdinalsBot response. Sprint 7+: live OrdinalsBot API. |
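The commit-then-XACK ordering of T.5.4.1 through T.5.4.5 can be demonstrated with an in-memory SQLite database standing in for PostgreSQL. This is only an ordering sketch: the column names, the demo schema, and the injected xack_batch and clear_accumulator callables are all hypothetical, and the production code targets PostgreSQL and Valkey rather than SQLite.

```python
import sqlite3
from typing import Callable, Sequence

# Assumption: illustrative schema only; real columns are defined elsewhere.
DEMO_SCHEMA = """
CREATE TABLE inscription_pool (
    id INTEGER PRIMARY KEY, root_hash BLOB, merchant_id TEXT, batch_size INTEGER);
CREATE TABLE event_inscriptions (
    event_id TEXT PRIMARY KEY,
    inscription_pool_id INTEGER REFERENCES inscription_pool(id));
"""


def commit_batch(conn: sqlite3.Connection, root_hash: bytes, merchant_id: str,
                 event_ids: Sequence[str],
                 xack_batch: Callable[[], None],
                 clear_accumulator: Callable[[], None]) -> int:
    """Write pool and per-event rows in one transaction, then XACK and clear."""
    with conn:  # commits on success, rolls back if any statement raises
        cur = conn.execute(
            "INSERT INTO inscription_pool (root_hash, merchant_id, batch_size) "
            "VALUES (?, ?, ?)",
            (root_hash, merchant_id, len(event_ids)),
        )
        pool_id = cur.lastrowid
        conn.executemany(
            "INSERT INTO event_inscriptions (event_id, inscription_pool_id) "
            "VALUES (?, ?)",
            [(event_id, pool_id) for event_id in event_ids],
        )
    # Reached only after a successful COMMIT.
    xack_batch()
    clear_accumulator()
    return pool_id
```

If any insert fails, the exception propagates before xack_batch runs, so the whole batch stays pending and can be retried without leaving a partial inscription_pool row behind.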

T.5.5 — Audit Proof Verification

Source: tsp/merkle.py, verify_proof(event_hash: bytes, proof: MerkleProof, expected_root: bytes) → bool

| L4 ID | Activity | Implementation Detail |
|---|---|---|
| T.5.5.1 | Start from leaf hash | current = SHA-256(event_hash) (same double-hash as build_tree) |
| T.5.5.2 | Walk proof path | For each (sibling, direction) in proof: if direction=left: current = SHA-256(sibling + current); if direction=right: current = SHA-256(current + sibling) |
| T.5.5.3 | Compare to expected root | Return current == expected_root |
| T.5.5.4 | MCP tool surface | verify_proof exposed as investigator MCP tool; takes event_id, returns |

T.7 — Substrate Contracts (T.7.1–T.7.12)

Source: tsp/tools.py
Purpose: MCP tool API surface that downstream modules (Q, R, D, F, W) call to interact with the TSP substrate.

These are the Go cmd/tsp MCP server tools. The full tool manifest must be read from tsp/tools.py directly. Key contracts:

| Contract (Guarantee to Downstream) | Enforced By |
|---|---|
| Evidence record exists for every processed event | Sub1 XACK only after INSERT |
| Hash chain is unbroken | Advisory lock + PostgreSQL trigger |
| Merkle root exists for every batch | Sub3 INSERT before XACK |
| Canonical event types are stable | event_type vocabulary defined in stream publisher |
| Audit proof available for any event | verify_proof() reachable via MCP tool |

Key Invariants Summary for Go Implementation

| # | Invariant | Source |
|---|---|---|
| 1 | Hash verification BEFORE JSON parse (patent-critical) | sub1_seal.py process_message() Step 2→3 |
| 2 | BATCH_SIZE=1 for sub1 (chain integrity) | sub1_seal.py constant |
| 3 | chain_hash computed by PostgreSQL trigger, NOT app code | P0-3, B-001 |
| 4 | Advisory lock per merchant_id before chain operation | sub1_seal.py Steps 4-5 |
| 5 | EvidenceRecord is INSERT-only | No UPDATE path in codebase |
| 6 | Stream client on DB 4 (separate from cache DBs) | stream_publisher.py |
| 7 | Three consumer groups must be created BEFORE publisher starts | stream_publisher.py init_consumer_groups() |
| 8 | Sub3 does NOT XACK individual events during accumulation | sub3_merkle.py (holds pending IDs) |
| 9 | Merkle leaves are double-hashed (second-preimage resistance) | merkle.py build_tree() |
| 10 | Pad to power of 2 by duplicating last leaf | merkle.py build_tree() |

Pass: GRO-670 / GRO-675 | T Category A L4 spec complete | Patent invariants annotated | Category B T L3s (ingestion, parsing, backfill) pending Counterpoint API doc pass