Class: ImportDataset::DarwinCore
- Inherits: ImportDataset
  (ancestor chain: ImportDataset::DarwinCore < ImportDataset < ApplicationRecord < ActiveRecord::Base < Object)
- Defined in: app/models/import_dataset/darwin_core.rb
Direct Known Subclasses
Checklist, Occurrences, Unknown
Defined Under Namespace
Classes: Checklist, Occurrences, Unknown
Constant Summary
- CHECKLIST_ROW_TYPE = "http://rs.tdwg.org/dwc/terms/Taxon"
- OCCURRENCES_ROW_TYPE = "http://rs.tdwg.org/dwc/terms/Occurrence"
Instance Attribute Summary
Attributes inherited from ImportDataset
#description, #metadata, #source_content_type, #source_file_name, #source_file_size, #source_updated_at, #status
Class Method Summary
- .create_with_subtype_detection(params) ⇒ Checklist, ...
  Returns the appropriate ImportDataset::DarwinCore subclass instantiated (not saved) for the supplied params.
Instance Method Summary
- #add_filters(records, filters) ⇒ Object private
- #core_records_fields ⇒ Object
- #default_nomenclatural_code ⇒ Object
- #destroy_namespace ⇒ Object private
- #get_core_record_identifier_namespace ⇒ Object
- #get_dwc_default_values(table) ⇒ Object private
- #get_dwc_headers(table) ⇒ Object protected
- #get_dwc_records(table) ⇒ Object protected
- #get_field_mapping(field_name) ⇒ Object protected
- #get_fields_mapping ⇒ Object private
- #get_normalized_dwc_term(field) ⇒ Object private
- #get_records(source) ⇒ Object protected
- #import(max_time, max_records, retry_errored: false, filters: nil, record_id: nil) ⇒ Hash
  Returns the updated dataset records.
- #initialize(params) ⇒ DarwinCore constructor
  A new instance of DarwinCore.
- #progress(filters: nil) ⇒ Hash
  Returns a hash with the record counts grouped by status.
- #set_import_settings(import_settings) ⇒ Object
  Sets import settings for this dataset.
- #stage ⇒ Object
  Stages DwC-A records into DB.
- #start_import(&block) ⇒ String
  Sets up import dataset for import and returns UUID.
- #stop_import ⇒ Object
  Sets import dataset to stop importing data.
Methods inherited from ImportDataset
Methods included from Shared::OriginRelationship
#new_objects, #old_objects, #reject_origin_relationships, #set_origin
Methods included from Shared::IsData
#errors_excepting, #full_error_messages_excepting, #identical, #is_community?, #is_destroyable?, #is_editable?, #is_in_use?, #is_in_users_projects?, #metamorphosize, #similar
Methods included from Housekeeping
#has_polymorphic_relationship?
Methods inherited from ApplicationRecord
Constructor Details
#initialize(params) ⇒ DarwinCore
Returns a new instance of DarwinCore.
# File 'app/models/import_dataset/darwin_core.rb', line 11

def initialize(params)
  import_settings = params&.delete(:import_settings)
  super(params)

  self.metadata = {
    core_headers: [],
    namespaces: {
      core: nil,
      eventID: nil
    }
  }

  set_import_settings(import_settings || {})
end
Class Method Details
.create_with_subtype_detection(params) ⇒ Checklist, ...
Returns the appropriate ImportDataset::DarwinCore subclass instantiated (not saved) for the supplied params.

# File 'app/models/import_dataset/darwin_core.rb', line 34

def self.create_with_subtype_detection(params)
  core_type = nil

  return Unknown.new unless params[:source]

  begin
    path = params[:source].tempfile.path
    if path =~ /\.zip\z/i
      dwc = ::DarwinCore.new(path)
      core_type = dwc.core.data[:attributes][:rowType]

      ### Check all files are readable
      [dwc.core, *dwc.extensions].each do |table|
        table.read { |data, errors| raise "Errors found when reading data" unless errors.empty? }
      end
    else
      if path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true).headers
      else
        headers = CSV.read(path, headers: true, col_sep: "\t", quote_char: nil, encoding: 'bom|utf-8').headers
      end

      row_type = params.dig(:import_settings, :row_type)

      if row_type
        core_type = row_type
      elsif headers.include? "occurrenceID"
        core_type = OCCURRENCES_ROW_TYPE
      elsif headers.include? "taxonID"
        core_type = CHECKLIST_ROW_TYPE
      end
    end
  rescue Errno::ENOENT, RuntimeError => e
    # TODO: dwc-archive gem should probably detect missing (or wrongly mapped) files and raise its own exception
    return Unknown.new(params.merge({error_message: "#{e.message}"}))
  end

  case core_type
  when OCCURRENCES_ROW_TYPE
    Occurrences.new(params)
  when CHECKLIST_ROW_TYPE
    Checklist.new(params)
  else
    Unknown.new(params.merge({error_message: "unknown DwC-A core type '#{core_type}'."}))
  end
end
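For tabular (non-archive) uploads, the branch above decides the core row type from an explicit `row_type` setting or from marker headers. A minimal, standalone sketch of that decision in plain Ruby (method name hypothetical; constants mirror those defined on this class):

```ruby
# Hypothetical helper illustrating the header-based core-type detection.
OCCURRENCES_ROW_TYPE = "http://rs.tdwg.org/dwc/terms/Occurrence"
CHECKLIST_ROW_TYPE = "http://rs.tdwg.org/dwc/terms/Taxon"

def detect_core_type(headers, row_type = nil)
  return row_type if row_type                              # explicit setting wins
  return OCCURRENCES_ROW_TYPE if headers.include?("occurrenceID")
  return CHECKLIST_ROW_TYPE if headers.include?("taxonID")
  nil                                                      # leads to the Unknown subclass
end
```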
Instance Method Details
#add_filters(records, filters) ⇒ Object (private)
# File 'app/models/import_dataset/darwin_core.rb', line 328

def add_filters(records, filters)
  filters&.each do |key, value|
    records = records.where(id: core_records_fields.at(key.to_i).with_value(value).select(:dataset_record_id))
  end
  records
end
#core_records_fields ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 26

def core_records_fields
  dataset_record_fields.with_record_class(core_records_class)
end
#default_nomenclatural_code ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 237

def default_nomenclatural_code
  self.metadata.dig("import_settings", "nomenclatural_code")&.downcase&.to_sym || :iczn
end
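The fallback behavior can be seen with a plain hash standing in for the model's metadata attribute (method name hypothetical):

```ruby
# Sketch of the default-code lookup: a configured code is normalized to a
# lowercase symbol; otherwise :iczn is assumed.
def default_code(metadata)
  metadata.dig("import_settings", "nomenclatural_code")&.downcase&.to_sym || :iczn
end
```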
#destroy_namespace ⇒ Object (private)
# File 'app/models/import_dataset/darwin_core.rb', line 324

def destroy_namespace
  Namespace.find_by(id: metadata["identifier_namespace"])&.destroy # If in use or gone no deletion happens
end
#get_core_record_identifier_namespace ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 210

def get_core_record_identifier_namespace
  id = metadata.dig("namespaces", "core")

  if id.nil? || (@core_record_identifier_namespace ||= Namespace.find_by(id: id)).nil?
    random = SecureRandom.hex(4)
    project_name = Project.find(Current.project_id).name
    namespace_name = "#{core_records_identifier_name} namespace for \"#{description}\" dataset in \"#{project_name}\" project [#{random}]"

    @core_record_identifier_namespace = Namespace.create!(
      name: namespace_name,
      short_name: "#{core_records_identifier_name}-#{random}",
      verbatim_short_name: core_records_identifier_name,
      delimiter: ':'
    )

    metadata.deep_merge!({
      "namespaces" => {
        "core" => @core_record_identifier_namespace.id
      }
    })
    save!
  end

  @core_record_identifier_namespace
end
#get_dwc_default_values(table) ⇒ Object (private)
# File 'app/models/import_dataset/darwin_core.rb', line 313

def get_dwc_default_values(table)
  table.fields.select { |f| f.has_key? :default }
end
#get_dwc_headers(table) ⇒ Object (protected)
# File 'app/models/import_dataset/darwin_core.rb', line 273

def get_dwc_headers(table)
  headers = []

  headers[table.id[:index]] = "id" if table.id
  table.fields.each { |f| headers[f[:index]] = get_normalized_dwc_term(f) if f[:index] }
  table.read_header.first.each_with_index { |f, i| headers[i] ||= f.strip }

  get_dwc_default_values(table).each.with_index(headers.length) { |f, i| headers[i] = get_normalized_dwc_term(f) }

  headers
end
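The header-assembly precedence (indexed fields win, raw file headers fill unclaimed positions, index-less default fields are appended at the end) can be sketched in plain Ruby with hypothetical inputs:

```ruby
# Hypothetical simplification: indexed_terms maps column index => term name,
# file_header is the raw header row, default_terms are index-less fields.
def assemble_headers(indexed_terms, file_header, default_terms)
  headers = []
  indexed_terms.each { |i, term| headers[i] = term }                    # declared indices first
  file_header.each_with_index { |h, i| headers[i] ||= h.strip }         # file header fills gaps
  default_terms.each.with_index(headers.length) { |t, i| headers[i] = t } # defaults appended
  headers
end
```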
#get_dwc_records(table) ⇒ Object (protected)
# File 'app/models/import_dataset/darwin_core.rb', line 286

def get_dwc_records(table)
  records = []
  headers = get_dwc_headers(table)

  records = table.read.first.map do |row|
    record = {}
    row.each_with_index { |v, i| record[headers[i]] = v }
    defaults = get_dwc_default_values(table)
    defaults.each.with_index(headers.length - defaults.length) { |f, i| record[headers[i]] = f[:default] }
    record
  end

  return records
end
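A self-contained sketch of how each row becomes a header-keyed hash, with default values filling the trailing columns (names hypothetical, defaults reduced to plain values):

```ruby
# Hypothetical simplification of the record-building loop above.
def build_records(headers, rows, defaults)
  rows.map do |row|
    record = {}
    row.each_with_index { |v, i| record[headers[i]] = v }
    # Defaults occupy the last `defaults.length` header positions.
    defaults.each.with_index(headers.length - defaults.length) { |v, i| record[headers[i]] = v }
    record
  end
end
```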
#get_field_mapping(field_name) ⇒ Object (protected)
# File 'app/models/import_dataset/darwin_core.rb', line 301

def get_field_mapping(field_name)
  get_fields_mapping[field_name.to_s.downcase]
end
#get_fields_mapping ⇒ Object (private)
# File 'app/models/import_dataset/darwin_core.rb', line 307

def get_fields_mapping
  @fields_mapping ||= metadata["core_headers"]
    .reject(&:nil?)
    .each.with_index.inject({}) { |m, (h, i)| m.merge({ h.downcase => i, i => h }) }
end
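The inject above builds a single hash that maps both directions: lowercased header to column index, and index back to the original header. A standalone sketch (method name hypothetical):

```ruby
# Bidirectional header/index mapping, as built by get_fields_mapping.
def build_fields_mapping(headers)
  headers.reject(&:nil?)
         .each.with_index
         .inject({}) { |m, (h, i)| m.merge({ h.downcase => i, i => h }) }
end
```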
#get_normalized_dwc_term(field) ⇒ Object (private)
# File 'app/models/import_dataset/darwin_core.rb', line 317

def get_normalized_dwc_term(field)
  # TODO: Think what to do about complex namespaces like "/std/Iptc4xmpExt/2008-02-29/" (currently returning the full URI as header)
  term = field[:term].match(/\/([^\/]+)\/terms\/.*(?<=\/)([^\/]+)\/?$/)
  #headers[field[:index]] = term ? term[1..2].join(":") : field[:term]
  term ? term[2] : field[:term]
end
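The regex keeps only the final path segment of a term URI containing `/terms/`, and falls back to the full URI otherwise. A standalone sketch of that behavior (method name hypothetical):

```ruby
# Same pattern as in get_normalized_dwc_term, applied to a bare URI string.
DWC_TERM_REGEX = /\/([^\/]+)\/terms\/.*(?<=\/)([^\/]+)\/?$/

def normalize_term(uri)
  m = uri.match(DWC_TERM_REGEX)
  m ? m[2] : uri   # last segment, or the full URI when the pattern misses
end
```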
#get_records(source) ⇒ Object (protected)
# File 'app/models/import_dataset/darwin_core.rb', line 243

def get_records(source)
  records = { core: [], extensions: {} }
  headers = { core: [], extensions: {} }

  if source.path =~ /\.zip\z/i
    dwc = ::DarwinCore.new(source.path)

    headers[:core] = get_dwc_headers(dwc.core)
    records[:core] = get_dwc_records(dwc.core)

    dwc.extensions.each do |extension|
      type = extension.properties[:rowType]
      records[:extensions][type] = get_dwc_records(extension)
      headers[:extensions][type] = get_dwc_headers(extension)
    end
  elsif source.path =~ /\.(txt|tsv|xlsx?|ods)\z/i
    if source.path =~ /\.(txt|tsv)\z/i
      records[:core] = CSV.read(source.path, headers: true, col_sep: "\t", quote_char: nil, encoding: 'bom|utf-8')
    else
      records[:core] = CSV.parse(Roo::Spreadsheet.open(source.path).to_csv, headers: true)
    end
    records[:core] = records[:core].map { |r| r.to_h }
    headers[:core] = records[:core].first.to_h.keys
  else
    raise "Unsupported input format"
  end

  return records, headers
end
#import(max_time, max_records, retry_errored: false, filters: nil, record_id: nil) ⇒ Hash
Returns the updated dataset records. Do not call if there are changes that have not been persisted.

# File 'app/models/import_dataset/darwin_core.rb', line 124

def import(max_time, max_records, retry_errored: false, filters: nil, record_id: nil)
  imported = []

  lock_time = Time.now
  old_uuid = self.metadata['import_uuid']

  start_import do
    lock_time = Time.now - lock_time
    filters = self.metadata['import_filters'] if filters.nil?
    retry_errored = self.metadata['import_retry_errored'] if retry_errored.nil?
    start_id = self.metadata['import_start_id'] if retry_errored

    status = ["Ready"]
    status << "Errored" if retry_errored
    records = add_filters(core_records.where(status: status), filters).order(:id).limit(max_records) #.preload_fields
    records = records.where(id: start_id..) if start_id
    records = core_records.where(id: record_id, status: %w{Ready Errored}) if record_id
    records = records.all

    start_time = Time.now - lock_time

    dwc_data_attributes = project.preferences["model_predicate_sets"].map do |model, predicate_ids|
      [model, Hash[
        *Predicate.where(id: predicate_ids)
          .select { |p| /^http:\/\/rs.tdwg.org\/dwc\/terms\/.*/ =~ p.uri }
          .map { |p| [p.uri.split('/').last, p] }
          .flatten
      ]]
    end.to_h

    records.each do |record|
      imported << record.import(dwc_data_attributes)

      break if 1000.0 * (Time.now - start_time).abs > max_time
    end

    if imported.any? && record_id.nil?
      self.metadata.merge!({
        'import_start_id' => imported.last&.id + 1,
        'import_filters' => filters,
        'import_retry_errored' => retry_errored
      })
      save!

      new_uuid = self.metadata['import_uuid']
      ImportDatasetImportJob.perform_later(self, new_uuid, max_time, max_records) unless old_uuid == new_uuid
    else
      self.stop_import
    end
  end

  imported
end
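The batch loop stops once the elapsed wall-clock time, in milliseconds, exceeds max_time. A minimal sketch of that budget check with trivial stand-in work items (names hypothetical):

```ruby
# Each item is "imported" until the millisecond budget is exhausted;
# the real method imports dataset records instead of integers.
def import_with_budget(items, max_time_ms)
  imported = []
  start_time = Time.now
  items.each do |item|
    imported << item  # stand-in for record.import(...)
    break if 1000.0 * (Time.now - start_time) > max_time_ms
  end
  imported
end
```

Note the budget is checked after each record, so at least one record is processed per call; the remainder is picked up by the re-enqueued background job.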
#progress(filters: nil) ⇒ Hash
Returns a hash with the record counts grouped by status.

# File 'app/models/import_dataset/darwin_core.rb', line 181

def progress(filters: nil)
  add_filters(core_records, filters).group(:status).count
end
#set_import_settings(import_settings) ⇒ Object
Sets import settings for this dataset.

# File 'app/models/import_dataset/darwin_core.rb', line 203

def set_import_settings(import_settings)
  metadata["import_settings"] ||= {}
  import_settings.each { |k, v| metadata["import_settings"].merge!({ k => v }) }

  metadata["import_settings"]
end
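A standalone sketch of the key-by-key merge into the metadata hash (here passed as a plain hash argument rather than read from the model attribute):

```ruby
# New settings are merged over any existing ones; untouched keys survive.
def set_import_settings(metadata, import_settings)
  metadata["import_settings"] ||= {}
  import_settings.each { |k, v| metadata["import_settings"].merge!({ k => v }) }
  metadata["import_settings"]
end
```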
#stage ⇒ Object
Stages DwC-A records into DB.
# File 'app/models/import_dataset/darwin_core.rb', line 186

def stage
  if status == "Staging" # ActiveJob being retried could cause this state
    transaction do
      core_records_fields.delete_all
      dataset_records.delete_all
    end
  end

  update!(status: "Staging") if status == "Uploaded"

  if status != "Ready"
    perform_staging
    update!(status: "Ready")
  end
end
#start_import(&block) ⇒ String
Sets up the import dataset for import and returns a UUID. If an import has already been started, the same UUID is returned, unless the last activity was more than 10 minutes ago, in which case a new UUID is generated. Do not call if there are changes that have not been persisted.

# File 'app/models/import_dataset/darwin_core.rb', line 82

def start_import(&block)
  with_lock do
    case self.status
    when 'Ready'
      self.status = 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid
    when 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid if self.updated_at < 10.minutes.ago
    else
      raise "Invalid initial state"
    end
    save!

    yield if block_given?
  end

  self.metadata['import_uuid']
end
#stop_import ⇒ Object
Sets import dataset to stop importing data. Do not call if there are changes that have not been persisted.
# File 'app/models/import_dataset/darwin_core.rb', line 102

def stop_import
  with_lock do
    if self.status == 'Importing'
      self.status = 'Ready'
      self.metadata.except!('import_uuid', 'import_start_id', 'import_filters', 'import_retry_errored')
      save!
    end
  end
end