Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ require (

require github.com/getsentry/sentry-go/otel v0.44.1

require github.com/hashicorp/golang-lru/v2 v2.0.7

require (
cel.dev/expr v0.25.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20250102033503-faa5f7b0171c // indirect
Expand Down
173 changes: 0 additions & 173 deletions pkg/cache/validating_cache.go

This file was deleted.

15 changes: 2 additions & 13 deletions pkg/transport/session/session_data_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@ import (
// - Create atomically creates metadata for id only if it does not already exist.
// Use this in preference to Load+Upsert to avoid TOCTOU races.
// - Upsert creates or overwrites the metadata for id, refreshing the TTL.
// - Update overwrites metadata only if the key already exists (SET XX semantics).
// Returns (true, nil) if updated, (false, nil) if the session was not found.
// Use this instead of Load+Upsert to avoid TOCTOU resurrection races.
// - Load retrieves metadata and refreshes the TTL (sliding-window expiry).
// Returns ErrSessionNotFound if the session does not exist.
// - Delete removes the session. It is not an error if the session is absent.
Expand All @@ -42,13 +39,6 @@ type DataStorage interface {
// Upsert creates or updates session metadata with a sliding TTL.
Upsert(ctx context.Context, id string, metadata map[string]string) error

// Update overwrites session metadata only if the session ID already exists
// (conditional write, equivalent to Redis SET XX). Returns (true, nil) if
// the entry was updated, (false, nil) if it was not found, or (false, err)
// on storage errors. Use this instead of Load+Upsert to prevent resurrections
// after a concurrent Delete.
Update(ctx context.Context, id string, metadata map[string]string) (bool, error)

// Load retrieves session metadata and refreshes its TTL.
// Returns ErrSessionNotFound if the session does not exist.
Load(ctx context.Context, id string) (map[string]string, error)
Expand All @@ -75,9 +65,8 @@ func NewLocalSessionDataStorage(ttl time.Duration) (*LocalSessionDataStorage, er
return nil, fmt.Errorf("ttl must be a positive duration")
}
s := &LocalSessionDataStorage{
sessions: make(map[string]*localDataEntry),
ttl: ttl,
stopCh: make(chan struct{}),
ttl: ttl,
stopCh: make(chan struct{}),
}
go s.cleanupRoutine()
return s, nil
Expand Down
97 changes: 47 additions & 50 deletions pkg/transport/session/session_data_storage_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ func (e *localDataEntry) lastAccess() time.Time {
}

// LocalSessionDataStorage implements DataStorage using an in-memory
// map with TTL-based eviction.
// sync.Map with TTL-based eviction.
//
// Sessions are evicted if they have not been accessed within the configured TTL.
// A background goroutine runs until Close is called.
type LocalSessionDataStorage struct {
sessions map[string]*localDataEntry // guarded by mu
mu sync.Mutex
sessions sync.Map // map[string]*localDataEntry
ttl time.Duration
stopCh chan struct{}
stopOnce sync.Once
Expand All @@ -50,9 +49,9 @@ func (s *LocalSessionDataStorage) Upsert(_ context.Context, id string, metadata
if metadata == nil {
metadata = make(map[string]string)
}
s.mu.Lock()
s.sessions[id] = newLocalDataEntry(maps.Clone(metadata))
s.mu.Unlock()
// Store a defensive copy so callers cannot mutate stored data.
copied := maps.Clone(metadata)
s.sessions.Store(id, newLocalDataEntry(copied))
return nil
}

Expand All @@ -62,71 +61,54 @@ func (s *LocalSessionDataStorage) Load(_ context.Context, id string) (map[string
if id == "" {
return nil, fmt.Errorf("cannot load session data with empty ID")
}
s.mu.Lock()
entry, ok := s.sessions[id]
if ok {
entry.lastAccessNano.Store(time.Now().UnixNano())
}
s.mu.Unlock()

val, ok := s.sessions.Load(id)
if !ok {
return nil, ErrSessionNotFound
}
entry, ok := val.(*localDataEntry)
if !ok {
return nil, fmt.Errorf("invalid entry type in local session data storage")
}

// Refresh last-access in place. deleteExpired re-checks the timestamp
// immediately before calling CompareAndDelete, so this atomic store is
// sufficient to prevent eviction of an actively accessed entry.
entry.lastAccessNano.Store(time.Now().UnixNano())

return maps.Clone(entry.metadata), nil
}

// Create creates session metadata only if the session ID does not already exist.
// Returns (true, nil) if created, (false, nil) if the key already existed.
// Create atomically creates session metadata only if the session ID
// does not already exist. Uses sync.Map.LoadOrStore for atomicity.
func (s *LocalSessionDataStorage) Create(_ context.Context, id string, metadata map[string]string) (bool, error) {
if id == "" {
return false, fmt.Errorf("cannot write session data with empty ID")
}
if metadata == nil {
metadata = make(map[string]string)
}
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.sessions[id]; exists {
return false, nil
}
s.sessions[id] = newLocalDataEntry(maps.Clone(metadata))
return true, nil
}

// Update overwrites session metadata only if the session ID already exists.
// Returns (true, nil) if updated, (false, nil) if not found.
func (s *LocalSessionDataStorage) Update(_ context.Context, id string, metadata map[string]string) (bool, error) {
if id == "" {
return false, fmt.Errorf("cannot write session data with empty ID")
}
if metadata == nil {
metadata = make(map[string]string)
}
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.sessions[id]; !ok {
return false, nil
}
s.sessions[id] = newLocalDataEntry(maps.Clone(metadata))
return true, nil
copied := maps.Clone(metadata)
_, loaded := s.sessions.LoadOrStore(id, newLocalDataEntry(copied))
return !loaded, nil
}

// Delete removes session metadata. Not an error if absent.
func (s *LocalSessionDataStorage) Delete(_ context.Context, id string) error {
if id == "" {
return fmt.Errorf("cannot delete session data with empty ID")
}
s.mu.Lock()
delete(s.sessions, id)
s.mu.Unlock()
s.sessions.Delete(id)
return nil
}

// Close stops the background cleanup goroutine and clears all stored metadata.
func (s *LocalSessionDataStorage) Close() error {
s.stopOnce.Do(func() { close(s.stopCh) })
s.mu.Lock()
s.sessions = make(map[string]*localDataEntry)
s.mu.Unlock()
s.sessions.Range(func(key, _ any) bool {
s.sessions.Delete(key)
return true
})
return nil
}

Expand Down Expand Up @@ -158,11 +140,26 @@ func (s *LocalSessionDataStorage) cleanupRoutine() {

func (s *LocalSessionDataStorage) deleteExpired() {
cutoff := time.Now().Add(-s.ttl)
s.mu.Lock()
defer s.mu.Unlock()
for id, entry := range s.sessions {
if entry.lastAccess().Before(cutoff) {
delete(s.sessions, id)
var toDelete []struct {
id string
entry *localDataEntry
}
s.sessions.Range(func(key, val any) bool {
entry, ok := val.(*localDataEntry)
if ok && entry.lastAccess().Before(cutoff) {
id, ok := key.(string)
if ok {
toDelete = append(toDelete, struct {
id string
entry *localDataEntry
}{id, entry})
}
}
return true
})
for _, item := range toDelete {
if item.entry.lastAccess().Before(cutoff) {
s.sessions.CompareAndDelete(item.id, item.entry)
}
}
}
Loading
Loading