Class: Export::Dwca::Occurrence::PredicateExporter

Inherits:
Object
  • Object
show all
Includes:
PostgresqlFunctions
Defined in:
lib/export/dwca/occurrence/predicate_exporter.rb

Overview

Service object for exporting data attribute data in DwCA exports.

Instance Method Summary collapse

Methods included from PostgresqlFunctions

#create_api_link_for_model_id_function, #create_authorship_sentence_function, #create_csv_sanitize_function, #create_image_file_url_function, #create_image_metadata_url_function, #create_image_url_functions, #create_sled_image_file_url_function

Constructor Details

#initialize(core_scope:, collection_object_predicate_ids: [], collecting_event_predicate_ids: []) ⇒ PredicateExporter

Returns a new instance of PredicateExporter.

Parameters:

  • core_scope (ActiveRecord::Relation)

    DwcOccurrence scope for ordering

  • collection_object_predicate_ids (Array<Integer>) (defaults to: [])

    predicate IDs for collection objects

  • collecting_event_predicate_ids (Array<Integer>) (defaults to: [])

    predicate IDs for collecting events



11
12
13
14
15
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 11

def initialize(core_scope:, collection_object_predicate_ids: [], collecting_event_predicate_ids: [])
  @core_scope = core_scope
  @collection_object_predicate_ids = collection_object_predicate_ids
  @collecting_event_predicate_ids = collecting_event_predicate_ids
end

Instance Method Details

#all_possible_predicatesObject (private)

All possible predicates based on configuration (not filtered by actual usage).



98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 98

def all_possible_predicates
  @all_possible_predicates ||= begin
    co_preds = @collection_object_predicate_ids.empty? ? [] :
      ControlledVocabularyTerm
        .where(id: @collection_object_predicate_ids)
        .order(:name)
        .pluck(:name)
        .map { |name| "TW:DataAttribute:CollectionObject:#{name}" }

    ce_preds = @collecting_event_predicate_ids.empty? ? [] :
      ControlledVocabularyTerm
        .where(id: @collecting_event_predicate_ids)
        .order(:name)
        .pluck(:name)
        .map { |name| "TW:DataAttribute:CollectingEvent:#{name}" }

    co_preds + ce_preds
  end
end

#cleanup_temp_tablesObject (private)



59
60
61
62
63
64
65
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 59

def cleanup_temp_tables
  conn = ActiveRecord::Base.connection
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot')
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot_co')
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot_ce')
  conn.execute('DROP TABLE IF EXISTS temp_co_order')
end

#collecting_event_predicate_namesHash (private)

Returns Maps cvt_id => full predicate name for collecting event predicates e.g., => “TW:DataAttribute:CollectingEvent:bar”.

Returns:

  • (Hash)

    Maps cvt_id => full predicate name for collecting event predicates e.g., => “TW:DataAttribute:CollectingEvent:bar”



137
138
139
140
141
142
143
144
145
146
147
148
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 137

def collecting_event_predicate_names
  return {} if @collecting_event_predicate_ids.empty?

  q = ControlledVocabularyTerm
    .where(id: @collecting_event_predicate_ids)
    .select(:id, :name)
    .to_sql

  ActiveRecord::Base.connection.execute(q).each_with_object({}) do |row, hash|
    hash[row['id'].to_i] = "TW:DataAttribute:CollectingEvent:#{row['name']}"
  end
end

#collecting_eventsObject (private)



67
68
69
70
71
72
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 67

def collecting_events
  ::CollectingEvent
    .with(co_scoped: collection_objects.unscope(:order).select(:id, :collecting_event_id))
    .joins('JOIN co_scoped ON co_scoped.collecting_event_id = collecting_events.id')
    .distinct
end

#collection_object_predicate_namesHash (private)

Returns Maps cvt_id => full predicate name for collection object predicates e.g., => “TW:DataAttribute:CollectionObject:foo”.

Returns:

  • (Hash)

    Maps cvt_id => full predicate name for collection object predicates e.g., => “TW:DataAttribute:CollectionObject:foo”



121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 121

def collection_object_predicate_names
  return {} if @collection_object_predicate_ids.empty?

  q = ControlledVocabularyTerm
    .select("id, CONCAT('TW:DataAttribute:CollectionObject:', name) AS predicate_name")
    .where(id: @collection_object_predicate_ids)
    .to_sql

  ActiveRecord::Base.connection.execute(q).each_with_object({}) do |row, hash|
    hash[row['id'].to_i] = row['predicate_name']
  end
