2016-04-06 12 views
2

初心者で、すっかり困惑しています。CSVインポート用のワーカーがデータベースをロックしてしまいます

CSVの連絡先をインポートして連絡先テーブルに保存するワーカーがあります(保存する前に、電話番号またはメールアドレスをもとに、その連絡先が既に存在するかどうかを確認します)。

ロジック自体は正しく動作します(CSVファイルは正しく処理されます)。しかし、実行中はデータベース全体がロックされてしまい、アプリケーションのユーザーは何も更新できなくなります。

質問は次のとおりです:なぜ(何が原因で)連絡先・電話番号・メールアドレスのテーブル全体がロックされるのでしょうか?RubyやRailsの原則に反することをしているのでしょうか?これを解決するには何をする必要がありますか?

キーポイント:

  • 最初にこのコードを書いたとき(メールアドレスと電話番号が連絡先モデルの一部だったころ)は、うまく動いていました。その後、それらを子モデルとして切り出しました(連絡先ごとに複数のメールアドレスと電話番号を持てるようにしたためです)。--- 配列の中の配列がロックの原因になる可能性はありますか?

  • 最初にコードを書いたときは、各行(smart_row)の検証が済むたびに保存されていました(カウンタもその都度インクリメントされていました)。ところが今は、CSVファイル全体の処理が終わってから、連絡先がまとめて保存されているように見えます。

  • 他にも何か?

WORKER:

# Job entry point that imports one uploaded CSV file and then reports on it.
class ImportCsvFileWorker
  class << self
    # Loads the CsvFile with the given id, runs `import!` on it and then
    # `send_report!`.
    #
    # csv_file_id - id passed to `CsvFile.find`.
    #
    # Returns whatever `send_report!` returns. Any error raised by the lookup,
    # the import or the report propagates to the caller.
    def perform(csv_file_id)
      CsvFile.find(csv_file_id).tap(&:import!).send_report!
    end
  end
end

