一个让python程序员关注应用逻辑和测试的微服务框架。

安装

安装Nameko

pip instal nameko
  • 1

安装nameko需要的依赖

sudo apt install rabbitmq-server
  • 1

入门程序

服务端

# helloworld.pyfrom nameko.rpc import rpcclass GreetingService:name = "greeting_service" # 自定义服务名称@rpc #入口点标记def hello(self, name):return "Hello, {}!".format(name)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

运行服务

nameko run helloworld
  • 1

正常输出

starting services: greeting_service
Connected to amqp://guest:**@127.0.0.1:5672//
  • 1
  • 2

命令行客户端

进入nameko交互模式

nameko shell
  • 1

发起客户端请求

n.rpc.greeting_service.hello(name="hello")
  • 1

正常输出

u'Hello, hello!'
  • 1

2.核心概念

服务

一个服务就是一个python类。这个类把服务逻辑封装到方法中,而且把任何的依赖都作为方法的参数。原因很简单,微服务的核心的就是解耦。
  • 1
进入点
进入点可以简单理解为带有@rpc标记的方法所关联的服务入口。在方法上使用了@rpc修饰的方法都将暴露给外部业务。这些进入点一般会会监视外部事件。例如一个消息队列中的消息事件,将触发进入点修饰的方法执行并返回结果。
  • 1
依赖
官方提出所有非服务核心逻辑的实现最好都以依赖的形式实现。
依赖其实是隐藏代码的一种很好的方式。
使用依赖时应该把所有依赖都进行声明。
  • 1
  • 2
  • 3
工作器
工作器就是进入点发放被触发的时候产生的微服务类实例。但是如果有依赖,那么就会被依赖的实例代替。
一个工作器实例只处理一次请求,提供的是无状态服务。
一个服务可以同时运行多个工作器,但最多只能是用户预定义的并发值。
  • 1
  • 2
  • 3

依赖注入

服务类的依赖添加是声明式的。声明时不是使用接口,而是通过使用参数进行声明。
这个参数是一个DependencyProvider。这个东西负责提供注入到服务工作器的对象。
所有的provider都要提供get_dependency()方法生产要注入到工作器中的对象。

工作器生命周期:

1.进入点触发
2.通过服务类初始化工作器
3.依赖出入到工作器
4.执行方法
5.工作器销毁
  • 1
  • 2
  • 3
  • 4
  • 5

伪代码:

worker = Service()
worker.other_rpc = worker.other_rpc.get_dependency()
worker.method()
del worker
  • 1
  • 2
  • 3
  • 4

依赖提供者在服务的持续时间内存活,而注入的依赖项对于每个工作器来说都是惟一的。

同步

Nameko基于eventlet库,这个库实现的同步模型是基于隐式yield模式的协程,通过“绿色的线程”提供同步功能。

隐式的yield基于monkey patching基础库。当一个线程等待IO时就会触发yield。通过命令==nameko run==启动的服务将会应用这个模式。

每一个工作器都有自己的线程。最大的同步工作器数量可以基于每个工作器等待IO时间的总量来动态调整。

工作器都是无状态的所以天生线程安全。但是外部依赖应该确保他们每个工作线程都是用同一个依赖或者多个工作器都能安全地同步访问。

然而c扩展体系都使用socket通信,他们通常都不认为绿色线程的工作能满足线程安全。其中就包括 librabbitmq, MySQLdb等。

扩展

所有的入口点和依赖提供者都作为“扩展”实现。因为他们存在于服务代码之外,又不是所有服务都需要的。(例如一个纯的AMQP暴露的服务将不会使用HTTP入口点)

Nameko有大量的内建扩展,一些是有社区提供的,而你也可以实现自己的扩展。

运行服务

运行服务需要的所有东西:服务类和有关的配置。
最简单的运行一个或者多个服务的方法是使用Nameko命令行:

nameko run module:[ServiceClass]
  • 1

这里的意思是运行某module下的所有服务或者运行某module下的特定的ServiceClass服务。

服务容器

每个服务类都委托给一个ServiceContainer。这个容器封装了所有需要运行一个服务的方法,而且装载了在服务类上的任何扩展。

使用ServiceContainer运行单个服务:

from nameko.containers import ServiceContainerclass Service:name = "service"# create a container
container = ServiceContainer(Service, config={})# ``container.extensions`` exposes all extensions used by the service
service_extensions = list(container.extensions)# start service
container.start()# stop service
container.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

服务运行器

ServiceRunner 是多个服务容器的简单包装,同时提供启动和停止所有包装容器的方法。这个其实是nameko run内部使用的,但这也能实现程序化控制。

from nameko.runners import ServiceRunner
from nameko.testing.utils import get_containerclass ServiceA:name = "service_a"class ServiceB:name = "service_b"# create a runner for ServiceA and ServiceB
runner = ServiceRunner(config={})
runner.add_service(ServiceA)
runner.add_service(ServiceB)# ``get_container`` will return the container for a particular service
container_a = get_container(runner, ServiceA)# start both services
runner.start()# stop both services
runner.stop()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

3.命令行接口

Nameko 提供了一个命令行接口尽可能方便地托管服务及与服务进行交互。

运行服务

nameko run <module>[:<ServiceClass>]
  • 1

