Compare commits


22 Commits

Author            SHA1        Date                        Message
Gerhard Schlager  c7ddb4f196  2021-12-13 16:22:22 +01:00  Quick & dirty rake task for generating avatars from SSO
Gerhard Schlager  c0ad2adeef  2021-12-13 12:52:57 +01:00  Merge remote-tracking branch 'origin/main' into generic-import
Gerhard Schlager  3ae2011e47  2021-12-01 15:31:44 +01:00  Merge branch 'main' into generic-import
Gerhard Schlager  53dcf67362  2021-11-25 16:44:38 +01:00  First post is now stored in the posts table
Gerhard Schlager  a6c7b9949b  2021-11-18 21:27:57 +01:00  No rate limit?
Gerhard Schlager  ae9db01c76  2021-11-17 17:16:08 +01:00  Close topics [skip ci]
Gerhard Schlager  0098836e10  2021-11-17 16:56:53 +01:00  Merge branch 'main' into generic-import
Gerhard Schlager  47ea61c36c  2021-11-17 16:56:23 +01:00  Fix more problems
Gerhard Schlager  961b5ab848  2021-11-17 01:36:38 +01:00  Fix problems
Gerhard Schlager  380424baef  2021-11-17 00:49:20 +01:00  Try to make bulk imports work with strings as import_id
Gerhard Schlager  7376205a74  2021-11-17 00:11:22 +01:00  Trying to get the generic bulk import working
Gerhard Schlager  9e93f089c1  2021-11-16 17:22:33 +01:00  Download avatars from URL in SSO record and create missing user profiles
Gerhard Schlager  d836638758  2021-11-16 17:21:37 +01:00  Import SSO records in generic bulk import
Gerhard Schlager  1c31741a0b  2021-11-11 15:24:06 +01:00  Merge branch 'main' into generic-import
Gerhard Schlager  dc821493f8  2021-11-11 00:21:56 +01:00  Create SSO records, import avatars from URL
Gerhard Schlager  29b431cff2  2021-11-09 15:30:43 +01:00  Merge branch 'main' into generic-import
Gerhard Schlager  9e0ac2cc92  2021-10-31 15:48:26 +01:00  WIP
Gerhard Schlager  fd8e8e362e  2021-10-31 15:48:25 +01:00  WIP
Gerhard Schlager  4d69eae996  2021-10-31 15:48:24 +01:00  Allow importing of users without full name
Gerhard Schlager  59223b283a  2021-10-31 15:48:21 +01:00  WIP bulk importer
Gerhard Schlager  12295e1f7a  2021-10-31 15:47:29 +01:00  Small action posts weren't found on incremental import
Gerhard Schlager  ea204ac12f  2021-10-31 15:47:28 +01:00  First attempt to create a generic import script
11 changed files with 844 additions and 32 deletions


@@ -263,3 +263,6 @@ gem 'colored2', require: false
gem 'maxminddb'
gem 'rails_failover', require: false
+ gem 'redcarpet'
+ gem "sqlite3"


@@ -345,6 +345,7 @@ GEM
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
rchardet (1.8.0)
+ redcarpet (3.5.1)
redis (4.5.1)
redis-namespace (1.8.1)
redis (>= 3.0.4)
@@ -446,6 +447,7 @@ GEM
actionpack (>= 5.2)
activesupport (>= 5.2)
sprockets (>= 3.0.0)
+ sqlite3 (1.4.2)
sshkey (2.0.0)
stackprof (0.2.17)
test-prof (1.0.7)
@@ -577,6 +579,7 @@ DEPENDENCIES
rb-fsevent
rbtrace
rchardet
+ redcarpet
redis
redis-namespace
rinku
@@ -601,6 +604,7 @@ DEPENDENCIES
simplecov
sprockets (= 3.7.2)
sprockets-rails
+ sqlite3
sshkey
stackprof
test-prof
@@ -614,4 +618,4 @@ DEPENDENCIES
yaml-lint

BUNDLED WITH
- 2.2.26
+ 2.2.29


@@ -902,7 +902,8 @@ class Topic < ActiveRecord::Base
silent: opts[:silent],
skip_validations: true,
custom_fields: opts[:custom_fields],
- import_mode: opts[:import_mode])
+ import_mode: opts[:import_mode],
+ created_at: opts[:created_at])
if (new_post = creator.create) && new_post.present?
increment!(:moderator_posts_count) if new_post.persisted?


@@ -107,7 +107,8 @@ class UserAvatar < ActiveRecord::Base
avatar_url,
max_file_size: SiteSetting.max_image_size_kb.kilobytes,
tmp_file_name: "sso-avatar",
- follow_redirect: true
+ follow_redirect: true,
+ skip_rate_limit: !!options&.fetch(:skip_rate_limit)
)
return unless tempfile


@@ -85,7 +85,8 @@ class UserProfile < ActiveRecord::Base
background_url,
max_file_size: SiteSetting.max_image_size_kb.kilobytes,
tmp_file_name: "sso-profile-background",
- follow_redirect: true
+ follow_redirect: true,
+ skip_rate_limit: true
)
return unless tempfile


@@ -11,6 +11,7 @@ task "import:ensure_consistency" => :environment do
insert_topic_views
insert_user_actions
insert_user_options
+ insert_user_profiles
insert_user_stats
insert_user_visits
insert_draft_sequences
@@ -187,6 +188,17 @@ def insert_user_options
SQL
end
+ def insert_user_profiles
+ log "Inserting user profiles..."
+ DB.exec <<-SQL
+ INSERT INTO user_profiles (user_id)
+ SELECT id
+ FROM users
+ ON CONFLICT DO NOTHING
+ SQL
+ end
def insert_user_stats
log "Inserting user stats..."
@@ -520,3 +532,73 @@ task "import:update_first_post_created_at" => :environment do
log "Done"
end
+ desc "Update avatars from external_avatar_url in SSO records"
+ task "import:update_avatars_from_sso" => :environment do
+ log "Updating avatars from SSO records"
+ sql = <<~SQL
+ SELECT user_id, external_avatar_url
+ FROM single_sign_on_records s
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM user_avatars a
+ WHERE a.user_id = s.user_id
+ )
+ SQL
+ queue = SizedQueue.new(1000)
+ threads = []
+ threads << Thread.new do
+ DB.query_each(sql) do |row|
+ queue << { user_id: row.user_id, url: row.external_avatar_url }
+ end
+ queue.close
+ end
+ max_count = DB.query_single(<<~SQL).first
+ SELECT COUNT(*)
+ FROM single_sign_on_records s
+ WHERE NOT EXISTS (
+ SELECT 1
+ FROM user_avatars a
+ WHERE a.user_id = s.user_id
+ )
+ SQL
+ status_queue = Queue.new
+ status_thread = Thread.new do
+ error_count = 0
+ current_count = 0
+ while !(status = status_queue.pop).nil?
+ error_count += 1 if !status
+ current_count += 1
+ print "\r%7d / %7d (%d errors)" % [current_count, max_count, error_count]
+ end
+ end
+ 20.times do
+ threads << Thread.new do
+ while row = queue.pop
+ begin
+ UserAvatar.import_url_for_user(
+ row[:url],
+ User.find(row[:user_id]),
+ override_gravatar: true,
+ skip_rate_limit: true
+ )
+ status_queue << true
+ rescue
+ status_queue << false
+ end
+ end
+ end
+ end
+ threads.each(&:join)
+ status_queue.close
+ status_thread.join
+ end
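
The new task fans downloads out to 20 worker threads, with a SizedQueue of 1000 keeping the producing query from running arbitrarily far ahead of the downloaders, and a status thread printing progress. A minimal sketch of invoking it, assuming a standard Discourse checkout (the task name comes from the diff above):

  # from a shell:
  #   bundle exec rake import:update_avatars_from_sso
  # or programmatically, the way BulkImport::Generic#start does below:
  Discourse::Application.load_tasks
  Rake::Task["import:update_avatars_from_sso"].invoke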


@@ -128,16 +128,16 @@ class BulkImport::Base
end
def imported_ids(name)
- map = []
+ map = {}
ids = []
@raw_connection.send_query("SELECT value, #{name}_id FROM #{name}_custom_fields WHERE name = 'import_id'")
@raw_connection.set_single_row_mode
@raw_connection.get_result.stream_each do |row|
- id = row["value"].to_i
+ id = row["value"]
ids << id
- map[id] = row["#{name}_id"].to_i
+ map[id] = row["#{name}_id"]
end
@raw_connection.get_result
@@ -145,6 +145,7 @@ class BulkImport::Base
[map, ids]
end
+ # FIXME Calculating last_imported_*_id doesn't work for strings!
def load_imported_ids
puts "Loading imported group ids..."
@groups, imported_group_ids = imported_ids("group")
@@ -172,7 +173,7 @@ class BulkImport::Base
def last_id(klass)
# the first record created will have id of this value + 1
- [klass.unscoped.maximum(:id) || 0, 0].max
+ klass.unscoped.maximum(:id) || 0
end
def load_values(name, column, size)
@@ -198,12 +199,15 @@ class BulkImport::Base
puts "Loading users indexes..."
@last_user_id = last_id(User)
@last_user_email_id = last_id(UserEmail)
- @emails = User.unscoped.joins(:user_emails).pluck(:"user_emails.email", :"user_emails.user_id").to_h
+ @last_sso_record_id = last_id(SingleSignOnRecord)
+ @emails = UserEmail.pluck(:email, :user_id).to_h
+ @external_ids = SingleSignOnRecord.pluck(:external_id, :user_id).to_h
@usernames_lower = User.unscoped.pluck(:username_lower).to_set
@mapped_usernames = UserCustomField.joins(:user).where(name: "import_username").pluck("user_custom_fields.value", "users.username").to_h
puts "Loading categories indexes..."
@last_category_id = last_id(Category)
+ @highest_category_position = Category.unscoped.maximum(:position) || 0
@category_names = Category.unscoped.pluck(:parent_category_id, :name).map { |pci, name| "#{pci}-#{name}" }.to_set
puts "Loading topics indexes..."
@@ -232,6 +236,7 @@ class BulkImport::Base
@raw_connection.exec("SELECT setval('#{Group.sequence_name}', #{@last_group_id})") if @last_group_id > 0
@raw_connection.exec("SELECT setval('#{User.sequence_name}', #{@last_user_id})") if @last_user_id > 0
@raw_connection.exec("SELECT setval('#{UserEmail.sequence_name}', #{@last_user_email_id})") if @last_user_email_id > 0
+ @raw_connection.exec("SELECT setval('#{SingleSignOnRecord.sequence_name}', #{@last_sso_record_id})") if @last_sso_record_id > 0
@raw_connection.exec("SELECT setval('#{Category.sequence_name}', #{@last_category_id})") if @last_category_id > 0
@raw_connection.exec("SELECT setval('#{Topic.sequence_name}', #{@last_topic_id})") if @last_topic_id > 0
@raw_connection.exec("SELECT setval('#{Post.sequence_name}', #{@last_post_id})") if @last_post_id > 0
@@ -239,23 +244,23 @@ class BulkImport::Base
end
def group_id_from_imported_id(id)
- @groups[id.to_i]
+ @groups[id]
end
def user_id_from_imported_id(id)
- @users[id.to_i]
+ @users[id]
end
def category_id_from_imported_id(id)
- @categories[id.to_i]
+ @categories[id]
end
def topic_id_from_imported_id(id)
- @topics[id.to_i]
+ @topics[id]
end
def post_id_from_imported_id(id)
- @posts[id.to_i]
+ @posts[id]
end
def post_number_from_imported_id(id)
@@ -293,6 +298,12 @@ class BulkImport::Base
user_id location website bio_raw bio_cooked views
}
+ USER_SSO_RECORD_COLUMNS ||= %i{
+ id user_id external_id last_payload created_at updated_at external_username
+ external_email external_name external_avatar_url external_profile_background_url
+ external_card_background_url
+ }
GROUP_USER_COLUMNS ||= %i{
group_id user_id created_at updated_at
}
@@ -352,6 +363,9 @@ class BulkImport::Base
def create_user_profiles(rows, &block)
create_records(rows, "user_profile", USER_PROFILE_COLUMNS, &block)
end
+ def create_single_sign_on_records(rows, &block)
+ create_records(rows, "single_sign_on_record", USER_SSO_RECORD_COLUMNS, &block)
+ end
def create_group_users(rows, &block)
create_records(rows, "group_user", GROUP_USER_COLUMNS, &block)
end
@@ -375,7 +389,7 @@ class BulkImport::Base
end
def process_group(group)
- @groups[group[:imported_id].to_i] = group[:id] = @last_group_id += 1
+ @groups[group[:imported_id]] = group[:id] = @last_group_id += 1
group[:name] = fix_name(group[:name])
@@ -399,13 +413,22 @@ class BulkImport::Base
if existing_user_id = @emails[user[:email]]
@pre_existing_user_ids << existing_user_id
- @users[user[:imported_id].to_i] = existing_user_id
+ @users[user[:imported_id]] = existing_user_id
user[:skip] = true
return user
end
end
- @users[user[:imported_id].to_i] = user[:id] = @last_user_id += 1
+ if user[:external_id].present?
+ if existing_user_id = @external_ids[user[:external_id]]
+ @pre_existing_user_ids << existing_user_id
+ @users[user[:imported_id]] = existing_user_id
+ user[:skip] = true
+ return user
+ end
+ end
+ @users[user[:imported_id]] = user[:id] = @last_user_id += 1
imported_username = user[:username].dup
@@ -435,7 +458,7 @@ class BulkImport::Base
end
def process_user_email(user_email)
- user_id = @users[user_email[:imported_user_id].to_i]
+ user_id = @users[user_email[:imported_user_id]]
return { skip: true } if @pre_existing_user_ids.include?(user_id)
user_email[:id] = @last_user_email_id += 1
@@ -444,8 +467,7 @@ class BulkImport::Base
user_email[:created_at] ||= NOW
user_email[:updated_at] ||= user_email[:created_at]
- user_email[:email] ||= random_email
- user_email[:email].downcase!
+ user_email[:email] = user_email[:email]&.downcase || random_email
# unique email
user_email[:email] = random_email until user_email[:email] =~ EmailValidator.email_regex && !@emails.has_key?(user_email[:email])
@@ -453,7 +475,7 @@ class BulkImport::Base
end
def process_user_stat(user_stat)
- user_id = @users[user_stat[:imported_user_id].to_i]
+ user_id = @users[user_stat[:imported_user_id]]
return { skip: true } if @pre_existing_user_ids.include?(user_id)
user_stat[:user_id] = user_id
@@ -480,6 +502,18 @@ class BulkImport::Base
user_profile
end
+ def process_single_sign_on_record(sso_record)
+ user_id = @users[sso_record[:imported_user_id]]
+ return { skip: true } if @pre_existing_user_ids.include?(user_id)
+ sso_record[:id] = @last_sso_record_id += 1
+ sso_record[:user_id] = user_id
+ sso_record[:last_payload] ||= ""
+ sso_record[:created_at] = NOW
+ sso_record[:updated_at] = NOW
+ sso_record
+ end
def process_group_user(group_user)
group_user[:created_at] = NOW
group_user[:updated_at] = NOW
@@ -487,8 +521,14 @@ class BulkImport::Base
end
def process_category(category)
+ if category[:existing_id].present?
+ @categories[category[:imported_id]] = category[:existing_id]
+ category[:skip] = true
+ return category
+ end
category[:id] ||= @last_category_id += 1
- @categories[category[:imported_id].to_i] ||= category[:id]
+ @categories[category[:imported_id]] ||= category[:id]
category[:name] = category[:name][0...50].scrub.strip
# TODO: unique name
category[:name_lower] = category[:name].downcase
@@ -497,11 +537,18 @@ class BulkImport::Base
category[:user_id] ||= Discourse::SYSTEM_USER_ID
category[:created_at] ||= NOW
category[:updated_at] ||= category[:created_at]
+ if category[:position]
+ @highest_category_position = category[:position] if category[:position] > @highest_category_position
+ else
+ category[:position] = @highest_category_position += 1
+ end
category
end
def process_topic(topic)
- @topics[topic[:imported_id].to_i] = topic[:id] = @last_topic_id += 1
+ @topics[topic[:imported_id]] = topic[:id] = @last_topic_id += 1
topic[:archetype] ||= Archetype.default
topic[:title] = topic[:title][0...255].scrub.strip
topic[:fancy_title] ||= pre_fancy(topic[:title])
@@ -519,7 +566,7 @@ class BulkImport::Base
end
def process_post(post)
- @posts[post[:imported_id].to_i] = post[:id] = @last_post_id += 1
+ @posts[post[:imported_id]] = post[:id] = @last_post_id += 1
post[:user_id] ||= Discourse::SYSTEM_USER_ID
post[:last_editor_id] = post[:user_id]
@highest_post_number_by_topic_id[post[:topic_id]] ||= 0
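
The recurring change in this file is that import ids are no longer coerced with .to_i and the lookup tables become hashes instead of arrays, so sources whose ids are strings (UUIDs, "user-42") no longer collide. A minimal sketch of the failure mode the old code had, with hypothetical ids:

  old_map = []          # before: array indexed by id.to_i
  new_map = {}          # after: hash keyed by the raw value
  id = "user-42"        # a non-numeric import_id
  old_map[id.to_i] = 1  # "user-42".to_i == 0, so every string id lands at index 0
  new_map[id] = 1       # distinct string keys stay distinct
  old_map.inspect       # => "[1]"
  new_map.inspect       # => "{\"user-42\"=>1}"

This is also why the FIXME above flags last_imported_*_id: with string ids there is no numeric maximum to resume from.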


@@ -0,0 +1,195 @@
# frozen_string_literal: true
require_relative "base"
require "sqlite3"
class BulkImport::Generic < BulkImport::Base
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
def initialize(db_path)
super()
@db = create_connection(db_path)
end
def start
run # will call execute, and then "complete" the migration
# Now that the migration is complete, do some more work:
Discourse::Application.load_tasks
Rake::Task["import:ensure_consistency"].invoke
end
def execute
import_categories
import_users
import_user_emails
import_single_sign_on_records
import_topics
import_posts
end
def import_categories
puts "Importing categories..."
categories = @db.execute(<<~SQL)
WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
old_relative_url, existing_id, level, rowid) AS (
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, 0 AS level, c.ROWID
FROM categories c
WHERE c.parent_category_id IS NULL
UNION
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, tree.level + 1 AS level, c.ROWID
FROM categories c,
tree
WHERE c.parent_category_id = tree.id
)
SELECT *
FROM tree
ORDER BY level, rowid
SQL
create_categories(categories) do |row|
{
imported_id: row["id"],
existing_id: row["existing_id"],
name: row["name"],
description: row["description"],
parent_category_id: row["parent_category_id"] ? category_id_from_imported_id(row["parent_category_id"]) : nil,
slug: row["slug"]
}
end
end
def import_users
puts "Importing users..."
users = @db.execute(<<~SQL)
SELECT ROWID, *
FROM users
ORDER BY ROWID
SQL
create_users(users) do |row|
sso_record = JSON.parse(row["sso_record"]) if row["sso_record"].present?
{
imported_id: row["id"],
username: row["username"],
email: row["email"],
external_id: sso_record&.fetch("external_id"),
created_at: to_datetime(row["created_at"])
}
end
end
def import_user_emails
puts '', 'Importing user emails...'
users = @db.execute(<<~SQL)
SELECT ROWID, id, email, created_at
FROM users
ORDER BY ROWID
SQL
create_user_emails(users) do |row|
{
# FIXME: this uses both "imported_id" and "imported_user_id"; it should be replaced by just "imported_id"
imported_id: row["id"],
imported_user_id: row["id"],
email: row["email"],
created_at: to_datetime(row["created_at"])
}
end
end
def import_single_sign_on_records
puts '', 'Importing SSO records...'
users = @db.execute(<<~SQL)
SELECT ROWID, id, sso_record
FROM users
WHERE sso_record IS NOT NULL
ORDER BY ROWID
SQL
create_single_sign_on_records(users) do |row|
sso_record = JSON.parse(row["sso_record"], symbolize_names: true)
# FIXME: this uses both "imported_id" and "imported_user_id"; it should be replaced by just "imported_id"
sso_record[:imported_id] = row["id"]
sso_record[:imported_user_id] = row["id"]
sso_record
end
end
def import_topics
puts "Importing topics..."
topics = @db.execute(<<~SQL)
SELECT ROWID, *
FROM topics
ORDER BY ROWID
SQL
create_topics(topics) do |row|
{
imported_id: row["id"],
title: row["title"],
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
category_id: category_id_from_imported_id(row["category_id"]),
closed: to_boolean(row["closed"])
}
end
end
def import_posts
puts "Importing posts..."
posts = @db.execute(<<~SQL)
SELECT ROWID, *
FROM posts
ORDER BY topic_id, post_number
SQL
create_posts(posts) do |row|
next if row["raw"].blank?
next unless topic_id = topic_id_from_imported_id(row["topic_id"])
{
imported_id: row["id"],
topic_id: topic_id,
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
raw: row["raw"]
}
end
end
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60000 # 60 seconds
sqlite.auto_vacuum = "full"
sqlite.foreign_keys = true
sqlite.journal_mode = "wal"
sqlite.synchronous = "normal"
sqlite
end
def to_date(text)
text.present? ? Date.parse(text) : nil
end
def to_datetime(text)
text.present? ? DateTime.parse(text) : nil
end
def to_boolean(value)
value == 1
end
end
BulkImport::Generic.new(ARGV.first).start
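
BulkImport::Generic expects the path of the intermediate SQLite database as its only argument (ARGV.first); AVATAR_DIRECTORY and UPLOAD_DIRECTORY are read from the environment but not referenced elsewhere in this file yet. A hypothetical invocation and a sketch of the tables the queries above assume — table and column names come from the SQL in the script, everything else (paths, the script's filename) is illustrative:

  # ruby <path-to-script> /tmp/import.db
  require "sqlite3"
  db = SQLite3::Database.new("/tmp/import.db")
  db.execute_batch(<<~SQL)
    CREATE TABLE categories (id, parent_category_id, name, description, color,
      text_color, read_restricted, slug, old_relative_url, existing_id);
    CREATE TABLE users (id, username, email, sso_record, created_at);
    CREATE TABLE topics (id, title, user_id, category_id, closed, created_at);
    CREATE TABLE posts (id, topic_id, user_id, post_number, raw, created_at);
  SQL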


@@ -85,7 +85,8 @@ class ImportScripts::Base
clean_up_inactive_users_after_days: 0,
clean_up_unused_staged_users_after_days: 0,
clean_up_uploads: false,
- clean_orphan_uploads_grace_period_hours: 1800
+ clean_orphan_uploads_grace_period_hours: 1800,
+ full_name_required: false
}
end
@@ -303,8 +304,12 @@ class ImportScripts::Base
opts.delete(:id)
merge = opts.delete(:merge)
post_create_action = opts.delete(:post_create_action)
+ sso_record = opts.delete(:sso_record)
- existing = find_existing_user(opts[:email], opts[:username])
+ existing = find_existing_user(opts[:email], sso_record&.fetch("external_id"))
return existing if existing
+ existing = User.where(username: opts[:username]).first
+ return existing if existing && (merge || existing.custom_fields["import_id"].to_s == import_id.to_s)
bio_raw = opts.delete(:bio_raw)
@@ -390,14 +395,47 @@ class ImportScripts::Base
end
+ u.create_single_sign_on_record!(sso_record) if sso_record
post_create_action.try(:call, u) if u.persisted?
u # If there was an error creating the user, u.errors has the messages
end
- def find_existing_user(email, username)
+ def find_existing_user(email, external_id)
# Force the use of the index on the 'user_emails' table
- UserEmail.where("lower(email) = ?", email.downcase).first&.user || User.where(username: username).first
+ user = UserEmail.where("lower(email) = ?", email.downcase).first&.user
+ user ||= SingleSignOnRecord.where(external_id: external_id).first&.user if external_id.present?
+ user
end
+ def create_group_members(results, opts = {})
+ created = 0
+ total = opts[:total] || results.count
+ group = nil
+ group_member_ids = []
+ results.each_with_index do |result, index|
+ member = yield(result)
+ if !group || group.id != member[:group_id]
+ if group_member_ids.any?
+ group.bulk_add(group_member_ids)
+ group_member_ids = []
+ end
+ group = Group.find(member[:group_id])
+ end
+ group_member_ids << member[:user_id]
+ group.bulk_add(group_member_ids) if index == results.size - 1 && group_member_ids.any?
+ created += 1
+ print_status(created + (opts[:offset] || 0), total, get_start_time("group_members"))
+ end
+ created
+ end
def created_category(category)


@@ -17,15 +17,15 @@ module ImportScripts
tmp.unlink rescue nil
end
- def create_avatar(user, avatar_path)
+ def create_avatar(user, avatar_path, origin_url = nil)
tempfile = copy_to_tempfile(avatar_path)
filename = "avatar#{File.extname(avatar_path)}"
- upload = UploadCreator.new(tempfile, filename, type: "avatar").create_for(user.id)
+ upload = UploadCreator.new(tempfile, filename, type: "avatar", origin: origin_url).create_for(user.id)
if upload.present? && upload.persisted?
user.create_user_avatar
- user.user_avatar.update(custom_upload_id: upload.id)
- user.update(uploaded_avatar_id: upload.id)
+ user.user_avatar.update!(custom_upload_id: upload.id)
+ user.update!(uploaded_avatar_id: upload.id)
else
STDERR.puts "Failed to upload avatar for user #{user.username}: #{avatar_path}"
STDERR.puts upload.errors.inspect if upload


@@ -0,0 +1,440 @@
# frozen_string_literal: true
require_relative 'base'
require 'sqlite3'
class ImportScripts::Generic < ImportScripts::Base
BATCH_SIZE = 1000
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
def initialize(db_path)
super()
@db = create_connection(db_path)
end
def execute
import_users
import_groups
import_group_members
import_categories
import_topics
import_posts
mark_topics_as_solved
end
def import_users
log_action "Creating users"
total_count = count_users
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_users(last_row_id)
break if rows.empty?
next if all_records_exist?(:users, rows.map { |row| row["id"] })
create_users(rows, total: total_count, offset: offset) do |row|
if row["sso_record"].present?
sso_record = JSON.parse(row["sso_record"])
sso_record[:last_payload] = ""
end
{
id: row["id"],
username: row["username"],
created_at: to_datetime(row["created_at"]),
name: row["name"],
email: row["email"] || fake_email,
last_seen_at: to_datetime(row["last_seen_at"]) || to_datetime(row["created_at"]),
bio_raw: row["bio"],
location: row["location"],
admin: to_boolean(row["admin"]),
moderator: to_boolean(row["moderator"]),
sso_record: sso_record,
post_create_action: proc do |user|
create_avatar(user, row["avatar_path"], row["avatar_url"])
suspend_user(user, row["suspension"])
end
}
end
end
end
def create_avatar(user, avatar_path, avatar_url)
if avatar_path.present?
avatar_path = File.join(AVATAR_DIRECTORY, avatar_path)
if File.exist?(avatar_path)
@uploader.create_avatar(user, avatar_path)
else
STDERR.puts "Could not find avatar: #{avatar_path}"
end
elsif avatar_url.present?
UserAvatar.import_url_for_user(avatar_url, user, override_gravatar: true)
end
end
def suspend_user(user, suspension)
return if suspension.blank?
suspension = JSON.parse(suspension)
user.suspended_at = suspension["suspended_at"] || user.last_seen_at || Time.now
user.suspended_till = suspension["suspended_till"] || 200.years.from_now
user.save!
if suspension["reason"].present?
StaffActionLogger.new(Discourse.system_user).log_user_suspend(user, suspension["reason"])
end
end
def count_users
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM users
SQL
end
def fetch_users(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM users
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def import_groups
log_action "Creating groups"
rows = @db.execute(<<~SQL)
SELECT *
FROM groups
ORDER BY ROWID
SQL
create_groups(rows) do |row|
{
id: row["id"],
name: row["name"]
}
end
end
def import_group_members
log_action "Adding group members"
total_count = @db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM group_members
SQL
last_row_id = -1
batches do |offset|
rows, last_row_id = query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM group_members
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
break if rows.empty?
create_group_members(rows, total: total_count, offset: offset) do |row|
{
group_id: group_id_from_imported_group_id(row["group_id"]),
user_id: user_id_from_imported_user_id(row["user_id"])
}
end
end
end
def import_categories
log_action "Creating categories"
rows = @db.execute(<<~SQL)
WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
old_relative_url, level, rowid) AS (
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, 0 AS level, c.ROWID
FROM categories c
WHERE c.parent_category_id IS NULL
UNION
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, tree.level + 1 AS level, c.ROWID
FROM categories c,
tree
WHERE c.parent_category_id = tree.id
)
SELECT *
FROM tree
ORDER BY level, rowid
SQL
create_categories(rows) do |row|
{
id: row["id"],
name: row["name"],
description: row["description"],
color: row["color"],
text_color: row["text_color"],
read_restricted: row["read_restricted"],
parent_category_id: category_id_from_imported_category_id(row["parent_category_id"]),
post_create_action: proc do |category|
create_permalink(row["old_relative_url"], category_id: category.id)
end
}
end
end
def import_topics
log_action "Creating topics"
total_count = count_topics
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_topics(last_row_id)
break if rows.empty?
next if all_records_exist?(:topics, rows.map { |row| row["id"] })
create_posts(rows, total: total_count, offset: offset) do |row|
# TODO add more columns
mapped = {
id: row["id"],
title: row["title"],
created_at: to_datetime(row["created_at"]),
raw: process_raw(row),
category: category_id_from_imported_category_id(row["category_id"]),
user_id: user_id_from_imported_user_id(row["user_id"]) || Discourse::SYSTEM_USER_ID,
post_create_action: proc do |post|
add_tags(post, row["tags"])
create_permalink(row["old_relative_url"], topic_id: post.topic.id)
end
}
if row["private_message"].present?
pm = JSON.parse(row["private_message"])
target_usernames = ([row["user_id"]] + pm["user_ids"]).map { |id| find_user_by_import_id(id)&.username }
target_group_names = pm["special_group_names"] + pm["group_ids"].map { |id| find_group_by_import_id(id)&.name }
mapped[:archetype] = Archetype.private_message
mapped[:target_usernames] = target_usernames.compact.uniq.join(",")
mapped[:target_group_names] = target_group_names.compact.uniq.join(",")
end
mapped
end
end
end
def count_topics
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM topics
SQL
end
def fetch_topics(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM topics
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def process_raw(row)
raw = row["raw"]
upload_ids = row["upload_ids"]
return raw if upload_ids.blank? || raw.blank?
# a single "?" placeholder cannot bind a list, so quote each id and inline the list
joined_upload_ids = JSON.parse(upload_ids).map { |id| "'#{SQLite3::Database.quote(id.to_s)}'" }.join(",")
files = @db.execute("SELECT * FROM uploads WHERE id IN (#{joined_upload_ids})")
files.each do |file|
user_id = user_id_from_imported_user_id(file["user_id"]) || Discourse::SYSTEM_USER_ID
path = File.join(UPLOAD_DIRECTORY, file["path"])
upload = create_upload(user_id, path, file["filename"])
if upload.present? && upload.persisted?
raw.gsub!("[upload|#{file['id']}]", @uploader.html_for_upload(upload, file["filename"]))
end
end
raw
end
def add_tags(post, tags)
tag_names = JSON.parse(tags) if tags
DiscourseTagging.tag_topic_by_names(post.topic, staff_guardian, tag_names) if tag_names.present?
end
def create_permalink(url, topic_id: nil, post_id: nil, category_id: nil, tag_id: nil)
return if url.blank? || Permalink.exists?(url: url)
Permalink.create(
url: url,
topic_id: topic_id,
post_id: post_id,
category_id: category_id,
tag_id: tag_id
)
end
def import_posts
log_action "Creating posts"
total_count = count_posts
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_posts(last_row_id)
break if rows.empty?
next if all_records_exist?(:posts, rows.map { |row| row["id"] })
create_posts(rows, total: total_count, offset: offset) do |row|
topic = topic_lookup_from_imported_post_id(row["topic_id"])
next if !topic
if row["small_action"]
create_small_action(row, topic)
next
end
reply_to = topic_lookup_from_imported_post_id(row["reply_to_post_id"]) if row["reply_to_post_id"]
reply_to = nil if reply_to&.dig(:topic_id) != topic[:topic_id]
mapped = {
id: row["id"],
topic_id: topic[:topic_id],
created_at: to_datetime(row["created_at"]),
raw: process_raw(row),
reply_to_post_number: reply_to&.dig(:post_number),
user_id: user_id_from_imported_user_id(row["user_id"]) || Discourse::SYSTEM_USER_ID,
post_create_action: proc do |post|
create_permalink(row["old_relative_url"], post_id: post.id)
end
}
mapped[:post_type] = Post.types[:whisper] if to_boolean(row["whisper"])
mapped[:custom_fields] = { is_accepted_answer: "true" } if to_boolean(row["accepted_answer"])
mapped
end
end
end
def count_posts
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM posts
SQL
end
def fetch_posts(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM posts
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def mark_topics_as_solved
log_action "Marking topics as solved..."
DB.exec <<~SQL
INSERT INTO topic_custom_fields (name, value, topic_id, created_at, updated_at)
SELECT 'accepted_answer_post_id', pcf.post_id, p.topic_id, p.created_at, p.created_at
FROM post_custom_fields pcf
JOIN posts p ON p.id = pcf.post_id
WHERE pcf.name = 'is_accepted_answer' AND pcf.value = 'true'
AND NOT EXISTS (
SELECT 1
FROM topic_custom_fields x
WHERE x.topic_id = p.topic_id AND x.name = 'accepted_answer_post_id'
)
SQL
end
def create_small_action(row, topic)
small_action = JSON.parse(row["small_action"])
case small_action["type"]
when "split_topic"
create_split_topic_small_action(row, small_action, topic)
else
raise "Unknown small action type: #{small_action['type']}"
end
end
def create_split_topic_small_action(row, small_action, original_topic)
destination_topic = topic_lookup_from_imported_post_id(small_action["destination_topic_id"])
destination_topic = Topic.find_by(id: destination_topic[:topic_id]) if destination_topic
return if !destination_topic
original_topic = Topic.find_by(id: original_topic[:topic_id]) if original_topic
return if !original_topic
move_type = small_action['move_type']
message = I18n.with_locale(SiteSetting.default_locale) do
I18n.t(
"move_posts.#{move_type}_moderator_post",
count: small_action['post_count'],
topic_link: "[#{destination_topic.title}](#{destination_topic.relative_url})"
)
end
post_type = move_type.include?("message") ? Post.types[:whisper] : Post.types[:small_action]
original_topic.add_moderator_post(
Discourse.system_user, message,
post_type: post_type,
action_code: "split_topic",
import_mode: true,
created_at: to_datetime(row["created_at"]),
custom_fields: { import_id: row["id"] }
)
end
def query_with_last_rowid(sql, last_row_id)
rows = @db.execute(sql, last_row_id: last_row_id)
[rows, rows.last&.dig("rowid")]
end
def to_date(text)
text.present? ? Date.parse(text) : nil
end
def to_datetime(text)
text.present? ? DateTime.parse(text) : nil
end
def to_boolean(value)
value == 1
end
def log_action(text)
puts "", text
end
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60000 # 60 seconds
sqlite.auto_vacuum = "full"
sqlite.foreign_keys = true
sqlite.journal_mode = "wal"
sqlite.synchronous = "normal"
sqlite
end
def batches
super(BATCH_SIZE)
end
def staff_guardian
@staff_guardian ||= Guardian.new(Discourse.system_user)
end
end
ImportScripts::Generic.new(ARGV.first).perform
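
The batching in this script walks each source table by ROWID instead of OFFSET, which stays cheap on large tables: query_with_last_rowid returns both the rows and the last rowid seen, and the next fetch resumes strictly after it. A condensed sketch of that loop, using names from the script above (the bare loop and process(row) are illustrative; the real code runs inside batches):

  last_row_id = -1
  loop do
    rows, last_row_id = query_with_last_rowid(<<~SQL, last_row_id)
      SELECT ROWID, * FROM posts WHERE ROWID > :last_row_id ORDER BY ROWID LIMIT #{BATCH_SIZE}
    SQL
    break if rows.empty?
    rows.each { |row| process(row) }  # per-row mapping happens here in the real script
  end

A hypothetical invocation (paths are examples): AVATAR_DIRECTORY=/tmp/avatars UPLOAD_DIRECTORY=/tmp/uploads bundle exec ruby <path-to-script> /tmp/import.db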