AttestoSpanProcessor (attesto.otel / @attesto/sdk) turns ended OTel spans
into commitment events. The source_ref is otel:{trace_id}:{span_id}, so
resending a span is idempotent. Only allowlisted attributes are committed
(as a commitment, never raw — non-allowlisted values are provably absent
from stored objects). The processor is fail-open with onError; strict is
opt-in. Both implementations are structurally compatible with the
SpanProcessor interface, so neither SDK gains an opentelemetry dependency.
Building this surfaced two real gaps, fixed in all three languages:
- Emulators now deduplicate on (source_kind, source_ref) like real
ingestion (resend returns the existing receipt; anonymous empty refs
exempt) — previously a resend silently appended a duplicate event.
- P1.6 head tracking treated an exact idempotent replay (same seq_no AND
same event_hash as the stored head) as a fork; it is now a benign no-op,
while same-seq/different-hash remains AttestoForkDetected (regression
tests in Python, TypeScript-path via emulator test, and Go).
Suites: Python 109 passed, TypeScript 77 passed, Go 4/4 packages ok.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
100 lines
3.1 KiB
Go
100 lines
3.1 KiB
Go
package attesto
|
|
|
|
import (
|
|
"errors"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
)
|
|
|
|
func receipt(seqNo int64, eventHash, prevEventHash string) EventReceipt {
|
|
return EventReceipt{
|
|
StreamID: "str_demo",
|
|
SeqNo: seqNo,
|
|
EventHash: eventHash,
|
|
PrevEventHash: prevEventHash,
|
|
}
|
|
}
|
|
|
|
func TestMemoryHeadStoreInOrderAdvances(t *testing.T) {
|
|
store := NewMemoryHeadStore()
|
|
for _, r := range []EventReceipt{receipt(1, "h1", ""), receipt(2, "h2", "h1"), receipt(3, "h3", "h2")} {
|
|
if err := checkAndAdvanceHead(store, r); err != nil {
|
|
t.Fatalf("unexpected: %v", err)
|
|
}
|
|
}
|
|
if seq, hash, ok := store.Get("str_demo"); !ok || seq != 3 || hash != "h3" {
|
|
t.Errorf("head = (%d,%s,%v)", seq, hash, ok)
|
|
}
|
|
}
|
|
|
|
func TestForgedRewoundReceiptIsFork(t *testing.T) {
|
|
store := NewMemoryHeadStore()
|
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
|
_ = checkAndAdvanceHead(store, receipt(2, "h2", "h1"))
|
|
err := checkAndAdvanceHead(store, receipt(2, "h2-fork", "h1"))
|
|
var fork *ForkDetectedError
|
|
if !errors.As(err, &fork) {
|
|
t.Fatalf("expected *ForkDetectedError, got %v", err)
|
|
}
|
|
if seq, hash, _ := store.Get("str_demo"); seq != 2 || hash != "h2" {
|
|
t.Errorf("store advanced past fork: (%d,%s)", seq, hash)
|
|
}
|
|
}
|
|
|
|
func TestDivergentNextEventIsFork(t *testing.T) {
|
|
store := NewMemoryHeadStore()
|
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
|
err := checkAndAdvanceHead(store, receipt(2, "h2", "WRONG"))
|
|
var fork *ForkDetectedError
|
|
if !errors.As(err, &fork) {
|
|
t.Fatalf("expected *ForkDetectedError, got %v", err)
|
|
}
|
|
}
|
|
|
|
func TestForwardGapAccepted(t *testing.T) {
|
|
store := NewMemoryHeadStore()
|
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
|
if err := checkAndAdvanceHead(store, receipt(5, "h5", "h4")); err != nil {
|
|
t.Fatalf("forward gap should be accepted: %v", err)
|
|
}
|
|
}
|
|
|
|
func TestFileHeadStorePersistsAndIs0600(t *testing.T) {
|
|
path := filepath.Join(t.TempDir(), "heads.json")
|
|
store := NewFileHeadStore(path)
|
|
_ = checkAndAdvanceHead(store, receipt(1, "h1", ""))
|
|
_ = checkAndAdvanceHead(store, receipt(2, "h2", "h1"))
|
|
info, err := os.Stat(path)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if info.Mode().Perm() != 0o600 {
|
|
t.Errorf("mode = %o, want 600", info.Mode().Perm())
|
|
}
|
|
reopened := NewFileHeadStore(path)
|
|
if seq, hash, ok := reopened.Get("str_demo"); !ok || seq != 2 || hash != "h2" {
|
|
t.Errorf("reopened head = (%d,%s,%v)", seq, hash, ok)
|
|
}
|
|
if err := checkAndAdvanceHead(reopened, receipt(2, "h2-fork", "h1")); err == nil {
|
|
t.Error("expected fork on reopened store")
|
|
}
|
|
}
|
|
|
|
func TestExactReplayOfStoredHeadIsBenign(t *testing.T) {
|
|
// [P3.3 regression] A deduplicated resend returns the same receipt; the
|
|
// head tracker must treat (same seqNo, same eventHash) as a no-op.
|
|
store := NewMemoryHeadStore()
|
|
first := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h1"}
|
|
if err := checkAndAdvanceHead(store, first); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := checkAndAdvanceHead(store, first); err != nil {
|
|
t.Fatalf("exact replay must be benign, got %v", err)
|
|
}
|
|
fork := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h2"}
|
|
if err := checkAndAdvanceHead(store, fork); err == nil {
|
|
t.Fatal("same seq with different hash must be a fork")
|
|
}
|
|
}
|