Compare commits

...
This repository has been archived on 2023-03-18. You can view files and clone it, but cannot push or open issues or pull requests.

34 Commits

Author SHA1 Message Date
Leonardo Mosquera
8c34d937b3 Add support for suspension fields 2023-02-13 23:06:01 +00:00
Leonardo Mosquera
f1a184eaff WIP - add FIXMEs to fix progress reporting for records without imported_id 2023-02-13 23:05:54 +00:00
Leonardo Mosquera
1630bb28d9 Clarify that import:ensure_consistency will be run
Since the base bulk importer says "now run that task", it isn't obvious that it is already run at the end.
2022-11-15 20:56:43 +00:00
Leonardo Mosquera
ae6b038154 Import views for topics 2022-11-15 19:02:18 +00:00
Leonardo Mosquera
648b970bcf Import user name as well 2022-04-21 23:35:15 +00:00
Leonardo Mosquera
4feba63d15 Import tags as well 2022-04-21 23:35:15 +00:00
cocococosti
19a2904459 Fix queries format 2022-04-21 19:15:42 -04:00
cocococosti
88448c7236 Adding support for User Stats import to the generic importer 2022-04-21 18:56:15 -04:00
cocococosti
a4f84257e4 Adding support to import Admin and Moderators to the generic importer 2022-04-21 18:56:15 -04:00
cocococosti
cdd612b1da Adding support for PMs import to the generic importer 2022-04-21 18:56:15 -04:00
cocococosti
095dc0019a Adding fixes to make the generic importer work with the YAFNET converter, and adding support for Likes import 2022-04-21 18:55:33 -04:00
cocococosti
4500419f14 Revert "Try to make bulk imports work with strings as import_id"
This reverts commit 380424baef.
2022-01-24 23:24:26 -04:00
Gerhard Schlager
c7ddb4f196 Quick & dirty rake task for generating avatars from SSO 2021-12-13 16:22:22 +01:00
Gerhard Schlager
c0ad2adeef Merge remote-tracking branch 'origin/main' into generic-import 2021-12-13 12:52:57 +01:00
Gerhard Schlager
3ae2011e47 Merge branch 'main' into generic-import 2021-12-01 15:31:44 +01:00
Gerhard Schlager
53dcf67362 First post is now stored in the posts table 2021-11-25 16:44:38 +01:00
Gerhard Schlager
a6c7b9949b No rate limit? 2021-11-18 21:27:57 +01:00
Gerhard Schlager
ae9db01c76 Close topics [skip ci] 2021-11-17 17:16:08 +01:00
Gerhard Schlager
0098836e10 Merge branch 'main' into generic-import 2021-11-17 16:56:53 +01:00
Gerhard Schlager
47ea61c36c Fix more problems 2021-11-17 16:56:23 +01:00
Gerhard Schlager
961b5ab848 Fix problems 2021-11-17 01:36:38 +01:00
Gerhard Schlager
380424baef Try to make bulk imports work with strings as import_id 2021-11-17 00:49:20 +01:00
Gerhard Schlager
7376205a74 Trying to get the generic bulk import working 2021-11-17 00:11:22 +01:00
Gerhard Schlager
9e93f089c1 Download avatars from URL in SSO record and create missing user profiles 2021-11-16 17:22:33 +01:00
Gerhard Schlager
d836638758 Import SSO records in generic bulk import 2021-11-16 17:21:37 +01:00
Gerhard Schlager
1c31741a0b Merge branch 'main' into generic-import 2021-11-11 15:24:06 +01:00
Gerhard Schlager
dc821493f8 Create SSO records, import avatars from URL 2021-11-11 00:21:56 +01:00
Gerhard Schlager
29b431cff2 Merge branch 'main' into generic-import 2021-11-09 15:30:43 +01:00
Gerhard Schlager
9e0ac2cc92 WIP 2021-10-31 15:48:26 +01:00
Gerhard Schlager
fd8e8e362e WIP 2021-10-31 15:48:25 +01:00
Gerhard Schlager
4d69eae996 Allow importing of users without full name 2021-10-31 15:48:24 +01:00
Gerhard Schlager
59223b283a WIP bulk importer 2021-10-31 15:48:21 +01:00
Gerhard Schlager
12295e1f7a Small action posts weren't found on incremental import 2021-10-31 15:47:29 +01:00
Gerhard Schlager
ea204ac12f First attempt to create a generic import script 2021-10-31 15:47:28 +01:00
17 changed files with 994 additions and 16 deletions

BIN
.DS_Store vendored Normal file


@@ -345,6 +345,7 @@ GEM
msgpack (>= 0.4.3)
optimist (>= 3.0.0)
rchardet (1.8.0)
redcarpet (3.5.1)
redis (4.5.1)
redis-namespace (1.8.1)
redis (>= 3.0.4)
@@ -446,6 +447,7 @@ GEM
actionpack (>= 5.2)
activesupport (>= 5.2)
sprockets (>= 3.0.0)
sqlite3 (1.4.2)
sshkey (2.0.0)
stackprof (0.2.17)
test-prof (1.0.7)
@@ -577,6 +579,7 @@ DEPENDENCIES
rb-fsevent
rbtrace
rchardet
redcarpet
redis
redis-namespace
rinku
@@ -601,6 +604,7 @@ DEPENDENCIES
simplecov
sprockets (= 3.7.2)
sprockets-rails
sqlite3
sshkey
stackprof
test-prof
@@ -614,4 +618,4 @@ DEPENDENCIES
yaml-lint
BUNDLED WITH
2.2.26
2.2.29

BIN
app/.DS_Store vendored Normal file


BIN
app/assets/.DS_Store vendored Normal file


BIN
app/assets/javascripts/.DS_Store vendored Normal file


@@ -902,7 +902,8 @@ class Topic < ActiveRecord::Base
silent: opts[:silent],
skip_validations: true,
custom_fields: opts[:custom_fields],
import_mode: opts[:import_mode])
import_mode: opts[:import_mode],
created_at: opts[:created_at])
if (new_post = creator.create) && new_post.present?
increment!(:moderator_posts_count) if new_post.persisted?


@@ -107,7 +107,8 @@ class UserAvatar < ActiveRecord::Base
avatar_url,
max_file_size: SiteSetting.max_image_size_kb.kilobytes,
tmp_file_name: "sso-avatar",
follow_redirect: true
follow_redirect: true,
skip_rate_limit: !!options&.fetch(:skip_rate_limit, false)
)
return unless tempfile


@@ -85,7 +85,8 @@ class UserProfile < ActiveRecord::Base
background_url,
max_file_size: SiteSetting.max_image_size_kb.kilobytes,
tmp_file_name: "sso-profile-background",
follow_redirect: true
follow_redirect: true,
skip_rate_limit: true
)
return unless tempfile


@@ -11,6 +11,7 @@ task "import:ensure_consistency" => :environment do
insert_topic_views
insert_user_actions
insert_user_options
insert_user_profiles
insert_user_stats
insert_user_visits
insert_draft_sequences
@@ -187,6 +188,17 @@ def insert_user_options
SQL
end
def insert_user_profiles
log "Inserting user profiles..."
DB.exec <<-SQL
INSERT INTO user_profiles (user_id)
SELECT id
FROM users
ON CONFLICT DO NOTHING
SQL
end
def insert_user_stats
log "Inserting user stats..."
@@ -520,3 +532,73 @@ task "import:update_first_post_created_at" => :environment do
log "Done"
end
desc "Update avatars from external_avatar_url in SSO records"
task "import:update_avatars_from_sso" => :environment do
log "Updating avatars from SSO records"
sql = <<~SQL
SELECT user_id, external_avatar_url
FROM single_sign_on_records s
WHERE NOT EXISTS (
SELECT 1
FROM user_avatars a
WHERE a.user_id = s.user_id
)
SQL
queue = SizedQueue.new(1000)
threads = []
threads << Thread.new do
DB.query_each(sql) do |row|
queue << { user_id: row.user_id, url: row.external_avatar_url }
end
queue.close
end
max_count = DB.query_single(<<~SQL).first
SELECT COUNT(*)
FROM single_sign_on_records s
WHERE NOT EXISTS (
SELECT 1
FROM user_avatars a
WHERE a.user_id = s.user_id
)
SQL
status_queue = Queue.new
status_thread = Thread.new do
error_count = 0
current_count = 0
while !(status = status_queue.pop).nil?
error_count += 1 if !status
current_count += 1
print "\r%7d / %7d (%d errors)" % [current_count, max_count, error_count]
end
end
20.times do
threads << Thread.new do
while row = queue.pop
begin
UserAvatar.import_url_for_user(
row[:url],
User.find(row[:user_id]),
override_gravatar: true,
skip_rate_limit: true
)
status_queue << true
rescue
status_queue << false
end
end
end
end
threads.each(&:join)
status_queue.close
status_thread.join
end
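# The task above fans downloads out to 20 worker threads behind a bounded queue,
# so the SELECT never races far ahead of the slow avatar downloads. A minimal
# sketch of that producer/consumer shape (names and numbers illustrative, not
# from this repo):
#
#   queue = SizedQueue.new(1000)        # producer blocks once 1000 items are buffered
#   producer = Thread.new do
#     10_000.times { |i| queue << i }   # stand-in for DB.query_each
#     queue.close                       # pop returns nil once closed and drained
#   end
#   workers = 20.times.map do
#     Thread.new do
#       while (item = queue.pop)
#         # per-item work; rescue inside the loop so one bad item doesn't kill the worker
#       end
#     end
#   end
#   [producer, *workers].each(&:join)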

BIN
public/.DS_Store vendored Normal file


BIN
public/images/.DS_Store vendored Normal file


BIN
public/images/emoji/.DS_Store vendored Normal file


@@ -128,7 +128,7 @@ class BulkImport::Base
end
def imported_ids(name)
map = []
map = {}
ids = []
@raw_connection.send_query("SELECT value, #{name}_id FROM #{name}_custom_fields WHERE name = 'import_id'")
@@ -198,12 +198,15 @@ class BulkImport::Base
puts "Loading users indexes..."
@last_user_id = last_id(User)
@last_user_email_id = last_id(UserEmail)
@emails = User.unscoped.joins(:user_emails).pluck(:"user_emails.email", :"user_emails.user_id").to_h
@last_sso_record_id = last_id(SingleSignOnRecord)
@emails = UserEmail.pluck(:email, :user_id).to_h
@external_ids = SingleSignOnRecord.pluck(:external_id, :user_id).to_h
@usernames_lower = User.unscoped.pluck(:username_lower).to_set
@mapped_usernames = UserCustomField.joins(:user).where(name: "import_username").pluck("user_custom_fields.value", "users.username").to_h
puts "Loading categories indexes..."
@last_category_id = last_id(Category)
@highest_category_position = Category.unscoped.maximum(:position) || 0
@category_names = Category.unscoped.pluck(:parent_category_id, :name).map { |pci, name| "#{pci}-#{name}" }.to_set
puts "Loading topics indexes..."
@@ -232,6 +235,7 @@ class BulkImport::Base
@raw_connection.exec("SELECT setval('#{Group.sequence_name}', #{@last_group_id})") if @last_group_id > 0
@raw_connection.exec("SELECT setval('#{User.sequence_name}', #{@last_user_id})") if @last_user_id > 0
@raw_connection.exec("SELECT setval('#{UserEmail.sequence_name}', #{@last_user_email_id})") if @last_user_email_id > 0
@raw_connection.exec("SELECT setval('#{SingleSignOnRecord.sequence_name}', #{@last_sso_record_id})") if @last_sso_record_id > 0
@raw_connection.exec("SELECT setval('#{Category.sequence_name}', #{@last_category_id})") if @last_category_id > 0
@raw_connection.exec("SELECT setval('#{Topic.sequence_name}', #{@last_topic_id})") if @last_topic_id > 0
@raw_connection.exec("SELECT setval('#{Post.sequence_name}', #{@last_post_id})") if @last_post_id > 0
@@ -293,6 +297,12 @@ class BulkImport::Base
user_id location website bio_raw bio_cooked views
}
USER_SSO_RECORD_COLUMNS ||= %i{
id user_id external_id last_payload created_at updated_at external_username
external_email external_name external_avatar_url external_profile_background_url
external_card_background_url
}
GROUP_USER_COLUMNS ||= %i{
group_id user_id created_at updated_at
}
@@ -352,6 +362,9 @@ class BulkImport::Base
def create_user_profiles(rows, &block)
create_records(rows, "user_profile", USER_PROFILE_COLUMNS, &block)
end
def create_single_sign_on_records(rows, &block)
create_records(rows, "single_sign_on_record", USER_SSO_RECORD_COLUMNS, &block)
end
def create_group_users(rows, &block)
create_records(rows, "group_user", GROUP_USER_COLUMNS, &block)
end
@@ -405,6 +418,15 @@ class BulkImport::Base
end
end
if user[:external_id].present?
if existing_user_id = @external_ids[user[:external_id]]
@pre_existing_user_ids << existing_user_id
@users[user[:imported_id].to_i] = existing_user_id
user[:skip] = true
return user
end
end
@users[user[:imported_id].to_i] = user[:id] = @last_user_id += 1
imported_username = user[:username].dup
@@ -431,6 +453,8 @@ class BulkImport::Base
user[:last_emailed_at] ||= NOW
user[:created_at] ||= NOW
user[:updated_at] ||= user[:created_at]
user[:suspended_till] ||= 200.years.from_now if user[:suspended_at].present?
user
end
@@ -444,8 +468,7 @@ class BulkImport::Base
user_email[:created_at] ||= NOW
user_email[:updated_at] ||= user_email[:created_at]
user_email[:email] ||= random_email
user_email[:email].downcase!
user_email[:email] = user_email[:email]&.downcase || random_email
# unique email
user_email[:email] = random_email until user_email[:email] =~ EmailValidator.email_regex && !@emails.has_key?(user_email[:email])
@@ -480,6 +503,18 @@ class BulkImport::Base
user_profile
end
def process_single_sign_on_record(sso_record)
user_id = @users[sso_record[:imported_user_id].to_i]
return { skip: true } if @pre_existing_user_ids.include?(user_id)
sso_record[:id] = @last_sso_record_id += 1
sso_record[:user_id] = user_id
sso_record[:last_payload] ||= ""
sso_record[:created_at] = NOW
sso_record[:updated_at] = NOW
sso_record
end
def process_group_user(group_user)
group_user[:created_at] = NOW
group_user[:updated_at] = NOW
@@ -487,6 +522,12 @@ class BulkImport::Base
end
def process_category(category)
if category[:existing_id].present?
@categories[category[:imported_id].to_i] = category[:existing_id]
category[:skip] = true
return category
end
category[:id] ||= @last_category_id += 1
@categories[category[:imported_id].to_i] ||= category[:id]
category[:name] = category[:name][0...50].scrub.strip
@@ -497,6 +538,13 @@ class BulkImport::Base
category[:user_id] ||= Discourse::SYSTEM_USER_ID
category[:created_at] ||= NOW
category[:updated_at] ||= category[:created_at]
if category[:position]
@highest_category_position = category[:position] if category[:position] > @highest_category_position
else
category[:position] = @highest_category_position += 1
end
category
end
@@ -532,6 +580,7 @@ class BulkImport::Base
if @bbcode_to_md
post[:raw] = post[:raw].bbcode_to_md(false, {}, :disable, :quote) rescue post[:raw]
end
post[:raw] = normalize_text(post[:raw])
post[:like_count] ||= 0
post[:cooked] = pre_cook post[:raw]
post[:hidden] ||= false
@@ -726,6 +775,9 @@ class BulkImport::Base
end
end
# FIXME: this does not count successfully inserted records that do not happen to have an imported_id field,
# which is misleading and can result in lots of wasted time double-checking "0 records imported" prints
if imported_ids.size > 0
print "\r%7d - %6d/sec" % [imported_ids.size, imported_ids.size.to_f / (Time.now - start)]
puts


@@ -0,0 +1,359 @@
# frozen_string_literal: true
require_relative "base"
require "sqlite3"
require "json"
class BulkImport::Generic < BulkImport::Base
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
def initialize(db_path)
super()
@db = create_connection(db_path)
end
def start
run # will call execute, and then "complete" the migration
# Now that the migration is complete, do some more work:
Discourse::Application.load_tasks
puts "running 'import:ensure_consistency' rake task."
Rake::Task["import:ensure_consistency"].invoke
end
def execute
import_categories
import_users
import_user_emails
import_single_sign_on_records
import_topics
import_posts
import_topic_allowed_users
import_likes
import_user_stats
import_tags
end
def import_categories
puts "Importing categories..."
categories = @db.execute(<<~SQL)
WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
old_relative_url, existing_id, level, rowid) AS (
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, 0 AS level, c.ROWID
FROM categories c
WHERE c.parent_category_id IS NULL
UNION
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, c.existing_id, tree.level + 1 AS level, c.ROWID
FROM categories c,
tree
WHERE c.parent_category_id = tree.id
)
SELECT *
FROM tree
ORDER BY level, rowid
SQL
create_categories(categories) do |row|
{
imported_id: row["id"],
existing_id: row["existing_id"],
name: row["name"],
description: row["description"],
parent_category_id: row["parent_category_id"] ? category_id_from_imported_id(row["parent_category_id"]) : nil,
slug: row["slug"]
}
end
end
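# The recursive CTE above walks the category tree top-down, and ORDER BY level
# guarantees a parent is imported (and resolvable via category_id_from_imported_id)
# before any of its children. Worked trace with hypothetical rows:
#
#   categories: (1, NULL, "General"), (2, 1, "Sub"), (3, 2, "Sub-sub")
#   tree emits: level 0 -> id 1, level 1 -> id 2, level 2 -> id 3
#   so when row id 2 arrives, parent 1 is already in the mapping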
def import_users
puts "Importing users..."
users = @db.execute(<<~SQL)
SELECT ROWID, *
FROM users
ORDER BY ROWID
SQL
create_users(users) do |row|
sso_record = JSON.parse(row["sso_record"]) if row["sso_record"].present?
if row["suspension"].present?
suspension = JSON.parse(row["suspension"])
suspended_at = suspension['suspended_at']
suspended_till = suspension['suspended_till']
end
{
imported_id: row["id"],
username: row["username"],
name: row["name"],
email: row["email"],
external_id: sso_record&.fetch("external_id"),
created_at: to_datetime(row["created_at"]),
admin: row["admin"],
moderator: row["moderator"],
suspended_at: suspended_at,
suspended_till: suspended_till,
}
end
end
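# For reference, a hypothetical users row as import_users expects it; the values
# are invented and the shape is only inferred from the queries above:
#
#   id: 42, username: "alice", name: "Alice", email: "alice@example.com",
#   created_at: "2014-05-01T12:00:00Z", admin: 0, moderator: 1,
#   sso_record: '{"external_id": "ext-42", "external_email": "alice@example.com"}',
#   suspension: '{"suspended_at": "2022-01-01T00:00:00Z", "suspended_till": "2032-01-01T00:00:00Z"}'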
def import_user_emails
puts '', 'Importing user emails...'
users = @db.execute(<<~SQL)
SELECT ROWID, id, email, created_at
FROM users
ORDER BY ROWID
SQL
create_user_emails(users) do |row|
{
# FIXME: this uses both "imported_id" and "imported_user_id"; they should be replaced by just "imported_id"
imported_id: row["id"],
imported_user_id: row["id"],
email: row["email"],
created_at: to_datetime(row["created_at"])
}
end
end
def import_single_sign_on_records
puts '', 'Importing SSO records...'
users = @db.execute(<<~SQL)
SELECT ROWID, id, sso_record
FROM users
WHERE sso_record IS NOT NULL
ORDER BY ROWID
SQL
create_single_sign_on_records(users) do |row|
sso_record = JSON.parse(row["sso_record"], symbolize_names: true)
# FIXME: this uses both "imported_id" and "imported_user_id"; they should be replaced by just "imported_id"
sso_record[:imported_id] = row["id"]
sso_record[:imported_user_id] = row["id"]
sso_record
end
end
def import_topics
puts "Importing topics..."
topics = @db.execute(<<~SQL)
SELECT ROWID, *
FROM topics
ORDER BY ROWID
SQL
create_topics(topics) do |row|
{
archetype: row["private_message"] ? Archetype.private_message : Archetype.default,
imported_id: row["id"],
title: row["title"],
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
category_id: category_id_from_imported_id(row["category_id"]),
closed: to_boolean(row["closed"]),
views: row["views"],
}
end
end
def import_topic_allowed_users
puts "Importing topic_allowed_users..."
topics = @db.execute(<<~SQL)
SELECT ROWID, *
FROM topics
WHERE private_message IS NOT NULL
ORDER BY ROWID
SQL
added = 0
create_topic_allowed_users(topics) do |row|
next unless topic_id = topic_id_from_imported_id(row["id"])
imported_user_id = JSON.parse(row["private_message"])["user_ids"].first
user_id = user_id_from_imported_id(imported_user_id)
added += 1
{
# FIXME: missing imported_id
topic_id: topic_id,
user_id: user_id
}
end
puts '', "Added #{added} topic_allowed_users records."
end
def import_posts
puts "Importing posts..."
posts = @db.execute(<<~SQL)
SELECT ROWID, *
FROM posts
ORDER BY topic_id, post_number
SQL
create_posts(posts) do |row|
next if row["raw"].blank?
next unless topic_id = topic_id_from_imported_id(row["topic_id"])
{
imported_id: row["id"],
topic_id: topic_id,
user_id: user_id_from_imported_id(row["user_id"]),
created_at: to_datetime(row["created_at"]),
raw: row["raw"],
like_count: row["like_count"]
}
end
end
def import_likes
puts "Importing likes..."
@imported_likes = Set.new
likes = @db.execute(<<~SQL)
SELECT ROWID, *
FROM likes
ORDER BY ROWID
SQL
create_post_actions(likes) do |row|
post_id = post_id_from_imported_id(row["post_id"])
user_id = user_id_from_imported_id(row["user_id"])
next if post_id.nil? || user_id.nil?
next if @imported_likes.add?([post_id, user_id]).nil?
{
# FIXME: missing imported_id
post_id: post_id,
user_id: user_id,
post_action_type_id: 2,
created_at: to_datetime(row["created_at"])
}
end
end
def import_user_stats
puts "Importing user stats..."
users = @db.execute(<<~SQL)
WITH posts_counts AS (
SELECT COUNT(p.id) AS count, p.user_id
FROM posts p GROUP BY p.user_id
),
topic_counts AS (
SELECT COUNT(t.id) AS count, t.user_id
FROM topics t GROUP BY t.user_id
),
first_post AS (
SELECT MIN(p.created_at) AS created_at, p.user_id
FROM posts p GROUP BY p.user_id ORDER BY p.created_at ASC
)
SELECT u.id AS user_id, u.created_at, pc.count AS posts, tc.count AS topics, fp.created_at AS first_post
FROM users u
JOIN posts_counts pc ON u.id = pc.user_id
JOIN topic_counts tc ON u.id = tc.user_id
JOIN first_post fp ON u.id = fp.user_id
SQL
create_user_stats(users) do |row|
user = {
imported_id: row["user_id"],
imported_user_id: row["user_id"],
new_since: to_datetime(row["created_at"]),
post_count: row["posts"],
topic_count: row["topics"],
first_post_created_at: to_datetime(row["first_post"])
}
likes_received = @db.get_first_value(<<~SQL)
SELECT COUNT(l.id) AS likes_received
FROM likes l JOIN posts p ON l.post_id = p.id
WHERE p.user_id = #{row["user_id"]}
SQL
user[:likes_received] = likes_received if likes_received
likes_given = @db.get_first_value(<<~SQL)
SELECT COUNT(l.id) AS likes_given
FROM likes l
WHERE l.user_id = #{row["user_id"]}
SQL
user[:likes_given] = likes_given if likes_given
user
end
end
def import_tags
puts "", "Importing tags..."
tags =
@db.execute('SELECT id as topic_id, tags FROM topics').
map do |r|
next unless r['tags']
[ r['topic_id'], JSON.parse(r['tags']).uniq ]
end.compact
tag_mapping = {}
tags.map(&:last).flatten.compact.uniq.each do |tag_name|
cleaned_tag_name = DiscourseTagging.clean_tag(tag_name)
tag = Tag.find_by_name(cleaned_tag_name) || Tag.create!(name: cleaned_tag_name)
tag_mapping[tag_name] = tag.id
end
tags_disaggregated = tags.flat_map { |topic_id, tag_names| tag_names.map { |t| { topic_id: topic_id, tag_id: tag_mapping.fetch(t) } } }
create_topic_tags(tags_disaggregated) do |row|
next unless topic_id = topic_id_from_imported_id(row[:topic_id])
{
topic_id: topic_id,
tag_id: row[:tag_id],
}
end
end
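# Worked example of the two-step tag mapping above (hypothetical data; tag ids
# depend on the target site):
#
#   topics rows:        { id: 1, tags: '["ruby", "news"]' }, { id: 2, tags: '["ruby"]' }
#   tag_mapping         => { "ruby" => 7, "news" => 8 }
#   tags_disaggregated  => [{ topic_id: 1, tag_id: 7 }, { topic_id: 1, tag_id: 8 },
#                           { topic_id: 2, tag_id: 7 }]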
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60000 # 60 seconds
sqlite.auto_vacuum = "full"
sqlite.foreign_keys = true
sqlite.journal_mode = "wal"
sqlite.synchronous = "normal"
sqlite
end
def to_date(text)
text.present? ? Date.parse(text) : nil
end
def to_datetime(text)
text.present? ? DateTime.parse(text) : nil
end
def to_boolean(value)
value == 1
end
end
BulkImport::Generic.new(ARGV.first).start
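# The script takes the path of the intermediate SQLite database as its only
# argument. A minimal sketch of the schema it expects, inferred from the queries
# above rather than from any documented contract (SQLite columns are typeless):
#
#   db = SQLite3::Database.new("import.db")   # illustrative path
#   db.execute_batch(<<~SQL)
#     CREATE TABLE categories (id, parent_category_id, name, description, color, text_color,
#                              read_restricted, slug, old_relative_url, existing_id);
#     CREATE TABLE users (id, username, name, email, created_at, admin, moderator,
#                         sso_record, suspension);
#     CREATE TABLE topics (id, title, user_id, category_id, created_at, closed, views,
#                          private_message, tags);
#     CREATE TABLE posts (id, topic_id, post_number, user_id, created_at, raw, like_count);
#     CREATE TABLE likes (id, post_id, user_id, created_at);
#   SQL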


@@ -85,7 +85,8 @@ class ImportScripts::Base
clean_up_inactive_users_after_days: 0,
clean_up_unused_staged_users_after_days: 0,
clean_up_uploads: false,
clean_orphan_uploads_grace_period_hours: 1800
clean_orphan_uploads_grace_period_hours: 1800,
full_name_required: false
}
end
@@ -303,8 +304,12 @@ class ImportScripts::Base
opts.delete(:id)
merge = opts.delete(:merge)
post_create_action = opts.delete(:post_create_action)
sso_record = opts.delete(:sso_record)
existing = find_existing_user(opts[:email], opts[:username])
existing = find_existing_user(opts[:email], sso_record&.fetch("external_id"))
return existing if existing
existing = User.where(username: opts[:username]).first
return existing if existing && (merge || existing.custom_fields["import_id"].to_s == import_id.to_s)
bio_raw = opts.delete(:bio_raw)
@@ -390,14 +395,47 @@ class ImportScripts::Base
end
u.create_single_sign_on_record!(sso_record) if sso_record
post_create_action.try(:call, u) if u.persisted?
u # If there was an error creating the user, u.errors has the messages
end
def find_existing_user(email, username)
def find_existing_user(email, external_id)
# Force the use of the index on the 'user_emails' table
UserEmail.where("lower(email) = ?", email.downcase).first&.user || User.where(username: username).first
user = UserEmail.where("lower(email) = ?", email.downcase).first&.user
user ||= SingleSignOnRecord.where(external_id: external_id).first&.user if external_id.present?
user
end
def create_group_members(results, opts = {})
created = 0
total = opts[:total] || results.count
group = nil
group_member_ids = []
results.each_with_index do |result, index|
member = yield(result)
if !group || group.id != member[:group_id]
if group_member_ids.any?
group.bulk_add(group_member_ids)
group_member_ids = []
end
group = Group.find(member[:group_id])
end
group_member_ids << member[:user_id]
group.bulk_add(group_member_ids) if index == results.size - 1 && group_member_ids.any?
created += 1
print_status(created + (opts[:offset] || 0), total, get_start_time("group_members"))
end
created
end
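# Note: the loop above flushes a batch whenever group_id changes, so it assumes
# the yielded members arrive sorted by group_id. Worked trace with hypothetical ids:
#
#   yielded: { group_id: 1, user_id: 10 }, { group_id: 1, user_id: 11 }, { group_id: 2, user_id: 12 }
#   => Group.find(1).bulk_add([10, 11]) when the group changes,
#      then Group.find(2).bulk_add([12]) on the final iteration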
def created_category(category)


@@ -17,15 +17,15 @@ module ImportScripts
tmp.unlink rescue nil
end
def create_avatar(user, avatar_path)
def create_avatar(user, avatar_path, origin_url = nil)
tempfile = copy_to_tempfile(avatar_path)
filename = "avatar#{File.extname(avatar_path)}"
upload = UploadCreator.new(tempfile, filename, type: "avatar").create_for(user.id)
upload = UploadCreator.new(tempfile, filename, type: "avatar", origin: origin_url).create_for(user.id)
if upload.present? && upload.persisted?
user.create_user_avatar
user.user_avatar.update(custom_upload_id: upload.id)
user.update(uploaded_avatar_id: upload.id)
user.user_avatar.update!(custom_upload_id: upload.id)
user.update!(uploaded_avatar_id: upload.id)
else
STDERR.puts "Failed to upload avatar for user #{user.username}: #{avatar_path}"
STDERR.puts upload.errors.inspect if upload


@@ -0,0 +1,440 @@
# frozen_string_literal: true
require_relative 'base'
require 'sqlite3'
class ImportScripts::Generic < ImportScripts::Base
BATCH_SIZE = 1000
AVATAR_DIRECTORY = ENV["AVATAR_DIRECTORY"]
UPLOAD_DIRECTORY = ENV["UPLOAD_DIRECTORY"]
def initialize(db_path)
super()
@db = create_connection(db_path)
end
def execute
import_users
import_groups
import_group_members
import_categories
import_topics
import_posts
mark_topics_as_solved
end
def import_users
log_action "Creating users"
total_count = count_users
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_users(last_row_id)
break if rows.empty?
next if all_records_exist?(:users, rows.map { |row| row["id"] })
create_users(rows, total: total_count, offset: offset) do |row|
if row["sso_record"].present?
sso_record = JSON.parse(row["sso_record"])
sso_record["last_payload"] ||= ""
end
{
id: row["id"],
username: row["username"],
created_at: to_datetime(row["created_at"]),
name: row["name"],
email: row["email"] || fake_email,
last_seen_at: to_datetime(row["last_seen_at"]) || to_datetime(row["created_at"]),
bio_raw: row["bio"],
location: row["location"],
admin: to_boolean(row["admin"]),
moderator: to_boolean(row["moderator"]),
sso_record: sso_record,
post_create_action: proc do |user|
create_avatar(user, row["avatar_path"], row["avatar_url"])
suspend_user(user, row["suspension"])
end
}
end
end
end
def create_avatar(user, avatar_path, avatar_url)
if avatar_path.present?
avatar_path = File.join(AVATAR_DIRECTORY, avatar_path)
if File.exist?(avatar_path)
@uploader.create_avatar(user, avatar_path)
else
STDERR.puts "Could not find avatar: #{avatar_path}"
end
elsif avatar_url.present?
UserAvatar.import_url_for_user(avatar_url, user, override_gravatar: true)
end
end
def suspend_user(user, suspension)
return if suspension.blank?
suspension = JSON.parse(suspension)
user.suspended_at = suspension["suspended_at"] || user.last_seen_at || Time.now
user.suspended_till = suspension["suspended_till"] || 200.years.from_now
user.save!
if suspension["reason"].present?
StaffActionLogger.new(Discourse.system_user).log_user_suspend(user, suspension["reason"])
end
end
def count_users
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM users
SQL
end
def fetch_users(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM users
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def import_groups
log_action "Creating groups"
rows = @db.execute(<<~SQL)
SELECT *
FROM groups
ORDER BY ROWID
SQL
create_groups(rows) do |row|
{
id: row["id"],
name: row["name"]
}
end
end
def import_group_members
log_action "Adding group members"
total_count = @db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM group_members
SQL
last_row_id = -1
batches do |offset|
rows, last_row_id = query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM group_members
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
break if rows.empty?
create_group_members(rows, total: total_count, offset: offset) do |row|
{
group_id: group_id_from_imported_group_id(row["group_id"]),
user_id: user_id_from_imported_user_id(row["user_id"])
}
end
end
end
def import_categories
log_action "Creating categories"
rows = @db.execute(<<~SQL)
WITH RECURSIVE tree(id, parent_category_id, name, description, color, text_color, read_restricted, slug,
old_relative_url, level, rowid) AS (
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, 0 AS level, c.ROWID
FROM categories c
WHERE c.parent_category_id IS NULL
UNION
SELECT c.id, c.parent_category_id, c.name, c.description, c.color, c.text_color, c.read_restricted, c.slug,
c.old_relative_url, tree.level + 1 AS level, c.ROWID
FROM categories c,
tree
WHERE c.parent_category_id = tree.id
)
SELECT *
FROM tree
ORDER BY level, rowid
SQL
create_categories(rows) do |row|
{
id: row["id"],
name: row["name"],
description: row["description"],
color: row["color"],
text_color: row["text_color"],
read_restricted: row["read_restricted"],
parent_category_id: category_id_from_imported_category_id(row["parent_category_id"]),
post_create_action: proc do |category|
create_permalink(row["old_relative_url"], category_id: category.id)
end
}
end
end
def import_topics
log_action "Creating topics"
total_count = count_topics
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_topics(last_row_id)
break if rows.empty?
next if all_records_exist?(:topics, rows.map { |row| row["id"] })
create_posts(rows, total: total_count, offset: offset) do |row|
# TODO add more columns
mapped = {
id: row["id"],
title: row["title"],
created_at: to_datetime(row["created_at"]),
raw: process_raw(row),
category: category_id_from_imported_category_id(row["category_id"]),
user_id: user_id_from_imported_user_id(row["user_id"]) || Discourse::SYSTEM_USER_ID,
post_create_action: proc do |post|
add_tags(post, row["tags"])
create_permalink(row["old_relative_url"], topic_id: post.topic.id)
end
}
if row["private_message"].present?
pm = JSON.parse(row["private_message"])
target_usernames = ([row["user_id"]] + pm["user_ids"]).map { |id| find_user_by_import_id(id)&.username }
target_group_names = pm["special_group_names"] + pm["group_ids"].map { |id| find_group_by_import_id(id)&.name }
mapped[:archetype] = Archetype.private_message
mapped[:target_usernames] = target_usernames.compact.uniq.join(",")
mapped[:target_group_names] = target_group_names.compact.uniq.join(",")
end
mapped
end
end
end
def count_topics
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM topics
SQL
end
def fetch_topics(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM topics
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def process_raw(row)
raw = row["raw"]
upload_ids = row["upload_ids"]
return raw if upload_ids.blank? || raw.blank?
quoted_upload_ids = JSON.parse(upload_ids).map { |id| "'#{SQLite3::Database.quote(id.to_s)}'" }.join(",")
files = @db.execute("SELECT * FROM uploads WHERE id IN (#{quoted_upload_ids})")
files.each do |file|
user_id = user_id_from_imported_user_id(file["user_id"]) || Discourse::SYSTEM_USER_ID
path = File.join(UPLOAD_DIRECTORY, file["path"])
upload = create_upload(user_id, path, file["filename"])
if upload.present? && upload.persisted?
raw.gsub!("[upload|#{file['id']}]", @uploader.html_for_upload(upload, file["filename"]))
end
end
raw
end
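# Worked example of the placeholder rewriting above (hypothetical data):
#
#   raw before:  "see the attached log [upload|abc123]"
#   uploads row: { "id" => "abc123", "user_id" => 5, "path" => "files/log.txt", "filename" => "log.txt" }
#   if create_upload succeeds, "[upload|abc123]" is replaced with whatever
#   @uploader.html_for_upload returns for the new upload (e.g. an attachment link)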
def add_tags(post, tags)
tag_names = JSON.parse(tags) if tags
DiscourseTagging.tag_topic_by_names(post.topic, staff_guardian, tag_names) if tag_names.present?
end
def create_permalink(url, topic_id: nil, post_id: nil, category_id: nil, tag_id: nil)
return if url.blank? || Permalink.exists?(url: url)
Permalink.create(
url: url,
topic_id: topic_id,
post_id: post_id,
category_id: category_id,
tag_id: tag_id
)
end
def import_posts
log_action "Creating posts"
total_count = count_posts
last_row_id = -1
batches do |offset|
rows, last_row_id = fetch_posts(last_row_id)
break if rows.empty?
next if all_records_exist?(:posts, rows.map { |row| row["id"] })
create_posts(rows, total: total_count, offset: offset) do |row|
topic = topic_lookup_from_imported_post_id(row["topic_id"])
next if !topic
if row["small_action"]
create_small_action(row, topic)
next
end
reply_to = topic_lookup_from_imported_post_id(row["reply_to_post_id"]) if row["reply_to_post_id"]
reply_to = nil if reply_to&.dig(:topic_id) != topic[:topic_id]
mapped = {
id: row["id"],
topic_id: topic[:topic_id],
created_at: to_datetime(row["created_at"]),
raw: process_raw(row),
reply_to_post_number: reply_to&.dig(:post_number),
user_id: user_id_from_imported_user_id(row["user_id"]) || Discourse::SYSTEM_USER_ID,
post_create_action: proc do |post|
create_permalink(row["old_relative_url"], post_id: post.id)
end
}
mapped[:post_type] = Post.types[:whisper] if to_boolean(row["whisper"])
mapped[:custom_fields] = { is_accepted_answer: "true" } if to_boolean(row["accepted_answer"])
mapped
end
end
end
def count_posts
@db.get_first_value(<<~SQL)
SELECT COUNT(*)
FROM posts
SQL
end
def fetch_posts(last_row_id)
query_with_last_rowid(<<~SQL, last_row_id)
SELECT ROWID, *
FROM posts
WHERE ROWID > :last_row_id
ORDER BY ROWID
LIMIT #{BATCH_SIZE}
SQL
end
def mark_topics_as_solved
log_action "Marking topics as solved..."
DB.exec <<~SQL
INSERT INTO topic_custom_fields (name, value, topic_id, created_at, updated_at)
SELECT 'accepted_answer_post_id', pcf.post_id, p.topic_id, p.created_at, p.created_at
FROM post_custom_fields pcf
JOIN posts p ON p.id = pcf.post_id
WHERE pcf.name = 'is_accepted_answer' AND pcf.value = 'true'
AND NOT EXISTS (
SELECT 1
FROM topic_custom_fields x
WHERE x.topic_id = p.topic_id AND x.name = 'accepted_answer_post_id'
)
SQL
end
def create_small_action(row, topic)
small_action = JSON.parse(row["small_action"])
case small_action["type"]
when "split_topic"
create_split_topic_small_action(row, small_action, topic)
else
raise "Unknown small action type: #{small_action['type']}"
end
end
def create_split_topic_small_action(row, small_action, original_topic)
destination_topic = topic_lookup_from_imported_post_id(small_action["destination_topic_id"])
destination_topic = Topic.find_by(id: destination_topic[:topic_id]) if destination_topic
return if !destination_topic
original_topic = Topic.find_by(id: original_topic[:topic_id]) if original_topic
return if !original_topic
move_type = small_action['move_type']
message = I18n.with_locale(SiteSetting.default_locale) do
I18n.t(
"move_posts.#{move_type}_moderator_post",
count: small_action['post_count'],
topic_link: "[#{destination_topic.title}](#{destination_topic.relative_url})"
)
end
post_type = move_type.include?("message") ? Post.types[:whisper] : Post.types[:small_action]
original_topic.add_moderator_post(
Discourse.system_user, message,
post_type: post_type,
action_code: "split_topic",
import_mode: true,
created_at: to_datetime(row["created_at"]),
custom_fields: { import_id: row["id"] }
)
end
def query_with_last_rowid(sql, last_row_id)
rows = @db.execute(sql, last_row_id: last_row_id)
[rows, rows.last&.dig("rowid")]
end
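# query_with_last_rowid is the keyset-pagination half of every import loop above:
# it binds :last_row_id into the query and returns the last ROWID so the next
# batch resumes after it. Reduced to its skeleton (the real loops also track an
# offset for progress reporting):
#
#   last_row_id = -1
#   loop do
#     rows, last_row_id = fetch_posts(last_row_id)  # WHERE ROWID > :last_row_id ORDER BY ROWID LIMIT 1000
#     break if rows.empty?
#     # turn rows into records here
#   end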
def to_date(text)
text.present? ? Date.parse(text) : nil
end
def to_datetime(text)
text.present? ? DateTime.parse(text) : nil
end
def to_boolean(value)
value == 1
end
def log_action(text)
puts "", text
end
def create_connection(path)
sqlite = SQLite3::Database.new(path, results_as_hash: true)
sqlite.busy_timeout = 60000 # 60 seconds
sqlite.auto_vacuum = "full"
sqlite.foreign_keys = true
sqlite.journal_mode = "wal"
sqlite.synchronous = "normal"
sqlite
end
def batches
super(BATCH_SIZE)
end
def staff_guardian
@staff_guardian ||= Guardian.new(Discourse.system_user)
end
end
ImportScripts::Generic.new(ARGV.first).perform