# Imports the contacts of one uploaded CSV file, one row at a time.
#
# For every row the service builds a contact, lets ContactMergingService decide
# whether it duplicates a stored contact, and then either saves it or records
# the row as invalid. Counters on the csv_file record are bumped as it goes.
class CsvParsingService
  # csv_file - the upload being processed.
  # contact  - the contact belonging to the row currently being processed
  #            (nil until a row has produced one).
  attr_accessor :csv_file, :contact

  # csv_file - object answering `file_url`, `user`, `increment!`,
  #            `invalid_records`, `finish_import!` and `save_import_error`.
  def initialize(csv_file)
    @csv_file = csv_file
    @contact = nil
  end

  # Processes every row and then marks the import as finished.
  def perform
    Rails.logger.info "[CSV.parsing] Processing new csv file..."
    process_csv
    csv_file.finish_import!
  end

  # Walks the parsed rows. A failure in a single row is recorded against that
  # row and the loop continues; a failure of the parser itself (or of the
  # error bookkeeping) is handed to `csv_file.save_import_error`.
  def process_csv
    parser = ::ImportData::SmartCsvParser.new(csv_file.file_url)

    parser.each do |smart_row|
      csv_file.increment!(:total_parsed_records)
      # Start each row with a clean slate so that a row failing before its own
      # contact is built cannot report the previous row's validation errors.
      self.contact = nil
      begin
        # process_row stores the contact itself; its return value is a
        # boolean-ish status and must not overwrite the contact.
        process_row(smart_row)
      rescue => e
        row_parse_error(smart_row, e)
      end
    end
  rescue => e # parser error or unexpected error
    csv_file.save_import_error(e)
  end

  private

  # Builds, merges and then saves or rejects the contact for one row.
  def process_row(smart_row)
    new_contact, existing_records = smart_row.to_contact
    self.contact = ContactMergingService.new(csv_file.user, new_contact, existing_records).perform
    init_contact_info contact

    if contact_valid?
      save_imported_contact(new_contact)
    else
      reject_imported_contact(new_contact, smart_row)
    end
  end

  # Truthy when the contact has a first name, a last name, an email address or
  # a phone number.
  # NOTE(review): an empty-string name is truthy in Ruby and therefore counts
  # as "has a name" here - confirm that is intended.
  def contact_valid?
    contact.first_name || contact.last_name ||
      contact.email_addresses.first || contact.phone_numbers.first
  end

  # Persists the current contact and counts it as imported.
  def save_imported_contact(new_contact)
    contact.save!
    csv_file.increment!(:total_imported_records)
    log_processed_contacts new_contact
  end

  # Records the row as invalid because it carries no identifying data.
  # Returns false.
  def reject_imported_contact(new_contact, smart_row)
    Rails.logger.info "[CSV.parsing] Contact rejected. Missing name, email or phone number."
    csv_file.increment!(:total_failed_records)
    csv_file.invalid_records.create!(
      original_row: smart_row.row.to_csv,
      contact_errors: ["Contact rejected. Missing name, email or phone number"]
    )
    log_processed_contacts new_contact
    false
  end

  # Records a row whose processing raised `e`. The contact's validation
  # messages are stored when there are any; otherwise the exception message is
  # stored so the failure reason is not lost.
  def row_parse_error(smart_row, e)
    Rails.logger.error "[CSV.parsing] Row failed: #{e.class}: #{e.message}"
    csv_file.increment!(:total_failed_records)
    messages = contact.try(:errors).try(:full_messages)
    messages = [e.message] if Array(messages).empty?
    csv_file.invalid_records.create!(
      original_row: smart_row.row.to_csv,
      contact_errors: messages
    )
  end

  # Stamps ownership and import source on contacts that are not yet persisted,
  # and always requests salutations to be set.
  def init_contact_info(contact)
    unless contact.persisted?
      contact.user = csv_file.user
      contact.created_by_user = csv_file.user
      contact.import_source = csv_file
    end
    contact.required_salutations_to_set = true # will be used for envelope/letter saluation
  end

  # Logs the running counters plus the imported and resulting contact details.
  def log_processed_contacts(new_contact)
    Rails.logger.info(
      "[CSV.parsing] Records parsed:: parsed: #{csv_file.total_parsed_records}" \
      " : imported: #{csv_file.total_imported_records} : failed: " \
      "#{csv_file.total_failed_records}"
    )
    Rails.logger.info(
      "[CSV.parsing] Contact- New : #{new_contact.email_addresses.map(&:email)}" \
      " : #{new_contact.first_name} : #{new_contact.last_name} " \
      "#{new_contact.phone_numbers.map(&:number)} :: Old : " \
      "#{contact.email_addresses.map(&:email)} :" \
      "#{contact.phone_numbers.map(&:number)}\n"
    )
  end
end

マージ・サービス

