diff --git a/lib/sidekiq/pausable.rb b/lib/sidekiq/pausable.rb index 49ca1eb6f6..16d4a71462 100644 --- a/lib/sidekiq/pausable.rb +++ b/lib/sidekiq/pausable.rb @@ -1,15 +1,19 @@ require 'thread' class SidekiqPauser + TTL = 60 + PAUSED_KEY = "sidekiq_is_paused_v2" + def initialize @mutex = Mutex.new + @dbs ||= Set.new end def pause! - redis.setex paused_key, 60, "paused" + $redis.setex PAUSED_KEY, TTL, "paused" @mutex.synchronize do - @extend_lease_thread ||= extend_lease_thread + extend_lease_thread sleep 0.001 while !paused? end @@ -17,38 +21,38 @@ class SidekiqPauser end def paused? - !!redis.get(paused_key) + !!$redis.get(PAUSED_KEY) end def unpause! @mutex.synchronize do - @extend_lease_thread = nil + @dbs.delete(RailsMultisite::ConnectionManagement.current_db) + @extend_lease_thread = nil if @dbs.size == 0 end - redis.del(paused_key) + $redis.del(PAUSED_KEY) true end private def extend_lease_thread - Thread.new do + @dbs << RailsMultisite::ConnectionManagement.current_db + + @extend_lease_thread ||= Thread.new do while true do break unless @mutex.synchronize { @extend_lease_thread } - redis.expire paused_key, 60 - sleep(Rails.env.test? ? 0.01 : 30) + + @dbs.each do |db| + RailsMultisite::ConnectionManagement.with_connection(db) do + $redis.expire PAUSED_KEY, TTL + end + end + + sleep(Rails.env.test? ? 0.01 : TTL / 2) end end end - - def redis - $redis.without_namespace - end - - def paused_key - "sidekiq_is_paused_v2" - end - end module Sidekiq @@ -74,7 +78,7 @@ class Sidekiq::Pausable end def call(worker, msg, queue) - if Sidekiq.paused? && !(Jobs::RunHeartbeat === worker) + if sidekiq_paused?(msg) && !(Jobs::RunHeartbeat === worker) worker.class.perform_in(@delay, *msg['args']) else start = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -85,4 +89,14 @@ class Sidekiq::Pausable end end + private + + def sidekiq_paused?(msg) + if site_id = msg["args"]&.first&.dig("current_site_id") + RailsMultisite::ConnectionManagement.with_connection(site_id) do + Sidekiq.paused? + end + end + end + end diff --git a/spec/components/sidekiq/pausable_spec.rb b/spec/components/sidekiq/pausable_spec.rb deleted file mode 100644 index 7a9f86c8c1..0000000000 --- a/spec/components/sidekiq/pausable_spec.rb +++ /dev/null @@ -1,36 +0,0 @@ -require 'rails_helper' -require_dependency 'sidekiq/pausable' - -describe Sidekiq do - after do - Sidekiq.unpause! - end - - it "can pause and unpause" do - Sidekiq.pause! - expect(Sidekiq.paused?).to eq(true) - Sidekiq.unpause! - expect(Sidekiq.paused?).to eq(false) - end - - it "can still run heartbeats when paused" do - Sidekiq.pause! - - freeze_time 1.week.from_now - - jobs = Sidekiq::ScheduledSet.new - - Sidekiq::Testing.disable! do - jobs.clear - - middleware = Sidekiq::Pausable.new - middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do - "done" - end - - jobs = Sidekiq::ScheduledSet.new - expect(jobs.size).to eq(0) - end - - end -end diff --git a/spec/multisite/pausable_spec.rb b/spec/multisite/pausable_spec.rb new file mode 100644 index 0000000000..57297dd4f7 --- /dev/null +++ b/spec/multisite/pausable_spec.rb @@ -0,0 +1,89 @@ +require 'rails_helper' +require_dependency 'sidekiq/pausable' + +RSpec.describe "Pausing/Unpausing Sidekiq", type: :multisite do + after do + $redis.flushall + end + + describe '#pause!, #unpause! and #paused?' do + it "can pause and unpause" do + Sidekiq.pause! + expect(Sidekiq.paused?).to eq(true) + + test_multisite_connection('second') do + expect(Sidekiq.paused?).to eq(false) + end + + Sidekiq.unpause! + + expect(Sidekiq.paused?).to eq(false) + + test_multisite_connection('second') do + Sidekiq.pause! + expect(Sidekiq.paused?).to eq(true) + end + end + end +end + +RSpec.describe Sidekiq::Pausable do + after do + $redis.flushall + end + + it "can still run heartbeats when paused" do + Sidekiq.pause! + + freeze_time 1.week.from_now + + jobs = Sidekiq::ScheduledSet.new + jobs.clear + middleware = Sidekiq::Pausable.new + + middleware.call(Jobs::RunHeartbeat.new, { "args" => [{}] }, "critical") do + "done" + end + + jobs = Sidekiq::ScheduledSet.new + expect(jobs.size).to eq(0) + end + + describe 'when sidekiq is paused', type: :multisite do + let(:middleware) { Sidekiq::Pausable.new } + + def call_middleware(db = RailsMultisite::ConnectionManagement::DEFAULT) + middleware.call(Jobs::PostAlert.new, { + "args" => [{ "current_site_id" => db }] + }, "critical") do + yield + end + end + + it 'should delay the job' do + Sidekiq.pause! + + called = false + called2 = false + call_middleware { called = true } + + expect(called).to eq(false) + + test_multisite_connection('second') do + call_middleware('second') { called2 = true } + expect(called2).to eq(true) + end + + Sidekiq.unpause! + call_middleware { called = true } + + expect(called).to eq(true) + + test_multisite_connection('second') do + Sidekiq.pause! + call_middleware('second') { called2 = false } + expect(called2).to eq(true) + end + end + end +end