2020use RdKafka \Metadata \Collection as RdKafkaMetadataCollection ;
2121use RdKafka \Metadata \Partition as RdKafkaMetadataPartition ;
2222use RdKafka \Metadata \Topic as RdKafkaMetadataTopic ;
23+ use RdKafka \TopicPartition as RdKafkaTopicPartition ;
2324
2425/**
2526 * @covers \Jobcloud\Kafka\Consumer\AbstractKafkaConsumer
@@ -107,7 +108,14 @@ function () use ($rdKafkaMetadataTopicMock) {
107108 $ decoderMock = $ this ->getMockForAbstractClass (DecoderInterface::class);
108109 $ kafkaConsumer = new KafkaHighLevelConsumer ($ rdKafkaConsumerMock , $ kafkaConfigurationMock , $ decoderMock );
109110
110- $ rdKafkaConsumerMock ->expects (self ::once ())->method ('assign ' );
111+ $ rdKafkaConsumerMock ->expects (self ::once ())->method ('assign ' )->with (
112+ $ this ->callback (
113+ function (array $ assignment ) {
114+ self ::assertCount (2 , $ assignment );
115+ return true ;
116+ }
117+ )
118+ );
111119 $ rdKafkaConsumerMock
112120 ->expects (self ::once ())
113121 ->method ('getMetadata ' )
@@ -216,24 +224,73 @@ public function testUnsubscribeFailure(): void
216224 * @throws KafkaConsumerCommitException
217225 */
218226 public function testCommitSuccesss (): void
227+ {
228+ $ message = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
229+ $ message ->expects (self ::exactly (1 ))->method ('getOffset ' )->willReturn (0 );
230+ $ message ->expects (self ::exactly (1 ))->method ('getTopicName ' )->willReturn ('test ' );
231+ $ message ->expects (self ::exactly (1 ))->method ('getPartition ' )->willReturn (1 );
232+ $ message2 = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
233+ $ message2 ->expects (self ::exactly (1 ))->method ('getOffset ' )->willReturn (1 );
234+ $ message2 ->expects (self ::exactly (2 ))->method ('getTopicName ' )->willReturn ('test ' );
235+ $ message2 ->expects (self ::exactly (2 ))->method ('getPartition ' )->willReturn (1 );
236+ $ message3 = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
237+ $ message3 ->expects (self ::exactly (2 ))->method ('getOffset ' )->willReturn (2 );
238+ $ message3 ->expects (self ::exactly (1 ))->method ('getTopicName ' )->willReturn ('test ' );
239+ $ message3 ->expects (self ::exactly (1 ))->method ('getPartition ' )->willReturn (1 );
240+ $ message4 = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
241+ $ message4 ->expects (self ::exactly (1 ))->method ('getOffset ' )->willReturn (0 );
242+ $ message4 ->expects (self ::exactly (2 ))->method ('getTopicName ' )->willReturn ('test ' );
243+ $ message4 ->expects (self ::exactly (2 ))->method ('getPartition ' )->willReturn (2 );
244+
245+
246+ $ rdKafkaConsumerMock = $ this ->createMock (RdKafkaHighLevelConsumer::class);
247+ $ kafkaConfigurationMock = $ this ->createMock (KafkaConfiguration::class);
248+ $ decoderMock = $ this ->getMockForAbstractClass (DecoderInterface::class);
249+ $ kafkaConsumer = new KafkaHighLevelConsumer ($ rdKafkaConsumerMock , $ kafkaConfigurationMock , $ decoderMock );
250+ $ rdKafkaConsumerMock ->expects (self ::once ())->method ('commit ' )->with (
251+ $ this ->callback (
252+ function (array $ topicPartitions ) {
253+ self ::assertCount (2 , $ topicPartitions );
254+ self ::assertInstanceOf (RdKafkaTopicPartition::class, $ topicPartitions ['test-1 ' ]);
255+ self ::assertInstanceOf (RdKafkaTopicPartition::class, $ topicPartitions ['test-2 ' ]);
256+ self ::assertEquals (3 , $ topicPartitions ['test-1 ' ]->getOffset ());
257+ self ::assertEquals (1 , $ topicPartitions ['test-2 ' ]->getOffset ());
258+
259+ return true ;
260+ }
261+ )
262+ );
263+
264+ $ kafkaConsumer ->commit ([$ message2 , $ message , $ message3 , $ message4 ]);
265+ }
266+
267+ /**
268+ * @throws KafkaConsumerCommitException
269+ */
270+ public function testCommitSingleSuccesss (): void
219271 {
220272 $ message = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
221273 $ message ->expects (self ::exactly (1 ))->method ('getOffset ' )->willReturn (0 );
222274 $ message ->expects (self ::exactly (2 ))->method ('getTopicName ' )->willReturn ('test ' );
223275 $ message ->expects (self ::exactly (2 ))->method ('getPartition ' )->willReturn (1 );
224- $ message2 = $ this ->getMockForAbstractClass (KafkaConsumerMessageInterface::class);
225- $ message2 ->expects (self ::exactly (2 ))->method ('getOffset ' )->willReturn (1 );
226- $ message2 ->expects (self ::exactly (1 ))->method ('getTopicName ' )->willReturn ('test ' );
227- $ message2 ->expects (self ::exactly (1 ))->method ('getPartition ' )->willReturn (1 );
228276
229277
230278 $ rdKafkaConsumerMock = $ this ->createMock (RdKafkaHighLevelConsumer::class);
231279 $ kafkaConfigurationMock = $ this ->createMock (KafkaConfiguration::class);
232280 $ decoderMock = $ this ->getMockForAbstractClass (DecoderInterface::class);
233281 $ kafkaConsumer = new KafkaHighLevelConsumer ($ rdKafkaConsumerMock , $ kafkaConfigurationMock , $ decoderMock );
234- $ rdKafkaConsumerMock ->expects (self ::once ())->method ('commit ' );
282+ $ rdKafkaConsumerMock ->expects (self ::once ())->method ('commit ' )->with (
283+ $ this ->callback (
284+ function (array $ topicPartitions ) {
285+ self ::assertCount (1 , $ topicPartitions );
286+ self ::assertInstanceOf (RdKafkaTopicPartition::class, $ topicPartitions ['test-1 ' ]);
287+ self ::assertEquals (1 , $ topicPartitions ['test-1 ' ]->getOffset ());
288+ return true ;
289+ }
290+ )
291+ );
235292
236- $ kafkaConsumer ->commit ([ $ message, $ message2 ] );
293+ $ kafkaConsumer ->commit ($ message );
237294 }
238295
239296 /**
@@ -367,9 +424,10 @@ public function testKafkaConsumeWithDecode(): void
367424 $ message ->key = 'test ' ;
368425 $ message ->payload = null ;
369426 $ message ->topic_name = 'test_topic ' ;
370- $ message ->partition = 9 ;
371- $ message ->offset = 501 ;
372- $ message ->timestamp = 500 ;
427+ $ message ->partition = '9 ' ;
428+ $ message ->offset = '501 ' ;
429+ $ message ->timestamp = '500 ' ;
430+ $ message ->headers = 'header ' ;
373431 $ message ->err = RD_KAFKA_RESP_ERR_NO_ERROR ;
374432
375433 $ topics = [new TopicSubscription ('testTopic ' )];
@@ -395,6 +453,8 @@ function (KafkaConsumerMessageInterface $message) {
395453 self ::assertEquals (9 , $ message ->getPartition ());
396454 self ::assertEquals (501 , $ message ->getOffset ());
397455 self ::assertEquals (500 , $ message ->getTimestamp ());
456+ self ::assertEquals (['header ' ], $ message ->getHeaders ());
457+
398458 return true ;
399459 }
400460 )
0 commit comments