# Decides whether a contact built from an imported row duplicates one of the
# user's stored contacts (matched by email address or phone number) and, when
# it does, folds the imported data into the stored contact.
class ContactMergingService
  attr_reader :new_contact, :user

  # user              - owner whose stored emails/phone numbers are searched;
  #                     when nil nothing is searched and no match is found.
  # new_contact       - contact built from the imported row.
  # _existing_records - accepted for interface compatibility; not used.
  #
  # The lookup of matching records runs immediately, at construction time.
  def initialize(user, new_contact, _existing_records)
    @user = user
    @new_contact = new_contact
    @found_records = matching_emails_and_phone_numbers
  end

  # Returns the stored contact (after merging the imported data into it) when
  # a match exists, otherwise the untouched new contact. The returned contact
  # is not saved here.
  def perform
    Rails.logger.info "[CSV.merging] Checking if new contact matches existing contact..."
    matched = existing_contact
    if matched
      Rails.logger.info "[CSV.merging] Contact match found."
      merge(matched, new_contact)
      matched
    else
      # No debugger breakpoint here: this runs unattended inside a worker.
      Rails.logger.info "[CSV.merging] No contact match found."
      new_contact
    end
  end

  private

  # The contact owning the first matching email/phone record, or nil when no
  # record matched.
  def existing_contact
    Rails.logger.info "[CSV.merging] Found records: #{@found_records.inspect}"
    return unless @found_records.present?

    @user.contacts.find @found_records.first.owner_id # Fetch first owner
  end

  def merge(existing_contact, new_contact)
    Rails.logger.info "[CSV.merging] Merging with existing contact (ID: #{existing_contact.id})..."
    merge_records(existing_contact, new_contact)
  end

  # Copies data from new_relation into existing_relation without overwriting
  # anything the stored contact already has.
  def merge_records(existing_relation, new_relation)
    # `attributes` returns a hash and ignores any block handed to it, so the
    # hash has to be iterated explicitly for the blank fields to be filled.
    existing_relation.attributes.each do |field, value|
      if value.blank? && new_relation[field].present?
        existing_relation[field] = new_relation[field]
      end
    end
    merge_email_addresses(existing_relation, new_relation)
    merge_phone_numbers(existing_relation, new_relation)
  end

  # Re-parents and saves every imported email the stored contact lacks.
  def merge_email_addresses(existing_relation, new_relation)
    new_relation.email_addresses.each do |email_address|
      Rails.logger.info "[CSV.merging.emails] Email: #{email_address.inspect}"
      if existing_relation.email_addresses.find_by(email: email_address.email)
        Rails.logger.info "[CSV.merging.emails] Email address exists."
      else
        Rails.logger.info "[CSV.merging.emails] Email does not already exist. Saving..."
        email_address.owner = existing_relation
        email_address.save!
      end
    end
  end

  # Re-parents and saves every imported phone number the stored contact lacks.
  def merge_phone_numbers(existing_relation, new_relation)
    new_relation.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging.phone] Phone Number: #{phone_number.inspect}"
      if existing_relation.phone_numbers.find_by(number: phone_number.number)
        Rails.logger.info "[CSV.merging.phone] Phone number exists."
      else
        Rails.logger.info "[CSV.merging.phone] Phone Number does not already exist. Saving..."
        phone_number.owner = existing_relation
        phone_number.save!
      end
    end
  end

  # All of the user's stored email and phone records that match the new
  # contact, emails first. Empty when there is no user.
  def matching_emails_and_phone_numbers
    records = []
    if @user
      records << matching_emails
      records << matching_phone_numbers
      Rails.logger.info "[CSV.merging] merged records: #{records.inspect}"
      records.flatten!
      Rails.logger.info "[CSV.merging] flattened records: #{records.inspect}"
      records.compact!
      Rails.logger.info "[CSV.merging] compacted records: #{records.inspect}"
    end
    records
  end

  # Stored email records matching one of the new contact's emails.
  # NOTE(review): the lookup also requires the `primary` flag to be equal, so
  # the same address with a different flag is not treated as a match - confirm.
  def matching_emails
    existing_emails = []
    new_contact_emails = @new_contact.email_addresses
    Rails.logger.info "[CSV.merging] new_contact_emails: #{new_contact_emails.inspect}"
    new_contact_emails.each do |email|
      Rails.logger.info "[CSV.merging] Checking for a match on email: #{email.inspect}..."
      existing_email = @user.contact_email_addresses.find_by(email: email.email, primary: email.primary)
      if existing_email
        Rails.logger.info "[CSV.merging] Found a matching email"
        existing_emails << existing_email
      else
        Rails.logger.info "[CSV.merging] No match found"
      end
    end
    existing_emails
  end

  # Stored phone records whose number equals one of the new contact's numbers.
  def matching_phone_numbers
    existing_phone_numbers = []
    @new_contact.phone_numbers.each do |phone_number|
      Rails.logger.info "[CSV.merging] Checking for a match on phone_number: #{phone_number.inspect}..."
      existing_phone_number = @user.contact_phone_numbers.find_by(number: phone_number.number)
      if existing_phone_number
        Rails.logger.info "[CSV.merging] Found a matching phone number"
        existing_phone_numbers << existing_phone_number
      else
        Rails.logger.info "[CSV.merging] No match found"
      end
    end
    existing_phone_numbers
  end

  # Strips whitespace, dashes and parentheses from a phone number string.
  # Not called anywhere inside this class.
  def clean_phone_number(number)
    number.gsub(/[\s\-\(\)]+/, "")
  end
