feat(sanity): adding CSI Sanity test (#232)

* adding CSI Sanity test for ZFS-LocalPV
* make lowercase at all the places

Signed-off-by: Pawan <pawan@mayadata.io>
This commit is contained in:
Pawan Prakash Sharma 2020-12-10 11:53:16 +05:30 committed by GitHub
parent 5a5b043507
commit a73a59fd49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 342 additions and 59 deletions

View file

@ -26,6 +26,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8serror "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
@ -100,9 +101,41 @@ func getRoundedCapacity(size int64) int64 {
return ((size + Mi - 1) / Mi) * Mi
}
func waitForReadyVolume(volname string) error {
for true {
vol, err := zfs.GetZFSVolume(volname)
if err != nil {
return status.Errorf(codes.Internal,
"zfs: wait failed, not able to get the volume %s %s", volname, err.Error())
}
switch vol.Status.State {
case zfs.ZFSStatusReady:
return nil
}
time.Sleep(time.Second)
}
return nil
}
func waitForVolDestroy(volname string) error {
for true {
_, err := zfs.GetZFSVolume(volname)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return status.Errorf(codes.Internal,
"zfs: destroy wait failed, not able to get the volume %s %s", volname, err.Error())
}
time.Sleep(time.Second)
}
return nil
}
// CreateZFSVolume create new zfs volume from csi volume request
func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
volName := req.GetName()
volName := strings.ToLower(req.GetName())
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)
// parameter keys may be mistyped from the CRD specification when declaring
@ -127,6 +160,24 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
vtype := zfs.GetVolumeType(fstype)
capacity := strconv.FormatInt(int64(size), 10)
if vol, err := zfs.GetZFSVolume(volName); err == nil {
if vol.DeletionTimestamp != nil {
if _, ok := parameters["wait"]; ok {
if err := waitForVolDestroy(volName); err != nil {
return "", err
}
}
} else {
if vol.Spec.Capacity != capacity {
return "", status.Errorf(codes.AlreadyExists,
"volume %s already present", volName)
}
return vol.Spec.OwnerNodeID, nil
}
}
nmap, err := getNodeMap(schld, pool)
if err != nil {
return "", status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
@ -136,14 +187,18 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
selected := schd.Scheduler(req, nmap)
if len(selected) == 0 {
return "", status.Error(codes.Internal, "scheduler failed")
// (hack): CSI Sanity test does not pass topology information
selected = parameters["node"]
if len(selected) == 0 {
return "", status.Error(codes.Internal, "scheduler failed, not able to select a node to create the PV")
}
}
klog.Infof("scheduled the volume %s/%s on node %s", pool, volName, selected)
volObj, err := volbuilder.NewBuilder().
WithName(volName).
WithCapacity(strconv.FormatInt(int64(size), 10)).
WithCapacity(capacity).
WithRecordSize(rs).
WithVolBlockSize(bs).
WithPoolName(pool).
@ -169,12 +224,18 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
"not able to provision the volume %s", err.Error())
}
if _, ok := parameters["wait"]; ok {
if err := waitForReadyVolume(volName); err != nil {
return "", err
}
}
return selected, nil
}
// CreateVolClone creates the clone from a volume
func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error) {
volName := req.GetName()
volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
@ -222,8 +283,7 @@ func CreateVolClone(req *csi.CreateVolumeRequest, srcVol string) (string, error)
// CreateSnapClone creates the clone from a snapshot
func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
volName := req.GetName()
volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
@ -233,7 +293,7 @@ func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, err
snapshotID := strings.Split(snapshot, "@")
if len(snapshotID) != 2 {
return "", status.Errorf(
codes.Internal,
codes.NotFound,
"snap name is not valid %s, {%s}",
snapshot,
"invalid snapshot name",
@ -242,7 +302,7 @@ func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, err
snap, err := zfs.GetZFSSnapshot(snapshotID[1])
if err != nil {
return "", status.Error(codes.Internal, err.Error())
return "", status.Error(codes.NotFound, err.Error())
}
if snap.Spec.PoolName != pool {
@ -263,7 +323,7 @@ func CreateSnapClone(req *csi.CreateVolumeRequest, snapshot string) (string, err
Build()
volObj.Spec = snap.Spec
volObj.Spec.SnapName = snapshot
volObj.Spec.SnapName = strings.ToLower(snapshot)
err = zfs.ProvisionVolume(volObj)
if err != nil {
@ -283,18 +343,18 @@ func (cs *controller) CreateVolume(
var err error
var selected string
volName := req.GetName()
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
size := getRoundedCapacity(req.GetCapacityRange().RequiredBytes)
contentSource := req.GetVolumeContentSource()
pvcName := helpers.GetInsensitiveParameter(&parameters, "csi.storage.k8s.io/pvc/name")
if err = cs.validateVolumeCreateReq(req); err != nil {
return nil, err
}
volName := strings.ToLower(req.GetName())
parameters := req.GetParameters()
// lower case keys, cf CreateZFSVolume()
pool := helpers.GetInsensitiveParameter(&parameters, "poolname")
size := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
contentSource := req.GetVolumeContentSource()
pvcName := helpers.GetInsensitiveParameter(&parameters, "csi.storage.k8s.io/pvc/name")
if contentSource != nil && contentSource.GetSnapshot() != nil {
snapshotID := contentSource.GetSnapshot().GetSnapshotId()
@ -307,7 +367,7 @@ func (cs *controller) CreateVolume(
}
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return nil, err
}
sendEventOrIgnore(pvcName, volName, strconv.FormatInt(int64(size), 10), "zfs-localpv", analytics.VolumeProvision)
@ -339,7 +399,7 @@ func (cs *controller) DeleteVolume(
return nil, err
}
volumeID := req.GetVolumeId()
volumeID := strings.ToLower(req.GetVolumeId())
// verify if the volume has already been deleted
vol, err := zfs.GetVolume(volumeID)
@ -348,6 +408,9 @@ func (cs *controller) DeleteVolume(
}
if err != nil {
if k8serror.IsNotFound(err) {
goto deleteResponse
}
return nil, errors.Wrapf(
err,
"failed to get volume for {%s}",
@ -371,6 +434,25 @@ deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
func isValidVolumeCapabilities(volCaps []*csi.VolumeCapability) bool {
hasSupport := func(cap *csi.VolumeCapability) bool {
for _, c := range SupportedVolumeCapabilityAccessModes {
if c.GetMode() == cap.AccessMode.GetMode() {
return true
}
}
return false
}
foundAll := true
for _, c := range volCaps {
if !hasSupport(c) {
foundAll = false
}
}
return foundAll
}
// TODO Implementation will be taken up later
// ValidateVolumeCapabilities validates the capabilities
@ -380,8 +462,26 @@ func (cs *controller) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest,
) (*csi.ValidateVolumeCapabilitiesResponse, error) {
volumeID := strings.ToLower(req.GetVolumeId())
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume ID not provided")
}
volCaps := req.GetVolumeCapabilities()
if len(volCaps) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume capabilities not provided")
}
return nil, status.Error(codes.Unimplemented, "")
if _, err := zfs.GetZFSVolume(volumeID); err != nil {
return nil, status.Errorf(codes.NotFound, "Get volume failed err %s", err.Error())
}
var confirmed *csi.ValidateVolumeCapabilitiesResponse_Confirmed
if isValidVolumeCapabilities(volCaps) {
confirmed = &csi.ValidateVolumeCapabilitiesResponse_Confirmed{VolumeCapabilities: volCaps}
}
return &csi.ValidateVolumeCapabilitiesResponse{
Confirmed: confirmed,
}, nil
}
// ControllerGetCapabilities fetches controller capabilities
@ -406,16 +506,23 @@ func (cs *controller) ControllerExpandVolume(
ctx context.Context,
req *csi.ControllerExpandVolumeRequest,
) (*csi.ControllerExpandVolumeResponse, error) {
volumeID := strings.ToLower(req.GetVolumeId())
if volumeID == "" {
return nil, status.Errorf(
codes.InvalidArgument,
"ControllerExpandVolume: no volumeID provided",
)
}
/* round off the new size */
updatedSize := getRoundedCapacity(req.GetCapacityRange().GetRequiredBytes())
vol, err := zfs.GetZFSVolume(req.VolumeId)
vol, err := zfs.GetZFSVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"ControllerExpandVolumeRequest: failed to get ZFSVolume in for %s, {%s}",
req.VolumeId,
volumeID,
err.Error(),
)
}
@ -425,7 +532,7 @@ func (cs *controller) ControllerExpandVolume(
return nil, status.Errorf(
codes.Internal,
"ControllerExpandVolumeRequest: failed to parse volsize in for %s, {%s}",
req.VolumeId,
volumeID,
err.Error(),
)
}
@ -444,7 +551,7 @@ func (cs *controller) ControllerExpandVolume(
return nil, status.Errorf(
codes.Internal,
"failed to handle ControllerExpandVolumeRequest for %s, {%s}",
req.VolumeId,
volumeID,
err.Error(),
)
}
@ -454,6 +561,40 @@ func (cs *controller) ControllerExpandVolume(
Build(), nil
}
func verifySnapshotRequest(req *csi.CreateSnapshotRequest) error {
snapName := strings.ToLower(req.GetName())
volumeID := strings.ToLower(req.GetSourceVolumeId())
if snapName == "" || volumeID == "" {
return status.Errorf(
codes.InvalidArgument,
"CreateSnapshot error invalid request %s: %s",
volumeID, snapName,
)
}
snap, err := zfs.GetZFSSnapshot(snapName)
if err != nil {
if k8serror.IsNotFound(err) {
return nil
}
return status.Errorf(
codes.NotFound,
"CreateSnapshot error snap %s %s get failed : %s",
snapName, volumeID, err.Error(),
)
}
if snap.Labels[zfs.ZFSVolKey] != volumeID {
return status.Errorf(
codes.AlreadyExists,
"CreateSnapshot error snapshot %s already exist for different source vol %s: %s",
snapName, snap.Labels[zfs.ZFSVolKey], volumeID,
)
}
return nil
}
// CreateSnapshot creates a snapshot for given volume
//
// This implements csi.ControllerServer
@ -461,27 +602,34 @@ func (cs *controller) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
) (*csi.CreateSnapshotResponse, error) {
snapName := strings.ToLower(req.GetName())
volumeID := strings.ToLower(req.GetSourceVolumeId())
klog.Infof("CreateSnapshot volume %s@%s", req.SourceVolumeId, req.Name)
klog.Infof("CreateSnapshot volume %s@%s", volumeID, snapName)
err := verifySnapshotRequest(req)
if err != nil {
return nil, err
}
snapTimeStamp := time.Now().Unix()
state, err := zfs.GetZFSSnapshotStatus(req.Name)
state, err := zfs.GetZFSSnapshotStatus(snapName)
if err == nil {
return csipayload.NewCreateSnapshotResponseBuilder().
WithSourceVolumeID(req.SourceVolumeId).
WithSnapshotID(req.SourceVolumeId+"@"+req.Name).
WithSourceVolumeID(volumeID).
WithSnapshotID(volumeID+"@"+snapName).
WithCreationTime(snapTimeStamp, 0).
WithReadyToUse(state == zfs.ZFSStatusReady).
Build(), nil
}
vol, err := zfs.GetZFSVolume(req.SourceVolumeId)
vol, err := zfs.GetZFSVolume(volumeID)
if err != nil {
return nil, status.Errorf(
codes.Internal,
codes.NotFound,
"CreateSnapshot not able to get volume %s: %s, {%s}",
req.SourceVolumeId, req.Name,
volumeID, snapName,
err.Error(),
)
}
@ -489,14 +637,14 @@ func (cs *controller) CreateSnapshot(
labels := map[string]string{zfs.ZFSVolKey: vol.Name}
snapObj, err := snapbuilder.NewBuilder().
WithName(req.Name).
WithName(snapName).
WithLabels(labels).Build()
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to create snapshotobject for %s: %s, {%s}",
req.SourceVolumeId, req.Name,
volumeID, snapName,
err.Error(),
)
}
@ -508,16 +656,16 @@ func (cs *controller) CreateSnapshot(
return nil, status.Errorf(
codes.Internal,
"failed to handle CreateSnapshotRequest for %s: %s, {%s}",
req.SourceVolumeId, req.Name,
volumeID, snapName,
err.Error(),
)
}
state, _ = zfs.GetZFSSnapshotStatus(req.Name)
state, _ = zfs.GetZFSSnapshotStatus(snapName)
return csipayload.NewCreateSnapshotResponseBuilder().
WithSourceVolumeID(req.SourceVolumeId).
WithSnapshotID(req.SourceVolumeId+"@"+req.Name).
WithSourceVolumeID(volumeID).
WithSnapshotID(volumeID+"@"+snapName).
WithCreationTime(snapTimeStamp, 0).
WithReadyToUse(state == zfs.ZFSStatusReady).
Build(), nil
@ -531,18 +679,18 @@ func (cs *controller) DeleteSnapshot(
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {
if req.SnapshotId == "" {
return nil, status.Errorf(codes.InvalidArgument, "DeleteSnapshot: empty snapshotID")
}
klog.Infof("DeleteSnapshot request for %s", req.SnapshotId)
// snapshodID is formed as <volname>@<snapname>
// parsing them here
snapshotID := strings.Split(req.SnapshotId, "@")
if len(snapshotID) != 2 {
return nil, status.Errorf(
codes.Internal,
"failed to handle DeleteSnapshot for %s, {%s}",
req.SnapshotId,
"failed to get the snapshot name, Manual intervention required",
)
// should succeed when an invalid snapshot id is used
return &csi.DeleteSnapshotResponse{}, nil
}
if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(