Class: ImportDataset::DarwinCore
Defined Under Namespace
Classes: Checklist, Occurrences, Unknown
Constant Summary
collapse
- CHECKLIST_ROW_TYPE =
'http://rs.tdwg.org/dwc/terms/Taxon'.freeze
- OCCURRENCES_ROW_TYPE =
'http://rs.tdwg.org/dwc/terms/Occurrence'.freeze
Instance Attribute Summary
#description, #metadata, #source_content_type, #source_file_name, #source_file_size, #source_updated_at, #status
Class Method Summary
collapse
Instance Method Summary
collapse
#delete_origin_relationships
#new_objects, #old_objects, #reject_origin_relationships, #set_origin
#errors_excepting, #full_error_messages_excepting, #identical, #is_community?, #is_destroyable?, #is_editable?, #is_in_use?, #is_in_users_projects?, #metamorphosize, #similar
#has_polymorphic_relationship?
transaction_with_retry
Constructor Details
#initialize(params) ⇒ DarwinCore
Returns a new instance of DarwinCore.
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
# File 'app/models/import_dataset/darwin_core.rb', line 13
# Builds a new DwC import dataset. Import settings are extracted from the
# params before ActiveRecord sees them, then persisted into metadata.
def initialize(params)
  # Pull import settings out first; they are stored in metadata, not columns.
  settings = params&.delete(:import_settings)
  super(params)

  self.metadata = {
    core_headers: [],
    namespaces: {
      core: nil,
      eventID: nil
    }
  }

  set_import_settings(settings || {})
end
|
Class Method Details
.create_with_subtype_detection(params) ⇒ Checklist, ...
Returns the appropriate ImportDataset::DarwinCore subclass instantiated (not saved) for the supplied params
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
# File 'app/models/import_dataset/darwin_core.rb', line 36
# Detects the DwC-A core type of the supplied source and instantiates
# (without saving) the matching ImportDataset::DarwinCore subclass.
# @param params [Hash] creation params; +:source+ is the uploaded file,
#   +:import_settings+ may carry +:row_type+, +:col_sep+ and quote char.
# @return [Checklist, Occurrences, Unknown]
def self.create_with_subtype_detection(params)
  core_type = nil

  return Unknown.new unless params[:source]

  begin
    path = params[:source].tempfile.path
    if path =~ /\.zip\z/i
      dwc = ::DarwinCore.new(path)
      core_type = dwc.core.data[:attributes][:rowType]
      # Read everything up front so malformed archives are rejected here.
      [dwc.core, *dwc.extensions].each do |table|
        table.read { |data, errors| raise 'Errors found when reading data' unless errors.empty? }
      end
    else
      if path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f.strip}).headers
      else
        col_sep = default_if_absent(params.dig(:import_settings, :col_sep), "\t")
        # NOTE(review): ':qoute_char' is misspelled but callers appear to use
        # this key — renaming it here would break them; confirm before fixing.
        quote_char = default_if_absent(params.dig(:import_settings, :qoute_char), '"')
        headers = CSV.read(path, headers: true, col_sep: col_sep, quote_char: quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f.strip}).headers
      end

      # Explicit row_type wins; otherwise infer the core type from headers.
      row_type = params.dig(:import_settings, :row_type)
      if row_type
        core_type = row_type
      elsif headers.include? 'occurrenceID'
        core_type = OCCURRENCES_ROW_TYPE
      elsif headers.include? 'taxonID'
        core_type = CHECKLIST_ROW_TYPE
      end
    end
  rescue Errno::ENOENT, RuntimeError => e
    return Unknown.new(params.merge({error_message: "#{e.message}"}))
  end

  case core_type
  when OCCURRENCES_ROW_TYPE
    Occurrences.new(params)
  when CHECKLIST_ROW_TYPE
    Checklist.new(params)
  else
    Unknown.new(params.merge({error_message: "unknown DwC-A core type '#{core_type}'."}))
  end
