Class: DistributedLock::GoogleCloudStorage::Lock
- Inherits: Object
- Defined in: lib/distributed-lock-google-cloud-storage/lock.rb
Class Method Summary
- .default_instance_identity_prefix ⇒ String
  Generates a sane default instance identity prefix string.
Instance Method Summary
- #abandon ⇒ void
  Pretends like we've never obtained this lock, abandoning our internal state about the lock.
- #check_health! ⇒ void
  Checks whether the lock is healthy.
- #healthy? ⇒ Boolean
  Returns whether the lock is healthy.
- #initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock (constructor)
  Creates a new Lock instance.
- #last_refresh_error ⇒ StandardError?
  Returns the last error that caused the lock to be declared unhealthy.
- #lock(timeout: 2 * @ttl) ⇒ void
  Obtains the lock.
- #locked_according_to_internal_state? ⇒ Boolean
  Returns whether this Lock instance's internal state believes that the lock is currently held by this instance.
- #locked_according_to_server? ⇒ Boolean
  Returns whether the server believes that the lock is currently held by somebody.
- #owned_according_to_internal_state? ⇒ Boolean
  Returns whether this Lock instance's internal state believes that the lock is held by the current Lock instance in the calling thread.
- #owned_according_to_server? ⇒ Boolean
  Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.
- #synchronize ⇒ Object
  Obtains the lock, runs the block, and releases the lock when the block completes.
- #unlock ⇒ Boolean
  Releases the lock and stops refreshing the lock in the background.
Constructor Details
#initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix, thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new, ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS, backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX, backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER, object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil) ⇒ Lock
Note: The logger must either be thread-safe, or all writes to this logger by anything besides this Lock instance must be synchronized through logger_mutex. This is because the logger will be written to by a background thread.
Creates a new Lock instance.
Under the hood we'll instantiate a Google::Cloud::Storage::Bucket object for accessing the bucket. You can customize the project ID, authentication method, etc. through cloud_storage_options and cloud_storage_bucket_options.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 82
def initialize(bucket_name:, path:, instance_identity_prefix: self.class.default_instance_identity_prefix,
  thread_safe: true, logger: Logger.new($stderr), logger_mutex: Mutex.new,
  ttl: DEFAULT_TTL, refresh_interval: nil, max_refresh_fails: DEFAULT_MAX_REFRESH_FAILS,
  backoff_min: DEFAULT_BACKOFF_MIN, backoff_max: DEFAULT_BACKOFF_MAX,
  backoff_multiplier: DEFAULT_BACKOFF_MULTIPLIER,
  object_acl: nil, cloud_storage_options: nil, cloud_storage_bucket_options: nil)
  check_refresh_interval_allowed!(ttl, refresh_interval, max_refresh_fails)
  check_backoff_min!(backoff_min)
  check_backoff_max!(backoff_max, backoff_min)
  check_backoff_multiplier!(backoff_multiplier)
  ### Read-only variables (safe to access concurrently) ###
  @bucket_name = bucket_name
  @path = path
  @instance_identity_prefix = instance_identity_prefix
  @thread_safe = thread_safe
  @logger = logger
  @logger_mutex = logger_mutex
  @ttl = ttl
  @refresh_interval = refresh_interval || ttl.to_f / (max_refresh_fails * 3)
  @max_refresh_fails = max_refresh_fails
  @backoff_min = backoff_min
  @backoff_max = backoff_max
  @backoff_multiplier = backoff_multiplier
  @object_acl = object_acl
  @client = create_gcloud_storage_client()
  @bucket = get_gcloud_storage_bucket(@client, bucket_name, )
  @state_mutex = Mutex.new
  @refresher_cond = ConditionVariable.new
  ### Read-write variables protected by @state_mutex ###
  @owner = nil
  @metageneration = nil
  @refresher_thread = nil
  @refresher_error = nil
  # The refresher generation is incremented every time we shutdown
  # the refresher thread. It allows the refresher thread to know
  # whether it's being shut down (and thus shouldn't access/modify
  # state).
  @refresher_generation = 0
end
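The default refresh interval can be read off the constructor body: when refresh_interval is nil, it is derived from ttl and max_refresh_fails. The sketch below reproduces only that one expression. The method name effective_refresh_interval is hypothetical, and the numbers are illustrative values, not the gem's DEFAULT_TTL or DEFAULT_MAX_REFRESH_FAILS.

```ruby
# Mirrors the expression used in the constructor:
#   refresh_interval || ttl.to_f / (max_refresh_fails * 3)
# The method name is hypothetical; it is not part of the gem's API.
def effective_refresh_interval(ttl:, max_refresh_fails:, refresh_interval: nil)
  refresh_interval || ttl.to_f / (max_refresh_fails * 3)
end

# Illustrative values only (not the gem's defaults).
puts effective_refresh_interval(ttl: 90, max_refresh_fails: 3)
# An explicit refresh_interval always takes precedence over the derived value.
puts effective_refresh_interval(ttl: 90, max_refresh_fails: 3, refresh_interval: 5)
```

With the derived default, max_refresh_fails consecutive refresh intervals add up to one third of the TTL, ignoring request latency.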
Class Method Details
.default_instance_identity_prefix ⇒ String
Generates a sane default instance identity prefix string. The result is identical across multiple calls in the same process. It supports forking, so that calling this method in a forked child process automatically returns a different value than when called from the parent process.
The result doesn't include a thread identifier, which is why we call this a prefix.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 27
def self.default_instance_identity_prefix
  "#{DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID}-#{Process.pid}"
end
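The fork behavior described above follows from appending Process.pid on every call. A minimal stand-alone sketch of that idea, assuming a platform that supports fork: PREFIX_WITHOUT_PID is a hypothetical stand-in for DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID (whose real value is not shown here), and prefix_in_forked_child is a helper invented for illustration.

```ruby
# Hypothetical stand-in for DEFAULT_INSTANCE_IDENTITY_PREFIX_WITHOUT_PID.
PREFIX_WITHOUT_PID = 'example-host'.freeze

# Same shape as the method above: a fixed part plus the current process ID.
def default_instance_identity_prefix
  "#{PREFIX_WITHOUT_PID}-#{Process.pid}"
end

# Illustration helper (not part of the gem): computes the prefix inside a
# forked child and sends it back to the parent through a pipe.
def prefix_in_forked_child
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    writer.write(default_instance_identity_prefix)
    writer.close
  end
  writer.close
  child_prefix = reader.read
  reader.close
  Process.wait(pid)
  child_prefix
end
```

Because the PID is read at call time rather than cached, a forked child gets a different prefix without any extra bookkeeping.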
Instance Method Details
#abandon ⇒ void
This method returns an undefined value.
Pretends like we've never obtained this lock, abandoning our internal state about the lock.
Shuts down background lock refreshing, and ensures that #locked_according_to_internal_state? returns false.
Does not modify any server data, so #locked_according_to_server? may still return true.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 284
def abandon
  refresher_generation = nil
  thread = nil
  @state_mutex.synchronize do
    if unsynced_locked_according_to_internal_state?
      refresher_generation = @refresher_generation
      thread = shutdown_refresher_thread
    end
  end
  if thread
    log_debug { "Abandoning locked lock" }
    thread.join
    log_debug { "Done abandoned locked lock. refresher_generation=#{refresher_generation}" }
  else
    log_debug { "Abandoning unlocked lock" }
  end
end
#check_health! ⇒ void
This method returns an undefined value.
Checks whether the lock is healthy. See #healthy? for the definition of "healthy". Use #last_refresh_error to query the last error that caused the lock to be declared unhealthy.
It only makes sense to call this method after having obtained this lock.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 336
def check_health!
  raise LockUnhealthyError, 'Lock is not healthy' if !healthy?
end
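One possible way to use check_health! is to call it between the steps of a long-running operation, so that work stops once the lock can no longer be trusted. The sketch below assumes only that the lock object responds to check_health!. StubLock, process_while_healthy, and the locally defined LockUnhealthyError are stand-ins invented for illustration so the example runs without Google Cloud Storage.

```ruby
# Stand-in for the gem's LockUnhealthyError, defined here so the sketch is
# self-contained.
class LockUnhealthyError < StandardError; end

# Hypothetical stub: reports healthy for a fixed number of checks, then raises.
class StubLock
  def initialize(healthy_checks)
    @remaining = healthy_checks
  end

  def check_health!
    raise LockUnhealthyError, 'Lock is not healthy' if @remaining <= 0
    @remaining -= 1
    nil
  end
end

# Hypothetical helper: verifies lock health before handling each item, so an
# unhealthy lock aborts the remaining work.
def process_while_healthy(lock, items)
  items.each do |item|
    lock.check_health!
    yield item
  end
end

processed = []
begin
  process_while_healthy(StubLock.new(2), %w[a b c]) { |item| processed << item }
rescue LockUnhealthyError => e
  puts "Stopped early: #{e.message}"
end
puts processed.inspect
```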
#healthy? ⇒ Boolean
Returns whether the lock is healthy. A lock is considered healthy until we fail to refresh the lock too many times consecutively.
Failure to refresh could happen for many reasons. Some failures are temporary, such as network problems. Others are permanent, such as the lock object being forcefully deleted by someone else.
Upon encountering a permanent failure, the lock is immediately declared unhealthy.
Upon encountering a temporary failure, the lock is declared unhealthy only after max_refresh_fails consecutive temporary failures.
It only makes sense to call this method after having obtained this lock.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 320
def healthy?
  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    @refresher_thread.alive?
  end
end
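The health rule described above (permanent failures count immediately, temporary failures count only when consecutive) can be modeled in a few lines. This is a simplified model written for illustration, not the gem's refresher thread. The class RefreshHealthTracker and its methods are hypothetical, and it assumes that a lock which has become unhealthy stays unhealthy.

```ruby
# Hypothetical model of the documented health rule; not the gem's internals.
class RefreshHealthTracker
  attr_reader :last_error

  def initialize(max_refresh_fails:)
    @max_refresh_fails = max_refresh_fails
    @consecutive_fails = 0
    @healthy = true
    @last_error = nil
  end

  def healthy?
    @healthy
  end

  # A successful refresh resets the consecutive-failure counter.
  # Assumption: it does not revive a tracker that is already unhealthy.
  def record_success
    @consecutive_fails = 0 if @healthy
  end

  # A permanent failure makes the tracker unhealthy at once; a temporary one
  # does so only after max_refresh_fails failures in a row.
  def record_failure(error, permanent:)
    return unless @healthy
    @consecutive_fails += 1
    if permanent || @consecutive_fails >= @max_refresh_fails
      @healthy = false
      @last_error = error
    end
  end
end
```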
#last_refresh_error ⇒ StandardError?
Returns the last error that caused the lock to be declared unhealthy.
Don't use this method to check whether the lock is currently healthy. If this lock has ever been unhealthy, then this method returns a non-nil value even if the lock is currently healthy.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 347
def last_refresh_error
  @state_mutex.synchronize do
    @refresher_error
  end
end
#lock(timeout: 2 * @ttl) ⇒ void
This method returns an undefined value.
Obtains the lock. If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 182
def lock(timeout: 2 * @ttl)
  raise AlreadyLockedError, 'Already locked' if owned_according_to_internal_state?
  file = retry_with_backoff_until_success(timeout,
    retry_logger: method(:log_lock_retry),
    backoff_min: @backoff_min,
    backoff_max: @backoff_max,
    backoff_multiplier: @backoff_multiplier) do
    log_debug { 'Acquiring lock' }
    if (file = create_lock_object)
      log_debug { 'Successfully acquired lock' }
      [:success, file]
    else
      log_debug { 'Error acquiring lock. Investigating why...' }
      file = @bucket.file(@path)
      if file.nil?
        log_warn { 'Lock was deleted right after having created it. Retrying.' }
        :retry_immediately
      elsif file.['identity'] == identity
        log_warn { 'Lock was already owned by this instance, but was abandoned. Resetting lock' }
        delete_lock_object(file.)
        :retry_immediately
      else
        if lock_stale?(file)
          log_warn { 'Lock is stale. Resetting lock' }
          delete_lock_object(file.)
        else
          log_debug { 'Lock was already acquired, and is not stale' }
        end
        :error
      end
    end
  end
  refresher_generation = nil
  @state_mutex.synchronize do
    @owner = identity
    @metageneration = file.
    spawn_refresher_thread
    refresher_generation = @refresher_generation
  end
  log_debug { "Locked. refresher_generation=#{refresher_generation}, metageneration=#{file.}" }
  nil
end
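The acquisition loop above relies on a retry helper whose block returns [:success, value], :retry_immediately, or :error. The gem's retry_with_backoff_until_success is not shown in this section, so the following is only a sketch of how such a helper might behave, assuming plain exponential backoff clamped between backoff_min and backoff_max. The names retry_with_backoff, BackoffTimeoutError, sleeper, and clock are all hypothetical.

```ruby
# Hypothetical error type for this sketch.
class BackoffTimeoutError < StandardError; end

# Sketch of a retry loop with exponential backoff. The block must return
# [:success, value], :retry_immediately, or :error. The sleeper and clock
# parameters exist only so the behavior can be exercised without real waiting.
def retry_with_backoff(timeout:, backoff_min:, backoff_max:, backoff_multiplier:,
                       sleeper: ->(seconds) { sleep(seconds) },
                       clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
  deadline = clock.call + timeout
  delay = backoff_min
  loop do
    outcome, value = yield
    case outcome
    when :success
      return value
    when :retry_immediately
      # Retry without sleeping; the deadline check below still applies.
    when :error
      sleeper.call(delay)
      # Grow the delay geometrically, but never beyond backoff_max.
      delay = [delay * backoff_multiplier, backoff_max].min
    else
      raise ArgumentError, "unexpected block result: #{outcome.inspect}"
    end
    raise BackoffTimeoutError, "gave up after #{timeout} seconds" if clock.call >= deadline
  end
end

tries = 0
outcome_message = retry_with_backoff(timeout: 5, backoff_min: 0.01,
                                     backoff_max: 0.05, backoff_multiplier: 2) do
  tries += 1
  tries < 3 ? :error : [:success, "acquired after #{tries} attempts"]
end
puts outcome_message
```

This sketch omits jitter and retry logging, which a real helper may add.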
#locked_according_to_internal_state? ⇒ Boolean
Returns whether this Lock instance's internal state believes that the lock is currently held by this instance. Does not check whether the lock is stale.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 136
def locked_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_locked_according_to_internal_state?
  end
end
#locked_according_to_server? ⇒ Boolean
Returns whether the server believes that the lock is currently held by somebody. Does not check whether the lock is stale.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 147
def locked_according_to_server?
  !@bucket.file(@path).nil?
end
#owned_according_to_internal_state? ⇒ Boolean
Returns whether this Lock instance's internal state believes that the lock is held by the current Lock instance in the calling thread.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 155
def owned_according_to_internal_state?
  @state_mutex.synchronize do
    unsynced_owned_according_to_internal_state?
  end
end
#owned_according_to_server? ⇒ Boolean
Returns whether the server believes that the lock is held by the current Lock instance in the calling thread.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 166
def owned_according_to_server?
  file = @bucket.file(@path)
  return false if file.nil?
  file.['identity'] == identity
end
#synchronize ⇒ Object
Obtains the lock, runs the block, and releases the lock when the block completes.
If the lock is stale, resets it automatically. If the lock is already obtained by some other instance identity, waits until it becomes available, or until timeout.
Accepts the same parameters as #lock.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 267
def synchronize(...)
  lock(...)
  begin
    yield
  ensure
    unlock
  end
end
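The begin/ensure structure above means the lock is released even when the block raises, while a failure inside lock itself never reaches the block or unlock. The sketch below shows this with a stand-in object. RecordingLock is a hypothetical class that only records calls; it does not talk to Google Cloud Storage and is not part of the gem.

```ruby
# Hypothetical stand-in that records the order of lock and unlock calls.
class RecordingLock
  attr_reader :events

  def initialize
    @events = []
  end

  def lock(timeout: nil)
    @events << :lock
    nil
  end

  def unlock
    @events << :unlock
    true
  end

  # Same shape as #synchronize above: lock first, then run the block with
  # unlock guaranteed by ensure.
  def synchronize(**options)
    lock(**options)
    begin
      yield
    ensure
      unlock
    end
  end
end

recording_lock = RecordingLock.new
begin
  recording_lock.synchronize(timeout: 10) { raise 'work failed' }
rescue RuntimeError => e
  puts "Block raised: #{e.message}"
end
puts recording_lock.events.inspect
```

The block's return value is passed through, so the result of the protected work can be assigned directly from synchronize.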
#unlock ⇒ Boolean
Releases the lock and stops refreshing the lock in the background.
# File 'lib/distributed-lock-google-cloud-storage/lock.rb', line 235
def unlock
  refresher_generation = nil
   = nil
  thread = nil
  @state_mutex.synchronize do
    raise NotLockedError, 'Not locked' if !unsynced_locked_according_to_internal_state?
    refresher_generation = @refresher_generation
    thread = shutdown_refresher_thread
     = @metageneration
    @owner = nil
    @metageneration = nil
  end
  thread.join
  result = delete_lock_object()
  log_debug { "Unlocked. refresher_generation=#{refresher_generation}, metageneration=#{}" }
  result
end