end

スマートCSV Parser.rb

され、次の10
require "fuzzy_match" 
require "import_data/base_parser" 

module ImportData 
    # Parser variant whose rows are ::ImportData::SmartCsvRow objects.
    class SmartCsvParser < BaseParser
      # Returns the class used for rows of this parser.
      # NOTE(review): presumably consumed by BaseParser when it wraps each
      # row - confirm against the base class.
      def row_class
        ::ImportData::SmartCsvRow
      end

      # Iterates the rows via `each` and yields, for each one, the first
      # element of the array returned by the row's `to_contact`.
      def each_contact
        each do |row|
          yield row.to_contact.first
        end
      end
    end

    # One CSV row whose headers are normalised to the known column names
    # below by fuzzy matching before the base row is initialised.
    class SmartCsvRow < BaseRow
      # Known contact columns => contact attribute ("" where no attribute is given).
      CONTACT_MAPPING = {
        "Name" => :name,
        "First Name" => :first_name,
        "Last Name" => :last_name,
        "Middle Name" => "",
        "Spouse" => :spouse,
        "Mobile Phone" => :mobile_phone,
        "Notes" => :notes,
        "Account" => "",
        "Internet Free Busy" => "",
      }

      # Known address columns => address attribute.
      ADDRESS_MAPPING = {
        "Address" => :address,
        "Address1" => :street,
        "Street" => :street,
        "City" => :city,
        "State" => :state,
        "Postal Code" => :zip,
        "Zip" => :zip,
        "Zip Code" => :zip,
        "Country" => :country,
        "Home Address" => :address,
        "Home Street" => :street,
        "Home Street 2" => :street,
        "Home Street 3" => :street,
        "Home Address PO Box" => :street,
        "Home City" => :city,
        "Home State" => :state,
        "Home Postal Code" => :zip,
        "Home Country" => :country,
        "Business Address" => :address,
        "Business Street" => :street,
        "Business Street 2" => :street,
        "Business Street 3" => :street,
        "Business Address PO Box" => :street,
        "Business City" => :city,
        "Business State" => :state,
        "Business Postal Code" => :zip,
        "Business Country" => :country,
        "Other Address" => :address,
        "Other Street" => :street,
        "Other Street 2" => :street,
        "Other Street 3" => :street,
        "Other Address PO Box" => :street,
        "Other City" => :city,
        "Other State" => :state,
        "Other Postal Code" => :zip,
        "Other Country" => :country,
      }

      # Column names that carry an email address.
      EMAIL_ADDRESS_FIELDS = [
        "Email",
        "E-mail Address",
        "Email 2 Address",
        "Email 3 Address",
        "E-mail",
        "E-mail 2 Address",
        "E-mail 3 Address"
      ]

      # Known phone columns => phone type label.
      PHONE_TYPE_MAPPINGS = {
        "Phone" => "Home",
        "Primary Phone" => "Home",
        "Home Phone" => "Home",
        "Home Phone 2" => "Home",
        "Mobile Phone" => "Mobile",
        "Home Fax" => "Fax",
        "Business Phone" => "Work",
        "Business Phone 2" => "Work",
        "Business Fax" => "Fax",
        "Other Phone" => "Other",
        "Other Fax" => "Fax",
        "Company Main Phone" => "Work",
      }

      # Fuzzy matcher over every known column name, built once at load time.
      FM = FuzzyMatch.new(CONTACT_MAPPING.keys + ADDRESS_MAPPING.keys + EMAIL_ADDRESS_FIELDS + PHONE_TYPE_MAPPINGS.keys)

      # Replaces each header by its best known match (or leaves it as is) and
      # hands the result to the base row.
      def initialize(headers, row)
        normalized = headers.map { |header| best_match_or_self(header) }
        super(normalized, row)
      end

      # Builds a new Contact from this row with its address, email addresses
      # and phone numbers attached.
      #
      # Returns a two-element array: the contact, and the flattened, nil-free
      # list made of the second values returned by `initialize_emails` and
      # `initialize_phone_numbers`.
      def to_contact
        contact = Contact.new
        initiate_instance(contact, CONTACT_MAPPING)

        address = initiate_instance(Address.new, ADDRESS_MAPPING)
        contact.addresses << address if address

        email_addresses, existing_emails = initialize_emails(EMAIL_ADDRESS_FIELDS)
        contact.email_addresses << email_addresses

        phone_numbers, existing_phone_numbers = initialize_phone_numbers(PHONE_TYPE_MAPPINGS)
        contact.phone_numbers << phone_numbers

        [contact, [existing_emails, existing_phone_numbers].flatten.compact]
      end

      private

      # Phone type label for a known phone column, nil for any other field.
      def fetch_phone_type(field)
        PHONE_TYPE_MAPPINGS[field]
      end

      # Among the fuzzy-match candidates whose second score exceeds 0.5, picks
      # the one with the highest third score and returns its text; returns the
      # header unchanged when there is no such candidate.
      def best_match_or_self(header)
        scored = FM.find(header, find_all_with_score: true)
        plausible = scored.select { |(_text, dice, _lev)| dice > 0.5 }
        winner = plausible.max_by { |(_text, _dice, lev)| lev }

        winner ? winner[0] : header
      end
    end

