Class: ImportDataset::DarwinCore
Defined Under Namespace
Classes: Checklist, Occurrences, Unknown
Constant Summary
collapse
- CHECKLIST_ROW_TYPE =
'http://rs.tdwg.org/dwc/terms/Taxon'.freeze
- OCCURRENCES_ROW_TYPE =
'http://rs.tdwg.org/dwc/terms/Occurrence'.freeze
Instance Attribute Summary
#description, #metadata, #source_content_type, #source_file_name, #source_file_size, #source_updated_at, #status
Class Method Summary
collapse
Instance Method Summary
collapse
#delete_origin_relationships
#new_objects, #old_objects, #reject_origin_relationships, #set_origin
#errors_excepting, #full_error_messages_excepting, #identical, #is_community?, #is_destroyable?, #is_editable?, #is_in_use?, #is_in_users_projects?, #metamorphosize, #similar
#has_polymorphic_relationship?
transaction_with_retry
Constructor Details
#initialize(params) ⇒ DarwinCore
Returns a new instance of DarwinCore.
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
# File 'app/models/import_dataset/darwin_core.rb', line 11
# Creates a new DarwinCore import dataset. Extracts :import_settings from
# params before handing the remainder to the parent constructor, then seeds
# the metadata skeleton and applies the extracted settings.
def initialize(params)
  settings = params&.delete(:import_settings)
  super(params)

  self.metadata = {
    core_headers: [],
    namespaces: {
      core: nil,
      eventID: nil
    }
  }
  set_import_settings(settings || {})
end
|
Class Method Details
.create_with_subtype_detection(params) ⇒ Checklist, ...
Returns the appropriate ImportDataset::DarwinCore subclass instantiated (not saved) for the supplied params
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# File 'app/models/import_dataset/darwin_core.rb', line 34
# Returns the appropriate ImportDataset::DarwinCore subclass instantiated
# (not saved) for the supplied params, by detecting the core row type of
# the uploaded source.
#
# Reconstruction note: this snippet was extraction-garbled — the `headers`
# identifier (and trailing `.headers` calls) had been stripped, and the
# rescue clause was fused onto one line; both are restored here.
def self.create_with_subtype_detection(params)
  core_type = nil

  # Nothing to inspect without an uploaded source.
  return Unknown.new unless params[:source]

  begin
    path = params[:source].tempfile.path

    if path =~ /\.zip\z/i
      # Zipped DwC-A: the archive metadata declares the core row type.
      dwc = ::DarwinCore.new(path)
      core_type = dwc.core.data[:attributes][:rowType]

      # Full read pass so malformed archives fail here rather than later.
      [dwc.core, *dwc.extensions].each do |table|
        table.read { |data, errors| raise 'Errors found when reading data' unless errors.empty? }
      end
    else
      # Plain tabular upload: infer the row type from the header names.
      if path =~ /\.(xlsx?|ods)\z/i
        headers = CSV.parse(Roo::Spreadsheet.open(path).to_csv, headers: true, header_converters: lambda {|f| f.strip}).headers
      else
        headers = CSV.read(path, headers: true, col_sep: "\t", quote_char: nil, encoding: 'bom|utf-8', header_converters: lambda {|f| f.strip}).headers
      end

      row_type = params.dig(:import_settings, :row_type)

      if row_type
        core_type = row_type
      elsif headers.include? 'occurrenceID'
        core_type = OCCURRENCES_ROW_TYPE
      elsif headers.include? 'taxonID'
        core_type = CHECKLIST_ROW_TYPE
      end
    end
  rescue Errno::ENOENT, RuntimeError => e
    # Unreadable or invalid source: report the error on an Unknown dataset.
    return Unknown.new(params.merge({error_message: "#{e.message}"}))
  end

  case core_type
  when OCCURRENCES_ROW_TYPE
    Occurrences.new(params)
  when CHECKLIST_ROW_TYPE
    Checklist.new(params)
  else
    Unknown.new(params.merge({error_message: "unknown DwC-A core type '#{core_type}'."}))
  end