发现并运行服务类。这个命令将会在前台启动服务并且运行到进程终止。

也可以用–config选项重写默认配置,并提供一个YAML 格式的配置文件

nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
  • 1
# foobar.yamlAMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10LOGGING:version: 1handlers:console:class: logging.StreamHandlerroot:level: DEBUGhandlers: [console]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

LOGGING 项会传递到logging.config.dictConfig(),而且会适应调用的样式。

配置值可以通过内建的Config依赖提供器读取。

环境变量解决方案

YAML配置文件为环境变量提供了基本的支持。可以使用bash风格的语法:${ENV_VAR},另外还可以提供默认值${ENV_VAR:default_value}

# foobar.yaml
AMQP_URI: pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}
  • 1
  • 2

使用环境变量的运行方式示例

$ RABBITMQ_USER=user RABBITMQ_PASSWORD=password RABBITMQ_HOST=host nameko run --config ./foobar.yaml <module>[:<ServiceClass>]
  • 1

如果需要在YAML文件里使用引号(引号里使用环境变量),显式声明!env_var处理器是必须的。

# foobar.yaml
AMQP_URI: !env_var "pyamqp://${RABBITMQ_USER:guest}:${RABBITMQ_PASSWORD:password}@${RABBITMQ_HOST:localhost}"
  • 1
  • 2

与运行的服务进行交互

nameko shell
  • 1

上述命令是为了与远程服务工作,运行了一个交互式python脚本环境。这是规范的交互式解释器,提供了一个添加到了内建命名空间的特殊的模块n,以支持RPC调用和分发事件。

发起RPC到目标服务
$ nameko shell
>>> n.rpc.target_service.target_method(...)
# RPC response
  • 1
  • 2
  • 3
作为源服务分发事件
$ nameko shell
>>> n.dispatch_event("source_service", "event_type", "event_payload")
  • 1
  • 2

4.内建的扩展

RPC

Nameko包含了一个基于AMQP的RPC实现。它包括@rpc入口点,一个与其他服务对话的代理,以及一个非Nameko客户端也能发起RPC调用到集群的独立的代理

from nameko.rpc import rpc, RpcProxyclass ServiceY:name = "service_y"@rpcdef append_identifier(self, value):return u"{}-y".format(value)class ServiceX:name = "service_x"y = RpcProxy("service_y")@rpcdef remote_method(self, value):res = u"{}-x".format(value)return self.y.append_identifier(res)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
from nameko.standalone.rpc import ClusterRpcProxyconfig = {'AMQP_URI': AMQP_URI  # e.g. "pyamqp://guest:guest@localhost"
}with ClusterRpcProxy(config) as cluster_rpc:cluster_rpc.service_x.remote_method("hellø")  # "hellø-x-y"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

一般的RPC调用会一直阻塞直到远程方法完成为止。但是代理也提供了一个异步调用模式到后台或者并行化RPC调用:

with ClusterRpcProxy(config) as cluster_rpc:hello_res = cluster_rpc.service_x.remote_method.call_async("hello")world_res = cluster_rpc.service_x.remote_method.call_async("world")# do work while waitinghello_res.result()  # "hello-x-y"world_res.result()  # "world-x-y"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

在一个集群里面拥有超过一个的目标服务的实例,RPC请求在这些实例间循环。请求只会由目标服务中的一个实例来处理。

AMQP消息只会在请求被成功处理后才被确认。如果服务确认消息失败,AMQP连接关闭(例如服务进程被杀死)broker将重试调用然后分发消息到可用的服务实例上。

请求和相应的负载为了通过网线传输而被序列化到JSON

事件(发布订阅)

Nameko 事件是一个异步的消息系统,实现了发布订阅模式。服务分发事件,而这些事件可以被0到多个的其他服务所接收。

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpcclass ServiceA:""" Event dispatching service. """name = "service_a"dispatch = EventDispatcher()@rpcdef dispatching_method(self, payload):self.dispatch("event_type", payload)class ServiceB:""" Event listening service. """name = "service_b"@event_handler("service_a", "event_type")def handle_event(self, payload):print("service b received:", payload)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

EventHandler进入点有三个处理器类型决定事件消息是如何被一个集群接收的:
SERVICE_POOL:所有事件处理器通过服务名称联合在一起,并且每个池中的只有一个实例接受到事件,类似于RPC进入点的集群行为。这是默认的处理类型。
BROADCAST:每个监听服务实例都会接收到事件。
SINGLETON:只有一个监听服务实例会接收到事件。

广播的例子:

from nameko.events import BROADCAST, event_handlerclass ListenerService:name = "listener"@event_handler("monitor", "ping", handler_type=BROADCAST, reliable_delivery=False)def ping(self, payload):# all running services will respondprint("pong from {}".format(self.name))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

为了通过网络传输,事件都被序列化成了JSON。

HTTP GET & POST

Nameko的HTTP进入点 支持简单的GET和POST

# http.pyimport json
from nameko.web.handlers import httpclass HttpService:name = "http_service"@http('GET', '/get/<int:value>')def get_method(self, request, value):return json.dumps({'value': value})@http('POST', '/post')def do_post(self, request):return u"received: {}".format(request.get_data(as_text=True))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

启动服务

$nameko run http
starting services: http_service
  • 1
  • 2

测试服务