end 

DATABASE YML

# The two ERB tags below run before the YAML is parsed and shell out to git.
# branch_name: output of `git symbolic-ref HEAD` (stderr discarded) with the
# refs/heads/ prefix removed. Nothing in the settings shown here references it.
<% branch_name = `git symbolic-ref HEAD 2>/dev/null`.chomp.sub('refs/heads/', '') %> 
# repository_name: last path component of `git rev-parse --show-toplevel`,
# used as the prefix of every database name below.
<% repository_name = `git rev-parse --show-toplevel`.split('/').last.strip %> 

# Every environment uses the postgresql adapter on localhost; only the
# database name suffix differs.
development: 
    adapter: postgresql 
    database: <%= repository_name %>_development 
    host: localhost 

test: 
    adapter: postgresql 
    database: <%= repository_name %>_test 
    host: localhost 

# NOTE(review): the production database name also depends on the git commands
# above succeeding at load time - confirm git is available where this runs.
production: 
    adapter: postgresql 
    database: <%= repository_name %>_production 
    host: localhost 

WORKERログ - 実行中の最初の数行

Delayed::Backend::ActiveRecord::Job Load (1.5ms) UPDATE "delayed_jobs" SET locked_at = '2016-04-07 17:59:37.569861', locked_by = 'host:tests-MBP-3.att.net pid:9659' WHERE id IN (SELECT "delayed_jobs"."id" FROM "delayed_jobs" WHERE ((run_at <= '2016-04-07 17:59:37.568990' AND (locked_at IS NULL OR locked_at < '2016-04-07 16:29:37.569013') OR locked_by = 'host:tests-MBP-3.att.net pid:9659') AND failed_at IS NULL) ORDER BY priority ASC, run_at ASC LIMIT 1 FOR UPDATE) RETURNING * /*application:AGENTBRIGHT*/ 
2016-04-07T13:59:37-0400: [Worker(host:tests-MBP-3.att.net pid:9659)] Job ImportCsvFileWorker.perform (id=19) RUNNING 
    CsvFile Load (0.6ms) SELECT "csv_files".* FROM "csv_files" WHERE "csv_files"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/ [["id", 1]] 
    (0.2ms) BEGIN /*application:AGENTBRIGHT*/ 
    SQL (0.4ms) UPDATE "csv_files" SET "state" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["state", "processing"], ["updated_at", "2016-04-07 17:59:37.632648"], ["id", 1]] 