end
|
.default_if_absent(value, default) ⇒ Object
313
314
315
316
|
# File 'app/models/import_dataset/darwin_core.rb', line 313
# Returns +default+ when +value+ is nil or empty, otherwise +value+ itself.
def self.default_if_absent(value, default)
  if value.nil? || value.empty?
    default
  else
    value
  end
end
|
Instance Method Details
#add_filters(records, filters) ⇒ Object
347
348
349
350
351
352
|
# File 'app/models/import_dataset/darwin_core.rb', line 347
# Narrows +records+ by the given per-column filters.
# @param records [ActiveRecord::Relation] dataset records scope
# @param filters [Hash, nil] column index (string/int) => required value
# @return [ActiveRecord::Relation] the filtered scope
def add_filters(records, filters)
  (filters || {}).each do |column, required_value|
    matching_ids = core_records_fields.at(column.to_i).having_value(required_value).select(:dataset_record_id)
    records = records.where(id: matching_ids)
  end
  records
end
|
#check_field_set ⇒ Object
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
|
# File 'app/models/import_dataset/darwin_core.rb', line 363
# Validation: adds an error on :source for every column required by the
# subclass (MINIMUM_FIELD_SET) that is absent from the staged file's headers.
def check_field_set
  if source.staged?
    if source.staged_path =~ /\.zip\z/i
      headers = get_dwc_headers(::DarwinCore.new(source.staged_path).core)
    else
      if source.staged_path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(source.staged_path).to_csv, headers: true).headers
      else
        headers = CSV.read(source.staged_path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8').headers
      end
    end

    missing_headers = self.class::MINIMUM_FIELD_SET - headers
    missing_headers.each do |header|
      errors.add(:source, "required field #{header} missing.")
    end
  end
end
|
#core_records_are_readable ⇒ Object
354
355
356
357
358
359
360
361
|
# File 'app/models/import_dataset/darwin_core.rb', line 354
# Validation helper: attempts to read the staged file and records an error
# on :source when parsing fails. Always returns true.
def core_records_are_readable
  get_records(source.staged_path)
  true
rescue RuntimeError
  errors.add(:source, "A problem occurred when reading the data file. If this is a text file please make sure the selected string and field delimiters are correct.")
  true
end
|
#core_records_fields ⇒ Object
28
29
30
|
# File 'app/models/import_dataset/darwin_core.rb', line 28
# Scope of dataset record fields restricted to this dataset's core record class.
def core_records_fields
  record_class = core_records_class
  dataset_record_fields.with_record_class(record_class)
end
|
#default_nomenclatural_code ⇒ Object
242
243
244
|
# File 'app/models/import_dataset/darwin_core.rb', line 242
# Nomenclatural code selected in the import settings, as a lowercase symbol;
# defaults to :iczn when none was chosen.
def default_nomenclatural_code
  code = self.metadata.dig('import_settings', 'nomenclatural_code')
  code.nil? ? :iczn : code.downcase.to_sym
end
|
#destroy_namespace ⇒ Object
343
344
345
|
# File 'app/models/import_dataset/darwin_core.rb', line 343
# Destroys the identifier namespace referenced by metadata, if it still exists.
# NOTE(review): other methods store the namespace id under
# metadata['namespaces']['core'] — confirm 'identifier_namespace' is the
# intended key here.
def destroy_namespace
  Namespace.find_by(id: metadata['identifier_namespace'])&.destroy
end
|
#get_col_sep ⇒ Object
318
319
320
|
# File 'app/models/import_dataset/darwin_core.rb', line 318
# Column separator configured in import settings; tab when unset.
def get_col_sep
  configured = metadata.dig("import_settings", "col_sep")
  DarwinCore.default_if_absent(configured, "\t")
end
|
#get_core_record_identifier_namespace ⇒ Object
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
|
# File 'app/models/import_dataset/darwin_core.rb', line 215
# Returns the Namespace used for core-record identifiers, creating one on
# first use (or when the recorded one was deleted) and persisting its id in
# metadata['namespaces']['core']. Memoized in @core_record_identifier_namespace.
# @return [Namespace]
def get_core_record_identifier_namespace
  id = metadata.dig('namespaces', 'core')

  # Recreate when no id was stored, or the stored id no longer resolves.
  if id.nil? || (@core_record_identifier_namespace ||= Namespace.find_by(id:)).nil?
    # Random suffix keeps name/short_name unique across datasets.
    random = SecureRandom.hex(4)
    project_name = Project.find(Current.project_id).name
    namespace_name = "#{core_records_identifier_name} namespace for \"#{description}\" dataset in \"#{project_name}\" project [#{random}]"

    @core_record_identifier_namespace = Namespace.create!(
      name: namespace_name,
      short_name: "#{core_records_identifier_name}-#{random}",
      verbatim_short_name: core_records_identifier_name,
      delimiter: ':'
    )
    metadata.deep_merge!({
      'namespaces' => {
        'core' => @core_record_identifier_namespace.id
      }
    })
    save!
  end

  @core_record_identifier_namespace
end
|
#get_dwc_default_values(table) ⇒ Object
332
333
334
|
# File 'app/models/import_dataset/darwin_core.rb', line 332
# Fields of +table+ that declare a default value in the archive descriptor.
def get_dwc_default_values(table)
  table.fields.reject { |field| !field.key?(:default) }
end
|
279
280
281
282
283
284
285
286
287
288
289
290
|
# File 'app/models/import_dataset/darwin_core.rb', line 279
# Ordered column names for +table+: 'id' at the id column, DwC term names at
# each declared field index, gaps filled from the file's header row, and
# default-valued fields appended at the end.
# @return [Array<String>]
def get_dwc_headers(table)
  headers = []

  headers[table.id[:index]] = 'id' if table.id
  table.fields.each { |f| headers[f[:index]] = get_normalized_dwc_term(f) if f[:index] }

  # NOTE(review): receiver reconstructed from a garbled extraction — the
  # stripped token contained "headers"; confirm the table API method name.
  table.headers.first&.each_with_index { |f, i| headers[i] ||= f.strip }

  # Fields carrying only a default value have no column; append them.
  get_dwc_default_values(table).each.with_index(headers.length) { |f, i| headers[i] = get_normalized_dwc_term(f) }

  headers
end
|
#get_dwc_records(table) ⇒ Object
292
293
294
295
296
297
298
299
300
301
302
303
304
305
|
# File 'app/models/import_dataset/darwin_core.rb', line 292
# Reads all rows of +table+ into an array of {header => value} hashes,
# appending the descriptor-declared default values to each record.
# @return [Array<Hash>]
def get_dwc_records(table)
  records = []
  headers = get_dwc_headers(table)

  records = table.read.first.map do |row|
    record = {}
    row.each_with_index { |v, i| record[headers[i]] = v }

    # Default-valued fields occupy the trailing header positions
    # (see get_dwc_headers), hence the offset.
    defaults = get_dwc_default_values(table)
    defaults.each.with_index(headers.length - defaults.length) { |f, i| record[headers[i]] = f[:default] }

    record
  end

  return records
end
|
#get_field_mapping(field_name) ⇒ Object
307
308
309
|
# File 'app/models/import_dataset/darwin_core.rb', line 307
# Case-insensitive lookup of a field's column index; accepts strings or symbols.
def get_field_mapping(field_name)
  normalized = field_name.to_s.downcase
  get_fields_mapping[normalized]
end
|
#get_fields_mapping ⇒ Object
326
327
328
329
330
|
# File 'app/models/import_dataset/darwin_core.rb', line 326
# Memoized bidirectional mapping built from metadata['core_headers']:
# lowercased header name => column index, and column index => original header.
def get_fields_mapping
  @fields_mapping ||= begin
    mapping = {}
    metadata['core_headers'].compact.each_with_index do |header, index|
      mapping[header.downcase] = index
      mapping[index] = header
    end
    mapping
  end
end
|
#get_normalized_dwc_term(field) ⇒ Object
336
337
338
339
340
341
|
# File 'app/models/import_dataset/darwin_core.rb', line 336
# Extracts the bare term name (last path segment) from a fully-qualified
# DwC-style term URI; returns the raw value when it does not look like one.
def get_normalized_dwc_term(field)
  match = field[:term].match(%r{/([^/]+)/terms/.*(?<=/)([^/]+)/?$})
  match ? match[2] : field[:term]
end
|
#get_quote_char ⇒ Object
322
323
324
|
# File 'app/models/import_dataset/darwin_core.rb', line 322
# Quote character configured in import settings; double quote when unset.
def get_quote_char
  configured = metadata.dig("import_settings", "quote_char")
  DarwinCore.default_if_absent(configured, "\"")
end
|
#get_records(path) ⇒ Object
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
|
# File 'app/models/import_dataset/darwin_core.rb', line 248
# Reads the file at +path+ (DwC-A zip, delimited text, or spreadsheet) and
# returns its records and headers.
# @return [(Hash, Hash)] records and headers, each shaped
#   { core: [...], extensions: { rowType => [...] } }
# @raise [RuntimeError] for unsupported file extensions
def get_records(path)
  records = { core: [], extensions: {} }
  headers = { core: [], extensions: {} }

  if path =~ /\.zip\z/i
    dwc = ::DarwinCore.new(path)

    headers[:core] = get_dwc_headers(dwc.core)
    records[:core] = get_dwc_records(dwc.core)

    dwc.extensions.each do |extension|
      type = extension.properties[:rowType]
      records[:extensions][type] = get_dwc_records(extension)
      headers[:extensions][type] = get_dwc_headers(extension)
    end
  elsif path =~ /\.(csv|txt|tsv|xlsx?|ods)\z/i
    if path =~ /\.(csv|txt|tsv)\z/i
      records[:core] = CSV.read(path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f&.strip})
    else
      records[:core] = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f&.strip})
    end
    records[:core] = records[:core].map { |r| r.to_h }
    # nil.to_h handles an empty file gracefully.
    headers[:core] = records[:core].first.to_h.keys
  else
    raise 'Unsupported input format'
  end

  return records, headers