$ curl -i localhost:8000/get/42
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 13
Date: Fri, 13 Feb 2015 14:51:18 GMT{'value': 42}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
$ curl -i -d "post body" localhost:8000/post
HTTP/1.1 200 OK
Content-Type: text/plain; charset=utf-8
Content-Length: 19
Date: Fri, 13 Feb 2015 14:55:01 GMTreceived: post body
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

HTTP进入点是基于werkzeug库的。服务方法必须返回以下的任一值:

一个字符串,将变成响应实体response body
一个二元组,(status code, response body)
一个三元组,(status_code, headers dict, response body)
一个werkzeug.wrappers.Response实例
  • 1
  • 2
  • 3
  • 4
# advanced_http.pyfrom nameko.web.handlers import http
from werkzeug.wrappers import Responseclass Service:name = "advanced_http_service"@http('GET', '/privileged')def forbidden(self, request):return 403, "Forbidden"@http('GET', '/headers')def redirect(self, request):return 201, {'Location': 'https://www.example.com/widget/1'}, ""@http('GET', '/custom')def custom(self, request):return Response("payload")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

运行服务

$ nameko run advanced_http
starting services: advanced_http_service
  • 1
  • 2

测试:

$ curl -i localhost:8000/privileged
HTTP/1.1 403 FORBIDDEN
Content-Type: text/plain; charset=utf-8
Content-Length: 9
Date: Fri, 13 Feb 2015 14:58:02 GMT
  • 1
  • 2
  • 3
  • 4
  • 5
curl -i localhost:8000/headers
HTTP/1.1 201 CREATED
Location: https://www.example.com/widget/1
Content-Type: text/plain; charset=utf-8
Content-Length: 0
Date: Fri, 13 Feb 2015 14:58:48 GMT
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

可以通过重写response_from_exception()方法格式化控制的错误返回(服务器异常控制)。

import json
from nameko.web.handlers import HttpRequestHandler
from werkzeug.wrappers import Response
from nameko.exceptions import safe_for_serializationclass HttpError(Exception):error_code = 'BAD_REQUEST'status_code = 400class InvalidArgumentsError(HttpError):error_code = 'INVALID_ARGUMENTS'#重写异常处理
class HttpEntrypoint(HttpRequestHandler):def response_from_exception(self, exc):if isinstance(exc, HttpError):response = Response(json.dumps({'error': exc.error_code,'message': safe_for_serialization(exc),}),status=exc.status_code,mimetype='application/json')return responsereturn HttpRequestHandler.response_from_exception(self, exc)http = HttpEntrypoint.decoratorclass Service:name = "http_service"@http('GET', '/custom_exception')def custom_exception(self, request):raise InvalidArgumentsError("Argument `foo` is required.")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

运行

$ nameko run http_exceptions
starting services: http_service
  • 1
  • 2

测试

$ curl -i http://localhost:8000/custom_exception
HTTP/1.1 400 BAD REQUEST
Content-Type: application/json
Content-Length: 72
Date: Thu, 06 Aug 2015 09:53:56 GMT{"message": "Argument `foo` is required.", "error": "INVALID_ARGUMENTS"}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

计时器

计时器是一个每达到可配置的秒数时刻就触发的简单的入口点。计时器是非集群定制的,而且在所有的服务实例都会触发。

from nameko.timer import timerclass Service:name ="service"@timer(interval=1)def ping(self):# method executed every secondprint("pong")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

5.内建的Dependency Providers

Nameko包含了一些常用的Dependency Providers。

Config

Config是一个简单的依赖提供器,提供了在运行时只读读取配置值的能力。

from nameko.dependency_providers import Config
from nameko.web.handlers import httpclass Service:name = "test_config"config = Config()@propertydef foo_enabled(self):return self.config.get('FOO_FEATURE_ENABLED', False)@http('GET', '/foo')def foo(self, request):if not self.foo_enabled:return 403, "FeatureNotEnabled"return 'foo'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

6.社区支持(Community)

社区有大量不是核心项目但你会发现在开发自己的nameko服务是很有用的nameko扩展和补充的库。

扩展

nameko-sqlalchemy
nameko-sentry
nameko-amqp-retry
nameko-bayeux-client
nameko-slack
nameko-eventlog-dispatcher
nameko-redis-py
nameko-redis
nameko-statsd

补充库

django-nameko
flask_nameko
nameko-proxy

7.测试服务

哲学

Nameko规约设计得很容易进行测试。服务可能很小而且功能单一,而且依赖注入使得它很简单就可以替换及分离函数的片段。

nameko自己的测试套件使用pytest库。

单元测试

单元测试通常意味着分离地测试一个单一的服务。例如没有了任何或者大部分的依赖。

worker_factory()工具将从一个服务类创建工作器,并使用mock.MagicMock创建的对象替换掉原来的依赖。 依赖函数可以通过 side_effect和 return_value伪造。

