温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

nova基于ubs机制扩展scheduler-filter

发布时间:2020-07-15 12:31:17 来源:网络 阅读:862 作者:沈猪猪 栏目:开发技术

UBS的全称是utilization-based scheduler(基于资源利用率的调度),可以看作对scheduler filter机制的一种扩展实现。计算节点通过resource_tracker收集被监控资源的数据,并存入数据库compute_nodes表的metrics字段。


这里演示根据ceph osd系统盘的数量来优先调度的场景(纯属demo)

环境: rdo L版


1、在nova的setup.cfg中增加一条entry point记录(通过rpm包安装的环境中,对应的文件是entry_points.txt)

vim /usr/lib/python2.7/site-packages/nova-4.0rc6-py2.7.egg-info/entry_points.txt
[nova.compute.monitors.osd]
virt_driver = nova.compute.monitors.osd.virt_driver:Monitor  # compute.monitor下已经有cpu的实现了


2、加载osd monitor

vim /usr/lib/python2.7/site-packages/nova/compute/monitors/__init__.py
class MonitorHandler(object):
    # Namespaces of the monitor plugins. 'nova.compute.monitors.osd' is the
    # same name as the entry_points.txt section added in step 1.
    NAMESPACES = [
        'nova.compute.monitors.cpu',
        'nova.compute.monitors.osd',     # added: namespace of the OSD monitor
    ]


3、增加OSD monitor的基类

vim /usr/lib/python2.7/site-packages/nova/compute/monitors/base.py   # 增加MonitorBase_OSD类、OSDMonitorBase类
@six.add_metaclass(abc.ABCMeta)
class MonitorBase_OSD(object):
    """Abstract parent of the OSD resource monitor plugins.

    Subclasses provide the metric names and readings; this class wraps
    them into ``objects.OSDMonitorMetric`` instances.
    """

    def __init__(self, compute_manager):
        # ``source`` starts unset; whatever it holds later is passed as
        # ``source=`` to every metric built by add_metrics_to_list().
        self.source = None
        self.compute_manager = compute_manager

    @abc.abstractmethod
    def get_metric(self, name):
        """Look up the current reading of a single metric.

        :param name: The name/key of the metric whose value is wanted.
        :returns: a ``(value, timestamp)`` tuple.
        """
        raise NotImplementedError('get_metric')

    @abc.abstractmethod
    def get_metric_names(self):
        """List the metric names this monitor can report.

        :returns: an iterable of metric-name keys; add_metrics_to_list()
            calls get_metric() once for each of them.
        """
        raise NotImplementedError('get_metric_names')

    def add_metrics_to_list(self, metrics_list):
        """Append one metric object per reported name to ``metrics_list``.

        :param metrics_list: list object whose ``objects`` attribute is
                             extended with the newly built
                             objects.OSDMonitorMetric instances.
        """
        def _as_metric(metric_name):
            reading, taken_at = self.get_metric(metric_name)
            return objects.OSDMonitorMetric(name=metric_name,
                                            value=reading,
                                            timestamp=taken_at,
                                            source=self.source)

        # Build every metric first so that a failing get_metric() leaves
        # ``metrics_list`` untouched.
        collected = [_as_metric(n) for n in self.get_metric_names()]
        metrics_list.objects.extend(collected)

class OSDMonitorBase(MonitorBase_OSD):
    """Base class for all monitors that return OSD-related metrics."""

    def get_metric_names(self):
        """Return the set of metric names reported by OSD monitors."""
        return {fields.OSDMonitorMetricType.OSD_NUM}


4、创建monitor osd目录

[root@node_172_16_214_111 ~(keystone_admin)]# ll /usr/lib/python2.7/site-packages/nova/compute/monitors/osd/
total 12
-rw-r--r--. 1 root root    0 Sep 17 07:35 __init__.py
-rw-r--r--. 1 root root  157 Sep 17 09:49 __init__.pyc
-rw-r--r--. 1 root root 2648 Sep 17 15:33 virt_driver.py
-rw-r--r--. 1 root root 2711 Sep 17 09:49 virt_driver.pyc

[root@node_172_16_214_111 ~(keystone_admin)]# cat /usr/lib/python2.7/site-packages/nova/compute/monitors/osd/virt_driver.py
"""
OSD monitor that uses pyudev to count the disk devices of the host
"""
import pyudev
import re
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from nova.compute.monitors import base
from nova import exception
from nova.i18n import _LE
CONF = cfg.CONF
CONF.import_opt('compute_driver', 'nova.virt.driver')
LOG = logging.getLogger(__name__)

