@@ -74,54 +74,52 @@ type BlockStore struct {
7474 readStream segpb.SegmentServer_ReadFromBlockStreamClient
7575 appendCallbacks sync.Map
7676 readCallbacks sync.Map
77- appendMu sync.Mutex
78- readMu sync.Mutex
77+ appendMu sync.RWMutex
78+ readMu sync.RWMutex
7979}
8080
8181type appendCallback func (* segpb.AppendToBlockStreamResponse )
8282type readCallback func (* segpb.ReadFromBlockStreamResponse )
8383
8484func (s * BlockStore ) runAppendStreamRecv (ctx context.Context , stream segpb.SegmentServer_AppendToBlockStreamClient ) {
85- go func () {
86- for {
87- res , err := stream .Recv ()
88- if err != nil {
89- log .Error (ctx , "append stream recv failed" , map [string ]interface {}{
90- log .KeyError : err ,
91- })
92- break
93- }
94- c , _ := s .appendCallbacks .LoadAndDelete (res .ResponseId )
95- if c != nil {
96- c .(appendCallback )(res )
97- }
85+ for {
86+ res , err := stream .Recv ()
87+ if err != nil {
88+ log .Error (ctx , "append stream recv failed" , map [string ]interface {}{
89+ log .KeyError : err ,
90+ })
91+ break
92+ }
93+ c , _ := s .appendCallbacks .LoadAndDelete (res .Id )
94+ if c != nil {
95+ c .(appendCallback )(res )
9896 }
99- }()
97+ }
10098}
10199
102100func (s * BlockStore ) runReadStreamRecv (ctx context.Context , stream segpb.SegmentServer_ReadFromBlockStreamClient ) {
103- go func () {
104- for {
105- res , err := stream .Recv ()
106- if err != nil {
107- log .Error (ctx , "read stream recv failed" , map [string ]interface {}{
108- log .KeyError : err ,
109- })
110- break
111- }
112- c , _ := s .readCallbacks .LoadAndDelete (res .ResponseId )
113- if c != nil {
114- c .(readCallback )(res )
115- }
101+ for {
102+ res , err := stream .Recv ()
103+ if err != nil {
104+ log .Error (ctx , "read stream recv failed" , map [string ]interface {}{
105+ log .KeyError : err ,
106+ })
107+ break
116108 }
117- }()
109+ c , _ := s .readCallbacks .LoadAndDelete (res .Id )
110+ if c != nil {
111+ c .(readCallback )(res )
112+ }
113+ }
118114}
119115
120116func (s * BlockStore ) connectAppendStream (ctx context.Context ) (segpb.SegmentServer_AppendToBlockStreamClient , error ) {
117+ s .appendMu .RLock ()
121118 if s .appendStream != nil {
119+ defer s .appendMu .RUnlock ()
122120 return s .appendStream , nil
123121 }
124-
122+ s . appendMu . RUnlock ()
125123 s .appendMu .Lock ()
126124 defer s .appendMu .Unlock ()
127125
@@ -142,15 +140,17 @@ func (s *BlockStore) connectAppendStream(ctx context.Context) (segpb.SegmentServ
142140 return nil , err
143141 }
144142
145- s .runAppendStreamRecv (context .Background (), stream )
143+ go s .runAppendStreamRecv (context .Background (), stream )
146144 return stream , nil
147145}
148146
149147func (s * BlockStore ) connectReadStream (ctx context.Context ) (segpb.SegmentServer_ReadFromBlockStreamClient , error ) {
148+ s .readMu .RLock ()
150149 if s .readStream != nil {
150+ defer s .readMu .RUnlock ()
151151 return s .readStream , nil
152152 }
153-
153+ s . readMu . RUnlock ()
154154 s .readMu .Lock ()
155155 defer s .readMu .Unlock ()
156156
@@ -171,7 +171,7 @@ func (s *BlockStore) connectReadStream(ctx context.Context) (segpb.SegmentServer
171171 return nil , err
172172 }
173173
174- s .runReadStreamRecv (ctx , stream )
174+ go s .runReadStreamRecv (ctx , stream )
175175 return stream , nil
176176}
177177
@@ -180,6 +180,14 @@ func (s *BlockStore) Endpoint() string {
180180}
181181
182182func (s * BlockStore ) Close () {
183+ if s .appendStream != nil {
184+ s .appendStream .CloseSend ()
185+ s .appendStream = nil
186+ }
187+ if s .readStream != nil {
188+ s .readStream .CloseSend ()
189+ s .readStream = nil
190+ }
183191 s .client .Close ()
184192}
185193
@@ -219,15 +227,13 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
219227 resp * segpb.AppendToBlockStreamResponse
220228 )
221229
222- if s .appendStream == nil {
223- s .appendStream , err = s .connectAppendStream (_ctx )
224- if err != nil {
225- return nil , err
226- }
230+ s .appendStream , err = s .connectAppendStream (_ctx )
231+ if err != nil {
232+ return nil , err
227233 }
228234
229- // generate unique RequestId
230- requestID := rand .New (rand .NewSource (time .Now ().UnixNano ())).Uint64 ()
235+ // generate unique opaqueID
236+ opaqueID := rand .New (rand .NewSource (time .Now ().UnixNano ())).Uint64 ()
231237
232238 //TODO(jiangkai): delete the reference of CloudEvents/v2 in Vanus
233239 eventpbs := make ([]* cepb.CloudEvent , len (events ))
@@ -240,14 +246,14 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
240246 }
241247
242248 donec := make (chan struct {}, 1 )
243- s .appendCallbacks .Store (requestID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
249+ s .appendCallbacks .Store (opaqueID , appendCallback (func (res * segpb.AppendToBlockStreamResponse ) {
244250 resp = res
245251 donec <- struct {}{}
246252 }))
247253
248254 req := & segpb.AppendToBlockStreamRequest {
249- RequestId : requestID ,
250- BlockId : block ,
255+ Id : opaqueID ,
256+ BlockId : block ,
251257 Events : & cepb.CloudEventBatch {
252258 Events : eventpbs ,
253259 },
@@ -260,10 +266,10 @@ func (s *BlockStore) AppendManyStream(ctx context.Context, block uint64, events
260266 if stderr .Is (err , io .EOF ) {
261267 s .appendStream .CloseSend ()
262268 s .appendStream = nil
263- c , _ := s .appendCallbacks .LoadAndDelete (requestID )
269+ c , _ := s .appendCallbacks .LoadAndDelete (opaqueID )
264270 if c != nil {
265271 c .(appendCallback )(& segpb.AppendToBlockStreamResponse {
266- ResponseId : requestID ,
272+ Id : opaqueID ,
267273 ResponseCode : errpb .ErrorCode_CLOSED ,
268274 ResponseMsg : "append stream closed" ,
269275 Offsets : []int64 {},
@@ -340,18 +346,16 @@ func (s *BlockStore) ReadStream(
340346 resp * segpb.ReadFromBlockStreamResponse
341347 )
342348
343- if s .readStream == nil {
344- s .readStream , err = s .connectReadStream (_ctx )
345- if err != nil {
346- return []* ce.Event {}, err
347- }
349+ s .readStream , err = s .connectReadStream (_ctx )
350+ if err != nil {
351+ return []* ce.Event {}, err
348352 }
349353
350354 // generate unique RequestId
351- requestID := rand .Uint64 ()
355+ opaqueID := rand .Uint64 ()
352356
353357 donec := make (chan struct {}, 1 )
354- s .readCallbacks .Store (requestID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
358+ s .readCallbacks .Store (opaqueID , readCallback (func (res * segpb.ReadFromBlockStreamResponse ) {
355359 resp = res
356360 donec <- struct {}{}
357361 }))
@@ -370,10 +374,10 @@ func (s *BlockStore) ReadStream(
370374 if stderr .Is (err , io .EOF ) {
371375 s .readStream .CloseSend ()
372376 s .readStream = nil
373- c , _ := s .readCallbacks .LoadAndDelete (requestID )
377+ c , _ := s .readCallbacks .LoadAndDelete (opaqueID )
374378 if c != nil {
375379 c .(readCallback )(& segpb.ReadFromBlockStreamResponse {
376- ResponseId : requestID ,
380+ Id : opaqueID ,
377381 ResponseCode : errpb .ErrorCode_CLOSED ,
378382 ResponseMsg : "read stream closed" ,
379383 Events : & cepb.CloudEventBatch {
0 commit comments