feature: locking support #29

@bigbes

Description

Add distributed locking primitives to the go-storage library to enable coordination between multiple processes/nodes in a cluster. This is a common requirement in distributed systems, where processes need to coordinate access to shared resources.

The implementation should be built on top of the existing storage.Storage interface using transactional operations (Tx) to ensure atomicity and consistency across different storage backends (TCS, etcd).

Requirements

Core Requirements:

  1. UUID-based Lock Ownership: Generate a new UUID for each lock acquisition attempt
  2. TTL with Refresh: Locks should have a time-to-live (duration) and can be refreshed before they expire
  3. Transaction-based Implementation: Use existing storage.Storage Tx interface for atomic operations

Functional Requirements:

  • Exclusive Locks: Traditional mutex-style locking
  • TryLock Support: Non-blocking attempt to acquire lock
  • Lock Refresh: Ability to extend lock TTL while holding the lock
  • Owner Verification: Ensure only lock owner can unlock/refresh (using UUID)
  • Automatic Cleanup: Expired locks should be automatically released
  • Watch Support: Optional ability to watch for lock state changes

Proposed Solution

Package Structure

Create a new package lock with the following structure:

lock/
├── locker.go      # Locker interface and implementation
├── lock.go        # Lock interface and implementation  
├── options.go     # Configuration options
└── errors.go      # Error definitions

API Design

package lock

// Locker provides distributed locking capabilities
type Locker interface {
    // Lock attempts to acquire a lock, blocking until successful or context cancellation
    Lock(ctx context.Context, key []byte, opts ...LockOption) (Lock, error)
    
    // TryLock attempts to acquire a lock without blocking.
    // Returns ErrLockAcquired if the lock is already held by someone else
    // and therefore cannot be acquired immediately
    TryLock(ctx context.Context, key []byte, opts ...LockOption) (Lock, error)
}

// Lock represents an acquired distributed lock
type Lock interface {
    // Unlock releases the lock
    Unlock(ctx context.Context) error
    
    // Refresh extends the lock's TTL
    Refresh(ctx context.Context, ttl time.Duration) error
    
    // ID returns the unique identifier for this lock instance
    // for debugging purposes
    ID() string
    
    // Key returns the locked key
    Key() []byte
    
    // ExpiresAt returns when the lock expires (if TTL is set)
    ExpiresAt() time.Time
}

// LockOption configures lock behavior
// implement using options library
type LockOption options.OptionCallback(*lockOptions)

// WithTTL sets the time-to-live for a lock (default: 1sec)
func WithTTL(d time.Duration) LockOption

// WithOwner sets a custom owner ID (default: auto-generated UUID)
func WithOwner(id string) LockOption

// WithRefreshInterval sets automatic refresh interval
func WithRefreshInterval(d time.Duration) LockOption
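
The option functions above are declared without bodies. As a minimal sketch of how they could behave, the block below models LockOption as a plain function type instead of the options library the proposal intends to use. The lockOptions fields and the applyOptions helper are assumptions for illustration; only the 1s default TTL comes from the comment above.

```go
package main

import "time"

// lockOptions holds the settings that LockOption values modify.
// The field names are assumptions, not part of the proposal.
type lockOptions struct {
	ttl             time.Duration
	owner           string
	refreshInterval time.Duration
}

// LockOption is modelled here as a plain function. The proposal plans to
// build it on an options library instead.
type LockOption func(*lockOptions)

// WithTTL sets the time-to-live for a lock.
func WithTTL(d time.Duration) LockOption {
	return func(o *lockOptions) { o.ttl = d }
}

// WithOwner sets a custom owner ID.
func WithOwner(id string) LockOption {
	return func(o *lockOptions) { o.owner = id }
}

// WithRefreshInterval sets the automatic refresh interval.
func WithRefreshInterval(d time.Duration) LockOption {
	return func(o *lockOptions) { o.refreshInterval = d }
}

// applyOptions starts from the documented default (1s TTL) and applies
// each option in order, so later options override earlier ones.
func applyOptions(opts []LockOption) lockOptions {
	o := lockOptions{ttl: time.Second}
	for _, opt := range opts {
		if opt != nil {
			opt(&o)
		}
	}
	return o
}
```

An empty owner after applyOptions would signal that a UUID still has to be generated, which keeps UUID generation out of the option layer.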

Implementation Details

Storage Schema

Locks will be stored using a predictable key pattern:

  • Exclusive locks: for example, /locks/{key}/lock

Each lock entry will store:

{
    "id": "uuid-v4",
    "owner": "client-id",
    "expires_at": "2024-01-01T00:00:00Z",
    "created_at": "2024-01-01T00:00:00Z",
    "metadata": {} // optional custom data
}
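
To make the entry format concrete, here is a hedged sketch of a Go type that round-trips the JSON shown above. The type name lockRecord, the helper names, and the choice of map[string]string for metadata are assumptions; only the JSON keys come from the proposal. time.Time marshals to RFC 3339, which matches the timestamps in the example entry.

```go
package main

import (
	"encoding/json"
	"time"
)

// lockRecord mirrors the JSON lock entry. Only the JSON keys are taken
// from the proposal; the Go names and the metadata type are assumptions.
type lockRecord struct {
	ID        string            `json:"id"`
	Owner     string            `json:"owner"`
	ExpiresAt time.Time         `json:"expires_at"`
	CreatedAt time.Time         `json:"created_at"`
	Metadata  map[string]string `json:"metadata,omitempty"`
}

// encodeLock serializes a record into the value stored under the lock key.
func encodeLock(r lockRecord) ([]byte, error) {
	return json.Marshal(r)
}

// decodeLock parses a stored value back into a record.
func decodeLock(b []byte) (lockRecord, error) {
	var r lockRecord
	err := json.Unmarshal(b, &r)
	return r, err
}
```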

Transactional Logic

Lock acquisition uses conditional transactions:

// Pseudocode for Lock acquisition
func (l *locker) Lock(ctx context.Context, key []byte, opts ...LockOption) (Lock, error) {
    options := applyOptions(opts)
    lockID := generateUUID()
    lockKey := lockKeyFor(key) // hypothetical helper, e.g. /locks/{key}/lock
    
    for attempt := 0; ; attempt++ {
        // Try to acquire the lock using a transaction
        resp, err := l.storage.Tx(ctx).
            If(
                // Succeeds only if the lock key does not exist yet.
                // Taking over an expired lock needs an additional check
                // on expires_at, which is not shown here.
                predicate.VersionEqual(lockKey, 0),
            ).
            Then(
                operation.Put(lockKey, serializeLock(lockID, options)),
            ).
            Commit()
        
        if err == nil && resp.Succeeded {
            return newLock(lockID, key, options), nil
        }
        
        // Wait before retrying; backoffFor is a hypothetical helper
        // that implements the backoff strategy
        select {
        case <-time.After(backoffFor(attempt)):
            continue
        case <-ctx.Done():
            return nil, ctx.Err()
        }
    }
}
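
The retry loop waits according to a backoff strategy that the proposal does not specify. One common choice is capped exponential backoff with jitter, sketched below. The function name nextBackoff and its parameters are hypothetical, and the jitter window (upper half of the delay) is an assumption rather than a requirement of the design.

```go
package main

import (
	"math/rand"
	"time"
)

// nextBackoff returns the wait before retry number attempt (0-based).
// The delay starts at base, doubles with each attempt, and is capped at
// max. The result is picked uniformly from [d/2, d] so that contending
// clients do not retry in lockstep.
func nextBackoff(attempt int, base, max time.Duration) time.Duration {
	if base <= 0 || max <= 0 {
		return 0
	}
	d := base
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	half := d / 2
	// Int63n requires a positive argument; d-half+1 is at least 1 here.
	return half + time.Duration(rand.Int63n(int64(d-half)+1))
}
```

With a 10ms base and a 1s cap, the delay grows from at most 10ms on the first retry to at most 1s from the eighth retry onward.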

TTL and Refresh Mechanism

  1. TTL: Expiration is checked lazily, when a client attempts to acquire the lock
  2. Refresh: The owner updates the expires_at field with a new timestamp
  3. Automatic Cleanup: An optional background cleaner removes orphaned locks
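
The lazy expiration check and the owner-verified refresh can be sketched as pure functions over the stored state. All names here (lockState, expired, refreshed, and the two errors) are hypothetical. In a real implementation the updated state would have to be written back with a conditional transaction, and the comparison depends on the local clock, which can differ between nodes.

```go
package main

import (
	"errors"
	"time"
)

var (
	errNotOwner = errors.New("lock: caller is not the owner")
	errExpired  = errors.New("lock: already expired")
)

// lockState is the part of a stored lock entry needed for TTL handling.
type lockState struct {
	ID        string
	ExpiresAt time.Time
}

// expired is the lazy check: it runs only when some client looks at the
// entry, for example while trying to acquire the lock.
func (s lockState) expired(now time.Time) bool {
	return !now.Before(s.ExpiresAt)
}

// refreshed returns a copy of s whose expiry is moved to now+ttl.
// It refuses to refresh for a different lock ID or after expiry.
func refreshed(s lockState, id string, ttl time.Duration, now time.Time) (lockState, error) {
	if s.ID != id {
		return s, errNotOwner
	}
	if s.expired(now) {
		return s, errExpired
	}
	s.ExpiresAt = now.Add(ttl)
	return s, nil
}
```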

Examples

Exclusive Lock Usage

storage := NewStorage(driver)
locker := lock.NewLocker(storage)

// Acquire exclusive lock with 30s TTL
l, err := locker.Lock(ctx, []byte("resource-1"), lock.WithTTL(30*time.Second))
if err != nil {
    return err
}
defer l.Unlock(ctx)

// Perform critical section operations
// ...

// Refresh lock if operation takes longer
if err := l.Refresh(ctx, 30*time.Second); err != nil {
    return err
}

TryLock Pattern

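// Use a variable name other than lock so that the lock package
// (and lock.ErrLockAcquired) stays accessible
l, err := locker.TryLock(ctx, []byte("resource-2"))
if errors.Is(err, lock.ErrLockAcquired) {
    // Lock already held by someone else
    return ErrResourceBusy
}
if err != nil {
    return err
}
defer l.Unlock(ctx)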

Custom Owner ID

l, err := locker.Lock(
    ctx,
    []byte("job-scheduler"),
    lock.WithTTL(5*time.Minute),
    lock.WithOwner("scheduler-node-1"),
)

Dependencies: no new external dependencies required

Testing

  • Unit tests for lock logic
  • Integration tests with both TCS and etcd backends

Documentation

  • API documentation with examples
  • Best practices for distributed locking
  • Failure mode handling guide
  • Performance considerations

Related Issues/PRs

  • None currently

Checklist

  • Exclusive locks work with both TCS and etcd backends
  • TTL expiration works correctly
  • Lock refresh extends TTL properly
  • UUID ownership prevents unauthorized unlock
  • All operations are transactional and atomic
  • Comprehensive test coverage (>90%)
  • Documentation with usage examples