class Monitor(base.OSDMonitorBase):
    """OSD monitor that counts the host's disk devices through pyudev.

    The count is published under the "osd.num" metric name together with
    the time the sample was taken.
    """

    def __init__(self, resource_tracker):
        """Bind the monitor to the resource tracker that owns it.

        :param resource_tracker: object exposing a ``driver`` attribute.
        """
        super(Monitor, self).__init__(resource_tracker)
        self.source = CONF.compute_driver
        self.driver = resource_tracker.driver
        # Last successful sample: {"timestamp": ..., "osd.num": ...}.
        # Empty until the first sample succeeds and after a failed one.
        self._data = {}
        # Not used by this monitor; kept so the set of instance attributes
        # stays the same as before.
        self._cpu_stats = {}

    def get_metric(self, name):
        """Return a (value, timestamp) tuple for the supplied metric name.

        :param name: The name/key for the metric to grab the value for.
        :raises: exception.ResourceMonitorError if sampling fails.
        """
        self._update_data()
        return self._data[name], self._data["timestamp"]

    def _update_data(self):
        """Refresh the cached sample; this is where the data is collected.

        A sample at most one second old is reused. The cache is replaced
        only after a successful sample and is cleared on failure, so a
        failed attempt is retried by the next call instead of leaving a
        timestamp-only cache behind (which made get_metric() raise
        KeyError).

        :raises: exception.ResourceMonitorError if the devices cannot be
                 enumerated.
        """
        # Don't allow to call this function so frequently (<= 1 sec)
        now = timeutils.utcnow()
        last_sample = self._data.get("timestamp")
        if last_sample is not None:
            # total_seconds() takes the days component into account, which
            # the ``seconds`` attribute does not. A negative value (clock
            # moved backwards) falls through and forces a refresh.
            elapsed = (now - last_sample).total_seconds()
            if 0 <= elapsed <= 1:
                return

        # Count the host's disk devices.
        try:
            context = pyudev.Context()
            pattern = re.compile('^/dev/vd[a-z]')
            # Major number 8 is the Linux SCSI disk (sd*) major; the
            # pattern additionally accepts virtio disks (/dev/vd*).
            osd_devices = [
                device.device_node
                for device in context.list_devices(DEVTYPE='disk')
                if device['MAJOR'] == '8'
                or pattern.search(device.device_node)
            ]
        except (NotImplementedError, TypeError, KeyError):
            self._data = {}
            LOG.exception(_LE("Not all properties needed are implemented "
                              "in the compute driver"))
            raise exception.ResourceMonitorError(
                monitor=self.__class__.__name__)

        self._data = {"timestamp": now, "osd.num": len(osd_devices)}


5、在nova.objects中注册新的object模块

vim /usr/lib/python2.7/site-packages/nova/objects/__init__.py
__import__('nova.objects.monitor_metric')
__import__('nova.objects.monitor_osd_metric')   # 增加新object,跟数据库交互


6、创建新的object类

[root@node_172_16_214_111 ~(keystone_admin)]# cat /usr/lib/python2.7/site-packages/nova/objects/monitor_osd_metric.py

from oslo_serialization import jsonutils
from oslo_utils import timeutils
from nova.objects import base
from nova.objects import fields
from nova import utils

# NOTE(jwcroppe): Names of the metrics whose values must be adjusted
# (read: divided by 100.0) before the information is sent to the RPC
# notifier, since those values were expected to be within the range [0, 1].
FIELDS_REQUIRING_CONVERSION = []
@base.NovaObjectRegistry.register
class OSDMonitorMetric(base.NovaObject):
    """Versioned object holding a single OSD metric sample."""

    # Version 1.0: Initial version
    # Version 1.1: Added NUMA support
    VERSION = '1.1'
    fields = {
        'name': fields.OSDMonitorMetricTypeField(nullable=False),
        'value': fields.IntegerField(nullable=False),
        'numa_membw_values': fields.DictOfIntegersField(nullable=True),
        'timestamp': fields.DateTimeField(nullable=False),
        # This will be the stevedore extension full class name
        # for the plugin from which the metric originates.
        'source': fields.StringField(nullable=False),
    }

    def obj_make_compatible(self, primitive, target_version):
        """Strip fields that ``target_version`` does not know about.

        :param primitive: dict form of the object, modified in place.
        :param target_version: version string to downgrade to.
        """
        super(OSDMonitorMetric, self).obj_make_compatible(primitive,
                                                          target_version)
        target_version = utils.convert_version_to_tuple(target_version)
        # 'numa_membw_values' was added in 1.1. The key tested must be the
        # key removed; testing 'numa_nodes_values' (not a field of this
        # object) meant the field was never stripped.
        if target_version < (1, 1):
            primitive.pop('numa_membw_values', None)

    # NOTE(jaypipes): This method exists to convert the object to the
    # format expected by the RPC notifier for metrics events.
    def to_dict(self):
        """Return the metric as a plain dict.

        The dict always has 'name', 'timestamp' and 'source'. It carries
        'value' when that field is set, otherwise 'numa_membw_values'
        when that one is set.
        """
        dict_to_return = {
            'name': self.name,
            # NOTE(jaypipes): This is what jsonutils.dumps() does to
            # datetime.datetime objects, which is what timestamp is in
            # this object as well as the original simple dict metrics
            'timestamp': timeutils.strtime(self.timestamp),
            'source': self.source,
        }
        if self.obj_attr_is_set('value'):
            if self.name in FIELDS_REQUIRING_CONVERSION:
                dict_to_return['value'] = self.value / 100.0
            else:
                dict_to_return['value'] = self.value
        elif self.obj_attr_is_set('numa_membw_values'):
            dict_to_return['numa_membw_values'] = self.numa_membw_values
        return dict_to_return
        