""" Service unit testing best practice.
"""from nameko.rpc import RpcProxy, rpc
from nameko.testing.services import worker_factoryclass ConversionService(object):""" Service under test"""name = "conversions"maths_rpc = RpcProxy("maths")@rpcdef inches_to_cm(self, inches):return self.maths_rpc.multiply(inches, 2.54)@rpcdef cms_to_inches(self, cms):return self.maths_rpc.divide(cms, 2.54)def test_conversion_service():# create worker with mock dependenciesservice = worker_factory(ConversionService)# add side effects to the mock proxy to the "maths" serviceservice.maths_rpc.multiply.side_effect = lambda x, y: x * yservice.maths_rpc.divide.side_effect = lambda x, y: x / y# test inches_to_cm business logicassert service.inches_to_cm(300) == 762service.maths_rpc.multiply.assert_called_once_with(300, 2.54)# test cms_to_inches business logicassert service.cms_to_inches(762) == 300service.maths_rpc.divide.assert_called_once_with(762, 2.54)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

有些情况下使用一个代替性的依赖比伪造依赖更有用。这个可能是一个全功能的替换(例如一个测试数据库会话)或者一个轻量级的提供部分功能的垫片。

""" Service unit testing best practice, with an alternative dependency.
"""import pytest
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmakerfrom nameko.rpc import rpc
from nameko.testing.services import worker_factory# using community extension from http://pypi.python.org/pypi/nameko-sqlalchemy
from nameko_sqlalchemy import SessionBase = declarative_base()class Result(Base):__tablename__ = 'model'id = Column(Integer, primary_key=True)value = Column(String(64))class Service:""" Service under test"""name = "service"db = Session(Base)@rpcdef save(self, value):result = Result(value=value)self.db.add(result)self.db.commit()@pytest.fixture
def session():""" Create a test database and session"""engine = create_engine('sqlite:///:memory:')Base.metadata.create_all(engine)session_cls = sessionmaker(bind=engine)return session_cls()def test_service(session):# create instance, providing the test database sessionservice = worker_factory(Service, db=session)# verify ``save`` logic by querying the test databaseservice.save("helloworld")assert session.query(Result.value).all() == [("helloworld",)]
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56

集成测试

在nameko中集成测试意味着在数个服务间测试接口。建议的方法是以正常的方式运行所有的被测试的服务,然后使用帮助类触发入口点的行为。

""" Service integration testing best practice.
"""from nameko.rpc import rpc, RpcProxy
from nameko.testing.utils import get_container
from nameko.testing.services import entrypoint_hookclass ServiceX:""" Service under test"""name = "service_x"y = RpcProxy("service_y")@rpcdef remote_method(self, value):res = "{}-x".format(value)return self.y.append_identifier(res)class ServiceY:""" Service under test"""name = "service_y"@rpcdef append_identifier(self, value):return "{}-y".format(value)def test_service_x_y_integration(runner_factory, rabbit_config):# run services in the normal mannerrunner = runner_factory(rabbit_config, ServiceX, ServiceY)runner.start()# artificially fire the "remote_method" entrypoint on ServiceX# and verify responsecontainer = get_container(runner, ServiceX)with entrypoint_hook(container, "remote_method") as entrypoint:assert entrypoint("value") == "value-x-y"
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

注意这里在ServiceX 和ServiceY之间的接口就像在正常操作一样。

对于一个特殊的测试,接口如果在超出测试范围的(与测试无关的),可以使用下面的其中一个测试帮助器进行禁用。

restrict_entrypoints
nameko.testing.services.restrict_entrypoints(container, *entrypoints)
  • 1

限制在container 的进入点为特定的名称的进入点。

这些方法必须在容器启动前被调用。

用法

下面的服务定义有两个进入点:

class Service(object):name = "service"@timer(interval=1)def foo(self, arg):pass@rpcdef bar(self, arg)pass@rpcdef baz(self, arg):passcontainer = ServiceContainer(Service, config)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

禁用在foo上的定时器进入点,只留下RPC进入点

restrict_entrypoints(container, "bar", "baz")
  • 1

注意不可能单独地把多个进入点看成同一个方法。

replace_dependencies
nameko.testing.services.replace_dependencies(container, *dependencies, **dependency_map)
  • 1

替换依赖提供器在容器上使用伪造的依赖提供器。

在 * dependencies声明的依赖将由一个MockDependencyProvider替换,这个MockDependencyProvider会注入一个魔术伪造器而不是依赖。

另外,你可能使用关键字参数命名依赖,而且提供MockDependencyProvider应该注入的代替的值。

为每个在 * dependencies声明的依赖返回MockDependencyProvider.dependency,这样对替换的依赖的调用就能被检查出来。如果只有一个依赖被替换,则返回一个单一的对象,否则由生成器yield出在 * dependencies声明的依赖。注意任何在 * * dependency_map指出的替换的依赖都不会被返回。

替换是在容器实例里执行的,而且对服务类没有影响。所以新的容器实例不会影响旧的容器实例。

用法

from nameko.rpc import RpcProxy, rpc
from nameko.standalone.rpc import ServiceRpcProxyclass ConversionService(object):name = "conversions"maths_rpc = RpcProxy("maths")@rpcdef inches_to_cm(self, inches):return self.maths_rpc.multiply(inches, 2.54)@rpcdef cm_to_inches(self, cms):return self.maths_rpc.divide(cms, 2.54)container = ServiceContainer(ConversionService, config)
mock_maths_rpc = replace_dependencies(container, "maths_rpc")
mock_maths_rpc.divide.return_value = 39.37container.start()with ServiceRpcProxy('conversions', config) as proxy:proxy.cm_to_inches(100)# assert that the dependency was called as expected
mock_maths_rpc.divide.assert_called_once_with(100, 2.54)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

