diff --git a/lib/logstash/inputs/s3.rb b/lib/logstash/inputs/s3.rb index 36a1f65..8bf3fe1 100644 --- a/lib/logstash/inputs/s3.rb +++ b/lib/logstash/inputs/s3.rb @@ -376,18 +376,22 @@ def ignore_filename?(filename) def process_log(queue, log) @logger.debug("Processing", :bucket => @bucket, :key => log.key) object = @s3bucket.object(log.key) + # Eager-loads the object data so the last_modified field is populated right before the download + object.load filename = File.join(temporary_directory, File.basename(log.key)) if download_remote_file(object, filename) if process_local_log(queue, filename, object) - if object.last_modified == log.last_modified + refreshed_object = @s3bucket.object(log.key) + # If the object was modified during download and processing, do not backup/delete it and process it again during the next iteration + if object.last_modified == refreshed_object.last_modified backup_to_bucket(object) backup_to_dir(filename) delete_file_from_bucket(object) FileUtils.remove_entry_secure(filename, true) - sincedb.write(log.last_modified) + sincedb.write(object.last_modified) else - @logger.info("#{log.key} is updated at #{object.last_modified} and will process in the next cycle") + @logger.info("#{log.key} was updated at #{refreshed_object.last_modified} and will process in the next cycle") end end else diff --git a/spec/inputs/s3_spec.rb b/spec/inputs/s3_spec.rb index b34b682..beaee8f 100644 --- a/spec/inputs/s3_spec.rb +++ b/spec/inputs/s3_spec.rb @@ -32,6 +32,7 @@ FileUtils.mkdir_p(sincedb_path) Aws.config[:stub_responses] = true Thread.abort_on_exception = true + allow_any_instance_of(Aws::S3::Object).to receive(:load) end context "when interrupting the plugin" do @@ -583,7 +584,7 @@ end end - context 's3 object updated after getting summary' do + context 's3 object updated during processing' do it 'should not update sincedb' do s3_summary = [ double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), @@ -592,6 +593,8 @@ s3_objects = [ double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), + double(:key => 'YESTERDAY', :last_modified => Time.now.round - day, :content_length => 5, :storage_class => 'STANDARD'), + double(:key => 'TODAY', :last_modified => Time.now.round - (cutoff * 10), :content_length => 5, :storage_class => 'STANDARD'), double(:key => 'TODAY_UPDATED', :last_modified => Time.now.round, :content_length => 5, :storage_class => 'STANDARD') ]