end
|
#import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil) ⇒ Hash
Returns the updated dataset records. Do not call if there are changes that have not been persisted
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
|
# File 'app/models/import_dataset/darwin_core.rb', line 128
# Imports a batch of staged core records into the project.
# @param max_time [Numeric] time budget for the batch; compared against
#   1000.0*(elapsed seconds), so presumably milliseconds — TODO confirm.
# @param max_records [Integer] maximum number of records in this batch
# @param retry_errored [Boolean, nil] when truthy also re-attempts 'Errored'
#   records, resuming from the persisted import_start_id
# @param filters [Hash, nil] per-column filters (see #add_filters)
# @param record_id [Integer, nil] import only this record, bypassing filters
# @return [Array] the updated dataset records. Do not call if there are
#   changes that have not been persisted.
def import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil)
  imported = []

  # Time spent acquiring the lock is excluded from the budget below.
  lock_time = Time.now
  old_uuid = self.metadata['import_uuid']

  start_import do
    lock_time = Time.now - lock_time

    # Persisted settings act as defaults when no overrides are supplied.
    filters = self.metadata['import_filters'] if filters.nil?
    retry_errored = self.metadata['import_retry_errored'] if retry_errored.nil?
    start_id = self.metadata['import_start_id'] if retry_errored

    status = ['Ready']
    status << 'Errored' if retry_errored
    records = add_filters(core_records.where(status:), filters).order(:id).limit(max_records)
    records = records.where(id: start_id..) if start_id
    # A specific record_id replaces the batch scope entirely.
    records = core_records.where(id: record_id, status: %w{Ready Errored}) if record_id
    records = records.all

    # Shift the clock back so lock wait does not count against max_time.
    start_time = Time.now - lock_time

    # Per-model lookup of DwC-term predicates, keyed by the term name
    # (last URI segment).
    dwc_data_attributes = project.preferences['model_predicate_sets'].map do |model, predicate_ids|
      [model, Hash[
        *Predicate.where(id: predicate_ids)
          .select { |p| /^http:\/\/rs\.tdwg\.org\/dwc\/terms\/.*/ =~ p.uri }
          .map {|p| [p.uri.split('/').last, p]}
          .flatten
      ]
      ]
    end.to_h

    records.each do |record|
      imported << record.import(dwc_data_attributes)
      # Stop once the time budget is exhausted.
      break if 1000.0*(Time.now - start_time).abs > max_time
    end

    if imported.any? && record_id.nil?
      reload
      # Remember where to resume and with which settings.
      self.metadata.merge!({
        'import_start_id' => imported.last&.id + 1,
        'import_filters' => filters,
        'import_retry_errored' => retry_errored
      })
      save!
      new_uuid = self.metadata['import_uuid']
      # Queue the next batch only when this process still owns the import
      # (its UUID differs from the one seen before start_import).
      ImportDatasetImportJob.perform_later(self, new_uuid, max_time, max_records) unless old_uuid == new_uuid
    else
      self.stop_import
    end
  end
  imported