通过关键字指定特殊的替换

class StubMaths(object):def divide(self, val1, val2):return val1 / val2replace_dependencies(container, maths_rpc=StubMaths())container.start()with ServiceRpcProxy('conversions', config) as proxy:assert proxy.cm_to_inches(127) == 50.0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

完整的例子

使用两个范围限制帮助器

"""
This file defines several toy services that interact to form a shop of the
famous ACME Corporation. The AcmeShopService relies on the StockService,
InvoiceService and PaymentService to fulfil its orders. They are not best
practice examples! They're minimal services provided for the test at the
bottom of the file.``test_shop_integration`` is a full integration test of the ACME shop
"checkout flow". It demonstrates how to test the multiple ACME services in
combination with each other, including limiting service interactions by
replacing certain entrypoints and dependencies.
"""from collections import defaultdictimport pytestfrom nameko.extensions import DependencyProvider
from nameko.events import EventDispatcher, event_handler
from nameko.exceptions import RemoteError
from nameko.rpc import rpc, RpcProxy
from nameko.standalone.rpc import ServiceRpcProxy
from nameko.testing.services import replace_dependencies, restrict_entrypoints
from nameko.testing.utils import get_container
from nameko.timer import timerclass NotLoggedInError(Exception):passclass ItemOutOfStockError(Exception):passclass ItemDoesNotExistError(Exception):passclass ShoppingBasket(DependencyProvider):""" A shopping basket tied to the current ``user_id``."""def __init__(self):self.baskets = defaultdict(list)def get_dependency(self, worker_ctx):class Basket(object):def __init__(self, basket):self._basket = basketself.worker_ctx = worker_ctxdef add(self, item):self._basket.append(item)def __iter__(self):for item in self._basket:yield itemtry:user_id = worker_ctx.data['user_id']except KeyError:raise NotLoggedInError()return Basket(self.baskets[user_id])class AcmeShopService:name = "acmeshopservice"user_basket = ShoppingBasket()stock_rpc = RpcProxy('stockservice')invoice_rpc = RpcProxy('invoiceservice')payment_rpc = RpcProxy('paymentservice')fire_event = EventDispatcher()@rpcdef add_to_basket(self, item_code):""" Add item identified by ``item_code`` to the shopping basket.This is a toy example! Ignore the obvious race condition."""stock_level = self.stock_rpc.check_stock(item_code)if stock_level > 0:self.user_basket.add(item_code)self.fire_event("item_added_to_basket", item_code)return item_coderaise ItemOutOfStockError(item_code)@rpcdef checkout(self):""" Take payment for all items in the shopping basket."""total_price = sum(self.stock_rpc.check_price(item)for item in self.user_basket)# prepare invoiceinvoice = self.invoice_rpc.prepare_invoice(total_price)# take paymentself.payment_rpc.take_payment(invoice)# fire checkout event if prepare_invoice and take_payment succeededcheckout_event_data = {'invoice': invoice,'items': list(self.user_basket)}self.fire_event("checkout_complete", checkout_event_data)return total_priceclass Warehouse(DependencyProvider):""" A database of items in the warehouse.This is a toy example! A dictionary is not a database."""def __init__(self):self.database = {'anvil': {'price': 100,'stock': 3},'dehydrated_boulders': {'price': 999,'stock': 12},'invisible_paint': {'price': 10,'stock': 30},'toothpicks': {'price': 1,'stock': 0}}def get_dependency(self, worker_ctx):return self.databaseclass StockService:name = "stockservice"warehouse = Warehouse()@rpcdef check_price(self, item_code):""" Check the price of an item."""try:return self.warehouse[item_code]['price']except KeyError:raise ItemDoesNotExistError(item_code)@rpcdef check_stock(self, item_code):""" Check the stock level of an item."""try:return self.warehouse[item_code]['stock']except KeyError:raise ItemDoesNotExistError(item_code)@rpc@timer(100)def monitor_stock(self):""" Periodic stock monitoring method. Can also be triggered manuallyover RPC.This is an expensive process that we don't want to exercise duringintegration testing..."""raise NotImplementedError()@event_handler('acmeshopservice', "checkout_complete")def dispatch_items(self, event_data):""" Dispatch items from stock on successful checkouts.This is an expensive process that we don't want to exercise duringintegration testing..."""raise NotImplementedError()class AddressBook(DependencyProvider):""" A database of user details, keyed on user_id."""def __init__(self):self.address_book = {'wile_e_coyote': {'username': 'wile_e_coyote','fullname': 'Wile E Coyote','address': '12 Long Road, High Cliffs, Utah',},}def get_dependency(self, worker_ctx):def get_user_details():try:user_id = worker_ctx.data['user_id']except KeyError:raise NotLoggedInError()return self.address_book.get(user_id)return get_user_detailsclass InvoiceService:name = "invoiceservice"get_user_details = AddressBook()@rpcdef prepare_invoice(self, amount):""" Prepare an invoice for ``amount`` for the current user."""address = self.get_user_details().get('address')fullname = self.get_user_details().get('fullname')username = self.get_user_details().get('username')msg = "Dear {}. Please pay ${} to ACME Corp.".format(fullname, amount)invoice = {'message': msg,'amount': amount,'customer': username,'address': address}return invoiceclass PaymentService:name = "paymentservice"@rpcdef take_payment(self, invoice):""" Take payment from a customer according to ``invoice``.This is an expensive process that we don't want to exercise duringintegration testing..."""raise NotImplementedError()# =============================================================================
# Begin test
# =============================================================================@pytest.yield_fixture
def rpc_proxy_factory(rabbit_config):""" Factory fixture for standalone RPC proxies.Proxies are started automatically so they can be used without a ``with``statement. All created proxies are stopped at the end of the test, whenthis fixture closes."""all_proxies = []def make_proxy(service_name, **kwargs):proxy = ServiceRpcProxy(service_name, rabbit_config, **kwargs)all_proxies.append(proxy)return proxy.start()yield make_proxyfor proxy in all_proxies:proxy.stop()def test_shop_checkout_integration(rabbit_config, runner_factory, rpc_proxy_factory
):""" Simulate a checkout flow as an integration test.Requires instances of AcmeShopService, StockService and InvoiceServiceto be running. Explicitly replaces the rpc proxy to PaymentService sothat service doesn't need to be hosted.Also replaces the event dispatcher dependency on AcmeShopService anddisables the timer entrypoint on StockService. Limiting the interactionsof services in this way reduces the scope of the integration test andeliminates undesirable side-effects (e.g. processing events unnecessarily)."""context_data = {'user_id': 'wile_e_coyote'}shop = rpc_proxy_factory('acmeshopservice', context_data=context_data)runner = runner_factory(rabbit_config, AcmeShopService, StockService, InvoiceService)# replace ``event_dispatcher`` and ``payment_rpc``  dependencies on# AcmeShopService with ``MockDependencyProvider``\sshop_container = get_container(runner, AcmeShopService)fire_event, payment_rpc = replace_dependencies(shop_container, "fire_event", "payment_rpc")# restrict entrypoints on StockServicestock_container = get_container(runner, StockService)restrict_entrypoints(stock_container, "check_price", "check_stock")runner.start()# add some items to the basketassert shop.add_to_basket("anvil") == "anvil"assert shop.add_to_basket("invisible_paint") == "invisible_paint"# try to buy something that's out of stockwith pytest.raises(RemoteError) as exc_info:shop.add_to_basket("toothpicks")assert exc_info.value.exc_type == "ItemOutOfStockError"# provide a mock response from the payment servicepayment_rpc.take_payment.return_value = "Payment complete."# checkoutres = shop.checkout()total_amount = 100 + 10assert res == total_amount# verify integration with mocked out payment servicepayment_rpc.take_payment.assert_called_once_with({'customer': "wile_e_coyote",'address': "12 Long Road, High Cliffs, Utah",'amount': total_amount,'message': "Dear Wile E Coyote. Please pay $110 to ACME Corp."})# verify events fired as expectedassert fire_event.call_count == 3if __name__ == "__main__":import syspytest.main(sys.argv)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333

