Machine Controller
The Machine Controller handles reconciliation of Machine and MachineClass objects.
The Machine Controller Entry Point for any provider is at
machine-controller-manager-provider-<name>/cmd/machine-controller/main.go
MC Launch
Dev
Build
A Makefile in the root of machine-controller-manager-provider-<name> builds the provider-specific machine controller for Linux with CGO disabled (CGO_ENABLED=0). The make build target invokes the shell script .ci/build to do this.
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
-a \
-v \
-o ${BINARY_PATH}/rel/machine-controller \
cmd/machine-controller/main.go
Launch
Assuming one has initialized the variables using make download-kubeconfigs, one can then use the make start target, which launches the MC with the flags shown below.
Most of these timeout flags are redundant, since the exact same values are given in machine-controller-manager/pkg/util/provider/app/options.NewMCServer.
go run -mod=vendor \
cmd/machine-controller/main.go \
--control-kubeconfig=$(CONTROL_KUBECONFIG) \
--target-kubeconfig=$(TARGET_KUBECONFIG) \
--namespace=$(CONTROL_NAMESPACE) \
--machine-creation-timeout=20m \
--machine-drain-timeout=5m \
--machine-health-timeout=10m \
--machine-pv-detach-timeout=2m \
--machine-safety-apiserver-statuscheck-timeout=30s \
--machine-safety-apiserver-statuscheck-period=1m \
--machine-safety-orphan-vms-period=30m \
--leader-elect=$(LEADER_ELECT) \
--v=3
Prod
Build
A Dockerfile builds the provider-specific machine controller and launches it directly with no CLI arguments. Hence, it uses the coded defaults.
RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH \
go build \
-mod=vendor \
-o /usr/local/bin/machine-controller \
cmd/machine-controller/main.go
COPY --from=builder /usr/local/bin/machine-controller /machine-controller
ENTRYPOINT ["/machine-controller"]
The machine-controller-manager deployment usually launches both the MCM and the MC in a Pod. The MC is launched with arguments such as the following:
./machine-controller \
--control-kubeconfig=inClusterConfig \
--machine-creation-timeout=20m \
--machine-drain-timeout=2h \
--machine-health-timeout=10m \
--namespace=shoot--i034796--tre \
--port=10259 \
--target-kubeconfig=/var/run/secrets/gardener.cloud/shoot/generic-kubeconfig/kubeconfig \
--v=3
Launch Flow
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB Begin(("cmd/ machine-controller/ main.go")) -->NewMCServer["mc=options.NewMCServer"] -->AddFlags["mc.AddFlags(pflag.CommandLine)"] -->LogOptions["options := k8s.io/component-base/logs.NewOptions() options.AddFlags(pflag.CommandLine)"] -->InitFlags["flag.InitFlags"] InitFlags--local-->NewLocalDriver[" driver, err := local.NewDriver(s.ControlKubeconfig) if err exit "] InitFlags--aws-->NewPlatformDriver[" driver := aws.NewAWSDriver(&spi.PluginSPIImpl{}) OR driver := cp.NewAzureDriver(&spi.PluginSPIImpl{}) //etc "] NewLocalDriver-->AppRun[" err := app.Run(mc, driver) "] NewPlatformDriver-->AppRun AppRun-->End(("if err != nil os.Exit(1)"))
Summary
- Creates a machine-controller-manager/pkg/util/provider/app/options.MCServer using options.NewMCServer. MCServer is the main context object for the machine controller and embeds an options.MachineControllerConfiguration.
  - options.NewMCServer initializes the options.MCServer struct with default values for:
    - Port: 10258
    - Namespace: default
    - ConcurrentNodeSyncs: 50 (the number of worker go-routines that are used to process items from a work queue; see Worker below)
    - NodeConditions: "KernelDeadlock,ReadonlyFilesystem,DiskPressure,NetworkUnavailable" (failure node conditions that indicate that a machine is unhealthy)
    - MinResyncPeriod: 12 hours
    - KubeAPIQPS: 20 and KubeAPIBurst: 30 (config params for k8s clients; see rest.Config)
- Calls MCServer.AddFlags, which defines all parsing flags for the machine controller into fields of the MCServer instance created in the last step.
- Calls k8s.io/component-base/logs.NewOptions and then options.AddFlags for logging options. (TODO: Should get rid of this when moving to logr.)
  - See Logging In Gardener Components.
  - Then use the logcheck tool.
- Driver initialization code varies according to the provider type.
  - Local Driver
    - Calls NewDriver with the control kubeconfig, which creates a controller-runtime client (sigs.k8s.io/controller-runtime/pkg/client) and then calls pkg/local/driver.NewDriver, passing the controller-runtime client. This constructs a localdriver encapsulating the passed-in client:
      driver := local.NewDriver(c)
    - The localdriver implements Driver, which is the facade for the creation/deletion of VMs.
  - Provider Specific Driver Example
      driver := aws.NewAWSDriver(&spi.PluginSPIImpl{})
      driver := cp.NewAzureDriver(&spi.PluginSPIImpl{})
    spi.PluginSPIImpl is a struct that implements a provider-specific interface that initializes a provider session.
- Calls app.Run, passing in the previously created MCServer and Driver instances.
Machine Controller Loop
app.Run
app.Run is the function that sets up the main control loop of the machine controller server.
Summary
- app.Run(options *options.MCServer, driver driver.Driver) is the common run loop for all provider Machine Controllers.
- Creates targetkubeconfig and controlkubeconfig of type k8s.io/client-go/rest.Config from the target and control kubeconfig paths using clientcmd.BuildConfigFromFlags.
- Sets fields such as config.QPS and config.Burst in both targetkubeconfig and controlkubeconfig from the passed-in options.MCServer.
- Creates kubeClientControl from the controlkubeconfig using the standard client-go client factory method kubernetes.NewForConfig, which returns a client-go/kubernetes.Clientset.
- Similarly creates another Clientset named leaderElectionClient using controlkubeconfig.
- Starts a go-routine using the function startHTTP, which registers a bunch of HTTP handlers for the Go profiler, Prometheus metrics and the health check.
- Calls createRecorder, passing the kubeClientControl clientset instance, which returns a client-go/tools/record.EventRecorder:
  - Creates a new eventBroadcaster of type record.EventBroadcaster.
  - Sets the logging function of the broadcaster to klog.Infof.
  - Sets the event sink using eventBroadcaster.StartRecordingToSink, passing an event sink built on kubeClient.CoreV1().RESTClient() and scoped to all namespaces with Events(""). Effectively, events will be published remotely to the API server.
  - Returns the record.EventRecorder associated with the eventBroadcaster using eventBroadcaster.NewRecorder.
- Constructs an anonymous function assigned to the run variable, which does the following:
  - Initializes a stop receive channel.
  - Creates a controlMachineClientBuilder using machineclientbuilder.SimpleClientBuilder wrapping controlkubeconfig.
  - Creates a controlCoreClientBuilder using coreclientbuilder.SimpleControllerClientBuilder wrapping controlkubeconfig.
  - Creates a targetCoreClientBuilder using coreclientbuilder.SimpleControllerClientBuilder wrapping targetkubeconfig.
  - Calls the app.StartControllers function, passing the options, driver, controlkubeconfig, targetkubeconfig, controlMachineClientBuilder, controlCoreClientBuilder, targetCoreClientBuilder, recorder and stop channel.
    - // Q: if you are going to pass the controlkubeconfig and targetkubeconfig - why not create the client builders inside the startcontrollers ?
  - If app.StartControllers returns an error, panics and exits run.
- Uses leaderelection.RunOrDie to start a leader election and passes the previously created run function as the callback for OnStartedLeading. The OnStartedLeading callback is invoked when a leader elector client starts leading.
app.StartControllers
app.StartControllers starts all controller loops which are part of the machine controller.
func StartControllers(options *options.MCServer,
controlCoreKubeconfig *rest.Config,
targetCoreKubeconfig *rest.Config,
controlMachineClientBuilder machineclientbuilder.ClientBuilder,
controlCoreClientBuilder coreclientbuilder.ClientBuilder,
targetCoreClientBuilder coreclientbuilder.ClientBuilder,
driver driver.Driver,
recorder record.EventRecorder,
stop <-chan struct{}) error
- Calls getAvailableResources using the controlCoreClientBuilder, which returns a map[schema.GroupVersionResource]bool assigned to availableResources.
  - getAvailableResources waits till the API server is running by checking its /healthz endpoint using wait.PollImmediate, and keeps re-creating the client using the clientbuilder.Client method.
  - It then uses client.Discovery().ServerResources, which returns the supported resources for all groups and versions as a slice of *metav1.APIResourceList (each of which encapsulates a []APIResource), and converts that to a map[schema.GroupVersionResource]bool.
- Creates a controlMachineClient using controlMachineClientBuilder.ClientOrDie("machine-controller").MachineV1alpha1(), which is a client of type MachineV1alpha1Interface. This interface is a composition of MachinesGetter, MachineClassesGetter, MachineDeploymentsGetter and MachineSetsGetter, allowing CRUD access to machines, machine classes, machine deployments and machine sets. This client targets the control cluster, i.e. the cluster holding the machine CRDs.
- Creates a controlCoreClient (of type kubernetes.Clientset), which is the standard k8s client-go client for accessing the control cluster.
- Creates a targetCoreClient (of type kubernetes.Clientset), which is the standard k8s client-go client for accessing the target cluster, in which machines will be spawned.
- Obtains the target cluster k8s version using the discovery interface and preserves it in targetKubernetesVersion.
- If availableResources does not contain the machine GVR, exits app.StartControllers with an error.
- Creates the following informer factories:
  - controlMachineInformerFactory using the generated pkg/client/informers/externalversions#NewFilteredSharedInformerFactory, passing the control machine client, the configured min resync period and the control namespace.
  - controlCoreInformerFactory using the client-go core informers#NewFilteredSharedInformerFactory, passing in the control core client, the min resync period and the control namespace.
  - Similarly, targetCoreInformerFactory.
- Gets the controller's Machine Informers facade using controlMachineInformerFactory.Machine().V1alpha1() and assigns it to machineSharedInformers.
- Now creates the machine controller using the machinecontroller.NewController factory function, passing the following:
  - the control namespace from options.MCServer.Namespace
  - SafetyOptions from options.MCServer.SafetyOptions
  - NodeConditions from options.MCServer.NodeConditions (by default: "KernelDeadlock,ReadonlyFilesystem,DiskPressure,NetworkUnavailable")
  - the clients: controlMachineClient, controlCoreClient and targetCoreClient
  - the driver
  - Target Cluster Informers obtained from targetCoreInformerFactory
  - Control Cluster Informers obtained from controlCoreInformerFactory
  - MachineClassInformer and MachineInformer using machineSharedInformers.MachineClasses() and machineSharedInformers.Machines()
  - the event recorder created earlier
  - targetKubernetesVersion
- Starts controlMachineInformerFactory, controlCoreInformerFactory and targetCoreInformerFactory by calling SharedInformerFactory.Start, passing the stop channel.
- Launches machinecontroller.Run in a new go-routine, passing the stop channel.
- Blocks forever using a select{}.
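The conversion performed by getAvailableResources, and the machine GVR check that follows it, can be sketched with plain Go types. apiResourceList, apiResource and groupVersionResource are hypothetical mirrors of metav1.APIResourceList, metav1.APIResource and schema.GroupVersionResource, and parseGroupVersion is a simplified stand-in for schema.ParseGroupVersion. The machine GVR is assumed here to be machine.sapcloud.io/v1alpha1, resource machines.

```go
package main

import (
	"fmt"
	"strings"
)

// Hypothetical, trimmed-down mirrors of the discovery and schema types.
type apiResource struct{ Name string }

type apiResourceList struct {
	GroupVersion string // e.g. "apps/v1", or "v1" for the core group
	APIResources []apiResource
}

type groupVersionResource struct{ Group, Version, Resource string }

// parseGroupVersion splits "group/version"; a bare "v1" belongs to the core ("") group.
func parseGroupVersion(gv string) (group, version string) {
	if i := strings.Index(gv, "/"); i >= 0 {
		return gv[:i], gv[i+1:]
	}
	return "", gv
}

// toAvailable flattens discovery output into a set of available GVRs.
func toAvailable(lists []*apiResourceList) map[groupVersionResource]bool {
	out := map[groupVersionResource]bool{}
	for _, l := range lists {
		g, v := parseGroupVersion(l.GroupVersion)
		for _, r := range l.APIResources {
			out[groupVersionResource{g, v, r.Name}] = true
		}
	}
	return out
}

func main() {
	available := toAvailable([]*apiResourceList{
		{GroupVersion: "v1", APIResources: []apiResource{{"nodes"}, {"secrets"}}},
		{GroupVersion: "machine.sapcloud.io/v1alpha1", APIResources: []apiResource{{"machines"}}},
	})
	machineGVR := groupVersionResource{"machine.sapcloud.io", "v1alpha1", "machines"}
	if !available[machineGVR] {
		fmt.Println("machine API not available: StartControllers would return an error")
		return
	}
	fmt.Println("machine API available")
}
```

Using a map keyed by a comparable struct makes the later "is the machine GVR served?" check a single lookup.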
Machine Controller Initialization
The machine controller is constructed using the controller.NewController factory function, which initializes the controller struct.
1. NewController factory func
mc is constructed using the factory function below:
func NewController(
namespace string,
controlMachineClient machineapi.MachineV1alpha1Interface,
controlCoreClient kubernetes.Interface,
targetCoreClient kubernetes.Interface,
driver driver.Driver,
pvcInformer coreinformers.PersistentVolumeClaimInformer,
pvInformer coreinformers.PersistentVolumeInformer,
secretInformer coreinformers.SecretInformer,
nodeInformer coreinformers.NodeInformer,
pdbV1beta1Informer policyv1beta1informers.PodDisruptionBudgetInformer,
pdbV1Informer policyv1informers.PodDisruptionBudgetInformer,
volumeAttachmentInformer storageinformers.VolumeAttachmentInformer,
machineClassInformer machineinformers.MachineClassInformer,
machineInformer machineinformers.MachineInformer,
recorder record.EventRecorder,
safetyOptions options.SafetyOptions,
nodeConditions string,
bootstrapTokenAuthExtraGroups string,
targetKubernetesVersion *semver.Version,
) (Controller, error)
1.1 Init Controller Struct
Create and initialize the controller struct, initializing rate-limiting work queues for secrets (controller.secretQueue), nodes (controller.nodeQueue), machines (controller.machineQueue) and machine classes (controller.machineClassQueue), along with 2 work queues used by the safety controllers: controller.machineSafetyOrphanVMsQueue and controller.machineSafetyAPIServerQueue.
Example:
controller := &controller{
	//...
	secretQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
	machineQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machine"),
	//...
}
1.2 Assign Listers and HasSynced funcs to controller struct
// initialize controller listers from the passed-in shared informers (8 listers)
controller.pvcLister = pvcInformer.Lister()
controller.pvLister = pvInformer.Lister()
controller.machineLister = machineInformer.Lister()
controller.pdbV1Lister = pdbV1Informer.Lister()
// ...
// assign the HasSynced function from the passed-in shared informers
controller.pdbV1Synced = pdbV1Informer.Informer().HasSynced
controller.pvcSynced = pvcInformer.Informer().HasSynced
controller.pvSynced = pvInformer.Informer().HasSynced
controller.machineSynced = machineInformer.Informer().HasSynced
1.3 Register Controller Event Handlers on Informers.
An informer invokes its registered event handlers when a k8s object changes.
Event handlers are registered using the <ResourceType>Informer().AddEventHandler function.
The controller initialization registers add/delete event handlers for secrets, and add/update/delete event handlers for the MachineClass, Machine and Node informers.
The event handlers generally extract the key of the delivered object and add it to the appropriate work queue; the keys are later picked up and reconciled in controller.Run.
The work queue thus separates the delivery of an object from its processing.
Example
secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.secretAdd,
DeleteFunc: controller.secretDelete,
})
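To make the handler/queue split concrete, here is a standard-library sketch of handlers that derive a namespace/name key and enqueue it. keyQueue and handlerFuncs are hypothetical simplifications of client-go's workqueue and cache.ResourceEventHandlerFuncs. Like the real work queue, keyQueue ignores a key that is already waiting, so a burst of events for one Secret results in a single reconcile; unlike the real queue, it does not track items that are currently being processed.

```go
package main

import "fmt"

// object is a hypothetical stand-in for an object's metadata.
type object struct{ Namespace, Name string }

// metaNamespaceKey mirrors the key format of cache.MetaNamespaceKeyFunc:
// "namespace/name", or just "name" for cluster-scoped objects such as Nodes.
func metaNamespaceKey(o object) string {
	if o.Namespace == "" {
		return o.Name
	}
	return o.Namespace + "/" + o.Name
}

// keyQueue is a toy FIFO that ignores keys already waiting in the queue.
type keyQueue struct {
	items   []string
	waiting map[string]bool
}

func newKeyQueue() *keyQueue { return &keyQueue{waiting: map[string]bool{}} }

func (q *keyQueue) Add(key string) {
	if q.waiting[key] {
		return
	}
	q.waiting[key] = true
	q.items = append(q.items, key)
}

// Get pops the oldest key; ok is false when the queue is empty.
func (q *keyQueue) Get() (key string, ok bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key = q.items[0]
	q.items = q.items[1:]
	delete(q.waiting, key)
	return key, true
}

// handlerFuncs mimics the Add/Delete part of cache.ResourceEventHandlerFuncs.
type handlerFuncs struct {
	AddFunc    func(obj object)
	DeleteFunc func(obj object)
}

func main() {
	secretQueue := newKeyQueue()
	enqueue := func(obj object) { secretQueue.Add(metaNamespaceKey(obj)) }
	handlers := handlerFuncs{AddFunc: enqueue, DeleteFunc: enqueue}

	handlers.AddFunc(object{"shoot--foo", "cloudprovider"})
	handlers.DeleteFunc(object{"shoot--foo", "cloudprovider"}) // deduplicated
	handlers.AddFunc(object{"shoot--foo", "userdata"})

	for key, ok := secretQueue.Get(); ok; key, ok = secretQueue.Get() {
		fmt.Println("reconcile", key)
	}
}
```

Because only keys are queued, the reconciler always reads the latest state from the lister rather than acting on a possibly stale event payload.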
1.3.1 Secret Informer Callback
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB SecretInformerAddCallback -->SecretAdd["controller.secretAdd(obj)"] -->GetSecretKey["key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)"] -->AddSecretQ["if err == nil c.secretQueue.Add(key)"] SecretInformerDeleteCallback -->SecretAdd
On deletion, the object handed to the callback may be a cache.DeletedFinalStateUnknown tombstone instead of the Secret itself. The DeletedFinalStateUnknown state means that the object has been deleted, but the watch deletion event was missed while disconnected from the API server, so the final state of the object is unknown. cache.DeletionHandlingMetaNamespaceKeyFunc handles this case by returning the key stored in the tombstone. Hence, if there is no error, we can add the key to the queue.
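The tombstone handling can be sketched as follows. deletedFinalStateUnknown mirrors the Key/Obj shape of cache.DeletedFinalStateUnknown and deletionHandlingKey mimics cache.DeletionHandlingMetaNamespaceKeyFunc; both are simplified, hypothetical stand-ins. The caller enqueues a key only when it could be derived without error.

```go
package main

import (
	"errors"
	"fmt"
)

// object is a hypothetical stand-in for an object's metadata.
type object struct{ Namespace, Name string }

// deletedFinalStateUnknown mirrors the shape of cache.DeletedFinalStateUnknown:
// a tombstone carrying the last known key and the possibly stale object.
type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// deletionHandlingKey mimics cache.DeletionHandlingMetaNamespaceKeyFunc:
// for a tombstone it returns the stored key, otherwise "namespace/name".
func deletionHandlingKey(obj interface{}) (string, error) {
	switch o := obj.(type) {
	case deletedFinalStateUnknown:
		return o.Key, nil
	case object:
		if o.Namespace == "" {
			return o.Name, nil
		}
		return o.Namespace + "/" + o.Name, nil
	default:
		return "", errors.New("object has no metadata")
	}
}

func main() {
	var secretQueue []string
	events := []interface{}{
		object{"shoot--foo", "cloudprovider"},
		deletedFinalStateUnknown{Key: "shoot--foo/old-secret"},
		"not an object",
	}
	for _, obj := range events {
		key, err := deletionHandlingKey(obj)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		secretQueue = append(secretQueue, key) // enqueue only on success
	}
	fmt.Println(secretQueue)
}
```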
1.3.2 Machine Class Informer Callbacks
MachineClass Add/Delete Callback 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB MachineClassInformerAddCallback1 --> MachineAdd["controller.machineClassToSecretAdd(obj)"] -->CastMC[" mc, ok := obj.(*v1alpha1.MachineClass) "] -->EnqueueSecret[" c.secretQueue.Add(mc.SecretRef.Namespace + '/' + mc.SecretRef.Name) c.secretQueue.Add(mc.CredentialsSecretRef.Namespace + '/' + mc.CredentialsSecretRef.Name) "] MachineClassToSecretDeleteCallback1 -->MachineAdd
MachineClass Update Callback 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB MachineClassInformerUpdateCallback1 --> MachineAdd["controller.machineClassToSecretUpdate(oldObj, newObj)"] -->CastMC[" old, ok := oldObj.(*v1alpha1.MachineClass) new, ok := newObj.(*v1alpha1.MachineClass) "] -->RefNotEqual{"old.SecretRef != new.SecretRef?"} --Yes-->EnqueueSecret[" c.secretQueue.Add(old.SecretRef.Namespace + '/' + old.SecretRef.Name) c.secretQueue.Add(new.SecretRef.Namespace + '/' + new.SecretRef.Name) "] -->CredRefNotEqual{"old.CredentialsSecretRef!= new.CredentialsSecretRef?"} --Yes-->EnqueueCredSecretRef[" c.secretQueue.Add(old.CredentialsSecretRef.Namespace + '/' + old.CredentialsSecretRef.Name) c.secretQueue.Add(new.CredentialsSecretRef.Namespace + '/' + new.CredentialsSecretRef.Name) "]
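The ref comparison in the update callback can be sketched as below. machineClass and secretRef are hypothetical, trimmed-down types; the refs are modelled as pointers (an assumption about the real fields), which is why the sketch guards against nil. Enqueueing both the old and the new Secret presumably lets the secret reconciler re-evaluate the Secret that is no longer referenced as well as the newly referenced one. Unlike the flowchart, this sketch checks the two refs independently.

```go
package main

import "fmt"

// secretRef and machineClass are hypothetical stand-ins; the refs are pointers,
// so a MachineClass may omit either of them.
type secretRef struct{ Namespace, Name string }

type machineClass struct {
	SecretRef            *secretRef
	CredentialsSecretRef *secretRef
}

// refsEqual treats two nil refs as equal and compares non-nil refs by value.
func refsEqual(a, b *secretRef) bool {
	if a == nil || b == nil {
		return a == b
	}
	return *a == *b
}

// enqueueRef adds the "namespace/name" key of a non-nil ref.
func enqueueRef(add func(key string), ref *secretRef) {
	if ref != nil {
		add(ref.Namespace + "/" + ref.Name)
	}
}

// machineClassToSecretUpdate enqueues old and new Secrets whenever a ref changed.
func machineClassToSecretUpdate(add func(key string), oldMC, newMC *machineClass) {
	if !refsEqual(oldMC.SecretRef, newMC.SecretRef) {
		enqueueRef(add, oldMC.SecretRef)
		enqueueRef(add, newMC.SecretRef)
	}
	if !refsEqual(oldMC.CredentialsSecretRef, newMC.CredentialsSecretRef) {
		enqueueRef(add, oldMC.CredentialsSecretRef)
		enqueueRef(add, newMC.CredentialsSecretRef)
	}
}

func main() {
	var queue []string
	add := func(key string) { queue = append(queue, key) }
	creds := &secretRef{"shoot--foo", "cloudprovider"}
	oldMC := &machineClass{SecretRef: &secretRef{"shoot--foo", "userdata-v1"}, CredentialsSecretRef: creds}
	newMC := &machineClass{SecretRef: &secretRef{"shoot--foo", "userdata-v2"}, CredentialsSecretRef: creds}
	machineClassToSecretUpdate(add, oldMC, newMC)
	fmt.Println(queue)
}
```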
MachineClass Add/Delete/Update Callback 2
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB MachineClassInformerAddCallback2 --> MachineAdd["controller.machineClassAdd(obj)"] -->CastMC[" key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil c.machineClassQueue.Add(key) "] MachineClassInformerDeleteCallback2 -->MachineAdd MachineClassInformerUpdateCallback2 -->MachineUpdate["controller.machineClassUpdate(oldObj,obj)"] -->MachineAdd
1.3.3 Machine Informer Callbacks
Machine Add/Update/Delete Callbacks 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB MachineAddCallback1 -->AddMachine["controller.addMachine(obj)"] -->EnqueueMachine[" key, err := cache.MetaNamespaceKeyFunc(obj) //Q: why don't we use DeletionHandlingMetaNamespaceKeyFunc here ? if err == nil c.machineQueue.Add(key) "] MachineUpdateCallback1-->AddMachine MachineDeleteCallback1-->AddMachine
Machine Update/Delete Callbacks 2
DISCUSS THIS.
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB MachineUpdateCallback2 -->UpdateMachineToSafety["controller.updateMachineToSafety(oldObj, newObj)"] -->EnqueueSafetyQ[" newM := newObj.(*v1alpha1.Machine) if multipleVMsBackingMachineFound(newM) { c.machineSafetyOrphanVMsQueue.Add('') } "] MachineDeleteCallback2 -->DeleteMachineToSafety["deleteMachineToSafety(obj)"] -->EnqueueSafetyQ1[" c.machineSafetyOrphanVMsQueue.Add('') "]
1.3.4 Node Informer Callbacks
Node Add Callback
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB NodeAddCallback -->InvokeAddNodeToMachine["controller.addNodeToMachine(obj)"] -->AddNodeToMachine[" node := obj.(*corev1.Node) if node.ObjectMeta.Annotations has NotManagedByMCM return; key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil return "] -->GetMachineFromNode[" machine := (use machineLister to get first machine whose 'node' label equals key) "] -->ChkMachine{" machine.Status.CurrentStatus.Phase != 'CrashLoopBackOff' && nodeConditionsHaveChanged( machine.Status.Conditions, node.Status.Conditions) ? "} --Yes-->EnqueueMachine[" mKey, err := cache.MetaNamespaceKeyFunc(machine) if err != nil return controller.machineQueue.Add(mKey) "]
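The decision at the end of the Node add callback can be illustrated with a standard-library sketch. condition, conditionsHaveChanged and shouldEnqueueMachine are hypothetical simplifications: the real check operates on corev1.NodeCondition values and may compare more than type and status, and the NotManagedByMCM annotation check is not modelled. The idea is that the machine is only re-queued when the node's conditions differ from those cached on the Machine status, and not while the machine is in CrashLoopBackOff.

```go
package main

import "fmt"

// condition is a hypothetical stand-in for corev1.NodeCondition (Type and Status only).
type condition struct{ Type, Status string }

// conditionsHaveChanged is a simplified, hypothetical version of the
// nodeConditionsHaveChanged check: it reports a change when the number of
// conditions differs or any condition type has a different status.
func conditionsHaveChanged(machineConds, nodeConds []condition) bool {
	if len(machineConds) != len(nodeConds) {
		return true
	}
	statusByType := make(map[string]string, len(machineConds))
	for _, c := range machineConds {
		statusByType[c.Type] = c.Status
	}
	for _, c := range nodeConds {
		if s, ok := statusByType[c.Type]; !ok || s != c.Status {
			return true
		}
	}
	return false
}

// shouldEnqueueMachine mirrors the decision node of the Node add flowchart.
func shouldEnqueueMachine(phase string, machineConds, nodeConds []condition) bool {
	return phase != "CrashLoopBackOff" && conditionsHaveChanged(machineConds, nodeConds)
}

func main() {
	cached := []condition{{"Ready", "True"}, {"DiskPressure", "False"}}
	current := []condition{{"Ready", "True"}, {"DiskPressure", "True"}}
	fmt.Println(shouldEnqueueMachine("Running", cached, current))
	fmt.Println(shouldEnqueueMachine("CrashLoopBackOff", cached, current))
}
```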
Node Delete Callback
This is straightforward: it checks that the node has an associated machine and, if so, enqueues the machine on the machineQueue.
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TB NodeDeleteCallback -->InvokeDeleteNodeToMachine["controller.deleteNodeToMachine(obj)"] -->DeleteNodeToMachine[" key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err != nil return "] -->GetMachineFromNode[" machine := (use machineLister to get first machine whose 'node' label equals key) if err != nil return "] -->EnqueueMachine[" mKey, err := cache.MetaNamespaceKeyFunc(machine) if err != nil return controller.machineQueue.Add(mKey) "]
Node Update Callback
controller.updateNodeToMachine is specified as the UpdateFunc registered for the nodeInformer.
In a nutshell, it simply delegates to addNodeToMachine(newObj), described earlier, except if the node has the annotation machineutils.TriggerDeletionByMCM (value: node.machine.sapcloud.io/trigger-deletion-by-mcm) set to "true". In this case, it gets the machine object corresponding to the node and then leverages controller.controlMachineClient to delete the machine object.
NOTE: This annotation was introduced for users to add on the node. It gives them an indirect way to delete the machine object, because they don't have access to the control plane.
Snippet shown below, with most error handling and logging omitted.
func (c *controller) updateNodeToMachine(oldObj, newObj interface{}) {
	node := newObj.(*corev1.Node)
	// check for the TriggerDeletionByMCM annotation on the node object
	// if it is present then mark the machine object for deletion
	if value, ok := node.Annotations[machineutils.TriggerDeletionByMCM]; ok && value == "true" {
		machine, err := c.getMachineFromNode(node.Name)
		if err != nil {
			return
		}
		if machine.DeletionTimestamp == nil {
			_ = c.controlMachineClient.
				Machines(c.namespace).
				Delete(context.Background(), machine.Name, metav1.DeleteOptions{})
		}
	} else {
		c.addNodeToMachine(newObj)
	}
}
Machine Controller Run
func (c *controller) Run(workers int, stopCh <-chan struct{}) {
// ...
}
1. Wait for Informer Caches to Sync
When an informer starts, it builds a cache of all resources it currently watches; this cache is lost when the application restarts. This means that on startup, each of your handler functions will be invoked as the initial state is built. If this is not desirable, one should wait until the caches are synced before performing any updates. This can be done using the cache.WaitForCacheSync function.
if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pvcSynced, c.pvSynced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) {
runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
2. Register with Prometheus
The machine controller struct implements the prometheus.Collector interface and can therefore be registered on the Prometheus metrics registry.
prometheus.MustRegister(controller)
Collectors that are added to the registry collect their metrics every time the metrics endpoint of the MCM is called, and these metrics are exposed via that endpoint.
2.1 Describe Metrics (controller.Describe)
All prometheus.Metric values that are collected must first be described using a prometheus.Desc, which is the immutable metadata about a metric.
As can be seen below, the machine controller sends a description of metrics.MachineCountDesc to Prometheus. This is mcm_machine_items_total, the count of machines managed by the controller.
This Describe callback is called by prometheus.MustRegister.
Doubt: we currently appear to have only one metric for the MC?
var MachineCountDesc = prometheus.NewDesc("mcm_machine_items_total", "Count of machines currently managed by the mcm.", nil, nil)
func (c *controller) Describe(ch chan<- *prometheus.Desc) {
ch <- metrics.MachineCountDesc
}
2.2 Collect Metrics (controller.Collect)
Collect is called by the Prometheus registry when collecting metrics. The implementation sends each collected metric via the provided channel and returns once the last metric has been sent. The descriptor of each sent metric is one of those returned by Describe.
// Collect is a method required to implement the prometheus.Collector interface.
func (c *controller) Collect(ch chan<- prometheus.Metric) {
c.CollectMachineMetrics(ch)
c.CollectMachineControllerFrozenStatus(ch)
}
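The Describe/Collect contract is a channel-based protocol. The sketch below imitates it with hypothetical stand-ins (desc, metric, collector and gather are not the client_golang types): gather plays the registry's role by draining Describe, then draining Collect and rejecting any metric whose descriptor was not described, roughly like the registry does for checked collectors. As a simplification, it calls Describe on every gather, whereas the real client calls it at registration time.

```go
package main

import "fmt"

// desc and metric are hypothetical stand-ins for prometheus.Desc and prometheus.Metric.
type desc struct{ fqName, help string }

type metric struct {
	desc  *desc
	value float64
}

// collector has the same two-method shape as prometheus.Collector.
type collector interface {
	Describe(ch chan<- *desc)
	Collect(ch chan<- metric)
}

var machineCountDesc = &desc{"mcm_machine_items_total", "Count of machines currently managed by the mcm."}

// fakeController reports the number of machines held in memory.
type fakeController struct{ machines []string }

func (c *fakeController) Describe(ch chan<- *desc) { ch <- machineCountDesc }

func (c *fakeController) Collect(ch chan<- metric) {
	ch <- metric{machineCountDesc, float64(len(c.machines))}
}

// gather drains Describe and Collect, each run in its own goroutine whose
// channel is closed once the method returns.
func gather(c collector) ([]metric, error) {
	described := map[*desc]bool{}
	descCh := make(chan *desc)
	go func() { c.Describe(descCh); close(descCh) }()
	for d := range descCh {
		described[d] = true
	}

	metricCh := make(chan metric)
	go func() { c.Collect(metricCh); close(metricCh) }()
	var out []metric
	var err error
	for m := range metricCh { // keep draining so the Collect goroutine never blocks
		if !described[m.desc] {
			err = fmt.Errorf("collected metric %q with an undescribed descriptor", m.desc.fqName)
			continue
		}
		out = append(out, m)
	}
	return out, err
}

func main() {
	metrics, err := gather(&fakeController{machines: []string{"machine-a", "machine-b"}})
	if err != nil {
		panic(err)
	}
	for _, m := range metrics {
		fmt.Println(m.desc.fqName, m.value)
	}
}
```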
2.2.1 Collect Machine Metrics
func (c *controller) CollectMachineMetrics(ch chan<- prometheus.Metric)
A prometheus.Metric (https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#Metric) models a sample linking data points together over time. Custom labels (with their own values) can be added to each data point.
A prometheus.Gauge is a Metric that represents a single numerical value that can arbitrarily go up and down. We use a Gauge for the machine count.
A prometheus.GaugeVec is a factory for creating a set of gauges all with the same description but which have different data values for the metric labels.
Machine information about a machine managed by MCM is described on Prometheus using a prometheus.GaugeVec constructed using the factory function prometheus.NewGaugeVec.
A prometheus.Desc is the descriptor used by every Prometheus Metric. It is essentially the immutable meta-data of a Metric that includes fully qualified name of the metric, the help string and the metric label names.
We have 3 such gauge vecs for machine metrics and 1 gauge metric for the machine count as seen below.
Q: Discuss Why do we need the 3 gauge vecs ?
Example:
var MachineCountDesc = prometheus.NewDesc("mcm_machine_items_total", "Count of machines currently managed by the mcm.", nil, nil)
//MachineInfo Information of the Machines currently managed by the mcm.
var MachineInfo = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "mcm",
Subsystem: "machine",
Name: "info",
Help: "Information of the Machines currently managed by the mcm.",
}, []string{"name", "namespace", "createdAt",
"spec_provider_id", "spec_class_api_group", "spec_class_kind", "spec_class_name"})
// MachineStatusCondition Information of the mcm managed Machines' status conditions
var MachineStatusCondition = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: machineSubsystem,
Name: "status_condition",
Help: "Information of the mcm managed Machines' status conditions.",
}, []string{"name", "namespace", "condition"})
//MachineCSPhase Current status phase of the Machines currently managed by the mcm.
var MachineCSPhase = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: machineSubsystem,
Name: "current_status_phase",
Help: "Current status phase of the Machines currently managed by the mcm.",
}, []string{"name", "namespace"})
One invokes the GaugeVec.With method passing a prometheus.Labels which is a map[string]string
to obtain a prometheus.Gauge.
- Gets the list of machines using the machineLister.
- Iterates through the list of machines. Uses the MachineInfo.With method to initialize the labels for each metric and obtain the Gauge, then uses Gauge.Set to set the value for the metric.
- Creates the machine count metric:
  metric, err := prometheus.NewConstMetric(metrics.MachineCountDesc, prometheus.GaugeValue, float64(len(machineList)))
- Sends the metric to the Prometheus metric channel:
  ch <- metric
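GaugeVec.With can be pictured as a map from label values to child gauges that share one description. The sketch below is a hypothetical, minimal stand-in (gaugeVec, gauge and newGaugeVec are not the client_golang types); the metric name assumes that namespace and machineSubsystem resolve to "mcm" and "machine", as in the MachineInfo literal above, and the gauge values are purely illustrative. Like the real With, it panics when the labels do not match the declared label names.

```go
package main

import (
	"fmt"
	"strings"
)

// gauge is a single numeric value that can go up and down.
type gauge struct{ value float64 }

func (g *gauge) Set(v float64) { g.value = v }

// gaugeVec holds one description and many child gauges keyed by label values.
type gaugeVec struct {
	name       string
	labelNames []string
	children   map[string]*gauge
}

func newGaugeVec(name string, labelNames []string) *gaugeVec {
	return &gaugeVec{name: name, labelNames: labelNames, children: map[string]*gauge{}}
}

// With returns the child gauge for the given labels, creating it on first use.
func (v *gaugeVec) With(labels map[string]string) *gauge {
	if len(labels) != len(v.labelNames) {
		panic("inconsistent label cardinality")
	}
	vals := make([]string, 0, len(v.labelNames))
	for _, n := range v.labelNames {
		val, ok := labels[n]
		if !ok {
			panic("missing label " + n)
		}
		vals = append(vals, val)
	}
	key := strings.Join(vals, "\xff") // separator unlikely to appear in label values
	g, ok := v.children[key]
	if !ok {
		g = &gauge{}
		v.children[key] = g
	}
	return g
}

func main() {
	phase := newGaugeVec("mcm_machine_current_status_phase", []string{"name", "namespace"})
	for i, name := range []string{"machine-a", "machine-b"} {
		phase.With(map[string]string{"name": name, "namespace": "shoot--foo"}).Set(float64(i))
	}
	// Setting the same label values again updates the existing child gauge.
	phase.With(map[string]string{"name": "machine-a", "namespace": "shoot--foo"}).Set(2)
	fmt.Println(len(phase.children))
}
```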
3. Create controller worker go-routines specifying reconcile functions
Finally, use worker.Run to create and run a worker routine that just processes items in the specified queue. The worker runs until stopCh is closed. The worker go-routine is added to the wait group when started and marked done when finished.
Q: reconcileClusterNodeKey
seems useless ?
func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	//...
	var waitGroup sync.WaitGroup
	for i := 0; i < workers; i++ {
		worker.Run(c.secretQueue, "ClusterSecret", worker.DefaultMaxRetries, true, c.reconcileClusterSecretKey, stopCh, &waitGroup)
		worker.Run(c.machineClassQueue, "ClusterMachineClass", worker.DefaultMaxRetries, true, c.reconcileClusterMachineClassKey, stopCh, &waitGroup)
		worker.Run(c.machineQueue, "ClusterMachine", worker.DefaultMaxRetries, true, c.reconcileClusterMachineKey, stopCh, &waitGroup)
		worker.Run(c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, stopCh, &waitGroup)
		worker.Run(c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyAPIServer, stopCh, &waitGroup)
	}
	<-stopCh
	waitGroup.Wait()
}
func Run(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error, stopCh <-chan struct{}, waitGroup *sync.WaitGroup) {
	waitGroup.Add(1)
	go func() {
		// re-invoke the worker function every second until stopCh is closed
		wait.Until(worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second, stopCh)
		waitGroup.Done()
	}()
}
3.1 worker.worker
NOTE: Puzzled that a basic routine like this is NOT part of the client-go lib. It's likely repeated across thousands of controllers (probably with bugs). Thankfully, controller-runtime obviates the need for something like this.
worker returns a function that:
- de-queues items (keys) from the work queue. The keys obtained using workqueue.Get are strings of the form namespace/name of the resource.
- processes them by invoking the reconciler(key) function.
  - The purpose of the reconciler is to compare the actual state with the desired state and attempt to converge the two. It should then update the status block of the resource.
  - If the reconciler returns an error, the item is requeued up to maxRetries times before giving up.
- marks items as done.
func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error) func() {
return func() {
exit := false
for !exit {
exit = func() bool {
key, quit := queue.Get()
if quit {
return true
}
defer queue.Done(key)
err := reconciler(key.(string))
if err == nil {
if forgetAfterSuccess {
queue.Forget(key)
}
return false
}
if queue.NumRequeues(key) < maxRetries {
queue.AddRateLimited(key)
return false
}
queue.Forget(key)
return false
}()
}
}
}
4. Reconciliation functions executed by worker
The controller starts worker go-routines that pop out keys from the relevant workqueue and execute the reconcile function.
See the reconcile chapters.