Source code for gnes.composer.base

#  Tencent is pleased to support the open source community by making GNES available.
#
#  Copyright (C) 2019 THL A29 Limited, a Tencent company. All rights reserved.
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

import copy
import random
import time
from collections import defaultdict
from typing import Dict, List

from pkg_resources import resource_stream
from ruamel.yaml import YAML, StringIO
from ruamel.yaml.comments import CommentedMap

from .. import __version__
from ..cli.parser import set_frontend_parser, \
    set_router_parser, _set_loadable_service_parser, set_preprocessor_parser, \
    set_indexer_parser
from ..helper import set_logger
from ..service.base import SocketType

_yaml = YAML()


[docs]class YamlComposer: comp2file = { 'Encoder': 'encode', 'Router': 'route', 'Indexer': 'index', 'Frontend': 'frontend', 'Preprocessor': 'preprocess' } comp2args = { 'Encoder': _set_loadable_service_parser().parse_args(['--yaml_path', 'BaseEncoder']), 'Router': set_router_parser().parse_args(['--yaml_path', 'BaseRouter']), 'Indexer': set_indexer_parser().parse_args(['--yaml_path', 'BaseIndexer']), 'Frontend': set_frontend_parser().parse_args([]), 'Preprocessor': set_preprocessor_parser().parse_args(['--yaml_path', 'BasePreprocessor']) }
[docs] class Layer: default_values = { 'name': None, 'yaml_path': None, 'py_path': None, 'image': None, 'replicas': 1, 'income': 'pull', } def __init__(self, layer_id: int = 0): self.layer_id = layer_id self.components = []
[docs] @staticmethod def get_value(comp: Dict, key: str): return comp.get(key, YamlComposer.Layer.default_values[key])
@property def is_homogenous(self): return len(self.components) == 1 @property def is_single_component(self): return self.is_homogenous and self.get_value(self.components[0], 'replicas') == 1 @property def is_homo_multi_component(self): return self.is_homogenous and not self.is_single_component @property def is_heto_single_component(self): return not self.is_homogenous and all(self.get_value(c, 'replicas') == 1 for c in self.components) @property def get_component_name(self, unique: bool = False): r = [c['name'] for c in self.components] if unique: r = list(set(r)) return r
[docs] def append(self, comp): self.components.append(comp)
def __repr__(self): return str(self.components)
def __init__(self, args): self.args = args self.logger = set_logger(self.__class__.__name__, self.args.verbose) with args.yaml_path: tmp = _yaml.load(args.yaml_path) stream = StringIO() _yaml.dump(tmp, stream) self.original_yaml = stream.getvalue().strip() self._name = tmp.get('name', args.name) self._port = tmp.get('port', args.port) self._networks = tmp.get('networks', {}) self._volumes = tmp.get('volumes', {}) self._layers = [] # type: List['YamlComposer.Layer'] self._num_layer = 0 if 'services' in tmp: self.add_layer() c = CommentedMap({'name': 'Frontend', 'grpc_port': self._port}) if self.check_fields(c): self.add_comp(c) for comp in tmp['services']: self.add_layer() if isinstance(comp, list): for c in comp: if self.check_fields(c): self.add_comp(c) elif self.check_fields(comp): self.add_comp(comp) else: raise ValueError(comp) else: self.logger.error('yaml file defines an empty graph! no "services" field exists!')
[docs] def check_fields(self, comp: Dict) -> bool: if 'name' not in comp: raise AttributeError('a component must have a name (choices: %s)' % ', '.join(self.comp2file.keys())) if comp['name'] not in self.comp2file: raise AttributeError( 'a component must be one of: %s, but given %s' % (', '.join(self.comp2file.keys()), comp['name'])) for k in comp: if k not in self.Layer.default_values: self.logger.warning('your yaml contains an unrecognized key named "%s"' % k) for k, v in self.Layer.default_values.items(): if k not in comp: comp[k] = v return True
[docs] def add_layer(self, layer: 'Layer' = None) -> None: self._layers.append(copy.deepcopy(layer) or self.Layer(layer_id=self._num_layer)) self._num_layer += 1
[docs] def add_comp(self, comp: Dict) -> None: self._layers[-1].append(comp)
[docs] def build_layers(self) -> List['YamlComposer.Layer']: all_layers = [] # type: List['YamlComposer.Layer'] for idx, layer in enumerate(self._layers[1:] + [self._layers[0]], 1): last_layer = self._layers[idx - 1] for l in self._add_router(last_layer, layer): all_layers.append(copy.deepcopy(l)) all_layers[0] = copy.deepcopy(self._layers[0]) # Frontend should always on the bind role assert all_layers[0].is_single_component assert all_layers[0].components[0]['name'] == 'Frontend' if all_layers[0].components[0]['socket_in'] == str(SocketType.SUB_CONNECT): # change to sub bind all_layers[0].components[0]['socket_in'] = str(SocketType.SUB_BIND) for c in all_layers[-1].components: c['socket_out'] = str(SocketType.PUB_CONNECT) if all_layers[0].components[0]['socket_in'] == str(SocketType.PULL_CONNECT): # change to sub bind all_layers[0].components[0]['socket_in'] = str(SocketType.PULL_BIND) for c in all_layers[-1].components: c['socket_out'] = str(SocketType.PUSH_CONNECT) return all_layers
[docs] @staticmethod def build_dockerswarm(all_layers: List['YamlComposer.Layer'], docker_img: str = 'gnes/gnes:latest-alpine', volumes: Dict = None, networks: Dict = None) -> str: with resource_stream('gnes', '/'.join(('resources', 'compose', 'gnes-swarm.yml'))) as r: swarm_lines = _yaml.load(r) config_dict = {} for l_idx, layer in enumerate(all_layers): for c_idx, c in enumerate(layer.components): c_name = '%s%d%d' % (c['name'], l_idx, c_idx) args = ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if a in YamlComposer.comp2args[c['name']] and a != 'yaml_path' and v] if 'yaml_path' in c and c['yaml_path'] is not None: if c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml'): args.append('--yaml_path /%s_yaml' % c_name) config_dict['%s_yaml' % c_name] = {'file': c['yaml_path']} else: args.append('--yaml_path %s' % c['yaml_path']) if l_idx + 1 < len(all_layers): next_layer = all_layers[l_idx + 1] _l_idx = l_idx + 1 else: next_layer = all_layers[0] _l_idx = 0 host_out_name = '' for _c_idx, _c in enumerate(next_layer.components): if _c['port_in'] == c['port_out']: host_out_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx) break if l_idx - 1 >= 0: last_layer = all_layers[l_idx - 1] _l_idx = l_idx - 1 else: last_layer = all_layers[-1] _l_idx = len(all_layers) - 1 host_in_name = '' for _c_idx, _c in enumerate(last_layer.components): if _c['port_out'] == c['port_in']: host_in_name = '%s%d%d' % (_c['name'], _l_idx, _c_idx) break if 'BIND' not in c['socket_out']: args.append('--host_out %s' % host_out_name) if 'BIND' not in c['socket_in']: args.append('--host_in %s' % host_in_name) cmd = '%s %s' % (YamlComposer.comp2file[c['name']], ' '.join(args)) swarm_lines['services'][c_name] = CommentedMap({ 'image': c['image'] or docker_img, 'command': cmd, }) rep_c = YamlComposer.Layer.get_value(c, 'replicas') if rep_c > 1: swarm_lines['services'][c_name]['deploy'] = CommentedMap({ 'replicas': YamlComposer.Layer.get_value(c, 'replicas'), 'restart_policy': { 'condition': 'on-failure', 'max_attempts': 3, } }) if 'yaml_path' in c and c['yaml_path'] is not None \ and (c['yaml_path'].endswith('.yml') or c['yaml_path'].endswith('.yaml')): swarm_lines['services'][c_name]['configs'] = ['%s_yaml' % c_name] if c['name'] == 'Frontend': swarm_lines['services'][c_name]['ports'] = ['%d:%d' % (c['grpc_port'], c['grpc_port'])] if volumes: swarm_lines['volumes'] = volumes if networks: swarm_lines['networks'] = volumes swarm_lines['configs'] = config_dict stream = StringIO() _yaml.dump(swarm_lines, stream) return stream.getvalue().strip()
[docs] @staticmethod def build_kubernetes(all_layers: List['YamlComposer.Layer'], *args, **kwargs): pass
[docs] @staticmethod def build_shell(all_layers: List['YamlComposer.Layer'], log_redirect: str = None) -> str: shell_lines = [] for layer in all_layers: for c in layer.components: rep_c = YamlComposer.Layer.get_value(c, 'replicas') shell_lines.append('printf "starting service \\e[1;33m%s\\e[0m with \e[1;33m%s\e[0m replicas...\\n"' % ( c['name'], rep_c)) for _ in range(rep_c): cmd = YamlComposer.comp2file[c['name']] args = ' '.join( ['--%s %s' % (a, str(v) if ' ' not in str(v) else ('"%s"' % str(v))) for a, v in c.items() if a in YamlComposer.comp2args[c['name']] and v]) shell_lines.append('gnes %s %s %s &' % ( cmd, args, '>> %s 2>&1' % log_redirect if log_redirect else '')) with resource_stream('gnes', '/'.join(('resources', 'compose', 'gnes-shell.sh'))) as r: return r.read().decode().replace('{{gnes-template}}', '\n'.join(shell_lines)).strip()
[docs] @staticmethod def build_mermaid(all_layers: List['YamlComposer.Layer'], mermaid_leftright: bool = False) -> str: mermaid_graph = [] cls_dict = defaultdict(set) for l_idx, layer in enumerate(all_layers[1:] + [all_layers[0]], 1): last_layer = all_layers[l_idx - 1] for c_idx, c in enumerate(last_layer.components): # if len(last_layer.components) > 1: # self.mermaid_graph.append('\tsubgraph %s%d' % (c['name'], c_idx)) for j in range(YamlComposer.Layer.get_value(c, 'replicas')): for c1_idx, c1 in enumerate(layer.components): if c1['port_in'] == c['port_out']: p = '((%s%s))' if c['name'] == 'Router' else '(%s%s)' p1 = '((%s%s))' if c1['name'] == 'Router' else '(%s%s)' for j1 in range(YamlComposer.Layer.get_value(c1, 'replicas')): _id, _id1 = '%s%s%s' % (last_layer.layer_id, c_idx, j), '%s%s%s' % ( layer.layer_id, c1_idx, j1) conn_type = ( c['socket_out'].split('_')[0] + '/' + c1['socket_in'].split('_')[0]).lower() s_id = '%s%s' % (c_idx if len(last_layer.components) > 1 else '', j if YamlComposer.Layer.get_value(c, 'replicas') > 1 else '') s1_id = '%s%s' % (c1_idx if len(layer.components) > 1 else '', j1 if YamlComposer.Layer.get_value(c1, 'replicas') > 1 else '') mermaid_graph.append( '\t%s%s%s-- %s -->%s%s%s' % ( c['name'], _id, p % (c['name'], s_id), conn_type, c1['name'], _id1, p1 % (c1['name'], s1_id))) cls_dict[c['name'] + 'CLS'].add('%s%s' % (c['name'], _id)) cls_dict[c1['name'] + 'CLS'].add('%s%s' % (c1['name'], _id1)) # if len(last_layer.components) > 1: # self.mermaid_graph.append('\tend') style = ['classDef FrontendCLS fill:#FFE0E0,stroke:#FFE0E0,stroke-width:1px;', 'classDef EncoderCLS fill:#FFDAAF,stroke:#FFDAAF,stroke-width:1px;', 'classDef IndexerCLS fill:#FFFBC1,stroke:#FFFBC1,stroke-width:1px;', 'classDef RouterCLS fill:#C9E8D2,stroke:#C9E8D2,stroke-width:1px;', 'classDef PreprocessorCLS fill:#CEEEEF,stroke:#CEEEEF,stroke-width:1px;'] class_def = ['class %s %s;' % (','.join(v), k) for k, v in cls_dict.items()] mermaid_str = '\n'.join( ['graph %s' % ('LR' if mermaid_leftright else 'TD')] + mermaid_graph + style + class_def) return mermaid_str.strip()
[docs] @staticmethod def build_html(generate_dict: Dict[str, str]) -> str: with resource_stream('gnes', '/'.join(('resources', 'compose', 'gnes-board.html'))) as r: html = r.read().decode() for k, v in generate_dict.items(): if v: html = html.replace('{{gnes-%s}}' % k, v) return html.strip()
[docs] def build_all(self): def std_or_print(f, content): if content and f: with f as fp: fp.write(content) self.logger.info('generated content is written to %s' % f) all_layers = self.build_layers() cmds = { 'mermaid': self.build_mermaid(all_layers, self.args.mermaid_leftright), 'shell': self.build_shell(all_layers, self.args.shell_log_redirect), 'yaml': self.original_yaml, 'image': self.args.docker_img, 'docker': self.build_dockerswarm(all_layers, self.args.docker_img, volumes=self._volumes, networks=self._networks), 'k8s': self.build_kubernetes(all_layers), 'timestamp': time.strftime("%a, %d %b %Y %H:%M:%S"), 'version': __version__ } cmds['html'] = self.build_html(cmds) std_or_print(self.args.graph_path, cmds['mermaid']) std_or_print(self.args.shell_path, cmds['shell']) std_or_print(self.args.swarm_path, cmds['docker']) std_or_print(self.args.k8s_path, cmds['k8s']) std_or_print(self.args.html_path, cmds['html']) return cmds
@staticmethod def _get_random_port(min_port: int = 49152, max_port: int = 65536) -> str: return str(random.randrange(min_port, max_port)) @staticmethod def _get_random_host(comp_name: str) -> str: return str(comp_name + str(random.randrange(0, 100))) def _add_router(self, last_layer: 'YamlComposer.Layer', layer: 'YamlComposer.Layer') -> List['YamlComposer.Layer']: def rule1(): # a shortcut fn: push connect the last and current last_layer.components[0]['socket_out'] = str(SocketType.PUSH_BIND) layer.components[0]['socket_in'] = str(SocketType.PULL_CONNECT) def rule2(): # a shortcut fn: pub connect the last and the current last_layer.components[0]['socket_out'] = str(SocketType.PUB_BIND) layer.components[0]['socket_in'] = str(SocketType.SUB_CONNECT) def rule3(): # a shortcut fn: (N)-2-(N) with push pull connection router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) r = CommentedMap({'name': 'Router', 'yaml_path': 'BaseRouter', 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': last_layer.components[0]['port_out'], 'port_out': self._get_random_port()}) self.check_fields(r) for c in layer.components: c['socket_in'] = str(SocketType.PULL_CONNECT) c['port_in'] = r['port_out'] router_layer.append(r) router_layers.append(router_layer) def rule4(): # a shortcut fn: (N)-to-(1)&(1)&(1) last_layer.components[0]['socket_out'] = str(SocketType.PUB_BIND) for c in layer.components: c['socket_in'] = str(SocketType.SUB_CONNECT) def rule5(): # a shortcut fn: based on c3(): (N)-2-(N) with pub sub connection rule3() router_layers[0].components[0]['socket_out'] = str(SocketType.PUB_BIND) router_layers[0].components[0]['yaml_path'] = '"!PublishRouter {parameters: {num_part: %d}}"' \ % len(layer.components) for c in layer.components: c['socket_in'] = str(SocketType.SUB_CONNECT) def rule6(): last_layer.components[0]['socket_out'] = str(SocketType.PUB_BIND) router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 for c in layer.components: income = self.Layer.get_value(c, 'income') r = CommentedMap({'name': 'Router', 'yaml_path': 'BaseReduceRouter', 'socket_in': str(SocketType.SUB_CONNECT), 'socket_out': str(SocketType.PUSH_BIND) if income == 'pull' else str( SocketType.PUB_BIND), 'port_in': last_layer.components[0]['port_out'], 'port_out': self._get_random_port()}) self.check_fields(r) c['socket_in'] = str(SocketType.PULL_CONNECT) if income == 'pull' else str(SocketType.SUB_CONNECT) c['port_in'] = r['port_out'] router_layer.append(r) router_layers.append(router_layer) def rule7(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', 'yaml_path': '"!PublishRouter {parameters: {num_part: %d}}"' % len(layer.components), 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUB_BIND), 'port_in': self._get_random_port(), 'port_out': self._get_random_port()}) self.check_fields(r0) router_layer.append(r0) router_layers.append(router_layer) last_layer.components[0]['port_out'] = r0['port_in'] router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 for c in layer.components: r = CommentedMap({'name': 'Router', 'yaml_path': 'BaseRouter', 'socket_in': str(SocketType.SUB_CONNECT), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': r0['port_out'], 'port_out': self._get_random_port()}) c['socket_in'] = str(SocketType.PULL_CONNECT) c['port_in'] = r['port_out'] self.check_fields(r) router_layer.append(r) router_layers.append(router_layer) def rule10(): last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r0 = CommentedMap({'name': 'Router', 'yaml_path': '"!PublishRouter {parameters: {num_part: %d}}"' % len(layer.components), 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUB_BIND), 'port_in': self._get_random_port(), 'port_out': self._get_random_port()}) self.check_fields(r0) router_layer.append(r0) router_layers.append(router_layer) last_layer.components[0]['port_out'] = r0['port_in'] for c in layer.components: c['socket_in'] = str(SocketType.SUB_CONNECT) c['port_in'] = r0['port_out'] def rule8(): router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 r = CommentedMap({'name': 'Router', 'yaml_path': 'BaseReduceRouter', 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_BIND), 'port_in': self._get_random_port(), 'port_out': self._get_random_port()}) self.check_fields(r) for c in last_layer.components: last_income = self.Layer.get_value(c, 'income') if last_income == 'sub': c['socket_out'] = str(SocketType.PUSH_CONNECT) r_c = CommentedMap({'name': 'Router', 'yaml_path': 'BaseReduceRouter', 'socket_in': str(SocketType.PULL_BIND), 'socket_out': str(SocketType.PUSH_CONNECT), 'port_in': self._get_random_port(), 'port_out': r['port_in']}) c['port_out'] = r_c['port_in'] self.check_fields(r_c) router_layer.append(r_c) elif last_income == 'pull': c['socket_out'] = str(SocketType.PUSH_CONNECT) c['port_out'] = r['port_in'] for c in layer.components: c['socket_in'] = str(SocketType.PULL_CONNECT) c['port_in'] = r['port_out'] if router_layer.components: router_layers.append(router_layer) else: self._num_layer -= 1 router_layer = YamlComposer.Layer(layer_id=self._num_layer) self._num_layer += 1 router_layer.append(r) router_layers.append(router_layer) def rule9(): # a shortcut fn: push connect the last and current last_layer.components[0]['socket_out'] = str(SocketType.PUSH_CONNECT) layer.components[0]['socket_in'] = str(SocketType.PULL_BIND) router_layers = [] # type: List['self.Layer'] # bind the last out to current in if last_layer.is_single_component: last_layer.components[0]['port_out'] = self._get_random_port() for c in layer.components: c['port_in'] = last_layer.components[0]['port_out'] # 1-to-? if layer.is_single_component: # 1-to-(1) # no router is needed rule1() elif layer.is_homo_multi_component: # 1-to-(N) income = self.Layer.get_value(layer.components[0], 'income') if income == 'pull': rule1() elif income == 'sub': rule2() else: raise NotImplementedError('replica type: %s is not recognized!' % income) elif layer.is_heto_single_component: # 1-to-(1)&(1)&(1) rule4() else: # 1-to-(N)&(N)&(N) rule6() elif last_layer.is_homo_multi_component: # (N)-to-? last_layer.components[0]['port_out'] = self._get_random_port() last_income = self.Layer.get_value(last_layer.components[0], 'income') for c in layer.components: c['port_in'] = last_layer.components[0]['port_out'] if layer.is_single_component: if last_income == 'pull': # (N)-to-1 rule9() elif last_income == 'sub': # (N)-to-1 with a sync barrier rule3() else: raise NotImplementedError('replica type: %s is not recognized!' % last_income) elif layer.is_homo_multi_component: # (N)-to-(N) # need a router anyway if self.Layer.get_value(layer.components[0], 'income') == 'sub': rule5() else: rule3() elif layer.is_heto_single_component: # (N)-to-(1)&(1)&(1) rule5() else: income = self.Layer.get_value(layer.components[0], 'income') if income == 'pull': rule7() elif income == 'sub': rule10() else: raise NotImplementedError('replica type: %s is not recognized!' % last_income) elif last_layer.is_heto_single_component: rule8() else: rule8() return [last_layer, *router_layers]
[docs]def parse_http_data(data, args): import io if not data or 'yaml-config' not in data: return '<h1>Bad POST request</h1> your POST request does not contain "yaml-config" field!', 406 try: args.yaml_path = io.StringIO(data['yaml-config']) if data.get('mermaid_direction', 'top-down').lower() == 'left-right': args.mermaid_leftright = True else: args.mermaid_leftright = False if 'docker-image' in data: args.docker_img = data['docker-image'] else: args.docker_img = 'gnes/gnes:latest-alpine' return YamlComposer(args).build_all()['html'], 200 except Exception as e: return '<h1>Bad YAML input</h1> please kindly check the format, ' \ 'indent and content of your YAML file! <h3>Traceback: </h3><p><code>%s</code></p>' % e, 400