mirror of
https://github.com/TECHNOFAB11/zfs-localpv.git
synced 2025-12-12 06:20:11 +01:00
feat(provision): try volume creation on all the nodes
Currently controller picks one node and the node agent keeps on trying to create the volume on that node. There might not be enough space available on that node to create the volume. The controller can try on all the nodes sequentially and fail the request if volume creation fails on all the nodes which satisfies the topology contraints. Signed-off-by: Pawan <pawan@mayadata.io>
This commit is contained in:
parent
8cc56377bd
commit
04f7635b6f
8 changed files with 128 additions and 80 deletions
|
|
@ -186,6 +186,6 @@ type VolStatus struct {
|
|||
// The state "Pending" means that the volume creation request has not
|
||||
// processed yet. The state "Ready" means that the volume has been created
|
||||
// and it is ready for the use.
|
||||
// +kubebuilder:validation:Enum=Pending;Ready
|
||||
// +kubebuilder:validation:Enum=Pending;Ready;Failed
|
||||
State string `json:"state,omitempty"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,23 +101,6 @@ func getRoundedCapacity(size int64) int64 {
|
|||
return ((size + Mi - 1) / Mi) * Mi
|
||||
}
|
||||
|
||||
func waitForReadyVolume(volname string) error {
|
||||
for true {
|
||||
vol, err := zfs.GetZFSVolume(volname)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Internal,
|
||||
"zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
|
||||
}
|
||||
|
||||
switch vol.Status.State {
|
||||
case zfs.ZFSStatusReady:
|
||||
return nil
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func waitForVolDestroy(volname string) error {
|
||||
for true {
|
||||
_, err := zfs.GetZFSVolume(volname)
|
||||
|
|
@ -129,6 +112,7 @@ func waitForVolDestroy(volname string) error {
|
|||
"zfs: destroy wait failed, not able to get the volume %s %s", volname, err.Error())
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
klog.Infof("waiting for volume to be destroyed %s", volname)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -151,7 +135,7 @@ func waitForReadySnapshot(snapname string) error {
|
|||
}
|
||||
|
||||
// CreateZFSVolume create new zfs volume from csi volume request
|
||||
func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
||||
func CreateZFSVolume(ctx context.Context, req *csi.CreateVolumeRequest) (string, error) {
|
||||
volName := strings.ToLower(req.GetName())
|
||||
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)
|
||||
|
||||
|
|
@ -191,6 +175,10 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
|||
return "", status.Errorf(codes.AlreadyExists,
|
||||
"volume %s already present", volName)
|
||||
}
|
||||
if vol.Status.State != zfs.ZFSStatusReady {
|
||||
return "", status.Errorf(codes.Aborted,
|
||||
"volume %s request already pending", volName)
|
||||
}
|
||||
return vol.Spec.OwnerNodeID, nil
|
||||
}
|
||||
}
|
||||
|
|
@ -200,21 +188,19 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
|||
return "", status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
|
||||
}
|
||||
|
||||
// run the scheduler get the preferred nodelist
|
||||
var selected string
|
||||
nodelist := schd.Scheduler(req, nmap)
|
||||
if len(nodelist) != 0 {
|
||||
selected = nodelist[0]
|
||||
}
|
||||
if len(selected) == 0 {
|
||||
var prfList []string
|
||||
|
||||
if node, ok := parameters["node"]; ok {
|
||||
// (hack): CSI Sanity test does not pass topology information
|
||||
selected = parameters["node"]
|
||||
if len(selected) == 0 {
|
||||
return "", status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
|
||||
}
|
||||
prfList = append(prfList, node)
|
||||
} else {
|
||||
// run the scheduler
|
||||
prfList = schd.Scheduler(req, nmap)
|
||||
}
|
||||
|
||||
klog.Infof("scheduled the volume %s/%s on node %s", pool, volName, selected)
|
||||
if len(prfList) == 0 {
|
||||
return "", status.Error(codes.Internal, "scheduler failed, node list is empty for creating the PV")
|
||||
}
|
||||
|
||||
volObj, err := volbuilder.NewBuilder().
|
||||
WithName(volName).
|
||||
|
|
@ -227,7 +213,6 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
|||
WithKeyFormat(kf).
|
||||
WithKeyLocation(kl).
|
||||
WithThinProv(tp).
|
||||
WithOwnerNode(selected).
|
||||
WithVolumeType(vtype).
|
||||
WithVolumeStatus(zfs.ZFSStatusPending).
|
||||
WithFsType(fstype).
|
||||
|
|
@ -238,17 +223,36 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
|||
return "", status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
err = zfs.ProvisionVolume(volObj)
|
||||
if err != nil {
|
||||
return "", status.Errorf(codes.Internal,
|
||||
"not able to provision the volume %s", err.Error())
|
||||
klog.Infof("zfs: trying volume creation %s/%s on node %s", pool, volName, prfList)
|
||||
|
||||
// try volume creation sequentially on all nodes
|
||||
for _, node := range prfList {
|
||||
vol, _ := volbuilder.BuildFrom(volObj).WithOwnerNode(node).WithVolumeStatus(zfs.ZFSStatusPending).Build()
|
||||
|
||||
timeout := false
|
||||
|
||||
timeout, err = zfs.ProvisionVolume(ctx, vol)
|
||||
if err == nil {
|
||||
return node, nil
|
||||
}
|
||||
|
||||
// if timeout reached, return the error and let csi retry the volume creation
|
||||
if timeout {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return selected, nil
|
||||
if err != nil {
|
||||
// volume provisioning failed, delete the zfs volume resource
|
||||
zfs.DeleteVolume(volName) // ignore error
|
||||
}
|
||||
|
||||
return "", status.Errorf(codes.Internal,
|
||||
"not able to provision the volume, nodes %v, err : %s", prfList, err.Error())
|
||||
}
|
||||
|
||||
// CreateVolClone creates the clone from a volume
|
||||
func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error) {
|
||||
func CreateVolClone(ctx context.Context, req *csi.CreateVolumeRequest, srcVol string) (string, error) {
|
||||
volName := strings.ToLower(req.GetName())
|
||||
parameters := req.GetParameters()
|
||||
// lower case keys, cf CreateZFSVolume()
|
||||
|
|
@ -286,17 +290,17 @@ func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error)
|
|||
// use the snapshot name same as new volname
|
||||
volObj.Spec.SnapName = vol.Name + "@" + volName
|
||||
|
||||
err = zfs.ProvisionVolume(volObj)
|
||||
_, err = zfs.ProvisionVolume(ctx, volObj)
|
||||
if err != nil {
|
||||
return "", status.Errorf(codes.Internal,
|
||||
"clone: not able to provision the volume %s", err.Error())
|
||||
"clone: not able to provision the volume err : %s", err.Error())
|
||||
}
|
||||
|
||||
return selected, nil
|
||||
}
|
||||
|
||||
// CreateSnapClone creates the clone from a snapshot
|
||||
func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
|
||||
func CreateSnapClone(ctx context.Context, req *csi.CreateVolumeRequest, snapshot string) (string, error) {
|
||||
volName := strings.ToLower(req.GetName())
|
||||
parameters := req.GetParameters()
|
||||
// lower case keys, cf CreateZFSVolume()
|
||||
|
|
@ -339,10 +343,10 @@ func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, err
|
|||
volObj.Spec = snap.Spec
|
||||
volObj.Spec.SnapName = strings.ToLower(snapshot)
|
||||
|
||||
err = zfs.ProvisionVolume(volObj)
|
||||
_, err = zfs.ProvisionVolume(ctx, volObj)
|
||||
if err != nil {
|
||||
return "", status.Errorf(codes.Internal,
|
||||
"not able to provision the clone volume %s", err.Error())
|
||||
"not able to provision the clone volume err : %s", err.Error())
|
||||
}
|
||||
|
||||
return selected, nil
|
||||
|
|
@ -372,23 +376,19 @@ func (cs *controller) CreateVolume(
|
|||
if contentSource != nil && contentSource.GetSnapshot() != nil {
|
||||
snapshotID := contentSource.GetSnapshot().GetSnapshotId()
|
||||
|
||||
selected, err = CreateSnapClone(req, snapshotID)
|
||||
selected, err = CreateSnapClone(ctx, req, snapshotID)
|
||||
} else if contentSource != nil && contentSource.GetVolume() != nil {
|
||||
srcVol := contentSource.GetVolume().GetVolumeId()
|
||||
selected, err = CreateVolClone(req, srcVol)
|
||||
selected, err = CreateVolClone(ctx, req, srcVol)
|
||||
} else {
|
||||
selected, err = CreateZFSVolume(req)
|
||||
selected, err = CreateZFSVolume(ctx, req)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if _, ok := parameters["wait"]; ok {
|
||||
if err := waitForReadyVolume(volName); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
klog.Infof("created the volume %s/%s on node %s", pool, volName, selected)
|
||||
|
||||
sendEventOrIgnore(pvcName, volName, strconv.FormatInt(int64(size), 10), "zfs-localpv", analytics.VolumeProvision)
|
||||
|
||||
|
|
@ -411,11 +411,7 @@ func (cs *controller) DeleteVolume(
|
|||
|
||||
klog.Infof("received request to delete volume {%s}", req.VolumeId)
|
||||
|
||||
var (
|
||||
err error
|
||||
)
|
||||
|
||||
if err = cs.validateDeleteVolumeReq(req); err != nil {
|
||||
if err := cs.validateDeleteVolumeReq(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
@ -438,6 +434,11 @@ func (cs *controller) DeleteVolume(
|
|||
)
|
||||
}
|
||||
|
||||
// if volume is not ready, create volume will delete it
|
||||
if vol.Status.State != zfs.ZFSStatusReady {
|
||||
return nil, status.Error(codes.Internal, "can not delete, volume creation is in progress")
|
||||
}
|
||||
|
||||
// Delete the corresponding ZV CR
|
||||
err = zfs.DeleteVolume(volumeID)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -95,7 +95,9 @@ func (c *ZVController) syncZV(zv *apis.ZFSVolume) error {
|
|||
err = zfs.CreateVolume(zv)
|
||||
}
|
||||
if err == nil {
|
||||
err = zfs.UpdateZvolInfo(zv)
|
||||
err = zfs.UpdateZvolInfo(zv, zfs.ZFSStatusReady)
|
||||
} else {
|
||||
err = zfs.UpdateZvolInfo(zv, zfs.ZFSStatusFailed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -132,7 +134,8 @@ func (c *ZVController) updateZV(oldObj, newObj interface{}) {
|
|||
|
||||
oldZV, _ := oldObj.(*apis.ZFSVolume)
|
||||
if zfs.PropertyChanged(oldZV, newZV) ||
|
||||
c.isDeletionCandidate(newZV) {
|
||||
c.isDeletionCandidate(newZV) ||
|
||||
newZV.Status.State == zfs.ZFSStatusPending {
|
||||
klog.Infof("Got update event for ZV %s/%s", newZV.Spec.PoolName, newZV.Name)
|
||||
c.enqueueZV(newZV)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,8 +15,11 @@
|
|||
package zfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1"
|
||||
"github.com/openebs/zfs-localpv/pkg/builder/bkpbuilder"
|
||||
|
|
@ -80,18 +83,63 @@ func init() {
|
|||
GoogleAnalyticsEnabled = os.Getenv(GoogleAnalyticsKey)
|
||||
}
|
||||
|
||||
func checkVolCreation(ctx context.Context, volname string) (bool, error) {
|
||||
timeout := time.After(10 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true, fmt.Errorf("zfs: context deadline reached")
|
||||
case <-timeout:
|
||||
return true, fmt.Errorf("zfs: vol creation timeout reached")
|
||||
default:
|
||||
vol, err := GetZFSVolume(volname)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
|
||||
}
|
||||
|
||||
switch vol.Status.State {
|
||||
case ZFSStatusReady:
|
||||
return false, nil
|
||||
case ZFSStatusFailed:
|
||||
return false, fmt.Errorf("zfs: volume creation failed")
|
||||
}
|
||||
|
||||
klog.Infof("zfs: waiting for volume %s/%s to be created on node %s",
|
||||
vol.Spec.PoolName, volname, vol.Spec.OwnerNodeID)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ProvisionVolume creates a ZFSVolume(zv) CR,
|
||||
// watcher for zvc is present in CSI agent
|
||||
func ProvisionVolume(
|
||||
ctx context.Context,
|
||||
vol *apis.ZFSVolume,
|
||||
) error {
|
||||
) (bool, error) {
|
||||
timeout := false
|
||||
zv, err := GetZFSVolume(vol.Name)
|
||||
|
||||
_, err := volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(vol)
|
||||
if err == nil {
|
||||
klog.Infof("provisioned volume %s", vol.Name)
|
||||
// update the spec and status
|
||||
zv.Spec = vol.Spec
|
||||
zv.Status = vol.Status
|
||||
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(zv)
|
||||
} else {
|
||||
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(vol)
|
||||
}
|
||||
|
||||
return err
|
||||
if err == nil {
|
||||
timeout, err = checkVolCreation(ctx, vol.Name)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
klog.Infof("zfs: volume %s/%s provisioning failed on node %s err: %s",
|
||||
vol.Spec.PoolName, vol.Name, vol.Spec.OwnerNodeID, err.Error())
|
||||
}
|
||||
|
||||
return timeout, err
|
||||
}
|
||||
|
||||
// ResizeVolume resizes the zfs volume
|
||||
|
|
@ -138,7 +186,9 @@ func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
|
|||
func DeleteVolume(volumeID string) (err error) {
|
||||
err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
|
||||
if err == nil {
|
||||
klog.Infof("deprovisioned volume %s", volumeID)
|
||||
klog.Infof("zfs: deleted the volume %s", volumeID)
|
||||
} else {
|
||||
klog.Infof("zfs: volume %s deletion failed %s", volumeID, err.Error())
|
||||
}
|
||||
|
||||
return
|
||||
|
|
@ -179,17 +229,22 @@ func GetZFSVolumeState(volID string) (string, string, error) {
|
|||
}
|
||||
|
||||
// UpdateZvolInfo updates ZFSVolume CR with node id and finalizer
|
||||
func UpdateZvolInfo(vol *apis.ZFSVolume) error {
|
||||
finalizers := []string{ZFSFinalizer}
|
||||
func UpdateZvolInfo(vol *apis.ZFSVolume, status string) error {
|
||||
finalizers := []string{}
|
||||
labels := map[string]string{ZFSNodeKey: NodeID}
|
||||
|
||||
if vol.Finalizers != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch status {
|
||||
case ZFSStatusReady:
|
||||
finalizers = append(finalizers, ZFSFinalizer)
|
||||
}
|
||||
|
||||
newVol, err := volbuilder.BuildFrom(vol).
|
||||
WithFinalizer(finalizers).
|
||||
WithVolumeStatus(ZFSStatusReady).
|
||||
WithVolumeStatus(status).
|
||||
WithLabels(labels).Build()
|
||||
|
||||
if err != nil {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue