diff --git a/Gemfile.lock b/Gemfile.lock index 55ae151..2abf927 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - sidekiq-queue-pause (0.1.0) + sidekiq-queue-pause (0.1.2) sidekiq (>= 6.0, < 7.0) GEM @@ -13,7 +13,7 @@ GEM backport (1.2.0) benchmark (0.2.1) coderay (1.1.3) - connection_pool (2.3.0) + connection_pool (2.4.1) diff-lcs (1.5.0) docile (1.4.0) e2mmap (0.1.0) @@ -56,9 +56,11 @@ GEM lumberjack (1.2.8) method_source (1.0.0) mini_mime (1.1.2) + mini_portile2 (2.8.5) multi_xml (0.6.0) nenv (0.3.0) - nokogiri (1.14.0-x86_64-linux) + nokogiri (1.14.0) + mini_portile2 (~> 2.8.0) racc (~> 1.4) notiffany (0.1.3) nenv (~> 0.1) @@ -89,12 +91,12 @@ GEM method_source (~> 1.0) public_suffix (5.0.1) racc (1.6.2) - rack (2.2.6.2) + rack (2.2.8.1) rainbow (3.1.1) rb-fsevent (0.11.2) rb-inotify (0.10.1) ffi (~> 1.0) - redis (4.8.0) + redis (4.8.1) regexp_parser (2.6.2) reverse_markdown (2.1.1) nokogiri @@ -134,7 +136,7 @@ GEM addressable (>= 2.3.5) faraday (>= 0.17.3, < 3) shellany (0.0.1) - sidekiq (6.5.8) + sidekiq (6.5.12) connection_pool (>= 2.2.5, < 3) rack (~> 2.0) redis (>= 4.5.0, < 5) @@ -176,6 +178,7 @@ GEM webrick (~> 1.7.0) PLATFORMS + x86_64-darwin-23 x86_64-linux DEPENDENCIES diff --git a/README.md b/README.md index 9bddb80..0190e55 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ Initializer: ```ruby Sidekiq.configure_server do |config| - Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq.options) + Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq) # Optionally, you may set some unique key identifying the # Sidekiq process you want to control. This (server) process will diff --git a/lib/sidekiq-queue-pause.rb b/lib/sidekiq-queue-pause.rb index 56be63c..135e6dd 100644 --- a/lib/sidekiq-queue-pause.rb +++ b/lib/sidekiq-queue-pause.rb @@ -53,8 +53,9 @@ def retrieve_work end def retrieve_work_for_queues(qcmd) - work = Sidekiq.redis { |conn| conn.brpop(*qcmd) } - UnitOfWork.new(*work) if work + #queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) } + queue, job = redis { |conn| conn.brpop(*qcmd) } + UnitOfWork.new(queue, job, config) if queue end # Returns the list of unpause queue names. diff --git a/sidekiq-queue-pause.gemspec b/sidekiq-queue-pause.gemspec index e1946e7..20d7b5c 100644 --- a/sidekiq-queue-pause.gemspec +++ b/sidekiq-queue-pause.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = "sidekiq-queue-pause" - s.version = "0.1.1" + s.version = "0.1.2" s.summary = "Pause a Sidekiq queue" s.description = "Let's you pause/unpause individual sidekiq queues." s.license = "MIT" diff --git a/spec/sidekiq-queue-pause_spec.rb b/spec/sidekiq-queue-pause_spec.rb index 79e7fd4..52afa91 100644 --- a/spec/sidekiq-queue-pause_spec.rb +++ b/spec/sidekiq-queue-pause_spec.rb @@ -2,34 +2,70 @@ describe Sidekiq::QueuePause do describe Sidekiq::QueuePause::PausingFetch do - describe "#unpause_queues_cmd" do - let(:queuename) { "some_queue" } - let(:config) { {queues: [queuename], strict: true} } - let(:pausing_fetch) { described_class.new(config) } + let(:queue_name) { "some_queue" } + let(:logger) { double("logger") } + let(:job) { {queue: "some_queue", retry: true} } + let(:queue) { "queue:#{queue_name}" } + let(:queue_and_work) { [queue, job.to_json] } + let(:conn) { double("redis connection", read_timeout: 5, blocking_call: queue_and_work, brpop: queue_and_work) } + let(:config) { OpenStruct.new(queues: [queue_name], strict: true, logger: logger, redis: conn) } + + subject(:pausing_fetch) { described_class.new(config) } + + describe "instance methods from Component" do + it "responds to `logger`" do + expect(pausing_fetch).to respond_to(:logger) + end + + it "responds to `redis`" do + expect(pausing_fetch).to respond_to(:redis) + end + + it "config is not a `#{Hash}`" do + expect(pausing_fetch.config).not_to be_a(Hash) + end + end + describe "#unpause_queues_cmd" do context "with Sidekiq > 6.5.6 the queues list can contain Hashes" do - let(:queue_list) { ["queue:#{queuename}", {timeout: 2}] } + let(:queue_list) { ["queue:#{queue_name}", {timeout: 2}] } before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) } it "does not checked whether the Hash is paused" do - expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false) + expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false) expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list) end end context "with Sidekiq < 6.5.6 the queues list can contain an Integer" do - let(:queue_list) { ["queue:#{queuename}", 2] } + let(:queue_list) { ["queue:#{queue_name}", 2] } before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) } it "does not check whether the Integer is paused" do - expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false) + expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false) expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list) end end end + + describe "reenqueueing a unit of work" do + + it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do + expect(config).to receive(:redis).and_yield(conn).twice #one for fetch, one for requeue + + expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original + #expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT) + expect(conn).to receive(:brpop).with(queue) + expect(conn).to receive(:rpush).with(queue, job.to_json) + + unit_of_work = pausing_fetch.retrieve_work_for_queues(queue) + + expect { unit_of_work.requeue }.to_not raise_error + end + end end end