@base.NovaObjectRegistry.register
class OSDMonitorMetricList(base.ObjectListBase, base.NovaObject):
    """List object whose members are OSDMonitorMetric objects."""

    # Version 1.0: Initial version
    # Version 1.1: MonitorMetric version 1.1
    VERSION = '1.1'
    fields = {'objects': fields.ListOfObjectsField('OSDMonitorMetric')}
    obj_relationships = {'objects': [('1.0', '1.0'), ('1.1', '1.1')]}

    @classmethod
    def from_json(cls, metrics):
        """Build an OSDMonitorMetricList from a legacy json string.

        :param metrics: a string of json serialized objects; a falsy value
                        yields an empty list object.
        :returns: an OSDMonitorMetricList with one OSDMonitorMetric per
                  decoded entry.
        """
        if metrics:
            decoded = jsonutils.loads(metrics)
        else:
            decoded = []
        members = []
        for entry in decoded:
            members.append(OSDMonitorMetric(**entry))
        return OSDMonitorMetricList(objects=members)

    # NOTE(jaypipes): This method exists to convert the object to the
    # format expected by the RPC notifier for metrics events.
    def to_list(self):
        """Return the to_dict() form of every member, in order."""
        as_dicts = []
        for member in self.objects:
            as_dicts.append(member.to_dict())
        return as_dicts


7、定义OSDMonitorMetricTypeField

vim /usr/lib/python2.7/site-packages/nova/objects/fields.py
# NOTE: ``Enum`` and ``BaseEnumField`` are the classes already available in
# nova/objects/fields.py, not the standard library ``enum`` module.
class OSDMonitorMetricType(Enum):
    """Enum type listing the metric names of the OSD monitor."""

    OSD_NUM = "osd.num"
    ALL = (
        OSD_NUM,
    )

    def __init__(self):
        super(OSDMonitorMetricType, self).__init__(
            valid_values=OSDMonitorMetricType.ALL)


# Must come after OSDMonitorMetricType: the class body below instantiates
# it at class-definition time, so the reverse order raises NameError.
class OSDMonitorMetricTypeField(BaseEnumField):
    """Object field whose value is one of OSDMonitorMetricType.ALL."""

    AUTO_TYPE = OSDMonitorMetricType()


8、修改metrics获取方式

vim /usr/lib/python2.7/site-packages/nova/scheduler/host_manager.py
def update_from_compute_node(self, compute):   
        # Excerpt: only the changed assignment is shown here.
#        self.metrics = objects.MonitorMetricList.from_json(compute.metrics)      # CPU monitor list, commented out for now
        # Parse the json in compute.metrics into OSD metric objects.
        self.metrics = objects.OSDMonitorMetricList.from_json(compute.metrics)

vim /usr/lib/python2.7/site-packages/nova/compute/resource_tracker.py            # 先注释掉cpu monitor
def _get_host_metrics(self, context, nodename):
        """Get the metrics from monitors and
        notify information to message bus.
        """
        # Excerpt: only the changed line is shown; the remainder of the
        # method is not visible here.
        # The CPU monitor list is commented out for now.
#        metrics = objects.MonitorMetricList()
        metrics = objects.OSDMonitorMetricList()


9、修改nova配置文件

vim /etc/nova/nova.conf
[METRICS]
# determine how metrics are weighed: score = 5.0 * osd.num
# the list format is weight_setting = name1=1.0, name2=-1.0
weight_setting = osd.num=5.0
[DEFAULT]
compute_monitors=osd.virt_driver
scheduler_weight_classes=nova.scheduler.weights.metrics.MetricsWeigher
scheduler_host_subset_size = 1


