Class: ImportDataset::DarwinCore
Defined Under Namespace
Classes: Checklist, Occurrences, Unknown
Constant Summary
CHECKLIST_ROW_TYPE = 'http://rs.tdwg.org/dwc/terms/Taxon'.freeze
OCCURRENCES_ROW_TYPE = 'http://rs.tdwg.org/dwc/terms/Occurrence'.freeze
Instance Attribute Summary
#description, #metadata, #source_content_type, #source_file_name, #source_file_size, #source_updated_at, #status
Class Method Summary
.create_with_subtype_detection, .default_if_absent
Instance Method Summary
#add_filters, #check_field_set, #core_records_fields, #core_records_mapped_fields, #default_nomenclatural_code, #destroy_namespace, #dwc_data_attributes, #get_col_sep, #get_core_record_identifier_namespace, #get_dwc_default_values, #get_dwc_headers, #get_dwc_records, #get_field_mapping, #get_fields_mapping, #get_normalized_dwc_term, #get_quote_char, #get_records, #import, #progress, #set_import_settings, #stage, #start_import, #stop_import, #well_formed
Methods included from other modules and concerns:
#delete_origin_relationships
#new_objects, #old_objects, #reject_origin_relationships, #set_origin
#errors_excepting, #full_error_messages_excepting, #identical, #is_community?, #is_destroyable?, #is_editable?, #is_in_use?, #is_in_users_projects?, #metamorphosize, #similar
#has_polymorphic_relationship?
transaction_with_retry
Constructor Details
#initialize(params) ⇒ DarwinCore
Returns a new instance of DarwinCore.
# File 'app/models/import_dataset/darwin_core.rb', line 13
def initialize(params)
  import_settings = params&.delete(:import_settings)
  super(params)

  self.metadata = {
    core_headers: [],
    namespaces: {
      core: nil,
      eventID: nil
    }
  }

  set_import_settings(import_settings || {})
end
Class Method Details
.create_with_subtype_detection(params) ⇒ Checklist, ...
Returns the appropriate ImportDataset::DarwinCore subclass, instantiated (but not saved) for the supplied params.
# File 'app/models/import_dataset/darwin_core.rb', line 36
def self.create_with_subtype_detection(params)
  core_type = nil

  return Unknown.new unless params[:source]

  begin
    path = params[:source].tempfile.path

    if path =~ /\.zip\z/i
      dwc = ::DarwinCore.new(path)
      core_type = dwc.core.data[:attributes][:rowType]

      [dwc.core, *dwc.extensions].each do |table|
        table.read { |data, errors| raise 'Errors found when reading data' unless errors.empty? }
      end
    else
      if path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f.strip}).headers
      else
        col_sep = default_if_absent(params.dig(:import_settings, :col_sep), "\t")
        quote_char = default_if_absent(params.dig(:import_settings, :quote_char), '"')
        headers = CSV.read(path, headers: true, col_sep: col_sep, quote_char: quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f.strip}).headers
      end

      row_type = params.dig(:import_settings, :row_type)

      if row_type
        core_type = row_type
      elsif headers.include? 'occurrenceID'
        core_type = OCCURRENCES_ROW_TYPE
      elsif headers.include? 'taxonID'
        core_type = CHECKLIST_ROW_TYPE
      end
    end
  rescue Errno::ENOENT, RuntimeError => e
    return Unknown.new(params.merge({error_message: "#{e.message}"}))
  end

  case core_type
  when OCCURRENCES_ROW_TYPE
    Occurrences.new(params)
  when CHECKLIST_ROW_TYPE
    Checklist.new(params)
  else
    Unknown.new(params.merge({error_message: "unknown DwC-A core type '#{core_type}'."}))
  end
end
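A minimal usage sketch (the upload object and the settings shown are hypothetical; any Rack-style upload responding to #tempfile should work):

  dataset = ImportDataset::DarwinCore.create_with_subtype_detection(
    source: params[:file], # DwC-A zip, CSV/TSV text file, or xlsx/ods spreadsheet
    import_settings: { col_sep: "\t", quote_char: '"' }
  )
  # => ImportDataset::DarwinCore::Occurrences, ::Checklist, or ::Unknown; none are saved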
.default_if_absent(value, default) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 310
def self.default_if_absent(value, default)
  return default if value.nil? || value.empty?
  value
end
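The fallback covers both nil and empty values:

  ImportDataset::DarwinCore.default_if_absent(nil, "\t") # => "\t"
  ImportDataset::DarwinCore.default_if_absent('', "\t")  # => "\t"
  ImportDataset::DarwinCore.default_if_absent(',', "\t") # => ","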
Instance Method Details
#add_filters(records, filters) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 344
def add_filters(records, filters)
  filters&.each do |key, value|
    records = records.where(id: core_records_fields.at(key.to_i).having_value(value).select(:dataset_record_id))
  end
  records
end
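Each filter entry narrows the relation to records whose core field at the given index has the given value. A hypothetical call (index 5 standing in for whichever column was mapped there):

  records = add_filters(core_records, { '5' => 'Animalia' })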
#check_field_set ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 365
def check_field_set
  if source.staged?
    if source.staged_path =~ /\.zip\z/i
      headers = get_dwc_headers(::DarwinCore.new(source.staged_path).core)
    else
      if source.staged_path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(source.staged_path).to_csv, headers: true).headers
      else
        headers = CSV.read(source.staged_path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8').headers
      end
    end

    missing_headers = self.class::MINIMUM_FIELD_SET - headers

    missing_headers.each do |header|
      errors.add(:source, "required field #{header} missing.")
    end
  end
end
#core_records_fields ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 28
def core_records_fields
  dataset_record_fields.with_record_class(core_records_class)
end
#core_records_mapped_fields ⇒ Array<Integer>
Returns the indexes of the mapped fields for the core records.
# File 'app/models/import_dataset/darwin_core.rb', line 85
def core_records_mapped_fields
  core_records&.first&.get_mapped_fields(dwc_data_attributes) || []
end
#default_nomenclatural_code ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 239
def default_nomenclatural_code
  self.metadata.dig('import_settings', 'nomenclatural_code')&.downcase&.to_sym || :iczn
end
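For example, given the lookup above:

  set_import_settings({ 'nomenclatural_code' => 'ICN' })
  default_nomenclatural_code # => :icn
  # with no setting stored:  # => :iczn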
#destroy_namespace ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 340
def destroy_namespace
  Namespace.find_by(id: metadata['identifier_namespace'])&.destroy
end
#dwc_data_attributes ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 385
def dwc_data_attributes
  project.preferences['model_predicate_sets'].map do |model, predicate_ids|
    [model, Hash[
      *Predicate.where(id: predicate_ids)
        .select { |p| /^http:\/\/rs\.tdwg\.org\/dwc\/terms\/.*/ =~ p.uri }
        .map { |p| [p.uri.split('/').last, p] }
        .flatten
      ]
    ]
  end.to_h
end
#get_col_sep ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 315
def get_col_sep
  DarwinCore.default_if_absent(metadata.dig('import_settings', 'col_sep'), "\t")
end
#get_core_record_identifier_namespace ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 212
def get_core_record_identifier_namespace
  id = metadata.dig('namespaces', 'core')

  if id.nil? || (@core_record_identifier_namespace ||= Namespace.find_by(id:)).nil?
    random = SecureRandom.hex(4)
    project_name = Project.find(Current.project_id).name
    namespace_name = "#{core_records_identifier_name} namespace for \"#{description}\" dataset in \"#{project_name}\" project [#{random}]"

    @core_record_identifier_namespace = Namespace.create!(
      name: namespace_name,
      short_name: "#{core_records_identifier_name}-#{random}",
      verbatim_short_name: core_records_identifier_name,
      delimiter: ':'
    )
    metadata.deep_merge!({
      'namespaces' => {
        'core' => @core_record_identifier_namespace.id
      }
    })
    save!
  end

  @core_record_identifier_namespace
end
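The first call creates a project-scoped Namespace and records its id under metadata['namespaces']['core']; later calls return the memoized instance. The identifier prefix comes from core_records_identifier_name, which subclasses define (the value below is illustrative only):

  ns = get_core_record_identifier_namespace
  ns.short_name # => e.g. "occurrenceID-1a2b3c4d"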
#get_dwc_default_values(table) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 329
def get_dwc_default_values(table)
  table.fields.select { |f| f.has_key? :default }
end
#get_dwc_headers(table) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 276
def get_dwc_headers(table)
  headers = []

  headers[table.id[:index]] = 'id' if table.id
  table.fields.each { |f| headers[f[:index]] = get_normalized_dwc_term(f) if f[:index] }

  table.data_headers.first&.each_with_index { |f, i| headers[i] ||= f.strip }

  get_dwc_default_values(table).each.with_index(headers.length) { |f, i| headers[i] = get_normalized_dwc_term(f) }

  headers
end
#get_dwc_records(table) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 289
def get_dwc_records(table)
  records = []
  headers = get_dwc_headers(table)

  records = table.read.first.map do |row|
    record = {}
    row.each_with_index { |v, i| record[headers[i]] = v }

    defaults = get_dwc_default_values(table)
    defaults.each.with_index(headers.length - defaults.length) { |f, i| record[headers[i]] = f[:default] }

    record
  end

  return records
end
#get_field_mapping(field_name) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 304
def get_field_mapping(field_name)
  get_fields_mapping[field_name.to_s.downcase]
end
#get_fields_mapping ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 323
def get_fields_mapping
  @fields_mapping ||= metadata['core_headers']
    .reject(&:nil?)
    .each.with_index.inject({}) { |m, (h, i)| m.merge({ h.downcase => i, i => h }) }
end
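The memoized mapping is bidirectional: lowercased header to index, and index back to the original (case-preserved) header. With core_headers of ['id', 'occurrenceID']:

  get_fields_mapping
  # => { 'id' => 0, 0 => 'id', 'occurrenceid' => 1, 1 => 'occurrenceID' }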
#get_normalized_dwc_term(field) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 333
def get_normalized_dwc_term(field)
  term = field[:term].match(/\/([^\/]+)\/terms\/.*(?<=\/)([^\/]+)\/?$/)
  term ? term[2] : field[:term]
end
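A DwC-style term URI is reduced to its final path segment; anything that does not match the pattern passes through unchanged:

  get_normalized_dwc_term(term: 'http://rs.tdwg.org/dwc/terms/occurrenceID') # => "occurrenceID"
  get_normalized_dwc_term(term: 'basisOfRecord')                             # => "basisOfRecord"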
#get_quote_char ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 319
def get_quote_char
  DarwinCore.default_if_absent(metadata.dig('import_settings', 'quote_char'), '"')
end
#get_records(path) ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 245
def get_records(path)
  records = { core: [], extensions: {} }
  headers = { core: [], extensions: {} }

  if path =~ /\.zip\z/i
    dwc = ::DarwinCore.new(path)

    headers[:core] = get_dwc_headers(dwc.core)
    records[:core] = get_dwc_records(dwc.core)

    dwc.extensions.each do |extension|
      type = extension.properties[:rowType]
      records[:extensions][type] = get_dwc_records(extension)
      headers[:extensions][type] = get_dwc_headers(extension)
    end
  elsif path =~ /\.(csv|txt|tsv|xlsx?|ods)\z/i
    if path =~ /\.(csv|txt|tsv)\z/i
      records[:core] = CSV.read(path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f&.strip})
    else
      records[:core] = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f&.strip})
    end
    records[:core] = records[:core].map { |r| r.to_h }
    headers[:core] = records[:core].first.to_h.keys
  else
    raise 'Unsupported input format'
  end

  return records, headers
end
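Both return values share the :core/:extensions shape, with extensions keyed by rowType URI (zip archives only). A sketch of unpacking the pair:

  records, headers = get_records(source.staged_path)
  headers[:core]            # => array of core column names
  records[:extensions].keys # => extension rowType URIs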
#import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil) ⇒ Array
Returns the dataset records updated by this call. Do not call if there are unpersisted changes.
# File 'app/models/import_dataset/darwin_core.rb', line 134
def import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil)
  imported = []
  lock_time = Time.now
  old_uuid = self.metadata['import_uuid']

  start_import do
    lock_time = Time.now - lock_time
    filters = self.metadata['import_filters'] if filters.nil?
    retry_errored = self.metadata['import_retry_errored'] if retry_errored.nil?
    start_id = self.metadata['import_start_id'] if retry_errored

    status = ['Ready']
    status << 'Errored' if retry_errored
    records = add_filters(core_records.where(status:), filters).order(:id).limit(max_records)
    records = records.where(id: start_id..) if start_id
    records = core_records.where(id: record_id, status: %w{Ready Errored}) if record_id
    records = records.all

    start_time = Time.now - lock_time

    records.each do |record|
      imported << record.import(dwc_data_attributes)
      break if 1000.0*(Time.now - start_time).abs > max_time
    end

    if imported.any? && record_id.nil?
      reload
      self.metadata.merge!({
        'import_start_id' => imported.last&.id + 1,
        'import_filters' => filters,
        'import_retry_errored' => retry_errored
      })
      save!

      new_uuid = self.metadata['import_uuid']
      ImportDatasetImportJob.perform_later(self, new_uuid, max_time, max_records) unless old_uuid == new_uuid
    else
      self.stop_import
    end
  end

  imported
end
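A hypothetical batched invocation, allowing roughly one second of work (max_time is in milliseconds) or 50 records per call; when more records remain, the method re-enqueues ImportDatasetImportJob itself:

  imported = dataset.import(1000, 50)
  imported.map(&:status) # => statuses of the records just processed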
#progress(filters: nil) ⇒ Hash
Returns a hash with the record counts grouped by status.
# File 'app/models/import_dataset/darwin_core.rb', line 183
def progress(filters: nil)
  add_filters(core_records, filters).group(:status).count
end
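The keys are record statuses; the counts shown here are illustrative:

  dataset.progress # => { 'Imported' => 120, 'Ready' => 34, 'Errored' => 2 }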
#set_import_settings(import_settings) ⇒ Object
Sets import settings for this dataset.
# File 'app/models/import_dataset/darwin_core.rb', line 205
def set_import_settings(import_settings)
  metadata['import_settings'] ||= {}
  import_settings.each { |k, v| metadata['import_settings'].merge!({k => v}) }
  metadata['import_settings']
end
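Settings merge into any already stored in metadata, so repeated calls accumulate:

  set_import_settings({ 'col_sep' => ',' })
  set_import_settings({ 'nomenclatural_code' => 'ICZN' })
  metadata['import_settings'] # => { 'col_sep' => ',', 'nomenclatural_code' => 'ICZN' }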
#stage ⇒ Object
Stages DwC-A records into the database.
# File 'app/models/import_dataset/darwin_core.rb', line 188
def stage
  if status == 'Staging'
    transaction do
      core_records_fields.delete_all
      dataset_records.delete_all
    end
  end

  update!(status: 'Staging') if status == 'Uploaded'

  if status != 'Ready'
    perform_staging
    update!(status: 'Ready')
  end
end
#start_import(&block) ⇒ String
Sets up the import dataset for import and returns a UUID. If an import has already started, the same UUID is returned (unless the last activity was more than 10 minutes ago, in which case a new one is generated). Do not call if there are unpersisted changes.
# File 'app/models/import_dataset/darwin_core.rb', line 92
def start_import(&block)
  with_lock do
    case self.status
    when 'Ready'
      self.status = 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid
    when 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid if self.updated_at < 10.minutes.ago
    else
      raise 'Invalid initial state'
    end
    save!

    yield if block_given?
  end

  self.metadata['import_uuid']
end
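A sketch of a caller pairing the lock-protected block with the returned UUID (mirroring the arguments #import passes to ImportDatasetImportJob):

  uuid = dataset.start_import do
    # runs while the row lock is held; status is now 'Importing'
  end
  ImportDatasetImportJob.perform_later(dataset, uuid, 1000, 50)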
#stop_import ⇒ Object
Signals the import dataset to stop importing data. Do not call if there are unpersisted changes.
# File 'app/models/import_dataset/darwin_core.rb', line 112
def stop_import
  with_lock do
    if self.status == 'Importing'
      self.status = 'Ready'
      self.metadata.except!('import_uuid', 'import_start_id', 'import_filters', 'import_retry_errored')
      save!
    end
  end
end
#well_formed ⇒ Object
# File 'app/models/import_dataset/darwin_core.rb', line 351
def well_formed
  begin
    headers = get_records(source.staged_path).last[:core]

    duplicates = headers.compact.map(&:downcase).tally.select { |_, count| count > 1 }.keys

    if duplicates.any?
      errors.add(:source, "Duplicate headers found: #{duplicates.join(', ')}")
    end
  rescue RuntimeError
    errors.add(:source, 'A problem occurred when reading the data file. If this is a text file please make sure the selected string and field delimiters are correct.')
  end
  true
end