end
|
Instance Method Details
#add_filters(records, filters) ⇒ Object
343
344
345
346
347
348
|
# File 'app/models/import_dataset/darwin_core.rb', line 343
# Narrows +records+ with the supplied {field_index => value} filters, each
# filter restricting records to those whose field at that index has the value.
# Returns +records+ unchanged when +filters+ is nil.
def add_filters(records, filters)
  (filters || {}).each do |key, value|
    matching = core_records_fields.at(key.to_i).with_value(value).select(:dataset_record_id)
    records = records.where(id: matching)
  end
  records
end
|
#core_records_fields ⇒ Object
26
27
28
|
# File 'app/models/import_dataset/darwin_core.rb', line 26
# Dataset record fields scoped to this dataset's core record class.
def core_records_fields
  record_class = core_records_class
  dataset_record_fields.with_record_class(record_class)
end
|
#default_nomenclatural_code ⇒ Object
238
239
240
|
# File 'app/models/import_dataset/darwin_core.rb', line 238
# Nomenclatural code chosen in the import settings, as a lowercase symbol;
# falls back to :iczn when not configured.
def default_nomenclatural_code
  configured = self.metadata.dig('import_settings', 'nomenclatural_code')
  return :iczn if configured.nil?

  configured.downcase.to_sym
end
|
#destroy_namespace ⇒ Object
339
340
341
|
# File 'app/models/import_dataset/darwin_core.rb', line 339
# Destroys the identifier namespace recorded in metadata, if it still exists.
def destroy_namespace
  namespace_id = metadata['identifier_namespace']
  Namespace.find_by(id: namespace_id)&.destroy
end
|
#get_col_sep ⇒ Object
314
315
316
|
# File 'app/models/import_dataset/darwin_core.rb', line 314
# Column separator configured in the import settings; tab when unset/empty.
def get_col_sep
  configured = metadata.dig("import_settings", "col_sep")
  return_default_if_absent(configured, "\t")
end
|
#get_core_record_identifier_namespace ⇒ Object
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
# File 'app/models/import_dataset/darwin_core.rb', line 211
# Returns the Namespace used for identifiers of this dataset's core records,
# lazily creating and persisting it (and recording its id in metadata) on
# first use or when the recorded namespace no longer exists.
def get_core_record_identifier_namespace
# Namespace id assigned on a previous call, if any.
id = metadata.dig('namespaces', 'core')
# Create a namespace when none was assigned yet, or the recorded one was deleted.
if id.nil? || (@core_record_identifier_namespace ||= Namespace.find_by(id:)).nil?
# Random suffix keeps name/short_name collision-free across datasets.
random = SecureRandom.hex(4)
project_name = Project.find(Current.project_id).name
namespace_name = "#{core_records_identifier_name} namespace for \"#{description}\" dataset in \"#{project_name}\" project [#{random}]"
@core_record_identifier_namespace = Namespace.create!(
name: namespace_name,
short_name: "#{core_records_identifier_name}-#{random}",
verbatim_short_name: core_records_identifier_name,
delimiter: ':'
)
# Remember the namespace id so subsequent calls reuse the same namespace.
metadata.deep_merge!({
'namespaces' => {
'core' => @core_record_identifier_namespace.id
}
})
save!
end
@core_record_identifier_namespace
end
|
#get_dwc_default_values(table) ⇒ Object
328
329
330
|
# File 'app/models/import_dataset/darwin_core.rb', line 328
# Fields of the given DwC-A table that declare a default value.
def get_dwc_default_values(table)
  table.fields.select { |field| field.key?(:default) }
