Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@

<suppress checks="(WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
files="Murmur3Test.java"/>
<suppress checks="ImportControl" files="AbstractConfigTest.java"/>

<!-- Connect -->
<suppress checks="ClassFanOutComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -81,7 +84,7 @@
public class ConfigDef {

private static final Pattern COMMA_WITH_WHITESPACE = Pattern.compile("\\s*,\\s*");

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigDef.class);
/**
* A unique Java object which represents the lack of a default value.
*/
Expand Down Expand Up @@ -536,6 +539,14 @@ Object parseValue(ConfigKey key, Object value, boolean isSet) {
// otherwise assign setting its default value
parsedValue = key.defaultValue;
}
if (key.validator instanceof ValidList && parsedValue instanceof List) {
List<?> originalListValue = (List<?>) parsedValue;
parsedValue = originalListValue.stream().distinct().collect(Collectors.toList());
if (originalListValue.size() != ((List<?>) parsedValue).size()) {
LOGGER.warn("Duplicate configuration \"{}\" values are found. Duplicates will be removed. The original value " +
"is: {}, the updated value is: {}", key.name, originalListValue, parsedValue);
}
}
if (key.validator != null) {
key.validator.ensureValid(key.name, parsedValue);
}
Expand Down Expand Up @@ -1070,8 +1081,10 @@ private void validateIndividualValues(String name, List<Object> values) {
}

public String toString() {
return validString + (isEmptyAllowed ? " (empty config allowed)" : " (empty not allowed)") +
(isNullAllowed ? " (null config allowed)" : " (null not allowed)");
String base = validString.validStrings.isEmpty() ? "any non-duplicate values" : validString.toString();
String emptyList = isEmptyAllowed ? ", empty list" : "";
String nullValue = isNullAllowed ? ", null" : "";
return base + emptyList + nullValue;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/
package org.apache.kafka.common.config;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.provider.EnvVarConfigProvider;
Expand All @@ -42,6 +46,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -292,22 +297,23 @@ public void testConfiguredInstancesClosedOnFailure() {

try {
Map<String, String> props = new HashMap<>();
String threeConsumerInterceptors = MockConsumerInterceptor.class.getName() + ", "
+ MockConsumerInterceptor.class.getName() + ", "
String threeConsumerInterceptors = CloseInterceptor.class.getName() + ", "
+ MockConsumerInterceptor.class.getName();
props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, threeConsumerInterceptors);
props.put("client.id", "test");
TestConfig testConfig = new TestConfig(props);

MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(3);
MockConsumerInterceptor.setThrowOnConfigExceptionThreshold(1);
assertThrows(
Exception.class,
() -> testConfig.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, Object.class)
);
assertEquals(3, MockConsumerInterceptor.CONFIG_COUNT.get());
assertEquals(3, MockConsumerInterceptor.CLOSE_COUNT.get());
assertEquals(1, MockConsumerInterceptor.CONFIG_COUNT.get());
assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
assertEquals(1, CloseInterceptor.CLOSE_COUNT.get());
} finally {
MockConsumerInterceptor.resetCounters();
CloseInterceptor.resetCounters();
}
}

Expand Down Expand Up @@ -746,4 +752,33 @@ public void configure(Map<String, ?> configs) {
configs.get(EXTRA_CONFIG);
}
}

public static class CloseInterceptor implements ConsumerInterceptor<String, String> {

public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);

@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
return null;
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
// no-op
}

@Override
public void close() {
CLOSE_COUNT.incrementAndGet();
}

@Override
public void configure(Map<String, ?> configs) {
// no-op
}

public static void resetCounters() {
CLOSE_COUNT.set(0);
}
}
}
Copy link
Collaborator

