Class: DistributedLock::GoogleCloudStorage::Lock

Inherits:
Object
Defined in:
lib/distributed-lock-google-cloud-storage/lock.rb

Constructor Details

#initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock

Note:

The logger must either be thread-safe, or all writes to this logger by anything besides this Lock instance must be synchronized through logger_mutex. This is because the logger will be written to by a background thread.

Creates a new Lock instance.

Under the hood we'll instantiate a Google::Cloud::Storage::Bucket object for accessing the bucket. You can customize the project ID, authentication method, etc. through cloud_storage_options and cloud_storage_bucket_options.

Parameters:

  • bucket_name (String)

    The name of a Cloud Storage bucket in which to place the lock. This bucket must already exist.

  • path (String)

    The object path within the bucket to use for locking.

  • instance_identity_prefix (String) (defaults to: self.class.default_instance_identity_prefix)

    A unique identifier for the client of this lock, excluding its thread identity. Learn more in the readme, section "Instant recovery from stale locks".

  • thread_safe (Boolean) (defaults to: true)

    Whether this Lock instance should be thread-safe. When true, the thread's identity will be included in the instance identity.

  • logger (defaults to: Logger.new($stderr))

    A Logger-compatible object to log progress to. See also the note about thread-safety.

  • logger_mutex (defaults to: Mutex.new)

    A Mutex to synchronize multithreaded writes to the logger.

  • ttl (Numeric) (defaults to: DEFAULT_TTL)

    The lock is considered stale if its age (in seconds) exceeds this value. This value should be generous, on the order of minutes.

  • refresh_interval (Numeric, nil) (defaults to: nil)

    We'll refresh the lock's timestamp every refresh_interval seconds. This value should be many times smaller than ttl, in order to account for network delays, temporary network errors, and events that cause the lock to become unhealthy.

    This value must be smaller than ttl / max_refresh_fails.

    Default: ttl / (max_refresh_fails * 3)

  • max_refresh_fails (Integer) (defaults to: DEFAULT_MAX_REFRESH_FAILS)

    The lock will be declared unhealthy if refreshing fails with a temporary error this many times consecutively. If refreshing fails with a permanent error, then the lock is immediately declared unhealthy regardless of this value.

  • backoff_min (Numeric) (defaults to: DEFAULT_BACKOFF_MIN)

    Minimum amount of time, in seconds, to back off when waiting for a lock to become available. Must be at least 0.

  • backoff_max (Numeric) (defaults to: DEFAULT_BACKOFF_MAX)

    Maximum amount of time, in seconds, to back off when waiting for a lock to become available. Must be at least backoff_min.

  • backoff_multiplier (Numeric) (defaults to: DEFAULT_BACKOFF_MULTIPLIER)

    Factor by which to increase the backoff time each time acquiring the lock fails. Must be at least 0.

  • object_acl (String, nil) (defaults to: nil)

    A predefined set of access control to apply to the Cloud Storage object. See the acl parameter in Google::Cloud::Storage::Bucket#create_file for acceptable values.

  • cloud_storage_options (Hash, nil) (defaults to: nil)

    Additional options to pass to Google::Cloud::Storage.new. See its documentation to learn which options are available.

  • cloud_storage_bucket_options (Hash, nil) (defaults to: nil)

    Additional options to pass to Google::Cloud::Storage::Project#bucket. See its documentation to learn which options are available.

Raises:

  • (ArgumentError)

    When an invalid argument is detected.
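The relationship between ttl, max_refresh_fails and refresh_interval described above can be checked with a little arithmetic. The following is a minimal sketch, not the library's own validation code: the method names are hypothetical, and the values 300 and 3 are illustrative only, since the actual DEFAULT_TTL and DEFAULT_MAX_REFRESH_FAILS are not shown in this section.

```ruby
# Documented default: ttl / (max_refresh_fails * 3)
def default_refresh_interval(ttl, max_refresh_fails)
  ttl.to_f / (max_refresh_fails * 3)
end

# Documented constraint: refresh_interval must be smaller than
# ttl / max_refresh_fails.
def refresh_interval_allowed?(ttl, refresh_interval, max_refresh_fails)
  refresh_interval < ttl.to_f / max_refresh_fails
end

ttl = 300              # seconds (illustrative value, an assumption)
max_refresh_fails = 3  # illustrative value, an assumption

interval = default_refresh_interval(ttl, max_refresh_fails)
puts format('default refresh_interval: %.2f s', interval)
puts "allowed? #{refresh_interval_allowed?(ttl, interval, max_refresh_fails)}"
```

With the default, max_refresh_fails consecutive failed refreshes take ttl / 3 seconds in total, so the lock would be declared unhealthy well before other clients could consider it stale.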



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 82

def initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix,
  thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new,
  ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS,
  backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX,
  backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
  object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil)

  check_refresh_interval_allowed!(ttl, refresh_interval, max_refresh_fails)
  check_backoff_min!(backoff_min)
  check_backoff_max!(backoff_max, backoff_min)
  check_backoff_multiplier!(backoff_multiplier)


  ### Read-only variables (safe to access concurrently) ###

  @bucket_name = bucket_name
  @path = path
  @instance_identity_prefix = instance_identity_prefix
  @thread_safe = thread_safe
  @logger = logger
  @logger_mutex = logger_mutex
  @ttl = ttl
  @refresh_interval = refresh_interval || ttl.to_f / (max_refresh_fails * 3)
  @max_refresh_fails = max_refresh_fails
  @backoff_min = backoff_min
  @backoff_max = backoff_max
  @backoff_multiplier = backoff_multiplier
  @object_acl = object_acl

  @client = create_gcloud_storage_client(cloud_storage_options)
  @bucket = get_gcloud_storage_bucket(@client, bucket_name, cloud_storage_bucket_options)

  @state_mutex = Mutex.new
  @refresher_cond = ConditionVariable.new


  ### Read-write variables protected by @state_mutex ###

  @owner = nil
  @metageneration = nil
  @refresher_thread = nil
  @refresher_error = nil

  # The refresher generation is incremented every time we shutdown
  # the refresher thread. It allows the refresher thread to know
  # whether it's being shut down (and thus shouldn't access/modify
  # state).
  @refresher_generation = 0
end

Class Method Details

.default_instance_identity_prefix ⇒ String

Generates a sane default instance identity prefix string. The result is identical across multiple calls in the same process. It supports forking, so that calling this method in a forked child process automatically returns a different value than when called from the parent process.

The result doesn't include a thread identifier, which is why we call this a prefix.

Returns:

  • (String)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 27

def self.default_instance_identity_prefix
  "#{DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID}-#{Process.pid}"
end
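
The fork behavior follows from embedding Process.pid in the string. The sketch below illustrates this with stand-in methods; the base value 'example-host' is a placeholder for DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID, whose real value is not shown in this section, and both method names are hypothetical.

```ruby
# Stand-in for the class method above. `base` is a placeholder value.
def instance_identity_prefix(base = 'example-host')
  "#{base}-#{Process.pid}"
end

# Evaluates the prefix inside a forked child and returns it to the parent.
# Requires a platform where Process.fork is available (not Windows).
def instance_identity_prefix_in_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(instance_identity_prefix)
    writer.close
    exit!(0) # skip at_exit handlers inherited from the parent
  end
  writer.close
  child_value = reader.read
  reader.close
  Process.wait(pid)
  child_value
end

puts "parent: #{instance_identity_prefix}"
puts "child:  #{instance_identity_prefix_in_child}" if Process.respond_to?(:fork)
```

Repeated calls within one process yield the same string, while a forked child has a different PID and therefore a different prefix.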

Instance Method Details

#abandon ⇒ void

This method returns an undefined value.

Pretends like we've never obtained this lock, abandoning our internal state about the lock.

Shuts down background lock refreshing, and ensures that #locked_according_to_internal_state? returns false.

Does not modify any server data, so #locked_according_to_server? may still return true.



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 284

def abandon
  refresher_generation = nil
  thread = nil

  @state_mutex.synchronize do
    if unsynced_locked_according_to_internal_state?
      refresher_generation = @refresher_generation
      thread = shutdown_refresher_thread
    end
  end

  if thread
    log_debug { "Abandoning locked lock" }
    thread.join
    log_debug { "Done abandoned locked lock. refresher_generation=#{refresher_generation}" }
  else
    log_debug { "Abandoning unlocked lock" }
  end
end

#check_health! ⇒ void

This method returns an undefined value.

Checks whether the lock is healthy. See #healthy? for the definition of "healthy". Use #last_refresh_error to query the last error that caused the lock to be declared unhealthy.

It only makes sense to call this method after having obtained this lock.

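Raises:

  • (LockUnhealthyError)

    The lock is not healthy.

  • (NotLockedError)

    This Lock instance, according to its internal state, believes that it isn't currently holding the lock.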



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 336

def check_health!
  raise LockUnhealthyError, 'Lock is not healthy' if !healthy?
end

#healthy? ⇒ Boolean

Returns whether the lock is healthy. A lock is considered healthy until we fail to refresh the lock too many times consecutively.

Failure to refresh could happen for many reasons. Some failures are temporary, such as network problems. Others are permanent, such as the lock object being forcefully deleted by someone else.

Upon encountering a permanent failure, the lock is immediately declared unhealthy. Temporary failures only cause the lock to be declared unhealthy once max_refresh_fails of them occur consecutively.

It only makes sense to call this method after having obtained this lock.

Returns:

  • (Boolean)

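Raises:

  • (NotLockedError)

    This Lock instance, according to its internal state, believes that it isn't currently holding the lock.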

See Also:



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 320

def healthy?
  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    @refresher_thread.alive?
  end
end
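
The health rule described for #healthy? (permanent errors count immediately, temporary errors only after max_refresh_fails consecutive occurrences) can be modeled in isolation. The class below is a hypothetical illustration of that rule only; it is not how the library tracks health, which, as the source above shows, relies on whether the refresher thread is alive.

```ruby
# Hypothetical helper, not part of the library: records refresh outcomes
# and applies the documented health rule.
class RefreshHealthTracker
  attr_reader :last_error

  def initialize(max_refresh_fails:)
    @max_refresh_fails = max_refresh_fails
    @consecutive_fails = 0
    @healthy = true
    @last_error = nil
  end

  def healthy?
    @healthy
  end

  # A successful refresh resets the consecutive-failure counter.
  def record_success
    @consecutive_fails = 0
  end

  # Temporary errors only matter once enough of them occur in a row.
  def record_temporary_failure(error)
    @consecutive_fails += 1
    declare_unhealthy(error) if @consecutive_fails >= @max_refresh_fails
  end

  # Permanent errors make the lock unhealthy right away.
  def record_permanent_failure(error)
    declare_unhealthy(error)
  end

  private

  def declare_unhealthy(error)
    @healthy = false
    @last_error = error
  end
end

tracker = RefreshHealthTracker.new(max_refresh_fails: 3)
2.times { tracker.record_temporary_failure(IOError.new('network blip')) }
tracker.record_success
puts tracker.healthy?
```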

#last_refresh_error ⇒ StandardError?

Returns the last error that caused the lock to be declared unhealthy.

Don't use this method to check whether the lock is currently healthy. If this lock has ever been unhealthy, then this method returns a non-nil value even if the lock is currently healthy.

Returns:

  • (StandardError, nil)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 347

def last_refresh_error
  @state_mutex.synchronize do
    @refresher_error
  end
end

#lock(timeout: 2 * @ttl) ⇒ void

This method returns an undefined value.

Obtains the lock. If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.

Parameters:

  • timeout (Numeric) (defaults to: 2 * @ttl)

    The timeout in seconds.

Raises:

  • (AlreadyLockedError)

    This Lock instance — according to its internal state — believes that it's already holding the lock.

  • (TimeoutError)

    Failed to acquire the lock within timeout seconds.

  • (Google::Cloud::Error)
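
While waiting for the lock, the delay between attempts is governed by backoff_min, backoff_max and backoff_multiplier. The sketch below shows one plausible schedule, assuming plain exponential growth capped at backoff_max. This is an assumption: the library's actual retry helper is not shown in this section and may behave differently, for example by adding jitter. The method name backoff_delays is hypothetical.

```ruby
# Returns the delays (in seconds) for the first `attempts` retries,
# assuming each delay is the previous one times backoff_multiplier,
# never exceeding backoff_max.
def backoff_delays(attempts:, backoff_min:, backoff_max:, backoff_multiplier:)
  delay = backoff_min
  Array.new(attempts) do
    current = [delay, backoff_max].min
    delay = [delay * backoff_multiplier, backoff_max].min
    current
  end
end

p backoff_delays(attempts: 5, backoff_min: 1, backoff_max: 10, backoff_multiplier: 2)
# prints [1, 2, 4, 8, 10]
```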


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 182

def lock(timeout: 2 * @ttl)
  raise AlreadyLockedError, 'Already locked' if owned_according_to_internal_state?

  file = retry_with_backoff_until_success(timeout,
    retry_logger: method(:log_lock_retry),
    backoff_min: @backoff_min,
    backoff_max: @backoff_max,
    backoff_multiplier: @backoff_multiplier) do

    log_debug { 'Acquiring lock' }
    if (file = create_lock_object)
      log_debug { 'Successfully acquired lock' }
      [:success, file]
    else
      log_debug { 'Error acquiring lock. Investigating why...' }
      file = @bucket.file(@path)
      if file.nil?
        log_warn { 'Lock was deleted right after having created it. Retrying.' }
        :retry_immediately
      elsif file.metadata['identity'] == identity
        log_warn { 'Lock was already owned by this instance, but was abandoned. Resetting lock' }
        delete_lock_object(file.metageneration)
        :retry_immediately
      else
        if lock_stale?(file)
          log_warn { 'Lock is stale. Resetting lock' }
          delete_lock_object(file.metageneration)
        else
          log_debug { 'Lock was already acquired, and is not stale' }
        end
        :error
      end
    end
  end

  refresher_generation = nil
  @state_mutex.synchronize do
    @owner = identity
    @metageneration = file.metageneration
    spawn_refresher_thread
    refresher_generation = @refresher_generation
  end
  log_debug { "Locked. refresher_generation=#{refresher_generation}, metageneration=#{file.metageneration}" }
  nil
end
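
The branch taken when creating the lock object fails can be summarized as a small decision function. This is a simplified sketch of the logic in the source above, not a replacement for it: ExistingLock and decide_after_failed_create are hypothetical names, and staleness is approximated as age > ttl based on the ttl parameter's description, since lock_stale? itself is not shown in this section.

```ruby
# Hypothetical stand-in for the fields of the existing lock object that
# matter for the decision. `age` is in seconds.
ExistingLock = Struct.new(:identity, :age)

# Returns [object_action, retry_outcome], mirroring the branches of #lock:
#   object_action is :delete when the lock object is reset, else :keep
#   retry_outcome is the symbol handed back to the retry loop
def decide_after_failed_create(existing, identity:, ttl:)
  if existing.nil?
    # The object vanished between our create attempt and the lookup.
    [:keep, :retry_immediately]
  elsif existing.identity == identity
    # We owned it earlier but abandoned it: reset and retry right away.
    [:delete, :retry_immediately]
  elsif existing.age > ttl
    # Someone else's stale lock (assumed staleness rule): reset it.
    [:delete, :error]
  else
    # Legitimately held by someone else.
    [:keep, :error]
  end
end

p decide_after_failed_create(ExistingLock.new('other', 10), identity: 'me', ttl: 300)
# prints [:keep, :error]
```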

#locked_according_to_internal_state? ⇒ Boolean

Returns whether this Lock instance's internal state believes that the lock is currently held by this instance. Does not check whether the lock is stale.

Returns:

  • (Boolean)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 136

def locked_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_locked_according_to_internal_state?
  end
end

#locked_according_to_server? ⇒ Boolean

Returns whether the server believes that the lock is currently held by somebody. Does not check whether the lock is stale.

Returns:

  • (Boolean)

Raises:

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 147

def locked_according_to_server?
  !@bucket.file(@path).nil?
end

#owned_according_to_internal_state? ⇒ Boolean

Returns whether this Lock instance's internal state believes that the lock is held by the current Lock instance in the calling thread.

Returns:

  • (Boolean)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 155

def owned_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_owned_according_to_internal_state?
  end
end

#owned_according_to_server? ⇒ Boolean

Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.

Returns:

  • (Boolean)

Raises:

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 166

def owned_according_to_server?
  file = @bucket.file(@path)
  return false if file.nil?
  file.metadata['identity'] == identity
end

#synchronize ⇒ Object

Obtains the lock, runs the block, and releases the lock when the block completes.

If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.

Accepts the same parameters as #lock.

Returns:

  • The block's return value.

Raises:

  • (AlreadyLockedError)

    This Lock instance — according to its internal state — believes that it's already holding the lock.

  • (TimeoutError)

    Failed to acquire the lock within timeout seconds.



# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 267

def synchronize(...)
  lock(...)
  begin
    yield
  ensure
    unlock
  end
end
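
The lock / ensure / unlock shape of #synchronize guarantees that the lock is released even when the block raises. The sketch below demonstrates that behavior with InMemoryLock, a hypothetical in-memory stand-in that needs no Cloud Storage access; it mimics only the control flow of the method above, not the real class's server interaction or background refreshing.

```ruby
# Hypothetical in-memory stand-in for illustration only.
class InMemoryLock
  class AlreadyLockedError < StandardError; end
  class NotLockedError < StandardError; end

  def initialize
    @locked = false
  end

  def locked?
    @locked
  end

  def lock
    raise AlreadyLockedError, 'Already locked' if @locked
    @locked = true
    nil
  end

  def unlock
    raise NotLockedError, 'Not locked' unless @locked
    @locked = false
    true
  end

  # Same shape as the method above: acquire first, then release in an
  # ensure clause so the release also runs when the block raises.
  def synchronize
    lock
    begin
      yield
    ensure
      unlock
    end
  end
end

lock = InMemoryLock.new
result = lock.synchronize { 6 * 7 }
puts result        # the block's return value is passed through
puts lock.locked?  # released after the block completes
```

Because the acquisition happens outside the begin block, a failure to acquire (for example AlreadyLockedError) does not trigger an unlock of a lock that was never obtained by that call.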

#unlock ⇒ Boolean

Releases the lock and stops refreshing the lock in the background.

Returns:

  • (Boolean)

    True if the lock object was actually deleted, false if the lock object was already deleted.

Raises:

  • (NotLockedError)

    This Lock instance — according to its internal state — believes that it isn't currently holding the lock.

  • (Google::Cloud::Error)


# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 235

def unlock
  refresher_generation = nil
  metageneration = nil
  thread = nil

  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    refresher_generation = @refresher_generation
    thread = shutdown_refresher_thread
    metageneration = @metageneration
    @owner = nil
    @metageneration = nil
  end

  thread.join
  result = delete_lock_object(metageneration)
  log_debug { "Unlocked. refresher_generation=#{refresher_generation}, metageneration=#{metageneration}" }
  result
end