scheduler知识点小记:

/usr/lib/python2.7/site-packages/nova/scheduler/driver.py    # 默认host_manager driver是nova.scheduler.host_manager.HostManager
# Option declared in scheduler/driver.py: the host manager class, which
# defaults to nova.scheduler.host_manager.HostManager.
scheduler_driver_opts = [
    cfg.StrOpt('scheduler_host_manager',
               default='nova.scheduler.host_manager.HostManager',
               help='The scheduler host manager class to use'),
    ]
    
    
/usr/lib/python2.7/site-packages/nova/scheduler/manager.py    # 默认scheduler driver是nova.scheduler.filter_scheduler.FilterScheduler
# Options declared in scheduler/manager.py: the scheduler driver class
# (FilterScheduler by default) and the period of its periodic tasks.
scheduler_driver_opts = [
    cfg.StrOpt('scheduler_driver',
               default='nova.scheduler.filter_scheduler.FilterScheduler',
               help='Default driver to use for the scheduler'),
    cfg.IntOpt('scheduler_driver_task_period',
               default=60,
               help='How often (in seconds) to run periodic tasks in '
                    'the scheduler driver of your choice. '
                    'Please note this is likely to interact with the value '
                    'of service_down_time, but exactly how they interact '
                    'will depend on your choice of scheduler driver.'),
]

/usr/lib/python2.7/site-packages/nova/scheduler/filter_scheduler.py    # 过滤出主机节点
    def _schedule(self, context, request_spec, filter_properties):
        """Returns a list of hosts that meet the required specs,
        ordered by their fitness.
        """
        elevated = context.elevated()
        instance_properties = request_spec['instance_properties']
        # NOTE(danms): Instance here is still a dict, which is converted from
        # an object. The pci_requests are a dict as well. Convert this when
        # we get an object all the way to this path.
        # TODO(sbauza): Will be fixed later by the RequestSpec object
        pci_requests = instance_properties.get('pci_requests')
        if pci_requests:
            pci_requests = (
                objects.InstancePCIRequests.from_request_spec_instance_props(
                    pci_requests))
            instance_properties['pci_requests'] = pci_requests
        instance_type = request_spec.get("instance_type", None)
        update_group_hosts = filter_properties.get('group_updated', False)
        config_options = self._get_configuration_options()
        filter_properties.update({'context': context,
                                  'request_spec': request_spec,
                                  'config_options': config_options,
                                  'instance_type': instance_type})
        # Find our local list of acceptable hosts by repeatedly
        # filtering and weighing our options. Each time we choose a
        # host, we virtually consume resources on it so subsequent
        # selections can adjust accordingly.
        # Note: remember, we are using an iterator here. So only
        # traverse this list once. This can bite you if the hosts
        # are being scanned in a filter or weighing function.
        hosts = self._get_all_host_states(elevated)
        selected_hosts = []
        num_instances = request_spec.get('num_instances', 1)
        for num in range(num_instances):
            # Filter local hosts based on requirements ...
            hosts = self.host_manager.get_filtered_hosts(hosts,           # 先经过filter driver过滤
                    filter_properties, index=num)
            if not hosts:
                # Can't get any more locally.
                break
            LOG.debug("Filtered %(hosts)s", {'hosts': hosts})
            weighed_hosts = self.host_manager.get_weighed_hosts(hosts,    # 然后再计算hosts权重列表(从大到小排列)
                    filter_properties)
            LOG.debug("Weighed %(hosts)s", {'hosts': weighed_hosts})
            scheduler_host_subset_size = CONF.scheduler_host_subset_size
            if scheduler_host_subset_size > len(weighed_hosts):
                scheduler_host_subset_size = len(weighed_hosts)
            if scheduler_host_subset_size < 1:
                scheduler_host_subset_size = 1
            chosen_host = random.choice(
                weighed_hosts[0:scheduler_host_subset_size])
            LOG.debug("Selected host: %(host)s", {'host': chosen_host})
            selected_hosts.append(chosen_host)
            # Now consume the resources so the filter/weights
            # will change for the next instance.
            chosen_host.obj.consume_from_instance(instance_properties)
            if update_group_hosts is True:
                # NOTE(sbauza): Group details are serialized into a list now
                # that they are populated by the conductor, we need to
                # deserialize them
                if isinstance(filter_properties['group_hosts'], list):
                    filter_properties['group_hosts'] = set(
                        filter_properties['group_hosts'])
                filter_properties['group_hosts'].add(chosen_host.obj.host)
        return selected_hosts



参考链接

https://01.org/sites/default/files/utilization_based_scheduing_in_openstack_compute_nova-revision002.pdf









向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI