Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
bin
charts/**/charts
charts/koperator/requirements.lock

charts/kafka-operator/ingress
# Test binary, build with `go test -c`
*.test

Expand Down
13 changes: 10 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,16 @@ type KafkaClusterSpec struct {
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=false
// +optional
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
KRaftMode bool `json:"kRaft"`
HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
// DebugEnabled is used to decide whether to create a separate loadbalancer services for the
// Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
// cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
// a kafkaCluster instance on a Kind Cluster.
// +kubebuilder:default=false
// +optional
DebugEnabled bool `json:"debugEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
Expand Down
8 changes: 8 additions & 0 deletions charts/kafka-operator/crds/kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
8 changes: 8 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19231,6 +19231,14 @@ spec:
type: object
type: array
type: object
debugEnabled:
default: false
description: |-
DebugEnabled is used to decide whether to create a separate loadbalancer services for the
Kafka and Cruise Control Pods. These services will expose the internal listener ports of the Kafka
cluster with LoadBalancer type, which can be used for running Koperator on a local machine against
a kafkaCluster instance on a Kind Cluster.
type: boolean
disruptionBudget:
description: DisruptionBudget defines the configuration for PodDisruptionBudget
where the workload is managed by the kafka-operator
Expand Down
10 changes: 10 additions & 0 deletions config/samples/simpleZookeeper.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
apiVersion: zookeeper.pravega.io/v1beta1
kind: ZookeeperCluster
metadata:
name: zookeeper-server
namespace: zookeeper
spec:
replicas: 3
persistence:
reclaimPolicy: Delete

3 changes: 2 additions & 1 deletion config/samples/simplekafkacluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ metadata:
controller-tools.k8s.io: "1.0"
name: kafka
spec:
debugEnabled: true
kRaft: false
monitoringConfig:
jmxImage: "ghcr.io/adobe/koperator/jmx-javaagent:1.4.0"
headlessServiceEnabled: true
headlessServiceEnabled: false
zkAddresses:
- "zookeeper-server-client.zookeeper:2181"
propagateLabels: false
Expand Down
22 changes: 22 additions & 0 deletions config/scaleops/CustomOwnerGrouping.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@

kind: CustomOwnerGrouping
apiVersion: analysis.scaleops.sh/v1alpha1
metadata:
name: kafkabroker
namespace: scaleops-system
spec:
groupBy:
positiveRegexMatch: false
groupBys:
- labels:
- 'isBrokerNode: true'
positiveRegexMatch: false
topOwnerController:
apiVersion: kafka.banzaicloud.io/v1beta1
kind: KafkaCluster
displayOptions:
hideGeneratedSuffix: true
fields:
- ownerName
defaultPolicy: kafka-brokers
enabled: true
9 changes: 8 additions & 1 deletion pkg/resources/cruisecontrol/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ import (
)

func (r *Reconciler) service() runtime.Object {
return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMeta(
fmt.Sprintf(serviceNameTemplate, r.KafkaCluster.Name),
apiutil.MergeLabels(ccLabelSelector(r.KafkaCluster.Name), r.KafkaCluster.Labels),
r.KafkaCluster,
),
Spec: corev1.ServiceSpec{
Selector: ccLabelSelector(r.KafkaCluster.Name),
Type: corev1.ServiceTypeClusterIP,
Ports: []corev1.ServicePort{
{
Name: "cc",
Expand All @@ -50,4 +51,10 @@ func (r *Reconciler) service() runtime.Object {
},
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
8 changes: 7 additions & 1 deletion pkg/resources/kafka/allBrokerService.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (r *Reconciler) allBrokerService() runtime.Object {
usedPorts = append(usedPorts,
generateServicePortForAdditionalPorts(r.KafkaCluster.Spec.AdditionalPorts)...)

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(
fmt.Sprintf(kafkautils.AllBrokerServiceTemplate, r.KafkaCluster.GetName()),
apiutil.LabelsForKafka(r.KafkaCluster.GetName()),
Expand All @@ -52,4 +52,10 @@ func (r *Reconciler) allBrokerService() runtime.Object {
Ports: usedPorts,
},
}

if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}

return svc
}
6 changes: 5 additions & 1 deletion pkg/resources/kafka/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Protocol: corev1.ProtocolTCP,
})

return &corev1.Service{
svc := &corev1.Service{
ObjectMeta: templates.ObjectMetaWithAnnotations(fmt.Sprintf("%s-%d", r.KafkaCluster.Name, id),
apiutil.MergeLabels(
apiutil.LabelsForKafka(r.KafkaCluster.Name),
Expand All @@ -61,4 +61,8 @@ func (r *Reconciler) service(id int32, _ *v1beta1.BrokerConfig) runtime.Object {
Ports: usedPorts,
},
}
if r.KafkaCluster.Spec.DebugEnabled {
svc.Spec.Type = corev1.ServiceTypeLoadBalancer
}
return svc
}
Loading
Loading