Skip to content

Commit 03f84d7

Browse files
authored
misc fixes to handling of "id" field; add last_event_id to StreamEvent (#26)
1 parent 75f8349 commit 03f84d7

File tree

5 files changed

+71
-20
lines changed

5 files changed

+71
-20
lines changed

lib/ld-eventsource/client.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ def read_stream(cxn)
315315
end
316316
end
317317
end
318-
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks))
318+
event_parser = Impl::EventParser.new(Impl::BufferedLineReader.lines_from(chunks), @last_id)
319319

320320
event_parser.items.each do |item|
321321
return if @stopped.value
@@ -331,7 +331,7 @@ def read_stream(cxn)
331331

332332
def dispatch_event(event)
333333
@logger.debug { "Received event: #{event}" }
334-
@last_id = event.id
334+
@last_id = event.id if !event.id.nil?
335335

336336
# Pass the event to the caller
337337
@on[:event].call(event)
@@ -354,7 +354,7 @@ def build_headers
354354
'Cache-Control' => 'no-cache',
355355
'User-Agent' => 'ruby-eventsource'
356356
}
357-
h['Last-Event-Id'] = @last_id if !@last_id.nil?
357+
h['Last-Event-Id'] = @last_id if !@last_id.nil? && @last_id != ""
358358
h.merge(@headers)
359359
end
360360
end

lib/ld-eventsource/events.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ module SSE
1111
# if there were multiple `data:` lines, they are concatenated with newlines
1212
# @!attribute id
1313
# @return [String] the string that appeared after `id:` in the stream if any, or nil
14+
# @!attribute last_event_id
15+
# @return [String] the `id:` value that was most recently seen in an event from
16+
# this stream; this differs from the `id` property in that it retains the same value
17+
# in subsequent events if they do not provide their own `id:`
1418
#
15-
StreamEvent = Struct.new(:type, :data, :id)
19+
StreamEvent = Struct.new(:type, :data, :id, :last_event_id)
1620
end

lib/ld-eventsource/impl/event_parser.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ class EventParser
2323
# @param [Enumerator] lines an enumerator that will yield one line of text at a time;
2424
# the lines should not include line terminators
2525
#
26-
def initialize(lines)
26+
def initialize(lines, last_event_id = nil)
2727
@lines = lines
28+
@last_event_id = last_event_id
2829
reset_buffers
2930
end
3031

@@ -65,7 +66,10 @@ def process_field(name, value)
6566
@data << value
6667
@have_data = true
6768
when "id"
68-
@id = value
69+
if !value.include?("\x00")
70+
@id = value
71+
@last_event_id = value
72+
end
6973
when "retry"
7074
if /^(?<num>\d+)$/ =~ value
7175
return SetRetryInterval.new(num.to_i)
@@ -76,7 +80,7 @@ def process_field(name, value)
7680

7781
def maybe_create_event
7882
return nil if !@have_data
79-
StreamEvent.new(@type || :message, @data, @id)
83+
StreamEvent.new(@type || :message, @data, @id, @last_event_id)
8084
end
8185
end
8286
end

spec/client_spec.rb

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,17 @@
77
describe SSE::Client do
88
subject { SSE::Client }
99

10-
let(:simple_event_1) { SSE::StreamEvent.new(:go, "foo", "a")}
11-
let(:simple_event_2) { SSE::StreamEvent.new(:stop, "bar", "b")}
10+
let(:simple_event_1) { SSE::StreamEvent.new(:go, "foo")}
11+
let(:simple_event_2) { SSE::StreamEvent.new(:stop, "bar")}
1212
let(:simple_event_1_text) { <<-EOT
1313
event: go
1414
data: foo
15-
id: a
1615
1716
EOT
1817
}
1918
let(:simple_event_2_text) { <<-EOT
2019
event: stop
2120
data: bar
22-
id: b
2321
2422
EOT
2523
}
@@ -254,8 +252,11 @@ def send_stream_content(res, content, keep_open:)
254252
server.setup_response("/") do |req,res|
255253
requests << req
256254
attempt += 1
257-
send_stream_content(res, attempt == 1 ? simple_event_1_text : simple_event_2_text,
258-
keep_open: attempt == 2)
255+
if attempt == 1
256+
send_stream_content(res, "data: foo\nid: a\n\n", keep_open: false)
257+
else
258+
send_stream_content(res, "data: bar\nid: b\n\n", keep_open: true)
259+
end
259260
end
260261