[CSV.parsing] Processing new csv file... 
    SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 1], ["updated_at", "2016-04-07 17:59:38.571053"], ["id", 1]] 
[CSV.base_row] Initializing emails... 
[CSV.base_row] 
[CSV.base_row] Email addresses: 0 
[CSV.base_row] Existing emails : 0 
[CSV.base_row] Initializing phone numbers... 
[CSV.base_row] Phone numbers: 0 
[CSV.base_row] Existing phone numbers : [] 
    User Load (0.4ms) SELECT "users".* FROM "users" WHERE "users"."id" = $1 LIMIT 1 /*application:AGENTBRIGHT*/ [["id", 1]] 
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []> 
[CSV.merging] merged records: [[], []] 
[CSV.merging] flattened records: [] 
[CSV.merging] compacted records: [] 
[CSV.merging] Checking if new contact matches existing contact... 
[CSV.merging] Found records: [] 
[CSV.merging] No contact match found. 
    SQL (0.7ms) INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/ [["data", "\"notes\"=>\"Family Name: and XXXX ail\r\nAddress 1: Any Street Point Road\r\nCity: Louisville, \r\nState: TN \r\nZip: 06437\r\n# invited: 2\r\nAdults: 2\r\n\""], ["first_name", "Mary Joe"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "Mary Joe Smith"], ["envelope_salutation", ""], ["letter_salutation", "Dear Mary Joe,"], ["created_at", "2016-04-07 17:59:38.763119"], ["updated_at", "2016-04-07 17:59:38.763119"], ["avatar_color", 10]] 
    SQL (1.0ms) UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/ [["id", 1]] 
    SQL (0.4ms) UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_failed_records", 1], ["updated_at", "2016-04-07 17:59:38.775598"], ["id", 1]] 
    SQL (1.1ms) INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/ [["original_row", "Mary Joe Smith,,,,,,,,,,,,,\"Family Name: XXXX \r\nAddress 1: Any Street \r\nCity: Louisville, \r\nState: TN \r\nZip: 37777\r\n# invited: 2\r\nAdults: 2\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:38.796886"], ["updated_at", "2016-04-07 17:59:38.796886"]] 
    SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 2], ["updated_at", "2016-04-07 17:59:39.744526"], ["id", 1]] 
