From 789566eee785cb000e2a78ec8f5a66c17e0e1c95 Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 18:22:56 +0300 Subject: [PATCH 01/11] Choose partition by key for several brokers (to master) --- src/Broker.php | 17 +++++++++++ src/Producer/Process.php | 5 +--- src/Producer/SyncProcess.php | 5 +--- tests/Base/BrokerTest.php | 56 ++++++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index b0553d43..03290f7a 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -281,4 +281,21 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } + + /** + * @param array $record + * @return int + */ + public function getPartitionId($record) + { + $topicInfos = $this->getTopics(); + $topicMeta = $topicInfos[$record['topic']]; + $partNums = array_keys($topicMeta); + if (isset($record['key']) && trim($record['key'])) { + $partId = $partNums[crc32($record['key']) % count($partNums)]; + } else { + $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; + } + return $partId; + } } diff --git a/src/Producer/Process.php b/src/Producer/Process.php index ccb5d775..160fa9aa 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -320,10 +320,7 @@ protected function convertRecordSet(array $recordSet): array $this->recordValidator->validate($record, $topics); $topicMeta = $topics[$record['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - - $partId = ! isset($record['partId'], $topicMeta[$record['partId']]) ? $partNums[0] : $record['partId']; + $partId = $broker->getPartitionId($record); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index 71b1cb06..bf1883cd 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -161,10 +161,7 @@ protected function convertRecordSet(array $recordSet): array $this->recordValidator->validate($record, $topics); $topicMeta = $topics[$record['topic']]; - $partNums = array_keys($topicMeta); - shuffle($partNums); - - $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; + $partId = $broker->getPartitionId($record); $brokerId = $topicMeta[$partId]; $topicData = []; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index a788969d..96b8bf2f 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -150,4 +150,60 @@ private function getBroker(): Broker { return Broker::getInstance(); } + + /** + * testGetPartitionId + * + * @access public + * @return void + */ + public function testGetPartitionId() + { + $broker = \Kafka\Broker::getInstance(); + $data = [ + 'brokers' => [ + [ + 'host' => '127.0.0.1', + 'port' => '9092', + 'nodeId' => '0', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9192', + 'nodeId' => '1', + ], + [ + 'host' => '127.0.0.1', + 'port' => '9292', + 'nodeId' => '2', + ], + ], + 'topics' => [ + [ + 'topicName' => 'test', + 'errorCode' => 0, + 'partitions' => [ + [ + 'partitionId' => 0, + 'errorCode' => 0, + 'leader' => 0, + ], + [ + 'partitionId' => 1, + 'errorCode' => 0, + 'leader' => 2, + ], + ], + ], + ], + ]; + $broker->setData($data['topics'], $data['brokers']); + $data = [ + 'partId' => '1', + 'topic' => 'test', + 'value' => 'test message' + ]; + $partId = $broker->getPartitionId($data); + $this->assertEquals('1', $partId); + } } From ecd6185785595c95ab357fab4cbf54f4e4d51417 Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 18:37:43 +0300 Subject: [PATCH 02/11] Choose partition by key for several brokers (to master) --- src/Producer/Process.php | 1 - tests/Base/BrokerTest.php | 8 +------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/src/Producer/Process.php b/src/Producer/Process.php index 160fa9aa..4636550e 100644 --- a/src/Producer/Process.php +++ b/src/Producer/Process.php @@ -10,7 +10,6 @@ use Kafka\ProducerConfig; use Kafka\Protocol; use Psr\Log\LoggerAwareTrait; -use function array_keys; use function count; use function explode; use function in_array; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 96b8bf2f..e87bee46 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -151,15 +151,9 @@ private function getBroker(): Broker return Broker::getInstance(); } - /** - * testGetPartitionId - * - * @access public - * @return void - */ public function testGetPartitionId() { - $broker = \Kafka\Broker::getInstance(); + $broker = Broker::getInstance(); $data = [ 'brokers' => [ [ From 3cdce78c8a7944017dbfe6ac1b11bef9697b4ec0 Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 18:43:02 +0300 Subject: [PATCH 03/11] Choose partition by key for several brokers (to master) --- tests/Base/BrokerTest.php | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index e87bee46..f48e6be3 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -151,7 +151,8 @@ private function getBroker(): Broker return Broker::getInstance(); } - public function testGetPartitionId() + + public function testGetPartitionId(): void { $broker = Broker::getInstance(); $data = [ @@ -195,7 +196,7 @@ public function testGetPartitionId() $data = [ 'partId' => '1', 'topic' => 'test', - 'value' => 'test message' + 'value' => 'test message', ]; $partId = $broker->getPartitionId($data); $this->assertEquals('1', $partId); From 569d7d0bdcf9caf6e3a7346bec477b276e39a35f Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 18:49:51 +0300 Subject: [PATCH 04/11] correct tests --- src/Broker.php | 3 +++ tests/Base/BrokerTest.php | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/src/Broker.php b/src/Broker.php index 03290f7a..845a3ae4 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -13,6 +13,9 @@ use function shuffle; use function sprintf; use function strpos; +use function trim; +use function crc32; +use function count; class Broker { diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index f48e6be3..ccb3dfd3 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -152,6 +152,11 @@ private function getBroker(): Broker } + /** + * testGetPartitionId + * @access public + * @return void + */ public function testGetPartitionId(): void { $broker = Broker::getInstance(); From c23efc528c5a509838d98d0a540028c458644f0b Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 18:56:56 +0300 Subject: [PATCH 05/11] correct tests --- src/Broker.php | 7 +++---- src/Producer/SyncProcess.php | 1 - tests/Base/BrokerTest.php | 1 - 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index 845a3ae4..d91387b2 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -7,6 +7,8 @@ use Kafka\Sasl\Plain; use Kafka\Sasl\Scram; use function array_keys; +use function count; +use function crc32; use function explode; use function in_array; use function serialize; @@ -14,8 +16,6 @@ use function sprintf; use function strpos; use function trim; -use function crc32; -use function count; class Broker { @@ -287,9 +287,8 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism /** * @param array $record - * @return int */ - public function getPartitionId($record) + public function getPartitionId($record): int { $topicInfos = $this->getTopics(); $topicMeta = $topicInfos[$record['topic']]; diff --git a/src/Producer/SyncProcess.php b/src/Producer/SyncProcess.php index bf1883cd..658a22dd 100644 --- a/src/Producer/SyncProcess.php +++ b/src/Producer/SyncProcess.php @@ -9,7 +9,6 @@ use Kafka\ProducerConfig; use Kafka\Protocol\Protocol; use Psr\Log\LoggerAwareTrait; -use function array_keys; use function count; use function explode; use function json_encode; diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index ccb3dfd3..25369c20 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -155,7 +155,6 @@ private function getBroker(): Broker /** * testGetPartitionId * @access public - * @return void */ public function testGetPartitionId(): void { From 3578b14f9ac3da469b3bdccdbea0c61cefecfcea Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:01:45 +0300 Subject: [PATCH 06/11] correct tests --- tests/Base/BrokerTest.php | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/Base/BrokerTest.php b/tests/Base/BrokerTest.php index 25369c20..61b641be 100644 --- a/tests/Base/BrokerTest.php +++ b/tests/Base/BrokerTest.php @@ -151,7 +151,6 @@ private function getBroker(): Broker return Broker::getInstance(); } - /** * testGetPartitionId * @access public From cb340037ec4673d6c4ff789a313f04ad127151da Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:06:33 +0300 Subject: [PATCH 07/11] correct tests --- src/Broker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Broker.php b/src/Broker.php index d91387b2..48559ecc 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -298,6 +298,6 @@ public function getPartitionId($record): int } else { $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; } - return $partId; + return (int)$partId; } } From e985d14efe5c1b0e16aef05f6a744387b612ea8e Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:16:09 +0300 Subject: [PATCH 08/11] correct tests --- src/Broker.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index 48559ecc..f4f6a065 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -286,7 +286,7 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism } /** - * @param array $record + * @param mixed[] $record */ public function getPartitionId($record): int { @@ -298,6 +298,6 @@ public function getPartitionId($record): int } else { $partId = isset($record['partId'], $topicMeta[$record['partId']]) ? $record['partId'] : $partNums[0]; } - return (int)$partId; + return (int) $partId; } } From 45b6dfcf2f59618cad80f558916dc77e7c650bbd Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:21:45 +0300 Subject: [PATCH 09/11] correct tests --- src/Broker.php | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/Broker.php b/src/Broker.php index f4f6a065..7a8b0929 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -285,10 +285,7 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } - /** - * @param mixed[] $record - */ - public function getPartitionId($record): int + public function getPartitionId(array $record): int { $topicInfos = $this->getTopics(); $topicMeta = $topicInfos[$record['topic']]; From eacbe3e605c117306a9c37db38a467024c52d55e Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:24:55 +0300 Subject: [PATCH 10/11] correct tests --- src/Broker.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Broker.php b/src/Broker.php index 7a8b0929..fb48ae04 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -285,6 +285,9 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism throw new Exception(sprintf('"%s" is an invalid SASL mechanism', $mechanism)); } + /** + * @param array $record + */ public function getPartitionId(array $record): int { $topicInfos = $this->getTopics(); From 211726138bbc9552c8472c8697562c7cd4a8bdcd Mon Sep 17 00:00:00 2001 From: Demyanov Vyacheslav Date: Thu, 24 May 2018 19:31:00 +0300 Subject: [PATCH 11/11] correct tests --- src/Broker.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Broker.php b/src/Broker.php index fb48ae04..08aa9208 100644 --- a/src/Broker.php +++ b/src/Broker.php @@ -286,7 +286,7 @@ private function getSaslMechanismProvider(Config $config): SaslMechanism } /** - * @param array $record + * @param mixed[] $record */ public function getPartitionId(array $record): int {