其他的帮助方法

entrypoint_hook
提供了context_data 模仿特殊调用上下文的方法。
  • 1
import pytestfrom nameko.contextdata import Language
from nameko.rpc import rpc
from nameko.testing.services import entrypoint_hookclass HelloService:""" Service under test"""name = "hello_service"language = Language()@rpcdef hello(self, name):greeting = "Hello"if self.language == "fr":greeting = "Bonjour"elif self.language == "de":greeting = "Gutentag"return "{}, {}!".format(greeting, name)@pytest.mark.parametrize("language, greeting", [("en", "Hello"),("fr", "Bonjour"),("de", "Gutentag"),
])
def test_hello_languages(language, greeting, container_factory, rabbit_config):container = container_factory(HelloService, rabbit_config)container.start()context_data = {'language': language}with entrypoint_hook(container, 'hello', context_data) as hook:assert hook("Matt") == "{}, Matt!".format(greeting)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
entrypoint_waiter

提供了控阻塞调用的测试方法。

from nameko.events import event_handler
from nameko.standalone.events import event_dispatcher
from nameko.testing.services import entrypoint_waiterclass ServiceB:""" Event listening service."""name = "service_b"@event_handler("service_a", "event_type")def handle_event(self, payload):print("service b received", payload)def test_event_interface(container_factory, rabbit_config):container = container_factory(ServiceB, rabbit_config)container.start()dispatch = event_dispatcher(rabbit_config)# prints "service b received payload" before "exited"with entrypoint_waiter(container, 'handle_event'):dispatch("service_a", "event_type", "payload")print("exited")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

8.编写扩展

结构

所有的扩展都是nameko.extensions.Extension的子类。这个基类提供了基本的扩展的架构,特别是下面的方法可以被重写以添加特定的功能。

Extension.setup()
在容器启动前调用绑定的扩展。
扩展应该在这里做任何必要的初始化工作。
  • 1
  • 2
Extension.start()
当容器成功启动时调用绑定的扩展。
这个方法只会在所有的的扩展都成功调用setup后才会调用。如果扩展影响外部事件,那么它现在就会开始影响它们。
  • 1
  • 2
Extension.stop()
在容器开始关闭时调用。
所有扩展应该优雅地在这里关闭。
  • 1
  • 2

