feat(P3.3): OTel bridge + idempotency fidelity fixes it surfaced
AttestoSpanProcessor (attesto.otel / @attesto/sdk) turns ended OTel spans
into commitment events: source_ref otel:{trace_id}:{span_id} so resending
a span is idempotent, only allowlisted attributes committed (as a
commitment, never raw — non-allowlisted values provably absent from
stored objects), fail-open with onError, strict opt-in. Both
implementations are structurally compatible with the SpanProcessor
interface, so neither SDK gains an opentelemetry dependency.
Building this surfaced two real gaps, fixed in all three languages:
- Emulators now deduplicate on (source_kind, source_ref) like real
ingestion (resend returns the existing receipt; anonymous empty refs
exempt) — previously a resend silently appended a duplicate event.
- P1.6 head tracking treated an exact idempotent replay (same seq_no AND
same event_hash as the stored head) as a fork; it is now a benign no-op,
while same-seq/different-hash remains AttestoForkDetected (regression
tests in Python, TypeScript-path via emulator test, and Go).
Suites: Python 109 passed, TypeScript 77 passed, Go 4/4 packages ok.
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
@@ -236,6 +236,19 @@ func orEmpty(v any) any {
|
||||
func (s *Server) append(stream attesto.M, body attesto.M) attesto.M {
|
||||
payload := orEmpty(body["payload"])
|
||||
metadata := orEmpty(body["metadata"])
|
||||
// Idempotent on (source_kind, source_ref), like real ingestion: a resend
|
||||
// returns the existing event's receipt instead of appending.
|
||||
sourceKind := str(body["sourceKind"], "sdk")
|
||||
sourceRef := str(body["sourceRef"], "")
|
||||
for _, existing := range s.events[stream["streamId"].(string)] {
|
||||
if sourceRef == "" {
|
||||
break // anonymous events never dedupe
|
||||
}
|
||||
source := existing.Envelope["source"].(attesto.M)
|
||||
if source["kind"] == sourceKind && source["event_id"] == sourceRef {
|
||||
return s.receipts[existing.StreamEventID]
|
||||
}
|
||||
}
|
||||
seqNo := int64(stream["lastSeqNo"].(float64)) + 1
|
||||
ingestedAt := nowISO()
|
||||
|
||||
@@ -323,6 +336,7 @@ func (s *Server) append(stream attesto.M, body attesto.M) attesto.M {
|
||||
TenantView: attesto.M{
|
||||
"streamEventId": streamEventID, "seq_no": seqNo,
|
||||
"event_type": envelope["event_type"],
|
||||
"source_ref": envelope["source"].(attesto.M)["event_id"],
|
||||
"event_hash": eventHash, "prev_event_hash": stream["lastEventHash"],
|
||||
"stream_head_hash": streamHeadHash,
|
||||
"payload_commitment": envelope["payload_commitment"], "mock": true,
|
||||
|
||||
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -27,8 +28,9 @@ func TestArticle12ReportWordingAndStructure(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 2; i++ {
|
||||
// Distinct source refs: identical refs are one event (idempotent ingestion).
|
||||
if _, err := client.LogEvent(ctx, stream.StreamID, attesto.EventInput{
|
||||
SourceRef: "e", EventType: decision.EventType(), Payload: payload,
|
||||
SourceRef: fmt.Sprintf("e-%d", i), EventType: decision.EventType(), Payload: payload,
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
5
heads.go
5
heads.go
@@ -144,6 +144,11 @@ func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) {
|
||||
// does not chain. A forward gap is accepted and advances.
|
||||
func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error {
|
||||
if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok {
|
||||
if receipt.SeqNo == storedSeq && receipt.EventHash == storedHash {
|
||||
// Benign idempotent replay: the server deduplicated a resend and
|
||||
// returned the same receipt for the same event.
|
||||
return nil
|
||||
}
|
||||
if receipt.SeqNo <= storedSeq ||
|
||||
(receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) {
|
||||
return &ForkDetectedError{
|
||||
|
||||
@@ -80,3 +80,20 @@ func TestFileHeadStorePersistsAndIs0600(t *testing.T) {
|
||||
t.Error("expected fork on reopened store")
|
||||
}
|
||||
}
|
||||
|
||||
func TestExactReplayOfStoredHeadIsBenign(t *testing.T) {
|
||||
// [P3.3 regression] A deduplicated resend returns the same receipt; the
|
||||
// head tracker must treat (same seqNo, same eventHash) as a no-op.
|
||||
store := NewMemoryHeadStore()
|
||||
first := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h1"}
|
||||
if err := checkAndAdvanceHead(store, first); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := checkAndAdvanceHead(store, first); err != nil {
|
||||
t.Fatalf("exact replay must be benign, got %v", err)
|
||||
}
|
||||
fork := EventReceipt{StreamID: "str_x", SeqNo: 1, EventHash: "h2"}
|
||||
if err := checkAndAdvanceHead(store, fork); err == nil {
|
||||
t.Fatal("same seq with different hash must be a fork")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user