From ae7ff783d531f4def1dcb3daf790c94c05b81799 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Tue, 8 May 2018 11:23:56 +0300 Subject: [PATCH 01/11] Choose partition by key for several brokers --- src/Producer/SyncProcess.php | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index d09096f9..e454ee36 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -134,12 +134,15 @@ protected function convertMessage($data) $topicMeta = $topicInfos[$value['topic']]; $partNums = array_keys($topicMeta); - shuffle($partNums); - $partId = 0; - if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { - $partId = $partNums[0]; + if (isset($value['key'])&&trim($value['key'])){ + $partId = crc32($value['key']) % count($partNums); } else { - $partId = $value['partId']; + shuffle($partNums); + if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { + $partId = $partNums[0]; + } else { + $partId = $value['partId']; + } } $brokerId = $topicMeta[$partId]; From 08478d36af78c83c9a29371098425d06f9eb51ae Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 10:31:36 +0300 Subject: [PATCH 02/11] Choose partition by key for several brokers --- src/Producer/SyncProcess.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index e454ee36..18e94d97 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -134,7 +134,7 @@ protected function convertMessage($data) $topicMeta = $topicInfos[$value['topic']]; $partNums = array_keys($topicMeta); - if (isset($value['key'])&&trim($value['key'])){ + if (isset($value['key']) && trim($value['key'])){ $partId = crc32($value['key']) % count($partNums); } else { shuffle($partNums); From 5050dfb7b3df79e421c303bc5cbf587b42e2e611 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 10:32:49 +0300 Subject: [PATCH 03/11] Choose partition by key for several brokers --- src/Producer/SyncProcess.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 18e94d97..de29341f 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -134,7 +134,7 @@ protected function convertMessage($data) $topicMeta = $topicInfos[$value['topic']]; $partNums = array_keys($topicMeta); - if (isset($value['key']) && trim($value['key'])){ + if (isset($value['key']) && trim($value['key'])) { $partId = crc32($value['key']) % count($partNums); } else { shuffle($partNums); From 89b7fffe4a359af279942d5f96bbd33a39f64b40 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 14:57:16 +0300 Subject: [PATCH 04/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Broker.php | 20 ++++++++++++++++++++ src/Producer/Process.php | 8 +------- src/Producer/SyncProcess.php | 11 +---------- tests/Base/BrokerTest.php | 15 +++++++++++++++ 4 files changed, 37 insertions(+), 17 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index c4f909d9..a6196bda 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -225,4 +225,24 @@ private function getSaslMechanismProvider() : SaslMechanism } return $provider; } + + /** + * @param array $data + * @param array $partNums + * @return int + */ + public function getPartitionId($data, $partNums) + { + if (isset($data['key']) && trim($data['key'])) { + $partId = $partNums[crc32($data['key']) % count($partNums)]; + } else { + if (! isset($data['partId']) || ! isset($topicMeta[$data['partId']])) { + shuffle($partNums); + $partId = $partNums[0]; + } else { + $partId = $data['partId']; + } + } + return $partId; + } } diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 1a8247ff..8716bb5e 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -294,13 +294,7 @@ protected function convertMessage($data) $topicMeta = $topicInfos[$value['topic']]; $partNums = array_keys($topicMeta); - shuffle($partNums); - $partId = 0; - if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { - $partId = $partNums[0]; - } else { - $partId = $value['partId']; - } + $partId = $broker->getPartitionId($value, $partNums); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index de29341f..9ef25655 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -134,16 +134,7 @@ protected function convertMessage($data) $topicMeta = $topicInfos[$value['topic']]; $partNums = array_keys($topicMeta); - if (isset($value['key']) && trim($value['key'])) { - $partId = crc32($value['key']) % count($partNums); - } else { - shuffle($partNums); - if (! isset($value['partId']) || ! isset($topicMeta[$value['partId']])) { - $partId = $partNums[0]; - } else { - $partId = $value['partId']; - } - } + $partId = $broker->getPartitionId($value, $partNums); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 46e21a4b..38b81a52 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -174,4 +174,19 @@ public function testGetSocketNotSetConfig() $this->assertInstanceOf(\Kafka\SocketSync::class, $socket); } + + /** + * testGetPartitionId + * + * @access public + * @return void + */ + public function testGetPartitionId() + { + $broker = \Kafka\Broker::getInstance(); + $partNums = [3, 2, 2]; + $data = ['key' => '123']; + $partId = $broker->getPartitionId($data, $partNums); + $this->assertEquals('2', $partId); + } } From 136b1f739eff90305169d3c6dc25f39a87e681b4 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 15:08:49 +0300 Subject: [PATCH 05/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Broker.php | 3 ++- src/Producer/Process.php | 3 +-- src/Producer/SyncProcess.php | 3 +-- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index a6196bda..0b2e0b10 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -231,8 +231,9 @@ private function getSaslMechanismProvider() : SaslMechanism * @param array $partNums * @return int */ - public function getPartitionId($data, $partNums) + public function getPartitionId($data, $topicMeta) { + $partNums = array_keys($topicMeta); if (isset($data['key']) && trim($data['key'])) { $partId = $partNums[crc32($data['key']) % count($partNums)]; } else { diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 8716bb5e..9f7847fd 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -293,8 +293,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partNums = array_keys($topicMeta); - $partId = $broker->getPartitionId($value, $partNums); + $partId = $broker->getPartitionId($value, $topicMeta); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 9ef25655..29d6d6c6 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -133,8 +133,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partNums = array_keys($topicMeta); - $partId = $broker->getPartitionId($value, $partNums); + $partId = $broker->getPartitionId($value, $topicMeta); $brokerId = $topicMeta[$partId]; $topicData = []; From ab67b898bdaf5a4cb4f4fca9f7cfa8771ce4080b Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 15:38:23 +0300 Subject: [PATCH 06/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Broker.php | 4 +++- src/Producer/Process.php | 2 +- src/Producer/SyncProcess.php | 2 +- tests/Base/BrokerTest.php | 7 +++---- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index 0b2e0b10..33cafc7b 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -231,8 +231,10 @@ private function getSaslMechanismProvider() : SaslMechanism * @param array $partNums * @return int */ - public function getPartitionId($data, $topicMeta) + public function getPartitionId($data) { + $topicInfos = $this->getTopics(); + $topicMeta = $topicInfos[$data['topic']]; $partNums = array_keys($topicMeta); if (isset($data['key']) && trim($data['key'])) { $partId = $partNums[crc32($data['key']) % count($partNums)]; diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 9f7847fd..7a8324ec 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -293,7 +293,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partId = $broker->getPartitionId($value, $topicMeta); + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 29d6d6c6..712b72c4 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -133,7 +133,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partId = $broker->getPartitionId($value, $topicMeta); + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 38b81a52..3cd1bf9b 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -184,9 +184,8 @@ public function testGetSocketNotSetConfig() public function testGetPartitionId() { $broker = \Kafka\Broker::getInstance(); - $partNums = [3, 2, 2]; - $data = ['key' => '123']; - $partId = $broker->getPartitionId($data, $partNums); - $this->assertEquals('2', $partId); + $data = ['partId' => '1', ]; + $partId = $broker->getPartitionId($data); + $this->assertEquals('1', $partId); } } From d3590419f23e62118f90a9bb11ce4f19cfdc3b74 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 15:45:57 +0300 Subject: [PATCH 07/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/Base/BrokerTest.php | 40 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 3cd1bf9b..8d890d74 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -183,7 +183,45 @@ public function testGetSocketNotSetConfig() */ public function testGetPartitionId() { - $broker = \Kafka\Broker::getInstance(); + $broker = \Kafka\Broker::getInstance(); + $data = [ + 'brokers' => [ + [ + 'host' => '127.0.0.1', + 'port' => '9092', + 'nodeId' => '0', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9192', + 'nodeId' => '1', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9292', + 'nodeId' => '2', + ], + ], + 'topics' => [ + [ + 'topicName' => 'test', + 'errorCode' => 0, + 'partitions' => [ + [ + 'partitionId' => 0, + 'errorCode' => 0, + 'leader' => 0, + ], + [ + 'partitionId' => 1, + 'errorCode' => 0, + 'leader' => 2, + ], + ], + ], + ], + ]; + $broker->setData($data['topics'], $data['brokers']); $data = ['partId' => '1', ]; $partId = $broker->getPartitionId($data); $this->assertEquals('1', $partId); From 984d51e8a17010386e534565e987c5c1ecaff57c Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 15:46:24 +0300 Subject: [PATCH 08/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/Base/BrokerTest.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 8d890d74..d13b7543 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -222,7 +222,7 @@ public function testGetPartitionId() ], ]; $broker->setData($data['topics'], $data['brokers']); - $data = ['partId' => '1', ]; + $data = ['partId' => '1']; $partId = $broker->getPartitionId($data); $this->assertEquals('1', $partId); } From 57fc9798fe76ad51e571902db08a1938cc144bda Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 15:53:23 +0300 Subject: [PATCH 09/11] =?UTF-8?q?=D0=9Coved=20the=20logic=20of=20the=20cho?= =?UTF-8?q?ice=20of=20partitions=20to=20the=20broker=20method=20and=20add?= =?UTF-8?q?=20a=20test.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/Base/BrokerTest.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index d13b7543..1f2a98f0 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -184,7 +184,7 @@ public function testGetSocketNotSetConfig() public function testGetPartitionId() { $broker = \Kafka\Broker::getInstance(); - $data = [ + $data = [ 'brokers' => [ [ 'host' => '127.0.0.1', @@ -222,7 +222,11 @@ public function testGetPartitionId() ], ]; $broker->setData($data['topics'], $data['brokers']); - $data = ['partId' => '1']; + $data = [ + 'partId' => '1', + 'topic' => 'test', + 'value' => 'test message' + ]; $partId = $broker->getPartitionId($data); $this->assertEquals('1', $partId); } From d3344652df368fd1e58c57afa32041db1ac8d510 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 16:01:27 +0300 Subject: [PATCH 10/11] Code quality test --- src/Producer/Process.php | 2 +- src/Producer/SyncProcess.php | 2 +- tests/Base/BrokerTest.php | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 7a8324ec..e121a2ce 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -293,7 +293,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partId = $broker->getPartitionId($value); + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 712b72c4..c2a51cf3 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -133,7 +133,7 @@ protected function convertMessage($data) } $topicMeta = $topicInfos[$value['topic']]; - $partId = $broker->getPartitionId($value); + $partId = $broker->getPartitionId($value); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 1f2a98f0..4f6d0b7d 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -184,7 +184,7 @@ public function testGetSocketNotSetConfig() public function testGetPartitionId() { $broker = \Kafka\Broker::getInstance(); - $data = [ + $data = [ 'brokers' => [ [ 'host' => '127.0.0.1', @@ -222,7 +222,7 @@ public function testGetPartitionId() ], ]; $broker->setData($data['topics'], $data['brokers']); - $data = [ + $data = [ 'partId' => '1', 'topic' => 'test', 'value' => 'test message' From b59ef2525218e8495619df31694881f434a74563 Mon Sep 17 00:00:00 2001 From: "vyacheslav.demyanov" Date: Thu, 10 May 2018 16:10:16 +0300 Subject: [PATCH 11/11] Code quality test --- src/Broker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index 33cafc7b..881deee0 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -234,8 +234,8 @@ private function getSaslMechanismProvider() : SaslMechanism public function getPartitionId($data) { $topicInfos = $this->getTopics(); - $topicMeta = $topicInfos[$data['topic']]; - $partNums = array_keys($topicMeta); + $topicMeta = $topicInfos[$data['topic']]; + $partNums = array_keys($topicMeta); if (isset($data['key']) && trim($data['key'])) { $partId = $partNums[crc32($data['key']) % count($partNums)]; } else {