@Yunyung Yunyung Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you cover the case have one element is empty string in List like ("a", "", "a") and ("a", "", "c"). It’s possible, for example: bootstrap.servers=localhost:9091,,localhost:9092 (Two consecutive commas; this would result in List.of(localhost:9091, "", "localhost:9092")

And for completeness, should this kind of case be clarified in KIP?

Original file line number Diff line number Diff line change
Expand Up @@ -760,34 +760,55 @@ public void testListSizeValidatorToString() {

@Test
public void testListValidatorAnyNonDuplicateValues() {
ConfigDef.ValidList allowAnyNonDuplicateValues = ConfigDef.ValidList.anyNonDuplicateValues(true, true);
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", null));
ConfigException exception1 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "a")));
ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyListAndNull = ConfigDef.ValidList.anyNonDuplicateValues(true, true);
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "b", "c")));
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of()));
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", null));
ConfigException exception1 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be duplicated.", exception1.getMessage());
ConfigException exception2 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
ConfigException exception2 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception2.getMessage());

ConfigException exception3 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyListAndNull.ensureValid("test.config", List.of("a", "", "b")));
assertEquals("Configuration 'test.config' values must not be empty.", exception3.getMessage());

ConfigDef.ValidList allowAnyNonDuplicateValuesAndNull = ConfigDef.ValidList.anyNonDuplicateValues(false, true);
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "b", "c")));
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", null));
ConfigException exception3 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception3.getMessage());
ConfigException exception4 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be duplicated.", exception4.getMessage());
ConfigException exception5 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception5.getMessage());
ConfigException exception4 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of()));
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception4.getMessage());
ConfigException exception5 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be duplicated.", exception5.getMessage());
ConfigException exception6 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception6.getMessage());
ConfigException exception7 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndNull.ensureValid("test.config", List.of("a", "", "b")));
assertEquals("Configuration 'test.config' values must not be empty.", exception7.getMessage());


ConfigDef.ValidList allowAnyNonDuplicateValuesAndEmptyList = ConfigDef.ValidList.anyNonDuplicateValues(true, false);
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "b", "c")));
assertDoesNotThrow(() -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of()));
ConfigException exception6 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
assertEquals("Configuration 'test.config' values must not be null.", exception6.getMessage());
ConfigException exception7 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be duplicated.", exception7.getMessage());
ConfigException exception8 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception8.getMessage());
ConfigException exception8 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", null));
assertEquals("Configuration 'test.config' values must not be null.", exception8.getMessage());
ConfigException exception9 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "a")));
assertEquals("Configuration 'test.config' values must not be duplicated.", exception9.getMessage());
ConfigException exception10 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception10.getMessage());
ConfigException exception11 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValuesAndEmptyList.ensureValid("test.config", List.of("a", "", "b")));
assertEquals("Configuration 'test.config' values must not be empty.", exception11.getMessage());


ConfigDef.ValidList allowAnyNonDuplicateValues = ConfigDef.ValidList.anyNonDuplicateValues(false, false);
assertDoesNotThrow(() -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "b", "c")));
ConfigException exception12 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", null));
assertEquals("Configuration 'test.config' values must not be null.", exception12.getMessage());
ConfigException exception13 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of()));
assertEquals("Configuration 'test.config' must not be empty. Valid values include: any non-empty value", exception13.getMessage());
ConfigException exception14 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "", "b")));
assertEquals("Configuration 'test.config' values must not be empty.", exception14.getMessage());
ConfigException exception15 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("")));
assertEquals("Configuration 'test.config' values must not be empty.", exception15.getMessage());
ConfigException exception16 = assertThrows(ConfigException.class, () -> allowAnyNonDuplicateValues.ensureValid("test.config", List.of("a", "", "b")));
assertEquals("Configuration 'test.config' values must not be empty.", exception16.getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,13 @@ public class AllowlistConnectorClientConfigOverridePolicy extends AbstractConnec
private static final String ALLOWLIST_CONFIG_DOC = "List of client configurations that can be overridden by " +
"connectors. If empty, connectors can't override any client configurations.";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC);
.define(
ALLOWLIST_CONFIG,
ConfigDef.Type.LIST,
ALLOWLIST_CONFIG_DEFAULT,
ConfigDef.ValidList.anyNonDuplicateValues(true, false),
ConfigDef.Importance.MEDIUM, ALLOWLIST_CONFIG_DOC
);

