package attesto import ( "encoding/json" "fmt" "os" "path/filepath" "sync" ) // HeadStore persists the last accepted (seqNo, eventHash) head per stream so the // client can detect a rewound or divergent server history. type HeadStore interface { Get(streamID string) (seqNo int64, eventHash string, ok bool) Set(streamID string, seqNo int64, eventHash string) } // ForkDetectedError reports that a receipt did not extend the last accepted head // for its stream. The store is NOT advanced when this is returned. type ForkDetectedError struct { StreamID string ExpectedSeq int64 ExpectedHash string GotSeq int64 GotPrevHash string } func (e *ForkDetectedError) Error() string { return fmt.Sprintf( "fork detected on stream %s: receipt (seq=%d prev=%s) does not extend "+ "last accepted head (seq=%d hash=%s)", e.StreamID, e.GotSeq, e.GotPrevHash, e.ExpectedSeq, e.ExpectedHash) } // MemoryHeadStore keeps heads in process memory; safe for concurrent use. type MemoryHeadStore struct { mu sync.Mutex heads map[string][2]any } func NewMemoryHeadStore() *MemoryHeadStore { return &MemoryHeadStore{heads: map[string][2]any{}} } func (s *MemoryHeadStore) Get(streamID string) (int64, string, bool) { s.mu.Lock() defer s.mu.Unlock() entry, ok := s.heads[streamID] if !ok { return 0, "", false } return entry[0].(int64), entry[1].(string), true } func (s *MemoryHeadStore) Set(streamID string, seqNo int64, eventHash string) { s.mu.Lock() defer s.mu.Unlock() s.heads[streamID] = [2]any{seqNo, eventHash} } // FileHeadStore persists heads to a JSON file (default ~/.attesto/heads.json), // giving fork detection across separate process invocations. Writes are atomic. type FileHeadStore struct { path string mu sync.Mutex } // NewFileHeadStore uses the given path, or ~/.attesto/heads.json when empty. func NewFileHeadStore(path string) *FileHeadStore { if path == "" { if home, err := os.UserHomeDir(); err == nil { path = filepath.Join(home, ".attesto", "heads.json") } else { path = ".attesto-heads.json" } } return &FileHeadStore{path: path} } func (s *FileHeadStore) load() map[string][2]json.RawMessage { raw, err := os.ReadFile(s.path) if err != nil { return map[string][2]json.RawMessage{} } out := map[string][2]json.RawMessage{} if err := json.Unmarshal(raw, &out); err != nil { return map[string][2]json.RawMessage{} } return out } func (s *FileHeadStore) Get(streamID string) (int64, string, bool) { s.mu.Lock() defer s.mu.Unlock() entry, ok := s.load()[streamID] if !ok { return 0, "", false } var seqNo int64 var eventHash string if json.Unmarshal(entry[0], &seqNo) != nil || json.Unmarshal(entry[1], &eventHash) != nil { return 0, "", false } return seqNo, eventHash, true } func (s *FileHeadStore) Set(streamID string, seqNo int64, eventHash string) { s.mu.Lock() defer s.mu.Unlock() heads := map[string][2]any{} for key, entry := range s.load() { var n int64 var h string _ = json.Unmarshal(entry[0], &n) _ = json.Unmarshal(entry[1], &h) heads[key] = [2]any{n, h} } heads[streamID] = [2]any{seqNo, eventHash} body, err := json.Marshal(heads) if err != nil { return } if err := os.MkdirAll(filepath.Dir(s.path), 0o700); err != nil { return } tmp, err := os.CreateTemp(filepath.Dir(s.path), ".heads-") if err != nil { return } tmpName := tmp.Name() _, writeErr := tmp.Write(body) closeErr := tmp.Close() if writeErr != nil || closeErr != nil { _ = os.Remove(tmpName) return } _ = os.Chmod(tmpName, 0o600) _ = os.Rename(tmpName, s.path) } // checkAndAdvanceHead verifies a receipt extends the stored head, then advances // the store. Returns *ForkDetectedError (without advancing) when the receipt // rewinds or collides on seqNo, or claims to be the immediate next event but // does not chain. A forward gap is accepted and advances. func checkAndAdvanceHead(store HeadStore, receipt EventReceipt) error { if storedSeq, storedHash, ok := store.Get(receipt.StreamID); ok { if receipt.SeqNo <= storedSeq || (receipt.SeqNo == storedSeq+1 && receipt.PrevEventHash != storedHash) { return &ForkDetectedError{ StreamID: receipt.StreamID, ExpectedSeq: storedSeq, ExpectedHash: storedHash, GotSeq: receipt.SeqNo, GotPrevHash: receipt.PrevEventHash, } } } store.Set(receipt.StreamID, receipt.SeqNo, receipt.EventHash) return nil }