261262
event_sink = Queue.new
@@ -266,7 +267,7 @@ def send_stream_content(res, content, keep_open:)
266267
with_client(client) do |client|
267268
req1 = requests.pop
268269
req2 = requests.pop
269-
expect(req2.header["last-event-id"]).to eq([ simple_event_1.id ])
270+
expect(req2.header["last-event-id"]).to eq([ "a" ])
270271
end
271272
end
272273
end

spec/event_parser_spec.rb

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,18 @@ def verify_parsed_events(lines:, expected_events:)
99
expect(output).to eq(expected_events)
1010
end
1111

12+
it "parses an event with only data" do
13+
lines = [
14+
"data: def",
15+
""
16+
]
17+
ep = subject.new(lines)
18+
19+
expected_event = SSE::StreamEvent.new(:message, "def", nil)
20+
output = ep.items.to_a
21+
expect(output).to eq([ expected_event ])
22+
end
23+
1224
it "parses an event with all fields" do
1325
lines = [
1426
"event: abc",
@@ -18,23 +30,53 @@ def verify_parsed_events(lines:, expected_events:)
1830
]
1931
ep = subject.new(lines)
2032

21-
expected_event = SSE::StreamEvent.new(:abc, "def", "1")
33+
expected_event = SSE::StreamEvent.new(:abc, "def", "1", "1")
2234
output = ep.items.to_a
2335
expect(output).to eq([ expected_event ])
2436
end
2537

26-
it "parses an event with only data" do
38+
it "retains last_event_id if latest event had no id" do
2739
lines = [
40+
"event: abc",
2841
"data: def",
2942
""
3043
]
31-
ep = subject.new(lines)
44+
ep = subject.new(lines, "1")
3245

33-
expected_event = SSE::StreamEvent.new(:message, "def", nil)
46+
expected_event = SSE::StreamEvent.new(:abc, "def", nil, "1")
3447
output = ep.items.to_a
3548
expect(output).to eq([ expected_event ])
3649
end
3750

51+
it "can override previous last_event_id with a blank id" do
52+
lines = [
53+
"event: abc",
54+
"data: def",
55+
"id:",
56+
""
57+
]
58+
ep = subject.new(lines, "1")
59+
60+
expected_event = SSE::StreamEvent.new(:abc, "def", "", "")
61+
output = ep.items.to_a
62+
expect(output).to eq([ expected_event ])
63+
end
64+
65+
it "ignores id field if it contains a null character" do
66+
# per SSE spec (9.2.6)
67+
lines = [
68+
"event: abc",
69+
"data: def",
70+
"id: 12\x0034",
71+
""
72+
]
73+
ep = subject.new(lines, "1")
74+
75+
expected_event = SSE::StreamEvent.new(:abc, "def", nil, "1")
76+
output = ep.items.to_a
77+
expect(output).to eq([ expected_event ])
78+
end
79+
3880
it "parses an event with multi-line data" do
3981
lines = [
4082
"data: def",
@@ -146,8 +188,8 @@ def verify_parsed_events(lines:, expected_events:)
146188
]
147189
ep = subject.new(lines)
148190

149-
expected_event_1 = SSE::StreamEvent.new(:abc, "def", "1")
150-
expected_event_2 = SSE::StreamEvent.new(:message, "ghi", nil)
191+
expected_event_1 = SSE::StreamEvent.new(:abc, "def", "1", "1")
192+
expected_event_2 = SSE::StreamEvent.new(:message, "ghi", nil, "1")
151193
output = ep.items.to_a
152194
expect(output).to eq([ expected_event_1, expected_event_2 ])
153195
end

0 commit comments

Comments
 (0)