private List<String> allowlist = ALLOWLIST_CONFIG_DEFAULT;

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/log/LogConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class LogConfigTest {
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "compact,delete")
assertThrows(classOf[ConfigException], () => validateCleanupPolicy())
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete")
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "delete,delete,delete")
validateCleanupPolicy()
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, "")
validateCleanupPolicy()
Expand Down
12 changes: 2 additions & 10 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,6 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9092")
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "HOST://localhost:9091,LB://localhost:9091")
KafkaConfig.fromProps(props)

// but not duplicate names
props.setProperty(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "HOST://localhost:9091,HOST://localhost:9091")
assertBadConfigContainingMessage(props, "Configuration 'advertised.listeners' values must not be duplicated.")
}

@Test
Expand All @@ -247,10 +243,6 @@ class KafkaConfigTest {
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("If you have two listeners on the same port then one needs to be IPv4 and the other IPv6"))

props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,PLAINTEXT://127.0.0.1:9092")
val exception = assertThrows(classOf[ConfigException], () => KafkaConfig.fromProps(props))
assertTrue(exception.getMessage.contains("values must not be duplicated."))

props.put(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://127.0.0.1:9092,SSL://127.0.0.1:9092,SASL_SSL://127.0.0.1:9092")
caught = assertThrows(classOf[IllegalArgumentException], () => KafkaConfig.fromProps(props))
assertTrue(caught.getMessage.contains("Each listener must have a different port"))
Expand Down Expand Up @@ -301,7 +293,7 @@ class KafkaConfigTest {
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")

assertBadConfigContainingMessage(props,
assertBadConfigContainingMessage(props,
"Missing required configuration \"controller.listener.names\" which has no default value.")

props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
Expand All @@ -322,7 +314,7 @@ class KafkaConfigTest {
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")

assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props,
assertBadConfigContainingMessage(props,
"Missing required configuration \"controller.listener.names\" which has no default value.")

props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
Expand Down
4 changes: 3 additions & 1 deletion docs/upgrade.html
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ <h5><a id="upgrade_420_notable" href="#upgrade_420_notable">Notable changes in 4
<ul>
<li>Null values are no longer accepted for most LIST-type configurations, except those that explicitly
allow a null default value or where a null value has a well-defined semantic meaning.</li>
<li>Duplicate entries within the same list are no longer permitted.</li>
<li>Most LIST-type configurations no longer accept duplicate entries, except in cases where duplicates
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Should we change ValidList.ensureValid() to only log a warning for duplicated values?
  2. The following makes the generated "Valid Values" in html quite verbose. It would be useful to improve that. For example, if empty is not allowed, we probably don't need to say it. If empty is allowed, we could add "empty".
        public String toString() {
            return validString + (isEmptyAllowed ? " (empty config allowed)" : " (empty not allowed)") +
                    (isNullAllowed ? " (null config allowed)" : " (null not allowed)");
        }

Also, "Valid Values" for anyNonDuplicateValues will show up as [], which is counter intuitive.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we change ValidList.ensureValid() to only log a warning for duplicated values?

I think we can keep the exception-throwing behavior, but we now preprocess the user’s configuration to remove duplicate entries.

The following makes the generated "Valid Values" in html quite verbose.

I have updated the document
asdfsfd
sdaf
dlfsakfds

are explicitly supported. However, if users configure duplicate entries, the internal deduplication
logic will still handle them.</li>
<li>Empty lists are no longer allowed, except in configurations where an empty list has a well-defined
semantic meaning.</li>
</ul>
Expand Down