From 8dd6c4a78441de1fb42aab655c67b6e663243078 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 12 Jun 2026 11:14:45 +0200 Subject: [PATCH] feat(P3.3): OTel bridge + idempotency fidelity fixes it surfaced MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit AttestoSpanProcessor (attesto.otel / @attesto/sdk) turns ended OTel spans into commitment events: source_ref otel:{trace_id}:{span_id} so resending a span is idempotent, only allowlisted attributes committed (as a commitment, never raw — non-allowlisted values provably absent from stored objects), fail-open with onError, strict opt-in. Both implementations are structurally compatible with the SpanProcessor interface, so neither SDK gains an opentelemetry dependency. Building this surfaced two real gaps, fixed in all three languages: - Emulators now deduplicate on (source_kind, source_ref) like real ingestion (resend returns the existing receipt; anonymous empty refs exempt) — previously a resend silently appended a duplicate event. - P1.6 head tracking treated an exact idempotent replay (same seq_no AND same event_hash as the stored head) as a fork; it is now a benign no-op, while same-seq/different-hash remains AttestoForkDetected (regression tests in Python, TypeScript-path via emulator test, and Go). Suites: Python 109 passed, TypeScript 77 passed, Go 4/4 packages ok. Co-Authored-By: Claude Fable 5 --- attestotest/server.go | 14 ++++++++++++++ cmd/attesto/report_test.go | 4 +++- heads.go | 5 +++++ heads_test.go | 17 +++++++++++++++++ 4 files changed, 39 insertions(+), 1 deletion(-) diff --git a/attestotest/server.go b/attestotest/server.go index d9ccf16..246627c 100644 --- a/attestotest/server.go +++ b/attestotest/server.go @@ -236,6 +236,19 @@ func orEmpty(v any) any { func (s *Server) append(stream attesto.M, body attesto.M) attesto.M { payload := orEmpty(body["payload"]) metadata := orEmpty(body["metadata"]) + // Idempotent on (source_kind, source_ref), like real ingestion: a resend + // returns the existing event's receipt instead of appending. + sourceKind := str(body["sourceKind"], "sdk") + sourceRef := str(body["sourceRef"], "") + for _, existing := range s.events[stream["streamId"].(string)] { + if sourceRef == "" { + break // anonymous events never dedupe + } + source := existing.Envelope["source"].(attesto.M) + if source["kind"] == sourceKind && source["event_id"] == sourceRef { + return s.receipts[existing.StreamEventID] + } + } seqNo := int64(stream["lastSeqNo"].(float64)) + 1 ingestedAt := nowISO() @@ -323,6 +336,7 @@ func (s *Server) append(stream attesto.M, body attesto.M) attesto.M { TenantView: attesto.M{ "streamEventId": streamEventID, "seq_no": seqNo, "event_type": envelope["event_type"], + "source_ref": envelope["source"].(attesto.M)["event_id"], "event_hash": eventHash, "prev_event_hash": stream["lastEventHash"], "stream_head_hash": streamHeadHash, "payload_commitment": envelope["payload_commitment"], "mock": true, diff --git a/cmd/attesto/report_test.go b/cmd/attesto/report_test.go index 780b2ae..fedc420 100644 --- a/cmd/attesto/report_test.go +++ b/cmd/attesto/report_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "strings" "testing" @@ -27,8 +28,9 @@ func TestArticle12ReportWordingAndStructure(t *testing.T) { t.Fatal(err) } for i := 0; i < 2; i++ { + // Distinct source refs: identical refs are one event (idempotent ingestion). if _, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{ - SourceRef: "e", EventType: decision.EventType(), Payload: payload, + SourceRef: fmt.Sprintf("e-%d", i), EventType: decision.EventType(), Payload: payload, }); err != nil { t.Fatal(err) } diff --git a/heads.go b/heads.go index 27d4332..6464281 100644 --- a/heads.go +++ b/heads.go @@ -144,6 +144,11 @@ func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) { // does not chain. A forward gap is accepted and advances. func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error { if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok { + if receipt.SeqNo == storedSeq && receipt.EventHash == storedHash { + // Benign idempotent replay: the server deduplicated a resend and + // returned the same receipt for the same event. + return nil + } if receipt.SeqNo <= storedSeq || (receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) { return &ForkDetectedError{ diff --git a/heads_test.go b/heads_test.go index a6137e3..f50e0ca 100644 --- a/heads_test.go +++ b/heads_test.go @@ -80,3 +80,20 @@ func TestFileHeadStorePersistsAndIs0600(t *testing.T) { t.Error("expected fork on reopened store") } } + +func TestExactReplayOfStoredHeadIsBenign(t *testing.T) { + // [P3.3 regression] A deduplicated resend returns the same receipt; the + // head tracker must treat (same seqNo, same eventHash) as a no-op. + store := NewMemoryHeadStore() + first := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h1"} + if err := checkAndAdvanceHead(store, first); err != nil { + t.Fatal(err) + } + if err := checkAndAdvanceHead(store, first); err != nil { + t.Fatalf("exact replay must be benign, got %v", err) + } + fork := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h2"} + if err := checkAndAdvanceHead(store, fork); err == nil { + t.Fatal("same seq with different hash must be a fork") + } +}