end
|
#get_dwc_headers(table) ⇒ Object
275
276
277
278
279
280
281
282
283
284
285
286
|
# File 'app/models/import_dataset/darwin_core.rb', line 275
# Builds the ordered list of column headers for a DwC-A table.
# Priority per column: the 'id' column, declared term fields (normalized),
# the raw header row as a fallback, then synthetic trailing columns for
# fields that only declare a default value.
#
# Reconstruction note: extraction stripped the method name and every
# occurrence of the `headers` identifier (plus the return value and `end`);
# restored here — the name matches the callers in get_dwc_records/get_records.
def get_dwc_headers(table)
  headers = []

  headers[table.id[:index]] = 'id' if table.id
  table.fields.each { |f| headers[f[:index]] = get_normalized_dwc_term(f) if f[:index] }

  # NOTE(review): assumes `table.headers.first` yields the raw header row of
  # the dwc-archive table — confirm against the dwc-archive gem API.
  table.headers.first&.each_with_index { |f, i| headers[i] ||= f.strip }

  # Default-only fields have no physical column; append them after the rest.
  get_dwc_default_values(table).each.with_index(headers.length) { |f, i| headers[i] = get_normalized_dwc_term(f) }

  headers
end
|
#get_dwc_records(table) ⇒ Object
288
289
290
291
292
293
294
295
296
297
298
299
300
301
|
# File 'app/models/import_dataset/darwin_core.rb', line 288
# Reads all rows of a DwC-A table and returns them as an array of hashes
# keyed by the headers from get_dwc_headers, with default-only field values
# appended after the physical columns.
#
# Reconstruction note: extraction stripped the `headers` identifier and the
# get_dwc_headers call; restored here. The dead `records = []` initialization
# (immediately overwritten) was removed.
def get_dwc_records(table)
  headers = get_dwc_headers(table)

  records = table.read.first.map do |row|
    record = {}
    row.each_with_index { |v, i| record[headers[i]] = v }

    # Default-only fields occupy the trailing header slots.
    defaults = get_dwc_default_values(table)
    defaults.each.with_index(headers.length - defaults.length) { |f, i| record[headers[i]] = f[:default] }
    record
  end

  return records
end
|
#get_field_mapping(field_name) ⇒ Object
303
304
305
|
# File 'app/models/import_dataset/darwin_core.rb', line 303
# Index of the named field within the core headers (case-insensitive lookup),
# or nil when the field is not present.
def get_field_mapping(field_name)
  key = field_name.to_s.downcase
  get_fields_mapping[key]
end
|
#get_fields_mapping ⇒ Object
322
323
324
325
326
|
# File 'app/models/import_dataset/darwin_core.rb', line 322
# Memoized bidirectional mapping over the core headers:
# downcased header name => column index, and column index => original header.
def get_fields_mapping
  @fields_mapping ||= begin
    mapping = {}
    metadata['core_headers'].compact.each_with_index do |header, index|
      mapping[header.downcase] = index
      mapping[index] = header
    end
    mapping
  end
end
|
#get_normalized_dwc_term(field) ⇒ Object
332
333
334
335
336
337
|
# File 'app/models/import_dataset/darwin_core.rb', line 332
# Extracts the bare term name from a DwC-style term URI
# (e.g. ".../dwc/terms/occurrenceID" => "occurrenceID"); returns the raw
# value unchanged when it does not look like a terms URI.
def get_normalized_dwc_term(field)
  raw = field[:term]
  match_data = raw.match(%r{/([^/]+)/terms/.*(?<=/)([^/]+)/?$})
  match_data ? match_data[2] : raw
end
|
#get_quote_char ⇒ Object
318
319
320
|
# File 'app/models/import_dataset/darwin_core.rb', line 318
# Quote character configured in the import settings; double quote when unset/empty.
def get_quote_char
  configured = metadata.dig("import_settings", "quote_char")
  return_default_if_absent(configured, "\"")