end

#collection_objectsObject (private)



74
75
76
77
78
79
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 74

def collection_objects
  ::CollectionObject
    .with(dwc_scoped: @core_scope.unscope(:order).select(:dwc_occurrence_object_id, :dwc_occurrence_object_type))
    .joins("JOIN dwc_scoped ON dwc_scoped.dwc_occurrence_object_id = collection_objects.id AND dwc_scoped.dwc_occurrence_object_type = 'CollectionObject'")
    .select(:id, :collecting_event_id, :type)
end

#create_collecting_event_predicate_pivot_table(conn:, ce_pred_names:, delimiter:) ⇒ Object (private)

Creates temp_predicate_pivot_ce



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 269

def create_collecting_event_predicate_pivot_table(conn:, ce_pred_names:, delimiter:)
  return if ce_pred_names.empty?

  ce_select_cols = ce_pred_names.map do |cvt_id, pred_name|
    quoted_name = conn.quote_column_name(pred_name)
    "STRING_AGG(pg_temp.sanitize_csv(ce_da.value), '#{delimiter}' ORDER BY ce_da.id)
     FILTER (WHERE ce_da.controlled_vocabulary_term_id = #{cvt_id}) AS #{quoted_name}"
  end

  ce_cols_sql = ce_select_cols.any? ?
    ",\n  #{ce_select_cols.join(",\n  ")}" : ''

  ce_sql = <<~SQL
    CREATE TEMP TABLE temp_predicate_pivot_ce AS
    SELECT
      ce.id AS collecting_event_id#{ce_cols_sql}
    FROM collecting_events ce
    JOIN (SELECT DISTINCT collecting_event_id FROM temp_predicate_pivot_co) co
      ON co.collecting_event_id = ce.id
    LEFT JOIN data_attributes ce_da ON ce_da.attribute_subject_id = ce.id
      AND ce_da.attribute_subject_type = 'CollectingEvent'
      AND ce_da.type = 'InternalAttribute'
      AND ce_da.controlled_vocabulary_term_id IN (#{ce_pred_names.keys.join(',')})
    GROUP BY ce.id
  SQL

  conn.execute(ce_sql)
  Rails.logger.debug 'dwca_export: temp_predicate_pivot_ce created'
end

#create_collection_object_predicate_pivot_table(conn:, co_pred_names:, delimiter:, co_src_sql:) ⇒ Object (private)

Creates temp_predicate_pivot_co



233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 233

def create_collection_object_predicate_pivot_table(conn:, co_pred_names:, delimiter:, co_src_sql:)
  co_select_cols = co_pred_names.map do |cvt_id, pred_name|
    quoted_name = conn.quote_column_name(pred_name)
    "STRING_AGG(pg_temp.sanitize_csv(co_da.value), '#{delimiter}' ORDER BY co_da.id)
     FILTER (WHERE co_da.controlled_vocabulary_term_id = #{cvt_id}) AS #{quoted_name}"
  end

  co_cols_sql = co_select_cols.any? ?
    ",\n  #{co_select_cols.join(",\n  ")}" : ''

  co_join_sql = if co_pred_names.any?
    <<~SQL
      LEFT JOIN data_attributes co_da ON co_da.attribute_subject_id = co.id
        AND co_da.attribute_subject_type = 'CollectionObject'
        AND co_da.type = 'InternalAttribute'
        AND co_da.controlled_vocabulary_term_id IN (#{co_pred_names.keys.join(',')})
    SQL
  else
    ''
  end

  co_sql = <<~SQL
    CREATE TEMP TABLE temp_predicate_pivot_co AS
    SELECT
      co.id AS co_id,
      co.collecting_event_id AS collecting_event_id#{co_cols_sql}
    FROM (#{co_src_sql}) co
    #{co_join_sql}
    GROUP BY co.id, co.collecting_event_id
  SQL

  conn.execute(co_sql)
  Rails.logger.debug 'dwca_export: temp_predicate_pivot_co created'
end

#create_pivoted_predicate_tablesObject (private)

  • temp_predicate_pivot_co: pivot CO attributes only, grouped by co_id

  • temp_predicate_pivot_ce: pivot CE attributes only, grouped by collecting_event_id

  • temp_predicate_pivot: join the two pivots (1 row per co_id), no join fan-out

  • temp_co_order: stable output ordering



154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 154

def create_pivoted_predicate_tables
  conn = ActiveRecord::Base.connection

  # Defensive cleanup in case a previous run died mid-stream on a pooled connection.
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot')
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot_co')
  conn.execute('DROP TABLE IF EXISTS temp_predicate_pivot_ce')
  conn.execute('DROP TABLE IF EXISTS temp_co_order')

  co_pred_names = collection_object_predicate_names
  ce_pred_names = collecting_event_predicate_names
  delimiter = Shared::IsDwcOccurrence::DWC_DELIMITER

  co_src_sql = collection_objects
    .unscope(:order)
    .select(:id, :collecting_event_id)
    .to_sql

  create_collection_object_predicate_pivot_table(
    conn:,
    co_pred_names:,
    delimiter:,
    co_src_sql:
  )

  create_collecting_event_predicate_pivot_table(
    conn:,
    ce_pred_names:,
    delimiter:
  )

  # Combined pivot.
  combined_cols = []
  combined_cols << 'co.co_id'

  co_pred_names.values.each do |pred_name|
    combined_cols << "co.#{conn.quote_column_name(pred_name)}"
  end

  if ce_pred_names.any?
    ce_pred_names.values.each do |pred_name|
      combined_cols << "ce.#{conn.quote_column_name(pred_name)}"
    end
  end

  combined_join_sql = if ce_pred_names.any?
    <<~SQL
      LEFT JOIN temp_predicate_pivot_ce ce
        ON ce.collecting_event_id = co.collecting_event_id
    SQL
  else
    ''
  end

  combined_sql = <<~SQL
    CREATE TEMP TABLE temp_predicate_pivot AS
    SELECT
      #{combined_cols.join(",\n    ")}
    FROM temp_predicate_pivot_co co
    #{combined_join_sql}
  SQL

  conn.execute(combined_sql)
  Rails.logger.debug 'dwca_export: temp_predicate_pivot created'

  order_sql = <<-SQL
    CREATE TEMP TABLE temp_co_order AS
    SELECT
      dwc_occurrences.dwc_occurrence_object_id as co_id,
      ROW_NUMBER() OVER (ORDER BY dwc_occurrences.id) as ord
    FROM (#{@core_scope.unscope(:order).to_sql}) dwc_occurrences
    WHERE dwc_occurrences.dwc_occurrence_object_type = 'CollectionObject'
  SQL

  conn.execute(order_sql)
  Rails.logger.debug 'dwca_export: temp_co_order created'
end

#export_to(output_file) ⇒ Tempfile

Main export method - writes predicate data to output file.

Parameters:

  • output_file (Tempfile, File)

    output file for predicate TSV data

Returns:

  • (Tempfile)

    the output file



20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 20

def export_to(output_file)
  create_csv_sanitize_function
  return output_file if all_possible_predicates.empty?

  create_pivoted_predicate_tables

  used_preds = predicates_with_data
  return output_file if used_preds.empty?

  Rails.logger.debug 'dwca_export: predicate_data reading from temp table'

  conn = ActiveRecord::Base.connection
  column_list = used_preds.map { |pred| conn.quote_column_name(pred) }.join(', ')

  copy_sql = <<-SQL
    COPY (
      SELECT #{column_list}
      FROM temp_co_order
      LEFT JOIN temp_predicate_pivot ON temp_predicate_pivot.co_id = temp_co_order.co_id
      ORDER BY temp_co_order.ord
    ) TO STDOUT WITH (FORMAT CSV, DELIMITER E'\\t', HEADER, NULL '')
  SQL

  conn.raw_connection.copy_data(copy_sql) do
    while (row = conn.raw_connection.get_copy_data)
      output_file.write(row.force_encoding(Encoding::UTF_8))
    end
  end

  Rails.logger.debug 'dwca_export: predicate_data written'
  output_file
ensure
  cleanup_temp_tables
  output_file.flush
  output_file.rewind
end

#predicates_with_dataObject (private)

Finds which predicate columns in the combined temp table actually have non-NULL values.



82
83
84
85
86
87
88
89
90
91
92
93
94
95
# File 'lib/export/dwca/occurrence/predicate_exporter.rb', line 82

def predicates_with_data
  return [] if all_possible_predicates.empty?

  conn = ActiveRecord::Base.connection

  checks = all_possible_predicates.map.with_index do |pred, idx|
    quoted = conn.quote_column_name(pred)
    "CASE WHEN COUNT(#{quoted}) > 0 THEN #{conn.quote(pred)} ELSE NULL END AS check_#{idx}"
  end

  sql = "SELECT #{checks.join(', ')} FROM temp_predicate_pivot"
  result = conn.execute(sql).first
  result.values.compact
end