编写依赖提供器

所有的依赖提供器都应该是nameko.extensions.DependencyProvider的子类,并实现了get_dependency()方法返回一个注入到服务工作器中的一个实例。

推荐的做法是为依赖注入最小的必须的接口。这将减少测试面也更容易由测试代码覆盖。

依赖提供器也会绑定到工作器的生命周期。下面的三个方法会为每个工作器的所有依赖提供器中被调用

DependencyProvider.worker_setup(worker_ctx)
在服务工作器工作之前执行一个任务。
依赖应该在这里做任何的预处理,在遇到失败事件是抛出异常。
参数:worker_ctx : WorkerContext
nameko.containers.ServiceContainer.spawn_worker
  • 1
  • 2
  • 3
  • 4
DependencyProvider.worker_result(worker_ctx, result=None, exc_info=None)
在服务工作器执行获得结果后调用。
依赖应该在这里处理结果。这个方法会在任何工作器完成工作时为所有的依赖实例进行调用。
例如:一个数据库会话依赖可能会输出缓存的事务
  • 1
  • 2
  • 3
DependencyProvider.worker_teardown(worker_ctx)
在服务工作器完成任务后调用。
依赖可以在这里做任何的后处理,在出现失败事件时抛出异常。
例如:一个数据库会话可能会提交会话。
  • 1
  • 2
  • 3

同步和线程安全

由get_dependency()返回的对象应该是线程安全的,因为它将会被多个同步的正在运行的工作器访问。

在执行服务的相同的线程中,工作器生命周期方法会被调用。这意味着,你能定义线程本地变量,而且通过每个方法访问他们。

例子

一个简单的DependencyProvider 发送消息到SQS队列

from nameko.extensions import DependencyProviderimport boto3class SqsSend(DependencyProvider):def __init__(self, url, region="eu-west-1", **kwargs):self.url = urlself.region = regionsuper(SqsSend, self).__init__(**kwargs)def setup(self):self.client = boto3.client('sqs', region_name=self.region)def get_dependency(self, worker_ctx):def send_message(payload):# assumes boto client is thread-safe for this action, because it# happens inside the worker threadsself.client.send_message(QueueUrl=self.url,MessageBody=payload)return send_message
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

写入口点

你可以实现一个新的入口点扩展,如果你想为初始化服务支持新的传输或机制。

一个入口点最少的要求
1.继承nameko.extensions.Entrypoint
2.实现start()方法,当容器启动时启动入口点。如果后台线程是必要的,推荐做法是使用一个由服务线程管理的线程。
3.在合适的时候调用spawn_worker()绑定容器

例子

一个从SQS队列接收消息的简单入口点。

from nameko.extensions import Entrypoint
from functools import partialimport boto3class SqsReceive(Entrypoint):def __init__(self, url, region="eu-west-1", **kwargs):self.url = urlself.region = regionsuper(SqsReceive, self).__init__(**kwargs)def setup(self):self.client = boto3.client('sqs', region_name=self.region)def start(self):self.container.spawn_managed_thread(self.run, identifier="SqsReceiver.run")def run(self):while True:response = self.client.receive_message(QueueUrl=self.url,WaitTimeSeconds=5,)messages = response.get('Messages', ())for message in messages:self.handle_message(message)def handle_message(self, message):handle_result = partial(self.handle_result, message)args = (message['Body'],)kwargs = {}self.container.spawn_worker(self, args, kwargs, handle_result=handle_result)def handle_result(self, message, worker_ctx, result, exc_info):# assumes boto client is thread-safe for this action, because it# happens inside the worker threadsself.client.delete_message(QueueUrl=self.url,ReceiptHandle=message['ReceiptHandle'])return result, exc_inforeceive = SqsReceive.decorator
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

在服务类中使用

from .sqs_receive import receiveclass SqsService(object):name = "sqs-service"@receive('https://sqs.eu-west-1.amazonaws.com/123456789012/nameko-sqs')def handle_sqs_message(self, body):""" This method is called by the `receive` entrypoint whenevera message sent to the given SQS queue."""print(body)return body
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

期望的异常

Entrypoint 基类构造器能接受一个异常的列表,用于区分方法返回的异常的不同。

class Service:name = "service"auth = Auth()@rpc(expected_exceptions=Unauthorized)def update(self, data):if not self.auth.has_role("admin"):raise Unauthorized()# perform updateraise TypeError("Whoops, genuine error.")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

更觉不同的异常可以,用于检查而做出不同的响应逻辑。相关的检测库有 nameko-sentry。

敏感的参数

Entrypoint 构造器允许你标记某个部分的参数为敏感参数。

class Service:name = "service"auth = Auth()@rpc(sensitive_arguments="password", expected_exceptions=Unauthenticated)def login(self, username, password):# raises Unauthenticated if username/password do not matchreturn self.auth.authenticate(username, password)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

nameko.utils.get_redacted_args()方法将隐藏敏感参数返回 **

一个有用的扩展可以记录和保存进口点的调用信息:nameko-tracer