end
|
#get_records(source) ⇒ Object
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
# File 'app/models/import_dataset/darwin_core.rb', line 244
# Reads the uploaded source and returns [records, headers]:
# records = { core: [...], extensions: {row_type => [...]} } and the matching
# headers structure. Supports zipped DwC-A, delimited text, and spreadsheets.
# @raise [RuntimeError] when the file extension is not supported
#
# Reconstruction note: extraction stripped every occurrence of the `headers`
# identifier (and the get_dwc_headers calls); restored here.
def get_records(source)
  records = { core: [], extensions: {} }
  headers = { core: [], extensions: {} }

  if source.path =~ /\.zip\z/i
    dwc = ::DarwinCore.new(source.path)

    headers[:core] = get_dwc_headers(dwc.core)
    records[:core] = get_dwc_records(dwc.core)

    dwc.extensions.each do |extension|
      type = extension.properties[:rowType]
      records[:extensions][type] = get_dwc_records(extension)
      headers[:extensions][type] = get_dwc_headers(extension)
    end
  elsif source.path =~ /\.(csv|txt|tsv|xlsx?|ods)\z/i
    if source.path =~ /\.(csv|txt|tsv)\z/i
      records[:core] = CSV.read(source.path, headers: true, col_sep: get_col_sep, quote_char: get_quote_char, encoding: 'bom|utf-8', header_converters: lambda {|f| f&.strip})
    else
      # Spreadsheets are converted to CSV via Roo before parsing.
      records[:core] = CSV.parse(Roo::Spreadsheet.open(source.path).to_csv, headers: true, header_converters: lambda {|f| f&.strip})
    end
    records[:core] = records[:core].map { |r| r.to_h }
    headers[:core] = records[:core].first.to_h.keys
  else
    raise 'Unsupported input format'
  end

  return records, headers
end
|
#import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil) ⇒ Hash
Returns the updated dataset records. Do not call if there are changes that have not been persisted
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
|
# File 'app/models/import_dataset/darwin_core.rb', line 124
# Imports a batch of staged records into the project, bounded by max_time
# (milliseconds) and max_records, persisting progress in metadata so a
# follow-up job can resume. Do not call if there are changes that have not
# been persisted.
# @return [Array] the dataset records imported in this batch
def import(max_time, max_records, retry_errored: nil, filters: nil, record_id: nil)
imported = []
lock_time = Time.now
old_uuid = self.metadata['import_uuid']
start_import do
# From here on lock_time holds the duration spent acquiring the lock.
lock_time = Time.now - lock_time
# Fall back to the filter/retry settings persisted by a previous batch.
filters = self.metadata['import_filters'] if filters.nil?
retry_errored = self.metadata['import_retry_errored'] if retry_errored.nil?
start_id = self.metadata['import_start_id'] if retry_errored
status = ['Ready']
status << 'Errored' if retry_errored
records = add_filters(core_records.where(status:), filters).order(:id).limit(max_records)
records = records.where(id: start_id..) if start_id
# An explicit record_id overrides the filter/start-id batching above.
records = core_records.where(id: record_id, status: %w{Ready Errored}) if record_id
records = records.all
# Shift the start time back by the lock wait so it counts against the budget.
start_time = Time.now - lock_time
# model name => {DwC term name => Predicate}, restricted to predicates whose
# URI is a DwC terms URI, from the project's predicate-set preferences.
dwc_data_attributes = project.preferences['model_predicate_sets'].map do |model, predicate_ids|
[model, Hash[
*Predicate.where(id: predicate_ids)
.select { |p| /^http:\/\/rs\.tdwg\.org\/dwc\/terms\/.*/ =~ p.uri }
.map {|p| [p.uri.split('/').last, p]}
.flatten
]
]
end.to_h
records.each do |record|
imported << record.import(dwc_data_attributes)
# max_time is in milliseconds; 1000.0 converts the elapsed seconds.
break if 1000.0*(Time.now - start_time).abs > max_time
end
if imported.any? && record_id.nil?
reload
# Persist resume point so the next batch continues after the last record.
self.metadata.merge!({
'import_start_id' => imported.last&.id + 1,
'import_filters' => filters,
'import_retry_errored' => retry_errored
})
save!
new_uuid = self.metadata['import_uuid']
# NOTE(review): follow-up job is queued only when the uuid changed during
# this run — confirm this matches the intended takeover semantics.
ImportDatasetImportJob.perform_later(self, new_uuid, max_time, max_records) unless old_uuid == new_uuid
else
self.stop_import
end
end
imported
end
|
#progress(filters: nil) ⇒ Hash
Returns a hash with the record counts grouped by status
182
183
184
|
# File 'app/models/import_dataset/darwin_core.rb', line 182
# Record counts grouped by status, optionally narrowed by filters.
# @return [Hash] status => count
def progress(filters: nil)
  filtered = add_filters(core_records, filters)
  filtered.group(:status).count
