feat(scheduling): add zfs pool capacity tracking (#335)

Signed-off-by: shubham <shubham.bajpai@mayadata.io>
This commit is contained in:
Shubham Bajpai 2021-05-31 18:59:59 +05:30 committed by GitHub
parent 4fce22afb5
commit 3eb2c9e894
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 2284 additions and 7 deletions

View file

@ -22,11 +22,22 @@ import (
"strings"
"time"
k8sapi "github.com/openebs/lib-csi/pkg/client/k8s"
zfsapi "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
errors "github.com/openebs/lib-csi/pkg/common/errors"
@ -52,15 +63,69 @@ const (
type controller struct {
driver *CSIDriver
capabilities []*csi.ControllerServiceCapability
indexedLabel string
k8sNodeInformer cache.SharedIndexInformer
zfsNodeInformer cache.SharedIndexInformer
}
// NewController returns a new instance
// of CSI controller
func NewController(d *CSIDriver) csi.ControllerServer {
return &controller{
ctrl := &controller{
driver: d,
capabilities: newControllerCapabilities(),
}
if err := ctrl.init(); err != nil {
klog.Fatalf("init controller: %v", err)
}
return ctrl
}
func (cs *controller) init() error {
cfg, err := k8sapi.Config().Get()
if err != nil {
return errors.Wrapf(err, "failed to build kubeconfig")
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return errors.Wrap(err, "failed to build k8s clientset")
}
openebsClient, err := clientset.NewForConfig(cfg)
if err != nil {
return errors.Wrap(err, "failed to build openebs clientset")
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
openebsInformerfactory := informers.NewSharedInformerFactoryWithOptions(openebsClient,
0, informers.WithNamespace(zfs.OpenEBSNamespace))
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
cs.k8sNodeInformer = kubeInformerFactory.Core().V1().Nodes().Informer()
cs.zfsNodeInformer = openebsInformerfactory.Zfs().V1().ZFSNodes().Informer()
if err = cs.zfsNodeInformer.AddIndexers(map[string]cache.IndexFunc{
LabelIndexName(cs.indexedLabel): LabelIndexFunc(cs.indexedLabel),
}); err != nil {
return errors.Wrapf(err, "failed to add index on label %v", cs.indexedLabel)
}
go cs.k8sNodeInformer.Run(stopCh)
go cs.zfsNodeInformer.Run(stopCh)
// wait for all the caches to be populated.
klog.Info("waiting for k8s & zfs node informer caches to be synced")
cache.WaitForCacheSync(stopCh,
cs.k8sNodeInformer.HasSynced,
cs.zfsNodeInformer.HasSynced)
klog.Info("synced k8s & zfs node informer caches")
return nil
}
// SupportedVolumeCapabilityAccessModes contains the list of supported access
@ -784,7 +849,7 @@ func (cs *controller) ControllerPublishVolume(
}
// GetCapacity return the capacity of the
// given volume
// given node topology segment.
//
// This implements csi.ControllerServer
func (cs *controller) GetCapacity(
@ -792,7 +857,89 @@ func (cs *controller) GetCapacity(
req *csi.GetCapacityRequest,
) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
var segments map[string]string
if topology := req.GetAccessibleTopology(); topology != nil {
segments = topology.Segments
}
nodeNames, err := cs.filterNodesByTopology(segments)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
zfsNodesCache := cs.zfsNodeInformer.GetIndexer()
params := req.GetParameters()
poolParam := helpers.GetInsensitiveParameter(&params, "poolname")
var availableCapacity int64
for _, nodeName := range nodeNames {
v, exists, err := zfsNodesCache.GetByKey(zfs.OpenEBSNamespace + "/" + nodeName)
if err != nil {
klog.Warning("unexpected error after querying the zfsNode informer cache")
continue
}
if !exists {
continue
}
zfsNode := v.(*zfsapi.ZFSNode)
// rather than summing all free capacity, we are calculating maximum
// zv size that gets fit in given pool.
// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#available-capacity-vs-maximum-volume-size &
// https://github.com/container-storage-interface/spec/issues/432 for more details
for _, zpool := range zfsNode.Pools {
if zpool.Name != poolParam {
continue
}
freeCapacity := zpool.Free.Value()
if availableCapacity < freeCapacity {
availableCapacity = freeCapacity
}
}
}
return &csi.GetCapacityResponse{
AvailableCapacity: availableCapacity,
}, nil
}
func (cs *controller) filterNodesByTopology(segments map[string]string) ([]string, error) {
nodesCache := cs.k8sNodeInformer.GetIndexer()
if len(segments) == 0 {
return nodesCache.ListKeys(), nil
}
filterNodes := func(vs []interface{}) ([]string, error) {
var names []string
selector := labels.SelectorFromSet(segments)
for _, v := range vs {
meta, err := apimeta.Accessor(v)
if err != nil {
return nil, err
}
if selector.Matches(labels.Set(meta.GetLabels())) {
names = append(names, meta.GetName())
}
}
return names, nil
}
// first see if we need to filter the informer cache by indexed label,
// so that we don't need to iterate over all the nodes for performance
// reasons in large cluster.
indexName := LabelIndexName(cs.indexedLabel)
if _, ok := nodesCache.GetIndexers()[indexName]; !ok {
// run through all the nodes in case indexer doesn't exists.
return filterNodes(nodesCache.List())
}
if segValue, ok := segments[cs.indexedLabel]; ok {
vs, err := nodesCache.ByIndex(indexName, segValue)
if err != nil {
return nil, errors.Wrapf(err, "query indexed store indexName=%v indexKey=%v",
indexName, segValue)
}
return filterNodes(vs)
}
return filterNodes(nodesCache.List())
}
// ListVolumes lists all the volumes
@ -862,6 +1009,7 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability {
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
csi.ControllerServiceCapability_RPC_EXPAND_VOLUME,
csi.ControllerServiceCapability_RPC_GET_CAPACITY,
} {
capabilities = append(capabilities, fromType(cap))
}
@ -914,3 +1062,24 @@ func (cs *controller) validateVolumeCreateReq(req *csi.CreateVolumeRequest) erro
}
return nil
}
// LabelIndexName add prefix for label index.
func LabelIndexName(label string) string {
return "l:" + label
}
// LabelIndexFunc defines index values for given label.
func LabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
meta, err := apimeta.Accessor(obj)
if err != nil {
return nil, fmt.Errorf(
"k8s api object type (%T) doesn't implements metav1.Object interface: %v", obj, err)
}
var vs []string
if v, ok := meta.GetLabels()[label]; ok {
vs = append(vs, v)
}
return vs, nil
}
}