diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index cdb7671e1..e1a1ce6c2 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -17,6 +17,7 @@ package v1beta1 import ( "fmt" + "sort" "strconv" "strings" @@ -46,6 +47,9 @@ const ( // ProcessRolesKey is used to identify which process roles the Kafka pod has ProcessRolesKey = "processRoles" + // PvcRolesKey is used to identify which process roles a PVC serves (broker, controller, or broker_controller) + PvcRolesKey = "pvcRoles" + // IsBrokerNodeKey is used to identify if the kafka pod is either a broker or a broker_controller IsBrokerNodeKey = "isBrokerNode" @@ -1102,12 +1106,23 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string { return util.CloneMap(bConfig.BrokerAnnotations) } +// processRolesValue returns the joined role string used for both the processRoles pod label +// and the pvcRoles PVC label. In KRaft mode the roles are joined with "_"; in ZK mode always "broker". +func (bConfig *BrokerConfig) processRolesValue(kRaftMode bool) string { + if kRaftMode { + roles := append([]string(nil), bConfig.Roles...) + sort.Strings(roles) + return strings.Join(roles, "_") + } + return BrokerNodeProcessRole +} + // GetBrokerLabels returns the labels that are applied to broker pods func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string { var kraftLabels map[string]string if kRaftMode { kraftLabels = map[string]string{ - ProcessRolesKey: strings.Join(bConfig.Roles, "_"), + ProcessRolesKey: bConfig.processRolesValue(kRaftMode), IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()), IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()), } @@ -1214,6 +1229,13 @@ func (bConfig *BrokerConfig) IsCombinedNode() bool { return bConfig.IsBrokerNode() && bConfig.IsControllerNode() } +// GetPvcRolesLabelValue returns the value for the pvcRoles label on PVCs. +// In KRaft mode the value mirrors processRoles (e.g. "broker", "controller", "broker_controller"). +// In ZooKeeper mode all nodes are brokers, so the value is always "broker". +func (bConfig *BrokerConfig) GetPvcRolesLabelValue(kRaftMode bool) string { + return bConfig.processRolesValue(kRaftMode) +} + // GetResources returns the broker specific Kubernetes resource func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements { if bConfig.Resources != nil { diff --git a/api/v1beta1/kafkacluster_types_test.go b/api/v1beta1/kafkacluster_types_test.go index 38a7be38e..996adc629 100644 --- a/api/v1beta1/kafkacluster_types_test.go +++ b/api/v1beta1/kafkacluster_types_test.go @@ -515,7 +515,7 @@ func TestGetBrokerLabels(t *testing.T) { BrokerIdLabelKey: strconv.Itoa(expectedBrokerId), KafkaCRLabelKey: expectedKafkaCRName, "test_label_key": "test_label_value", - ProcessRolesKey: "controller_broker", + ProcessRolesKey: "broker_controller", IsBrokerNodeKey: "true", IsControllerNodeKey: "true", }, diff --git a/pkg/resources/kafka/kafka.go b/pkg/resources/kafka/kafka.go index eec273a7a..af0a186d1 100644 --- a/pkg/resources/kafka/kafka.go +++ b/pkg/resources/kafka/kafka.go @@ -318,7 +318,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error { if storage.PvcSpec == nil { continue } - o, err := r.pvc(broker.Id, index, storage) + o, err := r.pvc(broker.Id, index, storage, brokerConfig, r.KafkaCluster.Spec.KRaftMode) if err != nil { return errors.WrapIfWithDetails(err, "failed to generate resource", "resources", "PersistentVolumeClaim") } diff --git a/pkg/resources/kafka/pvc.go b/pkg/resources/kafka/pvc.go index e0555a64d..4d6f59591 100644 --- a/pkg/resources/kafka/pvc.go +++ b/pkg/resources/kafka/pvc.go @@ -31,7 +31,7 @@ import ( "github.com/Masterminds/sprig/v3" ) -func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig) (*corev1.PersistentVolumeClaim, error) { +func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.StorageConfig, brokerConfig *v1beta1.BrokerConfig, kRaftMode bool) (*corev1.PersistentVolumeClaim, error) { errCtx := []interface{}{v1beta1.BrokerIdLabelKey, brokerId, mountPathAnnotationKey, storage.MountPath} pvcSpecYaml, err := yaml.Marshal(storage.PvcSpec) @@ -67,7 +67,10 @@ func (r *Reconciler) pvc(brokerId int32, storageIndex int, storage v1beta1.Stora fmt.Sprintf(brokerStorageTemplate, r.KafkaCluster.Name, brokerId, storageIndex), apiutil.MergeLabels( apiutil.LabelsForKafka(r.KafkaCluster.Name), - map[string]string{v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId)}, + map[string]string{ + v1beta1.BrokerIdLabelKey: fmt.Sprintf("%d", brokerId), + v1beta1.PvcRolesKey: brokerConfig.GetPvcRolesLabelValue(kRaftMode), + }, ), map[string]string{mountPathAnnotationKey: storage.MountPath}, r.KafkaCluster), Spec: pvcSpec, diff --git a/pkg/resources/kafka/pvc_test.go b/pkg/resources/kafka/pvc_test.go index 95520342c..796453f1c 100644 --- a/pkg/resources/kafka/pvc_test.go +++ b/pkg/resources/kafka/pvc_test.go @@ -47,13 +47,19 @@ func TestReconciler_pvc(t *testing.T) { }, } + brokerConfig := &v1beta1.BrokerConfig{} + testCases := []struct { testName string + brokerConfig *v1beta1.BrokerConfig + kRaftMode bool storageConfig v1beta1.StorageConfig expectedPersistentVolumeClaim *corev1.PersistentVolumeClaim }{ { - testName: "storage config with no template", + testName: "storage config with no template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -71,6 +77,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -84,7 +91,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with template", + testName: "storage config with template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -108,6 +117,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -125,7 +135,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with template and very long mount path", + testName: "storage config with template and very long mount path", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -147,6 +159,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/mountpath/that/exceeds63characters/kafka-logs-123456789123456789", @@ -164,7 +177,9 @@ func TestReconciler_pvc(t *testing.T) { }, }, { - testName: "storage config with volume name template", + testName: "storage config with volume name template", + brokerConfig: brokerConfig, + kRaftMode: false, storageConfig: v1beta1.StorageConfig{ MountPath: "/kafka-logs-1", PvcSpec: &corev1.PersistentVolumeClaimSpec{ @@ -180,6 +195,7 @@ func TestReconciler_pvc(t *testing.T) { v1beta1.AppLabelKey: "kafka", v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", }, Annotations: map[string]string{ "mountPath": "/kafka-logs-1", @@ -190,6 +206,84 @@ func TestReconciler_pvc(t *testing.T) { }, }, }, + { + testName: "kraft controller-only node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.ControllerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "controller", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, + { + testName: "kraft broker-only node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, + { + testName: "kraft combined broker+controller node", + brokerConfig: &v1beta1.BrokerConfig{Roles: []string{v1beta1.BrokerNodeProcessRole, v1beta1.ControllerNodeProcessRole}}, + kRaftMode: true, + storageConfig: v1beta1.StorageConfig{ + MountPath: "/kafka-logs-1", + PvcSpec: &corev1.PersistentVolumeClaimSpec{}, + }, + expectedPersistentVolumeClaim: &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: kafkaCluster.GetNamespace(), + Name: "", + GenerateName: fmt.Sprintf("%s-2-storage-1-", kafkaCluster.GetName()), + Labels: map[string]string{ + v1beta1.AppLabelKey: "kafka", + v1beta1.KafkaCRLabelKey: kafkaCluster.GetName(), + v1beta1.BrokerIdLabelKey: "2", + v1beta1.PvcRolesKey: "broker_controller", + }, + Annotations: map[string]string{ + "mountPath": "/kafka-logs-1", + }, + }, + Spec: corev1.PersistentVolumeClaimSpec{}, + }, + }, } t.Parallel() @@ -198,7 +292,7 @@ func TestReconciler_pvc(t *testing.T) { test := test t.Run(test.testName, func(t *testing.T) { - pvc, err := r.pvc(2, 1, test.storageConfig) + pvc, err := r.pvc(2, 1, test.storageConfig, test.brokerConfig, test.kRaftMode) assert.NilError(t, err, "PVC creation should succeed")