end
|
#progress(filters: nil) ⇒ Hash
Returns a hash with the record counts grouped by status
186
187
188
|
# File 'app/models/import_dataset/darwin_core.rb', line 186
# Record counts grouped by status, optionally narrowed by column filters.
# @return [Hash] status => count
def progress(filters: nil)
  scope = add_filters(core_records, filters)
  scope.group(:status).count
end
|
#set_import_settings(import_settings) ⇒ Object
Sets import settings for this dataset
208
209
210
211
212
213
|
# File 'app/models/import_dataset/darwin_core.rb', line 208
# Merges the given settings into metadata['import_settings'].
# @return [Hash] the updated import settings
def set_import_settings(import_settings)
  settings = metadata['import_settings'] ||= {}
  import_settings.each { |key, value| settings[key] = value }
  settings
end
|
#stage ⇒ Object
Stages DwC-A records into DB.
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
# File 'app/models/import_dataset/darwin_core.rb', line 191
# Stages DwC-A records into the DB.
def stage
  # A retried staging job may have left partial data behind; wipe it first.
  if status == 'Staging'
    transaction do
      core_records_fields.delete_all
      dataset_records.delete_all
    end
  end

  update!(status: 'Staging') if status == 'Uploaded'

  if status != 'Ready'
    perform_staging
    update!(status: 'Ready')
  end
