Machine Controller

The Machine Controller handles reconciliation of Machine and MachineClass objects.

The Machine Controller entry point for any provider is machine-controller-manager-provider-<name>/cmd/machine-controller/main.go

MC Launch

Dev

Build

A Makefile in the root of machine-controller-manager-provider-<name> builds the provider-specific machine controller for linux/amd64 with CGO disabled. The make build target invokes the shell script .ci/build to do this.

CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
  -a \
  -v \
  -o ${BINARY_PATH}/rel/machine-controller \
  cmd/machine-controller/main.go

Launch

Assuming one has initialized the variables using make download-kubeconfigs, one can then use the make start target, which launches the MC with the flags shown below. Most of these timeout flags are redundant, since the exact same values are the defaults in machine-controller-manager/pkg/util/provider/app/options.NewMCServer.

go run -mod=vendor \
    cmd/machine-controller/main.go \
    --control-kubeconfig=$(CONTROL_KUBECONFIG) \
    --target-kubeconfig=$(TARGET_KUBECONFIG) \
    --namespace=$(CONTROL_NAMESPACE) \
    --machine-creation-timeout=20m \
    --machine-drain-timeout=5m \
    --machine-health-timeout=10m \
    --machine-pv-detach-timeout=2m \
    --machine-safety-apiserver-statuscheck-timeout=30s \
    --machine-safety-apiserver-statuscheck-period=1m \
    --machine-safety-orphan-vms-period=30m \
    --leader-elect=$(LEADER_ELECT) \
    --v=3

Prod

Build

A Dockerfile builds the provider-specific machine controller and sets it as the image ENTRYPOINT with no CLI arguments; unless the deployment supplies flags, the coded defaults apply.

RUN CGO_ENABLED=0 GOOS=$TARGETOS GOARCH=$TARGETARCH \
      go build \
      -mod=vendor \
      -o /usr/local/bin/machine-controller \
      cmd/machine-controller/main.go
COPY --from=builder /usr/local/bin/machine-controller /machine-controller
ENTRYPOINT ["/machine-controller"]

The machine-controller-manager deployment usually launches both the MC and the MCM in a single Pod; the MC container is started with the following arguments:

./machine-controller
         --control-kubeconfig=inClusterConfig
         --machine-creation-timeout=20m
         --machine-drain-timeout=2h
         --machine-health-timeout=10m
         --namespace=shoot--i034796--tre
         --port=10259
         --target-kubeconfig=/var/run/secrets/gardener.cloud/shoot/generic-kubeconfig/kubeconfig
         --v=3

Launch Flow

%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TB

Begin(("cmd/
machine-controller/
main.go"))
-->NewMCServer["mc=options.NewMCServer"]
-->AddFlags["mc.AddFlags(pflag.CommandLine)"]
-->LogOptions["options := k8s.io/component-base/logs.NewOptions()
	options.AddFlags(pflag.CommandLine)"]
-->InitFlags["flag.InitFlags"]
InitFlags--local-->NewLocalDriver["
	driver, err := local.NewDriver(s.ControlKubeconfig)
	if err exit
"]
InitFlags--aws-->NewPlatformDriver["
	driver := aws.NewAWSDriver(&spi.PluginSPIImpl{})
	OR
	driver := cp.NewAzureDriver(&spi.PluginSPIImpl{})
	//etc
"]

NewLocalDriver-->AppRun["
	err := app.Run(mc, driver)
"]
NewPlatformDriver-->AppRun
AppRun-->End(("if err != nil 
os.Exit(1)"))

Summary

  1. Creates machine-controller-manager/pkg/util/provider/app/options.MCServer using options.NewMCServer; this is the main context object for the machine controller, and it embeds an options.MachineControllerConfiguration.

    options.NewMCServer initializes the options.MCServer struct with default values for:

    • Port: 10258
    • Namespace: default
    • ConcurrentNodeSyncs: 50: the number of worker go-routines used to process items from a work queue. See Worker below
    • NodeConditions: "KernelDeadlock,ReadonlyFilesystem,DiskPressure,NetworkUnavailable" (failure node conditions that indicate that a machine is un-healthy)
    • MinResyncPeriod: 12 hours, KubeAPIQPS: 20, KubeAPIBurst: 30: config params for the k8s clients. See rest.Config
  2. calls MCServer.AddFlags which defines all parsing flags for the machine controller into fields of MCServer instance created in the last step.

  3. calls k8s.io/component-base/logs.NewOptions and then options.AddFlags for logging options. (TODO: should get rid of this when moving to logr.)

  4. Driver initialization code varies according to the provider type.

    • Local Driver
      • calls NewDriver with the control kubeconfig, which creates a controller-runtime client (sigs.k8s.io/controller-runtime/pkg/client) and then calls pkg/local/driver.NewDriver passing the controller-runtime client, which constructs a local driver encapsulating the passed-in client.
      • driver := local.NewDriver(c)
      • the local driver implements Driver, the facade for creation/deletion of VMs
    • Provider Specific Driver Example
      • driver := aws.NewAWSDriver(&spi.PluginSPIImpl{})
      • driver := cp.NewAzureDriver(&spi.PluginSPIImpl{})
      • spi.PluginSPIImpl is a struct that implements a provider-specific interface that initializes a provider session.
  5. calls app.Run passing in the previously created MCServer and Driver instances.
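
Putting the summary together, a provider's main.go is a thin wrapper around options.NewMCServer, flag parsing, driver construction and app.Run. Below is a condensed, hypothetical sketch (AWS flavour; the provider import paths are assumptions for illustration, not the verbatim source):

// Condensed sketch of a provider's cmd/machine-controller/main.go.
package main

import (
	"fmt"
	"os"

	"github.com/spf13/pflag"

	"github.com/gardener/machine-controller-manager/pkg/util/provider/app"
	"github.com/gardener/machine-controller-manager/pkg/util/provider/app/options"
	// hypothetical provider import paths, for illustration only
	aws "github.com/gardener/machine-controller-manager-provider-aws/pkg/aws"
	"github.com/gardener/machine-controller-manager-provider-aws/pkg/spi"
)

func main() {
	mc := options.NewMCServer()    // main context object with coded defaults
	mc.AddFlags(pflag.CommandLine) // register all MC flags
	// logging option registration and flag.InitFlags omitted for brevity

	driver := aws.NewAWSDriver(&spi.PluginSPIImpl{}) // provider-specific Driver facade

	if err := app.Run(mc, driver); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}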

Machine Controller Loop

app.Run

app.Run is the function that sets up the main control loop of the machine controller server.

Summary

  1. app.Run(options *options.MCServer, driver driver.Driver) is the common run loop for all provider Machine Controllers.
  2. Creates targetkubeconfig and controlkubeconfig of type k8s.io/client-go/rest.Config from the respective kubeconfig paths using clientcmd.BuildConfigFromFlags
  3. Sets fields such as config.QPS and config.Burst in both targetkubeconfig and controlkubeconfig from the passed-in options.MCServer
  4. Creates kubeClientControl from the controlkubeconfig using the standard client-go factory method kubernetes.NewForConfig, which returns a client-go/kubernetes.Clientset
  5. Similarly creates another Clientset named leaderElectionClient using controlkubeconfig
  6. Starts a go-routine using the function startHTTP that registers http handlers for the go profiler, prometheus metrics and the health check.
  7. Calls createRecorder passing the kubeClientControl client set instance; this returns a client-go/tools/record.EventRecorder
    1. Creates a new eventBroadcaster of type event.EventBroadcaster
    2. Sets the logging function of the broadcaster to klog.Infof
    3. Sets the event sink using eventBroadcaster.StartRecordingToSink passing the event interface as kubeClient.CoreV1().RESTClient().Events(""). Effectively, events will be published remotely.
    4. Returns the record.EventRecorder associated with the eventBroadcaster using eventBroadcaster.NewRecorder
  8. Constructs an anonymous function, assigned to the run variable, which does the following:
    1. Initializes a stop receive channel.
    2. Creates a controlMachineClientBuilder using machineclientbuilder.SimpleClientBuilder wrapping the controlkubeconfig.
    3. Creates a controlCoreClientBuilder using coreclientbuilder.SimpleControllerClientBuilder wrapping the controlkubeconfig.
    4. Creates a targetCoreClientBuilder using coreclientbuilder.SimpleControllerClientBuilder wrapping the targetkubeconfig.
    5. Calls the app.StartControllers function passing the options, driver, controlkubeconfig, targetkubeconfig, controlMachineClientBuilder, controlCoreClientBuilder, targetCoreClientBuilder, recorder and stop channel.
      • // Q: if you are going to pass the controlkubeconfig and targetkubeconfig - why not create the client builders inside StartControllers?
    6. If app.StartControllers returns an error, panics and exits run.
  9. Uses leaderelection.RunOrDie to start a leader election, passing the previously created run function as the callback for OnStartedLeading. The OnStartedLeading callback is invoked when a leader-elector client starts leading. A hedged sketch of this wiring follows below.
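
The sketch below shows the general shape of client-go leader election; the lock type, names, timings and the id identity string are illustrative assumptions, not MCM's actual values:

// Sketch of step 9 using client-go leader election; values are assumptions.
lock, err := resourcelock.New(
	resourcelock.LeasesResourceLock,
	options.Namespace,    // assumed lock namespace
	"machine-controller", // assumed lock name
	leaderElectionClient.CoreV1(),
	leaderElectionClient.CoordinationV1(),
	resourcelock.ResourceLockConfig{Identity: id, EventRecorder: recorder},
)
if err != nil {
	klog.Fatalf("error creating resource lock: %v", err)
}
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
	Lock:          lock,
	LeaseDuration: 15 * time.Second, // illustrative timings
	RenewDeadline: 10 * time.Second,
	RetryPeriod:   2 * time.Second,
	Callbacks: leaderelection.LeaderCallbacks{
		OnStartedLeading: run, // the anonymous run function from step 8
		OnStoppedLeading: func() { klog.Fatalf("leader election lost") },
	},
})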

app.StartControllers

app.StartControllers starts all controller loops which are part of the machine controller.

func StartControllers(options *options.MCServer,
	controlCoreKubeconfig *rest.Config,
	targetCoreKubeconfig *rest.Config,
	controlMachineClientBuilder machineclientbuilder.ClientBuilder,
	controlCoreClientBuilder coreclientbuilder.ClientBuilder,
	targetCoreClientBuilder coreclientbuilder.ClientBuilder,
	driver driver.Driver,
	recorder record.EventRecorder,
	stop <-chan struct{}) error
  1. Calls getAvailableResources using the controlCoreClientBuilder; this returns a map[schema.GroupVersionResource]bool assigned to availableResources
    • getAvailableResources waits till the API server is running by checking its /healthz using wait.PollImmediate, re-creating the client via the clientbuilder.Client method
    • it then uses client.Discovery().ServerResources, which returns the supported resources for all groups and versions as a slice of *metav1.APIResourceList (each encapsulating a []APIResource), and converts that to a map[schema.GroupVersionResource]bool
  2. Creates a controlMachineClient using controlMachineClientBuilder.ClientOrDie("machine-controller").MachineV1alpha1(), a client of type MachineV1alpha1Interface. This interface is a composition of MachinesGetter, MachineClassesGetter, MachineDeploymentsGetter and MachineSetsGetter, giving access to the CRUD interfaces for machines, machine classes, machine deployments and machine sets. This client targets the control cluster - i.e. the cluster holding the machine CRDs
  3. Creates a controlCoreClient (of type kubernetes.Clientset), the standard k8s client-go client for accessing the control cluster
  4. Creates a targetCoreClient (of type kubernetes.Clientset), the standard k8s client-go client for accessing the target cluster - in which machines will be spawned
  5. Obtains the target cluster k8s version using the discovery interface and preserves it in targetKubernetesVersion
  6. If availableResources does not contain the machine GVR, exits app.StartControllers with an error (see the sketch after this list)
  7. Creates the controlMachineInformerFactory, controlCoreInformerFactory and targetCoreInformerFactory informer factories
  8. Creates the machine controller using the machinecontroller.NewController factory function, passing the clients, informers and configuration described in the next section
  9. Starts the controlMachineInformerFactory, controlCoreInformerFactory and targetCoreInformerFactory by calling SharedInformerFactory.Start passing the stop channel
  10. Launches machinecontroller.Run in a new go-routine passing the stop channel
  11. Blocks forever using a select{}
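
The guard in step 6 amounts to a single map lookup; a minimal sketch, assuming the machine GVR literal (group machine.sapcloud.io, version v1alpha1):

// Hedged sketch of the step-6 availability check; GVR values are assumptions.
machineGVR := schema.GroupVersionResource{
	Group:    "machine.sapcloud.io",
	Version:  "v1alpha1",
	Resource: "machines",
}
if !availableResources[machineGVR] {
	return fmt.Errorf("unable to start machine controller: %q is not served by the control cluster API server", machineGVR)
}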

Machine Controller Initialization

The machine controller is constructed using the controller.NewController factory function, which initializes the controller struct.

1. NewController factory func

mc is constructed using the factory function below:

func NewController(
	namespace string,
	controlMachineClient machineapi.MachineV1alpha1Interface,
	controlCoreClient kubernetes.Interface,
	targetCoreClient kubernetes.Interface,
	driver driver.Driver,
	pvcInformer coreinformers.PersistentVolumeClaimInformer,
	pvInformer coreinformers.PersistentVolumeInformer,
	secretInformer coreinformers.SecretInformer,
	nodeInformer coreinformers.NodeInformer,
	pdbV1beta1Informer policyv1beta1informers.PodDisruptionBudgetInformer,
	pdbV1Informer policyv1informers.PodDisruptionBudgetInformer,
	volumeAttachmentInformer storageinformers.VolumeAttachmentInformer,
	machineClassInformer machineinformers.MachineClassInformer,
	machineInformer machineinformers.MachineInformer,
	recorder record.EventRecorder,
	safetyOptions options.SafetyOptions,
	nodeConditions string,
	bootstrapTokenAuthExtraGroups string,
	targetKubernetesVersion *semver.Version,
) (Controller, error) 

1.1 Init Controller Struct

Creates and initializes the Controller struct, setting up rate-limiting work queues for secrets (controller.secretQueue), nodes (controller.nodeQueue), machines (controller.machineQueue) and machine classes (controller.machineClassQueue), along with two work queues used by the safety controllers: controller.machineSafetyOrphanVMsQueue and controller.machineSafetyAPIServerQueue.

Example:

controller := &controller{
	//...
	secretQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "secret"),
	machineQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "machine"),
	//...
}

1.2 Assign Listers and HasSynced funcs to controller struct

	// initialize controller listers from the passed-in shared informers (8 listers)
	controller.pvcLister = pvcInformer.Lister()
	controller.pvLister = pvInformer.Lister()
	controller.machineLister = machineInformer.Lister()

	controller.pdbV1Lister = pdbV1Informer.Lister()
	controller.pdbV1Synced = pdbV1Informer.Informer().HasSynced

	// ...

	// assign the HasSynced function from the passed-in shared informers
	controller.pvcSynced = pvcInformer.Informer().HasSynced
	controller.pvSynced = pvInformer.Informer().HasSynced
	controller.machineSynced = machineInformer.Informer().HasSynced

1.3 Register Controller Event Handlers on Informers.

An informer invokes the registered event handlers when a k8s object changes.

Event handlers are registered using the <ResourceType>Informer().AddEventHandler function.

The controller initialization registers add/delete event handlers for Secrets, and add/update/delete event handlers for the MachineClass, Machine and Node informers.

The event handlers generally add the object keys to the appropriate work queues, which are later picked up and reconciled in controller.Run.

The work queue is used to separate the delivery of the object from its processing. Resource event handler functions extract the key of the delivered object and add it to the relevant work queue for future processing (in controller.Run).

Example

secretInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    controller.secretAdd,
	DeleteFunc: controller.secretDelete,
})
1.3.1 Secret Informer Callback
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB

SecretInformerAddCallback
-->SecretAdd["controller.secretAdd(obj)"]
-->GetSecretKey["key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)"]
-->AddSecretQ["if err == nil c.secretQueue.Add(key)"]

SecretInformerDeleteCallback
-->SecretAdd

We must check for the DeletedFinalStateUnknown state of the secret in the cache before enqueuing its key. The DeletedFinalStateUnknown state means that the object has been deleted, but the watch deletion event was missed while disconnected from the apiserver and the controller didn't react accordingly. Hence, if there is no error, we can add the key to the queue.
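
cache.DeletionHandlingMetaNamespaceKeyFunc performs that tombstone check for us; its client-go implementation is essentially:

// k8s.io/client-go/tools/cache: key func that tolerates deletion tombstones.
func DeletionHandlingMetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if d, ok := obj.(DeletedFinalStateUnknown); ok {
		// the watch missed the delete; use the key recorded in the tombstone
		return d.Key, nil
	}
	return MetaNamespaceKeyFunc(obj)
}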

1.3.2 Machine Class Informer Callbacks
MachineClass Add/Delete Callback 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
MachineClassInformerAddCallback1
-->
MachineAdd["controller.machineClassToSecretAdd(obj)"]
-->CastMC["
	mc, ok := obj.(*v1alpha1.MachineClass)
"]
-->EnqueueSecret["
	c.secretQueue.Add(mc.SecretRef.Namespace + '/' + 
	mc.SecretRef.Name)
	c.secretQueue.Add(mc.CredentialsSecretRef.Namespace + '/' + mc.CredentialsSecretRef.Name)
"]

MachineClassInformerDeleteCallback1
-->MachineAdd
MachineClass Update Callback 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
MachineClassInformerUpdateCallback1
-->
MachineAdd["controller.machineClassToSecretUpdate(oldObj, newObj)"]
-->CastMC["
	old, ok := oldObj.(*v1alpha1.MachineClass)
	new, ok := newObj.(*v1alpha1.MachineClass)
"]
-->RefNotEqual{"old.SecretRef != 
	new.SecretRef?"}
--Yes-->EnqueueSecret["
	c.secretQueue.Add(old.SecretRef.Namespace + '/' + old.SecretRef.Name)
	c.secretQueue.Add(new.SecretRef.Namespace + '/' + new.SecretRef.Name)
"]
-->CredRefNotEqual{"old.CredentialsSecretRef!=
new.CredentialsSecretRef?"}
--Yes-->EnqueueCredSecretRef["
c.secretQueue.Add(old.CredentialsSecretRef.Namespace + '/' + old.CredentialsSecretRef.Name)
c.secretQueue.Add(new.CredentialsSecretRef.Namespace + '/' + new.CredentialsSecretRef.Name)
"]
MachineClass Add/Delete/Update Callback 2
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
MachineClassInformerAddCallback2
-->
MachineAdd["controller.machineClassAdd(obj)"]
-->CastMC["
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err == nil c.machineClassQueue.Add(key)
"]
MachineClassInformerDeleteCallback2
-->MachineAdd

MachineClassInformerUpdateCallback2
-->MachineUpdate["controller.machineClassUpdate(oldObj,obj)"]
-->MachineAdd
1.3.3 Machine Informer Callbacks
Machine Add/Update/Delete Callbacks 1
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
MachineAddCallback1
-->AddMachine["controller.addMachine(obj)"]
-->EnqueueMachine["
	key, err := cache.MetaNamespaceKeyFunc(obj)
	//Q: why don't we use DeletionHandlingMetaNamespaceKeyFunc here ?
	if err == nil c.machineQueue.Add(key)
"]
MachineUpdateCallback1-->AddMachine
MachineDeleteCallback1-->AddMachine
Machine Update/Delete Callbacks 2

DISCUSS THIS.

%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
MachineUpdateCallback2
-->UpdateMachineToSafety["controller.updateMachineToSafety(oldObj, newObj)"]
-->EnqueueSafetyQ["
	newM := newObj.(*v1alpha1.Machine)
	if multipleVMsBackingMachineFound(newM) {
		c.machineSafetyOrphanVMsQueue.Add('')
	}
"]
MachineDeleteCallback2
-->DeleteMachineToSafety["deleteMachineToSafety(obj)"]
-->EnqueueSafetyQ1["
	c.machineSafetyOrphanVMsQueue.Add('')
"]
1.3.4 Node Informer Callbacks
Node Add Callback
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
NodeAddCallback
-->InvokeAddNodeToMachine["controller.addNodeToMachine(obj)"]
-->AddNodeToMachine["
	node := obj.(*corev1.Node)
	if node.ObjectMeta.Annotations has NotManagedByMCM return;
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil return
"]
-->GetMachineFromNode["
	machine := (use machineLister to get first machine whose 'node' label equals key)
"]
-->ChkMachine{"
machine.Status.CurrentStatus.Phase != 'CrashLoopBackOff'
&&
nodeConditionsHaveChanged(
  machine.Status.Conditions, 
  node.Status.Conditions) ?
"}
--Yes-->EnqueueMachine["
	mKey, err := cache.MetaNamespaceKeyFunc(machine)
	if err != nil return
	controller.machineQueue.Add(mKey)
"]

Node Delete Callback

This is straightforward - it checks that the node has an associated machine and if so, enqueues the machine on the machineQueue

%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%

flowchart TB
NodeDeleteCallback
-->InvokeDeleteNodeToMachine["controller.deleteNodeToMachine(obj)"]
-->DeleteNodeToMachine["
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if err != nil return
"]
-->GetMachineFromNode["
	machine := (use machineLister to get first machine whose 'node' label equals key)
	if err != nil return
"]
-->EnqueueMachine["
	mKey, err := cache.MetaNamespaceKeyFunc(machine)
	if err != nil return
	controller.machineQueue.Add(mKey)
"]
Node Update Callback

controller.updateNodeToMachine is specified as the UpdateFunc registered with the nodeInformer.

In a nutshell, it simply delegates to addNodeToMachine(newObj) described earlier, except when the node has the annotation machineutils.TriggerDeletionByMCM (value: node.machine.sapcloud.io/trigger-deletion-by-mcm). In that case it gets the machine object corresponding to the node and then leverages controller.controlMachineClient to delete the machine object.

NOTE: This annotation was introduced for the user to add on the node. It gives them an indirect way to delete the machine object, because they don't have access to the control plane.

Snippet shown below, with error handling and logging omitted.

func (c *controller) updateNodeToMachine(oldObj, newObj interface{}) {
	node := newObj.(*corev1.Node)
	// check for the TriggerDeletionByMCM annotation on the node object;
	// if it is present then mark the machine object for deletion
	if value, ok := node.Annotations[machineutils.TriggerDeletionByMCM]; ok && value == "true" {
		machine, _ := c.getMachineFromNode(node.Name)
		if machine != nil && machine.DeletionTimestamp == nil {
			c.controlMachineClient.
				Machines(c.namespace).
				Delete(context.Background(), machine.Name, metav1.DeleteOptions{})
		}
	} else {
		c.addNodeToMachine(newObj)
	}
}

Machine Controller Run

func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	// ...
}

1. Wait for Informer Caches to Sync

When an informer starts, it builds a cache of all the resources it currently watches; this cache is lost when the application restarts. This means that on startup, each of your handler functions will be invoked as the initial state is built. If this is not desirable, one should wait until the caches are synced before performing any updates, which can be done using the cache.WaitForCacheSync function.

if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pvcSynced, c.pvSynced, c.volumeAttachmentSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) {
	runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
	return
}

2. Register On Prometheus

The machine controller struct implements the prometheus.Collector interface and can therefore be registered on a prometheus metrics registry.

prometheus.MustRegister(controller)

Collectors which are added to the registry collect metrics and expose them via the metrics endpoint of the MCM every time the endpoint is called.
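
The serving side of this (startHTTP in app.Run, step 6) boils down to mounting the registry handler on a mux; a minimal sketch, assuming promhttp and an illustrative port:

// Minimal sketch of exposing registered collectors over HTTP; port is illustrative.
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler()) // serves everything registered via prometheus.MustRegister
go func() {
	if err := http.ListenAndServe(":10258", mux); err != nil {
		klog.Errorf("metrics server stopped: %v", err)
	}
}()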

2.1 Describe Metrics (controller.Describe)

All prometheus.Metric values that are collected must first be described using a prometheus.Desc, the immutable meta-data about a metric.

As can be seen below, the machine controller sends a description of metrics.MachineCountDesc to prometheus. This is mcm_machine_items_total, the count of machines managed by the controller.

This Describe callback is called by prometheus.MustRegister.

Doubt: we currently appear to have only one metric for the MC?

var	MachineCountDesc = prometheus.NewDesc("mcm_machine_items_total", "Count of machines currently managed by the mcm.", nil, nil)

func (c *controller) Describe(ch chan<- *prometheus.Desc) {
	ch <- metrics.MachineCountDesc
}
2.2 Collect Metrics (controller.Collect)

Collect is called by the prometheus registry when collecting metrics. The implementation sends each collected metric via the provided channel and returns once the last metric has been sent. The descriptor of each sent metric is one of those returned by Describe.

// Collect is a method required to implement the prometheus.Collector interface.
func (c *controller) Collect(ch chan<- prometheus.Metric) {
	c.CollectMachineMetrics(ch)
	c.CollectMachineControllerFrozenStatus(ch)
}
2.2.1 Collect Machine Metrics
func (c *controller) CollectMachineMetrics(ch chan<- prometheus.Metric) 

A [prometheus.Metric](https://pkg.go.dev/github.com/prometheus/client_golang/prometheus#Metric) models a sample, linking data points together over time. Custom labels (with their own values) can be added to each data point.

A prometheus.Gauge is a Metric that represents a single numerical value that can arbitrarily go up and down. We use a Gauge for the machine count.

A prometheus.GaugeVec is a factory for creating a set of gauges all with the same description but which have different data values for the metric labels.

Machine information about a machine managed by MCM is described on Prometheus using a prometheus.GaugeVec constructed using the factory function prometheus.NewGaugeVec.

A prometheus.Desc is the descriptor used by every Prometheus Metric. It is essentially the immutable meta-data of a Metric that includes fully qualified name of the metric, the help string and the metric label names.

We have 3 such gauge vecs for machine metrics and 1 gauge metric for the machine count as seen below.

Q: Discuss why we need the 3 gauge vecs?

Example:


var	MachineCountDesc = prometheus.NewDesc("mcm_machine_items_total", "Count of machines currently managed by the mcm.", nil, nil)

//MachineInfo Information of the Machines currently managed by the mcm.
var MachineInfo prometheus.GaugeVec = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "mcm",
		Subsystem: "machine",
		Name:      "info",
		Help:      "Information of the Machines currently managed by the mcm.",
	}, []string{"name", "namespace", "createdAt",
		"spec_provider_id", "spec_class_api_group", "spec_class_kind", "spec_class_name"})

// MachineStatusCondition Information of the mcm managed Machines' status conditions
var	MachineStatusCondition = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespace,
		Subsystem: machineSubsystem,
		Name:      "status_condition",
		Help:      "Information of the mcm managed Machines' status conditions.",
	}, []string{"name", "namespace", "condition"})

//MachineCSPhase Current status phase of the Machines currently managed by the mcm.
var MachineCSPhase = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: namespace,
		Subsystem: machineSubsystem,
		Name:      "current_status_phase",
		Help:      "Current status phase of the Machines currently managed by the mcm.",
	}, []string{"name", "namespace"})

One invokes the GaugeVec.With method, passing a prometheus.Labels (a map[string]string), to obtain a prometheus.Gauge, as sketched below.
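
For example, setting the info gauge for a single machine might look like the hedged sketch below (label values and field accessors are assumptions based on the label names above):

// Hedged example: derive a Gauge from the MachineInfo GaugeVec and set it.
MachineInfo.With(prometheus.Labels{
	"name":                 machine.Name,
	"namespace":            machine.Namespace,
	"createdAt":            strconv.FormatInt(machine.GetCreationTimestamp().Unix(), 10),
	"spec_provider_id":     machine.Spec.ProviderID,
	"spec_class_api_group": machine.Spec.Class.APIGroup, // assumed field
	"spec_class_kind":      machine.Spec.Class.Kind,
	"spec_class_name":      machine.Spec.Class.Name,
}).Set(float64(1))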

  1. Gets the list of machines using the machineLister.
  2. Iterates through the list of machines, using the MachineInfo.With method to initialize the labels for each metric and obtain the Gauge, then Gauge.Set to set the metric value.
  3. Creates the machine count metric:
	metric, err := prometheus.NewConstMetric(metrics.MachineCountDesc, prometheus.GaugeValue, float64(len(machineList)))
  4. Sends the metric to the prometheus metric channel:
	ch <- metric

3. Create controller worker go-routines specifying reconcile functions

Finally, use worker.Run to create and run worker go-routines that process items in the specified queues. The workers run until stopCh is closed. Each worker go-routine is added to the wait group when started and marked done when finished.

Q: reconcileClusterNodeKey seems useless ?

func (c *controller) Run(workers int, stopCh <-chan struct{}) {
	//...
	var waitGroup sync.WaitGroup
	for i := 0; i < workers; i++ {
		worker.Run(c.secretQueue, "ClusterSecret", worker.DefaultMaxRetries, true, c.reconcileClusterSecretKey, stopCh, &waitGroup)
		worker.Run(c.machineClassQueue, "ClusterMachineClass", worker.DefaultMaxRetries, true, c.reconcileClusterMachineClassKey, stopCh, &waitGroup)
		worker.Run(c.machineQueue, "ClusterMachine", worker.DefaultMaxRetries, true, c.reconcileClusterMachineKey, stopCh, &waitGroup)
		worker.Run(c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, stopCh, &waitGroup)
		worker.Run(c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", worker.DefaultMaxRetries, true, c.reconcileClusterMachineSafetyAPIServer, stopCh, &waitGroup)
	}
	<-stopCh
	waitGroup.Wait()
}

func Run(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error, stopCh <-chan struct{}, waitGroup *sync.WaitGroup) {
	waitGroup.Add(1)
	go func() {
		wait.Until(worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second, stopCh)
		waitGroup.Done()
	}()
}
3.1 worker.worker

NOTE: It is puzzling that a basic routine like this is NOT part of the client-go lib. It is likely repeated across thousands of controllers (probably with bugs). Thankfully, controller-runtime obviates the need for something like this.

worker returns a function that

  1. De-queues items (keys) from the work queue; the keys obtained via queue.Get are expected to be strings of the form namespace/name of the resource.
  2. Processes them by invoking the reconciler(key) function.
    1. The purpose of the reconciler is to compare the actual state with the desired state and attempt to converge the two. It should then update the status block of the resource.
    2. If the reconciler returns an error, the item is re-queued up to maxRetries before giving up.
  3. Marks items as done.
func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error) func() {
	return func() {
		exit := false
		for !exit {
			exit = func() bool {
				key, quit := queue.Get()
				if quit {
					return true
				}
				defer queue.Done(key)

				err := reconciler(key.(string))
				if err == nil {
					if forgetAfterSuccess {
						queue.Forget(key)
					}
					return false
				}

				if queue.NumRequeues(key) < maxRetries {
					queue.AddRateLimited(key)
					return false
				}

				queue.Forget(key)
				return false
			}()
		}
	}
}

4. Reconciliation functions executed by worker

The controller starts worker go-routines that pop keys from the relevant work queue and execute the reconcile function.

See reconcile chapters