From bfecbde8370e43cf7bcc814b53bb7da060c468af Mon Sep 17 00:00:00 2001
From: Leonardo Mosquera
Date: Mon, 28 Nov 2022 16:30:19 -0300
Subject: [PATCH] Fixes for vBulletin bulk importer (#17618)

* Allow taking table prefix from env var

* FIX: remove unused column references

The columns `filedata` and `extension` are not present in a v4.2.4
database, and they aren't used in the method anyway.

* FIX: report progress for tables without imported_id

* FIX: effectively check for AR validation errors

NOTE: other migration scripts also have this problem; see /t/58202

* FIX: properly count Posts when importing attachments

* FIX: improve logging

* Remove leftover comment

* FIX: show progress when exporting Permalink file

* PERF: stream Permalink file

The current approach builds the whole file in memory before writing it
out; write once per line instead.

* Document fixes needed

* WIP - deduplicate category names

* Ignore non-alphanumeric chars for grouping

* FIX: properly deduplicate user emails by merging accounts

* FIX: don't merge empty UserEmails

* Improve logging

* Merge users AFTER fixing primary key sequences

* Parallelize user merging

* Save duplicated users structure for debugging purposes

* Add progress logging for the (multiple-hour) user merging step
---
 Gemfile                            |   2 +
 script/bulk_import/base.rb         |  14 ++-
 script/bulk_import/vbulletin.rb    | 190 ++++++++++++++++++++---
 script/import_scripts/vbulletin.rb |  10 +-
 4 files changed, 145 insertions(+), 71 deletions(-)

diff --git a/Gemfile b/Gemfile
index 3bbbfee3c0..b35e4805cc 100644
--- a/Gemfile
+++ b/Gemfile
@@ -254,6 +254,8 @@ if ENV["IMPORT"] == "1"
   gem 'reverse_markdown'
   gem 'tiny_tds'
   gem 'csv'
+
+  gem 'parallel', require: false
 end
 
 gem 'webpush', require: false
diff --git a/script/bulk_import/base.rb b/script/bulk_import/base.rb
index abe7677526..767469a83b 100644
--- a/script/bulk_import/base.rb
+++ b/script/bulk_import/base.rb
@@ -99,6 +99,7 @@ class BulkImport::Base
     load_indexes
     execute
     fix_primary_keys
+    execute_after
     puts "Done! Now run the 'import:ensure_consistency' rake task."
   end
 
@@ -227,6 +228,9 @@ class BulkImport::Base
     raise NotImplementedError
   end
 
+  def execute_after
+  end
+
   def fix_primary_keys
     puts "Updating primary key sequences..."
     @raw_connection.exec("SELECT setval('#{Group.sequence_name}', #{@last_group_id})") if @last_group_id > 0
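
The new `execute_after` hook gives importers a place for work that must run after `fix_primary_keys`, i.e. once the id sequences have been corrected. A minimal sketch of a subclass wiring (the class name and the `import_*` step names are illustrative; `merge_duplicated_users` is the real override the vBulletin script adds below):

    class BulkImport::Example < BulkImport::Base
      def execute
        # the usual streaming COPY steps run first
        import_users
        import_posts
      end

      # invoked after fix_primary_keys, so records created or merged
      # here see valid, corrected primary key sequences
      def execute_after
        merge_duplicated_users
      end
    end
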
@@ -713,6 +717,7 @@ class BulkImport::Base
     imported_ids = []
     process_method_name = "process_#{name}"
     sql = "COPY #{name.pluralize} (#{columns.map { |c| "\"#{c}\"" }.join(",")}) FROM STDIN"
+    rows_created = 0
 
     @raw_connection.copy_data(sql, @encoder) do
       rows.each do |row|
         next unless mapped = yield(row)
         processed = send(process_method_name, mapped)
         imported_ids << mapped[:imported_id] unless mapped[:imported_id].nil?
         imported_ids |= mapped[:imported_ids] unless mapped[:imported_ids].nil?
         @raw_connection.put_copy_data columns.map { |c| processed[c] } unless processed[:skip]
-        print "\r%7d - %6d/sec" % [imported_ids.size, imported_ids.size.to_f / (Time.now - start)] if imported_ids.size % 5000 == 0
+        rows_created += 1
+        print "\r%7d - %6d/sec" % [rows_created, rows_created.to_f / (Time.now - start)] if rows_created % 100 == 0
       rescue => e
         puts "\n"
         puts "ERROR: #{e.message}"
@@ -731,10 +737,7 @@ class BulkImport::Base
       end
     end
 
-    if imported_ids.size > 0
-      print "\r%7d - %6d/sec" % [imported_ids.size, imported_ids.size.to_f / (Time.now - start)]
-      puts
-    end
+    print "\r%7d - %6d/sec\n" % [rows_created, rows_created.to_f / (Time.now - start)] if rows_created > 0
 
     id_mapping_method_name = "#{name}_id_from_imported_id".freeze
     return unless respond_to?(id_mapping_method_name)
 
     create_custom_fields(name, "id", imported_ids) do |imported_id|
       {
         record_id: send(id_mapping_method_name, imported_id),
         value: imported_id,
       }
     end
   rescue => e
+    # FIXME: errors caught here stop the rest of the COPY
     puts e.message
     puts e.backtrace.join("\n")
   end
diff --git a/script/bulk_import/vbulletin.rb b/script/bulk_import/vbulletin.rb
index 511d7a8cea..fde0fbe7d7 100644
--- a/script/bulk_import/vbulletin.rb
+++ b/script/bulk_import/vbulletin.rb
@@ -4,10 +4,11 @@
 require_relative "base"
 require "set"
 require "mysql2"
 require "htmlentities"
+require "parallel"
 
 class BulkImport::VBulletin < BulkImport::Base
 
-  TABLE_PREFIX = "vb_"
+  TABLE_PREFIX ||= ENV['TABLE_PREFIX'] || "vb_"
   SUSPENDED_TILL ||= Date.new(3000, 1, 1)
   ATTACHMENT_DIR ||= ENV['ATTACHMENT_DIR'] || '/shared/import/data/attachments'
   AVATAR_DIR ||= ENV['AVATAR_DIR'] || '/shared/import/data/customavatars'
@@ -43,6 +44,8 @@ class BulkImport::VBulletin < BulkImport::Base
         AND `COLUMN_NAME` LIKE 'post_thanks_%'
     SQL
     ).to_a.count > 0
+
+    @user_ids_by_email = {}
   end
 
   def execute
@@ -82,8 +85,17 @@ class BulkImport::VBulletin < BulkImport::Base
     import_signatures
   end
 
+  def execute_after
+    max_age = SiteSetting.delete_user_max_post_age
+    SiteSetting.delete_user_max_post_age = 50 * 365
+
+    merge_duplicated_users
+
+    SiteSetting.delete_user_max_post_age = max_age
+  end
+
   def import_groups
-    puts "Importing groups..."
+    puts '', "Importing groups..."
 
     groups = mysql_stream <<-SQL
         SELECT usergroupid, title, description, usertitle
@@ -103,7 +115,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_users
-    puts "Importing users..."
+    puts '', "Importing users..."
 
     users = mysql_stream <<-SQL
         SELECT u.userid, username, email, joindate, birthday, ipaddress, u.usergroupid, bandate, liftdate
@@ -133,7 +145,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_user_emails
-    puts "Importing user emails..."
+    puts '', "Importing user emails..."
 
     users = mysql_stream <<-SQL
         SELECT u.userid, email, joindate
@@ -143,17 +155,31 @@ class BulkImport::VBulletin < BulkImport::Base
     SQL
 
     create_user_emails(users) do |row|
+      user_id, email = row[0..1]
+
+      @user_ids_by_email[email.downcase] ||= []
+      user_ids = @user_ids_by_email[email.downcase] << user_id
+
+      if user_ids.count > 1
+        # fudge the email to avoid conflicts; accounts from the 2nd onward will later be merged back into the first
+        # NOTE: gsub! is used to avoid creating a new (frozen) string
+        email.gsub!(/^/, SecureRandom.hex)
+      end
+
       {
-        imported_id: row[0],
-        imported_user_id: row[0],
-        email: row[1],
+        imported_id: user_id,
+        imported_user_id: user_id,
+        email: email,
         created_at: Time.zone.at(row[2])
       }
     end
+
+    # for debugging purposes; not used operationally
+    save_duplicated_users
   end
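
A toy run of the bookkeeping above, with made-up rows, shows how only the second and later accounts per address get fudged while `@user_ids_by_email` keeps the original grouping for the later merge step:

    require "securerandom"

    user_ids_by_email = {}
    rows = [[1, "jo@example.com"], [2, "JO@example.com"], [3, "sam@example.com"]]

    rows.each do |user_id, email|
      user_ids = (user_ids_by_email[email.downcase] ||= []) << user_id
      # rows 2+ for an address get a random 32-char hex prefix, so the
      # first account keeps the real email and uniqueness is preserved
      email.gsub!(/^/, SecureRandom.hex) if user_ids.count > 1
    end

    user_ids_by_email # => {"jo@example.com"=>[1, 2], "sam@example.com"=>[3]}
    rows[1][1]        # => e.g. "d1b2...JO@example.com", unique but recoverable
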
 
   def import_user_stats
-    puts "Importing user stats..."
+    puts '', "Importing user stats..."
 
     users = mysql_stream <<-SQL
         SELECT u.userid, joindate, posts, COUNT(t.threadid) AS threads, p.dateline
@@ -186,7 +212,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_group_users
-    puts "Importing group users..."
+    puts '', "Importing group users..."
 
     group_users = mysql_stream <<-SQL
         SELECT usergroupid, userid
@@ -203,7 +229,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_user_passwords
-    puts "Importing user passwords..."
+    puts '', "Importing user passwords..."
 
     user_passwords = mysql_stream <<-SQL
         SELECT userid, password
@@ -221,7 +247,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_user_salts
-    puts "Importing user salts..."
+    puts '', "Importing user salts..."
 
     user_salts = mysql_stream <<-SQL
         SELECT userid, salt
@@ -240,7 +266,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_user_profiles
-    puts "Importing user profiles..."
+    puts '', "Importing user profiles..."
 
     user_profiles = mysql_stream <<-SQL
         SELECT userid, homepage, profilevisits
@@ -259,14 +285,32 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_categories
-    puts "Importing categories..."
+    puts '', "Importing categories..."
 
     categories = mysql_query(<<-SQL
-      SELECT forumid, parentid, title, description, displayorder
-      FROM #{TABLE_PREFIX}forum
-      WHERE forumid > #{@last_imported_category_id}
-      ORDER BY forumid
-    SQL
+      select
+        forumid,
+        parentid,
+        case
+          when forumid in (
+            select distinct forumid from (
+              select forumid, title, count(title)
+              from #{TABLE_PREFIX}forum
+              group by replace(replace(title, ':', ''), '&', '')
+              having count(title) > 1
+            ) as duplicated_forum_ids
+          )
+          then
+            -- deduplicate by fudging the title; categories will need to be manually merged later
+            concat(title, '_DUPLICATE_', forumid)
+          else
+            title
+        end as title,
+        description,
+        displayorder
+      from #{TABLE_PREFIX}forum
+      order by forumid
+    SQL
     ).to_a
 
     return if categories.empty?
@@ -283,7 +327,7 @@ class BulkImport::VBulletin < BulkImport::Base
       end
     end
 
-    puts "Importing parent categories..."
+    puts '', "Importing parent categories..."
     create_categories(parent_categories) do |row|
       {
         imported_id: row[0],
@@ -293,7 +337,7 @@ class BulkImport::VBulletin < BulkImport::Base
       }
     end
 
-    puts "Importing children categories..."
+    puts '', "Importing child categories..."
     create_categories(children_categories) do |row|
       {
         imported_id: row[0],
@@ -306,7 +350,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_topics
-    puts "Importing topics..."
+    puts '', "Importing topics..."
 
     topics = mysql_stream <<-SQL
         SELECT threadid, title, forumid, postuserid, open, dateline, views, visible, sticky
@@ -337,7 +381,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_posts
-    puts "Importing posts..."
+    puts '', "Importing posts..."
 
     posts = mysql_stream <<-SQL
         SELECT postid, p.threadid, parentid, userid, p.dateline, p.visible, pagetext
@@ -371,7 +415,7 @@ class BulkImport::VBulletin < BulkImport::Base
 
   def import_likes
     return unless @has_post_thanks
-    puts "Importing likes..."
+    puts '', "Importing likes..."
 
     @imported_likes = Set.new
     @last_imported_post_id = 0
@@ -400,7 +444,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_private_topics
-    puts "Importing private topics..."
+    puts '', "Importing private topics..."
 
     @imported_topics = {}
 
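
The dedup subquery keys duplicates on `replace(replace(title, ':', ''), '&', '')`, so despite the broader commit message only ':' and '&' are ignored when comparing titles. A small Ruby model of the same rule, with made-up titles:

    titles = { 1 => "Support:", 2 => "Support", 3 => "Q&A" }

    # same grouping key as the SQL: strip ':' and '&' before comparing
    key = ->(title) { title.delete(":&") }
    groups = titles.group_by { |_id, title| key.call(title) }

    titles.map do |id, title|
      groups[key.call(title)].size > 1 ? [id, "#{title}_DUPLICATE_#{id}"] : [id, title]
    end
    # => [[1, "Support:_DUPLICATE_1"], [2, "Support_DUPLICATE_2"], [3, "Q&A"]]
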
@@ -429,7 +473,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_topic_allowed_users
-    puts "Importing topic allowed users..."
+    puts '', "Importing topic allowed users..."
 
     allowed_users = Set.new
 
@@ -456,7 +500,7 @@ class BulkImport::VBulletin < BulkImport::Base
   end
 
   def import_private_posts
-    puts "Importing private posts..."
+    puts '', "Importing private posts..."
 
     posts = mysql_stream <<-SQL
         SELECT pmtextid, title, fromuserid, touserarray, dateline, message
@@ -485,35 +529,27 @@ class BulkImport::VBulletin < BulkImport::Base
   def create_permalink_file
     puts '', 'Creating Permalink File...', ''
 
-    id_mapping = []
+    total = Topic.listable_topics.count
+    start = Time.now
 
-    Topic.listable_topics.find_each do |topic|
-      pcf = topic.first_post.custom_fields
-      if pcf && pcf["import_id"]
-        id = pcf["import_id"].split('-').last
-        id_mapping.push("XXX#{id} YYY#{topic.id}")
-      end
-    end
+    i = 0
+    File.open(File.expand_path("../vb_map.csv", __FILE__), "w") do |f|
+      Topic.listable_topics.find_each do |topic|
+        i += 1
+        pcf = topic.posts.includes(:_custom_fields).where(post_number: 1).first.custom_fields
+        if pcf && pcf["import_id"]
+          id = pcf["import_id"].split('-').last
 
-    # Category.find_each do |cat|
-    #   ccf = cat.custom_fields
-    #   if ccf && ccf["import_id"]
-    #     id = ccf["import_id"].to_i
-    #     id_mapping.push("/forumdisplay.php?#{id} http://forum.quartertothree.com#{cat.url}")
-    #   end
-    # end
-
-    CSV.open(File.expand_path("../vb_map.csv", __FILE__), "w") do |csv|
-      id_mapping.each do |value|
-        csv << [value]
+          f.print [ "XXX#{id} YYY#{topic.id}" ].to_csv
+          print "\r%7d/%7d - %6d/sec" % [i, total, i.to_f / (Time.now - start)] if i % 5000 == 0
+        end
       end
     end
   end
 
   # find the uploaded file information from the db
   def find_upload(post, attachment_id)
-    sql = "SELECT a.attachmentid attachment_id, a.userid user_id, a.filename filename,
-           a.filedata filedata, a.extension extension
+    sql = "SELECT a.attachmentid attachment_id, a.userid user_id, a.filename filename
            FROM #{TABLE_PREFIX}attachment a
            WHERE a.attachmentid = #{attachment_id}"
     results = mysql_query(sql)
@@ -538,7 +574,7 @@ class BulkImport::VBulletin < BulkImport::Base
 
     upload = create_upload(post.user.id, filename, real_filename)
 
-    if upload.nil? || !upload.valid?
+    if upload.nil? || upload.errors.any?
       puts "Upload not valid :("
       puts upload.errors.inspect if upload
       return
@@ -556,15 +592,7 @@ class BulkImport::VBulletin < BulkImport::Base
     RateLimiter.disable
 
     current_count = 0
-
-    total_count = mysql_query(<<-SQL
-      SELECT COUNT(p.postid) count
-        FROM #{TABLE_PREFIX}post p
-        JOIN #{TABLE_PREFIX}thread t ON t.threadid = p.threadid
-       WHERE t.firstpostid <> p.postid
-    SQL
-    ).first[0].to_i
-
+    total_count = Post.count
     success_count = 0
     fail_count = 0
@@ -677,6 +705,54 @@ class BulkImport::VBulletin < BulkImport::Base
     end
   end
 
+  def merge_duplicated_users
+    count = 0
+    total_count = 0
+
+    duplicated = {}
+    @user_ids_by_email.
+      select { |e, ids| ids.count > 1 }.
+      each_with_index do |(email, ids), i|
+        duplicated[email] = [ids, i]
+        count += 1
+        total_count += ids.count
+      end
+
+    puts '', "Merging #{total_count} duplicated users across #{count} distinct emails..."
+
+    start = Time.now
+
+    Parallel.each duplicated do |email, (user_ids, i)|
+      # nothing to do about these; they will remain a randomized hex string
+      next if email.blank?
+
+      # queried one by one to ensure ordering
+      first, *rest = user_ids.map do |id|
+        UserCustomField.includes(:user).find_by!(name: 'import_id', value: id).user
+      end
+
+      rest.each do |dup|
+        UserMerger.new(dup, first).merge!
+        first.reload
+        print '.'
+      end
+
+      print "\n%6d/%6d - %6d/sec" % [i, count, i.to_f / (Time.now - start)] if i % 10 == 0
+    end
+
+    puts
+  end
+
+  def save_duplicated_users
+    File.open('duplicated_users.json', 'w') do |f|
+      f.puts @user_ids_by_email.to_json
+    end
+  end
+
+  def read_duplicated_users
+    @user_ids_by_email = JSON.parse File.read('duplicated_users.json')
+  end
+
   def extract_pm_title(title)
     normalize_text(title).scrub.gsub(/^Re\s*:\s*/i, "")
   end
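
`Parallel.each` forks one worker process per core by default, and forked workers inherit the parent's database connection, which is not fork-safe. The patch leans on the gem's defaults; a more defensive variant (an assumption on my part, not something the patch does) reconnects once per worker and caps the worker count:

    Parallel.each(duplicated, in_processes: 4) do |email, (user_ids, i)|
      # each forked worker reopens its own connection once; the one
      # inherited from the parent must not be shared across processes
      @reconnected ||= ActiveRecord::Base.connection.reconnect! || true

      # ... same lookup-and-merge body as merge_duplicated_users above ...
    end

Note also that `i` is the index assigned while building `duplicated`, so with parallel workers the `%6d/%6d` progress line can print out of order; it is a rough indicator rather than an exact counter.
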
diff --git a/script/import_scripts/vbulletin.rb b/script/import_scripts/vbulletin.rb
index 30bfcfc1d1..90182e19b9 100644
--- a/script/import_scripts/vbulletin.rb
+++ b/script/import_scripts/vbulletin.rb
@@ -590,15 +590,7 @@ class ImportScripts::VBulletin < ImportScripts::Base
     end
 
     current_count = 0
-
-    total_count = mysql_query(<<-SQL
-      SELECT COUNT(postid) count
-        FROM #{TABLE_PREFIX}post p
-        JOIN #{TABLE_PREFIX}thread t ON t.threadid = p.threadid
-       WHERE t.firstpostid <> p.postid
-    SQL
-    ).first["count"]
-
+    total_count = Post.count
     success_count = 0
     fail_count = 0
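
Since `save_duplicated_users` dumps `@user_ids_by_email` to `duplicated_users.json` during the email pass, a crashed or interrupted merge can be retried without redoing the whole import. A rough console sketch (my assumed usage; the patch itself only wires up the save, and this presumes the same environment variables as a normal run):

    importer = BulkImport::VBulletin.new
    importer.read_duplicated_users    # reload duplicated_users.json into @user_ids_by_email
    importer.merge_duplicated_users   # re-run only the merge phase
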