feat(zfs-localpv): initial commit

provisioning and deprovisioning of
the volumes on the node where zfs pool
has already been setup. Pool name and the volume
parameters has to be given in storage class
which will be used to provision the volume.

Signed-off-by: Pawan <pawan@mayadata.io>
This commit is contained in:
Pawan 2019-09-12 12:32:17 +05:30 committed by Kiran Mova
parent 485e2a21f0
commit 9f5cf445df
46 changed files with 6339 additions and 0 deletions

View file

@ -0,0 +1,21 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// +k8s:deepcopy-gen=package,register
// Package v1alpha1 is the API version
// +groupName=openebs.io
package v1alpha1

View file

@ -0,0 +1,77 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion is group version used
// to register custom resources
//
// NOTE:
// This variable name should not be changed
var SchemeGroupVersion = schema.GroupVersion{
Group: "openebs.io",
Version: "v1alpha1",
}
// Resource takes an unqualified resource and
// returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.
WithResource(resource).
GroupResource()
}
var (
// SchemeBuilder is the scheme builder
// with scheme init functions to run
// for this API package
SchemeBuilder runtime.SchemeBuilder
localSchemeBuilder = &SchemeBuilder
// AddToScheme is a global function that
// registers this API group & version to
// a scheme
AddToScheme = localSchemeBuilder.AddToScheme
)
func init() {
// We only register manually written functions
// here. This registration of generated functions
// takes place in the generated files.
//
// NOTE:
// This separation makes the code compile even
// when the generated files are missing.
localSchemeBuilder.Register(addKnownTypes)
}
// Adds the list of known types to api.Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(
SchemeGroupVersion,
&ZFSVolume{},
&ZFSVolumeList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

View file