end
|
#return_default_if_absent(value, default) ⇒ Object
309
310
311
312
|
# File 'app/models/import_dataset/darwin_core.rb', line 309
# Returns +value+ unless it is nil or empty, in which case +default+ is returned.
def return_default_if_absent(value, default)
  if value.nil? || value.empty?
    default
  else
    value
  end
end
|
#set_import_settings(import_settings) ⇒ Object
Sets import settings for this dataset
204
205
206
207
208
209
|
# File 'app/models/import_dataset/darwin_core.rb', line 204
# Merges the supplied settings into metadata['import_settings'] (creating it
# when absent) and returns the resulting settings hash.
def set_import_settings(import_settings)
  settings = (metadata['import_settings'] ||= {})
  import_settings.each { |key, value| settings[key] = value }
  settings
end
|
#stage ⇒ Object
Stages DwC-A records into DB.
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
|
# File 'app/models/import_dataset/darwin_core.rb', line 187
# Stages DwC-A records into the DB, advancing status
# Uploaded -> Staging -> Ready; a dataset already mid-Staging has its
# partially staged rows wiped before re-staging.
#
# Fix: the original snippet fused the condition onto the transaction line
# (`if status == 'Staging' transaction do`), which is a syntax error; the
# guard is restored as a proper if-block.
def stage
  if status == 'Staging'
    # Interrupted previous staging: drop partially staged data first.
    transaction do
      core_records_fields.delete_all
      dataset_records.delete_all
    end
  end

  update!(status: 'Staging') if status == 'Uploaded'

  if status != 'Ready'
    perform_staging
    update!(status: 'Ready')
  end
end
|
#start_import(&block) ⇒ String
Sets up import dataset for import and returns UUID. If already started same UUID is returned (unless last activity was more than 10 minutes ago). Do not call if there are changes that have not been persisted
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
|
# File 'app/models/import_dataset/darwin_core.rb', line 82
# Sets up the dataset for import and returns the import UUID. A dataset
# already Importing keeps its UUID unless its last activity was more than
# 10 minutes ago (stale run), in which case a fresh UUID is issued.
# Do not call if there are changes that have not been persisted.
# @return [String] the import UUID
def start_import(&block)
  with_lock do
    if self.status == 'Ready'
      self.status = 'Importing'
      self.metadata['import_uuid'] = SecureRandom.uuid
    elsif self.status == 'Importing'
      # Stale run takeover: re-issue the UUID after 10 minutes of inactivity.
      self.metadata['import_uuid'] = SecureRandom.uuid if self.updated_at < 10.minutes.ago
    else
      raise 'Invalid initial state'
    end
    save!
    yield if block_given?
  end
  self.metadata['import_uuid']
end
|
#stop_import ⇒ Object
Sets import dataset to stop importing data. Do not call if there are changes that have not been persisted.
102
103
104
105
106
107
108
109
110
|
# File 'app/models/import_dataset/darwin_core.rb', line 102
# Halts an in-progress import: resets status to Ready and discards all
# import bookkeeping from metadata. No-op unless currently Importing.
# Do not call if there are changes that have not been persisted.
def stop_import
  with_lock do
    next unless self.status == 'Importing'

    self.status = 'Ready'
    self.metadata.except!('import_uuid', 'import_start_id', 'import_filters', 'import_retry_errored')
    save!
  end
end
|