# by dictionary key
@entrypoint(sensitive_arguments="foo.a")
def method(self, foo):pass>>> get_redacted_args(method, foo={'a': 1, 'b': 2})
... {'foo': {'a': '******', 'b': 2}}# list index
@entrypoint(sensitive_arguments="foo.a[1]")
def method(self, foo):pass>>> get_redacted_args(method, foo=[{'a': [1, 2, 3]}])
... {'foo': {'a': [1, '******', 3]}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

繁殖后台线程

扩展,需要在一个线程里执行工作,它可能需要选择使用spawn_managed_thread()委托service 容器来管理线程。

 def start(self):self.container.spawn_managed_thread(self.run, identifier="SqsReceiver.run")
  • 1
  • 2
  • 3
  • 4

推荐委托线程管理到容器,因为

1.管理的线程将总是当容器停止或者被杀死时被终止。
2.在管理的线程中未处理的异常会由容器捕获,而且会导致线程产生错误信息并终止,这个机制可以防止进程挂起。

Nameko中文文档(翻译)相关推荐

  1. Apache Spark 2.2.0 中文文档 翻译活动

    为什么80%的码农都做不了架构师?>>>    Spark 2.2.0 已然发布(2017-07-11 发布) 5 天了,更新了一些新套路吧! 此版本从 Structured Str ...

  2. DeBreath 去除呼吸声vst插件中文文档(翻译)

    文档地址:https://www.waves.com/1lib/pdf/plugins/debreath.pdf 介绍: 如果你曾经难以从人声轨道中移动出呼吸声,特别是句子开头的吸气的声音,你或许会问 ...

  3. intros.js中文文档-翻译不易

    Tour API introJs([targetElm])# 创建一个对象 参数: targetElm : String (optional) 参数是一个可选的字符串 例子:#intro-farm. ...

  4. webpack2.x 中文文档 翻译 之 出口Output

    出口Output 影响编译的输出 告诉webpack怎样编译输出的文件 允许多个入口一个出口的情况. 用法 const config = {output: 'bundle.js' };module.e ...

  5. blockly自定义中文出问题_Blockly 中文文档(翻译)

    目录 概述 获取代码 注入 Blockly 配置 代码生成器 引入和导出块 云存储 概述 Blockly 可以很容易的添加到你的 web 应用, 用户拖拽砖块, Blockly 就会生成代码, 你的应 ...

  6. python中文语法提示_Python官方中文文档上线了:各种教程已汉化,不用再苦等野生翻译...

    终于,Python有官方中文文档了. 从今往后,不论是版本新变化,入门教程,语法讲解,Python模块安装指南--各种各样的手册,都可以直接看中文了. △ 不是谷歌翻译哟 你看,比起英文原版,中文的语 ...

  7. 【翻译】fancyBox3 中文文档

    fancyBox3 中文文档 译文永久地址:kangkai124.github.io/fancybox/ 说明:本文档仅供参考,更新不及时请查看官方文档 1. 介绍 fancyBox 是一个 Java ...

  8. 【翻译】fancyBox 3中文文档

    fancyBox3 中文文档 译文永久地址:https://kangkai124.github.io/fancybox/ 说明:本文档仅供参考,更新不及时请查看官方文档 1. 介绍 fancyBox是 ...

  9. GitHub 中文文档正式发布

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 中国作为全球最大的人口大国,所属开发者在 GitHub 上的占比自 ...

最新文章

  1. C#实现HttpPost提交文件
  2. 深入浅出之个性化推荐系统实践
  3. 2019年,中国要推进这70个工程项目
  4. java 升级1.8_升级系统中的java到1.8版本详解
  5. 2018.12.22 spoj7258 Lexicographical Substring Search(后缀自动机)
  6. 使用 Edit + MASM 5.0 编译器 + Linker 连接器
  7. 从“没有免费的午餐”理论看机器学习模型
  8. 你以为面试官问的是分布式缓存,其实他想问……
  9. 面料正反、倒顺、经纬判别方法
  10. linux压缩比,Linux下各压缩方式测试(压缩率和使用时间)
  11. PHP中巧用curl 并发减少获取第三方网页内容时间
  12. python怎么进阶_你真的会自学么?大佬整理的python进阶路径(长更)
  13. 想下以前的CyanogenMod 源码怎么办
  14. 川大计算机系1999级高伟,四川大学计算机系主任魏骁勇研发人脸识别无人机课堂点名...
  15. 【MySQL】MySQL的四种事务隔离级别
  16. 流量分类方法设计(一)——参考论文整理
  17. 《VALORANT》: 双塔迷城的诞生
  18. 无线传感网络的定位和时间同步技术简述
  19. ALV清缓存_SAP刘梦_新浪博客
  20. 用计算机处理信息图文,浅谈计算机技术在报纸图文信息处理中的应用

热门文章

  1. 电子元器件基础知识大全详解
  2. Windows平台搭建Spark开发环境(Intellij idea 2020.1社区版+Maven 3.6.3+Scala 2.11.8)
  3. 弘辽科技:淘宝新店运营推广的几大技巧
  4. 02-K3S 架构及快速入门
  5. MySQL - 8 递归查询树结构
  6. 格雷希尔快速连接器GripSeal 密封性测试密封堵头 G15Pro系列使用说明
  7. 第二届“红明谷”杯数据安全大赛-安全意识赛
  8. python要学什么英文歌_可以读英语课文的软件
  9. monad_Monad界面
  10. zzuli 1787: 生化危机 (BFS)