@ -0,0 +1,100 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +resource:path=zfsvolume
// ZFSVolume represents a ZFS based volume
type ZFSVolume struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec VolumeInfo `json:"spec"`
}
// MountInfo contains the volume related info
// for all types of volumes in ZFSVolume
type MountInfo struct {
// FSType of a volume will specify the
// format type - ext4(default), xfs of PV
FSType string `json:"fsType"`
// AccessMode of a volume will hold the
// access mode of the volume
AccessModes []string `json:"accessModes"`
// MountPath of the volume will hold the
// path on which the volume is mounted
// on that node
MountPath string `json:"mountPath"`
// ReadOnly specifies if the volume needs
// to be mounted in ReadOnly mode
ReadOnly bool `json:"readOnly"`
// MountOptions specifies the options with
// which mount needs to be attempted
MountOptions []string `json:"mountOptions"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +resource:path=csivolumes
// ZFSVolumeList is a list of ZFSVolume resources
type ZFSVolumeList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ZFSVolume `json:"items"`
}
// VolumeInfo contains the volume related info
// for all types of volumes in ZFSVolume
type VolumeInfo struct {
// OwnerNodeID is the Node ID which
// is the owner of this Volume
OwnerNodeID string `json:"ownerNodeID"`
// poolName specifies the name of the
// pool where this volume should be created
PoolName string `json:"poolName"`
// Capacity of the volume
Capacity string `json:"capacity"`
// BlockSize specifies the blocksize
// which we should use to create the zvol
BlockSize string `json:"blocksize"`
// Compression specifies if the it should
// enabled on the zvol
Compression string `json:"compression"`
// Dedup specifies the deduplication
// should be enabledd on the zvol
Dedup string `json:"dedup"`
// Thinprovision specifies if we should
// thin provisioned the volume or not
ThinProvision string `json:"thinProvison"`
}

View file

@ -0,0 +1,127 @@
// +build !ignore_autogenerated
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1alpha1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MountInfo) DeepCopyInto(out *MountInfo) {
*out = *in
if in.AccessModes != nil {
in, out := &in.AccessModes, &out.AccessModes
*out = make([]string, len(*in))
copy(*out, *in)
}
if in.MountOptions != nil {
in, out := &in.MountOptions, &out.MountOptions
*out = make([]string, len(*in))
copy(*out, *in)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MountInfo.
func (in *MountInfo) DeepCopy() *MountInfo {
if in == nil {
return nil
}
out := new(MountInfo)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeInfo) DeepCopyInto(out *VolumeInfo) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new VolumeInfo.
func (in *VolumeInfo) DeepCopy() *VolumeInfo {
if in == nil {
return nil
}
out := new(VolumeInfo)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ZFSVolume) DeepCopyInto(out *ZFSVolume) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSVolume.
func (in *ZFSVolume) DeepCopy() *ZFSVolume {
if in == nil {
return nil
}
out := new(ZFSVolume)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ZFSVolume) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ZFSVolumeList) DeepCopyInto(out *ZFSVolumeList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ZFSVolume, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSVolumeList.
func (in *ZFSVolumeList) DeepCopy() *ZFSVolumeList {
if in == nil {
return nil
}
out := new(ZFSVolumeList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ZFSVolumeList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

208
pkg/builder/build.go Normal file
View file

@ -0,0 +1,208 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/common/errors"
)
// Builder is the builder object for ZFSVolume
type Builder struct {
volume *ZFSVolume
errs []error
}
// NewBuilder returns new instance of Builder
func NewBuilder() *Builder {
return &Builder{
volume: &ZFSVolume{
Object: &apis.ZFSVolume{},
},
}
}
// BuildFrom returns new instance of Builder
// from the provided api instance
func BuildFrom(volume *apis.ZFSVolume) *Builder {
if volume == nil {
b := NewBuilder()
b.errs = append(
b.errs,
errors.New("failed to build volume object: nil volume"),
)
return b
}
return &Builder{
volume: &ZFSVolume{
Object: volume,
},
}
}
// WithNamespace sets the namespace of csi volume
func (b *Builder) WithNamespace(namespace string) *Builder {
if namespace == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing namespace",
),
)
return b
}
b.volume.Object.Namespace = namespace
return b
}
// WithName sets the name of csi volume
func (b *Builder) WithName(name string) *Builder {
if name == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing name",
),
)
return b
}
b.volume.Object.Name = name
return b
}
// WithCapacity sets the Capacity of csi volume by converting string
// capacity into Quantity
func (b *Builder) WithCapacity(capacity string) *Builder {
if capacity == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing capacity",
),
)
return b
}
b.volume.Object.Spec.Capacity = capacity
return b
}
// WithCompression sets compression of CStorVolumeClaim
func (b *Builder) WithCompression(compression string) *Builder {
comp := "off"
if compression == "on" {
comp = "on"
}
b.volume.Object.Spec.Compression = comp
return b
}
// WithDedup sets compression of CStorVolumeClaim
func (b *Builder) WithDedup(dedup string) *Builder {
dp := "off"
if dedup == "on" {
dp = "on"
}
b.volume.Object.Spec.Dedup = dp
return b
}
// WithThinProv sets compression of CStorVolumeClaim
func (b *Builder) WithThinProv(thinprov string) *Builder {
tp := "no"
if thinprov == "yes" {
tp = "yes"
}
b.volume.Object.Spec.ThinProvision = tp
return b
}
// WithBlockSize sets blocksize of CStorVolumeClaim
func (b *Builder) WithBlockSize(blockSize string) *Builder {
bs := "4k"
if len(blockSize) > 0 {
bs = blockSize
}
b.volume.Object.Spec.BlockSize = bs
return b
}
func (b *Builder) WithPoolName(pool string) *Builder {
if pool == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing pool name",
),
)
return b
}
b.volume.Object.Spec.PoolName = pool
return b
}
func (b *Builder) WithNodename(name string) *Builder {
if name == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing node name",
),
)
return b
}
b.volume.Object.Spec.OwnerNodeID = name
return b
}
// WithLabels merges existing labels if any
// with the ones that are provided here
func (b *Builder) WithLabels(labels map[string]string) *Builder {
if len(labels) == 0 {
b.errs = append(
b.errs,
errors.New("failed to build cstorvolume object: missing labels"),
)
return b
}
if b.volume.Object.Labels == nil {
b.volume.Object.Labels = map[string]string{}
}
for key, value := range labels {
b.volume.Object.Labels[key] = value
}
return b
}
func (b *Builder) WithFinalizer(finalizer []string) *Builder {
b.volume.Object.Finalizers = append(b.volume.Object.Finalizers, finalizer...)
return b
}
// Build returns csi volume API object
func (b *Builder) Build() (*apis.ZFSVolume, error) {
if len(b.errs) > 0 {
return nil, errors.Errorf("%+v", b.errs)
}
return b.volume.Object, nil
}

72
pkg/builder/buildlist.go Normal file
View file

@ -0,0 +1,72 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
)
// ListBuilder enables building an instance of
// ZFSVolumeList
type ListBuilder struct {
list *apis.ZFSVolumeList
filters predicateList
}
// NewListBuilder returns a new instance of ListBuilder
func NewListBuilder() *ListBuilder {
return &ListBuilder{
list: &apis.ZFSVolumeList{},
}
}
// ListBuilderFrom returns a new instance of
// ListBuilder from API list instance
func ListBuilderFrom(vols apis.ZFSVolumeList) *ListBuilder {
b := &ListBuilder{list: &apis.ZFSVolumeList{}}
if len(vols.Items) == 0 {
return b
}
b.list.Items = append(b.list.Items, vols.Items...)
return b
}
// List returns the list of pod
// instances that was built by this
// builder
func (b *ListBuilder) List() *apis.ZFSVolumeList {
if b.filters == nil || len(b.filters) == 0 {
return b.list
}
filtered := &apis.ZFSVolumeList{}
for _, vol := range b.list.Items {
vol := vol // pin it
if b.filters.all(From(&vol)) {
filtered.Items = append(filtered.Items, vol)
}
}
return filtered
}
// WithFilter add filters on which the pod
// has to be filtered
func (b *ListBuilder) WithFilter(pred ...Predicate) *ListBuilder {
b.filters = append(b.filters, pred...)
return b
}

427
pkg/builder/kubernetes.go Normal file
View file

@ -0,0 +1,427 @@
// Copyright © 2019 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
import (
"encoding/json"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
client "github.com/openebs/zfs-localpv/pkg/common/kubernetes/client"
clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// getClientsetFn is a typed function that
// abstracts fetching of internal clientset
type getClientsetFn func() (clientset *clientset.Clientset, err error)
// getClientsetFromPathFn is a typed function that
// abstracts fetching of clientset from kubeConfigPath
type getClientsetForPathFn func(kubeConfigPath string) (
clientset *clientset.Clientset,
err error,
)
// createFn is a typed function that abstracts
// creating csi volume instance
type createFn func(
cs *clientset.Clientset,
upgradeResultObj *apis.ZFSVolume,
namespace string,
) (*apis.ZFSVolume, error)
// getFn is a typed function that abstracts
// fetching a csi volume instance
type getFn func(
cli *clientset.Clientset,
name,
namespace string,
opts metav1.GetOptions,
) (*apis.ZFSVolume, error)
// listFn is a typed function that abstracts
// listing of csi volume instances
type listFn func(
cli *clientset.Clientset,
namespace string,
opts metav1.ListOptions,
) (*apis.ZFSVolumeList, error)
// delFn is a typed function that abstracts
// deleting a csi volume instance
type delFn func(
cli *clientset.Clientset,
name,
namespace string,
opts *metav1.DeleteOptions,
) error
// updateFn is a typed function that abstracts
// updating csi volume instance
type updateFn func(
cs *clientset.Clientset,
vol *apis.ZFSVolume,
namespace string,
) (*apis.ZFSVolume, error)
// Kubeclient enables kubernetes API operations
// on csi volume instance
type Kubeclient struct {
// clientset refers to csi volume's
// clientset that will be responsible to
// make kubernetes API calls
clientset *clientset.Clientset
kubeConfigPath string
// namespace holds the namespace on which
// kubeclient has to operate
namespace string
// functions useful during mocking
getClientset getClientsetFn
getClientsetForPath getClientsetForPathFn
get getFn
list listFn
del delFn
create createFn
update updateFn
}
// KubeclientBuildOption defines the abstraction
// to build a kubeclient instance
type KubeclientBuildOption func(*Kubeclient)
// defaultGetClientset is the default implementation to
// get kubernetes clientset instance
func defaultGetClientset() (clients *clientset.Clientset, err error) {
config, err := client.GetConfig(client.New())
if err != nil {
return nil, err
}
return clientset.NewForConfig(config)
}
// defaultGetClientsetForPath is the default implementation to
// get kubernetes clientset instance based on the given
// kubeconfig path
func defaultGetClientsetForPath(
kubeConfigPath string,
) (clients *clientset.Clientset, err error) {
config, err := client.GetConfig(
client.New(client.WithKubeConfigPath(kubeConfigPath)))
if err != nil {
return nil, err
}
return clientset.NewForConfig(config)
}
// defaultGet is the default implementation to get
// a csi volume instance in kubernetes cluster
func defaultGet(
cli *clientset.Clientset,
name, namespace string,
opts metav1.GetOptions,
) (*apis.ZFSVolume, error) {
return cli.OpenebsV1alpha1().
ZFSVolumes(namespace).
Get(name, opts)
}
// defaultList is the default implementation to list
// csi volume instances in kubernetes cluster
func defaultList(
cli *clientset.Clientset,
namespace string,
opts metav1.ListOptions,
) (*apis.ZFSVolumeList, error) {
return cli.OpenebsV1alpha1().
ZFSVolumes(namespace).
List(opts)
}
// defaultCreate is the default implementation to delete
// a csi volume instance in kubernetes cluster
func defaultDel(
cli *clientset.Clientset,
name, namespace string,
opts *metav1.DeleteOptions,
) error {
deletePropagation := metav1.DeletePropagationForeground
opts.PropagationPolicy = &deletePropagation
err := cli.OpenebsV1alpha1().
ZFSVolumes(namespace).
Delete(name, opts)
return err
}
// defaultCreate is the default implementation to create
// a csi volume instance in kubernetes cluster
func defaultCreate(
cli *clientset.Clientset,
vol *apis.ZFSVolume,
namespace string,
) (*apis.ZFSVolume, error) {
return cli.OpenebsV1alpha1().
ZFSVolumes(namespace).
Create(vol)
}
// defaultUpdate is the default implementation to update
// a csi volume instance in kubernetes cluster
func defaultUpdate(
cli *clientset.Clientset,
vol *apis.ZFSVolume,
namespace string,
) (*apis.ZFSVolume, error) {
return cli.OpenebsV1alpha1().
ZFSVolumes(namespace).
Update(vol)
}
// withDefaults sets the default options
// of kubeclient instance
func (k *Kubeclient) withDefaults() {
if k.getClientset == nil {
k.getClientset = defaultGetClientset
}
if k.getClientsetForPath == nil {
k.getClientsetForPath = defaultGetClientsetForPath
}
if k.get == nil {
k.get = defaultGet
}
if k.list == nil {
k.list = defaultList
}
if k.del == nil {
k.del = defaultDel
}
if k.create == nil {
k.create = defaultCreate
}
if k.update == nil {
k.update = defaultUpdate
}
}
// WithClientSet sets the kubernetes client against
// the kubeclient instance
func WithClientSet(c *clientset.Clientset) KubeclientBuildOption {
return func(k *Kubeclient) {
k.clientset = c
}
}
// WithNamespace sets the kubernetes client against
// the provided namespace
func WithNamespace(namespace string) KubeclientBuildOption {
return func(k *Kubeclient) {
k.namespace = namespace
}
}
// WithNamespace sets the provided namespace
// against this Kubeclient instance
func (k *Kubeclient) WithNamespace(namespace string) *Kubeclient {
k.namespace = namespace
return k
}
// WithKubeConfigPath sets the kubernetes client
// against the provided path
func WithKubeConfigPath(path string) KubeclientBuildOption {
return func(k *Kubeclient) {
k.kubeConfigPath = path
}
}
// NewKubeclient returns a new instance of
// kubeclient meant for csi volume operations
func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient {
k := &Kubeclient{}
for _, o := range opts {
o(k)
}
k.withDefaults()
return k
}
func (k *Kubeclient) getClientsetForPathOrDirect() (
*clientset.Clientset,
error,
) {
if k.kubeConfigPath != "" {
return k.getClientsetForPath(k.kubeConfigPath)
}
return k.getClientset()
}
// getClientOrCached returns either a new instance
// of kubernetes client or its cached copy
func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) {
if k.clientset != nil {
return k.clientset, nil
}
c, err := k.getClientsetForPathOrDirect()
if err != nil {
return nil,
errors.Wrapf(
err,
"failed to get clientset",
)
}
k.clientset = c
return k.clientset, nil
}
// Create creates a csi volume instance
// in kubernetes cluster
func (k *Kubeclient) Create(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if vol == nil {
return nil,
errors.New(
"failed to create csivolume: nil vol object",
)
}
cs, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to create csi volume {%s} in namespace {%s}",
vol.Name,
k.namespace,
)
}
return k.create(cs, vol, k.namespace)
}
// Get returns csi volume object for given name
func (k *Kubeclient) Get(
name string,
opts metav1.GetOptions,
) (*apis.ZFSVolume, error) {
if name == "" {
return nil,
errors.New(
"failed to get csi volume: missing csi volume name",
)
}
cli, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get csi volume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return k.get(cli, name, k.namespace, opts)
}
// GetRaw returns csi volume instance
// in bytes
func (k *Kubeclient) GetRaw(
name string,
opts metav1.GetOptions,
) ([]byte, error) {
if name == "" {
return nil, errors.New(
"failed to get raw csi volume: missing vol name",
)
}
csiv, err := k.Get(name, opts)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get csi volume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return json.Marshal(csiv)
}
// List returns a list of csi volume
// instances present in kubernetes cluster
func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.ZFSVolumeList, error) {
cli, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to list csi volumes in namespace {%s}",
k.namespace,
)
}
return k.list(cli, k.namespace, opts)
}
// Delete deletes the csi volume from
// kubernetes
func (k *Kubeclient) Delete(name string) error {
if name == "" {
return errors.New(
"failed to delete csivolume: missing vol name",
)
}
cli, err := k.getClientOrCached()
if err != nil {
return errors.Wrapf(
err,
"failed to delete csivolume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return k.del(cli, name, k.namespace, &metav1.DeleteOptions{})
}
// Update updates this csi volume instance
// against kubernetes cluster
func (k *Kubeclient) Update(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if vol == nil {
return nil,
errors.New(
"failed to update csivolume: nil vol object",
)
}
cs, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to update csivolume {%s} in namespace {%s}",
vol.Name,
vol.Namespace,
)
}
return k.update(cs, vol, k.namespace)
}

115
pkg/builder/volume.go Normal file
View file

@ -0,0 +1,115 @@
// Copyright © 2019 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
)
// ZFSVolume is a wrapper over
// ZFSVolume API instance
type ZFSVolume struct {
Object *apis.ZFSVolume
}
// From returns a new instance of
// csi volume
func From(vol *apis.ZFSVolume) *ZFSVolume {
return &ZFSVolume{
Object: vol,
}
}
// Predicate defines an abstraction
// to determine conditional checks
// against the provided pod instance
type Predicate func(*ZFSVolume) bool
// PredicateList holds a list of predicate
type predicateList []Predicate
// ZFSVolumeList holds the list
// of csi volume instances
type ZFSVolumeList struct {
List apis.ZFSVolumeList
}
// Len returns the number of items present
// in the ZFSVolumeList
func (p *ZFSVolumeList) Len() int {
return len(p.List.Items)
}
// all returns true if all the predicates
// succeed against the provided ZFSVolume
// instance
func (l predicateList) all(p *ZFSVolume) bool {
for _, pred := range l {
if !pred(p) {
return false
}
}
return true
}
// HasLabels returns true if provided labels
// are present in the provided ZFSVolume instance
func HasLabels(keyValuePair map[string]string) Predicate {
return func(p *ZFSVolume) bool {
for key, value := range keyValuePair {
if !p.HasLabel(key, value) {
return false
}
}
return true
}
}
// HasLabel returns true if provided label
// is present in the provided ZFSVolume instance
func (p *ZFSVolume) HasLabel(key, value string) bool {
val, ok := p.Object.GetLabels()[key]
if ok {
return val == value
}
return false
}
// HasLabel returns true if provided label
// is present in the provided ZFSVolume instance
func HasLabel(key, value string) Predicate {
return func(p *ZFSVolume) bool {
return p.HasLabel(key, value)
}
}
// IsNil returns true if the csi volume instance
// is nil
func (p *ZFSVolume) IsNil() bool {
return p.Object == nil
}
// IsNil is predicate to filter out nil csi volume
// instances
func IsNil() Predicate {
return func(p *ZFSVolume) bool {
return p.IsNil()
}
}
// GetAPIObject returns csi volume's API instance
func (p *ZFSVolume) GetAPIObject() *apis.ZFSVolume {
return p.Object
}

106
pkg/common/env/env.go vendored Normal file
View file

@ -0,0 +1,106 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package env
import (
"os"
"strconv"
"strings"
)
// EnvironmentSetter abstracts setting of environment variable
type EnvironmentSetter func(envKey string, value string) (err error)
// EnvironmentGetter abstracts fetching value from an environment variable
type EnvironmentGetter func(envKey string) (value string)
// EnvironmentLookup abstracts looking up an environment variable
type EnvironmentLookup func(envKey string) (value string, present bool)
// Set sets the provided environment variable
//
// NOTE:
// This is an implementation of EnvironmentSetter
func Set(envKey string, value string) (err error) {
return os.Setenv(string(envKey), value)
}
// Get fetches value from the provided environment variable
//
// NOTE:
// This is an implementation of EnvironmentGetter
func Get(envKey string) (value string) {
return getEnv(string(envKey))
}
// GetOrDefault fetches value from the provided environment variable
// which on empty returns the defaultValue
// NOTE: os.Getenv is used here instead of os.LookupEnv because it is
// not required to know if the environment variable is defined on the system
func GetOrDefault(e string, defaultValue string) (value string) {
envValue := Get(e)
if len(envValue) == 0 {
// ENV not defined or set to ""
return defaultValue
} else {
return envValue
}
}
// Lookup looks up an environment variable
//
// NOTE:
// This is an implementation of EnvironmentLookup
func Lookup(envKey string) (value string, present bool) {
return lookupEnv(string(envKey))
}
// Truthy returns boolean based on the environment variable's value
//
// The lookup value can be truthy (i.e. 1, t, TRUE, true) or falsy (0, false,
// etc) based on strconv.ParseBool logic
func Truthy(envKey string) (truth bool) {
v, found := Lookup(envKey)
if !found {
return
}
truth, _ = strconv.ParseBool(v)
return
}
// LookupOrFalse looks up an environment variable and returns a string "false"
// if environment variable is not present. It returns appropriate values for
// other cases.
func LookupOrFalse(envKey string) string {
val, present := Lookup(envKey)
if !present {
return "false"
}
return strings.TrimSpace(val)
}
// getEnv fetches the provided environment variable's value
func getEnv(envKey string) (value string) {
return strings.TrimSpace(os.Getenv(envKey))
}
// lookupEnv looks up the provided environment variable
func lookupEnv(envKey string) (value string, present bool) {
value, present = os.LookupEnv(envKey)
value = strings.TrimSpace(value)
return
}

View file

@ -0,0 +1,99 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import (
"fmt"
)
// New returns an error with the supplied message.
// New also records the stack trace at the point it was called.
func New(message string) error {
return &err{
prefix: stackTraceMessagePrefix,
msg: message,
stack: callers(),
}
}
// Errorf formats according to a format specifier and returns the string
// as a value that satisfies error.
// Errorf also records the stack trace at the point it was called.
func Errorf(format string, args ...interface{}) error {
return &err{
prefix: stackTraceMessagePrefix,
msg: fmt.Sprintf(format, args...),
stack: callers(),
}
}
// Wrap annotates err with a new message.
// If err is nil, Wrap returns nil.
func Wrap(err error, message string) error {
if err == nil {
return nil
}
return &wrapper{wrapErrorMessagePrefix, message, err}
}
// Wrapf annotates err with the format specifier.
// If err is nil, Wrapf returns nil.
func Wrapf(err error, format string, args ...interface{}) error {
if err == nil {
return nil
}
return &wrapper{wrapErrorMessagePrefix, fmt.Sprintf(format, args...), err}
}
// WithStack annotates err with a stack trace at the
// point WithStack was called. If err is nil, WithStack returns nil.
func WithStack(err error) error {
if err == nil {
return nil
}
return &withStack{
stackTraceMessagePrefix,
err,
callers(),
}
}
// Cause returns the underlying cause of the error, if possible.
// An error value has a cause if it implements the following
// interface:
//
// type causer interface {
// Cause() error
// }
//
// If the error does not implement Cause, the original error will
// be returned. If the error is nil, nil will be returned without further
// investigation.
func Cause(err error) error {
type causer interface {
Cause() error
}
for err != nil {
cause, ok := err.(causer)
if !ok {
break
}
err = cause.Cause()
}
return err
}

188
pkg/common/errors/types.go Normal file
View file

@ -0,0 +1,188 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import (
"fmt"
"runtime"
"github.com/pkg/errors"
)
const (
wrapErrorMessagePrefix string = " -- "
listErrorMessagePrefix string = " - "
stackTraceMessagePrefix string = " "
)
// stack represents a stack of program counters.
type stack []uintptr
// callers returns stack of caller function
func callers() *stack {
const depth = 32
var pcs [depth]uintptr
n := runtime.Callers(3, pcs[:])
var st stack = pcs[0:n]
return &st
}
// err implements error interface that has a message and stack
type err struct {
prefix string
msg string
*stack
}
// Error is implementation of error interface
func (e *err) Error() string { return e.msg }
// Format is implementation of Formater interface
func (e *err) Format(s fmt.State, verb rune) {
message := wrapErrorMessagePrefix + e.msg
switch verb {
case 'v':
if s.Flag('+') {
fmt.Fprint(s, message)
for i, pc := range *e.stack {
if i > 0 {
return
}
f := errors.Frame(pc)
fmt.Fprintf(s, "\n%s%+v", e.prefix, f)
}
return
}
fallthrough
case 's', 'q':
fmt.Fprint(s, message)
}
}
// wrapper implements error interface that has a message and error
type wrapper struct {
prefix string
msg string
error
}
// Error is implementation of error interface
func (w *wrapper) Error() string { return w.msg }
// Cause is implementation of causer interface
func (w *wrapper) Cause() error { return w.error }
// Format is implementation of Formater interface
func (w *wrapper) Format(s fmt.State, verb rune) {
switch verb {
case 'v':
if s.Flag('+') {
fmt.Fprintf(s, "%+v\n", w.error)
fmt.Fprint(s, w.prefix+w.msg)
return
}
fallthrough
case 's', 'q':
fmt.Fprintf(s, "%s\n", w.error)
fmt.Fprint(s, w.prefix+w.msg)
}
}
// withStack implements error interface that has a stack and error
type withStack struct {
prefix string
error
*stack
}
// Format is implementation of Formater interface
func (ws *withStack) Format(s fmt.State, verb rune) {
message := wrapErrorMessagePrefix + fmt.Sprintf("%s", ws.error)
switch verb {
case 'v':
if s.Flag('+') {
fmt.Fprint(s, message)
for i, pc := range *ws.stack {
if i > 0 {
return
}
f := errors.Frame(pc)
fmt.Fprintf(s, "\n%s%+v", ws.prefix, f)
}
return
}
fallthrough
case 's', 'q':
fmt.Fprint(s, message)
}
}
// Cause is implementation of causer interface
func (ws *withStack) Cause() error { return ws.error }
// ErrorList is a wrapper over list of errors
// It implements error interface
type ErrorList struct {
Errors []error
msg string
}
// Error is implementation of error interface
func (el *ErrorList) Error() string {
message := ""
for _, err := range el.Errors {
message += err.Error() + ":"
}
el.msg = message
return message
}
// Format is implementation of Formater interface
func (el *ErrorList) Format(s fmt.State, verb rune) {
message := ""
for _, err := range el.Errors {
message += "\n" + listErrorMessagePrefix + err.Error()
}
fmt.Fprint(s, message)
}
// WithStack annotates ErrorList with a new message and
// stack trace of caller.
func (el *ErrorList) WithStack(message string) error {
if el == nil {
return nil
}
return &withStack{
stackTraceMessagePrefix,
Wrap(el, message),
callers(),
}
}
// WithStackf annotates ErrorList with the format specifier
// and stack trace of caller.
func (el *ErrorList) WithStackf(format string, args ...interface{}) error {
if el == nil {
return nil
}
return &withStack{
stackTraceMessagePrefix,
Wrapf(el, format, args...),
callers(),
}
}

View file

@ -0,0 +1,243 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"strings"
env "github.com/openebs/zfs-localpv/pkg/common/env"
"github.com/pkg/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
const (
// K8sMasterIPEnvironmentKey is the environment variable key used to
// determine the kubernetes master IP address
K8sMasterIPEnvironmentKey string = "OPENEBS_IO_K8S_MASTER"
// KubeConfigEnvironmentKey is the environment variable key used to
// determine the kubernetes config
KubeConfigEnvironmentKey string = "OPENEBS_IO_KUBE_CONFIG"
)
// getInClusterConfigFunc abstracts the logic to get
// kubernetes incluster config
//
// NOTE:
// typed function makes it simple to mock
type getInClusterConfigFunc func() (*rest.Config, error)
// buildConfigFromFlagsFunc provides the abstraction to get
// kubernetes config from provided flags
//
// NOTE:
// typed function makes it simple to mock
type buildConfigFromFlagsFunc func(string, string) (*rest.Config, error)
// GetConfigFunc provides the abstraction to get
// kubernetes config from provided client instance
//
// NOTE:
// typed function makes it simple to mock
type GetConfigFunc func(*Client) (*rest.Config, error)
// GetConfig returns kubernetes config instance
//
// NOTE:
// This is an implementation of GetConfigFunc
func GetConfig(c *Client) (*rest.Config, error) {
if c == nil {
return nil, errors.New("failed to get kubernetes config: nil client was provided")
}
return c.GetConfigForPathOrDirect()
}
// getKubeMasterIPFunc provides the abstraction to get
// kubernetes master IP address
//
// NOTE:
// typed function makes it simple to mock
type getKubeMasterIPFunc func(string) string
// getKubeConfigPathFunc provides the abstraction to get
// kubernetes config path
//
// NOTE:
// typed function makes it simple to mock
type getKubeConfigPathFunc func(string) string
// getKubernetesDynamicClientFunc provides the abstraction to get
// dynamic kubernetes clientset
//
// NOTE:
// typed function makes it simple to mock
type getKubernetesDynamicClientFunc func(*rest.Config) (dynamic.Interface, error)
// getKubernetesClientsetFunc provides the abstraction to get
// kubernetes clientset
//
// NOTE:
// typed function makes it simple to mock
type getKubernetesClientsetFunc func(*rest.Config) (*kubernetes.Clientset, error)
// Client provides common kuberenetes client operations
type Client struct {
IsInCluster bool // flag to let client point to its own cluster
KubeConfigPath string // kubeconfig path to get kubernetes clientset
// Below functions are useful during mock
// handle to get in cluster config
getInClusterConfig getInClusterConfigFunc
// handle to get desired kubernetes config
buildConfigFromFlags buildConfigFromFlagsFunc
// handle to get kubernetes clienset
getKubernetesClientset getKubernetesClientsetFunc
// handle to get dynamic kubernetes clientset
getKubernetesDynamicClient getKubernetesDynamicClientFunc
// handle to get kubernetes master IP
getKubeMasterIP getKubeMasterIPFunc
// handle to get kubernetes config path
getKubeConfigPath getKubeConfigPathFunc
}
// OptionFunc is a typed function that abstracts any kind of operation
// against the provided client instance
//
// This is the basic building block to create functional operations
// against the client instance
type OptionFunc func(*Client)
// New returns a new instance of client
func New(opts ...OptionFunc) *Client {
c := &Client{}
for _, o := range opts {
o(c)
}
withDefaults(c)
return c
}
func withDefaults(c *Client) {
if c.getInClusterConfig == nil {
c.getInClusterConfig = rest.InClusterConfig
}
if c.buildConfigFromFlags == nil {
c.buildConfigFromFlags = clientcmd.BuildConfigFromFlags
}
if c.getKubernetesClientset == nil {
c.getKubernetesClientset = kubernetes.NewForConfig
}
if c.getKubernetesDynamicClient == nil {
c.getKubernetesDynamicClient = dynamic.NewForConfig
}
if c.getKubeMasterIP == nil {
c.getKubeMasterIP = env.Get
}
if c.getKubeConfigPath == nil {
c.getKubeConfigPath = env.Get
}
}
// InCluster enables IsInCluster flag
func InCluster() OptionFunc {
return func(c *Client) {
c.IsInCluster = true
}
}
// WithKubeConfigPath sets kubeconfig path
// against this client instance
func WithKubeConfigPath(kubeConfigPath string) OptionFunc {
return func(c *Client) {
c.KubeConfigPath = kubeConfigPath
}
}
// Clientset returns a new instance of kubernetes clientset
func (c *Client) Clientset() (*kubernetes.Clientset, error) {
config, err := c.GetConfigForPathOrDirect()
if err != nil {
return nil, errors.Wrapf(err,
"failed to get kubernetes clientset: failed to get kubernetes config: IsInCluster {%t}: KubeConfigPath {%s}",
c.IsInCluster,
c.KubeConfigPath,
)
}
return c.getKubernetesClientset(config)
}
// Config returns the kubernetes config instance based on available criteria
func (c *Client) Config() (config *rest.Config, err error) {
// IsInCluster flag holds the top most priority
if c.IsInCluster {
return c.getInClusterConfig()
}
// ENV holds second priority
if strings.TrimSpace(c.getKubeMasterIP(K8sMasterIPEnvironmentKey)) != "" ||
strings.TrimSpace(c.getKubeConfigPath(KubeConfigEnvironmentKey)) != "" {
return c.getConfigFromENV()
}
// Defaults to InClusterConfig
return c.getInClusterConfig()
}
// ConfigForPath returns the kuberentes config instance based on KubeConfig path
func (c *Client) ConfigForPath(kubeConfigPath string) (config *rest.Config, err error) {
return c.buildConfigFromFlags("", kubeConfigPath)
}
func (c *Client) GetConfigForPathOrDirect() (config *rest.Config, err error) {
if c.KubeConfigPath != "" {
return c.ConfigForPath(c.KubeConfigPath)
}
return c.Config()
}
func (c *Client) getConfigFromENV() (config *rest.Config, err error) {
k8sMaster := c.getKubeMasterIP(K8sMasterIPEnvironmentKey)
kubeConfig := c.getKubeConfigPath(KubeConfigEnvironmentKey)
if strings.TrimSpace(k8sMaster) == "" &&
strings.TrimSpace(kubeConfig) == "" {
return nil, errors.Errorf(
"failed to get kubernetes config: missing ENV: atleast one should be set: {%s} or {%s}",
K8sMasterIPEnvironmentKey,
KubeConfigEnvironmentKey,
)
}
return c.buildConfigFromFlags(k8sMaster, kubeConfig)
}
// Dynamic returns a kubernetes dynamic client capable of invoking operations
// against kubernetes resources
func (c *Client) Dynamic() (dynamic.Interface, error) {
config, err := c.GetConfigForPathOrDirect()
if err != nil {
return nil, errors.Wrap(err, "failed to get dynamic client")
}
return c.getKubernetesDynamicClient(config)
}

View file

@ -0,0 +1,310 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"testing"
"github.com/pkg/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func fakeGetClientsetOk(c *rest.Config) (*kubernetes.Clientset, error) {
return &kubernetes.Clientset{}, nil
}
func fakeGetClientsetErr(c *rest.Config) (*kubernetes.Clientset, error) {
return nil, errors.New("fake error")
}
func fakeInClusterConfigOk() (*rest.Config, error) {
return &rest.Config{}, nil
}
func fakeInClusterConfigErr() (*rest.Config, error) {
return nil, errors.New("fake error")
}
func fakeBuildConfigFromFlagsOk(kubemaster string, kubeconfig string) (*rest.Config, error) {
return &rest.Config{}, nil
}
func fakeBuildConfigFromFlagsErr(kubemaster string, kubeconfig string) (*rest.Config, error) {
return nil, errors.New("fake error")
}
func fakeGetKubeConfigPathOk(e string) string {
return "fake"
}
func fakeGetKubeConfigPathNil(e string) string {
return ""
}
func fakeGetKubeMasterIPOk(e string) string {
return "fake"
}
func fakeGetKubeMasterIPNil(e string) string {
return ""
}
func fakeGetDynamicClientSetOk(c *rest.Config) (dynamic.Interface, error) {
return dynamic.NewForConfig(c)
}
func fakeGetDynamicClientSetNil(c *rest.Config) (dynamic.Interface, error) {
return nil, nil
}
func fakeGetDynamicClientSetErr(c *rest.Config) (dynamic.Interface, error) {
return nil, errors.New("fake error")
}
func TestNewInCluster(t *testing.T) {
c := New(InCluster())
if !c.IsInCluster {
t.Fatalf("test failed: expected IsInCluster as 'true' actual '%t'", c.IsInCluster)
}
}
func TestConfig(t *testing.T) {
tests := map[string]struct {
isInCluster bool
kubeConfigPath string
getInClusterConfig getInClusterConfigFunc
getKubeMasterIP getKubeMasterIPFunc
getKubeConfigPath getKubeConfigPathFunc
getConfigFromENV buildConfigFromFlagsFunc
isErr bool
}{
"t1": {true, "", fakeInClusterConfigOk, nil, nil, nil, false},
"t2": {true, "", fakeInClusterConfigErr, nil, nil, nil, true},
"t3": {false, "", fakeInClusterConfigErr, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, true},
"t4": {false, "", fakeInClusterConfigOk, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, false},
"t5": {false, "fakeKubeConfigPath", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathNil, fakeBuildConfigFromFlagsOk, false},
"t6": {false, "", nil, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, false},
"t7": {false, "", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, false},
"t8": {false, "fakeKubeConfigPath", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsErr, true},
"t9": {false, "fakeKubeConfigpath", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, false},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
IsInCluster: mock.isInCluster,
KubeConfigPath: mock.kubeConfigPath,
getInClusterConfig: mock.getInClusterConfig,
getKubeMasterIP: mock.getKubeMasterIP,
getKubeConfigPath: mock.getKubeConfigPath,
buildConfigFromFlags: mock.getConfigFromENV,
}
_, err := c.Config()
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected no error actual '%s'", name, err)
}
})
}
}
func TestGetConfigFromENV(t *testing.T) {
tests := map[string]struct {
getKubeMasterIP getKubeMasterIPFunc
getKubeConfigPath getKubeConfigPathFunc
getConfigFromENV buildConfigFromFlagsFunc
isErr bool
}{
"t1": {fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, true},
"t2": {fakeGetKubeMasterIPNil, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, false},
"t3": {fakeGetKubeMasterIPOk, fakeGetKubeConfigPathNil, fakeBuildConfigFromFlagsOk, false},
"t4": {fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, false},
"t5": {fakeGetKubeMasterIPNil, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsErr, true},
"t6": {fakeGetKubeMasterIPOk, fakeGetKubeConfigPathNil, fakeBuildConfigFromFlagsErr, true},
"t7": {fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsErr, true},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
getKubeMasterIP: mock.getKubeMasterIP,
getKubeConfigPath: mock.getKubeConfigPath,
buildConfigFromFlags: mock.getConfigFromENV,
}
_, err := c.getConfigFromENV()
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected error actual no error", name)
}
if !mock.isErr && err != nil {
t.Fatalf("test '%s' failed: expected no error actual '%s'", name, err)
}
})
}
}
func TestGetConfigFromPathOrDirect(t *testing.T) {
tests := map[string]struct {
kubeConfigPath string
getConfigFromFlags buildConfigFromFlagsFunc
getInClusterConfig getInClusterConfigFunc
isErr bool
}{
"T1": {"", fakeBuildConfigFromFlagsErr, fakeInClusterConfigOk, false},
"T2": {"fake-path", fakeBuildConfigFromFlagsOk, fakeInClusterConfigErr, false},
"T3": {"fake-path", fakeBuildConfigFromFlagsErr, fakeInClusterConfigOk, true},
"T4": {"", fakeBuildConfigFromFlagsOk, fakeInClusterConfigErr, true},
"T5": {"fake-path", fakeBuildConfigFromFlagsErr, fakeInClusterConfigErr, true},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
KubeConfigPath: mock.kubeConfigPath,
buildConfigFromFlags: mock.getConfigFromFlags,
getInClusterConfig: mock.getInClusterConfig,
getKubeMasterIP: fakeGetKubeMasterIPNil,
getKubeConfigPath: fakeGetKubeConfigPathNil,
}
_, err := c.GetConfigForPathOrDirect()
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected error actual no error", name)
}
if !mock.isErr && err != nil {
t.Fatalf("test '%s' failed: expected no error actual '%s'", name, err)
}
})
}
}
func TestClientset(t *testing.T) {
tests := map[string]struct {
isInCluster bool
kubeConfigPath string
getInClusterConfig getInClusterConfigFunc
getKubeMasterIP getKubeMasterIPFunc
getKubeConfigPath getKubeConfigPathFunc
getConfigFromENV buildConfigFromFlagsFunc
getKubernetesClientset getKubernetesClientsetFunc
isErr bool
}{
"t10": {true, "", fakeInClusterConfigOk, nil, nil, nil, fakeGetClientsetOk, false},
"t11": {true, "", fakeInClusterConfigOk, nil, nil, nil, fakeGetClientsetErr, true},
"t12": {true, "", fakeInClusterConfigErr, nil, nil, nil, fakeGetClientsetOk, true},
"t21": {false, "", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathNil, fakeBuildConfigFromFlagsOk, fakeGetClientsetOk, false},
"t22": {false, "", nil, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, fakeGetClientsetOk, false},
"t23": {false, "", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, fakeGetClientsetOk, false},
"t24": {false, "fake-path", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsErr, fakeGetClientsetOk, true},
"t25": {false, "", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, fakeGetClientsetErr, true},
"t26": {false, "fakePath", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsErr, fakeGetClientsetOk, true},
"t30": {false, "", fakeInClusterConfigOk, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, fakeGetClientsetOk, false},
"t31": {false, "", fakeInClusterConfigOk, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, fakeGetClientsetErr, true},
"t32": {false, "", fakeInClusterConfigErr, fakeGetKubeMasterIPNil, fakeGetKubeConfigPathNil, nil, nil, true},
"t33": {false, "fakePath", nil, fakeGetKubeMasterIPOk, fakeGetKubeConfigPathOk, fakeBuildConfigFromFlagsOk, fakeGetClientsetOk, false},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
IsInCluster: mock.isInCluster,
KubeConfigPath: mock.kubeConfigPath,
getInClusterConfig: mock.getInClusterConfig,
getKubeMasterIP: mock.getKubeMasterIP,
getKubeConfigPath: mock.getKubeConfigPath,
buildConfigFromFlags: mock.getConfigFromENV,
getKubernetesClientset: mock.getKubernetesClientset,
}
_, err := c.Clientset()
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected error actual no error", name)
}
if !mock.isErr && err != nil {
t.Fatalf("test '%s' failed: expected no error actual '%s'", name, err)
}
})
}
}
func TestDynamic(t *testing.T) {
tests := map[string]struct {
getKubeMasterIP getKubeMasterIPFunc
getInClusterConfig getInClusterConfigFunc
getKubernetesDynamicClientSet getKubernetesDynamicClientFunc
kubeConfigPath string
getConfigFromENV buildConfigFromFlagsFunc
getKubeConfigPath getKubeConfigPathFunc
isErr bool
}{
"t1": {fakeGetKubeMasterIPNil, fakeInClusterConfigErr, fakeGetDynamicClientSetOk, "fake-path", fakeBuildConfigFromFlagsOk, fakeGetKubeConfigPathNil, false},
"t2": {fakeGetKubeMasterIPNil, fakeInClusterConfigErr, fakeGetDynamicClientSetErr, "fake-path", fakeBuildConfigFromFlagsOk, fakeGetKubeConfigPathOk, true},
"t3": {fakeGetKubeMasterIPNil, fakeInClusterConfigErr, fakeGetDynamicClientSetOk, "fake-path", fakeBuildConfigFromFlagsErr, fakeGetKubeConfigPathOk, true},
"t4": {fakeGetKubeMasterIPOk, fakeInClusterConfigOk, fakeGetDynamicClientSetOk, "", fakeBuildConfigFromFlagsOk, fakeGetKubeConfigPathOk, false},
"t5": {fakeGetKubeMasterIPOk, fakeInClusterConfigErr, fakeGetDynamicClientSetErr, "", fakeBuildConfigFromFlagsOk, fakeGetKubeConfigPathOk, true},
"t6": {fakeGetKubeMasterIPNil, fakeInClusterConfigOk, fakeGetDynamicClientSetErr, "", fakeBuildConfigFromFlagsErr, fakeGetKubeConfigPathNil, true},
"t7": {fakeGetKubeMasterIPNil, fakeInClusterConfigErr, fakeGetDynamicClientSetOk, "", fakeBuildConfigFromFlagsErr, fakeGetKubeConfigPathNil, true},
"t8": {fakeGetKubeMasterIPNil, fakeInClusterConfigErr, fakeGetDynamicClientSetErr, "", fakeBuildConfigFromFlagsErr, fakeGetKubeConfigPathNil, true},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
getKubeMasterIP: mock.getKubeMasterIP,
KubeConfigPath: mock.kubeConfigPath,
getInClusterConfig: mock.getInClusterConfig,
buildConfigFromFlags: mock.getConfigFromENV,
getKubeConfigPath: mock.getKubeConfigPath,
getKubernetesDynamicClient: mock.getKubernetesDynamicClientSet,
}
_, err := c.Dynamic()
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected error actual no error", name)
}
if !mock.isErr && err != nil {
t.Fatalf("test '%s' failed: expected no error but got '%v'", name, err)
}
})
}
}
func TestConfigForPath(t *testing.T) {
tests := map[string]struct {
kubeConfigPath string
getConfigFromPath buildConfigFromFlagsFunc
isErr bool
}{
"T1": {"", fakeBuildConfigFromFlagsErr, true},
"T2": {"fake-path", fakeBuildConfigFromFlagsOk, false},
}
for name, mock := range tests {
name, mock := name, mock // pin It
t.Run(name, func(t *testing.T) {
c := &Client{
KubeConfigPath: mock.kubeConfigPath,
buildConfigFromFlags: mock.getConfigFromPath,
}
_, err := c.ConfigForPath(mock.kubeConfigPath)
if mock.isErr && err == nil {
t.Fatalf("test '%s' failed: expected error actual no error", name)
}
if !mock.isErr && err != nil {
t.Fatalf("test '%s' failed: expected no error but got '%v'", name, err)
}
})
}
}

51
pkg/config/config.go Normal file
View file

@ -0,0 +1,51 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// Config struct fills the parameters of request or user input
type Config struct {
// DriverName to be registered at CSI
DriverName string
// PluginType flags if the driver is
// it is a node plugin or controller
// plugin
PluginType string
// Version of the CSI controller/node driver
Version string
// Endpoint on which requests are made by kubelet
// or external provisioner
//
// NOTE:
// - Controller/node plugin will listen on this
// - This will be a unix based socket
Endpoint string
// NodeID helps in differentiating the nodes on
// which node drivers are running. This is useful
// in case of topologies and publishing or
// unpublishing volumes on nodes
NodeID string
}
// Default returns a new instance of config
// required to initialize a driver instance
func Default() *Config {
return &Config{}
}

297
pkg/driver/agent.go Normal file
View file

@ -0,0 +1,297 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"github.com/Sirupsen/logrus"
"github.com/container-storage-interface/spec/lib/go/csi"
ctrl "github.com/openebs/zfs-localpv/cmd/controller"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/builder"
zvol "github.com/openebs/zfs-localpv/pkg/zfs"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sync"
)
// node is the server implementation
// for CSI NodeServer
type node struct {
driver *CSIDriver
}
// NewNode returns a new instance
// of CSI NodeServer
func NewNode(d *CSIDriver) csi.NodeServer {
var ControllerMutex = sync.RWMutex{}
// start the zfsvolume watcher
go func() {
err := ctrl.Start(&ControllerMutex)
if err != nil {
logrus.Errorf("Failed to start cstorvolume claim controller: %s", err.Error())
}
}()
return &node{
driver: d,
}
}
func GetVolAndMountInfo(
req *csi.NodePublishVolumeRequest,
) (*apis.ZFSVolume, *apis.MountInfo, error) {
var mountinfo apis.MountInfo
mountinfo.FSType = req.GetVolumeCapability().GetMount().GetFsType()
mountinfo.MountPath = req.GetTargetPath()
mountinfo.ReadOnly = req.GetReadonly()
mountinfo.MountOptions = append(mountinfo.MountOptions, req.GetVolumeCapability().GetMount().GetMountFlags()...)
getOptions := metav1.GetOptions{}
vol, err := builder.NewKubeclient().
WithNamespace(zvol.OpenEBSNamespace).
Get(req.GetVolumeId(), getOptions)
if err != nil {
return nil, nil, err
}
return vol, &mountinfo, nil
}
// NodePublishVolume publishes (mounts) the volume
// at the corresponding node at a given path
//
// This implements csi.NodeServer
func (ns *node) NodePublishVolume(
ctx context.Context,
req *csi.NodePublishVolumeRequest,
) (*csi.NodePublishVolumeResponse, error) {
var (
err error
)
if err = ns.validateNodePublishReq(req); err != nil {
return nil, err
}
vol, mountInfo, err := GetVolAndMountInfo(req)
if err != nil {
goto PublishVolumeResponse
}
// Create the zfs volume and attempt mount operation on the requested path
if err = zvol.CreateAndMountZvol(vol, mountInfo); err != nil {
goto PublishVolumeResponse
}
PublishVolumeResponse:
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodePublishVolumeResponse{}, nil
}
// NodeUnpublishVolume unpublishes (unmounts) the volume
// from the corresponding node from the given path
//
// This implements csi.NodeServer
func (ns *node) NodeUnpublishVolume(
ctx context.Context,
req *csi.NodeUnpublishVolumeRequest,
) (*csi.NodeUnpublishVolumeResponse, error) {
var (
err error
vol *apis.ZFSVolume
currentMounts []string
)
if err = ns.validateNodeUnpublishReq(req); err != nil {
return nil, err
}
targetPath := req.GetTargetPath()
volumeID := req.GetVolumeId()
getOptions := metav1.GetOptions{}
vol, err = builder.NewKubeclient().
WithNamespace(zvol.OpenEBSNamespace).
Get(volumeID, getOptions)
if err != nil {
return nil, err
}
zfsvolume := vol.Spec.PoolName + "/" + vol.Name
devpath := zvol.ZFS_DEVPATH + zfsvolume
currentMounts, err = zvol.GetMounts(devpath)
if err != nil {
return nil, err
} else if len(currentMounts) == 0 {
goto NodeUnpublishResponse
} else if len(currentMounts) == 1 {
if currentMounts[0] != targetPath {
return nil, status.Error(codes.Internal, "device not mounted at right path")
}
} else {
logrus.Errorf(
"can not unmount, more than one mounts for volume:%s path %s mounts: %v",
volumeID, targetPath, currentMounts,
)
return nil, status.Error(codes.Internal, "device not mounted at rightpath")
}
if vol, err = zvol.GetZFSVolume(volumeID); (err != nil) || (vol == nil) {
goto NodeUnpublishResponse
}
if err = zvol.UmountVolume(vol, req.GetTargetPath()); err != nil {
goto NodeUnpublishResponse
}
NodeUnpublishResponse:
logrus.Infof("hostpath: volume %s path: %s has been unmounted.",
volumeID, targetPath)
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// NodeGetInfo returns node details
//
// This implements csi.NodeServer
func (ns *node) NodeGetInfo(
ctx context.Context,
req *csi.NodeGetInfoRequest,
) (*csi.NodeGetInfoResponse, error) {
return &csi.NodeGetInfoResponse{
NodeId: ns.driver.config.NodeID,
}, nil
}
// NodeGetCapabilities returns capabilities supported
// by this node service
//
// This implements csi.NodeServer
func (ns *node) NodeGetCapabilities(
ctx context.Context,
req *csi.NodeGetCapabilitiesRequest,
) (*csi.NodeGetCapabilitiesResponse, error) {
return &csi.NodeGetCapabilitiesResponse{
Capabilities: []*csi.NodeServiceCapability{
{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_UNKNOWN,
},
},
},
},
}, nil
}
// TODO
// This needs to be implemented
//
// NodeStageVolume mounts the volume on the staging
// path
//
// This implements csi.NodeServer
func (ns *node) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
) (*csi.NodeStageVolumeResponse, error) {
return &csi.NodeStageVolumeResponse{}, nil
}
// NodeUnstageVolume unmounts the volume from
// the staging path
//
// This implements csi.NodeServer
func (ns *node) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest,
) (*csi.NodeUnstageVolumeResponse, error) {
return &csi.NodeUnstageVolumeResponse{}, nil
}
// TODO
// Verify if this needs to be implemented
//
// NodeExpandVolume resizes the filesystem if required
//
// If ControllerExpandVolumeResponse returns true in
// node_expansion_required then FileSystemResizePending
// condition will be added to PVC and NodeExpandVolume
// operation will be queued on kubelet
//
// This implements csi.NodeServer
func (ns *node) NodeExpandVolume(
ctx context.Context,
req *csi.NodeExpandVolumeRequest,
) (*csi.NodeExpandVolumeResponse, error) {
return nil, nil
}
// NodeGetVolumeStats returns statistics for the
// given volume
//
// This implements csi.NodeServer
func (ns *node) NodeGetVolumeStats(
ctx context.Context,
in *csi.NodeGetVolumeStatsRequest,
) (*csi.NodeGetVolumeStatsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (ns *node) validateNodePublishReq(
req *csi.NodePublishVolumeRequest,
) error {
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument,
"Volume capability missing in request")
}
if len(req.GetVolumeId()) == 0 {
return status.Error(codes.InvalidArgument,
"Volume ID missing in request")
}
return nil
}
func (ns *node) validateNodeUnpublishReq(
req *csi.NodeUnpublishVolumeRequest,
) error {
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument,
"Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument,
"Target path missing in request")
}
return nil
}

375
pkg/driver/controller.go Normal file
View file

@ -0,0 +1,375 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"fmt"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/openebs/zfs-localpv/pkg/builder"
errors "github.com/openebs/zfs-localpv/pkg/common/errors"
csipayload "github.com/openebs/zfs-localpv/pkg/response"
zvol "github.com/openebs/zfs-localpv/pkg/zfs"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// controller is the server implementation
// for CSI Controller
type controller struct {
driver *CSIDriver
capabilities []*csi.ControllerServiceCapability
}
// NewController returns a new instance
// of CSI controller
func NewController(d *CSIDriver) csi.ControllerServer {
return &controller{
driver: d,
capabilities: newControllerCapabilities(),
}
}
// SupportedVolumeCapabilityAccessModes contains the list of supported access
// modes for the volume
var SupportedVolumeCapabilityAccessModes = []*csi.VolumeCapability_AccessMode{
&csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
},
}
// CreateVolume provisions a volume
func (cs *controller) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {
logrus.Infof("received request to create volume {%s} vol{%v}", req.GetName(), req)
var err error
if err = cs.validateVolumeCreateReq(req); err != nil {
return nil, err
}
volName := req.GetName()
size := req.GetCapacityRange().RequiredBytes
bs := req.GetParameters()["blocksize"]
compression := req.GetParameters()["compression"]
dedup := req.GetParameters()["dedup"]
pool := req.GetParameters()["poolname"]
tp := req.GetParameters()["thinprovision"]
volObj, err := builder.NewBuilder().
WithName(volName).
WithCapacity(strconv.FormatInt(int64(size), 10)).
WithBlockSize(bs).
WithPoolName(pool).
WithDedup(dedup).
WithThinProv(tp).
WithCompression(compression).Build()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = zvol.ProvisionVolume(size, volObj)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return csipayload.NewCreateVolumeResponseBuilder().
WithName(volName).
WithCapacity(size).
Build(), nil
}
// DeleteVolume deletes the specified volume
func (cs *controller) DeleteVolume(
ctx context.Context,
req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
logrus.Infof("received request to delete volume {%s}", req.VolumeId)
var (
err error
)
if err = cs.validateDeleteVolumeReq(req); err != nil {
return nil, err
}
volumeID := req.GetVolumeId()
// verify if the volume has already been deleted
vol, err := zvol.GetVolume(volumeID)
if vol != nil && vol.DeletionTimestamp != nil {
goto deleteResponse
}
// Delete the corresponding ZV CR
err = zvol.DeleteVolume(volumeID)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to handle delete volume request for {%s}",
volumeID,
)
}
deleteResponse:
return csipayload.NewDeleteVolumeResponseBuilder().Build(), nil
}
// TODO Implementation will be taken up later
// ValidateVolumeCapabilities validates the capabilities
// required to create a new volume
// This implements csi.ControllerServer
func (cs *controller) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest,
) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerGetCapabilities fetches controller capabilities
//
// This implements csi.ControllerServer
func (cs *controller) ControllerGetCapabilities(
ctx context.Context,
req *csi.ControllerGetCapabilitiesRequest,
) (*csi.ControllerGetCapabilitiesResponse, error) {
resp := &csi.ControllerGetCapabilitiesResponse{
Capabilities: cs.capabilities,
}
return resp, nil
}
// ControllerExpandVolume resizes previously provisioned volume
//
// This implements csi.ControllerServer
func (cs *controller) ControllerExpandVolume(
ctx context.Context,
req *csi.ControllerExpandVolumeRequest,
) (*csi.ControllerExpandVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// CreateSnapshot creates a snapshot for given volume
//
// This implements csi.ControllerServer
func (cs *controller) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// DeleteSnapshot deletes given snapshot
//
// This implements csi.ControllerServer
func (cs *controller) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ListSnapshots lists all snapshots for the
// given volume
//
// This implements csi.ControllerServer
func (cs *controller) ListSnapshots(
ctx context.Context,
req *csi.ListSnapshotsRequest,
) (*csi.ListSnapshotsResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerUnpublishVolume removes a previously
// attached volume from the given node
//
// This implements csi.ControllerServer
func (cs *controller) ControllerUnpublishVolume(
ctx context.Context,
req *csi.ControllerUnpublishVolumeRequest,
) (*csi.ControllerUnpublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ControllerPublishVolume attaches given volume
// at the specified node
//
// This implements csi.ControllerServer
func (cs *controller) ControllerPublishVolume(
ctx context.Context,
req *csi.ControllerPublishVolumeRequest,
) (*csi.ControllerPublishVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// GetCapacity return the capacity of the
// given volume
//
// This implements csi.ControllerServer
func (cs *controller) GetCapacity(
ctx context.Context,
req *csi.GetCapacityRequest,
) (*csi.GetCapacityResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// ListVolumes lists all the volumes
//
// This implements csi.ControllerServer
func (cs *controller) ListVolumes(
ctx context.Context,
req *csi.ListVolumesRequest,
) (*csi.ListVolumesResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
// validateCapabilities validates if provided capabilities
// are supported by this driver
func validateCapabilities(caps []*csi.VolumeCapability) bool {
for _, cap := range caps {
if !IsSupportedVolumeCapabilityAccessMode(cap.AccessMode.Mode) {
return false
}
}
return true
}
func (cs *controller) validateDeleteVolumeReq(req *csi.DeleteVolumeRequest) error {
volumeID := req.GetVolumeId()
if volumeID == "" {
return status.Error(
codes.InvalidArgument,
"failed to handle delete volume request: missing volume id",
)
}
err := cs.validateRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
)
if err != nil {
return errors.Wrapf(
err,
"failed to handle delete volume request for {%s} : validation failed",
volumeID,
)
}
return nil
}
// IsSupportedVolumeCapabilityAccessMode valides the requested access mode
func IsSupportedVolumeCapabilityAccessMode(
accessMode csi.VolumeCapability_AccessMode_Mode,
) bool {
for _, access := range SupportedVolumeCapabilityAccessModes {
if accessMode == access.Mode {
return true
}
}
return false
}
// newControllerCapabilities returns a list
// of this controller's capabilities
func newControllerCapabilities() []*csi.ControllerServiceCapability {
fromType := func(
cap csi.ControllerServiceCapability_RPC_Type,
) *csi.ControllerServiceCapability {
return &csi.ControllerServiceCapability{
Type: &csi.ControllerServiceCapability_Rpc{
Rpc: &csi.ControllerServiceCapability_RPC{
Type: cap,
},
},
}
}
var capabilities []*csi.ControllerServiceCapability
for _, cap := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
} {
capabilities = append(capabilities, fromType(cap))
}
return capabilities
}
// validateRequest validates if the requested service is
// supported by the driver
func (cs *controller) validateRequest(
c csi.ControllerServiceCapability_RPC_Type,
) error {
for _, cap := range cs.capabilities {
if c == cap.GetRpc().GetType() {
return nil
}
}
return status.Error(
codes.InvalidArgument,
fmt.Sprintf("failed to validate request: {%s} is not supported", c),
)
}
func (cs *controller) validateVolumeCreateReq(req *csi.CreateVolumeRequest) error {
err := cs.validateRequest(
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
)
if err != nil {
return errors.Wrapf(
err,
"failed to handle create volume request for {%s}",
req.GetName(),
)
}
if req.GetName() == "" {
return status.Error(
codes.InvalidArgument,
"failed to handle create volume request: missing volume name",
)
}
volCapabilities := req.GetVolumeCapabilities()
if volCapabilities == nil {
return status.Error(
codes.InvalidArgument,
"failed to handle create volume request: missing volume capabilities",
)
}
return nil
}

104
pkg/driver/driver.go Normal file
View file

@ -0,0 +1,104 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"github.com/Sirupsen/logrus"
"github.com/container-storage-interface/spec/lib/go/csi"
config "github.com/openebs/zfs-localpv/pkg/config"
)
// volume can only be published once as
// read/write on a single node, at any
// given time
var supportedAccessMode = &csi.VolumeCapability_AccessMode{
Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}
// TODO check if this can be renamed to Base
//
// CSIDriver defines a common data structure
// for drivers
type CSIDriver struct {
// TODO change the field names to make it
// readable
config *config.Config
ids csi.IdentityServer
ns csi.NodeServer
cs csi.ControllerServer
cap []*csi.VolumeCapability_AccessMode
}
// GetVolumeCapabilityAccessModes fetches the access
// modes on which the volume can be exposed
func GetVolumeCapabilityAccessModes() []*csi.VolumeCapability_AccessMode {
supported := []csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
}
var vcams []*csi.VolumeCapability_AccessMode
for _, vcam := range supported {
logrus.Infof("enabling volume access mode: %s", vcam.String())
vcams = append(vcams, newVolumeCapabilityAccessMode(vcam))
}
return vcams
}
func newVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *csi.VolumeCapability_AccessMode {
return &csi.VolumeCapability_AccessMode{Mode: mode}
}
// New returns a new driver instance
func New(config *config.Config) *CSIDriver {
driver := &CSIDriver{
config: config,
cap: GetVolumeCapabilityAccessModes(),
}
switch config.PluginType {
case "controller":
driver.cs = NewController(driver)
case "agent":
// Start monitor goroutine to monitor the
// ZfsVolume CR. If there is any event
// related to the volume like destroy or
// property change, handle it accordingly.
driver.ns = NewNode(driver)
}
// Identity server is common to both node and
// controller, it is required to register,
// share capabilities and probe the corresponding
// driver
driver.ids = NewIdentity(driver)
return driver
}
// Run starts the CSI plugin by communicating
// over the given endpoint
func (d *CSIDriver) Run() error {
// Initialize and start listening on grpc server
s := NewNonBlockingGRPCServer(d.config.Endpoint, d.ids, d.cs, d.ns)
s.Start()
s.Wait()
return nil
}

170
pkg/driver/grpc.go Normal file
View file

@ -0,0 +1,170 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"fmt"
"net"
"os"
"strings"
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/Sirupsen/logrus"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// parseEndpoint should have a valid prefix(unix/tcp) to return a valid endpoint parts
func parseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
}
// logGRPC logs all the grpc related errors, i.e the final errors
// which are returned to the grpc clients
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
logrus.Infof("GRPC call: %s", info.FullMethod)
logrus.Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
resp, err := handler(ctx, req)
if err != nil {
logrus.Errorf("GRPC error: %v", err)
} else {
logrus.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
return resp, err
}
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start()
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
// NewNonBlockingGRPCServer returns a new instance of NonBlockingGRPCServer
func NewNonBlockingGRPCServer(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{
endpoint: ep,
idnty_server: ids,
ctrl_server: cs,
agent_server: ns}
}
// NonBlocking server
// dont block the execution for a task to complete.
// use wait group to wait for all the tasks dispatched.
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
endpoint string
idnty_server csi.IdentityServer
ctrl_server csi.ControllerServer
agent_server csi.NodeServer
}
// Start grpc server for serving CSI endpoints
func (s *nonBlockingGRPCServer) Start() {
s.wg.Add(1)
go s.serve(s.endpoint, s.idnty_server, s.ctrl_server, s.agent_server)
return
}
// Wait for the service to stop
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
// Stop the service forcefully
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
// ForceStop the service
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
// serve starts serving requests at the provided endpoint based on the type of
// plugin. In this function all the csi related interfaces are provided by
// container-storage-interface
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
logrus.Fatal(err.Error())
}
// Clear off the addr if it is already present, this is done to remove stale
// entries, as this path is shared with the OS and will be the same
// everytime the plugin restarts, its possible that the last instance leaves
// a stale entry
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
logrus.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
logrus.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
// Create a new grpc server, all the request from csi client to
// create/delete/... will hit this server
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
logrus.Infof("Listening for connections on address: %#v", listener.Addr())
// Start serving requests on the grpc server created
server.Serve(listener)
}

112
pkg/driver/identity.go Normal file
View file

@ -0,0 +1,112 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/openebs/zfs-localpv/pkg/version"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// identity is the server implementation
// for CSI IdentityServer
type identity struct {
driver *CSIDriver
}
// NewIdentity returns a new instance of CSI
// IdentityServer
func NewIdentity(d *CSIDriver) csi.IdentityServer {
return &identity{
driver: d,
}
}
// GetPluginInfo returns the version and name of
// this service
//
// This implements csi.IdentityServer
func (id *identity) GetPluginInfo(
ctx context.Context,
req *csi.GetPluginInfoRequest,
) (*csi.GetPluginInfoResponse, error) {
if id.driver.config.DriverName == "" {
return nil, status.Error(codes.Unavailable, "missing driver name")
}
if id.driver.config.Version == "" {
return nil, status.Error(codes.Unavailable, "missing driver version")
}
return &csi.GetPluginInfoResponse{
Name: id.driver.config.DriverName,
// TODO
// verify which version needs to be used:
// config.version or version.Current()
VendorVersion: version.Current(),
}, nil
}
// TODO
// Need to implement this
//
// Probe checks if the plugin is running or not
//
// This implements csi.IdentityServer
func (id *identity) Probe(
ctx context.Context,
req *csi.ProbeRequest,
) (*csi.ProbeResponse, error) {
return &csi.ProbeResponse{}, nil
}
// GetPluginCapabilities returns supported capabilities
// of this plugin
//
// Currently it reports whether this plugin can serve
// the Controller interface. Controller interface methods
// are called dependant on this
//
// This implements csi.IdentityServer
func (id *identity) GetPluginCapabilities(
ctx context.Context,
req *csi.GetPluginCapabilitiesRequest,
) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
},
}, nil
}

64
pkg/response/create.go Normal file
View file

@ -0,0 +1,64 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"github.com/container-storage-interface/spec/lib/go/csi"
)
// CreateVolumeResponseBuilder helps building an
// instance of csi CreateVolumeResponse
type CreateVolumeResponseBuilder struct {
response *csi.CreateVolumeResponse
}
// NewCreateVolumeResponseBuilder returns a new
// instance of CreateVolumeResponseBuilder
func NewCreateVolumeResponseBuilder() *CreateVolumeResponseBuilder {
return &CreateVolumeResponseBuilder{
response: &csi.CreateVolumeResponse{
Volume: &csi.Volume{},
},
}
}
// WithName sets the name against the
// CreateVolumeResponse instance
func (b *CreateVolumeResponseBuilder) WithName(name string) *CreateVolumeResponseBuilder {
b.response.Volume.VolumeId = name
return b
}
// WithName sets the capacity against the
// CreateVolumeResponse instance
func (b *CreateVolumeResponseBuilder) WithCapacity(capacity int64) *CreateVolumeResponseBuilder {
b.response.Volume.CapacityBytes = capacity
return b
}
// WithContext sets the context against the
// CreateVolumeResponse instance
func (b *CreateVolumeResponseBuilder) WithContext(ctx map[string]string) *CreateVolumeResponseBuilder {
b.response.Volume.VolumeContext = ctx
return b
}
// Build returns the constructed instance
// of csi CreateVolumeResponse
func (b *CreateVolumeResponseBuilder) Build() *csi.CreateVolumeResponse {
return b.response
}

41
pkg/response/delete.go Normal file
View file

@ -0,0 +1,41 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"github.com/container-storage-interface/spec/lib/go/csi"
)
// DeleteVolumeResponseBuilder helps building an
// instance of csi DeleteVolumeResponse
type DeleteVolumeResponseBuilder struct {
response *csi.DeleteVolumeResponse
}
// NewDeleteVolumeResponseBuilder returns a new
// instance of DeleteVolumeResponseBuilder
func NewDeleteVolumeResponseBuilder() *DeleteVolumeResponseBuilder {
return &DeleteVolumeResponseBuilder{
response: &csi.DeleteVolumeResponse{},
}
}
// Build returns the constructed instance
// of csi DeleteVolumeResponse
func (b *DeleteVolumeResponseBuilder) Build() *csi.DeleteVolumeResponse {
return b.response
}

115
pkg/version/version.go Normal file
View file

@ -0,0 +1,115 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package version
import (
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"github.com/Sirupsen/logrus"
)
var (
// GitCommit that was compiled; filled in by
// the compiler.
GitCommit string
// Version is the version of this repo; filled
// in by the compiler
Version string
// VersionMeta is a pre-release marker for the
// version. If this is "" (empty string) then
// it means that it is a final release. Otherwise,
// this is a pre-release such as "dev" (in
// development), "beta", "rc1", etc.
VersionMeta string
)
const (
versionFile string = "/src/github.com/openebs/zfs-localpv/VERSION"
buildMetaFile string = "/src/github.com/openebs/zfs-localpv/BUILDMETA"
)
// Current returns current version of csi driver
func Current() string {
return Get()
}
// Get returns current version from global
// Version variable. If Version is unset then
// from VERSION file at the root of this repo.
func Get() string {
if Version != "" {
return Version
}
path := filepath.Join(os.Getenv("GOPATH") + versionFile)
vBytes, err := ioutil.ReadFile(path)
if err != nil {
logrus.Errorf("failed to get version: %s", err.Error())
return ""
}
return strings.TrimSpace(string(vBytes))
}
// GetBuildMeta returns build type from
// global VersionMeta variable. If VersionMeta
// is unset then this is fetched from BUILDMETA
// file at the root of this repo.
func GetBuildMeta() string {
if VersionMeta != "" {
return "-" + VersionMeta
}
path := filepath.Join(os.Getenv("GOPATH") + buildMetaFile)
vBytes, err := ioutil.ReadFile(path)
if err != nil {
logrus.Errorf("failed to get build version: %s", err.Error())
return ""
}
return "-" + strings.TrimSpace(string(vBytes))
}
// GetGitCommit returns Git commit SHA-1 from
// global GitCommit variable. If GitCommit is
// unset this calls Git directly.
func GetGitCommit() string {
if GitCommit != "" {
return GitCommit
}
cmd := exec.Command("git", "rev-parse", "--verify", "HEAD")
output, err := cmd.Output()
if err != nil {
logrus.Errorf("failed to get git commit: %s", err.Error())
return ""
}
return strings.TrimSpace(string(output))
}
// Verbose returns version details with git
// commit info
func Verbose() string {
return strings.Join([]string{Get(), GetGitCommit()[0:7]}, "-")
}

145
pkg/zfs/mount.go Normal file
View file

@ -0,0 +1,145 @@
package zfs
import (
"fmt"
"os"
"path/filepath"
"github.com/Sirupsen/logrus"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/mount"
)
// FormatAndMountZvol formats and mounts the created volume to the desired mount path
func FormatAndMountZvol(devicePath string, mountInfo *apis.MountInfo) error {
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}
err := mounter.FormatAndMount(devicePath, mountInfo.MountPath, mountInfo.FSType, mountInfo.MountOptions)
if err != nil {
logrus.Errorf(
"zfspv: failed to mount volume %s [%s] to %s, error %v",
devicePath, mountInfo.FSType, mountInfo.MountPath, err,
)
return err
}
logrus.Infof("created zvol %v and mounted %v fs %v", devicePath, mountInfo.MountPath, mountInfo.FSType)
return nil
}
// UmountVolume unmounts the volume and the corresponding mount path is removed
func UmountVolume(vol *apis.ZFSVolume, targetPath string,
) error {
mounter := &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}
_, _, err := mount.GetDeviceNameFromMount(mounter, targetPath)
if err != nil {
logrus.Errorf(
"zfspv umount volume: failed to get device from mnt: %s\nError: %v",
targetPath, err,
)
return err
}
if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil {
return fmt.Errorf("Error checking if path exists: %v", pathErr)
} else if !pathExists {
logrus.Warningf(
"Warning: Unmount skipped because path does not exist: %v",
targetPath,
)
return nil
}
if err = mounter.Unmount(targetPath); err != nil {
logrus.Errorf(
"zfspv umount volume: failed to unmount: %s\nError: %v",
targetPath, err,
)
return err
}
if err := os.RemoveAll(targetPath); err != nil {
logrus.Errorf("zfspv: failed to remove mount path Error: %v", err)
return err
}
logrus.Infof("umount done path %v", targetPath)
return nil
}
// GetMounts gets mountpoints for the specified volume
func GetMounts(devicepath string) ([]string, error) {
var (
currentMounts []string
err error
mountList []mount.MountPoint
)
dev, err := filepath.EvalSymlinks(devicepath)
if err != nil {
return nil, err
}
mounter := mount.New("")
// Get list of mounted paths present with the node
if mountList, err = mounter.List(); err != nil {
return nil, err
}
for _, mntInfo := range mountList {
if mntInfo.Device == dev {
currentMounts = append(currentMounts, mntInfo.Path)
}
}
return currentMounts, nil
}
// CreateAndMountZvol creates the zfs Volume
// and mounts the disk to the specified path
func CreateAndMountZvol(vol *apis.ZFSVolume, mount *apis.MountInfo) error {
if len(mount.MountPath) == 0 {
return status.Error(codes.InvalidArgument, "mount path missing in request")
}
if len(vol.Spec.OwnerNodeID) > 0 &&
vol.Spec.OwnerNodeID != NodeID {
return status.Error(codes.Internal, "volume is owned by different node")
}
devicePath, err := createZvol(vol)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
err = UpdateZvolInfo(vol)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
/*
* This check is the famous *Wall Of North*
* It will not let the volume to be mounted
* at more than two places. The volume should
* be unmounted before proceeding to the mount
* operation.
*/
currentMounts, err := GetMounts(devicePath)
if err != nil {
return err
} else if len(currentMounts) >= 1 {
logrus.Errorf(
"can not mount, more than one mounts for volume:%s dev %s mounts: %v",
vol.Name, devicePath, currentMounts,
)
return status.Error(codes.Internal, "device already mounted")
}
err = FormatAndMountZvol(devicePath, mount)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
return err
}

138
pkg/zfs/volume.go Normal file
View file

@ -0,0 +1,138 @@
// Copyright © 2019 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package zfs
import (
"github.com/Sirupsen/logrus"
"os"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/builder"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
const (
// OpenEBSNamespace is the environment variable to get openebs namespace
//
// This environment variable is set via kubernetes downward API
OpenEBSNamespaceKey string = "OPENEBS_NAMESPACE"
// ZFSFinalizer for the ZfsVolume CR
ZFSFinalizer string = "zfs.openebs.io/finalizer"
// ZFSNodeKey will be used to insert Label
// in ZfsVolume CR
ZFSNodeKey string = "kubernetes.io/nodename"
)
var (
// OpenEBSNamespace is openebs system namespace
OpenEBSNamespace string
// NodeID is the NodeID of the node on which the pod is present
NodeID string
)
func init() {
OpenEBSNamespace = os.Getenv(OpenEBSNamespaceKey)
if OpenEBSNamespace == "" {
logrus.Fatalf("OPENEBS_NAMESPACE environment variable not set")
}
NodeID = os.Getenv("OPENEBS_NODE_ID")
if NodeID == "" && os.Getenv("OPENEBS_NODE_DRIVER") != "" {
logrus.Fatalf("NodeID environment variable not set")
}
}
// ProvisionVolume creates a ZFSVolume(zv) CR,
// watcher for zvc is present in CSI agent
func ProvisionVolume(
size int64,
vol *apis.ZFSVolume,
) error {
_, err := builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(vol)
if err == nil {
logrus.Infof("provisioned volume %s", vol.Name)
}
return err
}
// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return builder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}
// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
err = builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
if err == nil {
logrus.Infof("deprovisioned volume %s", volumeID)
}
return
}
// GetVolList fetches the current Published Volume list
func GetVolList(volumeID string) (*apis.ZFSVolumeList, error) {
listOptions := v1.ListOptions{
LabelSelector: ZFSNodeKey + "=" + NodeID,
}
return builder.NewKubeclient().
WithNamespace(OpenEBSNamespace).List(listOptions)
}
// GetZFSVolume fetches the current Published csi Volume
func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
getOptions := metav1.GetOptions{}
vol, err := builder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(volumeID, getOptions)
return vol, err
}
// UpdateZvolInfo updates ZFSVolume CR with node id and finalizer
func UpdateZvolInfo(vol *apis.ZFSVolume) error {
finalizers := []string{ZFSFinalizer}
labels := map[string]string{ZFSNodeKey: NodeID}
if vol.Finalizers != nil {
return nil
}
newVol, err := builder.BuildFrom(vol).
WithNodename(NodeID).
WithFinalizer(finalizers).
WithLabels(labels).Build()
if err != nil {
return err
}
_, err = builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}
// RemoveZvolFinalizer adds finalizer to ZFSVolume CR
func RemoveZvolFinalizer(vol *apis.ZFSVolume) error {
vol.Finalizers = nil
_, err := builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(vol)
return err
}

133
pkg/zfs/zfs_util.go Normal file
View file

@ -0,0 +1,133 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package zfs
import (
"os"
"github.com/Sirupsen/logrus"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"k8s.io/kubernetes/pkg/util/mount"
)
const (
ZFS_DEVPATH = "/dev/zvol/"
)
func PropertyChanged(oldVol *apis.ZFSVolume, newVol *apis.ZFSVolume) bool {
return oldVol.Spec.Compression != newVol.Spec.Compression ||
oldVol.Spec.Dedup != newVol.Spec.Dedup ||
oldVol.Spec.Capacity != newVol.Spec.Capacity
}
// createZvol creates the zvol and returns the corresponding diskPath
// of the volume which gets created on the node
func createZvol(vol *apis.ZFSVolume) (string, error) {
var out []byte
zvol := vol.Spec.PoolName + "/" + vol.Name
devicePath := ZFS_DEVPATH + zvol
if _, err := os.Stat(devicePath); os.IsNotExist(err) {
if vol.Spec.ThinProvision == "yes" {
out, err = mount.NewOsExec().Run(
"zfs", "create",
"-s",
"-V", vol.Spec.Capacity,
"-b", vol.Spec.BlockSize,
"-o", "compression="+vol.Spec.Compression,
"-o", "dedup="+vol.Spec.Dedup,
zvol,
)
} else {
out, err = mount.NewOsExec().Run(
"zfs", "create",
"-V", vol.Spec.Capacity,
"-b", vol.Spec.BlockSize,
"-o", "compression="+vol.Spec.Compression,
"-o", "dedup="+vol.Spec.Dedup,
zvol,
)
}
if err != nil {
logrus.Errorf(
"zfs: could not create zvol %v vol %v error: %s", zvol, vol, string(out),
)
return "", err
}
logrus.Infof("created zvol %s", zvol)
} else if err == nil {
logrus.Infof("using existing zvol %v", zvol)
} else {
return "", err
}
return devicePath, nil
}
// SetZvolProp sets the zvol property
func SetZvolProp(vol *apis.ZFSVolume) error {
var out []byte
var err error
zvol := vol.Spec.PoolName + "/" + vol.Name
devicePath := ZFS_DEVPATH + zvol
if _, err = os.Stat(devicePath); err == nil {
// TODO(pawan) need to find a way to identify
// which property has changed
out, err = mount.NewOsExec().Run(
"zfs", "set",
"volsize="+vol.Spec.Capacity,
"compression="+vol.Spec.Compression,
"dedup="+vol.Spec.Dedup,
zvol,
)
if err != nil {
logrus.Errorf(
"zfs: could not set property on zvol %v vol %v error: %s", zvol, vol, string(out),
)
return err
}
logrus.Infof("property set on zvol %s", zvol)
}
return err
}
// DestroyZvol deletes the zvol
func DestroyZvol(vol *apis.ZFSVolume) error {
var out []byte
zvol := vol.Spec.PoolName + "/" + vol.Name
devicePath := ZFS_DEVPATH + zvol
if _, err := os.Stat(devicePath); err == nil {
out, err = mount.NewOsExec().Run(
"zfs", "destroy",
"-R",
zvol,
)
if err != nil {
logrus.Errorf(
"zfs: could not destroy zvol %v vol %v error: %s", zvol, vol, string(out),
)
return err
}
logrus.Infof("destroyed zvol %s", zvol)
}
return nil
}