diff --git a/app/jobs/regular/notify_reviewable.rb b/app/jobs/regular/notify_reviewable.rb index a59adc4ac1..690fb75f0b 100644 --- a/app/jobs/regular/notify_reviewable.rb +++ b/app/jobs/regular/notify_reviewable.rb @@ -22,58 +22,60 @@ class Jobs::NotifyReviewable < ::Jobs::Base end end - counts = Hash.new(0) + DistributedMutex.synchronize("notify_reviewable_job", validity: 10, max_get_lock_attempts: 1) do + counts = Hash.new(0) - Reviewable.default_visible.pending.each do |r| - counts[:admins] += 1 - counts[:moderators] += 1 if r.reviewable_by_moderator? - counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id - end + Reviewable.default_visible.pending.each do |r| + counts[:admins] += 1 + counts[:moderators] += 1 if r.reviewable_by_moderator? + counts[r.reviewable_by_group_id] += 1 if r.reviewable_by_group_id + end - if SiteSetting.legacy_navigation_menu? - notify_legacy( - User.real.admins.pluck(:id), - count: counts[:admins], - updates: all_updates[:admins], - ) - else - notify_users(User.real.admins, all_updates[:admins]) - end - - if reviewable.reviewable_by_moderator? if SiteSetting.legacy_navigation_menu? notify_legacy( - User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id), - count: counts[:moderators], - updates: all_updates[:moderators], + User.real.admins.pluck(:id), + count: counts[:admins], + updates: all_updates[:admins], ) else - notify_users( - User.real.moderators.where("id NOT IN (?)", @contacted), - all_updates[:moderators], - ) + notify_users(User.real.admins, all_updates[:admins]) end - end - - if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group) - users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted) - - users.find_each do |user| - count = 0 - updates = {} - user.group_users.each do |gu| - updates.merge!(all_updates[gu.group_id]) - count += counts[gu.group_id] - end + if reviewable.reviewable_by_moderator? if SiteSetting.legacy_navigation_menu? - notify_legacy([user.id], count: count, updates: updates) + notify_legacy( + User.real.moderators.where("id NOT IN (?)", @contacted).pluck(:id), + count: counts[:moderators], + updates: all_updates[:moderators], + ) else - notify_user(user, updates) + notify_users( + User.real.moderators.where("id NOT IN (?)", @contacted), + all_updates[:moderators], + ) end end - @contacted += users.pluck(:id) + if SiteSetting.enable_category_group_moderation? && (group = reviewable.reviewable_by_group) + users = group.users.includes(:group_users).where("users.id NOT IN (?)", @contacted) + + users.find_each do |user| + count = 0 + updates = {} + user.group_users.each do |gu| + updates.merge!(all_updates[gu.group_id]) + count += counts[gu.group_id] + end + + if SiteSetting.legacy_navigation_menu? + notify_legacy([user.id], count: count, updates: updates) + else + notify_user(user, updates) + end + end + + @contacted += users.pluck(:id) + end end end diff --git a/lib/distributed_mutex.rb b/lib/distributed_mutex.rb index f7eb3b4e51..df4d04ae96 100644 --- a/lib/distributed_mutex.rb +++ b/lib/distributed_mutex.rb @@ -30,16 +30,28 @@ class DistributedMutex end LUA - def self.synchronize(key, redis: nil, validity: DEFAULT_VALIDITY, &blk) - self.new(key, redis: redis, validity: validity).synchronize(&blk) + def self.synchronize( + key, + redis: nil, + validity: DEFAULT_VALIDITY, + max_get_lock_attempts: nil, + &blk + ) + self.new( + key, + redis: redis, + validity: validity, + max_get_lock_attempts: max_get_lock_attempts, + ).synchronize(&blk) end - def initialize(key, redis: nil, validity: DEFAULT_VALIDITY) + def initialize(key, redis: nil, validity: DEFAULT_VALIDITY, max_get_lock_attempts: nil) @key = key @using_global_redis = true if !redis @redis = redis || Discourse.redis @mutex = Mutex.new @validity = validity + @max_get_lock_attempts = max_get_lock_attempts end # NOTE wrapped in mutex to maintain its semantics @@ -69,11 +81,15 @@ class DistributedMutex result end + class MaximumAttemptsExceeded < StandardError + end + private attr_reader :key attr_reader :redis attr_reader :validity + attr_reader :max_get_lock_attempts def get_lock attempts = 0 @@ -92,6 +108,10 @@ class DistributedMutex if @using_global_redis && Discourse.recently_readonly? && attempts > CHECK_READONLY_ATTEMPTS raise Discourse::ReadOnly end + + if max_get_lock_attempts && attempts > max_get_lock_attempts + raise DistributedMutex::MaximumAttemptsExceeded + end end end