diff --git a/app/jobs/base.rb b/app/jobs/base.rb index 643bbb8939..f210308de4 100644 --- a/app/jobs/base.rb +++ b/app/jobs/base.rb @@ -19,50 +19,53 @@ module Jobs class JobInstrumenter def initialize(job_class:, opts:, db:, jid:) return unless enabled? - @data = {} + self.class.mutex.synchronize do + @data = {} - @data["hostname"] = `hostname`.strip # Hostname - @data["pid"] = Process.pid # Pid - @data["database"] = db # DB name - multisite db name it ran on - @data["job_id"] = jid # Job unique ID - @data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats - @data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular - @data["opts"] = opts.to_json # Params - json encoded params for the job + @data["hostname"] = `hostname`.strip # Hostname + @data["pid"] = Process.pid # Pid + @data["database"] = db # DB name - multisite db name it ran on + @data["job_id"] = jid # Job unique ID + @data["job_name"] = job_class.name # Job Name - eg: Jobs::AboutStats + @data["job_type"] = job_class.try(:scheduled?) ? "scheduled" : "regular" # Job Type - either s for scheduled or r for regular + @data["opts"] = opts.to_json # Params - json encoded params for the job - @data["status"] = 'pending' - @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC) + @data["status"] = 'pending' + @start_timestamp = Process.clock_gettime(Process::CLOCK_MONOTONIC) - self.class.ensure_interval_logging! - @@active_jobs ||= [] - @@active_jobs << self + self.class.ensure_interval_logging! + @@active_jobs ||= [] + @@active_jobs << self - MethodProfiler.ensure_discourse_instrumentation! - MethodProfiler.start + MethodProfiler.ensure_discourse_instrumentation! + MethodProfiler.start + end end def stop(exception:) return unless enabled? + self.class.mutex.synchronize do + profile = MethodProfiler.stop - profile = MethodProfiler.stop + @@active_jobs.delete(self) - @@active_jobs.delete(self) + @data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run + @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s) + @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran + @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s) + @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands + @data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s) + @data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands - @data["duration"] = profile[:total_duration] # Duration - length in seconds it took to run - @data["sql_duration"] = profile.dig(:sql, :duration) || 0 # Sql Duration (s) - @data["sql_calls"] = profile.dig(:sql, :calls) || 0 # Sql Statements - how many statements ran - @data["redis_duration"] = profile.dig(:redis, :duration) || 0 # Redis Duration (s) - @data["redis_calls"] = profile.dig(:redis, :calls) || 0 # Redis commands - @data["net_duration"] = profile.dig(:net, :duration) || 0 # Redis Duration (s) - @data["net_calls"] = profile.dig(:net, :calls) || 0 # Redis commands + if exception.present? + @data["exception"] = exception # Exception - if job fails a json encoded exception + @data["status"] = 'failed' + else + @data["status"] = 'success' # Status - fail, success, pending + end - if exception.present? - @data["exception"] = exception # Exception - if job fails a json encoded exception - @data["status"] = 'failed' - else - @data["status"] = 'success' # Status - fail, success, pending + write_to_log end - - write_to_log end def self.raw_log(message) @@ -97,14 +100,21 @@ module Jobs ENV["DISCOURSE_LOG_SIDEKIQ"] == "1" end + def self.mutex + @@mutex ||= Mutex.new + end + def self.ensure_interval_logging! interval = ENV["DISCOURSE_LOG_SIDEKIQ_INTERVAL"] return if !interval + interval = interval.to_i @@interval_thread ||= Thread.new do begin loop do - sleep interval.to_i - @@active_jobs.each { |j| j.write_to_log if j.current_duration > interval.to_i } + sleep interval + mutex.synchronize do + @@active_jobs.each { |j| j.write_to_log if j.current_duration > interval } + end end rescue Exception => e Discourse.warn_exception(e, message: "Sidekiq interval logging thread terminated unexpectedly")