diff --git a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java index ba19619410ff9..a59649aeb7bd2 100644 --- a/clients/src/main/java/org/apache/kafka/common/internals/Topic.java +++ b/clients/src/main/java/org/apache/kafka/common/internals/Topic.java @@ -28,13 +28,14 @@ public class Topic { public static final String TRANSACTION_STATE_TOPIC_NAME = "__transaction_state"; public static final String SHARE_GROUP_STATE_TOPIC_NAME = "__share_group_state"; public static final String CLUSTER_METADATA_TOPIC_NAME = "__cluster_metadata"; + public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata"; public static final TopicPartition CLUSTER_METADATA_TOPIC_PARTITION = new TopicPartition( CLUSTER_METADATA_TOPIC_NAME, 0 ); public static final String LEGAL_CHARS = "[a-zA-Z0-9._-]"; - private static final Set INTERNAL_TOPICS = Set.of(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME); + private static final Set INTERNAL_TOPICS = Set.of(GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, SHARE_GROUP_STATE_TOPIC_NAME, REMOTE_LOG_METADATA_TOPIC_NAME); private static final int MAX_NAME_LENGTH = 249; diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6ca9014f65410..c9a2b9908b252 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -529,7 +529,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (authorizedRequestInfo.isEmpty) sendResponseCallback(Map.empty) else { - val internalTopicsAllowed = request.header.clientId == "__admin_client" + val internalTopicsAllowed = request.header.clientId != null && request.header.clientId.startsWith("__") val transactionSupportedOperation = AddPartitionsToTxnManager.produceRequestVersionToTransactionSupportedOperation(request.header.apiVersion()) // call the replica manager to append messages to the replicas replicaManager.handleProduceAppend( diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java index 721d58e34d86f..90452d72f9c22 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.internals.Topic; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -41,7 +42,7 @@ */ public final class TopicBasedRemoteLogMetadataManagerConfig { - public static final String REMOTE_LOG_METADATA_TOPIC_NAME = "__remote_log_metadata"; + public static final String REMOTE_LOG_METADATA_TOPIC_NAME = Topic.REMOTE_LOG_METADATA_TOPIC_NAME; public static final String REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP = "remote.log.metadata.topic.replication.factor"; public static final String REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP = "remote.log.metadata.topic.num.partitions"; diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java index 62c16a746ffce..228160d999c01 100644 --- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java +++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java @@ -2634,7 +2634,6 @@ public static boolean isRemoteLogEnabled(boolean remoteStorageSystemEnable, LogC // Remote log is enabled only for non-compact and non-internal topics return remoteStorageSystemEnable && !(config.compact || Topic.isInternal(topic) - || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic) || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) && config.remoteStorageEnable(); }