diff --git a/attestotest/server.go b/attestotest/server.go index d9ccf16..246627c 100644 --- a/attestotest/server.go +++ b/attestotest/server.go @@ -236,6 +236,19 @@ func orEmpty(v any) any { func (s *Server) append(stream attesto.M, body attesto.M) attesto.M { payload := orEmpty(body["payload"]) metadata := orEmpty(body["metadata"]) + // Idempotent on (source_kind, source_ref), like real ingestion: a resend + // returns the existing event's receipt instead of appending. + sourceKind := str(body["sourceKind"], "sdk") + sourceRef := str(body["sourceRef"], "") + for _, existing := range s.events[stream["streamId"].(string)] { + if sourceRef == "" { + break // anonymous events never dedupe + } + source := existing.Envelope["source"].(attesto.M) + if source["kind"] == sourceKind && source["event_id"] == sourceRef { + return s.receipts[existing.StreamEventID] + } + } seqNo := int64(stream["lastSeqNo"].(float64)) + 1 ingestedAt := nowISO() @@ -323,6 +336,7 @@ func (s *Server) append(stream attesto.M, body attesto.M) attesto.M { TenantView: attesto.M{ "streamEventId": streamEventID, "seq_no": seqNo, "event_type": envelope["event_type"], + "source_ref": envelope["source"].(attesto.M)["event_id"], "event_hash": eventHash, "prev_event_hash": stream["lastEventHash"], "stream_head_hash": streamHeadHash, "payload_commitment": envelope["payload_commitment"], "mock": true, diff --git a/cmd/attesto/report_test.go b/cmd/attesto/report_test.go index 780b2ae..fedc420 100644 --- a/cmd/attesto/report_test.go +++ b/cmd/attesto/report_test.go @@ -2,6 +2,7 @@ package main import ( "context" + "fmt" "strings" "testing" @@ -27,8 +28,9 @@ func TestArticle12ReportWordingAndStructure(t *testing.T) { t.Fatal(err) } for i := 0; i < 2; i++ { + // Distinct source refs: identical refs are one event (idempotent ingestion). if _, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{ - SourceRef: "e", EventType: decision.EventType(), Payload: payload, + SourceRef: fmt.Sprintf("e-%d", i), EventType: decision.EventType(), Payload: payload, }); err != nil { t.Fatal(err) } diff --git a/heads.go b/heads.go index 27d4332..6464281 100644 --- a/heads.go +++ b/heads.go @@ -144,6 +144,11 @@ func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) { // does not chain. A forward gap is accepted and advances. func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error { if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok { + if receipt.SeqNo == storedSeq && receipt.EventHash == storedHash { + // Benign idempotent replay: the server deduplicated a resend and + // returned the same receipt for the same event. + return nil + } if receipt.SeqNo <= storedSeq || (receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) { return &ForkDetectedError{ diff --git a/heads_test.go b/heads_test.go index a6137e3..f50e0ca 100644 --- a/heads_test.go +++ b/heads_test.go @@ -80,3 +80,20 @@ func TestFileHeadStorePersistsAndIs0600(t *testing.T) { t.Error("expected fork on reopened store") } } + +func TestExactReplayOfStoredHeadIsBenign(t *testing.T) { + // [P3.3 regression] A deduplicated resend returns the same receipt; the + // head tracker must treat (same seqNo, same eventHash) as a no-op. + store := NewMemoryHeadStore() + first := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h1"} + if err := checkAndAdvanceHead(store, first); err != nil { + t.Fatal(err) + } + if err := checkAndAdvanceHead(store, first); err != nil { + t.Fatalf("exact replay must be benign, got %v", err) + } + fork := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h2"} + if err := checkAndAdvanceHead(store, fork); err == nil { + t.Fatal("same seq with different hash must be a fork") + } +}