end
|
#start_import(&block) ⇒ String
Sets up the import dataset for import and returns its UUID. If the import has already started, the same UUID is returned — unless the last activity was more than 10 minutes ago, in which case a new UUID is generated. Do not call if there are changes that have not been persisted.
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
# File 'app/models/import_dataset/darwin_core.rb', line 86
# Sets up the import dataset for import and returns its import UUID. When an
# import is already running the same UUID is kept, unless the last activity
# was more than 10 minutes ago, in which case a new UUID is issued (taking
# over a presumably stalled import). Do not call if there are changes that
# have not been persisted.
# @raise [RuntimeError] when status is neither 'Ready' nor 'Importing'
# @return [String] the import UUID
def start_import(&block)
  with_lock do
    case self.status
    when 'Ready'
      self.status = 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid
    when 'Importing'
      # No activity for 10+ minutes: assume the previous run died.
      self.metadata['import_uuid'] = SecureRandom.uuid if self.updated_at < 10.minutes.ago
    else
      raise 'Invalid initial state'
    end
    save!

    yield if block_given?
  end

  self.metadata['import_uuid']
end
|
#stop_import ⇒ Object
Sets import dataset to stop importing data. Do not call if there are changes that have not been persisted.
106
107
108
109
110
111
112
113
114
|
# File 'app/models/import_dataset/darwin_core.rb', line 106
# Stops an in-progress import, clearing all import bookkeeping from metadata.
# No-op unless the dataset is currently 'Importing'. Do not call if there are
# changes that have not been persisted.
def stop_import
  with_lock do
    next unless self.status == 'Importing'

    self.status = 'Ready'
    self.metadata.except!('import_uuid', 'import_start_id', 'import_filters', 'import_retry_errored')
    save!
  end
end
|