[CSV.base_row] Initializing emails... 
[CSV.base_row] 
[CSV.base_row] Email addresses: 0 
[CSV.base_row] Existing emails : 0 
[CSV.base_row] Initializing phone numbers... 
[CSV.base_row] Phone numbers: 0 
[CSV.base_row] Existing phone numbers : [] 
[CSV.merging] new_contact_emails: #<ActiveRecord::Associations::CollectionProxy []> 
[CSV.merging] merged records: [[], []] 
[CSV.merging] flattened records: [] 
[CSV.merging] compacted records: [] 
[CSV.merging] Checking if new contact matches existing contact... 
[CSV.merging] Found records: [] 
[CSV.merging] No contact match found. 
    SQL (0.5ms) INSERT INTO "contacts" ("data", "first_name", "user_id", "created_by_user_id", "import_source_id", "import_source_type", "name", "envelope_salutation", "letter_salutation", "created_at", "updated_at", "avatar_color") VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) RETURNING "id" /*application:AGENTBRIGHT*/ [["data", "\"notes\"=>\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: Somecity, \r\nState: MA \r\nZip: 07827\r\n# invited: 1\r\nAdults: 1\r\n\""], ["first_name", "ABC"], ["user_id", 1], ["created_by_user_id", 1], ["import_source_id", 1], ["import_source_type", "CsvFile"], ["name", "ABC"], ["envelope_salutation", ""], ["letter_salutation", "Dear ABC,"], ["created_at", "2016-04-07 17:59:39.753055"], ["updated_at", "2016-04-07 17:59:39.753055"], ["avatar_color", 4]] 
    SQL (0.7ms) UPDATE "users" SET "contacts_count" = COALESCE("contacts_count", 0) + 1 WHERE "users"."id" = $1 /*application:AGENTBRIGHT*/ [["id", 1]] 
    SQL (0.4ms) UPDATE "csv_files" SET "total_failed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_failed_records", 2], ["updated_at", "2016-04-07 17:59:39.763091"], ["id", 1]] 
    SQL (0.4ms) INSERT INTO "csv_file_invalid_records" ("original_row", "csv_file_id", "created_at", "updated_at") VALUES ($1, $2, $3, $4) RETURNING "id" /*application:AGENTBRIGHT*/ [["original_row", "Avi,,,,,,,,,,,,,\"Family Name: Xyzlastname\r\nAddress 1: 100 Village Drive #C101\r\nCity: SomeCity, \r\nState: MA \r\nZip: 06777\r\n# invited: 1\r\nAdults: 1\r\n\",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,Normal,,My Contacts;Imported 8/13/15 1;Imported 8/13/15,\n"], ["csv_file_id", 1], ["created_at", "2016-04-07 17:59:39.767706"], ["updated_at", "2016-04-07 17:59:39.767706"]] 
    SQL (0.4ms) UPDATE "csv_files" SET "total_parsed_records" = $1, "updated_at" = $2 WHERE "csv_files"."id" = $3 /*application:AGENTBRIGHT*/ [["total_parsed_records", 3], ["updated_at", "2016-04-07 17:59:40.761966"], ["id", 1]] 
[CSV.base_row] Initializing emails... 
[CSV.base_row] 
[CSV.base_row] Email addresses: 0 
[CSV.base_ro 
+0

database.ymlファイルを投稿してもらえますか?CSVのインポートはバックグラウンドで行っていますか、それともコントローラの中で直接行っていますか? –

+0

こんにちは。database.ymlを追加しました。ユーザーがアプリケーション内でファイルアップロードをクリックすると、(コントローラから)ワーカーが起動されます – user2970050

+0

ワーカーはcsv_file.import!を呼び出します。あなたはcsv_fileモデルを投稿できますか? –

答えて

2

テーブル全体のロックは、おそらく、複数のテーブルにまたがる更新を適用する際に参照整合性を保つため、Postgresが一貫性を強制していることと関係があります。

つまり、Ruby/Rails内からクエリを改善する方法はたくさんあります。

User.create([{email: "joe@example.com", name: "Joe"}, {email: "Ann", name: "Steve"}]) 

createメソッドは、上の例のようにハッシュオブジェクトの配列を渡すと、一度に複数のレコードを作成できます。これでテーブルロックの問題を完全に回避できるわけではありませんが、処理はより効率的になるはずです。

また、ActiveRecordのコレクションを繰り返し処理する場合は、eachの代わりにfind_eachを使用してください。find_eachは、すべてのレコードを一度にロードするのではなく、バッチ単位でコレクションをロードします。

+0

回答ありがとうございます。個々の行については、これで効率が良くなると思います。ただ、一つ理解できないのは、CSVファイルの処理中に連絡先のデータベース全体がロックされる理由です。ロックされるのは、処理中の行とそれに関連する連絡先だけであるべきではないでしょうか? – user2970050

+0

問題は、gem state-machine_active_record がデフォルトですべての状態遷移をトランザクションで包むことでした。ワーカーが csv_file.import! を実行すると csv_file のステートマシンで遷移が発生し、その遷移の中で CsvParsingService が呼び出されて各行が解析されていました。ステートマシンがすべてをトランザクションで包んでいたため、状態遷移が完了するまで何もコミットされませんでした。gem をバージョン 0.4.0pre に更新し、CsvFile モデルのステートマシンにオプション use_transactions: false を追加したところ、処理中にデータベースがロックされなくなりました。 – user2970050

関連する問題