From 1e4a11e4869f15b3838a5f0b38ddd243434baa74 Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 11 Jun 2026 15:08:35 +0200 Subject: [PATCH] =?UTF-8?q?sdk(P1.6):=20client-side=20head=20tracking=20?= =?UTF-8?q?=E2=80=94=20your=20SDK=20is=20a=20fork=20detector?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Completes the verification chain (P1.2 -> P1.1 -> P1.3 -> P1.6). The client remembers the last accepted (seq_no, event_hash) per stream and checks every new receipt links forward; if the server rewinds a sequence number or presents a divergent lineage, log_event / log_events raise AttestoForkDetected (Go: *ForkDetectedError) and the stored head is NOT advanced. The customer's own machine becomes the fork detector — no trust in any Attesto-side check. - Python: HeadStore protocol + FileHeadStore (~/.attesto/heads.json, atomic, 0600, default) + MemoryHeadStore; wired into sync and async v2 clients; head_store=None disables. - TypeScript: HeadStore + MemoryHeadStore (default, edge-safe); Node-only FileHeadStore kept in a separate module (@attesto/sdk/heads-file) so the core bundle imports no node:fs; headStore: null disables. - Go: HeadStore interface + MemoryHeadStore (default) + NewFileHeadStore; WithHeadStore option; WithHeadStore(nil) disables. Same forward/rewind/divergence/gap semantics across all three (unit-tested: in-order advance, forged-rewind fork, divergent-next fork, forward-gap accept, file-store restart persistence). Existing v2 client tests pin head_store=None (they replay overlapping seq). READMEs gain a "Your SDK is a witness" section. Co-Authored-By: Claude Fable 5 --- README.md | 17 ++++++ client.go | 42 +++++++++++-- heads.go | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++ heads_test.go | 82 ++++++++++++++++++++++++++ 4 files changed, 297 insertions(+), 4 deletions(-) create mode 100644 heads.go create mode 100644 heads_test.go diff --git a/README.md b/README.md index 1868b66..6ab7525 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,23 @@ comp := attesto.VerifyCompleteness(events, 5, 8) // no event numbers must be gap-free and each event's `prev_event_hash` must chain to the previous event's `event_hash`. +## Your SDK is a witness + +The client remembers the last accepted `(seqNo, eventHash)` per stream and checks +every new receipt links forward. If the server ever rewinds a sequence number or +presents a divergent lineage, `LogEvent` / `LogEvents` return a +`*ForkDetectedError` and the stored head is not advanced. The default store is +in-memory; use a file store for fork detection across process invocations, or +disable it. + +```go +// Persist across CLI invocations (atomic, 0600 at ~/.attesto/heads.json): +client, _ := attesto.NewClient(apiKey, attesto.WithHeadStore(attesto.NewFileHeadStore(""))) + +// Disable fork detection: +client, _ = attesto.NewClient(apiKey, attesto.WithHeadStore(nil)) +``` + ## Operator and Admin Endpoints System-key clients are created with `attesto.NewClient`. Tenant/operator diff --git a/client.go b/client.go index 47baed2..7469b57 100644 --- a/client.go +++ b/client.go @@ -23,6 +23,7 @@ type Client struct { maxRetries int userAgent string validateKey bool + headStore HeadStore } type Option func(*Client) error @@ -51,6 +52,7 @@ func newClient(bearer string, validateKey bool, opts ...Option) (*Client, error) maxRetries: 3, userAgent: "attesto-go/" + SDKVersion, validateKey: validateKey, + headStore: NewMemoryHeadStore(), } for _, opt := range opts { if err := opt(client); err != nil { @@ -112,6 +114,16 @@ func WithUserAgent(userAgent string) Option { } } +// WithHeadStore sets the store used for client-side fork detection. The default +// is an in-memory store; pass NewFileHeadStore(path) to persist across process +// invocations, or nil to disable fork detection. +func WithHeadStore(store HeadStore) Option { + return func(c *Client) error { + c.headStore = store + return nil + } +} + func (c *Client) CreateStream(ctx context.Context, input StreamCreateInput, options ...RequestOptions) (*Stream, error) { var out Stream if input.Metadata == nil { @@ -172,8 +184,23 @@ func (c *Client) LogEvent(ctx context.Context, streamID string, input EventInput } } var out EventReceipt - err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out) - return &out, err + if err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events", nil, input, idempotency(options), &out); err != nil { + return nil, err + } + if err := c.trackHead(out); err != nil { + return nil, err + } + return &out, nil +} + +// trackHead checks the receipt extends the stored head (returning a +// *ForkDetectedError on a rewind/divergence) and advances the store. No-op when +// fork detection is disabled (headStore is nil). +func (c *Client) trackHead(receipt EventReceipt) error { + if c.headStore == nil { + return nil + } + return checkAndAdvanceHead(c.headStore, receipt) } func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventInput, options ...RequestOptions) (*EventBatchResponse, error) { @@ -196,8 +223,15 @@ func (c *Client) LogEvents(ctx context.Context, streamID string, events []EventI } body := M{"events": events} var out EventBatchResponse - err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events/batch", nil, body, idempotency(options), &out) - return &out, err + if err := c.requestJSON(ctx, http.MethodPost, "/v2/streams/"+url.PathEscape(streamID)+"/events/batch", nil, body, idempotency(options), &out); err != nil { + return nil, err + } + for _, receipt := range out.Receipts { + if err := c.trackHead(receipt); err != nil { + return nil, err + } + } + return &out, nil } func (c *Client) GetReceipt(ctx context.Context, streamEventID string) (*EventReceipt, error) { diff --git a/heads.go b/heads.go new file mode 100644 index 0000000..27d4332 --- /dev/null +++ b/heads.go @@ -0,0 +1,160 @@ +package attesto + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" +) + +// HeadStore persists the last accepted (seqNo, eventHash) head per stream so the +// client can detect a rewound or divergent server history. +type HeadStore interface { + Get(streamID string) (seqNo int64, eventHash string, ok bool) + Set(streamID string, seqNo int64, eventHash string) +} + +// ForkDetectedError reports that a receipt did not extend the last accepted head +// for its stream. The store is NOT advanced when this is returned. +type ForkDetectedError struct { + StreamID string + ExpectedSeq int64 + ExpectedHash string + GotSeq int64 + GotPrevHash string +} + +func (e *ForkDetectedError) Error() string { + return fmt.Sprintf( + "fork detected on stream %s: receipt (seq=%d prev=%s) does not extend "+ + "last accepted head (seq=%d hash=%s)", + e.StreamID, e.GotSeq, e.GotPrevHash, e.ExpectedSeq, e.ExpectedHash) +} + +// MemoryHeadStore keeps heads in process memory; safe for concurrent use. +type MemoryHeadStore struct { + mu sync.Mutex + heads map[string][2]any +} + +func NewMemoryHeadStore() *MemoryHeadStore { + return &MemoryHeadStore{heads: map[string][2]any{}} +} + +func (s *MemoryHeadStore) Get(streamID string) (int64, string, bool) { + s.mu.Lock() + defer s.mu.Unlock() + entry, ok := s.heads[streamID] + if !ok { + return 0, "", false + } + return entry[0].(int64), entry[1].(string), true +} + +func (s *MemoryHeadStore) Set(streamID string, seqNo int64, eventHash string) { + s.mu.Lock() + defer s.mu.Unlock() + s.heads[streamID] = [2]any{seqNo, eventHash} +} + +// FileHeadStore persists heads to a JSON file (default ~/.attesto/heads.json), +// giving fork detection across separate process invocations. Writes are atomic. +type FileHeadStore struct { + path string + mu sync.Mutex +} + +// NewFileHeadStore uses the given path, or ~/.attesto/heads.json when empty. +func NewFileHeadStore(path string) *FileHeadStore { + if path == "" { + if home, err := os.UserHomeDir(); err == nil { + path = filepath.Join(home, ".attesto", "heads.json") + } else { + path = ".attesto-heads.json" + } + } + return &FileHeadStore{path: path} +} + +func (s *FileHeadStore) load() map[string][2]json.RawMessage { + raw, err := os.ReadFile(s.path) + if err != nil { + return map[string][2]json.RawMessage{} + } + out := map[string][2]json.RawMessage{} + if err := json.Unmarshal(raw, &out); err != nil { + return map[string][2]json.RawMessage{} + } + return out +} + +func (s *FileHeadStore) Get(streamID string) (int64, string, bool) { + s.mu.Lock() + defer s.mu.Unlock() + entry, ok := s.load()[streamID] + if !ok { + return 0, "", false + } + var seqNo int64 + var eventHash string + if json.Unmarshal(entry[0], &seqNo) != nil || json.Unmarshal(entry[1], &eventHash) != nil { + return 0, "", false + } + return seqNo, eventHash, true +} + +func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) { + s.mu.Lock() + defer s.mu.Unlock() + heads := map[string][2]any{} + for key, entry := range s.load() { + var n int64 + var h string + _ = json.Unmarshal(entry[0], &n) + _ = json.Unmarshal(entry[1], &h) + heads[key] = [2]any{n, h} + } + heads[streamID] = [2]any{seqNo, eventHash} + body, err := json.Marshal(heads) + if err != nil { + return + } + if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil { + return + } + tmp, err := os.CreateTemp(filepath.Dir(s.path), ".heads-") + if err != nil { + return + } + tmpName := tmp.Name() + _, writeErr := tmp.Write(body) + closeErr := tmp.Close() + if writeErr != nil || closeErr != nil { + _ = os.Remove(tmpName) + return + } + _ = os.Chmod(tmpName, 0o600) + _ = os.Rename(tmpName, s.path) +} + +// checkAndAdvanceHead verifies a receipt extends the stored head, then advances +// the store. Returns *ForkDetectedError (without advancing) when the receipt +// rewinds or collides on seqNo, or claims to be the immediate next event but +// does not chain. A forward gap is accepted and advances. +func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error { + if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok { + if receipt.SeqNo <= storedSeq || + (receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) { + return &ForkDetectedError{ + StreamID: receipt.StreamID, + ExpectedSeq: storedSeq, + ExpectedHash: storedHash, + GotSeq: receipt.SeqNo, + GotPrevHash: receipt.PrevEventHash, + } + } + } + store.Set(receipt.StreamID, receipt.SeqNo, receipt.EventHash) + return nil +} diff --git a/heads_test.go b/heads_test.go new file mode 100644 index 0000000..a6137e3 --- /dev/null +++ b/heads_test.go @@ -0,0 +1,82 @@ +package attesto + +import ( + "errors" + "os" + "path/filepath" + "testing" +) + +func receipt(seqNo int64, eventHash, prevEventHash string) EventReceipt { + return EventReceipt{ + StreamID: "str_demo", + SeqNo: seqNo, + EventHash: eventHash, + PrevEventHash: prevEventHash, + } +} + +func TestMemoryHeadStoreInOrderAdvances(t *testing.T) { + store := NewMemoryHeadStore() + for _, r := range []EventReceipt{receipt(1, "h1", ""), receipt(2, "h2", "h1"), receipt(3, "h3", "h2")} { + if err := checkAndAdvanceHead(store, r); err != nil { + t.Fatalf("unexpected: %v", err) + } + } + if seq, hash, ok := store.Get("str_demo"); !ok || seq != 3 || hash != "h3" { + t.Errorf("head = (%d,%s,%v)", seq, hash, ok) + } +} + +func TestForgedRewoundReceiptIsFork(t *testing.T) { + store := NewMemoryHeadStore() + _ = checkAndAdvanceHead(store, receipt(1, "h1", "")) + _ = checkAndAdvanceHead(store, receipt(2, "h2", "h1")) + err := checkAndAdvanceHead(store, receipt(2, "h2-fork", "h1")) + var fork *ForkDetectedError + if !errors.As(err, &fork) { + t.Fatalf("expected *ForkDetectedError, got %v", err) + } + if seq, hash, _ := store.Get("str_demo"); seq != 2 || hash != "h2" { + t.Errorf("store advanced past fork: (%d,%s)", seq, hash) + } +} + +func TestDivergentNextEventIsFork(t *testing.T) { + store := NewMemoryHeadStore() + _ = checkAndAdvanceHead(store, receipt(1, "h1", "")) + err := checkAndAdvanceHead(store, receipt(2, "h2", "WRONG")) + var fork *ForkDetectedError + if !errors.As(err, &fork) { + t.Fatalf("expected *ForkDetectedError, got %v", err) + } +} + +func TestForwardGapAccepted(t *testing.T) { + store := NewMemoryHeadStore() + _ = checkAndAdvanceHead(store, receipt(1, "h1", "")) + if err := checkAndAdvanceHead(store, receipt(5, "h5", "h4")); err != nil { + t.Fatalf("forward gap should be accepted: %v", err) + } +} + +func TestFileHeadStorePersistsAndIs0600(t *testing.T) { + path := filepath.Join(t.TempDir(), "heads.json") + store := NewFileHeadStore(path) + _ = checkAndAdvanceHead(store, receipt(1, "h1", "")) + _ = checkAndAdvanceHead(store, receipt(2, "h2", "h1")) + info, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + if info.Mode().Perm() != 0o600 { + t.Errorf("mode = %o, want 600", info.Mode().Perm()) + } + reopened := NewFileHeadStore(path) + if seq, hash, ok := reopened.Get("str_demo"); !ok || seq != 2 || hash != "h2" { + t.Errorf("reopened head = (%d,%s,%v)", seq, hash, ok) + } + if err := checkAndAdvanceHead(reopened, receipt(2, "h2-fork", "h1")); err == nil { + t.Error("expected fork on reopened store") + } +}