Files
attesto-go/heads.go
Codex 8dd6c4a784 feat(P3.3): OTel bridge + idempotency fidelity fixes it surfaced
AttestoSpanProcessor (attesto.otel / @attesto/sdk) turns ended OTel spans
into commitment events: source_ref otel:{trace_id}:{span_id} so resending
a span is idempotent, only allowlisted attributes committed (as a
commitment, never raw — non-allowlisted values provably absent from
stored objects), fail-open with onError, strict opt-in. Both
implementations are structurally compatible with the SpanProcessor
interface, so neither SDK gains an opentelemetry dependency.

Building this surfaced two real gaps, fixed in all three languages:
- Emulators now deduplicate on (source_kind, source_ref) like real
  ingestion (resend returns the existing receipt; anonymous empty refs
  exempt) — previously a resend silently appended a duplicate event.
- P1.6 head tracking treated an exact idempotent replay (same seq_no AND
  same event_hash as the stored head) as a fork; it is now a benign no-op,
  while same-seq/different-hash remains AttestoForkDetected (regression
  tests in Python, TypeScript-path via emulator test, and Go).

Suites: Python 109 passed, TypeScript 77 passed, Go 4/4 packages ok.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-12 11:14:45 +02:00

166 lines
4.5 KiB
Go

package attesto
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
)
// HeadStore persists the last accepted (seqNo, eventHash) head per stream so the
// client can detect a rewound or divergent server history.
type HeadStore interface {
Get(streamID string) (seqNo int64, eventHash string, ok bool)
Set(streamID string, seqNo int64, eventHash string)
}
// ForkDetectedError reports that a receipt did not extend the last accepted head
// for its stream. The store is NOT advanced when this is returned.
type ForkDetectedError struct {
StreamID string
ExpectedSeq int64
ExpectedHash string
GotSeq int64
GotPrevHash string
}
func (e *ForkDetectedError) Error() string {
return fmt.Sprintf(
"fork detected on stream %s: receipt (seq=%d prev=%s) does not extend "+
"last accepted head (seq=%d hash=%s)",
e.StreamID, e.GotSeq, e.GotPrevHash, e.ExpectedSeq, e.ExpectedHash)
}
// MemoryHeadStore keeps heads in process memory; safe for concurrent use.
type MemoryHeadStore struct {
mu sync.Mutex
heads map[string][2]any
}
func NewMemoryHeadStore() *MemoryHeadStore {
return &MemoryHeadStore{heads: map[string][2]any{}}
}
func (s *MemoryHeadStore) Get(streamID string) (int64, string, bool) {
s.mu.Lock()
defer s.mu.Unlock()
entry, ok := s.heads[streamID]
if !ok {
return 0, "", false
}
return entry[0].(int64), entry[1].(string), true
}
func (s *MemoryHeadStore) Set(streamID string, seqNo int64, eventHash string) {
s.mu.Lock()
defer s.mu.Unlock()
s.heads[streamID] = [2]any{seqNo, eventHash}
}
// FileHeadStore persists heads to a JSON file (default ~/.attesto/heads.json),
// giving fork detection across separate process invocations. Writes are atomic.
type FileHeadStore struct {
path string
mu sync.Mutex
}
// NewFileHeadStore uses the given path, or ~/.attesto/heads.json when empty.
func NewFileHeadStore(path string) *FileHeadStore {
if path == "" {
if home, err := os.UserHomeDir(); err == nil {
path = filepath.Join(home, ".attesto", "heads.json")
} else {
path = ".attesto-heads.json"
}
}
return &FileHeadStore{path: path}
}
func (s *FileHeadStore) load() map[string][2]json.RawMessage {
raw, err := os.ReadFile(s.path)
if err != nil {
return map[string][2]json.RawMessage{}
}
out := map[string][2]json.RawMessage{}
if err := json.Unmarshal(raw, &out); err != nil {
return map[string][2]json.RawMessage{}
}
return out
}
func (s *FileHeadStore) Get(streamID string) (int64, string, bool) {
s.mu.Lock()
defer s.mu.Unlock()
entry, ok := s.load()[streamID]
if !ok {
return 0, "", false
}
var seqNo int64
var eventHash string
if json.Unmarshal(entry[0], &seqNo) != nil || json.Unmarshal(entry[1], &eventHash) != nil {
return 0, "", false
}
return seqNo, eventHash, true
}
func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) {
s.mu.Lock()
defer s.mu.Unlock()
heads := map[string][2]any{}
for key, entry := range s.load() {
var n int64
var h string
_ = json.Unmarshal(entry[0], &n)
_ = json.Unmarshal(entry[1], &h)
heads[key] = [2]any{n, h}
}
heads[streamID] = [2]any{seqNo, eventHash}
body, err := json.Marshal(heads)
if err != nil {
return
}
if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil {
return
}
tmp, err := os.CreateTemp(filepath.Dir(s.path), ".heads-")
if err != nil {
return
}
tmpName := tmp.Name()
_, writeErr := tmp.Write(body)
closeErr := tmp.Close()
if writeErr != nil || closeErr != nil {
_ = os.Remove(tmpName)
return
}
_ = os.Chmod(tmpName, 0o600)
_ = os.Rename(tmpName, s.path)
}
// checkAndAdvanceHead verifies a receipt extends the stored head, then advances
// the store. Returns *ForkDetectedError (without advancing) when the receipt
// rewinds or collides on seqNo, or claims to be the immediate next event but
// does not chain. A forward gap is accepted and advances.
func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error {
if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok {
if receipt.SeqNo == storedSeq && receipt.EventHash == storedHash {
// Benign idempotent replay: the server deduplicated a resend and
// returned the same receipt for the same event.
return nil
}
if receipt.SeqNo <= storedSeq ||
(receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) {
return &ForkDetectedError{
StreamID: receipt.StreamID,
ExpectedSeq: storedSeq,
ExpectedHash: storedHash,
GotSeq: receipt.SeqNo,
GotPrevHash: receipt.PrevEventHash,
}
}
}
store.Set(receipt.StreamID, receipt.SeqNo, receipt.EventHash)
return nil
}