mirror of
https://github.com/TECHNOFAB11/zfs-localpv.git
synced 2025-12-12 06:20:11 +01:00
chore(refactor): refactor scheduler for ZFS-LocalPV
Signed-off-by: Pawan <pawan@mayadata.io>
This commit is contained in:
parent
30a7f2317e
commit
e1e8aa5839
3 changed files with 91 additions and 54 deletions
|
|
@ -33,6 +33,7 @@ import (
|
||||||
errors "github.com/openebs/zfs-localpv/pkg/common/errors"
|
errors "github.com/openebs/zfs-localpv/pkg/common/errors"
|
||||||
"github.com/openebs/zfs-localpv/pkg/common/helpers"
|
"github.com/openebs/zfs-localpv/pkg/common/helpers"
|
||||||
csipayload "github.com/openebs/zfs-localpv/pkg/response"
|
csipayload "github.com/openebs/zfs-localpv/pkg/response"
|
||||||
|
schd "github.com/openebs/zfs-localpv/pkg/scheduler"
|
||||||
analytics "github.com/openebs/zfs-localpv/pkg/usage"
|
analytics "github.com/openebs/zfs-localpv/pkg/usage"
|
||||||
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
|
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
|
||||||
)
|
)
|
||||||
|
|
@ -126,7 +127,13 @@ func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
|
||||||
|
|
||||||
vtype := zfs.GetVolumeType(fstype)
|
vtype := zfs.GetVolumeType(fstype)
|
||||||
|
|
||||||
selected := scheduler(req.AccessibilityRequirements, schld, pool)
|
nmap, err := getNodeMap(schld, pool)
|
||||||
|
if err != nil {
|
||||||
|
return "", status.Errorf(codes.Internal, "get node map failed : %s", err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
// run the scheduler
|
||||||
|
selected := schd.Scheduler(req, nmap)
|
||||||
|
|
||||||
if len(selected) == 0 {
|
if len(selected) == 0 {
|
||||||
return "", status.Error(codes.Internal, "scheduler failed")
|
return "", status.Error(codes.Internal, "scheduler failed")
|
||||||
|
|
|
||||||
67
pkg/driver/schd_helper.go
Normal file
67
pkg/driver/schd_helper.go
Normal file
|
|
@ -0,0 +1,67 @@
|
||||||
|
/*
|
||||||
|
Copyright 2020 The OpenEBS Authors
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package driver
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
|
||||||
|
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// scheduling algorithm constants
|
||||||
|
const (
|
||||||
|
// pick the node where less volumes are provisioned for the given pool
|
||||||
|
// this will be the default scheduler when none provided
|
||||||
|
VolumeWeighted = "VolumeWeighted"
|
||||||
|
)
|
||||||
|
|
||||||
|
// getVolumeWeightedMap goes through all the pools on all the nodes
|
||||||
|
// and creats the node mapping of the volume for all the nodes.
|
||||||
|
// It returns a map which has nodes as key and volumes present
|
||||||
|
// on the nodes as corresponding value.
|
||||||
|
func getVolumeWeightedMap(pool string) (map[string]int64, error) {
|
||||||
|
nmap := map[string]int64{}
|
||||||
|
|
||||||
|
zvlist, err := volbuilder.NewKubeclient().
|
||||||
|
WithNamespace(zfs.OpenEBSNamespace).
|
||||||
|
List(metav1.ListOptions{})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nmap, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create the map of the volume count
|
||||||
|
// for the given pool
|
||||||
|
for _, zv := range zvlist.Items {
|
||||||
|
if zv.Spec.PoolName == pool {
|
||||||
|
nmap[zv.Spec.OwnerNodeID]++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nmap, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// getNodeMap returns the node mapping for the given scheduling algorithm
|
||||||
|
func getNodeMap(schd string, pool string) (map[string]int64, error) {
|
||||||
|
switch schd {
|
||||||
|
case VolumeWeighted:
|
||||||
|
return getVolumeWeightedMap(pool)
|
||||||
|
}
|
||||||
|
// return VolumeWeighted(default) if not specified
|
||||||
|
return getVolumeWeightedMap(pool)
|
||||||
|
}
|
||||||
|
|
@ -14,29 +14,19 @@ See the License for the specific language governing permissions and
|
||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package driver
|
package scheduler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"math"
|
"math"
|
||||||
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
|
|
||||||
k8sapi "github.com/openebs/zfs-localpv/pkg/client/k8s/v1alpha1"
|
k8sapi "github.com/openebs/zfs-localpv/pkg/client/k8s/v1alpha1"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
|
|
||||||
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// scheduling algorithm constants
|
// getNodeList gets the nodelist which satisfies the topology info
|
||||||
const (
|
func getNodeList(topo *csi.TopologyRequirement) ([]string, error) {
|
||||||
// pick the node where less volumes are provisioned for the given pool
|
|
||||||
// this will be the default scheduler when none provided
|
|
||||||
VolumeWeighted = "VolumeWeighted"
|
|
||||||
)
|
|
||||||
|
|
||||||
// GetNodeList gets the nodelist which satisfies the topology info
|
|
||||||
func GetNodeList(topo *csi.TopologyRequirement) ([]string, error) {
|
|
||||||
|
|
||||||
var nodelist []string
|
var nodelist []string
|
||||||
|
|
||||||
|
|
@ -64,54 +54,34 @@ func GetNodeList(topo *csi.TopologyRequirement) ([]string, error) {
|
||||||
return nodelist, nil
|
return nodelist, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// volumeWeightedScheduler goes through all the pools on the nodes mentioned
|
// runScheduler goes through the node mapping
|
||||||
// in the topology and picks the node which has less volume on
|
// in the topology and picks the node which is less weighted
|
||||||
// the given zfs pool.
|
func runScheduler(nodelist []string, nmap map[string]int64) string {
|
||||||
func volumeWeightedScheduler(nodelist []string, pool string) string {
|
|
||||||
var selected string
|
var selected string
|
||||||
|
|
||||||
zvlist, err := volbuilder.NewKubeclient().
|
var weight int64 = math.MaxInt64
|
||||||
WithNamespace(zfs.OpenEBSNamespace).
|
|
||||||
List(metav1.ListOptions{})
|
|
||||||
|
|
||||||
if err != nil {
|
// schedule it on the node which has less weight
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
volmap := map[string]int{}
|
|
||||||
|
|
||||||
// create the map of the volume count
|
|
||||||
// for the given pool
|
|
||||||
for _, zv := range zvlist.Items {
|
|
||||||
if zv.Spec.PoolName == pool {
|
|
||||||
volmap[zv.Spec.OwnerNodeID]++
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var numVol int = math.MaxInt32
|
|
||||||
|
|
||||||
// schedule it on the node which has less
|
|
||||||
// number of volume for the given pool
|
|
||||||
for _, node := range nodelist {
|
for _, node := range nodelist {
|
||||||
if volmap[node] < numVol {
|
if nmap[node] < weight {
|
||||||
selected = node
|
selected = node
|
||||||
numVol = volmap[node]
|
weight = nmap[node]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return selected
|
return selected
|
||||||
}
|
}
|
||||||
|
|
||||||
// scheduler schedules the PV as per topology constraints for
|
// Scheduler schedules the PV as per topology constraints for
|
||||||
// the given zfs pool.
|
// the given node weight.
|
||||||
func scheduler(topo *csi.TopologyRequirement, schld string, pool string) string {
|
func Scheduler(req *csi.CreateVolumeRequest, nmap map[string]int64) string {
|
||||||
|
topo := req.AccessibilityRequirements
|
||||||
if topo == nil ||
|
if topo == nil ||
|
||||||
len(topo.Preferred) == 0 {
|
len(topo.Preferred) == 0 {
|
||||||
klog.Errorf("scheduler: topology information not provided")
|
klog.Errorf("scheduler: topology information not provided")
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
nodelist, err := GetNodeList(topo)
|
nodelist, err := getNodeList(topo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("scheduler: can not get the nodelist err : %v", err.Error())
|
klog.Errorf("scheduler: can not get the nodelist err : %v", err.Error())
|
||||||
return ""
|
return ""
|
||||||
|
|
@ -125,12 +95,5 @@ func scheduler(topo *csi.TopologyRequirement, schld string, pool string) string
|
||||||
return nodelist[0]
|
return nodelist[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
switch schld {
|
return runScheduler(nodelist, nmap)
|
||||||
case VolumeWeighted:
|
|
||||||
return volumeWeightedScheduler(nodelist, pool)
|
|
||||||
default:
|
|
||||||
return volumeWeightedScheduler(nodelist, pool)
|
|
||||||
}
|
|
||||||
|
|
||||||
return ""
|
|
||||||
}
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue